1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
2 //
3 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
9 //
10 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
16 //
17 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
19 //
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //////////////////////////////////////////////////////////////////////////
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Time.h>
33 #include <Pegasus/Common/Tracer.h>
34 #include <Pegasus/Common/Constants.h>
35 #include <Pegasus/Common/StringConversion.h>
|
36 karl 1.10 #include <Pegasus/Common/MessageQueueService.h>
|
37 venkat.puvvada 1.1 #include <Pegasus/Config/ConfigManager.h>
38 #include <Pegasus/Provider/CIMOMHandle.h>
39 #include "DestinationQueue.h"
40
41 PEGASUS_NAMESPACE_BEGIN
42
43 // Initialize with default values.
44 Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
45 Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
|
46 karl 1.10 Uint64 DestinationQueue::_minSubscriptionRemovalTimeIntervalUsec
47 = 2592000 * 1000000ULL; // Default 30 days, Refer DSP1054.
|
48 venkat.puvvada 1.1 Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
|
49 karl 1.10
|
50 venkat.puvvada 1.1 String DestinationQueue::_indicationServiceName = "PG:IndicationService";
51 String DestinationQueue::_objectManagerName = "Pegasus";
|
52 karl 1.10 Uint32 DestinationQueue::_indicationServiceQid;
|
53 venkat.puvvada 1.1
|
54 venkat.puvvada 1.6 DestinationQueue::IndDiscardedReasonMsgs
55 DestinationQueue::indDiscardedReasonMsgs[] = {
56 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
57 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
58 " not delivered due to the corresponding listener destination"
59 " instance was removed."},
60
61 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
62 "SUBSCRIPTION_DELETED",
63 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
64 " not delivered due to the corresponding subscription was disabled"
65 " or deleted."},
66
67 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
68 "DESTINATIONQUEUE_FULL",
69 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
70 " was discarded due to the destination queue was full."},
71
72 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
73 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
74 " was not delivered due to the sequence identifier lifetime expired."},
75 venkat.puvvada 1.6
76 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
77 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
78 " was not delivered due to the maximum delivery retry"
79 " attempts exceeded. Exception : $2"},
80
81 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
82 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
83 " was not delivered due to the cimserver shutdown."}
|
84 venkat.puvvada 1.1 };
85
86 Uint64 DestinationQueue::_serverStartupTimeUsec
87 = System::getCurrentTimeUsec();
88
89 Boolean DestinationQueue::_initialized = false;
90 static Mutex _intializeMutex;
91
92 CIMInstance DestinationQueue::_getInstance(const CIMName &className)
93 {
94 CIMOMHandle cimomHandle;
95 Array<CIMInstance> instances =
96 cimomHandle.enumerateInstances(
97 OperationContext(),
98 PEGASUS_NAMESPACENAME_INTEROP,
99 className,
100 true,
101 false,
102 false,
103 false,
104 CIMPropertyList());
105 venkat.puvvada 1.1
106 return instances[0];
107 }
108
109 void DestinationQueue::_initIndicationServiceProperties()
110 {
111 PEG_METHOD_ENTER(TRC_IND_HANDLER,
112 "DestinationQueue::_initIndicationServiceProperties");
113
114 CIMInstance instance =
115 _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
116
117 instance.getProperty(
118 instance.findProperty(
119 _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
120 _maxDeliveryRetryAttempts);
121
122 instance.getProperty(
123 instance.findProperty(
124 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
125 _indicationServiceName);
126 venkat.puvvada 1.1
127 CIMValue value = instance.getProperty(
128 instance.findProperty(
129 _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
130
131 if (value.getType() == CIMTYPE_UINT32)
132 {
133 Uint32 tval;
134 value.get(tval);
135 _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
136 }
137 else
138 {
139 value.get(_minDeliveryRetryIntervalUsec);
140 _minDeliveryRetryIntervalUsec*= 1000000;
141 }
142 // See DSP 1054 ver 1.1.0 Sec 7.10
143 _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
144 _minDeliveryRetryIntervalUsec * 10;
|
145 karl 1.10
146 Uint32 subRemoveIntervalValue;
147 instance.getProperty(
148 instance.findProperty(
149 _PROPERTY_SUBSCRIPTIONREMOVALTIMEINTERVAL)).getValue().
150 get(subRemoveIntervalValue);
151
152 _minSubscriptionRemovalTimeIntervalUsec =
153 subRemoveIntervalValue * 1000000ULL;
154
|
155 venkat.puvvada 1.1 PEG_METHOD_EXIT();
156 }
157
158 void DestinationQueue::_initObjectManagerProperties()
159 {
160 PEG_METHOD_ENTER(TRC_IND_HANDLER,
161 "DestinationQueue::_initObjectManagerProperties");
162
163 CIMInstance instance =
164 _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
165
166 instance.getProperty(
167 instance.findProperty(
168 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
169 _objectManagerName);
170 PEG_METHOD_EXIT();
171 }
172
173 DestinationQueue::DestinationQueue(
174 const CIMInstance &handler):_handler(handler)
175 {
176 venkat.puvvada 1.1 PEG_METHOD_ENTER(TRC_IND_HANDLER,
177 "DestinationQueue::DestinationQueue");
178
179 if (!_initialized)
180 {
181 AutoMutex mtx(_intializeMutex);
182 if (!_initialized)
183 {
184 try
185 {
186 PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
187 "Initializaing the Destination Queue");
188 _initIndicationServiceProperties();
189 _initObjectManagerProperties();
|
190 karl 1.10 _indicationServiceQid = MessageQueueService::find_service_qid(
191 PEGASUS_QUEUENAME_INDICATIONSERVICE);
|
192 venkat.puvvada 1.1 }
193 catch(const Exception &e)
194 {
195 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
196 "Exception %s caught while initializing the "
197 "DestinationQueue, using default values.",
198 (const char*)e.getMessage().getCString()));
199 }
200 catch(...)
201 {
202 PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
203 "Unknown exception caught while initializing the "
204 "DestinationQueue, using default values.");
205 }
206 _initialized = true;
207 }
208 }
209
210 // Build the sequence context
211 _sequenceContext = _indicationServiceName;
212 _sequenceContext.append("-");
213 venkat.puvvada 1.1 _sequenceContext.append(_objectManagerName);
214 _sequenceContext.append("-");
215
|
216 venkat.puvvada 1.8 _connection = 0;
217
|
218 venkat.puvvada 1.1 Uint32 len = 0;
219 char buffer[22];
220 const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
221 _sequenceContext.append(String(ptr, len));
222 _sequenceContext.append("-");
223
224 Uint32 idx = handler.findProperty(
225 PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
226
227 if (idx != PEG_NOT_FOUND)
228 {
229 Uint64 tvalue;
230 handler.getProperty(idx).getValue().get(tvalue);
231 Uint32 llen = 0;
232 char lbuffer[22];
233 const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
234 _sequenceContext.append(String(lptr, llen));
235 }
236 else
237 {
238 _sequenceContext.append(String(ptr, len));
239 venkat.puvvada 1.1 }
240 _lastDeliveryRetryStatus = FAIL;
241
242 _sequenceNumber = 0;
243 _queueFullDroppedIndications = 0;
244 _lifetimeExpiredIndications = 0;
245 _retryAttemptsExceededIndications = 0;
246 _subscriptionDeleteDroppedIndications = 0;
247 _calcMaxQueueSize = true;
248 _maxIndicationDeliveryQueueSize = 2400;
249
|
250 karl 1.10 _lastSuccessfulDeliveryTimeUsec =
251 _queueCreationTimeUsec = System::getCurrentTimeUsec();
252
|
253 venkat.puvvada 1.1 PEG_METHOD_EXIT();
254 }
255
256 DestinationQueue::~DestinationQueue()
257 {
258 PEG_METHOD_ENTER(TRC_IND_HANDLER,
259 "DestinationQueue::~DestinationQueue");
260
|
261 marek 1.2 if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
|
262 venkat.puvvada 1.1 {
263 _cleanup(LISTENER_NOT_ACTIVE);
264 }
|
265 venkat.puvvada 1.8 delete _connection;
|
266 venkat.puvvada 1.1
267 PEG_METHOD_EXIT();
268 }
269
270 Sint64 DestinationQueue::getSequenceNumber()
271 {
272 AutoMutex mtx(_queueMutex);
273
274 // Determine max queue size, See PEP 324 for the algorithm
275 if (_calcMaxQueueSize)
276 {
277 if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
278 >= _sequenceIdentifierLifetimeUsec)
279 {
280 // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
281 // (Number of indications arrived over
282 // sequence-identifier-lifetime.)
283
284 _maxIndicationDeliveryQueueSize = _sequenceNumber;
285
286 if (_maxIndicationDeliveryQueueSize < 200)
287 venkat.puvvada 1.1 {
288 _maxIndicationDeliveryQueueSize = 200;
289 }
290 else if (_maxIndicationDeliveryQueueSize > 2400)
291 {
292 _maxIndicationDeliveryQueueSize = 2400;
293 }
294 _calcMaxQueueSize = false;
295 }
296 }
297
298 Sint64 nextSequenceNumber = _sequenceNumber++;
299
300 if (_sequenceNumber < 0)
301 {
302 _sequenceNumber = 0;
303 }
304
305 return nextSequenceNumber;
306 }
307
308 venkat.puvvada 1.1 String DestinationQueue::_getSequenceContext(
309 const CIMInstance &indication)
310 {
311 String sequenceContext;
312
313 indication.getProperty(
314 indication.findProperty(
315 _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
316
317 return sequenceContext;
318 }
319
320 Sint64 DestinationQueue::_getSequenceNumber(
321 const CIMInstance &indication)
322 {
323 Sint64 sequenceNumber;
324
325 indication.getProperty(
326 indication.findProperty(
327 _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
328
329 venkat.puvvada 1.1 return sequenceNumber;
330 }
331
|
332 venkat.puvvada 1.4 void DestinationQueue::_logDiscardedIndication(
333 Uint32 reasonCode,
334 const CIMInstance &indication,
335 const String &message)
|
336 venkat.puvvada 1.1 {
337 PEGASUS_ASSERT(reasonCode <
|
338 venkat.puvvada 1.6 sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
|
339 venkat.puvvada 1.1
|
340 venkat.puvvada 1.6 if (reasonCode == DRA_EXCEEDED)
341 {
342 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
343 MessageLoaderParms(
344 indDiscardedReasonMsgs[reasonCode].key,
345 indDiscardedReasonMsgs[reasonCode].msg,
346 (const char*)_getSequenceContext(indication).getCString(),
347 _getSequenceNumber(indication),
348 (const char*)message.getCString()));
349 }
350 else
351 {
352 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
353 MessageLoaderParms(
354 indDiscardedReasonMsgs[reasonCode].key,
355 indDiscardedReasonMsgs[reasonCode].msg,
356 (const char*)_getSequenceContext(indication).getCString(),
357 _getSequenceNumber(indication)));
|
358 karl 1.10
|
359 venkat.puvvada 1.6 }
|
360 venkat.puvvada 1.1 }
361
|
362 marek 1.2 void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
|
363 venkat.puvvada 1.1 {
364 PEG_METHOD_ENTER(TRC_IND_HANDLER,
365 "DestinationQueue::enqueue");
366
|
367 marek 1.2 Uint32 idx;
368 CIMProperty prop;
369 CIMInstance &indication = message->indicationInstance;
370
371 if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
372 != PEG_NOT_FOUND)
373 {
374 prop = indication.getProperty(idx);
375 prop.setValue(getSequenceContext());
376 indication.removeProperty(idx);
377 }
378 else
379 {
380 prop = CIMProperty(
381 _PROPERTY_SEQUENCECONTEXT,
382 getSequenceContext());
383 }
384 indication.addProperty(prop);
385
|
386 venkat.puvvada 1.1 AutoMutex mtx(_queueMutex);
|
387 marek 1.2 Sint64 sequenceNumber = getSequenceNumber();
388 if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
389 != PEG_NOT_FOUND)
390 {
391 prop = indication.getProperty(idx);
392 prop.setValue(sequenceNumber);
393 indication.removeProperty(idx);
394 }
395 else
396 {
397 prop = CIMProperty(
398 _PROPERTY_SEQUENCENUMBER,
399 sequenceNumber);
400 }
401 indication.addProperty(prop);
402
|
403 venkat.puvvada 1.7 DeliveryStatusAggregator *aggregator = 0;
404 if (message->deliveryStatusAggregator &&
405 message->deliveryStatusAggregator->waitUntilDelivered)
406 {
407 aggregator = message->deliveryStatusAggregator;
408 }
409
|
410 marek 1.2 IndicationInfo *info = new IndicationInfo(
411 message->indicationInstance,
412 message->subscriptionInstance,
413 message->operationContext,
414 message->nameSpace.getString(),
|
415 venkat.puvvada 1.5 this,
|
416 venkat.puvvada 1.7 aggregator);
|
417 venkat.puvvada 1.5
|
418 venkat.puvvada 1.1 _queue.insert_back(info);
419
|
420 marek 1.2 info->lastDeliveryRetryTimeUsec = 0;
421 info->arrivalTimeUsec = System::getCurrentTimeUsec();
|
422 venkat.puvvada 1.1
423 if (_queue.size() > _maxIndicationDeliveryQueueSize)
424 {
425 _queueFullDroppedIndications++;
426 IndicationInfo *temp = _queue.remove_front();
|
427 venkat.puvvada 1.4 _logDiscardedIndication(
|
428 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
429 temp->indication);
430 delete temp;
431 }
|
432 marek 1.2
|
433 venkat.puvvada 1.1 PEG_METHOD_EXIT();
434 }
435
436 void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
437 {
438 PEG_METHOD_ENTER(TRC_IND_HANDLER,
439 "DestinationQueue::updateDeliveryRetrySuccess");
440
441 AutoMutex mtx(_queueMutex);
|
442 venkat.puvvada 1.8
|
443 venkat.puvvada 1.1 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
444 _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
445 _lastDeliveryRetryStatus = SUCCESS;
446
447 PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
448 "Indication with SequenceContext %s and SequenceNumber %"
449 PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
450 (const char*)_getSequenceContext(info->indication).getCString(),
451 _getSequenceNumber(info->indication)));
452
453 delete info;
454
455 PEG_METHOD_EXIT();
456 }
457
458 void DestinationQueue::updateDeliveryRetryFailure(
459 IndicationInfo *info,
460 const CIMException &e)
461 {
462 PEG_METHOD_ENTER(TRC_IND_HANDLER,
463 "DestinationQueue::updateDeliveryRetryFailure");
464 venkat.puvvada 1.1
465 AutoMutex mtx(_queueMutex);
466
|
467 venkat.puvvada 1.8 // We should not have any connection object here because indication
468 // delivery has failed.
469 PEGASUS_ASSERT(!_connection);
470
|
471 venkat.puvvada 1.1 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
472 _lastDeliveryRetryStatus = FAIL;
473 info->deliveryRetryAttemptsMade++;
474
|
475 karl 1.10 // If the last successful delivery time is greater than or equal to
476 // SubscriptionRemovalTimeInterval, send message to indication service
477 // to reconcile OnFatalErrorPolicy
478 if (System::getCurrentTimeUsec() - _lastSuccessfulDeliveryTimeUsec >=
479 _minSubscriptionRemovalTimeIntervalUsec)
480 {
481 CIMProcessIndicationResponseMessage *response =
482 new CIMProcessIndicationResponseMessage(
483 XmlWriter::getNextMessageId(),
484 CIMException(CIM_ERR_FAILED),
485 QueueIdStack(_indicationServiceQid),
486 String(),
487 info->subscription);
488 response->dest = _indicationServiceQid;
489 MessageQueueService::SendForget(response);
490 }
491
|
492 marek 1.2 // Check for DeliveryRetryAttempts by adding the original delivery attempt.
493 if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
|
494 venkat.puvvada 1.1 {
495 _retryAttemptsExceededIndications++;
|
496 venkat.puvvada 1.4 _logDiscardedIndication(
|
497 venkat.puvvada 1.1 DRA_EXCEEDED,
|
498 venkat.puvvada 1.4 info->indication,
499 e.getMessage());
|
500 venkat.puvvada 1.1 delete info;
501 }
|
502 marek 1.2 else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
|
503 venkat.puvvada 1.1 {
504 _queueFullDroppedIndications++;
|
505 venkat.puvvada 1.4 _logDiscardedIndication(
|
506 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
507 info->indication);
508 delete info;
509 }
510 else
511 {
|
512 marek 1.2 // To deliver the indications in the correct order, insert the
513 // delivery retry failed indications at the front of the queue.
514 _queue.insert_front(info);
|
515 venkat.puvvada 1.4 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
516 "Delivery failure for indication with SequenceContext %s and "
517 "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
518 " DeliveryRetryAttempts made %u. Exception : %s",
519 (const char*)_getSequenceContext(info->indication).getCString(),
520 _getSequenceNumber(info->indication),
521 info->deliveryRetryAttemptsMade,
522 (const char*)e.getMessage().getCString()));
|
523 venkat.puvvada 1.1 info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
524 }
525
526 PEG_METHOD_EXIT();
527 }
528
529 void DestinationQueue::_waitForNonPendingDeliveryStatus()
530 {
531 PEG_METHOD_ENTER(TRC_IND_HANDLER,
532 "DestinationQueue::_waitForNonPendingDeliveryStatus");
533
534 while (true)
535 {
536 {
537 AutoMutex mtx(_queueMutex);
538 if (_lastDeliveryRetryStatus != PENDING)
539 {
540 break;
541 }
542 }
543 Threads::yield();
544 venkat.puvvada 1.1 Threads::sleep(50);
545 }
546 PEG_METHOD_EXIT();
547 }
548
549 void DestinationQueue::deleteMatchedIndications(
550 const CIMObjectPath &subscriptionPath)
551 {
552 PEG_METHOD_ENTER(TRC_IND_HANDLER,
553 "DestinationQueue::deleteMatchedIndications");
554
555 _waitForNonPendingDeliveryStatus();
556
557 IndicationInfo *info;
558 AutoMutex mtx(_queueMutex);
559
560 for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
561 {
562 info = _queue.remove_front();
563 if (info->subscription.getPath().identical(subscriptionPath))
564 {
565 venkat.puvvada 1.1 _subscriptionDeleteDroppedIndications++;
|
566 venkat.puvvada 1.4 _logDiscardedIndication(
|
567 venkat.puvvada 1.1 SUBSCRIPTION_NOT_ACTIVE,
568 info->indication);
569 delete info;
570 }
571 else
572 {
573 _queue.insert_back(info);
574 }
575 }
576 PEG_METHOD_EXIT();
577 }
578
579 void DestinationQueue::cleanup()
580 {
581 PEG_METHOD_ENTER(TRC_IND_HANDLER,
582 "DestinationQueue::cleanup");
583
584 _cleanup(LISTENER_NOT_ACTIVE);
585
586 PEG_METHOD_EXIT();
587 }
588 venkat.puvvada 1.1
589 void DestinationQueue::shutdown()
590 {
591 PEG_METHOD_ENTER(TRC_IND_HANDLER,
592 "DestinationQueue::shutdown");
593
594 _cleanup(CIMSERVER_SHUTDOWN);
595
596 PEG_METHOD_EXIT();
597 }
598
599 void DestinationQueue::_cleanup(int reasonCode)
600 {
601 _waitForNonPendingDeliveryStatus();
602
603 IndicationInfo *info;
604 while ((info = _queue.remove_front()))
605 {
|
606 venkat.puvvada 1.4 _logDiscardedIndication(
|
607 venkat.puvvada 1.1 reasonCode,
608 info->indication);
609 delete info;
610 }
611 }
612
613 IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
614 Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
615 {
616 AutoMutex mtx(_queueMutex);
617
618 if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
619 {
|
620 venkat.puvvada 1.3 // Maximum expiration time is equals to DeliveryRetryInterval.
621 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
|
622 venkat.puvvada 1.8
623 // If there are no indications in the queue, delete connection.
624 if (!_queue.size() && _lastDeliveryRetryStatus != PENDING)
625 {
626 delete _connection;
627 _connection = 0;
628 }
|
629 venkat.puvvada 1.1 return 0;
630 }
631
|
632 venkat.puvvada 1.3 nextIndDRIExpTimeUsec = 0;
633
|
634 venkat.puvvada 1.1 IndicationInfo *info;
635
636 while (_queue.size())
637 {
638 info = _queue.front();
639
640 if (timeNowUsec < info->arrivalTimeUsec ||
641 timeNowUsec < info->lastDeliveryRetryTimeUsec)
642 {
643 timeNowUsec = System::getCurrentTimeUsec();
644 }
645
646 if ((timeNowUsec - info->arrivalTimeUsec) >=
647 _sequenceIdentifierLifetimeUsec)
648 {
649 _lifetimeExpiredIndications++;
650 IndicationInfo *temp = _queue.remove_front();
|
651 venkat.puvvada 1.4 _logDiscardedIndication(
|
652 venkat.puvvada 1.1 SIL_EXPIRED,
653 temp->indication);
654 delete temp;
655 }
656 else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
657 >= _minDeliveryRetryIntervalUsec)
658 {
659 _lastDeliveryRetryStatus = PENDING;
660 _queue.remove_front();
661 IndicationInfo *temp = _queue.front();
662
|
663 marek 1.2 // The following algorithm is used to determine the elapsed
664 // DeliveryRetryAttempts. To deliver the indication in order,
665 // Server delays the newer indications until older indications
666 // in the queue are attempted for delivery and their
667 // DeliveryRetyAttempts are exceeded. The following algorithm
668 // ensures that indications won't stay in the queue more than
669 // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
670 Uint32 elapsedDeliveryRetryAttempts;
671 if (info->lastDeliveryRetryTimeUsec)
672 {
673 elapsedDeliveryRetryAttempts =
|
674 venkat.puvvada 1.8 ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
|
675 marek 1.2 / _minDeliveryRetryIntervalUsec);
676 }
677 else
678 {
|
679 venkat.puvvada 1.8 elapsedDeliveryRetryAttempts =
680 ((timeNowUsec - info->arrivalTimeUsec)
|
681 marek 1.2 / _minDeliveryRetryIntervalUsec);
682 }
683
684 if (elapsedDeliveryRetryAttempts)
685 {
686 info->deliveryRetryAttemptsMade +=
687 elapsedDeliveryRetryAttempts - 1;
688 }
689
|
690 venkat.puvvada 1.1 if (temp)
691 {
692 if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
693 < _minDeliveryRetryIntervalUsec)
694 {
695 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
696 (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
697 }
698
699 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
700 <= _minDeliveryRetryIntervalUsec);
701 }
702
703 return info;
704 }
705 else
706 {
707 if (timeNowUsec - info->lastDeliveryRetryTimeUsec
708 < _minDeliveryRetryIntervalUsec)
709 {
710 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
711 venkat.puvvada 1.1 (timeNowUsec - info->lastDeliveryRetryTimeUsec);
712 }
713
714 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
715 <= _minDeliveryRetryIntervalUsec);
716
717 break;
718 }
719 }
720
721 return 0;
722 }
723
|
724 ashok.pathak 1.9 void DestinationQueue::setDeliveryRetryAttempts( Uint16 DeliveryRetryAttempts )
725 {
726 AutoMutex mtx(_intializeMutex);
727 _maxDeliveryRetryAttempts = DeliveryRetryAttempts ;
728 _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
729 _minDeliveryRetryIntervalUsec * 10;
730 }
731
732 void DestinationQueue::setminDeliveryRetryInterval(
733 Uint32 minDeliveryRetryInterval)
734 {
735 AutoMutex mtx(_intializeMutex);
736 _minDeliveryRetryIntervalUsec = Uint64(minDeliveryRetryInterval)*1000000 ;
737 _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
738 _minDeliveryRetryIntervalUsec * 10;
739 }
740
|
741 venkat.puvvada 1.1 void DestinationQueue::getInfo(QueueInfo &qinfo)
742 {
743 AutoMutex mtx(_queueMutex);
744
745 qinfo.handlerName = _handler.getPath();
746 qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
747 qinfo.sequenceContext = _sequenceContext;
748 qinfo.nextSequenceNumber = _sequenceNumber;
749 qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
750 qinfo.sequenceIdentifierLifetimeSeconds =
751 _sequenceIdentifierLifetimeUsec / 1000000;
752 qinfo.size = _queue.size();
753 qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
754 qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
755 qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
756 qinfo.subscriptionDisableDroppedIndications =
757 _subscriptionDeleteDroppedIndications;
|
758 karl 1.10 /* If the last successful delivery time is equals to the queue creation
759 * time, indication delivery for this destination was never successful
760 */
761 qinfo.lastSuccessfulDeliveryTimeUsec =
762 _lastSuccessfulDeliveryTimeUsec == _queueCreationTimeUsec ? 0 :
763 _lastSuccessfulDeliveryTimeUsec;
|
764 venkat.puvvada 1.1 }
765
766 PEGASUS_NAMESPACE_END
|