version 1.20, 2007/06/12 16:17:36
|
version 1.21, 2008/01/15 18:19:34
|
|
|
// | // |
//============================================================================== | //============================================================================== |
// | // |
// Author: Heather Sterling (hsterl@us.ibm.com) |
|
// |
|
// Modified By: |
|
// |
|
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
#include <Pegasus/Common/Config.h> | #include <Pegasus/Common/Config.h> |
|
|
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
PEGASUS_USING_STD; | PEGASUS_USING_STD; |
| |
//ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves. |
//ATTN: Can we just use a properties file instead?? If we only have one |
// We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc |
// property, we may want to just parse it ourselves. |
|
// We may need to add more properties, however. Potential per consumer |
|
// properties: unloadOk, idleTimout, retryCount, etc |
static struct OptionRow optionsTable[] = | static struct OptionRow optionsTable[] = |
//optionname defaultvalue rqd type domain domainsize clname hlpmsg | //optionname defaultvalue rqd type domain domainsize clname hlpmsg |
{ | { |
{"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"}, |
{"location", "", false, Option::STRING, 0, 0, |
|
"location", "library name for the consumer"}, |
}; | }; |
| |
const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]); | const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]); |
|
|
static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5; | static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5; |
static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes | static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes |
| |
//constant for fake property that is added to instances when serializing to track the full URL |
//constant for fake property that is added to instances when |
|
// serializing to track the full URL |
static const String URL_PROPERTY = "URLString"; | static const String URL_PROPERTY = "URLString"; |
| |
| |
ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : |
ConsumerManager::ConsumerManager( |
|
const String& consumerDir, |
|
const String& consumerConfigDir, |
|
Boolean enableConsumerUnload, |
|
Uint32 idleTimeout) : |
_consumerDir(consumerDir), | _consumerDir(consumerDir), |
_consumerConfigDir(consumerConfigDir), | _consumerConfigDir(consumerConfigDir), |
_enableConsumerUnload(enableConsumerUnload), | _enableConsumerUnload(enableConsumerUnload), |
|
|
{ | { |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager"); | PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager"); |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir); |
PEG_TRACE_STRING( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir); |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Consumer library directory: " + consumerDir); |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Consumer configuration directory: " + consumerConfigDir); |
| |
PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4, | PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4, |
"Consumer unload enabled %d: idle timeout %d", | "Consumer unload enabled %d: idle timeout %d", |
|
|
| |
try | try |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!"); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Attempting to load indication for!" + |
|
consumerName + "!"); |
getConsumer(consumerName); | getConsumer(consumerName); |
| |
} catch (...) | } catch (...) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Cannot load consumer from file " + files[i]); |
} | } |
} | } |
} | } |
|
|
return _idleTimeout; | return _idleTimeout; |
} | } |
| |
/** Retrieves the library name associated with the consumer name. By default, the library name |
/** Retrieves the library name associated with the consumer name. |
* is the same as the consumer name. However, you may specify a different library name in a consumer |
* By default, the library name |
* configuration file. This file must be named "MyConsumer.txt" and contain the following: |
* is the same as the consumer name. However, you may specify a different |
* location="libraryName" |
* library name in a consumer configuration file. This file must be named |
|
* "MyConsumer.txt" and contain the following: location="libraryName" |
* | * |
* The config file is optional and is generally only needed in cases where there are strict requirements |
* The config file is optional and is generally only needed in cases where |
* on library naming. |
* there are strict requirements on library naming. |
* | * |
* It is the responsibility of the caller to catch any exceptions thrown by this method. |
* It is the responsibility of the caller to catch any exceptions |
|
* thrown by this method. |
*/ | */ |
String ConsumerManager::_getConsumerLibraryName(const String & consumerName) | String ConsumerManager::_getConsumerLibraryName(const String & consumerName) |
{ | { |
|
|
//default library name is consumer name | //default library name is consumer name |
String libraryName = consumerName; | String libraryName = consumerName; |
| |
//check whether an alternative library name was specified in an optional consumer config file |
// check whether an alternative library name was specified in an optional |
String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf")); |
// consumer config file |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile); |
String configFile = FileSystem::getAbsolutePath( |
|
(const char*)_consumerConfigDir.getCString(), |
|
String(consumerName + ".conf")); |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Looking for config file " + configFile); |
| |
if (FileSystem::exists(configFile) && FileSystem::canRead(configFile)) | if (FileSystem::exists(configFile) && FileSystem::canRead(configFile)) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Found config file for consumer " + consumerName); |
| |
try | try |
{ | { |
//Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option |
//Bugzilla 3765 - Change this to use a member var |
|
// when OptionManager has a reset option |
OptionManager _optionMgr; | OptionManager _optionMgr; |
_optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later |
//comment the following line out later |
|
_optionMgr.registerOptions(optionsTable, NUM_OPTIONS); |
_optionMgr.mergeFile(configFile); | _optionMgr.mergeFile(configFile); |
_optionMgr.checkRequiredOptions(); | _optionMgr.checkRequiredOptions(); |
| |
if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY)) |
if (!_optionMgr.lookupValue("location", libraryName) || |
|
(libraryName == String::EMPTY)) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Warning: Using default library name since none was " |
|
"specified in " + configFile); |
libraryName = consumerName; | libraryName = consumerName; |
} | } |
| |
} catch (Exception & ex) | } catch (Exception & ex) |
{ | { |
throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE", |
throw Exception( |
|
MessageLoaderParms( |
|
"DynListener.ConsumerManager.INVALID_CONFIG_FILE", |
"Error reading $0: $1.", | "Error reading $0: $1.", |
configFile, | configFile, |
ex.getMessage())); | ex.getMessage())); |
} | } |
} else | } else |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"No config file exists for " + consumerName); |
} | } |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"The library name for " + consumerName + " is " + libraryName); |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return libraryName; | return libraryName; |
} | } |
| |
/** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it |
/** Returns the DynamicConsumer for the consumerName. If it already exists, |
|
* we return the one in the cache. If it |
* DNE, we create it and initialize it, and add it to the table. | * DNE, we create it and initialize it, and add it to the table. |
* @throws Exception if we cannot successfully create and initialize the consumer |
* @throws Exception if we cannot successfully create and |
|
* initialize the consumer |
*/ | */ |
DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName) | DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName) |
{ | { |
|
|
| |
if (consumer && consumer->isLoaded()) | if (consumer && consumer->isLoaded()) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Consumer exists in the cache and" |
|
" is already loaded: " + consumerName); |
cached = true; | cached = true; |
} | } |
} else | } else |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Consumer not found in cache, creating " + consumerName); |
consumer = new DynamicConsumer(consumerName); | consumer = new DynamicConsumer(consumerName); |
//ATTN: The above is a memory leak if _initConsumer throws an exception | //ATTN: The above is a memory leak if _initConsumer throws an exception |
//need to delete it in that case | //need to delete it in that case |
|
|
} | } |
| |
/** Initializes a DynamicConsumer. | /** Initializes a DynamicConsumer. |
* Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist. |
* Caller assumes responsibility for mutexing the operation as well as |
|
* ensuring the consumer does not already exist. |
* @throws Exception if the consumer cannot be initialized | * @throws Exception if the consumer cannot be initialized |
*/ | */ |
void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer) |
void ConsumerManager::_initConsumer( |
|
const String& consumerName, |
|
DynamicConsumer* consumer) |
{ | { |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer"); | PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer"); |
| |
CIMIndicationConsumerProvider* base = 0; | CIMIndicationConsumerProvider* base = 0; |
ConsumerModule* module = 0; | ConsumerModule* module = 0; |
| |
//lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one) |
// lookup provider module in cache (if it exists, it returns |
|
// the cached module, otherwise it creates and returns a new one) |
String libraryName = _getConsumerLibraryName(consumerName); | String libraryName = _getConsumerLibraryName(consumerName); |
module = _lookupModule(libraryName); | module = _lookupModule(libraryName); |
| |
//build library path | //build library path |
String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName)); |
String libraryPath = FileSystem::getAbsolutePath( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath); |
(const char*)_consumerDir.getCString(), |
|
FileSystem::buildLibraryFileName(libraryName)); |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Loading library: " + libraryPath); |
| |
//load module | //load module |
try | try |
|
|
| |
} catch (Exception& ex) | } catch (Exception& ex) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage()); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE", |
Tracer::LEVEL2, |
|
"Error loading consumer module: " + ex.getMessage()); |
|
|
|
throw Exception( |
|
MessageLoaderParms( |
|
"DynListener.ConsumerManager.CANNOT_LOAD_MODULE", |
"Cannot load module ($0:$1): Unknown exception.", | "Cannot load module ($0:$1): Unknown exception.", |
consumerName, | consumerName, |
libraryName)); | libraryName)); |
} catch (...) | } catch (...) |
{ | { |
throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE", |
throw Exception( |
|
MessageLoaderParms( |
|
"DynListener.ConsumerManager.CANNOT_LOAD_MODULE", |
"Cannot load module ($0:$1): Unknown exception.", | "Cannot load module ($0:$1): Unknown exception.", |
consumerName, | consumerName, |
libraryName)); | libraryName)); |
} | } |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Successfully loaded consumer module " + libraryName); |
| |
//initialize consumer | //initialize consumer |
try | try |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Initializing Consumer " + consumerName); |
| |
consumer->initialize(); | consumer->initialize(); |
| |
|
|
_worker_routine, | _worker_routine, |
semaphore) != PEGASUS_THREAD_OK) | semaphore) != PEGASUS_THREAD_OK) |
{ | { |
Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE, |
Logger::put( |
|
Logger::STANDARD_LOG, |
|
System::CIMLISTENER, |
|
Logger::TRACE, |
"Not enough threads for consumer."); | "Not enough threads for consumer."); |
| |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
"Could not allocate thread for consumer."); | "Could not allocate thread for consumer."); |
| |
consumer->setShutdownSemaphore(0); | consumer->setShutdownSemaphore(0); |
delete semaphore; | delete semaphore; |
throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD", |
throw Exception( |
|
MessageLoaderParms( |
|
"DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD", |
"Not enough threads for consumer worker routine.")); | "Not enough threads for consumer worker routine.")); |
} | } |
| |
//wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued |
//wait until the listening thread has started. |
//before the consumer is waiting for it and the first indication after loading the consumer will be lost |
// Otherwise, there is a miniscule chance that the first event will |
|
// be enqueued before the consumer is waiting for it and the first |
|
// indication after loading the consumer will be lost |
consumer->waitForEventThread(); | consumer->waitForEventThread(); |
| |
//load any outstanding requests | //load any outstanding requests |
Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName); |
Array<IndicationDispatchEvent> outstandingIndications = |
|
_deserializeOutstandingIndications(consumerName); |
if (outstandingIndications.size()) | if (outstandingIndications.size()) |
{ | { |
//the consumer will signal itself in _loadOustandingIndications | //the consumer will signal itself in _loadOustandingIndications |
consumer->_loadOutstandingIndications(outstandingIndications); | consumer->_loadOutstandingIndications(outstandingIndications); |
} | } |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Successfully initialized consumer " + consumerName); |
| |
} catch (...) | } catch (...) |
{ | { |
module->unloadModule(); | module->unloadModule(); |
consumer->reset(); | consumer->reset(); |
throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER", |
throw Exception( |
|
MessageLoaderParms( |
|
"DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER", |
"Cannot initialize consumer ($0).", | "Cannot initialize consumer ($0).", |
consumerName)); | consumerName)); |
} | } |
|
|
} | } |
| |
| |
/** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it |
/** Returns the ConsumerModule with the given library name. |
|
* If it already exists, we return the one in the cache. If it |
* DNE, we create it and add it to the table. | * DNE, we create it and add it to the table. |
* @throws Exception if we cannot successfully create and initialize the consumer |
* @throws Exception if we cannot successfully create and |
|
* initialize the consumer |
*/ | */ |
ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) | ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) |
{ | { |
|
|
//see if consumer module is cached | //see if consumer module is cached |
if (_modules.lookup(libraryName, module)) | if (_modules.lookup(libraryName, module)) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
PEG_TRACE_STRING( |
"Found Consumer Module" + libraryName + " in Consumer Manager Cache"); |
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Found Consumer Module" + |
|
libraryName + " in Consumer Manager Cache"); |
| |
} else | } else |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
"Creating Consumer Provider Module " + libraryName); | "Creating Consumer Provider Module " + libraryName); |
| |
module = new ConsumerModule(); | module = new ConsumerModule(); |
|
|
{ | { |
consumer = i.value(); | consumer = i.value(); |
| |
if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0)) |
if (consumer && |
{ |
consumer->isLoaded() && |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name); |
(consumer->getPendingIndications() > 0)) |
|
{ |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Found active consumer: " + consumer->_name); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return true; | return true; |
} | } |
|
|
} catch (...) | } catch (...) |
{ | { |
// Unexpected exception; do not assume that no providers are loaded | // Unexpected exception; do not assume that no providers are loaded |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Unexpected Exception in hasActiveConsumers."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return true; | return true; |
} | } |
|
|
| |
if (consumer && consumer->isLoaded()) | if (consumer && consumer->isLoaded()) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Found loaded consumer: " + consumer->_name); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return true; | return true; |
} | } |
|
|
} catch (...) | } catch (...) |
{ | { |
// Unexpected exception; do not assume that no providers are loaded | // Unexpected exception; do not assume that no providers are loaded |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Unexpected Exception in hasLoadedConsumers."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return true; | return true; |
} | } |
|
|
| |
| |
/** Shutting down a consumer consists of four major steps: | /** Shutting down a consumer consists of four major steps: |
* 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit. |
* 1) Send the shutdown signal. This causes the worker routine to break out |
* 2) Wait for the worker thread to end. This may take a while if it's processing an indication. This |
* of the loop and exit. |
* is optional in a shutdown scenario. If the listener is shutdown with a -f force, the listener |
* 2) Wait for the worker thread to end. This may take a while if it's |
* will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows |
* processing an indication. This is optional in a shutdown scenario. |
* the current consumer indication to finish. All other queued indications are serialized to a log and |
* If the listener is shutdown with a -f force, the listener |
|
* will not wait for the consumer to finish before shutting down. |
|
* Note that a normal shutdown only allows the current consumer indication |
|
* to finish. All other queued indications are serialized to a log and |
* are sent when the consumer is reoaded. | * are sent when the consumer is reoaded. |
* 3) Terminate the consumer provider interface. | * 3) Terminate the consumer provider interface. |
* 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0) |
* 4) Decrement the module refcount (the module will automatically unload when |
|
* it's refcount == 0) |
* | * |
* In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all |
* In a scenario where more multiple consumers are loaded, the shutdown signal |
* of the consumers so the threads can finish simultaneously. |
* should be sent to all of the consumers so the threads can finish |
|
* simultaneously. |
* | * |
* ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications |
* ATTN: Should the normal shutdown wait for everything in the queue to be |
* to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing |
* processed? Just new indications to be processed? I am not inclined to this |
* and deserialing indications between shutdown and startup, I feel like we do not need to process ALL |
* solution since it could take a LOT of time. By serializing and deserialing |
* queued indications on shutdown. |
* indications between shutdown and startup, I feel like we do not need to |
|
* process ALL queued indications on shutdown. |
*/ | */ |
| |
/** Unloads all consumers. | /** Unloads all consumers. |
|
|
| |
if (!_consumers.size()) | if (!_consumers.size()) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"There are no consumers to unload."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return; | return; |
} | } |
| |
if (!_forceShutdown) | if (!_forceShutdown) |
{ | { |
//wait until all the consumers have finished processing the events in their queue |
// wait until all the consumers have finished processing the events in |
|
// their queue |
//ATTN: Should this have a timeout even though it's a force?? | //ATTN: Should this have a timeout even though it's a force?? |
while (hasActiveConsumers()) | while (hasActiveConsumers()) |
{ | { |
|
|
| |
} catch (Exception&) | } catch (Exception&) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Error unloading consumers."); |
} | } |
} else | } else |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"There are no consumers to unload."); |
} | } |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
|
|
| |
if (!_consumers.size()) | if (!_consumers.size()) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"There are no consumers to unload."); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return; | return; |
} | } |
|
|
| |
} catch (Exception&) | } catch (Exception&) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Error unloading consumers."); |
} | } |
} else | } else |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"There are no consumers to unload."); |
} | } |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
|
|
//check whether the consumer exists | //check whether the consumer exists |
if (!_consumers.lookup(consumerName, consumer)) | if (!_consumers.lookup(consumerName, consumer)) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Error: cannot unload consumer, unknown consumer " + consumerName); |
return; | return; |
} | } |
| |
|
|
| |
} catch (Exception&) | } catch (Exception&) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Error unloading consumers."); |
} | } |
| |
} else | } else |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Error: cannot unload consumer " + consumerName); |
} | } |
| |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
|
|
/** Unloads the consumers in the given array. | /** Unloads the consumers in the given array. |
* The consumerTable mutex MUST be locked prior to entering this method. | * The consumerTable mutex MUST be locked prior to entering this method. |
*/ | */ |
void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload) |
void ConsumerManager::_unloadConsumers( |
|
Array<DynamicConsumer*> consumersToUnload) |
{ | { |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers"); | PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers"); |
| |
|
|
consumersToUnload[i]->sendShutdownSignal(); | consumersToUnload[i]->sendShutdownSignal(); |
} | } |
| |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers."); |
PEG_TRACE_CSTRING( |
|
#TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Sent shutdown signal to all consumers."); |
| |
//wait for all the consumer worker threads to complete | //wait for all the consumer worker threads to complete |
//since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last |
// since we can only shutdown after they are all complete, |
//consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop |
// it does not matter if the first, fifth, or last |
//processing its requests. |
// consumer takes the longest; the wait time is equal to the time it takes |
|
// for the busiest consumer to stop processing its requests. |
for (Uint32 i = 0; i < consumersToUnload.size(); i++) | for (Uint32 i = 0; i < consumersToUnload.size(); i++) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName()); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Unloading consumer " + consumersToUnload[i]->getName()); |
| |
//wait for the consumer worker thread to end | //wait for the consumer worker thread to end |
try | try |
{ | { |
Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore(); |
Semaphore* _shutdownSemaphore = |
|
consumersToUnload[i]->getShutdownSemaphore(); |
if (_shutdownSemaphore) | if (_shutdownSemaphore) |
{ | { |
_shutdownSemaphore->time_wait(10000); | _shutdownSemaphore->time_wait(10000); |
|
|
| |
} catch (TimeOut &) | } catch (TimeOut &) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Timed out while attempting to stop consumer thread."); |
} | } |
| |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Terminating consumer."); |
| |
try | try |
{ | { |
|
|
consumersToUnload[i]->_module->unloadModule(); | consumersToUnload[i]->_module->unloadModule(); |
| |
//serialize outstanding indications | //serialize outstanding indications |
_serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications()); |
_serializeOutstandingIndications( |
|
consumersToUnload[i]->getName(), |
|
consumersToUnload[i]->_retrieveOutstandingIndications()); |
| |
//reset the consumer | //reset the consumer |
consumersToUnload[i]->reset(); | consumersToUnload[i]->reset(); |
| |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Consumer library successfully unloaded."); |
| |
} catch (Exception& e) | } catch (Exception& e) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error unloading consumer: " + e.getMessage()); |
//ATTN: throw exception? log warning? | //ATTN: throw exception? log warning? |
} | } |
} | } |
|
|
| |
/** Serializes oustanding indications to a <MyConsumer>.dat file | /** Serializes oustanding indications to a <MyConsumer>.dat file |
*/ | */ |
void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications) |
void ConsumerManager::_serializeOutstandingIndications( |
{ |
const String& consumerName, |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications"); |
Array<IndicationDispatchEvent> indications) |
|
{ |
|
PEG_METHOD_ENTER( |
|
TRC_LISTENER, |
|
"ConsumerManager::_serializeOutstandingIndications"); |
| |
if (!indications.size()) | if (!indications.size()) |
{ | { |
|
|
return; | return; |
} | } |
| |
String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat")); |
String fileName = FileSystem::getAbsolutePath( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName); |
(const char*)_consumerConfigDir.getCString(), |
|
String(consumerName + ".dat")); |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Consumer dat file: " + fileName); |
| |
Buffer buffer; | Buffer buffer; |
| |
|
|
| |
if (!fileHandle) | if (!fileHandle) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Unable to open log file for " + consumerName); |
| |
} else | } else |
{ | { |
|
|
indications.size(), | indications.size(), |
(const char*)consumerName.getCString())); | (const char*)consumerName.getCString())); |
| |
//we have to put the array of instances under a valid root element or the parser complains |
// we have to put the array of instances under a valid root element |
|
// or the parser complains |
XmlWriter::append(buffer, "<IRETURNVALUE>\n"); | XmlWriter::append(buffer, "<IRETURNVALUE>\n"); |
| |
CIMInstance cimInstance; | CIMInstance cimInstance; |
|
|
| |
/** Reads outstanding indications from a <MyConsumer>.dat file | /** Reads outstanding indications from a <MyConsumer>.dat file |
*/ | */ |
Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName) |
Array<IndicationDispatchEvent> |
{ |
ConsumerManager::_deserializeOutstandingIndications( |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications"); |
const String& consumerName) |
|
{ |
String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat")); |
PEG_METHOD_ENTER( |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName); |
TRC_LISTENER, |
|
"ConsumerManager::_deserializeOutstandingIndications"); |
|
|
|
String fileName = FileSystem::getAbsolutePath( |
|
(const char*)_consumerConfigDir.getCString(), |
|
String(consumerName + ".dat")); |
|
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Consumer dat file: " + fileName); |
| |
Array<CIMInstance> cimInstances; | Array<CIMInstance> cimInstances; |
Array<String> urlStrings; | Array<String> urlStrings; |
|
|
| |
try | try |
{ | { |
FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs? |
//ATTN: Is this safe to use; what about CRLFs? |
|
FileSystem::loadFileToMemory(text, fileName); |
| |
//parse the file | //parse the file |
XmlParser parser((char*)text.getData()); | XmlParser parser((char*)text.getData()); |
|
|
Uint32 index = cimInstance.findProperty(URL_PROPERTY); | Uint32 index = cimInstance.findProperty(URL_PROPERTY); |
if (index != PEG_NOT_FOUND) | if (index != PEG_NOT_FOUND) |
{ | { |
//get the URL string property from the serialized instance and remove the property |
// get the URL string property from the serialized instance |
|
// and remove the property |
cimProperty = cimInstance.getProperty(index); | cimProperty = cimInstance.getProperty(index); |
cimValue = cimProperty.getValue(); | cimValue = cimProperty.getValue(); |
cimValue.get(urlString); | cimValue.get(urlString); |
cimInstance.removeProperty(index); | cimInstance.removeProperty(index); |
} | } |
IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance); |
IndicationDispatchEvent* indicationEvent = |
|
new IndicationDispatchEvent( |
|
OperationContext(), |
|
urlString, |
|
cimInstance); |
|
|
indications.append(*indicationEvent); | indications.append(*indicationEvent); |
} | } |
| |
|
|
| |
} catch (Exception& ex) | } catch (Exception& ex) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL3, |
|
"Error parsing dat file: " + |
|
ex.getMessage() + " " + consumerName); |
| |
} catch (...) | } catch (...) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error parsing dat file: Unknown Exception " + consumerName); |
} | } |
} | } |
| |
|
|
| |
| |
/** | /** |
* This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton |
* This is the main worker thread of the consumer. By having only one thread |
* of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive |
* per consumer, we eliminate a ton of synchronization issues and make it easy |
* operations at once. This also prevents one bad consumer from taking the entire listener down. That being said, |
* to prevent the consumer from performing two mutually exclusive operations |
* it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. |
* at once. This also prevents one bad consumer from taking the entire |
|
* listener down. That being said, it is up to the programmer to write smart |
|
* consumers, and to ensure that their actions don't deadlock |
|
* the worker thread. |
* | * |
* If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to |
* If a consumer receives a lot of traffic, or it's consumeIndication() method |
* complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off |
* takes a considerable amount of time to complete, it may make sense to make |
* new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer |
* the consumer multi-threaded. The individual consumer can immediately |
* can attain extremely high performance. |
* spawn off* new threads to handle indications, and return immediately to |
|
* catch the next indication. In this way, a consumer can attain |
|
* extremely high performance. |
* | * |
* There are three different events that can signal us: | * There are three different events that can signal us: |
* 1) A new indication (signalled by DynamicListenerIndicationDispatcher) | * 1) A new indication (signalled by DynamicListenerIndicationDispatcher) |
* 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state) |
* 2) A shutdown signal (signalled from ConsumerManager, due to a listener |
|
* shutdown or an idle consumer state) |
* 3) A retry signal (signalled from this routine itself) | * 3) A retry signal (signalled from this routine itself) |
* | * |
* The idea is that all new indications are put on the front of the queue and processed first. All of the retry |
* The idea is that all new indications are put on the front of the queue and |
* indications are put on the back of the queue and are only processed AFTER all new indications are sent. |
* processed first. All of the retry indications are put on the back of the |
* Before processing each indication, we check to see whether or not the shutdown signal was given. If so, |
* queue and are only processed AFTER all new indications are sent. |
* we immediately break out of the loop, and another compenent serializes the remaining indications to a file. |
* Before processing each indication, we check to see whether or not the |
|
* shutdown signal was given. |
|
* If so, we immediately break out of the loop, and another compenent |
|
* serializes the remaining indications to a file. |
* | * |
* An indication gets retried if the consumer throws a CIM_ERR_FAILED exception. |
* An indication gets retried |
|
* if the consumer throws a CIM_ERR_FAILED exception. |
* | * |
* This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario: |
* This function makes sure it waits until the default retry lapse has passed |
|
* to avoid issues with the following scenario: |
* 20 new indications come in, 10 of them are successful, 10 are not. | * 20 new indications come in, 10 of them are successful, 10 are not. |
* We were signalled 20 times, so we will pass the time_wait 20 times. Perceivably, the process time on each indication |
* We were signalled 20 times, so we will pass the time_wait 20 times. |
* could be minimal. We could potentially proceed to process the retries after a very small time interval since |
* Perceivably, the process time on each indication could be minimal. |
* we would never hit the wait for the retry timeout. |
* We could potentially proceed to process the retries after a very small time |
|
* interval since we would never hit the wait for the retry timeout. |
* | * |
*/ | */ |
ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param) |
ThreadReturnType PEGASUS_THREAD_CDECL |
|
ConsumerManager::_worker_routine(void *param) |
{ | { |
PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine"); | PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine"); |
| |
|
|
String name = myself->getName(); | String name = myself->getName(); |
List<IndicationDispatchEvent,Mutex> tmpEventQueue; | List<IndicationDispatchEvent,Mutex> tmpEventQueue; |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"_worker_routine::entering loop for " + name); |
| |
myself->_listeningSemaphore->signal(); | myself->_listeningSemaphore->signal(); |
| |
|
|
{ | { |
try | try |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::waiting " + name); |
| |
//wait to be signalled | //wait to be signalled |
myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE); | myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE); |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::signalled " + name); |
| |
//check whether we received the shutdown signal | //check whether we received the shutdown signal |
if (myself->_dieNow) | if (myself->_dieNow) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::shutdown received " + name); |
break; | break; |
} | } |
| |
|
|
| |
//continue processing events until the queue is empty | //continue processing events until the queue is empty |
//make sure to check for the shutdown signal before every iteration | //make sure to check for the shutdown signal before every iteration |
// Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process. |
// Note that any time during our processing of events the Listener |
// Because we are popping off the front and new events are being thrown on the back if events are failing when we start |
// may be enqueueing NEW events for us to process. |
// But are succeeding by the end of the processing, events may be sent out of chronological order. |
// Because we are popping off the front and new events are being |
// However. Once we complete the current queue of events, we will always send old events to be retried before sending any |
// thrown on the back if events are failing when we start |
|
// But are succeeding by the end of the processing, events may be |
|
// sent out of chronological order. |
|
// However. Once we complete the current queue of events, we will |
|
// always send old events to be retried before sending any |
// new events added afterwards. | // new events added afterwards. |
while (myself->_eventqueue.size()) | while (myself->_eventqueue.size()) |
{ | { |
//check for shutdown signal | //check for shutdown signal |
//this only breaks us out of the queue loop, but we will immediately get through the next wait from |
//this only breaks us out of the queue loop, but we will |
//the shutdown signal itself, at which time we break out of the main loop |
//immediately get through the next wait from |
|
//the shutdown signal itself, at which time we break |
|
//out of the main loop |
if (myself->_dieNow) | if (myself->_dieNow) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"Received signal to shutdown," |
|
" jumping out of queue loop " + name); |
break; | break; |
} | } |
| |
//pop next indication off the queue | //pop next indication off the queue |
IndicationDispatchEvent* event = 0; | IndicationDispatchEvent* event = 0; |
event = myself->_eventqueue.remove_front(); //what exceptions/errors can this throw? |
//what exceptions/errors can this throw? |
|
event = myself->_eventqueue.remove_front(); |
| |
if (!event) | if (!event) |
{ | { |
|
|
continue; | continue; |
} | } |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::consumeIndication " + name); |
| |
try | try |
{ | { |
|
|
event->getURL(), | event->getURL(), |
event->getIndicationInstance()); | event->getIndicationInstance()); |
| |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::processed indication successfully. " |
|
+ name); |
| |
delete event; | delete event; |
continue; | continue; |
|
|
//check for failure | //check for failure |
if (ce.getCode() == CIM_ERR_FAILED) | if (ce.getCode() == CIM_ERR_FAILED) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"_worker_routine::consumeIndication() temporary" |
|
" failure: " + ce.getMessage() + " " + name); |
|
|
|
// Here we simply determine if we should increment |
|
// the retry count or not. |
|
// We don't want to count a forced retry from a new |
|
// event to count as a retry. |
|
// We just have to do it for order's sake. |
|
// If the retry Lapse has lapsed on this event, |
|
// then increment the counter. |
|
if (event->getRetries() > 0) |
|
{ |
|
Sint64 differenceInMicroseconds = |
|
CIMDateTime::getDifference( |
|
event->getLastAttemptTime(), |
|
CIMDateTime::getCurrentDateTime()); |
| |
// Here we simply determine if we should increment the retry count or not. |
if (differenceInMicroseconds >= |
// We don't want to count a forced retry from a new event to count as a retry. We just have to do it for |
(DEFAULT_RETRY_LAPSE * 1000)) |
// order's sake. If the retry Lapse has lapsed on this event then increment the counter. |
{ |
if (event->getRetries() > 0) { |
|
Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime()); |
|
if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000)) |
|
event->increaseRetries(); | event->increaseRetries(); |
} else { |
} |
|
} |
|
else |
|
{ |
event->increaseRetries(); | event->increaseRetries(); |
} | } |
| |
//determine if we have hit the max retry count | //determine if we have hit the max retry count |
if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) | if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, |
PEG_TRACE_CSTRING( |
"Error: the maximum retry count has been exceeded. Removing the event from the queue."); |
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error: the maximum retry count has been " |
|
"exceeded. Removing the event from " |
|
"the queue."); |
| |
Logger::put( | Logger::put( |
Logger::ERROR_LOG, | Logger::ERROR_LOG, |
System::CIMLISTENER, | System::CIMLISTENER, |
Logger::SEVERE, | Logger::SEVERE, |
"The following indication did not get processed successfully: $0", |
"The following indication did not get " |
|
"processed successfully: $0", |
event->getIndicationInstance().getPath().toString()); | event->getIndicationInstance().getPath().toString()); |
| |
delete event; | delete event; |
|
|
| |
} else | } else |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue"); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::placing failed indication " |
|
"back in queue"); |
tmpEventQueue.insert_back(event); | tmpEventQueue.insert_back(event); |
} | } |
| |
} else | } else |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage()); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error: consumeIndication() permanent failure: " |
|
+ ce.getMessage()); |
delete event; | delete event; |
continue; | continue; |
} | } |
| |
} catch (Exception & ex) | } catch (Exception & ex) |
{ | { |
PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage()); |
PEG_TRACE_STRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error: consumeIndication() permanent failure: " |
|
+ ex.getMessage()); |
delete event; | delete event; |
continue; | continue; |
| |
} catch (...) | } catch (...) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL2, |
|
"Error: consumeIndication() failed: " |
|
"Unknown exception."); |
delete event; | delete event; |
continue; | continue; |
} //end try | } //end try |
|
|
} //while eventqueue | } //while eventqueue |
| |
// Copy the failed indications back to the main queue | // Copy the failed indications back to the main queue |
// We now lock the queue while adding the retries on to the queue so that new events can't get in in front |
// We now lock the queue while adding the retries on to the queue |
// Of those events we are retrying. Retried events happened before any new events coming in. |
// so that new events can't get in in front |
|
// Of those events we are retrying. Retried events happened before |
|
// any new events coming in. |
IndicationDispatchEvent* tmpEvent = 0; | IndicationDispatchEvent* tmpEvent = 0; |
myself->_eventqueue.try_lock(); | myself->_eventqueue.try_lock(); |
while (tmpEventQueue.size()) | while (tmpEventQueue.size()) |
|
|
| |
} catch (TimeOut&) | } catch (TimeOut&) |
{ | { |
PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications."); |
PEG_TRACE_CSTRING( |
|
TRC_LISTENER, |
|
Tracer::LEVEL4, |
|
"_worker_routine::Time to retry any outstanding indications."); |
| |
//signal the queue in the same way we would if we received a new indication |
// signal the queue in the same way we would, |
|
// if we received a new indication |
//this allows the thread to fall into the queue processing code | //this allows the thread to fall into the queue processing code |
myself->_check_queue->signal(); | myself->_check_queue->signal(); |
| |
|
|
| |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |
|
|
|
|
|
|