version 1.9, 2005/10/31 17:39:17
|
version 1.10, 2005/11/01 20:44:54
|
|
|
* could be minimal. We could potentially proceed to process the retries after a very small time interval since | * could be minimal. We could potentially proceed to process the retries after a very small time interval since |
* we would never hit the wait for the retry timeout. | * we would never hit the wait for the retry timeout. |
* | * |
* ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one |
|
* is processed. Because new indications are first in, first out, the 19 indications will be processed in reverse order. |
|
* Is this a problem? Short answer - NO. They could arrive in reverse order anyways depending on the network stack. |
|
* |
|
*/ | */ |
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param) |
{ | { |
|
|
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name); | PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name); |
| |
PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl; |
|
|
|
myself->_listeningSemaphore->signal(); | myself->_listeningSemaphore->signal(); |
| |
while (true) | while (true) |
|
|
break; | break; |
} | } |
| |
//signal must have been due to an incoming event |
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name); |
|
|
|
//create a temporary queue to store failed indications | //create a temporary queue to store failed indications |
tmpEventQueue.empty_list(); | tmpEventQueue.empty_list(); |
| |
//continue processing events until the queue is empty | //continue processing events until the queue is empty |
//make sure to check for the shutdown signal before every iteration | //make sure to check for the shutdown signal before every iteration |
|
// Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process. |
|
// Because we are popping off the front and new events are being thrown on the back if events are failing when we start |
|
// But are succeeding by the end of the processing, events may be sent out of chronological order. |
|
// However. Once we complete the current queue of events, we will always send old events to be retried before sending any |
|
// new events added afterwards. |
while (myself->_eventqueue.size()) | while (myself->_eventqueue.size()) |
{ | { |
//check for shutdown signal | //check for shutdown signal |
|
|
continue; | continue; |
} | } |
| |
//check retry status. do not retry until the retry time has elapsed |
|
if (event->getRetries() > 0) |
|
{ |
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time from event is " + event->getLastAttemptTime().toString()); |
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Current time is " + CIMDateTime::getCurrentDateTime().toString()); |
|
Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), |
|
CIMDateTime::getCurrentDateTime()); |
|
|
|
if (differenceInMicroseconds < (DEFAULT_RETRY_LAPSE * 1000)) |
|
{ |
|
//do not retry; just add to the retry queue |
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::cannot retry event until retry time has elapsed"); |
|
tmpEventQueue.insert_last(event); |
|
continue; |
|
} |
|
} |
|
|
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name); | PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name); |
| |
try | try |
|
|
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name); | PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name); |
| |
//update event parameters |
// Here we simply determine if we should increment the retry count or not. |
|
// We don't want to count a forced retry from a new event to count as a retry. We just have to do it for |
|
// order's sake. If the retry Lapse has lapsed on this event then increment the counter. |
|
if (event->getRetries() > 0) { |
|
Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime()); |
|
if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000)) |
event->increaseRetries(); | event->increaseRetries(); |
|
} else { |
|
event->increaseRetries(); |
|
} |
| |
//determine if we have hit the max retry count | //determine if we have hit the max retry count |
if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) | if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) |
|
|
| |
} //while eventqueue | } //while eventqueue |
| |
//copy the failed indications back to the main queue |
// Copy the failed indications back to the main queue |
//since we are always adding the failed indications to the back, it should not interfere with the |
// We now lock the queue while adding the retries on to the queue so that new events can't get in in front |
//dispatcher adding events to the front |
// Of those events we are retrying. Retried events happened before any new events coming in. |
|
|
//there is no = operator for DQueue so we must do it manually |
|
IndicationDispatchEvent* tmpEvent = 0; | IndicationDispatchEvent* tmpEvent = 0; |
|
myself->_eventqueue.try_lock(); |
while (tmpEventQueue.size()) | while (tmpEventQueue.size()) |
{ | { |
|
//there is no = operator for DQueue so we must do it manually |
tmpEvent = tmpEventQueue.remove_first(); | tmpEvent = tmpEventQueue.remove_first(); |
myself->_eventqueue.insert_last(tmpEvent); |
myself->_eventqueue.insert_last_no_lock(tmpEvent); |
|
|
} | } |
|
myself->_eventqueue.unlock(); |
| |
} catch (TimeOut& te) | } catch (TimeOut& te) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications."); |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications."); |
| |
//signal the queue in the same way we would if we received a new indication | //signal the queue in the same way we would if we received a new indication |
//this allows the thread to fall into the queue processing code | //this allows the thread to fall into the queue processing code |