(file) Return to DestinationQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / HandlerService

Diff for /pegasus/src/Pegasus/HandlerService/DestinationQueue.cpp between version 1.1 and 1.1.2.1

version 1.1, 2010/07/05 08:58:36 version 1.1.2.1, 2011/10/11 18:18:20
Line 215 
Line 215 
     PEG_METHOD_ENTER(TRC_IND_HANDLER,     PEG_METHOD_ENTER(TRC_IND_HANDLER,
         "DestinationQueue::~DestinationQueue");         "DestinationQueue::~DestinationQueue");
  
     if (!isIdle())      if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
     {     {
         _cleanup(LISTENER_NOT_ACTIVE);         _cleanup(LISTENER_NOT_ACTIVE);
     }     }
Line 223 
Line 223 
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
 Boolean DestinationQueue::isIdle()  
 {  
     AutoMutex mtx(_queueMutex);  
     return _queue.size() == 0 && _lastDeliveryRetryStatus != PENDING;  
 }  
   
 Sint64 DestinationQueue::getSequenceNumber() Sint64 DestinationQueue::getSequenceNumber()
 { {
     AutoMutex mtx(_queueMutex);     AutoMutex mtx(_queueMutex);
Line 267 
Line 261 
     return nextSequenceNumber;     return nextSequenceNumber;
 } }
  
 void DestinationQueue::updateLastSuccessfulDeliveryTime()  
 {  
     AutoMutex mtx(_queueMutex);  
     _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();  
 }  
   
 String DestinationQueue::_getSequenceContext( String DestinationQueue::_getSequenceContext(
     const CIMInstance &indication)     const CIMInstance &indication)
 { {
Line 297 
Line 285 
     return sequenceNumber;     return sequenceNumber;
 } }
  
 void DestinationQueue::_traceDiscardedIndication(  void DestinationQueue::_logDiscardedIndication(
     Uint32 reasonCode, const CIMInstance &indication)      Uint32 reasonCode,
       const CIMInstance &indication,
       const String &message)
 { {
  
     PEGASUS_ASSERT(reasonCode <     PEGASUS_ASSERT(reasonCode <
         sizeof(_indDiscardedReasons)/sizeof(const char*));         sizeof(_indDiscardedReasons)/sizeof(const char*));
  
     PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL3,      String logMessage = _indDiscardedReasons[reasonCode];
         "%s, Indication with SequenceContext %s and SequenceNumber %"      logMessage.append(Char16('.'));
             PEGASUS_64BIT_CONVERSION_WIDTH "d is discarded",      logMessage.append(message);
         _indDiscardedReasons[reasonCode],      Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
           MessageLoaderParms(
               "HandlerService.DestinationQueue.INDICATION_DELIVERY_FAILED",
               "Failed to deliver an indication with SequenceContext \"$0\" "
                   "and SequenceNumber \"$1\" : $2.",
         (const char*)_getSequenceContext(indication).getCString(),         (const char*)_getSequenceContext(indication).getCString(),
         _getSequenceNumber(indication)));              _getSequenceNumber(indication),
               (const char*)logMessage.getCString()));
 } }
  
 void DestinationQueue::enqueue(IndicationInfo *info)  void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
 { {
     PEG_METHOD_ENTER(TRC_IND_HANDLER,     PEG_METHOD_ENTER(TRC_IND_HANDLER,
         "DestinationQueue::enqueue");         "DestinationQueue::enqueue");
  
       Uint32 idx;
       CIMProperty prop;
       CIMInstance &indication = message->indicationInstance;
   
       if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
           != PEG_NOT_FOUND)
       {
           prop = indication.getProperty(idx);
           prop.setValue(getSequenceContext());
           indication.removeProperty(idx);
       }
       else
       {
           prop = CIMProperty(
               _PROPERTY_SEQUENCECONTEXT,
               getSequenceContext());
       }
       indication.addProperty(prop);
   
     AutoMutex mtx(_queueMutex);     AutoMutex mtx(_queueMutex);
       Sint64 sequenceNumber = getSequenceNumber();
       if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
           != PEG_NOT_FOUND)
       {
           prop = indication.getProperty(idx);
           prop.setValue(sequenceNumber);
           indication.removeProperty(idx);
       }
       else
       {
           prop = CIMProperty(
               _PROPERTY_SEQUENCENUMBER,
               sequenceNumber);
       }
       indication.addProperty(prop);
   
       IndicationInfo *info = new IndicationInfo(
           message->indicationInstance,
           message->subscriptionInstance,
           message->operationContext,
           message->nameSpace.getString(),
           this);
     _queue.insert_back(info);     _queue.insert_back(info);
  
     info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();      info->lastDeliveryRetryTimeUsec = 0;
     info->arrivalTimeUsec = info->lastDeliveryRetryTimeUsec;      info->arrivalTimeUsec = System::getCurrentTimeUsec();
  
     if (_queue.size() > _maxIndicationDeliveryQueueSize)     if (_queue.size() > _maxIndicationDeliveryQueueSize)
     {     {
         _queueFullDroppedIndications++;         _queueFullDroppedIndications++;
         IndicationInfo *temp = _queue.remove_front();         IndicationInfo *temp = _queue.remove_front();
         _traceDiscardedIndication(          _logDiscardedIndication(
             DESTINATIONQUEUE_FULL,             DESTINATIONQUEUE_FULL,
             temp->indication);             temp->indication);
         delete temp;         delete temp;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
Line 369 
Line 406 
     _lastDeliveryRetryStatus = FAIL;     _lastDeliveryRetryStatus = FAIL;
     info->deliveryRetryAttemptsMade++;     info->deliveryRetryAttemptsMade++;
  
     if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts)      // Check for DeliveryRetryAttempts by adding the original delivery attempt.
       if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
     {     {
         _retryAttemptsExceededIndications++;         _retryAttemptsExceededIndications++;
         _traceDiscardedIndication(          _logDiscardedIndication(
             DRA_EXCEEDED,             DRA_EXCEEDED,
             info->indication);              info->indication,
               e.getMessage());
         delete info;         delete info;
         PEG_METHOD_EXIT();  
         return;  
     }     }
       else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
     if (_queue.size() == _maxIndicationDeliveryQueueSize)  
     {     {
         _queueFullDroppedIndications++;         _queueFullDroppedIndications++;
         _traceDiscardedIndication(          _logDiscardedIndication(
             DESTINATIONQUEUE_FULL,             DESTINATIONQUEUE_FULL,
             info->indication);             info->indication);
         PEG_METHOD_EXIT();  
         delete info;         delete info;
     }     }
     else     else
     {     {
         info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();          // To deliver the indications in the correct order, insert the
         _queue.insert_back(info);          // delivery retry failed indications at the front of the queue.
     }          _queue.insert_front(info);
           PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
     PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,  
         "Delivery failure for indication with SequenceContext %s and "         "Delivery failure for indication with SequenceContext %s and "
             "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."             "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
             " DeliveryRetryAttempts made %u. Exception : %s",             " DeliveryRetryAttempts made %u. Exception : %s",
Line 403 
Line 437 
         _getSequenceNumber(info->indication),         _getSequenceNumber(info->indication),
         info->deliveryRetryAttemptsMade,         info->deliveryRetryAttemptsMade,
         (const char*)e.getMessage().getCString()));         (const char*)e.getMessage().getCString()));
           info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
       }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
Line 444 
Line 480 
         if (info->subscription.getPath().identical(subscriptionPath))         if (info->subscription.getPath().identical(subscriptionPath))
         {         {
             _subscriptionDeleteDroppedIndications++;             _subscriptionDeleteDroppedIndications++;
             _traceDiscardedIndication(              _logDiscardedIndication(
                 SUBSCRIPTION_NOT_ACTIVE,                 SUBSCRIPTION_NOT_ACTIVE,
                 info->indication);                 info->indication);
             delete info;             delete info;
Line 484 
Line 520 
     IndicationInfo *info;     IndicationInfo *info;
     while ((info = _queue.remove_front()))     while ((info = _queue.remove_front()))
     {     {
         _traceDiscardedIndication(          _logDiscardedIndication(
             reasonCode,             reasonCode,
             info->indication);             info->indication);
         delete info;         delete info;
Line 496 
Line 532 
 { {
     AutoMutex mtx(_queueMutex);     AutoMutex mtx(_queueMutex);
  
     nextIndDRIExpTimeUsec = 0;  
   
     if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)     if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
     {     {
           // Maximum expiration time is equals to DeliveryRetryInterval.
           nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
         return 0;         return 0;
     }     }
  
       nextIndDRIExpTimeUsec = 0;
   
     IndicationInfo *info;     IndicationInfo *info;
  
     while (_queue.size())     while (_queue.size())
Line 520 
Line 558 
         {         {
             _lifetimeExpiredIndications++;             _lifetimeExpiredIndications++;
             IndicationInfo *temp = _queue.remove_front();             IndicationInfo *temp = _queue.remove_front();
             _traceDiscardedIndication(              _logDiscardedIndication(
                 SIL_EXPIRED,                 SIL_EXPIRED,
                 temp->indication);                 temp->indication);
             delete temp;             delete temp;
Line 532 
Line 570 
             _queue.remove_front();             _queue.remove_front();
             IndicationInfo *temp = _queue.front();             IndicationInfo *temp = _queue.front();
  
               // The following algorithm is used to determine the elapsed
               // DeliveryRetryAttempts. To deliver the indication in order,
               // Server delays the newer indications until older indications
               // in the queue are attempted for delivery and their
               // DeliveryRetyAttempts are exceeded. The following algorithm
               // ensures that indications won't stay in the queue more than
               // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
               Uint32 elapsedDeliveryRetryAttempts;
               if (info->lastDeliveryRetryTimeUsec)
               {
                   elapsedDeliveryRetryAttempts =
                       ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
                           / _minDeliveryRetryIntervalUsec);
               }
               else
               {
                   elapsedDeliveryRetryAttempts =
                       ((timeNowUsec - info->arrivalTimeUsec)
                           / _minDeliveryRetryIntervalUsec);
               }
   
               if (elapsedDeliveryRetryAttempts)
               {
                   info->deliveryRetryAttemptsMade +=
                       elapsedDeliveryRetryAttempts - 1;
               }
   
             if (temp)             if (temp)
             {             {
                 if (timeNowUsec - temp->lastDeliveryRetryTimeUsec                 if (timeNowUsec - temp->lastDeliveryRetryTimeUsec


Legend:
Removed from v.1.1  
changed lines
  Added in v.1.1.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2