1 krisbash 1.1 /*
2 **==============================================================================
3 **
4 ** Open Management Infrastructure (OMI)
5 **
6 ** Copyright (c) Microsoft Corporation
7 **
8 ** Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 ** use this file except in compliance with the License. You may obtain a copy
10 ** of the License at
11 **
12 ** http://www.apache.org/licenses/LICENSE-2.0
13 **
14 ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 ** MERCHANTABLITY OR NON-INFRINGEMENT.
18 **
19 ** See the Apache 2 License for the specific language governing permissions
20 ** and limitations under the License.
21 **
22 krisbash 1.1 **==============================================================================
23 */
24
25 #include "multiplex.h"
26 #include <omi_error/errorutil.h>
27
28 #define MUX_HASHTABLESIZE 100
29
30 /*
31 **==============================================================================
32 **
33 ** Data structures
34 **
35 **==============================================================================
36 */
37
38 typedef struct _ConnectionIn
39 {
40 StrandMany strand;
41
42 MuxIn* mux;
43 krisbash 1.1 // nothing else for now
44 }
45 ConnectionIn;
46
47 typedef struct _OperationOut
48 {
49 // managing outgoint interaction to dispatcher,providerMgr,etc.
50 StrandEntry strand;
51
52 MI_Uint64 key; // for now OperationOut address
53 }
54 OperationOut;
55
56 /*
57 **==============================================================================
58 */
59
// Debug names for the two strand kinds created in this file; the same
// identifiers are passed to STRAND_DEBUG() where the strands are allocated.
// NOTE(review): the expansion of these macros is defined elsewhere.
STRAND_DEBUGNAME(ConnectionIn);
STRAND_DEBUGNAME(OperationOut);
62
63 /*
64 krisbash 1.1 **==============================================================================
65 */
66
67 void _OperationOut_Ack( _In_ Strand* self_)
68 {
69 //OperationOut* self = (OperationOut*)StrandEntry_FromStrand(self_);
70 DEBUG_ASSERT( NULL != self_ );
71 trace_OperationOut_Ack( &self_->info.interaction, self_->info.interaction.other );
72
73 // for now do nothing
74 //TODO eventually multiplexer should take care of flow control here
75 }
76
77 void _OperationOut_Cancel( _In_ Strand* self)
78 {
79 trace_OperationOut_Cancel( self );
80 // for now do nothing
81 }
82
83 void _OperationOut_Close( _In_ Strand* self_)
84 {
85 krisbash 1.1 //OperationOut* self = (OperationOut*)StrandEntry_FromStrand(self_);
86 DEBUG_ASSERT( NULL != self_ );
87 trace_OperationOut_Close( &self_->info.interaction, self_->info.interaction.other );
88
89 // Just close the other side
90 if( !self_->info.thisClosedOther )
91 Strand_Close( self_ );
92 }
93
94 /*
95 Object that implements a single operation coming out of a binary protocol
96 connection. Uses that one-to-many interface to multiplex multiple operations
97 in a single connection.
98
99 Behavior:
100 - Uses the default many-to-one post implementation that just enqueue messages
101 on the ConnectionIn
102 - Ack does nothing currently as there is not an explicit in-the-wire flow control
103 protocol implemented yet.
104 - Cancel is not currently implemented
105 - Close check if the connection has already been closed and if not
106 krisbash 1.1 just closes the other side
107 - Shutdown:
108 The objects are deleted thru the normal Strand logic. That is,
109 once the interaction is closed on both sides the object is auto-deleted.
110 */
111 static StrandFT _OperationOut_FT = {
112 NULL,
113 NULL,
114 _OperationOut_Ack,
115 _OperationOut_Cancel,
116 _OperationOut_Close,
117 NULL,
118 NULL,
119 NULL,
120 NULL,
121 NULL,
122 NULL,
123 NULL };
124
125 /*
126 **==============================================================================
127 krisbash 1.1 */
128
129 void _ConnectionIn_Post( _In_ Strand* self_, _In_ Message* msg )
130 {
131 ConnectionIn* self = (ConnectionIn*)StrandMany_FromStrand(self_);
132 DEBUG_ASSERT( NULL != self_ );
133 trace_ConnectionIn_Post(
134 self,
135 msg,
136 msg->tag,
137 MessageName(msg->tag),
138 msg->operationId,
139 &self->strand.strand.info.interaction,
140 self->strand.strand.info.interaction.other );
141
142 // For now ack immediately
143 //TODO eventually refined multiplexer should take care of flow control here
144 // Need to schedule (so no other Post occurs during Post)
145 StrandMany_ScheduleAck( &self->strand );
146
147 if( CancelMsgTag == msg->tag )
148 krisbash 1.1 {
149 StrandEntry* entry;
150
151 trace_ConnectionIn_PostCancel(
152 self_,
153 msg,
154 msg->tag,
155 MessageName(msg->tag),
156 msg->operationId,
157 &self->strand.strand.info.interaction,
158 self->strand.strand.info.interaction.other );
159
160 entry = self->strand.findEntryProc( &self->strand, msg );
161 if( NULL != entry )
162 {
163 StrandMany_CancelEntry( entry );
164 }
165 else
166 {
167 trace_ConnectionIn_PostCancelError(
168 self_,
169 krisbash 1.1 msg,
170 msg->tag,
171 MessageName(msg->tag),
172 msg->operationId,
173 &self->strand.strand.info.interaction,
174 self->strand.strand.info.interaction.other );
175 }
176 }
177 else
178 {
179 if( !StrandMany_PostFindEntry( &self->strand, msg ) )
180 {
181 OperationOut* newOperation;
182 PostResultMsg* resp;
183 MI_Result result = MI_RESULT_OK;
184
185 trace_ConnectionInPost_NewOp( Uint64ToPtr(msg->operationId), msg, MessageName(msg->tag) );
186
187 if( msg->tag == UnsubscribeReqTag )
188 {
189 // TODO we can remove this once client uses cancelation instead of UnsubscribeReq
190 krisbash 1.1 trace_ConnectionInPost_IgnoreUnsubscribeReq( msg, msg->operationId );
191 return;
192 }
193
194 newOperation = (OperationOut*)StrandEntry_New(
195 STRAND_DEBUG( OperationOut )
196 &self->strand,
197 &_OperationOut_FT,
198 sizeof(OperationOut),
199 STRAND_FLAG_ENTERSTRAND,
200 NULL);
201
202 if( NULL == newOperation )
203 {
204 trace_ConnectionInPost_NewOpFailed( Uint64ToPtr(msg->operationId) );
205 DEBUG_ASSERT( MI_FALSE );
206 result = MI_RESULT_FAILED;
207 }
208 else
209 {
210 // we have to set up the key before actually adding the entry
211 krisbash 1.1 newOperation->key = msg->operationId;
212
213 result = StrandMany_AddEntry( &newOperation->strand );
214 // We are about to call into open into the components on the right that may
215 // steal the thread synchronously and start posting back,
216 // therefore we need to leave the ConnectionIn strand that may be posted upon
217 // (see additional comment on OperationOut below)
218 Strand_Leave( &self->strand.strand );
219
220 if( MI_RESULT_OK != result )
221 {
222 trace_ConnectionInPost_CannotAddNewOp( Uint64ToPtr(msg->operationId) );
223 DEBUG_ASSERT( MI_FALSE );
224 }
225 else
226 {
227 // open interaction to the right
228 // Leave also OperationOut strand on open, otherwise any Post in the same thread will be delayed
229 // and the stack will eventually deadlock on in-proc providers that send
230 // several posts in the same open thread
231 Strand_Open(
232 krisbash 1.1 &newOperation->strand.strand,
233 self->mux->callback,
234 self->mux->callbackData,
235 msg,
236 MI_TRUE);
237 }
238 }
239
240 if( MI_RESULT_OK != result )
241 {
242 resp = (*self->mux->makeResultMessageCallback)( msg, NULL, NULL, MI_RESULT_TYPE_MI, result);
243
244 if (resp)
245 {
246 // We can be outside the strand here so use this SchedulePost instead of Post
247 StrandMany_SchedulePost( &self->strand, &resp->base );
248 }
249 else
250 {
251 trace_ConnectionInPost_MessageAllocFailed( result );
252 // TODO: return a pre-allocated msg once we have that change
253 krisbash 1.1 }
254 }
255 }
256 }
257 }
258
259 void _ConnectionIn_PostControl( _In_ Strand* self_, _In_ Message* msg)
260 {
261 DEBUG_ASSERT( MI_FALSE ); // not used yet
262 }
263
264 void _ConnectionIn_Ack( _In_ Strand* self_)
265 {
266 trace_OperationIn_Ack( &self_->info.interaction, self_->info.interaction.other );
267 // Nothing to do here
268 //TODO eventually multiplexer should take care of flow control here
269 }
270
271 void _ConnectionIn_Cancel( _In_ Strand* self_)
272 {
273 DEBUG_ASSERT( MI_FALSE ); // not used yet
274 krisbash 1.1 }
275
276 void _ConnectionIn_Close( _In_ Strand* self_)
277 {
278 ConnectionIn* self = (ConnectionIn*)StrandMany_FromStrand(self_);
279 DEBUG_ASSERT( NULL != self_ );
280 trace_ConnectionInClose( &self->strand.strand.info.interaction, self->strand.strand.info.interaction.other );
281
282 StrandMany_CloseAllEntries( &self->strand );
283 // Close back the protocol if there are no entries/operations
284 if (self->strand.numEntries == 0)
285 Strand_Close( &self->strand.strand );
286
287 if( self->mux->onCloseCallback )
288 {
289 (*self->mux->onCloseCallback)(self);
290 }
291 }
292
293 void _ConnectionIn_EntryDeleted( _In_ StrandMany* self )
294 {
295 krisbash 1.1 //if we are already closed and all operations on mux are closed
296 if ((self->strand.info.otherClosedThis) && (self->numEntries == 0))
297 Strand_Close( &self->strand );
298 }
299
300 /*
301 Object that implements a single connection coming in from binary protocol
302 Uses that one-to-many interface to fan-out multiple operations
303 from the same connection.
304
305 Behavior:
306 - Post checks if the message is a cancel, and if it is then finds the
307 corresponing operation by searching the built-in hash map by operationId
308 then it sends a Interaction Interface cancel directly to the operation.
309 If the message is NOT a cancel then it search the operation in the
310 built-in hash map by operationId and then 2 things can happen:
311 * There is a existing operation in which case it just delivers the
312 secondary message to the existing interaction
313 * This is a new operation and therefore creates the OperationOut object,
314 opens the interaction to the right with it and calls the callback for new
315 request defined in the MuxIn.
316 krisbash 1.1 - Ack does nothing currently as there is not an expecific in-the-wire flow control
317 protocol implemented yet.
318 - PostControl and Cancel are not currently implemented
319 - Close initiates the closing of all entries and if there are no entries remained
320 to be closes then closes the interaction back itself.
321 It also calls the OnCloseCallback method defined in MuxIn
322 - Shutdown:
323 The objects are deleted thru the normal Strand logic. That is,
324 once the interaction is closed on both sides and there are no
325 entries the object is auto-deleted.
326
327 Unique features and special Behavour:
328 - _ConnectionIn_EntryDeleted is executed once an entry is deleted,
329 that is to address the case where when the connection was closed not
330 all entries were deleted yet and therefore it needs to be finally closed
331 once there are no more entries.
332 */
333 static StrandFT _ConnectionIn_FT = {
334 _ConnectionIn_Post,
335 _ConnectionIn_PostControl,
336 _ConnectionIn_Ack,
337 krisbash 1.1 _ConnectionIn_Cancel,
338 _ConnectionIn_Close,
339 NULL,
340 NULL,
341 NULL,
342 NULL,
343 NULL,
344 NULL,
345 NULL };
346
347 /*
348 **==============================================================================
349 */
350
351 static StrandManyInternalFT _MuxIn_InternalFT = {
352 NULL,
353 _ConnectionIn_EntryDeleted,
354 NULL,
355 NULL,
356 NULL,
357 NULL,
358 krisbash 1.1 NULL,
359 NULL,
360 NULL,
361 NULL };
362
363
364 /*
365 **==============================================================================
366 **
367 ** Local functions
368 **
369 **==============================================================================
370 */
371 size_t _ConnectionIn_HashMapHashProc(const HashBucket* bucket)
372 {
373 const OperationOut* self = (const OperationOut*)StrandEntry_FromBucketConst(bucket);
374 return (size_t)self->key;
375 }
376
377 int _ConnectionIn_HashMapEqualProc(_In_ const HashBucket* bucket1, _In_ const HashBucket* bucket2)
378 {
379 krisbash 1.1 const OperationOut* entry1 = (const OperationOut*)StrandEntry_FromBucketConst(bucket1);
380 const OperationOut* entry2 = (const OperationOut*)StrandEntry_FromBucketConst(bucket2);
381 return entry1->key == entry2->key;
382 }
383
384 StrandEntry* _ConnectionIn_FindOperation(_In_ const StrandMany* parent_, _In_ const Message* msg)
385 {
386 ConnectionIn* parent = (ConnectionIn*)parent_;
387 OperationOut forSearch;
388 HashBucket* bucket;
389
390 forSearch.key = msg->operationId;
391
392 bucket = HashMap_Find(&parent->strand.many,&forSearch.strand.bucket);
393
394 if( NULL == bucket )
395 {
396 trace_ConnectionInFindRequest_CannotFind( parent, &parent->strand.strand, forSearch.key );
397 return NULL;
398 }
399 else
400 krisbash 1.1 {
401 OperationOut* self = (OperationOut*)StrandEntry_FromBucket(bucket);
402 trace_ConnectionInFindRequest_Found( parent, &parent->strand.strand, forSearch.key, self, &self->strand.strand );
403 return (StrandEntry*)self;
404 }
405 }
406
407 /*
408 **==============================================================================
409 **
410 ** Public API
411 **
412 **==============================================================================
413 */
414 void MuxIn_Open( _Inout_ InteractionOpenParams* params )
415 {
416 MuxIn* self = (MuxIn*)params->callbackData; // callbackData is the MuxIn object
417 ConnectionIn* connectionIn;
418
419 DEBUG_ASSERT( NULL != params->interaction );
420 DEBUG_ASSERT( NULL == params->msg );
421 krisbash 1.1 DEBUG_ASSERT( NULL != params->callbackData );
422
423 connectionIn = (ConnectionIn*)StrandMany_New(
424 STRAND_DEBUG( ConnectionIn )
425 &_ConnectionIn_FT,
426 &_MuxIn_InternalFT,
427 sizeof(ConnectionIn),
428 STRAND_FLAG_ENTERSTRAND,
429 params,
430 MUX_HASHTABLESIZE,
431 _ConnectionIn_HashMapHashProc,
432 _ConnectionIn_HashMapEqualProc,
433 _ConnectionIn_FindOperation );
434
435 if( NULL == connectionIn )
436 {
437 trace_MuxInOpen_AllocFailed();
438 Strand_FailOpen( params );
439 }
440 else
441 {
442 krisbash 1.1 connectionIn->mux = self;
443
444 Strand_Leave( &connectionIn->strand.strand );
445 // note that connectionIn is automatically deleted by strand management
446 }
447 }
448
|