(file) Return to multiplex.c CVS log (file) (dir) Up to [OMI] / omi / base

  1 krisbash 1.1 /*
  2              **==============================================================================
  3              **
  4              ** Open Management Infrastructure (OMI)
  5              **
  6              ** Copyright (c) Microsoft Corporation
  7              ** 
  8              ** Licensed under the Apache License, Version 2.0 (the "License"); you may not 
  9              ** use this file except in compliance with the License. You may obtain a copy 
 10              ** of the License at 
 11              **
 12              **     http://www.apache.org/licenses/LICENSE-2.0 
 13              **
 14              ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15              ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED 
 16              ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, 
 17              ** MERCHANTABLITY OR NON-INFRINGEMENT. 
 18              **
 19              ** See the Apache 2 License for the specific language governing permissions 
 20              ** and limitations under the License.
 21              **
 22 krisbash 1.1 **==============================================================================
 23              */
 24              
 25              #include "multiplex.h"
 26              #include <omi_error/errorutil.h>
 27              
 28              #define MUX_HASHTABLESIZE   100
 29              
 30              /*
 31              **==============================================================================
 32              **
 33              ** Data structures
 34              **
 35              **==============================================================================
 36              */
 37              
 38              typedef struct _ConnectionIn
 39              {
 40                  StrandMany      strand;
 41              
 42                  MuxIn*          mux;
 43 krisbash 1.1     // nothing else for now
 44              }
 45              ConnectionIn;
 46              
 47              typedef struct _OperationOut
 48              {
 49                  // managing outgoint interaction to dispatcher,providerMgr,etc.
 50                  StrandEntry     strand;  
 51              
 52                  MI_Uint64       key;  // for now OperationOut address
 53              } 
 54              OperationOut;
 55              
 56              /*
 57              **==============================================================================
 58              */
 59              
 60              STRAND_DEBUGNAME( ConnectionIn );
 61              STRAND_DEBUGNAME( OperationOut );
 62              
 63              /*
 64 krisbash 1.1 **==============================================================================
 65              */
 66              
 67              void _OperationOut_Ack( _In_ Strand* self_)
 68              {
 69                  //OperationOut* self = (OperationOut*)StrandEntry_FromStrand(self_);
 70                  DEBUG_ASSERT( NULL != self_ );
 71                  trace_OperationOut_Ack( &self_->info.interaction, self_->info.interaction.other );
 72              
 73                  // for now do nothing
 74                  //TODO eventually multiplexer should take care of flow control here
 75              }
 76              
 77              void _OperationOut_Cancel( _In_ Strand* self)
 78              {
 79                  trace_OperationOut_Cancel( self );
 80                  // for now do nothing
 81              }
 82              
 83              void _OperationOut_Close( _In_ Strand* self_)
 84              {
 85 krisbash 1.1     //OperationOut* self = (OperationOut*)StrandEntry_FromStrand(self_);
 86                  DEBUG_ASSERT( NULL != self_ );
 87                  trace_OperationOut_Close( &self_->info.interaction, self_->info.interaction.other );
 88              
 89                  // Just close the other side
 90                  if( !self_->info.thisClosedOther )
 91                      Strand_Close( self_ );
 92              }
 93              
 94              /*
 95                  Object that implements a single operation coming out of a binary protocol 
 96                  connection. Uses that one-to-many interface to multiplex multiple operations
 97                  in a single connection.
 98              
 99                  Behavior:
100                  - Uses the default many-to-one post implementation that just enqueue messages
101                     on the ConnectionIn
102                  - Ack does nothing currently as there is not an explicit in-the-wire flow control 
103                     protocol implemented yet.
104                  - Cancel is not currently implemented
105                  - Close check if the connection has already been closed and if not
106 krisbash 1.1        just closes the other side
107                  - Shutdown: 
108                     The objects are deleted thru the normal Strand logic. That is,
109                     once the interaction is closed on both sides the object is auto-deleted.
110              */
111              static StrandFT _OperationOut_FT = { 
112                  NULL, 
113                  NULL, 
114                  _OperationOut_Ack, 
115                  _OperationOut_Cancel, 
116                  _OperationOut_Close,
117                  NULL,
118                  NULL,
119                  NULL,
120                  NULL,
121                  NULL,
122                  NULL,
123                  NULL };
124              
125              /*
126              **==============================================================================
127 krisbash 1.1 */
128              
129              void _ConnectionIn_Post( _In_ Strand* self_, _In_ Message* msg )
130              {
131                  ConnectionIn* self = (ConnectionIn*)StrandMany_FromStrand(self_);
132                  DEBUG_ASSERT( NULL != self_ );
133                  trace_ConnectionIn_Post(
134                      self,
135                      msg,
136                      msg->tag,
137                      MessageName(msg->tag),
138                      msg->operationId,
139                      &self->strand.strand.info.interaction, 
140                      self->strand.strand.info.interaction.other );
141              
142                  // For now ack immediately
143                  //TODO eventually refined multiplexer should take care of flow control here
144                  // Need to schedule (so no other Post occurs during Post)
145                  StrandMany_ScheduleAck( &self->strand );
146                  
147                  if( CancelMsgTag == msg->tag )
148 krisbash 1.1     {
149                      StrandEntry* entry;
150                      
151                      trace_ConnectionIn_PostCancel(
152                          self_,
153                          msg,
154                          msg->tag,
155                          MessageName(msg->tag),
156                          msg->operationId,
157                          &self->strand.strand.info.interaction, 
158                          self->strand.strand.info.interaction.other );
159                      
160                      entry = self->strand.findEntryProc( &self->strand, msg );
161                      if( NULL != entry )
162                      {
163                          StrandMany_CancelEntry( entry );
164                      }
165                      else
166                      {
167                          trace_ConnectionIn_PostCancelError(
168                              self_,
169 krisbash 1.1                 msg,
170                              msg->tag,
171                              MessageName(msg->tag),
172                              msg->operationId,
173                              &self->strand.strand.info.interaction, 
174                              self->strand.strand.info.interaction.other );
175                      }
176                  }
177                  else
178                  {
179                      if( !StrandMany_PostFindEntry( &self->strand, msg ) )
180                      {
181                          OperationOut* newOperation;
182                          PostResultMsg* resp;
183                          MI_Result result = MI_RESULT_OK;
184              
185                          trace_ConnectionInPost_NewOp( Uint64ToPtr(msg->operationId), msg, MessageName(msg->tag) );
186              
187                          if( msg->tag == UnsubscribeReqTag )
188                          {
189                              // TODO we can remove this once client uses cancelation instead of UnsubscribeReq
190 krisbash 1.1                 trace_ConnectionInPost_IgnoreUnsubscribeReq( msg, msg->operationId );
191                              return;
192                          }
193              
194                          newOperation = (OperationOut*)StrandEntry_New( 
195                              STRAND_DEBUG( OperationOut ) 
196                              &self->strand, 
197                              &_OperationOut_FT, 
198                              sizeof(OperationOut),
199                              STRAND_FLAG_ENTERSTRAND,
200                              NULL);
201              
202                          if( NULL == newOperation )
203                          {
204                              trace_ConnectionInPost_NewOpFailed( Uint64ToPtr(msg->operationId) );
205                              DEBUG_ASSERT( MI_FALSE );
206                              result = MI_RESULT_FAILED;
207                          }
208                          else
209                          {
210                              // we have to set up the key before actually adding the entry
211 krisbash 1.1                 newOperation->key = msg->operationId;
212                              
213                              result = StrandMany_AddEntry( &newOperation->strand );
214                              // We are about to call into open into the components on the right that may
215                              // steal the thread synchronously and start posting back, 
216                              // therefore we need to leave the ConnectionIn strand that may be posted upon
217                              // (see additional comment on OperationOut below)
218                              Strand_Leave( &self->strand.strand );   
219              
220                              if( MI_RESULT_OK != result )
221                              {
222                                  trace_ConnectionInPost_CannotAddNewOp( Uint64ToPtr(msg->operationId) );
223                                  DEBUG_ASSERT( MI_FALSE );
224                              }
225                              else
226                              {
227                                  // open interaction to the right
228                                  // Leave also OperationOut strand on open, otherwise any Post in the same thread will be delayed
229                                  // and the stack will eventually deadlock on in-proc providers that send
230                                  // several posts in the same open thread
231                                  Strand_Open(
232 krisbash 1.1                         &newOperation->strand.strand, 
233                                      self->mux->callback,
234                                      self->mux->callbackData,
235                                      msg,
236                                      MI_TRUE);
237                              }
238                          }
239              
240                          if( MI_RESULT_OK != result )
241                          {
242                              resp = (*self->mux->makeResultMessageCallback)( msg, NULL, NULL, MI_RESULT_TYPE_MI, result);
243                              
244                              if (resp)
245                              {
246                                  // We can be outside the strand here so use this SchedulePost instead of Post
247                                  StrandMany_SchedulePost( &self->strand, &resp->base );
248                              }
249                              else
250                              {
251                                  trace_ConnectionInPost_MessageAllocFailed( result );
252                                  // TODO: return a pre-allocated msg once we have that change
253 krisbash 1.1                 }
254                          }
255                      }
256                  }
257              }
258              
259              void _ConnectionIn_PostControl( _In_ Strand* self_, _In_ Message* msg)
260              {
261                  DEBUG_ASSERT( MI_FALSE );  // not used yet
262              }
263              
264              void _ConnectionIn_Ack( _In_ Strand* self_)
265              {
266                  trace_OperationIn_Ack( &self_->info.interaction, self_->info.interaction.other );
267                  // Nothing to do here
268                  //TODO eventually multiplexer should take care of flow control here
269              }
270              
271              void _ConnectionIn_Cancel( _In_ Strand* self_)
272              {
273                  DEBUG_ASSERT( MI_FALSE );  // not used yet
274 krisbash 1.1 }
275              
276              void _ConnectionIn_Close( _In_ Strand* self_)
277              {
278                  ConnectionIn* self = (ConnectionIn*)StrandMany_FromStrand(self_);
279                  DEBUG_ASSERT( NULL != self_ );
280                  trace_ConnectionInClose( &self->strand.strand.info.interaction, self->strand.strand.info.interaction.other );
281              
282                  StrandMany_CloseAllEntries( &self->strand );
283                  // Close back the protocol if there are no entries/operations
284                  if (self->strand.numEntries == 0)
285                      Strand_Close( &self->strand.strand );
286              
287                  if( self->mux->onCloseCallback )
288                  {
289                      (*self->mux->onCloseCallback)(self);
290                  }
291              }
292              
293              void _ConnectionIn_EntryDeleted( _In_ StrandMany* self )
294              {
295 krisbash 1.1     //if we are already closed and all operations on mux are closed
296                  if ((self->strand.info.otherClosedThis) && (self->numEntries == 0))
297                      Strand_Close( &self->strand );
298              }
299              
300              /*
301                  Object that implements a single connection coming in from binary protocol 
302                  Uses that one-to-many interface to fan-out multiple operations
303                  from the same connection.
304              
305                  Behavior:
306                  - Post checks if the message is a cancel, and if it is then finds the
307                     corresponing operation by searching the built-in hash map by operationId
308                     then it sends a Interaction Interface cancel directly to the operation.
309                     If the message is NOT a cancel then it search the operation in the 
310                     built-in hash map by operationId and then 2 things can happen:
311                     * There is a existing operation in which case it just delivers the 
312                        secondary message to the existing interaction 
313                     * This is a new operation and therefore creates the OperationOut object,
314                        opens the interaction to the right with it and calls the callback for new
315                        request defined in the MuxIn.
316 krisbash 1.1     - Ack does nothing currently as there is not an expecific in-the-wire flow control 
317                     protocol implemented yet.
318                  - PostControl and Cancel are not currently implemented
319                  - Close initiates the closing of all entries and if there are no entries remained
320                     to be closes then closes the interaction back itself.
321                     It also calls the OnCloseCallback method defined in MuxIn
322                  - Shutdown: 
323                     The objects are deleted thru the normal Strand logic. That is,
324                     once the interaction is closed on both sides and there are no
325                     entries the object is auto-deleted.
326              
327                  Unique features and special Behavour:
328                  - _ConnectionIn_EntryDeleted is executed once an entry is deleted,
329                     that is to address the case where when the connection was closed not 
330                     all entries were deleted yet and therefore it needs to be finally closed
331                     once there are no more entries.
332              */
333              static StrandFT _ConnectionIn_FT = { 
334                  _ConnectionIn_Post, 
335                  _ConnectionIn_PostControl, 
336                  _ConnectionIn_Ack, 
337 krisbash 1.1     _ConnectionIn_Cancel, 
338                  _ConnectionIn_Close,
339                  NULL,
340                  NULL,
341                  NULL,
342                  NULL,
343                  NULL,
344                  NULL,
345                  NULL };
346              
347              /*
348              **==============================================================================
349              */
350              
351              static StrandManyInternalFT _MuxIn_InternalFT = {
352                  NULL,
353                  _ConnectionIn_EntryDeleted,
354                  NULL,
355                  NULL,
356                  NULL,
357                  NULL,
358 krisbash 1.1     NULL,
359                  NULL,
360                  NULL,
361                  NULL };
362              
363              
364              /*
365              **==============================================================================
366              **
367              ** Local functions
368              **
369              **==============================================================================
370              */
371              size_t _ConnectionIn_HashMapHashProc(const HashBucket* bucket)
372              {
373                  const OperationOut* self = (const OperationOut*)StrandEntry_FromBucketConst(bucket);
374                  return (size_t)self->key;
375              }
376              
377              int _ConnectionIn_HashMapEqualProc(_In_ const HashBucket* bucket1, _In_ const HashBucket* bucket2)
378              {
379 krisbash 1.1     const OperationOut* entry1 = (const OperationOut*)StrandEntry_FromBucketConst(bucket1);
380                  const OperationOut* entry2 = (const OperationOut*)StrandEntry_FromBucketConst(bucket2);
381                  return entry1->key == entry2->key;
382              }
383              
384              StrandEntry* _ConnectionIn_FindOperation(_In_ const StrandMany* parent_, _In_ const Message* msg)
385              {
386                  ConnectionIn* parent = (ConnectionIn*)parent_;
387                  OperationOut forSearch;
388                  HashBucket* bucket;
389              
390                  forSearch.key = msg->operationId;
391                  
392                  bucket = HashMap_Find(&parent->strand.many,&forSearch.strand.bucket);
393              
394                  if( NULL == bucket )
395                  {
396                      trace_ConnectionInFindRequest_CannotFind( parent, &parent->strand.strand, forSearch.key );
397                      return NULL;
398                  }
399                  else
400 krisbash 1.1     {
401                      OperationOut* self = (OperationOut*)StrandEntry_FromBucket(bucket);
402                      trace_ConnectionInFindRequest_Found( parent, &parent->strand.strand, forSearch.key, self, &self->strand.strand );
403                      return (StrandEntry*)self;
404                  }
405              }   
406              
407              /*
408              **==============================================================================
409              **
410              ** Public API
411              **
412              **==============================================================================
413              */
414              void MuxIn_Open( _Inout_ InteractionOpenParams* params )
415              {
416                  MuxIn* self = (MuxIn*)params->callbackData; // callbackData is the MuxIn object
417                  ConnectionIn* connectionIn;
418                  
419                  DEBUG_ASSERT( NULL != params->interaction );
420                  DEBUG_ASSERT( NULL == params->msg );
421 krisbash 1.1     DEBUG_ASSERT( NULL != params->callbackData );
422              
423                  connectionIn = (ConnectionIn*)StrandMany_New(
424                                          STRAND_DEBUG( ConnectionIn )
425                                          &_ConnectionIn_FT,
426                                          &_MuxIn_InternalFT,
427                                          sizeof(ConnectionIn),
428                                          STRAND_FLAG_ENTERSTRAND,
429                                          params,
430                                          MUX_HASHTABLESIZE,
431                                          _ConnectionIn_HashMapHashProc,
432                                          _ConnectionIn_HashMapEqualProc,
433                                          _ConnectionIn_FindOperation );
434              
435                  if( NULL == connectionIn )
436                  {
437                      trace_MuxInOpen_AllocFailed();
438                      Strand_FailOpen( params );
439                  }
440                  else
441                  {
442 krisbash 1.1         connectionIn->mux = self;
443              
444                      Strand_Leave( &connectionIn->strand.strand );
445                      // note that connectionIn is automatically deleted by strand management
446                  }
447              }
448              

ViewCVS 0.9.2