version 1.25, 2008/06/19 17:57:04
|
version 1.26, 2008/08/14 17:44:28
|
|
|
"Unloading consumer " + consumersToUnload[i]->getName()); | "Unloading consumer " + consumersToUnload[i]->getName()); |
| |
//wait for the consumer worker thread to end | //wait for the consumer worker thread to end |
try |
|
{ |
|
Semaphore* _shutdownSemaphore = | Semaphore* _shutdownSemaphore = |
consumersToUnload[i]->getShutdownSemaphore(); | consumersToUnload[i]->getShutdownSemaphore(); |
if (_shutdownSemaphore) |
if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000)) |
{ |
|
_shutdownSemaphore->time_wait(10000); |
|
} |
|
|
|
} catch (TimeOut &) |
|
{ | { |
PEG_TRACE_CSTRING( |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, |
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Timed out while attempting to stop consumer thread."); | "Timed out while attempting to stop consumer thread."); |
} | } |
| |
|
|
| |
while (true) | while (true) |
{ | { |
try |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
{ |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::waiting " + name); | "_worker_routine::waiting " + name); |
| |
//wait to be signalled | //wait to be signalled |
myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE); |
if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE)) |
|
{ |
|
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, |
|
"_worker_routine::Time to retry any outstanding indications."); |
| |
PEG_TRACE_STRING( |
// signal the queue in the same way we would, |
TRC_LISTENER, |
// if we received a new indication |
Tracer::LEVEL4, |
// this allows the thread to fall into the queue processing code |
|
myself->_check_queue->signal(); |
|
|
|
continue; |
|
} |
|
|
|
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
"_worker_routine::signalled " + name); | "_worker_routine::signalled " + name); |
| |
//check whether we received the shutdown signal | //check whether we received the shutdown signal |
if (myself->_dieNow) | if (myself->_dieNow) |
{ | { |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::shutdown received " + name); | "_worker_routine::shutdown received " + name); |
break; | break; |
} | } |
|
|
continue; | continue; |
} | } |
| |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::consumeIndication " + name); | "_worker_routine::consumeIndication " + name); |
| |
try | try |
|
|
event->getURL(), | event->getURL(), |
event->getIndicationInstance()); | event->getIndicationInstance()); |
| |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::processed indication successfully. " | "_worker_routine::processed indication successfully. " |
+ name); | + name); |
| |
delete event; | delete event; |
continue; | continue; |
|
} |
} catch (CIMException & ce) |
catch (CIMException & ce) |
{ | { |
//check for failure | //check for failure |
if (ce.getCode() == CIM_ERR_FAILED) | if (ce.getCode() == CIM_ERR_FAILED) |
{ | { |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, |
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"_worker_routine::consumeIndication() temporary" | "_worker_routine::consumeIndication() temporary" |
" failure: " + ce.getMessage() + " " + name); | " failure: " + ce.getMessage() + " " + name); |
| |
|
|
//determine if we have hit the max retry count | //determine if we have hit the max retry count |
if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) | if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) |
{ | { |
PEG_TRACE_CSTRING( |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1, |
TRC_LISTENER, |
|
Tracer::LEVEL1, |
|
"Error: the maximum retry count has been " | "Error: the maximum retry count has been " |
"exceeded. Removing the event from " | "exceeded. Removing the event from " |
"the queue."); | "the queue."); |
|
|
| |
delete event; | delete event; |
continue; | continue; |
|
} |
} else |
else |
{ | { |
PEG_TRACE_CSTRING( |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::placing failed indication " | "_worker_routine::placing failed indication " |
"back in queue"); | "back in queue"); |
tmpEventQueue.insert_back(event); | tmpEventQueue.insert_back(event); |
} | } |
|
} |
} else |
else |
{ | { |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL1, |
TRC_LISTENER, |
|
Tracer::LEVEL1, |
|
"Error: consumeIndication() permanent failure: " | "Error: consumeIndication() permanent failure: " |
+ ce.getMessage()); | + ce.getMessage()); |
delete event; | delete event; |
continue; | continue; |
} | } |
|
} |
} catch (Exception & ex) |
catch (Exception & ex) |
{ | { |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL1, |
TRC_LISTENER, |
|
Tracer::LEVEL1, |
|
"Error: consumeIndication() permanent failure: " | "Error: consumeIndication() permanent failure: " |
+ ex.getMessage()); | + ex.getMessage()); |
delete event; | delete event; |
continue; | continue; |
|
} |
} catch (...) |
catch (...) |
{ | { |
PEG_TRACE_CSTRING( |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1, |
TRC_LISTENER, |
"Error: consumeIndication() failed: Unknown exception."); |
Tracer::LEVEL1, |
|
"Error: consumeIndication() failed: " |
|
"Unknown exception."); |
|
delete event; | delete event; |
continue; | continue; |
} //end try | } //end try |
|
|
| |
} | } |
myself->_eventqueue.unlock(); | myself->_eventqueue.unlock(); |
|
|
} catch (TimeOut&) |
|
{ |
|
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::Time to retry any outstanding indications."); |
|
|
|
// signal the queue in the same way we would, |
|
// if we received a new indication |
|
// this allows the thread to fall into the queue processing code |
|
myself->_check_queue->signal(); |
|
|
|
} //time_wait |
|
|
|
|
|
} //shutdown | } //shutdown |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |