1 krisbash 1.1 /*
2 **==============================================================================
3 **
4 ** Open Management Infrastructure (OMI)
5 **
6 ** Copyright (c) Microsoft Corporation
7 **
8 ** Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 ** use this file except in compliance with the License. You may obtain a copy
10 ** of the License at
11 **
12 ** http://www.apache.org/licenses/LICENSE-2.0
13 **
14 ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 ** MERCHANTABLITY OR NON-INFRINGEMENT.
18 **
19 ** See the Apache 2 License for the specific language governing permissions
20 ** and limitations under the License.
21 **
22 krisbash 1.1 **==============================================================================
23 */
24
25 #include <pal/atomic.h>
26 #include <base/batch.h>
27 #include <omi_error/omierror.h>
28 #include "nioproc.h"
29 #include "SubMgr.h"
30 #include "provmgr.h"
31
32 #define THREAD_NOT_JOINED 0
33 #define THREAD_JOINED 1
34
35 #define THREAD_NOT_RUNNING 0
36 #define THREAD_RUNNING 1
37
38 RequestHandler g_requesthandler;
39
40 static RequestItem* RequestList_RemoveItem(_Inout_ RequestList* list);
41 static MI_Result RequestList_RemoveSpecificItem(_Inout_ RequestList* list, _In_ RequestItem* item);
42
43 krisbash 1.1 //
44 // A proc running from a spawned thread to unsuscribe the indication opertion(s)
45 //
46 PAL_Uint32 THREAD_API noniothread_proc(void* p);
47
48 _Use_decl_annotations_
49 void RequestList_Init(RequestList* list)
50 {
51 list->head = NULL;
52 list->tail = NULL;
53 ReadWriteLock_Init( &list->lock );
54 }
55
56 _Use_decl_annotations_
57 void RequestHandler_Init(RequestHandler* handler)
58 {
59 RequestList_Init(&handler->list);
60 Lock_Init(&handler->lock);
61 handler->running = THREAD_NOT_RUNNING;
62 handler->joined = THREAD_NOT_JOINED;
63 }
64 krisbash 1.1
65 //
66 // shutdown handler, i.e. wait for unsubscribe thread to exit
67 //
68 static void RequestHandler_Shutdown(RequestHandler* handler)
69 {
70 // wait for unsubscribe thread to exit
71 Thread unsubThread;
72 ptrdiff_t joined = THREAD_NOT_JOINED;
73 PAL_Uint32 ret;
74 Lock_Acquire( &handler->lock );
75 if ( handler->running == THREAD_RUNNING )
76 {
77 joined = THREAD_JOINED;
78 handler->joined = joined;
79 unsubThread = handler->thread;
80 }
81 Lock_Release( &handler->lock );
82 if ( joined == THREAD_JOINED )
83 {
84 Thread_Join( &unsubThread, &ret );
85 krisbash 1.1 Thread_Destroy( &unsubThread );
86 trace_ProvMgr_Destroy_Join_nonioThread( ret );
87 }
88 }
89
90 _Use_decl_annotations_
91 void RequestHandler_Finalize(RequestHandler* self)
92 {
93 DEBUG_ASSERT( self );
94 RequestHandler_Shutdown( self );
95 {
96 RequestItem* ui = NULL;
97 while (NULL != (ui = RequestList_RemoveItem( &self->list )))
98 {
99 if (ui->type == REQUEST_SUBSCRIBE )
100 {
101 SubscribeProviderItem* spi = (SubscribeProviderItem*) ui;
102 Message_Release( &spi->msg->base.base );
103 }
104
105 trace_nioproc_FreeRequestItem(ui);
106 krisbash 1.1 PAL_Free(ui);
107 }
108 }
109 self->joined = THREAD_NOT_JOINED;
110 self->running = THREAD_NOT_RUNNING;
111 }
112
113
114 //
115 // Create unsubscribe RequestItem
116 //
117 static RequestItem* _CreateUnsubscribeProviderItem(
118 _In_opt_ SubscriptionContext* ctx,
119 _In_ MI_Boolean invokeRequest,
120 _In_ MI_Result finalResult )
121 {
122 UnsubscribeProviderItem* ui = (UnsubscribeProviderItem*)PAL_Malloc(sizeof(UnsubscribeProviderItem));
123 if (ui)
124 {
125 ui->base.type = REQUEST_UNSUBSCRIBE;
126 ui->base.next = NULL;
127 krisbash 1.1 ui->ctx = ctx;
128 ui->invokeRequest = invokeRequest;
129 ui->finalResult = finalResult;
130 }
131
132 return (RequestItem*)ui;
133 }
134
135 //
136 // Create subscribe RequestItem
137 //
138 static RequestItem* _CreateSubscribeProviderItem(
139 _In_ Provider* provider,
140 _In_ SubscribeReq* msg,
141 _In_ SubscriptionContext* subscrContext )
142 {
143 SubscribeProviderItem* ui = (SubscribeProviderItem*)PAL_Malloc(sizeof(SubscribeProviderItem));
144 if (ui)
145 {
146 ui->base.type = REQUEST_SUBSCRIBE;
147 ui->base.next = NULL;
148 krisbash 1.1 ui->provider = provider;
149 ui->msg = msg;
150 Message_AddRef( &msg->base.base ); // this reference is removed once the scheduled method (_Context_Aux_InvokeSubscribe) is executed
151 ui->subscrContext = subscrContext;
152 }
153
154 trace_nioproc_CreateRequestItem((RequestItem*)ui);
155
156 return (RequestItem*)ui;
157 }
158
159 //
160 // Get one request item and remove it from list
161 //
162 static RequestItem* RequestList_RemoveItem(_Inout_ RequestList* list)
163 {
164 RequestItem* ui = NULL;
165 ReadWriteLock_AcquireWrite(&list->lock);
166 if (list->head)
167 {
168 DEBUG_ASSERT(NULL != list->tail);
169 krisbash 1.1 ui = list->head;
170 list->head = ui->next;
171 if (list->head == NULL)
172 list->tail = NULL;
173 }
174 ReadWriteLock_ReleaseWrite(&list->lock);
175 return ui;
176 }
177
178 static MI_Result RequestList_RemoveSpecificItem(_Inout_ RequestList* list, _In_ RequestItem* item)
179 {
180 RequestItem* current = NULL;
181 RequestItem* prev = NULL;
182 MI_Result result = MI_RESULT_NOT_FOUND;
183
184 DEBUG_ASSERT(NULL != list->head);
185
186 if(item == NULL || list == NULL)
187 return MI_RESULT_NOT_FOUND;
188
189 ReadWriteLock_AcquireWrite(&list->lock);
190 krisbash 1.1
191 current = list->head;
192
193 while (current)
194 {
195 if(current == item)
196 {
197 if(!prev)
198 {
199 // The item is on the head of the list
200 list->head = current->next;
201
202 if(list->head == NULL)
203 list->tail = NULL;
204 }
205 else
206 {
207 // The item is on the middle or the end of the list
208 prev->next = current->next;
209
210 if(prev->next == NULL)
211 krisbash 1.1 list->tail = prev;
212 }
213
214 current->next = NULL;
215 result = MI_RESULT_OK;
216 break;
217 }
218
219 prev = current;
220 current = current->next;
221 }
222
223 ReadWriteLock_ReleaseWrite(&list->lock);
224
225 return result;
226 }
227
228 //
229 // Queue the request and spawn a new thread to perform the operation if not
230 // created yet.
231 //
232 krisbash 1.1 static MI_Result RequestList_ScheduleItem(
233 _Inout_ RequestList* list,
234 _In_ RequestItem* item )
235 {
236 RequestHandler* handler = &g_requesthandler;
237 MI_Result finalResult = MI_RESULT_OK;
238
239 // insert request into list
240 ReadWriteLock_AcquireWrite(&list->lock);
241 if (list->tail)
242 {
243 DEBUG_ASSERT(NULL != list->head);
244 list->tail->next = item;
245 list->tail = item;
246 }
247 else
248 {
249 DEBUG_ASSERT(NULL == list->head);
250 list->head = list->tail = item;
251 }
252 ReadWriteLock_ReleaseWrite(&list->lock);
253 krisbash 1.1
254 // create thread if needed
255 Lock_Acquire( &handler->lock );
256 if (g_requesthandler.running == THREAD_NOT_RUNNING)
257 {
258 //
259 // OMI has only one IO thread, which might be blocked if call Request to provider here,
260 // thus create another thread to invoke Request to provider
261 //
262 int r = Thread_CreateJoinable(&handler->thread, noniothread_proc, NULL, (void*)handler);
263 if ( r != 0 )
264 {
265 MI_Char buffer[128];
266 int err = errno;
267 trace_RequestList_ScheduleItem_CreateNonIOThreadFailed(err, ErrnoToString((MI_Uint32)err, buffer, MI_COUNT(buffer)));
268 finalResult = MI_RESULT_SERVER_LIMITS_EXCEEDED;
269 }
270 else
271 {
272 g_requesthandler.running = THREAD_RUNNING;
273 }
274 krisbash 1.1 }
275 Lock_Release( &handler->lock );
276
277 if (finalResult != MI_RESULT_OK)
278 {
279 // Since result is not OK, remove the item that we just added
280 RequestList_RemoveSpecificItem(list, item);
281 }
282
283 return finalResult;
284 }
285
286 //
287 // Queue the unsubscribe request and spawn a new thread to
288 // perform the unsubscribe operation if not created yet
289 //
290 _Use_decl_annotations_
291 MI_Result Schedule_UnsubscribeProvider(
292 SubscriptionContext* ctx,
293 MI_Boolean invokeRequest,
294 MI_Result finalResult)
295 krisbash 1.1 {
296 SubMgrSubscription* subscription;
297 RequestItem* item = NULL;
298 MI_Result r;
299 DEBUG_ASSERT(ctx);
300
301 subscription = ctx->subscription;
302
303 item = _CreateUnsubscribeProviderItem( ctx, invokeRequest, finalResult );
304 if ( NULL == item )
305 {
306 /*
307 * FATAL error (out of memory) happened, not much we can do here,
308 * restart sever is the only option
309 */
310 trace_OutOfMemory();
311
312 return MI_RESULT_SERVER_LIMITS_EXCEEDED;
313 }
314
315 trace_ScheduleRequest_UnsubscribeProvider(UintThreadID(), subscription);
316 krisbash 1.1
317 /*
318 * Add a reference to SubMgrSubscription, which will be released in
319 * noniothread_proc
320 */
321 SubMgrSubscription_Addref( subscription );
322
323 r = RequestList_ScheduleItem( &g_requesthandler.list, item );
324 if (r != MI_RESULT_OK )
325 {
326 /* Decrement the ref count to balance the addref above*/
327 SubMgrSubscription_Release( subscription );
328 PAL_Free(item);
329 }
330 return r;
331 }
332
333 //
334 // Queue sending final result request and spawn a new thread to
335 // perform the unsubscribe operation if not created yet
336 //
337 krisbash 1.1 _Use_decl_annotations_
338 MI_Result Schedule_SendFinalResult(
339 SubscriptionContext* ctx,
340 MI_Result finalResult)
341 {
342 return Schedule_UnsubscribeProvider( ctx, MI_FALSE, finalResult );
343 }
344
345 //
346 // Queue subscribe request and spawn a new thread to
347 // perform the unsubscribe operation if not created yet
348 //
349 MI_Result Schedule_SubscribeRequest(
350 _In_ Provider* provider,
351 _In_ SubscribeReq* msg,
352 _In_ SubscriptionContext* subscrContext )
353 {
354 RequestItem* item = NULL;
355 MI_Result r;
356 DEBUG_ASSERT(provider);
357 STRAND_ASSERTONSTRAND( &subscrContext->baseCtx.strand );
358 krisbash 1.1
359 item = _CreateSubscribeProviderItem( provider, msg, subscrContext );
360 Strand_Leave( &subscrContext->baseCtx.strand );
361 if ( NULL == item )
362 {
363 //
364 // FATAL error (out of memory) happened, not much we can do here,
365 // restart sever is the only option
366 //
367 trace_OutOfMemory();
368 return MI_RESULT_SERVER_LIMITS_EXCEEDED;
369 }
370
371 trace_ScheduleRequest_SubscribeProvider(UintThreadID(), provider, msg, subscrContext);
372
373 r = RequestList_ScheduleItem( &g_requesthandler.list, item );
374
375 if (r != MI_RESULT_OK )
376 {
377 // Decrement the ref count for the message. This is to balance the Add ref in _CreateSubscribeProviderItem
378 Message_Release( &msg->base.base );
379 krisbash 1.1 trace_nioproc_FreeRequestItem(item);
380 PAL_Free(item);
381 }
382
383 return r;
384 }
385
386 //
387 // A proc running from a spawned thread to unsuscribe the indication provider
388 //
389 PAL_Uint32 THREAD_API noniothread_proc(void* p)
390 {
391 RequestHandler* handler = (RequestHandler*)p;
392 trace_noniothread_proc_start(UintThreadID());
393 while ( THREAD_NOT_JOINED == Atomic_Read(&handler->joined) )
394 {
395 RequestItem* ui = RequestList_RemoveItem( &handler->list );
396 if (NULL == ui)
397 {
398 PAL_Boolean quit = PAL_FALSE;
399 ReadWriteLock_AcquireRead( &handler->list.lock );
400 krisbash 1.1 if ( NULL == handler->list.head )
401 {
402 // shutdown current thread
403 Lock_Acquire( &handler->lock );
404 handler->running = THREAD_NOT_RUNNING;
405 if ( handler->joined == THREAD_NOT_JOINED)
406 {
407 #if !defined(_MSC_VER)
408 // if not joined yet, release thread resources
409 pthread_detach(handler->thread.__impl);
410 # endif
411 // if not joined yet, then close
412 Thread_Destroy( &handler->thread );
413 }
414 Lock_Release( &handler->lock );
415 quit = PAL_TRUE;
416 }
417 ReadWriteLock_ReleaseRead( &handler->list.lock );
418
419 if ( PAL_TRUE == quit )
420 break; // terminate current thread
421 krisbash 1.1 else
422 continue;
423 }
424 else
425 {
426 switch( ui->type )
427 {
428 case REQUEST_UNSUBSCRIBE:
429 {
430 UnsubscribeProviderItem* upi = (UnsubscribeProviderItem*) ui;
431 SubMgrSubscription* sub = upi->ctx->subscription;
432 DEBUG_ASSERT( sub );
433 SubscrContext_UnsubprvdOrSendfinalmsg(
434 upi->ctx,
435 upi->invokeRequest,
436 upi->finalResult);
437
438 /*
439 * Release the ref that was incremented prior to queueing the request
440 */
441 SubMgrSubscription_Release( sub );
442 krisbash 1.1 }
443 break;
444 case REQUEST_SUBSCRIBE:
445 {
446 SubscribeProviderItem* spi = (SubscribeProviderItem*) ui;
447
448 spi->subscrContext->baseCtx.provider = spi->provider;
449 spi->subscrContext->baseCtx.strand.info.stored.msg = &spi->msg->base.base; // a reference was added to this message on _CreateSubscribeProviderItem
450
451 Strand_ScheduleAux( &(spi->subscrContext->baseCtx.strand), CONTEXT_STRANDAUX_INVOKESUBSCRIBE );
452 }
453 break;
454 }
455 trace_nioproc_FreeRequestItem(ui);
456 PAL_Free(ui);
457 }
458 }
459 trace_noniothread_proc_end(UintThreadID());
460 return 0;
461 }
462
|