(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

Diff for /pegasus/src/Pegasus/DynListener/ConsumerManager.cpp between version 1.9 and 1.10

version 1.9, 2005/10/31 17:39:17 version 1.10, 2005/11/01 20:44:54
Line 853 
Line 853 
  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
  * we would never hit the wait for the retry timeout.  * we would never hit the wait for the retry timeout.
  *  *
  * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one  
  * is processed.  Because new indications are first in, first out, the 19 indications will be processed in reverse order.  
  * Is this a problem? Short answer - NO.  They could arrive in reverse order anyways depending on the network stack.  
  *  
  */  */
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 { {
Line 868 
Line 864 
  
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
  
     PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl;  
   
     myself->_listeningSemaphore->signal();     myself->_listeningSemaphore->signal();
  
     while (true)     while (true)
Line 890 
Line 884 
                 break;                 break;
             }             }
  
             //signal must have been due to an incoming event  
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name);  
   
             //create a temporary queue to store failed indications             //create a temporary queue to store failed indications
             tmpEventQueue.empty_list();             tmpEventQueue.empty_list();
  
             //continue processing events until the queue is empty             //continue processing events until the queue is empty
             //make sure to check for the shutdown signal before every iteration             //make sure to check for the shutdown signal before every iteration
              // Note that at any time during our processing of events the Listener may be enqueueing NEW events for us to process.
              // Because we pop events off the front and new events are added at the back, if events are failing when we start
              // but are succeeding by the end of the processing, events may be sent out of chronological order.
              // However, once we complete the current queue of events, we will always send old events to be retried before sending any
              // new events added afterwards.
             while (myself->_eventqueue.size())             while (myself->_eventqueue.size())
             {             {
                 //check for shutdown signal                 //check for shutdown signal
Line 920 
Line 915 
                     continue;                     continue;
                 }                 }
  
                 //check retry status. do not retry until the retry time has elapsed  
                 if (event->getRetries() > 0)  
                 {  
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time from event is " + event->getLastAttemptTime().toString());  
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Current time is " + CIMDateTime::getCurrentDateTime().toString());  
                     Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(),  
                                                                                  CIMDateTime::getCurrentDateTime());  
   
                     if (differenceInMicroseconds < (DEFAULT_RETRY_LAPSE * 1000))  
                     {  
                         //do not retry; just add to the retry queue  
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::cannot retry event until retry time has elapsed");  
                         tmpEventQueue.insert_last(event);  
                         continue;  
                     }  
                 }  
   
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
  
                 try                 try
Line 957 
Line 935 
                     {                     {
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
  
                         //update event parameters                          // Here we simply determine if we should increment the retry count or not.
                            // We don't want a forced retry caused by a new event to count as a retry. We just have to do it for
                            // order's sake. If the retry lapse has elapsed on this event then increment the counter.
                           if (event->getRetries() > 0) {
                               Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
                               if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
                         event->increaseRetries();                         event->increaseRetries();
                           } else {
                               event->increaseRetries();
                           }
  
                         //determine if we have hit the max retry count                         //determine if we have hit the max retry count
                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
Line 1004 
Line 990 
  
             } //while eventqueue             } //while eventqueue
  
              //copy the failed indications back to the main queue              // Copy the failed indications back to the main queue.
              //since we are always adding the failed indications to the back, it should not interfere with the              // We now lock the queue while adding the retries so that new events can't get in front
              //dispatcher adding events to the front              // of the events we are retrying; retried events arrived before any new events coming in.
   
             //there is no = operator for DQueue so we must do it manually  
             IndicationDispatchEvent* tmpEvent = 0;             IndicationDispatchEvent* tmpEvent = 0;
               myself->_eventqueue.try_lock();
             while (tmpEventQueue.size())             while (tmpEventQueue.size())
             {             {
                   //there is no = operator for DQueue so we must do it manually
                 tmpEvent = tmpEventQueue.remove_first();                 tmpEvent = tmpEventQueue.remove_first();
                 myself->_eventqueue.insert_last(tmpEvent);                  myself->_eventqueue.insert_last_no_lock(tmpEvent);
   
             }             }
               myself->_eventqueue.unlock();
  
         } catch (TimeOut& te)         } catch (TimeOut& te)
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications.");              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
  
             //signal the queue in the same way we would if we received a new indication             //signal the queue in the same way we would if we received a new indication
             //this allows the thread to fall into the queue processing code             //this allows the thread to fall into the queue processing code


Legend:
Removed from v.1.9  
changed lines
  Added in v.1.10

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2