(file) Return to nioproc.c CVS log (file) (dir) Up to [OMI] / omi / provmgr

  1 krisbash 1.1 /*
  2              **==============================================================================
  3              **
  4              ** Open Management Infrastructure (OMI)
  5              **
  6              ** Copyright (c) Microsoft Corporation
  7              ** 
  8              ** Licensed under the Apache License, Version 2.0 (the "License"); you may not 
  9              ** use this file except in compliance with the License. You may obtain a copy 
 10              ** of the License at 
 11              **
 12              **     http://www.apache.org/licenses/LICENSE-2.0 
 13              **
 14              ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 15              ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED 
 16              ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, 
 17              ** MERCHANTABLITY OR NON-INFRINGEMENT. 
 18              **
 19              ** See the Apache 2 License for the specific language governing permissions 
 20              ** and limitations under the License.
 21              **
 22 krisbash 1.1 **==============================================================================
 23              */
 24              
 25              #include <pal/atomic.h>
 26              #include <base/batch.h>
 27              #include <omi_error/omierror.h>
 28              #include "nioproc.h"
 29              #include "SubMgr.h"
 30              #include "provmgr.h"
 31              
 32              #define THREAD_NOT_JOINED 0
 33              #define THREAD_JOINED 1
 34              
 35              #define THREAD_NOT_RUNNING 0
 36              #define THREAD_RUNNING 1
 37              
 38              RequestHandler g_requesthandler;
 39              
 40              static RequestItem* RequestList_RemoveItem(_Inout_ RequestList* list);
 41              static MI_Result RequestList_RemoveSpecificItem(_Inout_ RequestList* list, _In_ RequestItem* item);
 42              
 43 krisbash 1.1 //
 44              // A proc running from a spawned thread to unsuscribe the indication opertion(s)
 45              //
 46              PAL_Uint32 THREAD_API noniothread_proc(void* p);
 47              
 48              _Use_decl_annotations_
 49              void RequestList_Init(RequestList* list)
 50              {
 51                  list->head = NULL;
 52                  list->tail = NULL;
 53                  ReadWriteLock_Init( &list->lock );
 54              }
 55              
 56              _Use_decl_annotations_
 57              void RequestHandler_Init(RequestHandler* handler)
 58              {
 59                  RequestList_Init(&handler->list);
 60                  Lock_Init(&handler->lock);
 61                  handler->running = THREAD_NOT_RUNNING;
 62                  handler->joined = THREAD_NOT_JOINED;
 63              }
 64 krisbash 1.1 
 65              //
 66              // shutdown handler, i.e. wait for unsubscribe thread to exit
 67              //
 68              static void RequestHandler_Shutdown(RequestHandler* handler)
 69              {
 70                  // wait for unsubscribe thread to exit
 71                  Thread unsubThread;
 72                  ptrdiff_t joined = THREAD_NOT_JOINED;
 73                  PAL_Uint32 ret;
 74                  Lock_Acquire( &handler->lock );
 75                  if ( handler->running == THREAD_RUNNING )
 76                  {
 77                      joined = THREAD_JOINED;
 78                      handler->joined = joined;
 79                      unsubThread = handler->thread;
 80                  }
 81                  Lock_Release( &handler->lock );
 82                  if ( joined == THREAD_JOINED )
 83                  {
 84                      Thread_Join( &unsubThread, &ret );
 85 krisbash 1.1         Thread_Destroy( &unsubThread );
 86                      trace_ProvMgr_Destroy_Join_nonioThread( ret );
 87                  }
 88              }
 89              
 90              _Use_decl_annotations_
 91              void RequestHandler_Finalize(RequestHandler* self)
 92              {
 93                  DEBUG_ASSERT( self );
 94                  RequestHandler_Shutdown( self );
 95                  {
 96                      RequestItem* ui = NULL;
 97                      while (NULL != (ui = RequestList_RemoveItem( &self->list )))
 98                      {
 99                          if (ui->type == REQUEST_SUBSCRIBE )
100                          {
101                              SubscribeProviderItem* spi = (SubscribeProviderItem*) ui;
102                              Message_Release( &spi->msg->base.base );
103                          }
104              
105                          trace_nioproc_FreeRequestItem(ui);
106 krisbash 1.1             PAL_Free(ui);
107                      }
108                  }
109                  self->joined = THREAD_NOT_JOINED;
110                  self->running = THREAD_NOT_RUNNING; 
111              }
112              
113              
114              //
115              // Create unsubscribe RequestItem
116              //
117              static RequestItem* _CreateUnsubscribeProviderItem(
118                  _In_opt_ SubscriptionContext* ctx,
119                  _In_ MI_Boolean invokeRequest,
120                  _In_ MI_Result finalResult )
121              {
122                  UnsubscribeProviderItem* ui = (UnsubscribeProviderItem*)PAL_Malloc(sizeof(UnsubscribeProviderItem));
123                  if (ui)
124                  {
125                      ui->base.type = REQUEST_UNSUBSCRIBE;
126                      ui->base.next = NULL;
127 krisbash 1.1         ui->ctx = ctx;
128                      ui->invokeRequest = invokeRequest;
129                      ui->finalResult = finalResult;
130                  }
131                  
132                  return (RequestItem*)ui;
133              }
134              
135              //
136              // Create subscribe RequestItem
137              //
138              static RequestItem* _CreateSubscribeProviderItem(
139                  _In_ Provider* provider,
140                  _In_ SubscribeReq* msg,
141                  _In_ SubscriptionContext* subscrContext )
142              {
143                  SubscribeProviderItem* ui = (SubscribeProviderItem*)PAL_Malloc(sizeof(SubscribeProviderItem));
144                  if (ui)
145                  {
146                      ui->base.type = REQUEST_SUBSCRIBE;
147                      ui->base.next = NULL;
148 krisbash 1.1         ui->provider = provider;
149                      ui->msg = msg;
150                      Message_AddRef( &msg->base.base );  // this reference is removed once the scheduled method (_Context_Aux_InvokeSubscribe) is executed
151                      ui->subscrContext = subscrContext;
152                  }
153                  
154                  trace_nioproc_CreateRequestItem((RequestItem*)ui);
155              
156                  return (RequestItem*)ui;
157              }
158              
159              //
160              // Get one request item and remove it from list
161              //
162              static RequestItem* RequestList_RemoveItem(_Inout_ RequestList* list)
163              {
164                  RequestItem* ui = NULL;
165                  ReadWriteLock_AcquireWrite(&list->lock);
166                  if (list->head)
167                  {
168                      DEBUG_ASSERT(NULL != list->tail);
169 krisbash 1.1         ui = list->head;
170                      list->head = ui->next;
171                      if (list->head == NULL)
172                          list->tail = NULL;
173                  }
174                  ReadWriteLock_ReleaseWrite(&list->lock);
175                  return ui;
176              }
177              
178              static MI_Result RequestList_RemoveSpecificItem(_Inout_ RequestList* list, _In_ RequestItem* item)
179              {
180                  RequestItem* current = NULL;
181                  RequestItem* prev = NULL;
182                  MI_Result result = MI_RESULT_NOT_FOUND;
183              
184                  DEBUG_ASSERT(NULL != list->head);
185              
186                  if(item == NULL || list == NULL)    
187                      return MI_RESULT_NOT_FOUND;
188                  
189                  ReadWriteLock_AcquireWrite(&list->lock);
190 krisbash 1.1 
191                  current = list->head;
192              
193                  while (current)
194                  {
195                      if(current == item)
196                      {
197                          if(!prev)
198                          {
199                              // The item is on the head of the list
200                              list->head = current->next;
201              
202                              if(list->head == NULL)
203                                  list->tail = NULL;
204                          }
205                          else 
206                          {
207                              // The item is on the middle or the end of the list
208                              prev->next = current->next;
209                              
210                              if(prev->next == NULL)
211 krisbash 1.1                     list->tail = prev;
212                          }
213              
214                          current->next = NULL;
215                          result = MI_RESULT_OK;
216                          break;
217                      }
218              
219                      prev = current;
220                      current = current->next;
221                  }
222              
223                  ReadWriteLock_ReleaseWrite(&list->lock);
224              
225                  return result;
226              }
227              
228              //
229              // Queue the request and spawn a new thread to perform the operation if not
230              // created yet.
231              //
232 krisbash 1.1 static MI_Result RequestList_ScheduleItem(
233                  _Inout_ RequestList* list,
234                  _In_ RequestItem* item )
235              {
236                  RequestHandler* handler = &g_requesthandler;
237                  MI_Result finalResult = MI_RESULT_OK;
238                  
239                  // insert request into list
240                  ReadWriteLock_AcquireWrite(&list->lock);
241                  if (list->tail)
242                  {
243                      DEBUG_ASSERT(NULL != list->head);        
244                      list->tail->next = item;
245                      list->tail = item;
246                  }
247                  else
248                  {
249                      DEBUG_ASSERT(NULL == list->head);
250                      list->head = list->tail = item;
251                  }
252                  ReadWriteLock_ReleaseWrite(&list->lock);
253 krisbash 1.1     
254                  // create thread if needed
255                  Lock_Acquire( &handler->lock );
256                  if (g_requesthandler.running == THREAD_NOT_RUNNING)
257                  {
258                      //
259                      // OMI has only one IO thread, which might be blocked if call Request to provider here,
260                      // thus create another thread to invoke Request to provider
261                      //
262                      int r = Thread_CreateJoinable(&handler->thread, noniothread_proc, NULL, (void*)handler);
263                      if ( r != 0 )
264                      {
265                          MI_Char buffer[128];
266                          int err = errno;
267                          trace_RequestList_ScheduleItem_CreateNonIOThreadFailed(err, ErrnoToString((MI_Uint32)err, buffer, MI_COUNT(buffer)));
268                          finalResult = MI_RESULT_SERVER_LIMITS_EXCEEDED;
269                      }
270                      else
271                      {
272                          g_requesthandler.running = THREAD_RUNNING;
273                      }
274 krisbash 1.1     }
275                  Lock_Release( &handler->lock );
276              
277                  if (finalResult != MI_RESULT_OK)
278                  {        
279                      // Since result is not OK, remove the item that we just added 
280                      RequestList_RemoveSpecificItem(list, item);
281                  }
282              
283                  return finalResult;
284              }
285              
286              //
287              // Queue the unsubscribe request and spawn a new thread to
288              // perform the unsubscribe operation if not created yet
289              //
290              _Use_decl_annotations_
291              MI_Result Schedule_UnsubscribeProvider(
292                  SubscriptionContext* ctx,
293                  MI_Boolean invokeRequest,
294                  MI_Result finalResult)
295 krisbash 1.1 {
296                  SubMgrSubscription* subscription;
297                  RequestItem* item = NULL;
298                  MI_Result r;
299                  DEBUG_ASSERT(ctx);
300              
301                  subscription = ctx->subscription;        
302              
303                  item = _CreateUnsubscribeProviderItem( ctx, invokeRequest, finalResult );
304                  if ( NULL == item )
305                  {
306                      /*
307                       * FATAL error (out of memory) happened, not much we can do here,
308                       * restart sever is the only option
309                       */
310                      trace_OutOfMemory();
311              
312                      return MI_RESULT_SERVER_LIMITS_EXCEEDED;
313                  }
314              
315                  trace_ScheduleRequest_UnsubscribeProvider(UintThreadID(), subscription);
316 krisbash 1.1 
317                  /* 
318                   * Add a reference to SubMgrSubscription, which will be released in
319                   * noniothread_proc
320                   */
321                  SubMgrSubscription_Addref( subscription );
322              
323                  r = RequestList_ScheduleItem( &g_requesthandler.list, item );
324                  if (r != MI_RESULT_OK )
325                  {
326                      /* Decrement the ref count to balance the addref above*/
327                      SubMgrSubscription_Release( subscription );
328                      PAL_Free(item);
329                  }
330                  return r;
331              }
332              
333              //
334              // Queue sending final result request and spawn a new thread to
335              // perform the unsubscribe operation if not created yet
336              //
337 krisbash 1.1 _Use_decl_annotations_
338              MI_Result Schedule_SendFinalResult(
339                  SubscriptionContext* ctx,
340                  MI_Result finalResult)
341              {
342                  return Schedule_UnsubscribeProvider( ctx, MI_FALSE, finalResult );
343              }
344              
345              //
346              // Queue subscribe request and spawn a new thread to
347              // perform the unsubscribe operation if not created yet
348              //
349              MI_Result Schedule_SubscribeRequest(
350                  _In_ Provider* provider,
351                  _In_ SubscribeReq* msg,
352                  _In_ SubscriptionContext* subscrContext )
353              {
354                  RequestItem* item = NULL;
355                  MI_Result r;
356                  DEBUG_ASSERT(provider);
357                  STRAND_ASSERTONSTRAND( &subscrContext->baseCtx.strand );
358 krisbash 1.1 
359                  item = _CreateSubscribeProviderItem( provider, msg, subscrContext );
360                  Strand_Leave( &subscrContext->baseCtx.strand );
361                  if ( NULL == item )
362                  {
363                      //
364                      // FATAL error (out of memory) happened, not much we can do here,
365                      // restart sever is the only option
366                      //
367                      trace_OutOfMemory();
368                      return MI_RESULT_SERVER_LIMITS_EXCEEDED;
369                  }
370              
371                  trace_ScheduleRequest_SubscribeProvider(UintThreadID(), provider, msg, subscrContext);
372              
373                  r = RequestList_ScheduleItem( &g_requesthandler.list, item );
374              
375                  if (r != MI_RESULT_OK )
376                  {
377                      // Decrement the ref count for the message. This is to balance the Add ref in _CreateSubscribeProviderItem
378                      Message_Release( &msg->base.base );
379 krisbash 1.1         trace_nioproc_FreeRequestItem(item);
380                      PAL_Free(item);
381                  }
382              
383                  return r;
384              }
385              
386              //
387              // A proc running from a spawned thread to unsuscribe the indication provider
388              //
389              PAL_Uint32 THREAD_API noniothread_proc(void* p)
390              {
391                  RequestHandler* handler = (RequestHandler*)p;
392                  trace_noniothread_proc_start(UintThreadID());
393                  while ( THREAD_NOT_JOINED == Atomic_Read(&handler->joined) )
394                  {
395                      RequestItem* ui = RequestList_RemoveItem( &handler->list );
396                      if (NULL == ui)
397                      {
398                          PAL_Boolean quit = PAL_FALSE;
399                          ReadWriteLock_AcquireRead( &handler->list.lock );
400 krisbash 1.1             if ( NULL == handler->list.head )
401                          {
402                              // shutdown current thread
403                              Lock_Acquire( &handler->lock );
404                              handler->running = THREAD_NOT_RUNNING;
405                              if ( handler->joined == THREAD_NOT_JOINED)
406                              {
407              #if !defined(_MSC_VER)
408                                  // if not joined yet, release thread resources
409                                  pthread_detach(handler->thread.__impl);
410              # endif
411                                  // if not joined yet, then close
412                                  Thread_Destroy( &handler->thread );
413                              }
414                              Lock_Release( &handler->lock );
415                              quit = PAL_TRUE;
416                          }
417                          ReadWriteLock_ReleaseRead( &handler->list.lock );
418              
419                          if ( PAL_TRUE == quit )
420                              break; // terminate current thread
421 krisbash 1.1             else
422                              continue;
423                      }
424                      else
425                      {
426                          switch( ui->type )
427                          {
428                          case REQUEST_UNSUBSCRIBE:
429                              {
430                                  UnsubscribeProviderItem* upi = (UnsubscribeProviderItem*) ui;
431                                  SubMgrSubscription* sub = upi->ctx->subscription;
432                                  DEBUG_ASSERT( sub );
433                                  SubscrContext_UnsubprvdOrSendfinalmsg(
434                                      upi->ctx,
435                                      upi->invokeRequest,
436                                      upi->finalResult);
437              
438                                  /*
439                                   * Release the ref that was incremented prior to queueing the request
440                                   */
441                                  SubMgrSubscription_Release( sub );
442 krisbash 1.1                 }
443                              break;
444                          case REQUEST_SUBSCRIBE:
445                              {
446                                  SubscribeProviderItem* spi = (SubscribeProviderItem*) ui;
447              
448                                  spi->subscrContext->baseCtx.provider = spi->provider;
449                                  spi->subscrContext->baseCtx.strand.info.stored.msg = &spi->msg->base.base;  // a reference was added to this message on _CreateSubscribeProviderItem
450              
451                                  Strand_ScheduleAux( &(spi->subscrContext->baseCtx.strand), CONTEXT_STRANDAUX_INVOKESUBSCRIBE );
452                              }
453                              break;
454                          }
455                          trace_nioproc_FreeRequestItem(ui);
456                          PAL_Free(ui);
457                      }
458                  }
459                  trace_noniothread_proc_end(UintThreadID());
460                  return 0;
461              }
462              

ViewCVS 0.9.2