version 1.1, 2010/07/05 08:58:36
|
version 1.1.2.1, 2011/10/11 18:18:20
|
|
|
PEG_METHOD_ENTER(TRC_IND_HANDLER, | PEG_METHOD_ENTER(TRC_IND_HANDLER, |
"DestinationQueue::~DestinationQueue"); | "DestinationQueue::~DestinationQueue"); |
| |
if (!isIdle()) |
if (_queue.size() || _lastDeliveryRetryStatus == PENDING) |
{ | { |
_cleanup(LISTENER_NOT_ACTIVE); | _cleanup(LISTENER_NOT_ACTIVE); |
} | } |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
Boolean DestinationQueue::isIdle() |
|
{ |
|
AutoMutex mtx(_queueMutex); |
|
return _queue.size() == 0 && _lastDeliveryRetryStatus != PENDING; |
|
} |
|
|
|
Sint64 DestinationQueue::getSequenceNumber() | Sint64 DestinationQueue::getSequenceNumber() |
{ | { |
AutoMutex mtx(_queueMutex); | AutoMutex mtx(_queueMutex); |
|
|
return nextSequenceNumber; | return nextSequenceNumber; |
} | } |
| |
void DestinationQueue::updateLastSuccessfulDeliveryTime() |
|
{ |
|
AutoMutex mtx(_queueMutex); |
|
_lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec(); |
|
} |
|
|
|
String DestinationQueue::_getSequenceContext( | String DestinationQueue::_getSequenceContext( |
const CIMInstance &indication) | const CIMInstance &indication) |
{ | { |
|
|
return sequenceNumber; | return sequenceNumber; |
} | } |
| |
void DestinationQueue::_traceDiscardedIndication( |
void DestinationQueue::_logDiscardedIndication( |
Uint32 reasonCode, const CIMInstance &indication) |
Uint32 reasonCode, |
|
const CIMInstance &indication, |
|
const String &message) |
{ | { |
| |
PEGASUS_ASSERT(reasonCode < | PEGASUS_ASSERT(reasonCode < |
sizeof(_indDiscardedReasons)/sizeof(const char*)); | sizeof(_indDiscardedReasons)/sizeof(const char*)); |
| |
PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL3, |
String logMessage = _indDiscardedReasons[reasonCode]; |
"%s, Indication with SequenceContext %s and SequenceNumber %" |
logMessage.append(Char16('.')); |
PEGASUS_64BIT_CONVERSION_WIDTH "d is discarded", |
logMessage.append(message); |
_indDiscardedReasons[reasonCode], |
Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING, |
|
MessageLoaderParms( |
|
"HandlerService.DestinationQueue.INDICATION_DELIVERY_FAILED", |
|
"Failed to deliver an indication with SequenceContext \"$0\" " |
|
"and SequenceNumber \"$1\" : $2.", |
(const char*)_getSequenceContext(indication).getCString(), | (const char*)_getSequenceContext(indication).getCString(), |
_getSequenceNumber(indication))); |
_getSequenceNumber(indication), |
|
(const char*)logMessage.getCString())); |
} | } |
| |
void DestinationQueue::enqueue(IndicationInfo *info) |
void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message) |
{ | { |
PEG_METHOD_ENTER(TRC_IND_HANDLER, | PEG_METHOD_ENTER(TRC_IND_HANDLER, |
"DestinationQueue::enqueue"); | "DestinationQueue::enqueue"); |
| |
|
Uint32 idx; |
|
CIMProperty prop; |
|
CIMInstance &indication = message->indicationInstance; |
|
|
|
if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT)) |
|
!= PEG_NOT_FOUND) |
|
{ |
|
prop = indication.getProperty(idx); |
|
prop.setValue(getSequenceContext()); |
|
indication.removeProperty(idx); |
|
} |
|
else |
|
{ |
|
prop = CIMProperty( |
|
_PROPERTY_SEQUENCECONTEXT, |
|
getSequenceContext()); |
|
} |
|
indication.addProperty(prop); |
|
|
AutoMutex mtx(_queueMutex); | AutoMutex mtx(_queueMutex); |
|
Sint64 sequenceNumber = getSequenceNumber(); |
|
if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER)) |
|
!= PEG_NOT_FOUND) |
|
{ |
|
prop = indication.getProperty(idx); |
|
prop.setValue(sequenceNumber); |
|
indication.removeProperty(idx); |
|
} |
|
else |
|
{ |
|
prop = CIMProperty( |
|
_PROPERTY_SEQUENCENUMBER, |
|
sequenceNumber); |
|
} |
|
indication.addProperty(prop); |
|
|
|
IndicationInfo *info = new IndicationInfo( |
|
message->indicationInstance, |
|
message->subscriptionInstance, |
|
message->operationContext, |
|
message->nameSpace.getString(), |
|
this); |
_queue.insert_back(info); | _queue.insert_back(info); |
| |
info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec(); |
info->lastDeliveryRetryTimeUsec = 0; |
info->arrivalTimeUsec = info->lastDeliveryRetryTimeUsec; |
info->arrivalTimeUsec = System::getCurrentTimeUsec(); |
| |
if (_queue.size() > _maxIndicationDeliveryQueueSize) | if (_queue.size() > _maxIndicationDeliveryQueueSize) |
{ | { |
_queueFullDroppedIndications++; | _queueFullDroppedIndications++; |
IndicationInfo *temp = _queue.remove_front(); | IndicationInfo *temp = _queue.remove_front(); |
_traceDiscardedIndication( |
_logDiscardedIndication( |
DESTINATIONQUEUE_FULL, | DESTINATIONQUEUE_FULL, |
temp->indication); | temp->indication); |
delete temp; | delete temp; |
} | } |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
|
_lastDeliveryRetryStatus = FAIL; | _lastDeliveryRetryStatus = FAIL; |
info->deliveryRetryAttemptsMade++; | info->deliveryRetryAttemptsMade++; |
| |
if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts) |
// Check for DeliveryRetryAttempts by adding the original delivery attempt. |
|
if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1) |
{ | { |
_retryAttemptsExceededIndications++; | _retryAttemptsExceededIndications++; |
_traceDiscardedIndication( |
_logDiscardedIndication( |
DRA_EXCEEDED, | DRA_EXCEEDED, |
info->indication); |
info->indication, |
|
e.getMessage()); |
delete info; | delete info; |
PEG_METHOD_EXIT(); |
|
return; |
|
} | } |
|
else if (_queue.size() >= _maxIndicationDeliveryQueueSize) |
if (_queue.size() == _maxIndicationDeliveryQueueSize) |
|
{ | { |
_queueFullDroppedIndications++; | _queueFullDroppedIndications++; |
_traceDiscardedIndication( |
_logDiscardedIndication( |
DESTINATIONQUEUE_FULL, | DESTINATIONQUEUE_FULL, |
info->indication); | info->indication); |
PEG_METHOD_EXIT(); |
|
delete info; | delete info; |
} | } |
else | else |
{ | { |
info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec(); |
// To deliver the indications in the correct order, insert the |
_queue.insert_back(info); |
// delivery retry failed indications at the front of the queue. |
} |
_queue.insert_front(info); |
|
PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1, |
PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4, |
|
"Delivery failure for indication with SequenceContext %s and " | "Delivery failure for indication with SequenceContext %s and " |
"SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d." | "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d." |
" DeliveryRetryAttempts made %u. Exception : %s", | " DeliveryRetryAttempts made %u. Exception : %s", |
|
|
_getSequenceNumber(info->indication), | _getSequenceNumber(info->indication), |
info->deliveryRetryAttemptsMade, | info->deliveryRetryAttemptsMade, |
(const char*)e.getMessage().getCString())); | (const char*)e.getMessage().getCString())); |
|
info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec(); |
|
} |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
|
|
if (info->subscription.getPath().identical(subscriptionPath)) | if (info->subscription.getPath().identical(subscriptionPath)) |
{ | { |
_subscriptionDeleteDroppedIndications++; | _subscriptionDeleteDroppedIndications++; |
_traceDiscardedIndication( |
_logDiscardedIndication( |
SUBSCRIPTION_NOT_ACTIVE, | SUBSCRIPTION_NOT_ACTIVE, |
info->indication); | info->indication); |
delete info; | delete info; |
|
|
IndicationInfo *info; | IndicationInfo *info; |
while ((info = _queue.remove_front())) | while ((info = _queue.remove_front())) |
{ | { |
_traceDiscardedIndication( |
_logDiscardedIndication( |
reasonCode, | reasonCode, |
info->indication); | info->indication); |
delete info; | delete info; |
|
|
{ | { |
AutoMutex mtx(_queueMutex); | AutoMutex mtx(_queueMutex); |
| |
nextIndDRIExpTimeUsec = 0; |
|
|
|
if (!_queue.size() || _lastDeliveryRetryStatus == PENDING) | if (!_queue.size() || _lastDeliveryRetryStatus == PENDING) |
{ | { |
|
// Maximum expiration time is equals to DeliveryRetryInterval. |
|
nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec; |
return 0; | return 0; |
} | } |
| |
|
nextIndDRIExpTimeUsec = 0; |
|
|
IndicationInfo *info; | IndicationInfo *info; |
| |
while (_queue.size()) | while (_queue.size()) |
|
|
{ | { |
_lifetimeExpiredIndications++; | _lifetimeExpiredIndications++; |
IndicationInfo *temp = _queue.remove_front(); | IndicationInfo *temp = _queue.remove_front(); |
_traceDiscardedIndication( |
_logDiscardedIndication( |
SIL_EXPIRED, | SIL_EXPIRED, |
temp->indication); | temp->indication); |
delete temp; | delete temp; |
|
|
_queue.remove_front(); | _queue.remove_front(); |
IndicationInfo *temp = _queue.front(); | IndicationInfo *temp = _queue.front(); |
| |
|
// The following algorithm is used to determine the elapsed |
|
// DeliveryRetryAttempts. To deliver the indication in order, |
|
// Server delays the newer indications until older indications |
|
// in the queue are attempted for delivery and their |
|
// DeliveryRetyAttempts are exceeded. The following algorithm |
|
// ensures that indications won't stay in the queue more than |
|
// (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time. |
|
Uint32 elapsedDeliveryRetryAttempts; |
|
if (info->lastDeliveryRetryTimeUsec) |
|
{ |
|
elapsedDeliveryRetryAttempts = |
|
((timeNowUsec - info->lastDeliveryRetryTimeUsec) |
|
/ _minDeliveryRetryIntervalUsec); |
|
} |
|
else |
|
{ |
|
elapsedDeliveryRetryAttempts = |
|
((timeNowUsec - info->arrivalTimeUsec) |
|
/ _minDeliveryRetryIntervalUsec); |
|
} |
|
|
|
if (elapsedDeliveryRetryAttempts) |
|
{ |
|
info->deliveryRetryAttemptsMade += |
|
elapsedDeliveryRetryAttempts - 1; |
|
} |
|
|
if (temp) | if (temp) |
{ | { |
if (timeNowUsec - temp->lastDeliveryRetryTimeUsec | if (timeNowUsec - temp->lastDeliveryRetryTimeUsec |