1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
2 //
3 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
9 //
10 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
16 //
17 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
19 //
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //////////////////////////////////////////////////////////////////////////
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Time.h>
33 #include <Pegasus/Common/Tracer.h>
34 #include <Pegasus/Common/Constants.h>
35 #include <Pegasus/Common/StringConversion.h>
36 #include <Pegasus/Config/ConfigManager.h>
37 #include <Pegasus/Provider/CIMOMHandle.h>
38 #include "DestinationQueue.h"
39
40 PEGASUS_NAMESPACE_BEGIN
41
42 // Initialize with default values.
43 venkat.puvvada 1.1 Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
44 Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
45 Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
46 String DestinationQueue::_indicationServiceName = "PG:IndicationService";
47 String DestinationQueue::_objectManagerName = "Pegasus";
48
|
49 venkat.puvvada 1.6 DestinationQueue::IndDiscardedReasonMsgs
50 DestinationQueue::indDiscardedReasonMsgs[] = {
51 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
52 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
53 " not delivered due to the corresponding listener destination"
54 " instance was removed."},
55
56 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
57 "SUBSCRIPTION_DELETED",
58 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
59 " not delivered due to the corresponding subscription was disabled"
60 " or deleted."},
61
62 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
63 "DESTINATIONQUEUE_FULL",
64 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
65 " was discarded due to the destination queue was full."},
66
67 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
68 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
69 " was not delivered due to the sequence identifier lifetime expired."},
70 venkat.puvvada 1.6
71 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
72 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
73 " was not delivered due to the maximum delivery retry"
74 " attempts exceeded. Exception : $2"},
75
76 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
77 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
78 " was not delivered due to the cimserver shutdown."}
|
79 venkat.puvvada 1.1 };
80
81 Uint64 DestinationQueue::_serverStartupTimeUsec
82 = System::getCurrentTimeUsec();
83
84 Boolean DestinationQueue::_initialized = false;
85 static Mutex _intializeMutex;
86
87 CIMInstance DestinationQueue::_getInstance(const CIMName &className)
88 {
89 CIMOMHandle cimomHandle;
90 Array<CIMInstance> instances =
91 cimomHandle.enumerateInstances(
92 OperationContext(),
93 PEGASUS_NAMESPACENAME_INTEROP,
94 className,
95 true,
96 false,
97 false,
98 false,
99 CIMPropertyList());
100 venkat.puvvada 1.1
101 return instances[0];
102 }
103
104 void DestinationQueue::_initIndicationServiceProperties()
105 {
106 PEG_METHOD_ENTER(TRC_IND_HANDLER,
107 "DestinationQueue::_initIndicationServiceProperties");
108
109 CIMInstance instance =
110 _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
111
112 instance.getProperty(
113 instance.findProperty(
114 _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
115 _maxDeliveryRetryAttempts);
116
117 instance.getProperty(
118 instance.findProperty(
119 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
120 _indicationServiceName);
121 venkat.puvvada 1.1
122 CIMValue value = instance.getProperty(
123 instance.findProperty(
124 _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
125
126 if (value.getType() == CIMTYPE_UINT32)
127 {
128 Uint32 tval;
129 value.get(tval);
130 _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
131 }
132 else
133 {
134 value.get(_minDeliveryRetryIntervalUsec);
135 _minDeliveryRetryIntervalUsec*= 1000000;
136 }
137 // See DSP 1054 ver 1.1.0 Sec 7.10
138 _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
139 _minDeliveryRetryIntervalUsec * 10;
140 PEG_METHOD_EXIT();
141 }
142 venkat.puvvada 1.1
143 void DestinationQueue::_initObjectManagerProperties()
144 {
145 PEG_METHOD_ENTER(TRC_IND_HANDLER,
146 "DestinationQueue::_initObjectManagerProperties");
147
148 CIMInstance instance =
149 _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
150
151 instance.getProperty(
152 instance.findProperty(
153 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
154 _objectManagerName);
155 PEG_METHOD_EXIT();
156 }
157
158 DestinationQueue::DestinationQueue(
159 const CIMInstance &handler):_handler(handler)
160 {
161 PEG_METHOD_ENTER(TRC_IND_HANDLER,
162 "DestinationQueue::DestinationQueue");
163 venkat.puvvada 1.1
164 if (!_initialized)
165 {
166 AutoMutex mtx(_intializeMutex);
167 if (!_initialized)
168 {
169 try
170 {
171 PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
172 "Initializaing the Destination Queue");
173 _initIndicationServiceProperties();
174 _initObjectManagerProperties();
175 }
176 catch(const Exception &e)
177 {
178 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
179 "Exception %s caught while initializing the "
180 "DestinationQueue, using default values.",
181 (const char*)e.getMessage().getCString()));
182 }
183 catch(...)
184 venkat.puvvada 1.1 {
185 PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
186 "Unknown exception caught while initializing the "
187 "DestinationQueue, using default values.");
188 }
189 _initialized = true;
190 }
191 }
192
193 // Build the sequence context
194 _sequenceContext = _indicationServiceName;
195 _sequenceContext.append("-");
196 _sequenceContext.append(_objectManagerName);
197 _sequenceContext.append("-");
198
199 Uint32 len = 0;
200 char buffer[22];
201 const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
202 _sequenceContext.append(String(ptr, len));
203 _sequenceContext.append("-");
204
205 venkat.puvvada 1.1 Uint32 idx = handler.findProperty(
206 PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
207
208 if (idx != PEG_NOT_FOUND)
209 {
210 Uint64 tvalue;
211 handler.getProperty(idx).getValue().get(tvalue);
212 Uint32 llen = 0;
213 char lbuffer[22];
214 const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
215 _sequenceContext.append(String(lptr, llen));
216 }
217 else
218 {
219 _sequenceContext.append(String(ptr, len));
220 }
221 _lastDeliveryRetryStatus = FAIL;
222
223 _sequenceNumber = 0;
224 _queueFullDroppedIndications = 0;
225 _lifetimeExpiredIndications = 0;
226 venkat.puvvada 1.1 _retryAttemptsExceededIndications = 0;
227 _subscriptionDeleteDroppedIndications = 0;
228 _calcMaxQueueSize = true;
229 _lastSuccessfulDeliveryTimeUsec = 0;
230 _maxIndicationDeliveryQueueSize = 2400;
231
232 _queueCreationTimeUsec = System::getCurrentTimeUsec();
233 PEG_METHOD_EXIT();
234 }
235
236 DestinationQueue::~DestinationQueue()
237 {
238 PEG_METHOD_ENTER(TRC_IND_HANDLER,
239 "DestinationQueue::~DestinationQueue");
240
|
241 marek 1.2 if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
|
242 venkat.puvvada 1.1 {
243 _cleanup(LISTENER_NOT_ACTIVE);
244 }
245
246 PEG_METHOD_EXIT();
247 }
248
249 Sint64 DestinationQueue::getSequenceNumber()
250 {
251 AutoMutex mtx(_queueMutex);
252
253 // Determine max queue size, See PEP 324 for the algorithm
254 if (_calcMaxQueueSize)
255 {
256 if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
257 >= _sequenceIdentifierLifetimeUsec)
258 {
259 // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
260 // (Number of indications arrived over
261 // sequence-identifier-lifetime.)
262
263 venkat.puvvada 1.1 _maxIndicationDeliveryQueueSize = _sequenceNumber;
264
265 if (_maxIndicationDeliveryQueueSize < 200)
266 {
267 _maxIndicationDeliveryQueueSize = 200;
268 }
269 else if (_maxIndicationDeliveryQueueSize > 2400)
270 {
271 _maxIndicationDeliveryQueueSize = 2400;
272 }
273 _calcMaxQueueSize = false;
274 }
275 }
276
277 Sint64 nextSequenceNumber = _sequenceNumber++;
278
279 if (_sequenceNumber < 0)
280 {
281 _sequenceNumber = 0;
282 }
283
284 venkat.puvvada 1.1 return nextSequenceNumber;
285 }
286
287 String DestinationQueue::_getSequenceContext(
288 const CIMInstance &indication)
289 {
290 String sequenceContext;
291
292 indication.getProperty(
293 indication.findProperty(
294 _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
295
296 return sequenceContext;
297 }
298
299 Sint64 DestinationQueue::_getSequenceNumber(
300 const CIMInstance &indication)
301 {
302 Sint64 sequenceNumber;
303
304 indication.getProperty(
305 venkat.puvvada 1.1 indication.findProperty(
306 _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
307
308 return sequenceNumber;
309 }
310
|
311 venkat.puvvada 1.4 void DestinationQueue::_logDiscardedIndication(
312 Uint32 reasonCode,
313 const CIMInstance &indication,
314 const String &message)
|
315 venkat.puvvada 1.1 {
316 PEGASUS_ASSERT(reasonCode <
|
317 venkat.puvvada 1.6 sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
|
318 venkat.puvvada 1.1
|
319 venkat.puvvada 1.6 if (reasonCode == DRA_EXCEEDED)
320 {
321 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
322 MessageLoaderParms(
323 indDiscardedReasonMsgs[reasonCode].key,
324 indDiscardedReasonMsgs[reasonCode].msg,
325 (const char*)_getSequenceContext(indication).getCString(),
326 _getSequenceNumber(indication),
327 (const char*)message.getCString()));
328 }
329 else
330 {
331 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
332 MessageLoaderParms(
333 indDiscardedReasonMsgs[reasonCode].key,
334 indDiscardedReasonMsgs[reasonCode].msg,
335 (const char*)_getSequenceContext(indication).getCString(),
336 _getSequenceNumber(indication)));
337
338 }
|
339 venkat.puvvada 1.1 }
340
|
341 marek 1.2 void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
|
342 venkat.puvvada 1.1 {
343 PEG_METHOD_ENTER(TRC_IND_HANDLER,
344 "DestinationQueue::enqueue");
345
|
346 marek 1.2 Uint32 idx;
347 CIMProperty prop;
348 CIMInstance &indication = message->indicationInstance;
349
350 if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
351 != PEG_NOT_FOUND)
352 {
353 prop = indication.getProperty(idx);
354 prop.setValue(getSequenceContext());
355 indication.removeProperty(idx);
356 }
357 else
358 {
359 prop = CIMProperty(
360 _PROPERTY_SEQUENCECONTEXT,
361 getSequenceContext());
362 }
363 indication.addProperty(prop);
364
|
365 venkat.puvvada 1.1 AutoMutex mtx(_queueMutex);
|
366 marek 1.2 Sint64 sequenceNumber = getSequenceNumber();
367 if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
368 != PEG_NOT_FOUND)
369 {
370 prop = indication.getProperty(idx);
371 prop.setValue(sequenceNumber);
372 indication.removeProperty(idx);
373 }
374 else
375 {
376 prop = CIMProperty(
377 _PROPERTY_SEQUENCENUMBER,
378 sequenceNumber);
379 }
380 indication.addProperty(prop);
381
|
382 venkat.puvvada 1.7 DeliveryStatusAggregator *aggregator = 0;
383 if (message->deliveryStatusAggregator &&
384 message->deliveryStatusAggregator->waitUntilDelivered)
385 {
386 aggregator = message->deliveryStatusAggregator;
387 }
388
389
|
390 marek 1.2 IndicationInfo *info = new IndicationInfo(
391 message->indicationInstance,
392 message->subscriptionInstance,
393 message->operationContext,
394 message->nameSpace.getString(),
|
395 venkat.puvvada 1.5 this,
|
396 venkat.puvvada 1.7 aggregator);
|
397 venkat.puvvada 1.5
|
398 venkat.puvvada 1.1 _queue.insert_back(info);
399
|
400 marek 1.2 info->lastDeliveryRetryTimeUsec = 0;
401 info->arrivalTimeUsec = System::getCurrentTimeUsec();
|
402 venkat.puvvada 1.1
403 if (_queue.size() > _maxIndicationDeliveryQueueSize)
404 {
405 _queueFullDroppedIndications++;
406 IndicationInfo *temp = _queue.remove_front();
|
407 venkat.puvvada 1.4 _logDiscardedIndication(
|
408 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
409 temp->indication);
410 delete temp;
411 }
|
412 marek 1.2
|
413 venkat.puvvada 1.1 PEG_METHOD_EXIT();
414 }
415
416 void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
417 {
418 PEG_METHOD_ENTER(TRC_IND_HANDLER,
419 "DestinationQueue::updateDeliveryRetrySuccess");
420
421 AutoMutex mtx(_queueMutex);
422 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
423 _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
424 _lastDeliveryRetryStatus = SUCCESS;
425
426 PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
427 "Indication with SequenceContext %s and SequenceNumber %"
428 PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
429 (const char*)_getSequenceContext(info->indication).getCString(),
430 _getSequenceNumber(info->indication)));
431
432 delete info;
433
434 venkat.puvvada 1.1 PEG_METHOD_EXIT();
435 }
436
437 void DestinationQueue::updateDeliveryRetryFailure(
438 IndicationInfo *info,
439 const CIMException &e)
440 {
441 PEG_METHOD_ENTER(TRC_IND_HANDLER,
442 "DestinationQueue::updateDeliveryRetryFailure");
443
444 AutoMutex mtx(_queueMutex);
445
446 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
447 _lastDeliveryRetryStatus = FAIL;
448 info->deliveryRetryAttemptsMade++;
449
|
450 marek 1.2 // Check for DeliveryRetryAttempts by adding the original delivery attempt.
451 if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
|
452 venkat.puvvada 1.1 {
453 _retryAttemptsExceededIndications++;
|
454 venkat.puvvada 1.4 _logDiscardedIndication(
|
455 venkat.puvvada 1.1 DRA_EXCEEDED,
|
456 venkat.puvvada 1.4 info->indication,
457 e.getMessage());
|
458 venkat.puvvada 1.1 delete info;
459 }
|
460 marek 1.2 else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
|
461 venkat.puvvada 1.1 {
462 _queueFullDroppedIndications++;
|
463 venkat.puvvada 1.4 _logDiscardedIndication(
|
464 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
465 info->indication);
466 delete info;
467 }
468 else
469 {
|
470 marek 1.2 // To deliver the indications in the correct order, insert the
471 // delivery retry failed indications at the front of the queue.
472 _queue.insert_front(info);
|
473 venkat.puvvada 1.4 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
474 "Delivery failure for indication with SequenceContext %s and "
475 "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
476 " DeliveryRetryAttempts made %u. Exception : %s",
477 (const char*)_getSequenceContext(info->indication).getCString(),
478 _getSequenceNumber(info->indication),
479 info->deliveryRetryAttemptsMade,
480 (const char*)e.getMessage().getCString()));
|
481 venkat.puvvada 1.1 info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
482 }
483
484 PEG_METHOD_EXIT();
485 }
486
487 void DestinationQueue::_waitForNonPendingDeliveryStatus()
488 {
489 PEG_METHOD_ENTER(TRC_IND_HANDLER,
490 "DestinationQueue::_waitForNonPendingDeliveryStatus");
491
492 while (true)
493 {
494 {
495 AutoMutex mtx(_queueMutex);
496 if (_lastDeliveryRetryStatus != PENDING)
497 {
498 break;
499 }
500 }
501 Threads::yield();
502 venkat.puvvada 1.1 Threads::sleep(50);
503 }
504 PEG_METHOD_EXIT();
505 }
506
507 void DestinationQueue::deleteMatchedIndications(
508 const CIMObjectPath &subscriptionPath)
509 {
510 PEG_METHOD_ENTER(TRC_IND_HANDLER,
511 "DestinationQueue::deleteMatchedIndications");
512
513 _waitForNonPendingDeliveryStatus();
514
515 IndicationInfo *info;
516 AutoMutex mtx(_queueMutex);
517
518 for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
519 {
520 info = _queue.remove_front();
521 if (info->subscription.getPath().identical(subscriptionPath))
522 {
523 venkat.puvvada 1.1 _subscriptionDeleteDroppedIndications++;
|
524 venkat.puvvada 1.4 _logDiscardedIndication(
|
525 venkat.puvvada 1.1 SUBSCRIPTION_NOT_ACTIVE,
526 info->indication);
527 delete info;
528 }
529 else
530 {
531 _queue.insert_back(info);
532 }
533 }
534 PEG_METHOD_EXIT();
535 }
536
537 void DestinationQueue::cleanup()
538 {
539 PEG_METHOD_ENTER(TRC_IND_HANDLER,
540 "DestinationQueue::cleanup");
541
542 _cleanup(LISTENER_NOT_ACTIVE);
543
544 PEG_METHOD_EXIT();
545 }
546 venkat.puvvada 1.1
547 void DestinationQueue::shutdown()
548 {
549 PEG_METHOD_ENTER(TRC_IND_HANDLER,
550 "DestinationQueue::shutdown");
551
552 _cleanup(CIMSERVER_SHUTDOWN);
553
554 PEG_METHOD_EXIT();
555 }
556
557 void DestinationQueue::_cleanup(int reasonCode)
558 {
559 _waitForNonPendingDeliveryStatus();
560
561 IndicationInfo *info;
562 while ((info = _queue.remove_front()))
563 {
|
564 venkat.puvvada 1.4 _logDiscardedIndication(
|
565 venkat.puvvada 1.1 reasonCode,
566 info->indication);
567 delete info;
568 }
569 }
570
571 IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
572 Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
573 {
574 AutoMutex mtx(_queueMutex);
575
576 if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
577 {
|
578 venkat.puvvada 1.3 // Maximum expiration time is equals to DeliveryRetryInterval.
579 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
|
580 venkat.puvvada 1.1 return 0;
581 }
582
|
583 venkat.puvvada 1.3 nextIndDRIExpTimeUsec = 0;
584
|
585 venkat.puvvada 1.1 IndicationInfo *info;
586
587 while (_queue.size())
588 {
589 info = _queue.front();
590
591 if (timeNowUsec < info->arrivalTimeUsec ||
592 timeNowUsec < info->lastDeliveryRetryTimeUsec)
593 {
594 timeNowUsec = System::getCurrentTimeUsec();
595 }
596
597 if ((timeNowUsec - info->arrivalTimeUsec) >=
598 _sequenceIdentifierLifetimeUsec)
599 {
600 _lifetimeExpiredIndications++;
601 IndicationInfo *temp = _queue.remove_front();
|
602 venkat.puvvada 1.4 _logDiscardedIndication(
|
603 venkat.puvvada 1.1 SIL_EXPIRED,
604 temp->indication);
605 delete temp;
606 }
607 else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
608 >= _minDeliveryRetryIntervalUsec)
609 {
610 _lastDeliveryRetryStatus = PENDING;
611 _queue.remove_front();
612 IndicationInfo *temp = _queue.front();
613
|
614 marek 1.2 // The following algorithm is used to determine the elapsed
615 // DeliveryRetryAttempts. To deliver the indication in order,
616 // Server delays the newer indications until older indications
617 // in the queue are attempted for delivery and their
618 // DeliveryRetyAttempts are exceeded. The following algorithm
619 // ensures that indications won't stay in the queue more than
620 // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
621 Uint32 elapsedDeliveryRetryAttempts;
622 if (info->lastDeliveryRetryTimeUsec)
623 {
624 elapsedDeliveryRetryAttempts =
625 ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
626 / _minDeliveryRetryIntervalUsec);
627 }
628 else
629 {
630 elapsedDeliveryRetryAttempts =
631 ((timeNowUsec - info->arrivalTimeUsec)
632 / _minDeliveryRetryIntervalUsec);
633 }
634
635 marek 1.2 if (elapsedDeliveryRetryAttempts)
636 {
637 info->deliveryRetryAttemptsMade +=
638 elapsedDeliveryRetryAttempts - 1;
639 }
640
|
641 venkat.puvvada 1.1 if (temp)
642 {
643 if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
644 < _minDeliveryRetryIntervalUsec)
645 {
646 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
647 (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
648 }
649
650 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
651 <= _minDeliveryRetryIntervalUsec);
652 }
653
654 return info;
655 }
656 else
657 {
658 if (timeNowUsec - info->lastDeliveryRetryTimeUsec
659 < _minDeliveryRetryIntervalUsec)
660 {
661 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
662 venkat.puvvada 1.1 (timeNowUsec - info->lastDeliveryRetryTimeUsec);
663 }
664
665 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
666 <= _minDeliveryRetryIntervalUsec);
667
668 break;
669 }
670 }
671
672 return 0;
673 }
674
675 void DestinationQueue::getInfo(QueueInfo &qinfo)
676 {
677 AutoMutex mtx(_queueMutex);
678
679 qinfo.handlerName = _handler.getPath();
680 qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
681 qinfo.sequenceContext = _sequenceContext;
682 qinfo.nextSequenceNumber = _sequenceNumber;
683 venkat.puvvada 1.1 qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
684 qinfo.sequenceIdentifierLifetimeSeconds =
685 _sequenceIdentifierLifetimeUsec / 1000000;
686 qinfo.size = _queue.size();
687 qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
688 qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
689 qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
690 qinfo.subscriptionDisableDroppedIndications =
691 _subscriptionDeleteDroppedIndications;
692 qinfo.lastSuccessfulDeliveryTimeUsec = _lastSuccessfulDeliveryTimeUsec;
693 }
694
695 PEGASUS_NAMESPACE_END
696
|