1 krisbash 1.1 /*
2 **==============================================================================
3 **
4 ** Open Management Infrastructure (OMI)
5 **
6 ** Copyright (c) Microsoft Corporation
7 **
|
8 krisbash 1.2 ** Licensed under the Apache License, Version 2.0 (the "License"); you may not
9 ** use this file except in compliance with the License. You may obtain a copy
10 ** of the License at
11 **
12 ** http://www.apache.org/licenses/LICENSE-2.0
|
13 krisbash 1.1 **
14 ** THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
15 krisbash 1.2 ** KIND, EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED
16 ** WARRANTIES OR CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE,
17 ** MERCHANTABLITY OR NON-INFRINGEMENT.
|
18 krisbash 1.1 **
|
19 krisbash 1.2 ** See the Apache 2 License for the specific language governing permissions
|
20 krisbash 1.1 ** and limitations under the License.
21 **
22 **==============================================================================
23 */
24
25 #include "SubMgr.h"
26 #include <indication/common/indilog.h>
|
27 krisbash 1.2 #include <pal/intsafe.h>
|
28 krisbash 1.1
29 unsigned int UintThreadID()
30 {
31 ThreadID id = Thread_ID();
32 return *((unsigned int*)(void*)&id);
33 }
34
35 /*
36 *
37 * SubscriptionManager Functions
38 *
39 */
40
41 SubscriptionManager* SubMgr_New()
42 {
43 SubscriptionManager* subMgr = (SubscriptionManager*)PAL_Calloc(1, sizeof(SubscriptionManager));
44 if (!subMgr)
45 LOGD_ALLOC_OOM;
46 return subMgr;
47 }
48
49 krisbash 1.1 _Use_decl_annotations_
50 void SubMgr_Delete(SubscriptionManager* mgr )
51 {
52 SubMgr_Finalize( mgr );
53 PAL_Free(mgr);
54 }
55
56 /*
57 * Perform generic initialization of a SubscriptionManager object.
58 */
59 _Use_decl_annotations_
60 void SubMgr_Init(
61 SubscriptionManager* mgr,
62 Provider* provider )
63 {
64 DEBUG_ASSERT (mgr);
65 RecursiveLock_Init( &mgr->enablelock );
66 mgr->terminating = MI_FALSE;
67 mgr->enabled = MI_FALSE;
68 mgr->provider = provider;
69 SubscriptionList_Init( &mgr->subscrList );
70 krisbash 1.1 }
71
72 _Use_decl_annotations_
73 void SubMgr_Finalize(
74 SubscriptionManager* self)
75 {
76 DEBUG_ASSERT( self );
77 if (self->aggrCtx)
78 {
79 AggrContext_Delete(self->aggrCtx);
80 self->aggrCtx = NULL;
81 }
82 if (self->lifecycleCtx)
83 {
84 LifeContext_Delete(self->lifecycleCtx);
85 self->lifecycleCtx = NULL;
86 }
87 SubscriptionList_Finalize( &self->subscrList );
88 }
89
90 _Use_decl_annotations_
91 krisbash 1.1 MI_Result SubMgr_CreateSubscription(
92 SubscriptionManager* mgr,
93 Provider* provider,
94 InteractionOpenParams* interactionParams,
95 SubscriptionContext** subscrContext )
96 {
97 SubMgrSubscription* subscription = NULL;
98 MI_Result result = MI_RESULT_OK;
99 Context_Type ctxType;
100 SubscribeReq* msg = (SubscribeReq*)interactionParams->msg;
101 SubscriptionContext* subContext;
102
103 DEBUG_ASSERT ( mgr && provider && interactionParams && interactionParams->msg && subscrContext );
104 DEBUG_ASSERT( SubscribeReqTag == interactionParams->msg->tag );
105
|
106 krisbash 1.2 /*
|
107 krisbash 1.1 * Create the SubscriptionContext and prepare it for use in case the
108 * provider attempts to PostIndication immediately after
|
109 krisbash 1.2 * EnableIndications without waiting for Subscribe.
|
110 krisbash 1.1 */
111 subContext = (SubscriptionContext*)Batch_GetClear(msg->base.base.batch, sizeof(SubscriptionContext));
112 if (!subContext)
113 {
114 trace_SubscrContextFailed();
115 return MI_RESULT_SERVER_LIMITS_EXCEEDED;
116 }
117
118 /* This refCount is released in CONTEXT_STRANDAUX_INVOKESUBSCRIBE. */
119 subscription = SubMgrSubscription_New(msg);
120 if (!subscription)
121 {
122 trace_SubMgrSubscription_AllocFailed();
123 return MI_RESULT_FAILED;
124 }
125
126 if (SubMgrSubscription_IsQueryValid(subscription, provider->classDecl) == MI_FALSE)
127 {
128 SubMgrSubscription_Release(subscription);
129 return MI_RESULT_INVALID_QUERY;
130 }
131 krisbash 1.1
132 *subscrContext = subContext;
133
134 subscription->subscribeCtx = *subscrContext; // TODO: should this action be wrapped?
135
136 ctxType = SubscriptionTargetType_IsLifecycle((SubscriptionTargetType)msg->targetType ) ? CTX_TYPE_IND_LIFECYCLE : CTX_TYPE_IND_SUBSCRIPTION; // TODO: Move to Context.h?
137 result = SubscrContext_Init( *subscrContext, provider, ctxType, interactionParams, subscription);
138 if ( MI_RESULT_OK != result)
139 {
140 trace_SubscriptionContext_InitFailed();
141 subscription->subscribeCtx = NULL;
142 SubMgrSubscription_Release(subscription);
143 return result;
144 }
145 SubMgrSubscription_SetState( subscription, SubscriptionState_Initialized );
146
147 return MI_RESULT_OK;
148 }
149
150 _Use_decl_annotations_
151 MI_Result SubMgr_DeleteSubscription(
152 krisbash 1.1 SubscriptionManager* mgr,
153 SubMgrSubscription* subscription )
154 {
155 MI_Result r;
156 DEBUG_ASSERT ( mgr && subscription );
157
158 trace_SubMgr_DeleteSubscription_Start(UintThreadID(), mgr, subscription, (MI_Uint32)mgr->subscrList.count, mgr->subscrList.head, mgr->subscrList.tail);
159
160 r = SubscriptionList_DeleteSubscription( &mgr->subscrList, subscription, mgr );
161
162 return r;
163 }
164
165 _Use_decl_annotations_
166 MI_Result SubMgr_CancelAllSubscriptions(
167 SubscriptionManager* mgr,
168 MI_Result result,
169 const ZChar* errorMessage,
170 const MI_Instance* cimError )
171 {
172 DEBUG_ASSERT (mgr);
173 krisbash 1.1 trace_SubMgr_CancelAllSubscriptions( UintThreadID(), mgr);
174
175 return SubscriptionList_CancelAllSubscription( &mgr->subscrList );
176 }
177
178 static SubMgrSubscription* _SubscriptionList_GetSubscription(
179 _In_ const SubscriptionList* self,
180 _In_ MI_Uint64 subscriptionID)
181 {
182 SubMgrSubscription* subscription;
183 SubscriptionList* list = (SubscriptionList*)self;
184 ReadWriteLock_AcquireRead(&list->lock);
185 for (subscription = (SubMgrSubscription*)self->head; subscription; subscription = subscription->next)
186 {
187 if (subscription->subscriptionID == subscriptionID)
188 {
189 break;
190 }
191 }
192 ReadWriteLock_ReleaseRead(&list->lock);
193
194 krisbash 1.1 return subscription;
195 }
196
197 static SubMgrSubscription* _SubscriptionList_GetSubscriptionByContext(
198 _In_ const SubscriptionList* self,
199 _In_ SubscriptionContext* subCtx )
200 {
201 SubMgrSubscription* subscription;
202 SubscriptionList* list = (SubscriptionList*)self;
203
204 /* thread safely get subscriptions and add ref count */
205 ReadWriteLock_AcquireRead( &list->lock );
206 subscription = (SubMgrSubscription*) self->head;
207 while( subscription && (subscription->subscribeCtx != subCtx) )
208 {
209 subscription = subscription->next;
210 }
211 if (subscription)
212 SubMgrSubscription_Addref(subscription);
213 ReadWriteLock_ReleaseRead( &list->lock );
214
215 krisbash 1.1 return subscription;
216 }
217
218 /*
|
219 krisbash 1.2 * Examines the SubscriptionManager to determine if it has any matching
|
220 krisbash 1.1 * subscriptions.
221 */
222 _Use_decl_annotations_
223 SubMgrSubscription* SubMgr_GetSubscription(
224 const SubscriptionManager* mgr,
225 MI_Uint64 subscriptionID )
226 {
227 DEBUG_ASSERT( mgr );
228 return _SubscriptionList_GetSubscription( &mgr->subscrList, subscriptionID);
229 }
230
231 /* search subscription based on subctx, add refcount to subscription if found */
232 _Use_decl_annotations_
233 SubMgrSubscription* SubMgr_GetSubscriptionByContext(
234 const SubscriptionManager* mgr,
235 SubscriptionContext* subCtx)
236 {
237 SubMgrSubscription* subscription;
238
239 DEBUG_ASSERT( mgr && subCtx );
240
241 krisbash 1.1 subscription = _SubscriptionList_GetSubscriptionByContext( &mgr->subscrList, subCtx );
242
243 if ( NULL == subscription )
244 trace_SubMgr_GetSubscriptionByContext_NotFound(UintThreadID(), (void*)mgr, subCtx);
245
246 return subscription;
247 }
248
249 /* read subscription list snapshot into an array, no refcount added to subscription(s) */
250 _Use_decl_annotations_
251 MI_Result SubMgr_GetSubscriptionList(
252 const SubscriptionManager* mgr,
253 SubMgrSubscriptionPtr** subs,
254 size_t* count)
255 {
256 return SubscriptionList_GetList( & mgr->subscrList, MI_TRUE, subs, count );
257 }
258
259
260 /*
261 * Perform generic initialization of an SubscriptionManager object.
262 krisbash 1.1 */
263 _Use_decl_annotations_
264 SubMgrSubscription* _SubMgrSubscription_New(
265 SubscribeReq* msg,
266 CallSite cs)
267 {
268 SubMgrSubscription* subscription = (SubMgrSubscription*)PAL_Calloc(1, sizeof(SubMgrSubscription));
269 const char* file = "";
270 size_t line = 0;
271
272 #if defined(CONFIG_ENABLE_DEBUG)
273 file = cs.file;
274 line = cs.line;
275 #endif /* defined(CONFIG_ENABLE_DEBUG) */
276
277 if (!subscription)
278 {
279 LOGD_ALLOC_OOM;
280 return NULL;
281 }
282
|
283 krisbash 1.2 /*
|
284 krisbash 1.1 * Set refcount to 1, which will be released in context.c:CONTEXT_STRANDAUX_INVOKESUBSCRIBE.
285 */
286 subscription->refcount = 1;
287 subscription->finalmsgbit = 0;
288 subscription->unsubscribebit = 0;
289
290 SubMgrSubscription_SetState( subscription, SubscriptionState_Initialized );
291
292 subscription->msg = msg;
293 subscription->subscriptionID = msg->subscriptionID;
294
295 subscription->filter = InstanceFilter_New( &(msg->base.base) );
296 if (!subscription->filter)
297 {
298 trace_InstanceFilter_AllocFailed();
299 SubMgrSubscription_Release( subscription );
300 return NULL;
301 }
302
303 RecursiveLock_Init(&subscription->postlock);
304
305 krisbash 1.1 trace_SubMgrSubscription_New(file, (MI_Uint32)line, (void*)subscription, tcs(msg->className), (unsigned int)subscription->refcount);
306
307 /* filter must be initialized separately based on the request message. */
308 return subscription;
309 }
310
311 _Use_decl_annotations_
312 void _SubMgrSubscription_Addref(
313 SubMgrSubscription* subscription,
314 CallSite cs)
315 {
316 ptrdiff_t count = Atomic_Inc(&subscription->refcount);
317 const char* file = "";
318 size_t line = 0;
319
320 #if defined(CONFIG_ENABLE_DEBUG)
321 file = cs.file;
322 line = cs.line;
323 #endif /* defined(CONFIG_ENABLE_DEBUG) */
324
325 trace_SubMgrSubscription_Addref(file, (MI_Uint32)line, (void*)subscription, (unsigned int)count);
326 krisbash 1.1 }
327
328
329 _Use_decl_annotations_
|
330 krisbash 1.2 void _SubMgrSubscription_Release(
|
331 krisbash 1.1 SubMgrSubscription* subscription,
332 CallSite cs)
333 {
334 void *sub = subscription;
335 const char* file = "";
336 size_t line = 0;
337
338 #if defined(CONFIG_ENABLE_DEBUG)
339 file = cs.file;
340 line = cs.line;
341 #endif /* defined(CONFIG_ENABLE_DEBUG) */
342
343 if (subscription)
344 {
345 /* Finalize the subscription if ref count decreased to 0 */
346 ptrdiff_t count = Atomic_Dec(&subscription->refcount);
347 if (count == 0)
348 {
349 trace_SubMgrSubscription_Release_Finalized( UintThreadID(), subscription );
350
351 if (subscription->filter)
352 krisbash 1.1 {
353 InstanceFilter_Destroy( subscription->filter );
354 }
355
356 if (subscription->subscribeCtx)
357 {
358 SubscrContext_Close(subscription->subscribeCtx);
359 subscription->subscribeCtx = NULL;
360 }
361 PAL_Free(subscription);
362 }
363 trace_SubMgrSubscription_Release(file, (MI_Uint32)line, sub, (unsigned int)count);
364 }
365
|
366 krisbash 1.2 /* Done to satisfy OACR since it doesn't detect a change to subscription when
|
367 krisbash 1.1 * it is free'd. This just modified the local variable subscription, not the
368 * actual ptr in the caller. */
369 subscription = NULL;
370 }
371
372 _Use_decl_annotations_
373 MI_Result SubMgrSubscription_SetState(
374 SubMgrSubscription* subscription,
375 SubscriptionState state )
376 {
377 subscription->state = state;
378 return MI_RESULT_OK;
379 }
380
381 _Use_decl_annotations_
382 SubscriptionTargetType SubMgrSubscription_GetSupportedTypes(
383 SubMgrSubscription* subscription )
384 {
385 if (subscription &&
386 subscription->msg)
387 {
388 krisbash 1.1 return (SubscriptionTargetType)subscription->msg->targetType;
389 }
390 else
391 {
392 return SUBSCRIP_TARGET_UNSUPPORTED;
393 }
394 }
395
396 _Use_decl_annotations_
397 void SubMgrSubscription_AcuquirePostLock(SubMgrSubscription* self)
398 {
399 RecursiveLock_Acquire(&self->postlock);
400
401 trace_SubMgrSubscription_AcquirePostLock(UintThreadID(), (void*)self, self->subscribeCtx);
402 }
403
404 _Use_decl_annotations_
405 void SubMgrSubscription_ReleasePostLock(SubMgrSubscription* self)
406 {
407 trace_SubMgrSubscription_ReleasePostLock(UintThreadID(), (void*)self, self->subscribeCtx);
408
409 krisbash 1.1 RecursiveLock_Release(&self->postlock);
410 }
411
412 _Use_decl_annotations_
413 MI_Boolean SubMgrSubscription_ShouldCallUnsubscribe(
414 SubMgrSubscription* self)
415 {
416 MI_Boolean shouldunsubscribe = MI_FALSE;
417
418 if ( Atomic_CompareAndSwap(&self->unsubscribebit, 0, 1) == 0 )
419 shouldunsubscribe = MI_TRUE;
420 else
421 trace_SubMgrSubscription_ShouldCallUnsubscribe_AlreadyUnsubscribed( UintThreadID(), self );
422
|
423 krisbash 1.2 trace_SubMgrSubscription_ShouldCallUnsubscribe(UintThreadID(), self, shouldunsubscribe);
|
424 krisbash 1.1
425 return shouldunsubscribe;
426 }
427
428 _Use_decl_annotations_
429 MI_Boolean SubMgrSubscription_ShouldSendFinalMsg(
430 SubMgrSubscription* self)
431 {
432 MI_Boolean shouldsendfinalmsg = MI_FALSE;
433 if ( Atomic_CompareAndSwap(&self->finalmsgbit, 0, 1) == 0 )
434 shouldsendfinalmsg = MI_TRUE;
435 else
436 trace_SubMgrSubscription_ShouldSendFinalMsg_AlreadySent( UintThreadID(), self );
437
438 trace_SubMgrSubscription_ShouldSendFinalMsg(UintThreadID(), self, shouldsendfinalmsg);
439
440 return shouldsendfinalmsg;
441 }
442
443 _Use_decl_annotations_
444 MI_Boolean SubMgrSubscription_CancelStarted(
445 krisbash 1.1 SubMgrSubscription* self)
446 {
447 MI_Boolean cancelstarted = MI_FALSE;
448 if ( ReadWithFence(&self->unsubscribebit) == 1 )
449 cancelstarted = MI_TRUE;
450 else if ( ReadWithFence(&self->finalmsgbit) == 1 )
451 cancelstarted = MI_TRUE;
452 return cancelstarted;
453 }
454
455 /* Validate the filter using the provided class decl. */
456 _Use_decl_annotations_
457 MI_Boolean SubMgrSubscription_IsQueryValid(
458 SubMgrSubscription* self,
459 const MI_ClassDecl* cd )
460 {
461 WQL* wql = InstanceFilter_GetWQL(self->filter);
462 if (wql)
463 {
464 if (WQL_Validate(wql, cd) == 0)
465 {
466 krisbash 1.1 return MI_TRUE;
467 }
468 trace_QueryValidationFailed(tcs(wql->text));
469 }
470 return MI_FALSE;
471 }
472
473 /*
474 * There are 3 places needs to take enablelock
475 * 1- Subscribe request
476 * 2- post result on aggregation context
477 * 3- last subscription was removed
478 *
479 * For 2 & 3, enablebit to make sure only take the lock once
480 *
481 * parameters:
482 * terminate -- MI_TRUE means being called by Provider_TerminateIndication
483 *
484 * return value:
485 * MI_TRUE -- acquired the lock, otherwise no lock acquired
486 *
487 krisbash 1.1 */
488 const TChar* _OpNames[] = {
489 PAL_T("SubscribeProvider"),
490 PAL_T("DisableProvider"),
491 PAL_T("Terminate")
492 };
493
494 _Use_decl_annotations_
495 MI_Boolean SubMgr_AcquireEnableLock(
496 SubscriptionManager* self,
497 AcquireEnableLockOperationType optype)
498 {
499 trace_SubscriptionManager_AcquireEnableLock_Start(UintThreadID(), (void*)self, _OpNames[(int)optype]);
500
501 RecursiveLock_Acquire( &self->enablelock );
502
503 switch( optype )
504 {
505 case AcquireFromSubscribe:
506 if ( MI_TRUE == SubMgr_IsTerminating( self ) )
507 {
508 krisbash 1.1 trace_SubscriptionManager_AcquireEnableLock_AlreadyTerminated( UintThreadID(), self );
509 RecursiveLock_Release( &self->enablelock );
510 return MI_FALSE;
511 }
512 break;
513 case AcquireFromDisable:
514 if ( MI_FALSE == SubMgr_IsSubListEmpty( self ) )
515 {
516 if ( MI_FALSE == SubMgr_IsTerminating( self ) )
517 {
518 trace_SubscriptionManager_AcquireEnableLock_IgnoreDisableCall( UintThreadID(), self );
519 RecursiveLock_Release( &self->enablelock );
520 return MI_FALSE;
521 }
522 else
523 DEBUG_ASSERT (0);
524 }
525 break;
526 case AcquireFromTerminate:
527 if ( MI_FALSE == SubMgr_IsTerminating( self ) )
528 {
529 krisbash 1.1 SubMgr_SetTerminating( self, MI_TRUE );
530 if ( MI_FALSE == SubMgr_IsSubListEmpty( self ) )
531 {
532 trace_SubscriptionManager_AcquireEnableLock_CancelAll( UintThreadID(), self );
533 SubMgr_CancelAllSubscriptions( self, MI_RESULT_FAILED, NULL, NULL );
534 }
535 /* no subscription added yet, can happen during first subscribe */
536 else
537 {
538 Provider_InvokeDisable( self->provider );
539 }
540 }
541 if ( MI_TRUE == SubMgr_IsEnabled(self) )
542 SubMgr_SetEnabled( self, MI_FALSE );
543 break;
544 }
545
546 trace_SubscriptionManager_AcquireEnableLock_Complete(UintThreadID(), (void*)self, _OpNames[(int)optype]);
547
548 return MI_TRUE;
549 }
550 krisbash 1.1
551 _Use_decl_annotations_
552 void SubMgr_ReleaseEnableLock(
553 SubscriptionManager* self)
554 {
555 trace_SubscriptionManager_ReleaseEnableLock(UintThreadID(), (void*)self);
556 RecursiveLock_Release(&self->enablelock);
557 }
558
559 _Use_decl_annotations_
560 AggregationContext* SubMgr_CreateAggrContext(
561 SubscriptionManager* self)
562 {
563 MI_Result result;
564 Provider* provider = self->provider;
565 AggregationContext* aggrContext = AggrContext_New();
566 DEBUG_ASSERT( !self->aggrCtx );
567
568 if (!aggrContext)
569 {
570 trace_OutOfMemory();
571 krisbash 1.1 return NULL;
572 }
573
574 result = AggrContext_Init(aggrContext, provider, self);
575 if (MI_RESULT_OK != result)
576 {
577 trace_AggregationContext_InitFailed();
578 AggrContext_Delete(aggrContext);
|
579 krisbash 1.2 return NULL;
|
580 krisbash 1.1 }
581
582 self->aggrCtx = aggrContext;
583 return aggrContext;
584 }
585
586 _Use_decl_annotations_
587 AggregationContext* SubMgr_RemoveAggrContext(
588 SubscriptionManager* self)
589 {
590 AggregationContext* aggrctx = self->aggrCtx;
591 if ( aggrctx )
592 self->aggrCtx = NULL;
593 return aggrctx;
594 }
595
596 _Use_decl_annotations_
597 MI_Boolean SubMgr_IsSubListEmpty(
598 SubscriptionManager* mgr )
599 {
600 return (mgr->subscrList.count == 0);
601 krisbash 1.1 }
602
603 _Use_decl_annotations_
604 MI_Boolean SubMgr_IsEnabled(
605 const SubscriptionManager* mgr )
606 {
607 return mgr->enabled;
608 }
609
610 _Use_decl_annotations_
611 void SubMgr_SetEnabled(
612 SubscriptionManager* mgr,
613 MI_Boolean enabled)
614 {
615 mgr->enabled = enabled;
616 }
617
618 _Use_decl_annotations_
619 MI_Boolean SubMgr_IsTerminating(
620 const SubscriptionManager* mgr )
621 {
622 krisbash 1.1 return mgr->terminating;
623 }
624
625 _Use_decl_annotations_
626 void SubMgr_SetTerminating(
627 SubscriptionManager* mgr,
628 MI_Boolean terminating)
629 {
630 mgr->terminating = terminating;
631 }
632
633 _Use_decl_annotations_
634 void SubMgr_SetAllCancelledSafe(
635 SubscriptionManager* mgr,
636 MI_Boolean allcancelled)
637 {
638 DEBUG_ASSERT ( mgr );
639 SubscriptionList_SetAllCancelledSafe( &mgr->subscrList, allcancelled );
640 }
641
642 _Use_decl_annotations_
643 krisbash 1.1 void SubMgr_SetEnableThread(
644 SubscriptionManager* mgr)
645 {
646 mgr->enableThreadID = Thread_ID();
647 }
648
649 _Use_decl_annotations_
650 MI_Boolean SubMgr_IsEnableThread(
651 const SubscriptionManager* mgr)
652 {
653 ThreadID threadId = Thread_ID();
654 return Thread_Equal( &threadId, (ThreadID*)&mgr->enableThreadID ) ? MI_TRUE : MI_FALSE;
655 }
656
657 _Use_decl_annotations_
658 MI_Boolean SubMgr_CanPostIndication(
659 const SubscriptionManager* mgr)
660 {
661 MI_Boolean isEnableThread = SubMgr_IsEnableThread(mgr);
662 MI_Boolean canPostIndication = isEnableThread ? MI_FALSE : MI_TRUE;
663 if ( MI_FALSE == canPostIndication )
664 krisbash 1.1 trace_SubMgr_CanPostIndication_Fail( UintThreadID() );
665 return canPostIndication;
666 }
667
668 static MI_Result _SubscriptionList_EnsureArray(
669 _In_ SubscriptionList* self)
670 {
671 size_t capacity = self->capacity;
|
672 krisbash 1.2 SubMgrSubscriptionPtr* subarray = NULL;
|
673 krisbash 1.1 if ( self->count <= capacity )
674 return MI_RESULT_OK;
675
676 if ( capacity == 0 )
677 capacity = 32;
678 else
679 capacity *= 2;
680
681 if ( capacity < self->count )
682 {
683 /* integer overflow */
684 trace_SubscriptionList_EnsureArray_Overflow( UintThreadID() );
685 return MI_RESULT_FAILED;
686 }
687
688 if ( self->subarray )
689 {
690 PAL_Free(self->subarray);
691 self->subarray = NULL;
692 self->capacity = 0;
693 }
694 krisbash 1.1
|
695 krisbash 1.2 size_t allocSize = 0;
696 if (SizeTMult(sizeof (SubMgrSubscriptionPtr), capacity, &allocSize) == S_OK)
697 {
698 subarray = (SubMgrSubscriptionPtr*)PAL_Malloc( allocSize );
699 }
700
|
701 krisbash 1.1 if (subarray == NULL)
702 {
703 trace_OutOfMemory();
704 return MI_RESULT_SERVER_LIMITS_EXCEEDED;
705 }
706
707 self->subarray = subarray;
708 self->capacity = capacity;
709 return MI_RESULT_OK;
710 }
711
712 _Use_decl_annotations_
713 void SubscriptionList_Init(
714 SubscriptionList* self)
715 {
716 memset( self, 0, sizeof( SubscriptionList ) );
717 ReadWriteLock_Init( &self->lock );
718 }
719
720 _Use_decl_annotations_
721 void SubscriptionList_Finalize(
722 krisbash 1.1 SubscriptionList* self)
723 {
724 if ( self->subarray )
725 {
726 PAL_Free(self->subarray);
727 self->subarray = NULL;
728 self->capacity = 0;
729 }
730 }
731
732 _Use_decl_annotations_
733 void SubscriptionList_AddSubscription(
734 SubscriptionList* self,
735 SubMgrSubscription* subscription)
736 {
737 DEBUG_ASSERT ( self );
738 /* thread safely add subscription to list */
739 ReadWriteLock_AcquireWrite(&self->lock);
740 DEBUG_ASSERT ( MI_FALSE == self->allcancelled );
741 List_Append( &self->head, &self->tail, (ListElem*)subscription );
742 SubMgrSubscription_Addref(subscription);
743 krisbash 1.1 self->count++;
744 ReadWriteLock_ReleaseWrite(&self->lock);
745 }
746
747 _Use_decl_annotations_
748 MI_Result SubscriptionList_DeleteSubscription(
749 SubscriptionList* self,
750 SubMgrSubscription* subscription,
751 SubscriptionManager* mgr )
752 {
753 SubMgrSubscription* sub;
754 /* thread safely remove the subscription from list */
755 ReadWriteLock_AcquireWrite( &self->lock );
756 sub = (SubMgrSubscription*) self->head;
757 /* find the subscription first */
758 while( sub )
759 {
760 if ( sub == subscription )
761 break;
762 sub = sub->next;
763 }
764 krisbash 1.1
765 /* Remove from the list if found */
766 if ( sub )
767 {
768 List_Remove( &self->head, &self->tail, (ListElem*)sub );
769 self->count--;
770
771 /* Done after remove so that it can operate on the updated list */
772 if (mgr->lifecycleCtx)
773 {
774 LifeContext_UpdateSupportedTypes(mgr->lifecycleCtx);
775 }
776 }
777
778 ReadWriteLock_ReleaseWrite( &self->lock );
779
780 if (sub)
781 {
782 SubMgrSubscription_Release(sub);
783 return MI_RESULT_OK;
784 }
785 krisbash 1.1 return MI_RESULT_NOT_FOUND;
786 }
787
788 _Use_decl_annotations_
789 MI_Result SubscriptionList_CancelAllSubscription(
790 SubscriptionList* self )
791 {
792 SubMgrSubscriptionPtr* sublist;
793 size_t i, count;
794 MI_Result r;
795
796 DEBUG_ASSERT ( MI_FALSE == self->allcancelled );
797
798 r = SubscriptionList_GetList( self, MI_TRUE, &sublist, &count );
799 if ( r != MI_RESULT_OK )
800 return r;
801
802 SubscriptionList_SetAllCancelledSafe( self, MI_TRUE );
803
804 for ( i = 0; i < count; i++ )
805 {
806 krisbash 1.1 /*TODO: what if a subscription was being cancelled. confirm */
807 Strand_ScheduleCancel( &sublist[i]->subscribeCtx->baseCtx.strand );
808 SubMgrSubscription_Release( sublist[i] );
809 }
810
811 return MI_RESULT_OK;
812 }
813
814 _Use_decl_annotations_
815 MI_Result SubscriptionList_GetList(
816 const SubscriptionList* self,
817 MI_Boolean addref,
818 SubMgrSubscriptionPtr** subs,
819 size_t* count)
820 {
821 MI_Result r;
822 SubscriptionList* list = (SubscriptionList*)self;
823
824 *subs = NULL;
825 *count = 0;
826
827 krisbash 1.1 //
828 // thread safely read subscriptions
829 //
830 ReadWriteLock_AcquireRead( &list->lock );
831
832 r = _SubscriptionList_EnsureArray( list );
833
834 if ( r == MI_RESULT_OK )
835 {
836 size_t i = 0;
837 SubMgrSubscription* subscription = (SubMgrSubscription*) list->head;
838 while(subscription)
839 {
840 if ( MI_TRUE == addref )
841 SubMgrSubscription_Addref( subscription );
842 list->subarray[i++] = subscription;
843 subscription = subscription->next;
844 }
845 DEBUG_ASSERT( i == list->count );
846 *count = list->count;
847 *subs = list->subarray;
848 krisbash 1.1 }
849
850 ReadWriteLock_ReleaseRead( &list->lock );
851
852 return r;
853 }
854
855 _Use_decl_annotations_
856 void SubscriptionList_SetAllCancelled(
857 SubscriptionList* self,
858 MI_Boolean allcancelled)
859 {
860 self->allcancelled = allcancelled;
861 }
862
863 void SubscriptionList_SetAllCancelledSafe(
864 _Inout_ SubscriptionList* self,
865 _In_ MI_Boolean allcancelled)
866 {
867 ReadWriteLock_AcquireWrite( &self->lock );
868 SubscriptionList_SetAllCancelled(self, allcancelled);
869 krisbash 1.1 ReadWriteLock_ReleaseWrite( &self->lock );
870 }
871
|