1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
2 //
3 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
9 //
10 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
16 //
17 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
19 //
20 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
21 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //////////////////////////////////////////////////////////////////////////
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Time.h>
33 #include <Pegasus/Common/Tracer.h>
34 #include <Pegasus/Common/Constants.h>
35 #include <Pegasus/Common/StringConversion.h>
36 #include <Pegasus/Config/ConfigManager.h>
37 #include <Pegasus/Provider/CIMOMHandle.h>
38 #include "DestinationQueue.h"
39
40 PEGASUS_NAMESPACE_BEGIN
41
42 // Initialize with default values.
43 venkat.puvvada 1.1 Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
44 Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
45 Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
46 String DestinationQueue::_indicationServiceName = "PG:IndicationService";
47 String DestinationQueue::_objectManagerName = "Pegasus";
48
|
49 karl 1.1.2.2 DestinationQueue::IndDiscardedReasonMsgs
50 DestinationQueue::indDiscardedReasonMsgs[] = {
51 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
52 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
53 " not delivered due to the corresponding listener destination"
54 " instance was removed."},
55
56 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
57 "SUBSCRIPTION_DELETED",
58 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
59 " not delivered due to the corresponding subscription was disabled"
60 " or deleted."},
61
62 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
63 "DESTINATIONQUEUE_FULL",
64 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
65 " was discarded due to the destination queue was full."},
66
67 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
68 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
69 " was not delivered due to the sequence identifier lifetime expired."},
70 karl 1.1.2.2
71 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
72 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
73 " was not delivered due to the maximum delivery retry"
74 " attempts exceeded. Exception : $2"},
75
76 {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
77 "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
78 " was not delivered due to the cimserver shutdown."}
|
79 venkat.puvvada 1.1 };
80
81 Uint64 DestinationQueue::_serverStartupTimeUsec
82 = System::getCurrentTimeUsec();
83
84 Boolean DestinationQueue::_initialized = false;
85 static Mutex _intializeMutex;
86
87 CIMInstance DestinationQueue::_getInstance(const CIMName &className)
88 {
89 CIMOMHandle cimomHandle;
90 Array<CIMInstance> instances =
91 cimomHandle.enumerateInstances(
92 OperationContext(),
93 PEGASUS_NAMESPACENAME_INTEROP,
94 className,
95 true,
96 false,
97 false,
98 false,
99 CIMPropertyList());
100 venkat.puvvada 1.1
101 return instances[0];
102 }
103
104 void DestinationQueue::_initIndicationServiceProperties()
105 {
106 PEG_METHOD_ENTER(TRC_IND_HANDLER,
107 "DestinationQueue::_initIndicationServiceProperties");
108
109 CIMInstance instance =
110 _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
111
112 instance.getProperty(
113 instance.findProperty(
114 _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
115 _maxDeliveryRetryAttempts);
116
117 instance.getProperty(
118 instance.findProperty(
119 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
120 _indicationServiceName);
121 venkat.puvvada 1.1
122 CIMValue value = instance.getProperty(
123 instance.findProperty(
124 _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
125
126 if (value.getType() == CIMTYPE_UINT32)
127 {
128 Uint32 tval;
129 value.get(tval);
130 _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
131 }
132 else
133 {
134 value.get(_minDeliveryRetryIntervalUsec);
135 _minDeliveryRetryIntervalUsec*= 1000000;
136 }
137 // See DSP 1054 ver 1.1.0 Sec 7.10
138 _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
139 _minDeliveryRetryIntervalUsec * 10;
140 PEG_METHOD_EXIT();
141 }
142 venkat.puvvada 1.1
143 void DestinationQueue::_initObjectManagerProperties()
144 {
145 PEG_METHOD_ENTER(TRC_IND_HANDLER,
146 "DestinationQueue::_initObjectManagerProperties");
147
148 CIMInstance instance =
149 _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
150
151 instance.getProperty(
152 instance.findProperty(
153 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
154 _objectManagerName);
155 PEG_METHOD_EXIT();
156 }
157
158 DestinationQueue::DestinationQueue(
159 const CIMInstance &handler):_handler(handler)
160 {
161 PEG_METHOD_ENTER(TRC_IND_HANDLER,
162 "DestinationQueue::DestinationQueue");
163 venkat.puvvada 1.1
164 if (!_initialized)
165 {
166 AutoMutex mtx(_intializeMutex);
167 if (!_initialized)
168 {
169 try
170 {
171 PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
172 "Initializaing the Destination Queue");
173 _initIndicationServiceProperties();
174 _initObjectManagerProperties();
175 }
176 catch(const Exception &e)
177 {
178 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
179 "Exception %s caught while initializing the "
180 "DestinationQueue, using default values.",
181 (const char*)e.getMessage().getCString()));
182 }
183 catch(...)
184 venkat.puvvada 1.1 {
185 PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
186 "Unknown exception caught while initializing the "
187 "DestinationQueue, using default values.");
188 }
189 _initialized = true;
190 }
191 }
192
193 // Build the sequence context
194 _sequenceContext = _indicationServiceName;
195 _sequenceContext.append("-");
196 _sequenceContext.append(_objectManagerName);
197 _sequenceContext.append("-");
198
|
199 karl 1.1.2.2 _connection = 0;
200
|
201 venkat.puvvada 1.1 Uint32 len = 0;
202 char buffer[22];
203 const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
204 _sequenceContext.append(String(ptr, len));
205 _sequenceContext.append("-");
206
207 Uint32 idx = handler.findProperty(
208 PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
209
210 if (idx != PEG_NOT_FOUND)
211 {
212 Uint64 tvalue;
213 handler.getProperty(idx).getValue().get(tvalue);
214 Uint32 llen = 0;
215 char lbuffer[22];
216 const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
217 _sequenceContext.append(String(lptr, llen));
218 }
219 else
220 {
221 _sequenceContext.append(String(ptr, len));
222 venkat.puvvada 1.1 }
223 _lastDeliveryRetryStatus = FAIL;
224
225 _sequenceNumber = 0;
226 _queueFullDroppedIndications = 0;
227 _lifetimeExpiredIndications = 0;
228 _retryAttemptsExceededIndications = 0;
229 _subscriptionDeleteDroppedIndications = 0;
230 _calcMaxQueueSize = true;
231 _lastSuccessfulDeliveryTimeUsec = 0;
232 _maxIndicationDeliveryQueueSize = 2400;
233
234 _queueCreationTimeUsec = System::getCurrentTimeUsec();
235 PEG_METHOD_EXIT();
236 }
237
238 DestinationQueue::~DestinationQueue()
239 {
240 PEG_METHOD_ENTER(TRC_IND_HANDLER,
241 "DestinationQueue::~DestinationQueue");
242
|
243 karl 1.1.2.1 if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
|
244 venkat.puvvada 1.1 {
245 _cleanup(LISTENER_NOT_ACTIVE);
246 }
|
247 karl 1.1.2.2 delete _connection;
|
248 venkat.puvvada 1.1
249 PEG_METHOD_EXIT();
250 }
251
252 Sint64 DestinationQueue::getSequenceNumber()
253 {
254 AutoMutex mtx(_queueMutex);
255
256 // Determine max queue size, See PEP 324 for the algorithm
257 if (_calcMaxQueueSize)
258 {
259 if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
260 >= _sequenceIdentifierLifetimeUsec)
261 {
262 // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
263 // (Number of indications arrived over
264 // sequence-identifier-lifetime.)
265
266 _maxIndicationDeliveryQueueSize = _sequenceNumber;
267
268 if (_maxIndicationDeliveryQueueSize < 200)
269 venkat.puvvada 1.1 {
270 _maxIndicationDeliveryQueueSize = 200;
271 }
272 else if (_maxIndicationDeliveryQueueSize > 2400)
273 {
274 _maxIndicationDeliveryQueueSize = 2400;
275 }
276 _calcMaxQueueSize = false;
277 }
278 }
279
280 Sint64 nextSequenceNumber = _sequenceNumber++;
281
282 if (_sequenceNumber < 0)
283 {
284 _sequenceNumber = 0;
285 }
286
287 return nextSequenceNumber;
288 }
289
290 venkat.puvvada 1.1 String DestinationQueue::_getSequenceContext(
291 const CIMInstance &indication)
292 {
293 String sequenceContext;
294
295 indication.getProperty(
296 indication.findProperty(
297 _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
298
299 return sequenceContext;
300 }
301
302 Sint64 DestinationQueue::_getSequenceNumber(
303 const CIMInstance &indication)
304 {
305 Sint64 sequenceNumber;
306
307 indication.getProperty(
308 indication.findProperty(
309 _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
310
311 venkat.puvvada 1.1 return sequenceNumber;
312 }
313
|
314 karl 1.1.2.1 void DestinationQueue::_logDiscardedIndication(
315 Uint32 reasonCode,
316 const CIMInstance &indication,
317 const String &message)
|
318 venkat.puvvada 1.1 {
319 PEGASUS_ASSERT(reasonCode <
|
320 karl 1.1.2.2 sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
|
321 venkat.puvvada 1.1
|
322 karl 1.1.2.2 if (reasonCode == DRA_EXCEEDED)
323 {
324 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
325 MessageLoaderParms(
326 indDiscardedReasonMsgs[reasonCode].key,
327 indDiscardedReasonMsgs[reasonCode].msg,
328 (const char*)_getSequenceContext(indication).getCString(),
329 _getSequenceNumber(indication),
330 (const char*)message.getCString()));
331 }
332 else
333 {
334 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
335 MessageLoaderParms(
336 indDiscardedReasonMsgs[reasonCode].key,
337 indDiscardedReasonMsgs[reasonCode].msg,
338 (const char*)_getSequenceContext(indication).getCString(),
339 _getSequenceNumber(indication)));
340
341 }
|
342 venkat.puvvada 1.1 }
343
|
344 karl 1.1.2.1 void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
|
345 venkat.puvvada 1.1 {
346 PEG_METHOD_ENTER(TRC_IND_HANDLER,
347 "DestinationQueue::enqueue");
348
|
349 karl 1.1.2.1 Uint32 idx;
350 CIMProperty prop;
351 CIMInstance &indication = message->indicationInstance;
352
353 if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
354 != PEG_NOT_FOUND)
355 {
356 prop = indication.getProperty(idx);
357 prop.setValue(getSequenceContext());
358 indication.removeProperty(idx);
359 }
360 else
361 {
362 prop = CIMProperty(
363 _PROPERTY_SEQUENCECONTEXT,
364 getSequenceContext());
365 }
366 indication.addProperty(prop);
367
|
368 venkat.puvvada 1.1 AutoMutex mtx(_queueMutex);
|
369 karl 1.1.2.1 Sint64 sequenceNumber = getSequenceNumber();
370 if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
371 != PEG_NOT_FOUND)
372 {
373 prop = indication.getProperty(idx);
374 prop.setValue(sequenceNumber);
375 indication.removeProperty(idx);
376 }
377 else
378 {
379 prop = CIMProperty(
380 _PROPERTY_SEQUENCENUMBER,
381 sequenceNumber);
382 }
383 indication.addProperty(prop);
384
|
385 karl 1.1.2.2 DeliveryStatusAggregator *aggregator = 0;
386 if (message->deliveryStatusAggregator &&
387 message->deliveryStatusAggregator->waitUntilDelivered)
388 {
389 aggregator = message->deliveryStatusAggregator;
390 }
391
|
392 karl 1.1.2.1 IndicationInfo *info = new IndicationInfo(
393 message->indicationInstance,
394 message->subscriptionInstance,
395 message->operationContext,
396 message->nameSpace.getString(),
|
397 karl 1.1.2.2 this,
398 aggregator);
399
|
400 venkat.puvvada 1.1 _queue.insert_back(info);
401
|
402 karl 1.1.2.1 info->lastDeliveryRetryTimeUsec = 0;
403 info->arrivalTimeUsec = System::getCurrentTimeUsec();
|
404 venkat.puvvada 1.1
405 if (_queue.size() > _maxIndicationDeliveryQueueSize)
406 {
407 _queueFullDroppedIndications++;
408 IndicationInfo *temp = _queue.remove_front();
|
409 karl 1.1.2.1 _logDiscardedIndication(
|
410 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
411 temp->indication);
412 delete temp;
413 }
|
414 karl 1.1.2.1
|
415 venkat.puvvada 1.1 PEG_METHOD_EXIT();
416 }
417
418 void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
419 {
420 PEG_METHOD_ENTER(TRC_IND_HANDLER,
421 "DestinationQueue::updateDeliveryRetrySuccess");
422
423 AutoMutex mtx(_queueMutex);
|
424 karl 1.1.2.2
|
425 venkat.puvvada 1.1 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
426 _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
427 _lastDeliveryRetryStatus = SUCCESS;
428
429 PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
430 "Indication with SequenceContext %s and SequenceNumber %"
431 PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
432 (const char*)_getSequenceContext(info->indication).getCString(),
433 _getSequenceNumber(info->indication)));
434
435 delete info;
436
437 PEG_METHOD_EXIT();
438 }
439
440 void DestinationQueue::updateDeliveryRetryFailure(
441 IndicationInfo *info,
442 const CIMException &e)
443 {
444 PEG_METHOD_ENTER(TRC_IND_HANDLER,
445 "DestinationQueue::updateDeliveryRetryFailure");
446 venkat.puvvada 1.1
447 AutoMutex mtx(_queueMutex);
448
|
449 karl 1.1.2.2 // We should not have any connection object here because indication
450 // delivery has failed.
451 PEGASUS_ASSERT(!_connection);
452
|
453 venkat.puvvada 1.1 PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
454 _lastDeliveryRetryStatus = FAIL;
455 info->deliveryRetryAttemptsMade++;
456
|
457 karl 1.1.2.1 // Check for DeliveryRetryAttempts by adding the original delivery attempt.
458 if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
|
459 venkat.puvvada 1.1 {
460 _retryAttemptsExceededIndications++;
|
461 karl 1.1.2.1 _logDiscardedIndication(
|
462 venkat.puvvada 1.1 DRA_EXCEEDED,
|
463 karl 1.1.2.1 info->indication,
464 e.getMessage());
|
465 venkat.puvvada 1.1 delete info;
466 }
|
467 karl 1.1.2.1 else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
|
468 venkat.puvvada 1.1 {
469 _queueFullDroppedIndications++;
|
470 karl 1.1.2.1 _logDiscardedIndication(
|
471 venkat.puvvada 1.1 DESTINATIONQUEUE_FULL,
472 info->indication);
473 delete info;
474 }
475 else
476 {
|
477 karl 1.1.2.1 // To deliver the indications in the correct order, insert the
478 // delivery retry failed indications at the front of the queue.
479 _queue.insert_front(info);
480 PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
481 "Delivery failure for indication with SequenceContext %s and "
482 "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
483 " DeliveryRetryAttempts made %u. Exception : %s",
484 (const char*)_getSequenceContext(info->indication).getCString(),
485 _getSequenceNumber(info->indication),
486 info->deliveryRetryAttemptsMade,
487 (const char*)e.getMessage().getCString()));
|
488 venkat.puvvada 1.1 info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
489 }
490
491 PEG_METHOD_EXIT();
492 }
493
494 void DestinationQueue::_waitForNonPendingDeliveryStatus()
495 {
496 PEG_METHOD_ENTER(TRC_IND_HANDLER,
497 "DestinationQueue::_waitForNonPendingDeliveryStatus");
498
499 while (true)
500 {
501 {
502 AutoMutex mtx(_queueMutex);
503 if (_lastDeliveryRetryStatus != PENDING)
504 {
505 break;
506 }
507 }
508 Threads::yield();
509 venkat.puvvada 1.1 Threads::sleep(50);
510 }
511 PEG_METHOD_EXIT();
512 }
513
514 void DestinationQueue::deleteMatchedIndications(
515 const CIMObjectPath &subscriptionPath)
516 {
517 PEG_METHOD_ENTER(TRC_IND_HANDLER,
518 "DestinationQueue::deleteMatchedIndications");
519
520 _waitForNonPendingDeliveryStatus();
521
522 IndicationInfo *info;
523 AutoMutex mtx(_queueMutex);
524
525 for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
526 {
527 info = _queue.remove_front();
528 if (info->subscription.getPath().identical(subscriptionPath))
529 {
530 venkat.puvvada 1.1 _subscriptionDeleteDroppedIndications++;
|
531 karl 1.1.2.1 _logDiscardedIndication(
|
532 venkat.puvvada 1.1 SUBSCRIPTION_NOT_ACTIVE,
533 info->indication);
534 delete info;
535 }
536 else
537 {
538 _queue.insert_back(info);
539 }
540 }
541 PEG_METHOD_EXIT();
542 }
543
544 void DestinationQueue::cleanup()
545 {
546 PEG_METHOD_ENTER(TRC_IND_HANDLER,
547 "DestinationQueue::cleanup");
548
549 _cleanup(LISTENER_NOT_ACTIVE);
550
551 PEG_METHOD_EXIT();
552 }
553 venkat.puvvada 1.1
554 void DestinationQueue::shutdown()
555 {
556 PEG_METHOD_ENTER(TRC_IND_HANDLER,
557 "DestinationQueue::shutdown");
558
559 _cleanup(CIMSERVER_SHUTDOWN);
560
561 PEG_METHOD_EXIT();
562 }
563
564 void DestinationQueue::_cleanup(int reasonCode)
565 {
566 _waitForNonPendingDeliveryStatus();
567
568 IndicationInfo *info;
569 while ((info = _queue.remove_front()))
570 {
|
571 karl 1.1.2.1 _logDiscardedIndication(
|
572 venkat.puvvada 1.1 reasonCode,
573 info->indication);
574 delete info;
575 }
576 }
577
578 IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
579 Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
580 {
581 AutoMutex mtx(_queueMutex);
582
583 if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
584 {
|
585 karl 1.1.2.1 // Maximum expiration time is equals to DeliveryRetryInterval.
586 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
|
587 karl 1.1.2.2
588 // If there are no indications in the queue, delete connection.
589 if (!_queue.size() && _lastDeliveryRetryStatus != PENDING)
590 {
591 delete _connection;
592 _connection = 0;
593 }
|
594 venkat.puvvada 1.1 return 0;
595 }
596
|
597 karl 1.1.2.1 nextIndDRIExpTimeUsec = 0;
598
|
599 venkat.puvvada 1.1 IndicationInfo *info;
600
601 while (_queue.size())
602 {
603 info = _queue.front();
604
605 if (timeNowUsec < info->arrivalTimeUsec ||
606 timeNowUsec < info->lastDeliveryRetryTimeUsec)
607 {
608 timeNowUsec = System::getCurrentTimeUsec();
609 }
610
611 if ((timeNowUsec - info->arrivalTimeUsec) >=
612 _sequenceIdentifierLifetimeUsec)
613 {
614 _lifetimeExpiredIndications++;
615 IndicationInfo *temp = _queue.remove_front();
|
616 karl 1.1.2.1 _logDiscardedIndication(
|
617 venkat.puvvada 1.1 SIL_EXPIRED,
618 temp->indication);
619 delete temp;
620 }
621 else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
622 >= _minDeliveryRetryIntervalUsec)
623 {
624 _lastDeliveryRetryStatus = PENDING;
625 _queue.remove_front();
626 IndicationInfo *temp = _queue.front();
627
|
628 karl 1.1.2.1 // The following algorithm is used to determine the elapsed
629 // DeliveryRetryAttempts. To deliver the indication in order,
630 // Server delays the newer indications until older indications
631 // in the queue are attempted for delivery and their
632 // DeliveryRetyAttempts are exceeded. The following algorithm
633 // ensures that indications won't stay in the queue more than
634 // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
635 Uint32 elapsedDeliveryRetryAttempts;
636 if (info->lastDeliveryRetryTimeUsec)
637 {
638 elapsedDeliveryRetryAttempts =
|
639 karl 1.1.2.2 ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
|
640 karl 1.1.2.1 / _minDeliveryRetryIntervalUsec);
641 }
642 else
643 {
|
644 karl 1.1.2.2 elapsedDeliveryRetryAttempts =
645 ((timeNowUsec - info->arrivalTimeUsec)
|
646 karl 1.1.2.1 / _minDeliveryRetryIntervalUsec);
647 }
648
649 if (elapsedDeliveryRetryAttempts)
650 {
651 info->deliveryRetryAttemptsMade +=
652 elapsedDeliveryRetryAttempts - 1;
653 }
654
|
655 venkat.puvvada 1.1 if (temp)
656 {
657 if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
658 < _minDeliveryRetryIntervalUsec)
659 {
660 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
661 (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
662 }
663
664 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
665 <= _minDeliveryRetryIntervalUsec);
666 }
667
668 return info;
669 }
670 else
671 {
672 if (timeNowUsec - info->lastDeliveryRetryTimeUsec
673 < _minDeliveryRetryIntervalUsec)
674 {
675 nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
676 venkat.puvvada 1.1 (timeNowUsec - info->lastDeliveryRetryTimeUsec);
677 }
678
679 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
680 <= _minDeliveryRetryIntervalUsec);
681
682 break;
683 }
684 }
685
686 return 0;
687 }
688
689 void DestinationQueue::getInfo(QueueInfo &qinfo)
690 {
691 AutoMutex mtx(_queueMutex);
692
693 qinfo.handlerName = _handler.getPath();
694 qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
695 qinfo.sequenceContext = _sequenceContext;
696 qinfo.nextSequenceNumber = _sequenceNumber;
697 venkat.puvvada 1.1 qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
698 qinfo.sequenceIdentifierLifetimeSeconds =
699 _sequenceIdentifierLifetimeUsec / 1000000;
700 qinfo.size = _queue.size();
701 qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
702 qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
703 qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
704 qinfo.subscriptionDisableDroppedIndications =
705 _subscriptionDeleteDroppedIndications;
706 qinfo.lastSuccessfulDeliveryTimeUsec = _lastSuccessfulDeliveryTimeUsec;
707 }
708
709 PEGASUS_NAMESPACE_END
710
|