(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

Diff for /pegasus/src/Pegasus/DynListener/ConsumerManager.cpp between version 1.20 and 1.21

version 1.20, 2007/06/12 16:17:36 version 1.21, 2008/01/15 18:19:34
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Heather Sterling (hsterl@us.ibm.com)  
 //  
 // Modified By:  
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/Config.h> #include <Pegasus/Common/Config.h>
Line 54 
Line 50 
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 //ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.  //ATTN: Can we just use a properties file instead??  If we only have one
 // We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimout, retryCount, etc  // property, we may want to just parse it ourselves.
   // We may need to add more properties, however.  Potential per consumer
   // properties: unloadOk, idleTimout, retryCount, etc
 static struct OptionRow optionsTable[] = static struct OptionRow optionsTable[] =
 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
 { {
 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},  {"location", "", false, Option::STRING, 0, 0,
    "location", "library name for the consumer"},
 }; };
  
 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]); const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
Line 68 
Line 67 
 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5; static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
 static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  
 //constant for fake property that is added to instances when serializing to track the full URL  //constant for fake property that is added to instances when
   // serializing to track the full URL
 static const String URL_PROPERTY = "URLString"; static const String URL_PROPERTY = "URLString";
  
  
 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :  ConsumerManager::ConsumerManager(
       const String& consumerDir,
       const String& consumerConfigDir,
       Boolean enableConsumerUnload,
       Uint32 idleTimeout) :
 _consumerDir(consumerDir), _consumerDir(consumerDir),
 _consumerConfigDir(consumerConfigDir), _consumerConfigDir(consumerConfigDir),
 _enableConsumerUnload(enableConsumerUnload), _enableConsumerUnload(enableConsumerUnload),
Line 81 
Line 85 
 { {
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);      PEG_TRACE_STRING(
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);          TRC_LISTENER,
           Tracer::LEVEL4,
           "Consumer library directory: " + consumerDir);
       PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Consumer configuration directory: " + consumerConfigDir);
  
     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
                   "Consumer unload enabled %d: idle timeout %d",                   "Consumer unload enabled %d: idle timeout %d",
Line 150 
Line 160 
  
                 try                 try
                 {                 {
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");                      PEG_TRACE_STRING(
                           TRC_LISTENER,
                           Tracer::LEVEL4,
                           "Attempting to load indication for!" +
                               consumerName + "!");
                     getConsumer(consumerName);                     getConsumer(consumerName);
  
                 } catch (...)                 } catch (...)
                 {                 {
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);                      PEG_TRACE_STRING(
                           TRC_LISTENER,
                           Tracer::LEVEL4,
                           "Cannot load consumer from file " + files[i]);
                 }                 }
             }             }
         }         }
Line 184 
Line 201 
     return _idleTimeout;     return _idleTimeout;
 } }
  
 /** Retrieves the library name associated with the consumer name.  By default, the library name  /** Retrieves the library name associated with the consumer name.
   * is the same as the consumer name.  However, you may specify a different library name in a consumer   *  By default, the library name
   * configuration file.  This file must be named "MyConsumer.txt" and contain the following:   * is the same as the consumer name.  However, you may specify a different
   *     location="libraryName"   * library name in a consumer configuration file.  This file must be named
    *  "MyConsumer.txt" and contain the following: location="libraryName"
   *   *
   * The config file is optional and is generally only needed in cases where there are strict requirements   * The config file is optional and is generally only needed in cases where
   * on library naming.   * there are strict requirements on library naming.
   *   *
   * It is the responsibility of the caller to catch any exceptions thrown by this method.   * It is the responsibility of the caller to catch any exceptions
    * thrown by this method.
   */   */
 String ConsumerManager::_getConsumerLibraryName(const String & consumerName) String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 { {
Line 201 
Line 220 
     //default library name is consumer name     //default library name is consumer name
     String libraryName = consumerName;     String libraryName = consumerName;
  
     //check whether an alternative library name was specified in an optional consumer config file      // check whether an alternative library name was specified in an optional
     String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));      // consumer config file
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);      String configFile = FileSystem::getAbsolutePath(
                               (const char*)_consumerConfigDir.getCString(),
                               String(consumerName + ".conf"));
       PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Looking for config file " + configFile);
  
     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "Found config file for consumer " + consumerName);
  
         try         try
         {         {
             //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option              //Bugzilla 3765 - Change this to use a member var
               // when OptionManager has a reset option
             OptionManager _optionMgr;             OptionManager _optionMgr;
             _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later              //comment the following line out later
               _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
             _optionMgr.mergeFile(configFile);             _optionMgr.mergeFile(configFile);
             _optionMgr.checkRequiredOptions();             _optionMgr.checkRequiredOptions();
  
             if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))              if (!_optionMgr.lookupValue("location", libraryName) ||
                   (libraryName == String::EMPTY))
             {             {
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);                  PEG_TRACE_STRING(
                       TRC_LISTENER,
                       Tracer::LEVEL2,
                       "Warning: Using default library name since none was "
                           "specified in " + configFile);
                 libraryName = consumerName;                 libraryName = consumerName;
             }             }
  
         } catch (Exception & ex)         } catch (Exception & ex)
         {         {
             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",              throw Exception(
                         MessageLoaderParms(
                             "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
                                                "Error reading $0: $1.",                                                "Error reading $0: $1.",
                                                configFile,                                                configFile,
                                                ex.getMessage()));                                                ex.getMessage()));
         }         }
     } else     } else
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "No config file exists for " + consumerName);
     }     }
  
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);      PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "The library name for " + consumerName + " is " + libraryName);
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     return libraryName;     return libraryName;
 } }
  
 /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it  /** Returns the DynamicConsumer for the consumerName.  If it already exists,
    *  we return the one in the cache.  If it
  *  DNE, we create it and initialize it, and add it to the table.  *  DNE, we create it and initialize it, and add it to the table.
  * @throws Exception if we cannot successfully create and initialize the consumer   * @throws Exception if we cannot successfully create and
    *  initialize the consumer
  */  */
 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName) DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 { {
Line 263 
Line 308 
  
         if (consumer && consumer->isLoaded())         if (consumer && consumer->isLoaded())
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL3,
                   "Consumer exists in the cache and"
                       " is already loaded: " + consumerName);
             cached = true;             cached = true;
         }         }
     } else     } else
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL3,
               "Consumer not found in cache, creating " + consumerName);
         consumer = new DynamicConsumer(consumerName);         consumer = new DynamicConsumer(consumerName);
         //ATTN: The above is a memory leak if _initConsumer throws an exception         //ATTN: The above is a memory leak if _initConsumer throws an exception
         //need to delete it in that case         //need to delete it in that case
Line 291 
Line 343 
 } }
  
 /** Initializes a DynamicConsumer. /** Initializes a DynamicConsumer.
  * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.   * Caller assumes responsibility for mutexing the operation as well as
    * ensuring the consumer does not already exist.
  * @throws Exception if the consumer cannot be initialized  * @throws Exception if the consumer cannot be initialized
  */  */
 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)  void ConsumerManager::_initConsumer(
            const String& consumerName,
            DynamicConsumer* consumer)
 { {
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
  
     CIMIndicationConsumerProvider* base = 0;     CIMIndicationConsumerProvider* base = 0;
     ConsumerModule* module = 0;     ConsumerModule* module = 0;
  
     //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)      // lookup provider module in cache (if it exists, it returns
       // the cached module, otherwise it creates and returns a new one)
     String libraryName = _getConsumerLibraryName(consumerName);     String libraryName = _getConsumerLibraryName(consumerName);
     module = _lookupModule(libraryName);     module = _lookupModule(libraryName);
  
     //build library path     //build library path
     String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));      String libraryPath = FileSystem::getAbsolutePath(
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);                               (const char*)_consumerDir.getCString(),
                                FileSystem::buildLibraryFileName(libraryName));
       PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Loading library: " + libraryPath);
  
     //load module     //load module
     try     try
Line 317 
Line 378 
  
     } catch (Exception& ex)     } catch (Exception& ex)
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());          PEG_TRACE_STRING(
               TRC_LISTENER,
         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",              Tracer::LEVEL2,
               "Error loading consumer module: " + ex.getMessage());
   
           throw Exception(
                     MessageLoaderParms(
                         "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
                                            "Cannot load module ($0:$1): Unknown exception.",                                            "Cannot load module ($0:$1): Unknown exception.",
                                            consumerName,                                            consumerName,
                                            libraryName));                                            libraryName));
     } catch (...)     } catch (...)
     {     {
         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",          throw Exception(
                     MessageLoaderParms(
                         "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
                                            "Cannot load module ($0:$1): Unknown exception.",                                            "Cannot load module ($0:$1): Unknown exception.",
                                            consumerName,                                            consumerName,
                                            libraryName));                                            libraryName));
     }     }
  
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);      PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Successfully loaded consumer module " + libraryName);
  
     //initialize consumer     //initialize consumer
     try     try
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "Initializing Consumer " +  consumerName);
  
         consumer->initialize();         consumer->initialize();
  
Line 350 
Line 424 
                                           _worker_routine,                                           _worker_routine,
                                           semaphore) != PEGASUS_THREAD_OK)                                           semaphore) != PEGASUS_THREAD_OK)
     {     {
         Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,              Logger::put(
                   Logger::STANDARD_LOG,
                   System::CIMLISTENER,
                   Logger::TRACE,
         "Not enough threads for consumer.");         "Not enough threads for consumer.");
  
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL2,
         "Could not allocate thread for consumer.");         "Could not allocate thread for consumer.");
  
        consumer->setShutdownSemaphore(0);        consumer->setShutdownSemaphore(0);
        delete semaphore;        delete semaphore;
            throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",              throw Exception(
                   MessageLoaderParms(
                       "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
                         "Not enough threads for consumer worker routine."));                         "Not enough threads for consumer worker routine."));
         }         }
  
         //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued          //wait until the listening thread has started.
         //before the consumer is waiting for it and the first indication after loading the consumer will be lost          // Otherwise, there is a miniscule chance that the first event will
           // be enqueued before the consumer is waiting for it and the first
           // indication after loading the consumer will be lost
         consumer->waitForEventThread();         consumer->waitForEventThread();
  
         //load any outstanding requests         //load any outstanding requests
         Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);          Array<IndicationDispatchEvent> outstandingIndications =
               _deserializeOutstandingIndications(consumerName);
         if (outstandingIndications.size())         if (outstandingIndications.size())
         {         {
             //the consumer will signal itself in _loadOustandingIndications             //the consumer will signal itself in _loadOustandingIndications
             consumer->_loadOutstandingIndications(outstandingIndications);             consumer->_loadOutstandingIndications(outstandingIndications);
         }         }
  
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "Successfully initialized consumer " + consumerName);
  
     } catch (...)     } catch (...)
     {     {
         module->unloadModule();         module->unloadModule();
         consumer->reset();         consumer->reset();
         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",          throw Exception(
               MessageLoaderParms(
                   "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
                                            "Cannot initialize consumer ($0).",                                            "Cannot initialize consumer ($0).",
                                            consumerName));                                            consumerName));
     }     }
Line 389 
Line 478 
 } }
  
  
 /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it  /** Returns the ConsumerModule with the given library name.
    *  If it already exists, we return the one in the cache.  If it
  *  DNE, we create it and add it to the table.  *  DNE, we create it and add it to the table.
  * @throws Exception if we cannot successfully create and initialize the consumer   * @throws Exception if we cannot successfully create and
    *  initialize the consumer
  */  */
 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
 { {
Line 404 
Line 495 
     //see if consumer module is cached     //see if consumer module is cached
     if (_modules.lookup(libraryName, module))     if (_modules.lookup(libraryName, module))
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,          PEG_TRACE_STRING(
                          "Found Consumer Module" + libraryName + " in Consumer Manager Cache");              TRC_LISTENER,
               Tracer::LEVEL4,
               "Found Consumer Module" +
                   libraryName + " in Consumer Manager Cache");
  
     } else     } else
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
                          "Creating Consumer Provider Module " + libraryName);                          "Creating Consumer Provider Module " + libraryName);
  
         module = new ConsumerModule();         module = new ConsumerModule();
Line 435 
Line 531 
         {         {
             consumer = i.value();             consumer = i.value();
  
             if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))              if (consumer &&
             {                  consumer->isLoaded() &&
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);                  (consumer->getPendingIndications() > 0))
               {
                   PEG_TRACE_STRING(
                       TRC_LISTENER,
                       Tracer::LEVEL4,
                       "Found active consumer: " + consumer->_name);
                 PEG_METHOD_EXIT();                 PEG_METHOD_EXIT();
                 return true;                 return true;
             }             }
Line 445 
Line 546 
     } catch (...)     } catch (...)
     {     {
         // Unexpected exception; do not assume that no providers are loaded         // Unexpected exception; do not assume that no providers are loaded
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL2,
               "Unexpected Exception in hasActiveConsumers.");
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return true;         return true;
     }     }
Line 471 
Line 575 
  
             if (consumer && consumer->isLoaded())             if (consumer && consumer->isLoaded())
             {             {
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);                  PEG_TRACE_STRING(
                       TRC_LISTENER,
                       Tracer::LEVEL4,
                       "Found loaded consumer: " + consumer->_name);
                 PEG_METHOD_EXIT();                 PEG_METHOD_EXIT();
                 return true;                 return true;
             }             }
Line 479 
Line 586 
     } catch (...)     } catch (...)
     {     {
         // Unexpected exception; do not assume that no providers are loaded         // Unexpected exception; do not assume that no providers are loaded
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL2,
               "Unexpected Exception in hasLoadedConsumers.");
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return true;         return true;
     }     }
Line 490 
Line 600 
  
  
 /** Shutting down a consumer consists of four major steps: /** Shutting down a consumer consists of four major steps:
  * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.   * 1) Send the shutdown signal.  This causes the worker routine to break out
  * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This   *    of the loop and exit.
  *    is optional in a shutdown scenario.  If the listener is shutdown with a -f force, the listener   * 2) Wait for the worker thread to end.  This may take a while if it's
  *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows   *    processing an indication.  This is optional in a shutdown scenario.
  *    the current consumer indication to finish.  All other queued indications are serialized to a log and   *    If the listener is shutdown with a -f force, the listener
    *    will not wait for the consumer to finish before shutting down.
    *    Note that a normal shutdown only allows the current consumer indication
    *    to finish.  All other queued indications are serialized to a log and
  *    are sent when the consumer is reoaded.  *    are sent when the consumer is reoaded.
  * 3) Terminate the consumer provider interface.  * 3) Terminate the consumer provider interface.
  * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)   * 4) Decrement the module refcount (the module will automatically unload when
    *    it's refcount == 0)
  *  *
  * In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all   * In a scenario where more multiple consumers are loaded, the shutdown signal
  * of the consumers so the threads can finish simultaneously.   * should be sent to all of the consumers so the threads can finish
    * simultaneously.
  *  *
  * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications   * ATTN: Should the normal shutdown wait for everything in the queue to be
  * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing   * processed?  Just new indications to be processed?  I am not inclined to this
  * and deserialing indications between shutdown and startup, I feel like we do not need to process ALL   * solution since it could take a LOT of time.  By serializing and deserialing
  * queued indications on shutdown.   * indications between shutdown and startup, I feel like we do not need to
    * process ALL queued indications on shutdown.
  */  */
  
 /** Unloads all consumers. /** Unloads all consumers.
Line 518 
Line 634 
  
     if (!_consumers.size())     if (!_consumers.size())
     {     {
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "There are no consumers to unload.");
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return;         return;
     }     }
  
     if (!_forceShutdown)     if (!_forceShutdown)
     {     {
         //wait until all the consumers have finished processing the events in their queue          // wait until all the consumers have finished processing the events in
           // their queue
         //ATTN: Should this have a timeout even though it's a force??         //ATTN: Should this have a timeout even though it's a force??
         while (hasActiveConsumers())         while (hasActiveConsumers())
         {         {
Line 555 
Line 675 
  
         } catch (Exception&)         } catch (Exception&)
         {         {
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "Error unloading consumers.");
         }         }
     } else     } else
     {     {
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "There are no consumers to unload.");
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 575 
Line 701 
  
     if (!_consumers.size())     if (!_consumers.size())
     {     {
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "There are no consumers to unload.");
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return;         return;
     }     }
Line 602 
Line 731 
  
         } catch (Exception&)         } catch (Exception&)
         {         {
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "Error unloading consumers.");
         }         }
     } else     } else
     {     {
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL4,
               "There are no consumers to unload.");
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 625 
Line 760 
     //check whether the consumer exists     //check whether the consumer exists
     if (!_consumers.lookup(consumerName, consumer))     if (!_consumers.lookup(consumerName, consumer))
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL3,
               "Error: cannot unload consumer, unknown consumer " + consumerName);
         return;         return;
     }     }
  
Line 642 
Line 780 
  
         } catch (Exception&)         } catch (Exception&)
         {         {
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "Error unloading consumers.");
         }         }
  
     } else     } else
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL3,
               "Error: cannot unload consumer " + consumerName);
     }     }
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 656 
Line 800 
 /** Unloads the consumers in the given array. /** Unloads the consumers in the given array.
  *  The consumerTable mutex MUST be locked prior to entering this method.  *  The consumerTable mutex MUST be locked prior to entering this method.
  */  */
 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)  void ConsumerManager::_unloadConsumers(
       Array<DynamicConsumer*> consumersToUnload)
 { {
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
  
Line 666 
Line 811 
         consumersToUnload[i]->sendShutdownSignal();         consumersToUnload[i]->sendShutdownSignal();
     }     }
  
     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");      PEG_TRACE_CSTRING(
           #TRC_LISTENER,
           Tracer::LEVEL4,
           "Sent shutdown signal to all consumers.");
  
     //wait for all the consumer worker threads to complete     //wait for all the consumer worker threads to complete
     //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last      // since we can only shutdown after they are all complete,
     //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop      // it does not matter if the first, fifth, or last
     //processing its requests.      // consumer takes the longest; the wait time is equal to the time it takes
       // for the busiest consumer to stop processing its requests.
     for (Uint32 i = 0; i < consumersToUnload.size(); i++)     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL3,
               "Unloading consumer " + consumersToUnload[i]->getName());
  
         //wait for the consumer worker thread to end         //wait for the consumer worker thread to end
         try         try
         {         {
             Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();              Semaphore* _shutdownSemaphore =
                   consumersToUnload[i]->getShutdownSemaphore();
             if (_shutdownSemaphore)             if (_shutdownSemaphore)
             {             {
                 _shutdownSemaphore->time_wait(10000);                 _shutdownSemaphore->time_wait(10000);
Line 687 
Line 840 
  
         } catch (TimeOut &)         } catch (TimeOut &)
         {         {
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL2,
                   "Timed out while attempting to stop consumer thread.");
         }         }
  
         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");          PEG_TRACE_CSTRING(
               TRC_LISTENER,
               Tracer::LEVEL2,
               "Terminating consumer.");
  
         try         try
         {         {
Line 702 
Line 861 
             consumersToUnload[i]->_module->unloadModule();             consumersToUnload[i]->_module->unloadModule();
  
             //serialize outstanding indications             //serialize outstanding indications
             _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());              _serializeOutstandingIndications(
                   consumersToUnload[i]->getName(),
                   consumersToUnload[i]->_retrieveOutstandingIndications());
  
             //reset the consumer             //reset the consumer
             consumersToUnload[i]->reset();             consumersToUnload[i]->reset();
  
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL3,
                   "Consumer library successfully unloaded.");
  
         } catch (Exception& e)         } catch (Exception& e)
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL2,
                   "Error unloading consumer: " + e.getMessage());
             //ATTN: throw exception? log warning?             //ATTN: throw exception? log warning?
         }         }
     }     }
Line 721 
Line 888 
  
 /** Serializes oustanding indications to a <MyConsumer>.dat file /** Serializes oustanding indications to a <MyConsumer>.dat file
  */  */
 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)  void ConsumerManager::_serializeOutstandingIndications(
 {      const String& consumerName,
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");      Array<IndicationDispatchEvent> indications)
   {
       PEG_METHOD_ENTER(
           TRC_LISTENER,
           "ConsumerManager::_serializeOutstandingIndications");
  
     if (!indications.size())     if (!indications.size())
     {     {
Line 731 
Line 902 
         return;         return;
     }     }
  
     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));      String fileName = FileSystem::getAbsolutePath(
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);                            (const char*)_consumerConfigDir.getCString(),
                             String(consumerName + ".dat"));
       PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Consumer dat file: " + fileName);
  
     Buffer buffer;     Buffer buffer;
  
Line 742 
Line 918 
  
     if (!fileHandle)     if (!fileHandle)
     {     {
         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);          PEG_TRACE_STRING(
               TRC_LISTENER,
               Tracer::LEVEL2,
               "Unable to open log file for " + consumerName);
  
     } else     } else
     {     {
Line 751 
Line 930 
                       indications.size(),                       indications.size(),
                       (const char*)consumerName.getCString()));                       (const char*)consumerName.getCString()));
  
         //we have to put the array of instances under a valid root element or the parser complains          // we have to put the array of instances under a valid root element
           // or the parser complains
         XmlWriter::append(buffer, "<IRETURNVALUE>\n");         XmlWriter::append(buffer, "<IRETURNVALUE>\n");
  
                 CIMInstance cimInstance;                 CIMInstance cimInstance;
Line 779 
Line 959 
  
 /** Reads outstanding indications from a <MyConsumer>.dat file /** Reads outstanding indications from a <MyConsumer>.dat file
  */  */
 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)  Array<IndicationDispatchEvent>
 {      ConsumerManager::_deserializeOutstandingIndications(
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");          const String& consumerName)
   {
     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));      PEG_METHOD_ENTER(
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);          TRC_LISTENER,
           "ConsumerManager::_deserializeOutstandingIndications");
   
       String fileName = FileSystem::getAbsolutePath(
                             (const char*)_consumerConfigDir.getCString(),
                             String(consumerName + ".dat"));
       PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL4,
           "Consumer dat file: " + fileName);
  
     Array<CIMInstance> cimInstances;     Array<CIMInstance> cimInstances;
         Array<String>      urlStrings;         Array<String>      urlStrings;
Line 802 
Line 991 
  
         try         try
         {         {
             FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?              //ATTN: Is this safe to use; what about CRLFs?
               FileSystem::loadFileToMemory(text, fileName);
  
             //parse the file             //parse the file
             XmlParser parser((char*)text.getData());             XmlParser parser((char*)text.getData());
Line 813 
Line 1003 
                                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);                                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
                                 if (index != PEG_NOT_FOUND)                                 if (index != PEG_NOT_FOUND)
                                 {                                 {
                                         //get the URL string property from the serialized instance and remove the property                      // get the URL string property from the serialized instance
                       // and remove the property
                                         cimProperty = cimInstance.getProperty(index);                                         cimProperty = cimInstance.getProperty(index);
                                         cimValue = cimProperty.getValue();                                         cimValue = cimProperty.getValue();
                                         cimValue.get(urlString);                                         cimValue.get(urlString);
                                         cimInstance.removeProperty(index);                                         cimInstance.removeProperty(index);
                                 }                                 }
                                 IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);                  IndicationDispatchEvent* indicationEvent =
                       new IndicationDispatchEvent(
                           OperationContext(),
                           urlString,
                           cimInstance);
   
                 indications.append(*indicationEvent);                 indications.append(*indicationEvent);
             }             }
  
Line 835 
Line 1031 
  
         } catch (Exception& ex)         } catch (Exception& ex)
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL3,
                   "Error parsing dat file: " +
                       ex.getMessage() + " " + consumerName);
  
         } catch (...)         } catch (...)
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL2,
                   "Error parsing dat file: Unknown Exception " + consumerName);
         }         }
     }     }
  
Line 850 
Line 1053 
  
  
 /** /**
  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton   * This is the main worker thread of the consumer.  By having only one thread
  * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive   * per consumer, we eliminate a ton of synchronization issues and make it easy
  * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,   * to prevent the consumer from performing two mutually exclusive operations
  * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.   * at once.  This also prevents one bad consumer from taking the entire
    * listener down.  That being said, it is up to the programmer to write smart
    * consumers, and to ensure that their actions don't deadlock
    * the worker thread.
  *  *
  * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to   * If a consumer receives a lot of traffic, or it's consumeIndication() method
  * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off   * takes a considerable amount of time to complete, it may make sense to make
  * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer   * the consumer multi-threaded.  The individual consumer can immediately
  * can attain extremely high performance.   * spawn off* new threads to handle indications, and return immediately to
    * catch the next indication.  In this way, a consumer can attain
    * extremely high performance.
  *  *
  * There are three different events that can signal us:  * There are three different events that can signal us:
  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)   * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
    *    shutdown or an idle consumer state)
  * 3) A retry signal (signalled from this routine itself)  * 3) A retry signal (signalled from this routine itself)
  *  *
  * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry   * The idea is that all new indications are put on the front of the queue and
  * indications are put on the back of the queue and are only processed AFTER all new indications are sent.   * processed first.  All of the retry indications are put on the back of the
  * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,   * queue and are only processed AFTER all new indications are sent.
  * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.   * Before processing each indication, we check to see whether or not the
    * shutdown signal was given.
    * If so, we immediately break out of the loop, and another compenent
    * serializes the remaining indications to a file.
  *  *
  * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.   * An indication gets retried
    *     if the consumer throws a CIM_ERR_FAILED exception.
  *  *
  * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:   * This function makes sure it waits until the default retry lapse has passed
    * to avoid issues with the following scenario:
  * 20 new indications come in, 10 of them are successful, 10 are not.  * 20 new indications come in, 10 of them are successful, 10 are not.
  * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication   * We were signalled 20 times, so we will pass the time_wait 20 times.
  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since   * Perceivably, the process time on each indication could be minimal.
  * we would never hit the wait for the retry timeout.   * We could potentially proceed to process the retries after a very small time
    * interval since we would never hit the wait for the retry timeout.
  *  *
  */  */
 ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)  ThreadReturnType PEGASUS_THREAD_CDECL
       ConsumerManager::_worker_routine(void *param)
 { {
     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
  
Line 887 
Line 1103 
     String name = myself->getName();     String name = myself->getName();
     List<IndicationDispatchEvent,Mutex> tmpEventQueue;     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
  
     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);      PEG_TRACE_STRING(
           TRC_LISTENER,
           Tracer::LEVEL2,
           "_worker_routine::entering loop for " + name);
  
     myself->_listeningSemaphore->signal();     myself->_listeningSemaphore->signal();
  
Line 895 
Line 1114 
     {     {
         try         try
         {         {
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "_worker_routine::waiting " + name);
  
             //wait to be signalled             //wait to be signalled
             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
  
             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);              PEG_TRACE_STRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "_worker_routine::signalled " + name);
  
             //check whether we received the shutdown signal             //check whether we received the shutdown signal
             if (myself->_dieNow)             if (myself->_dieNow)
             {             {
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);                  PEG_TRACE_STRING(
                       TRC_LISTENER,
                       Tracer::LEVEL4,
                       "_worker_routine::shutdown received " + name);
                 break;                 break;
             }             }
  
Line 914 
Line 1142 
  
             //continue processing events until the queue is empty             //continue processing events until the queue is empty
             //make sure to check for the shutdown signal before every iteration             //make sure to check for the shutdown signal before every iteration
             // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.              // Note that any time during our processing of events the Listener
             // Because we are popping off the front and new events are being thrown on the back if events are failing when we start              // may be enqueueing NEW events for us to process.
             // But are succeeding by the end of the processing, events may be sent out of chronological order.              // Because we are popping off the front and new events are being
             // However. Once we complete the current queue of events, we will always send old events to be retried before sending any              // thrown on the back if events are failing when we start
               // But are succeeding by the end of the processing, events may be
               // sent out of chronological order.
               // However. Once we complete the current queue of events, we will
               // always send old events to be retried before sending any
             // new events added afterwards.             // new events added afterwards.
             while (myself->_eventqueue.size())             while (myself->_eventqueue.size())
             {             {
                 //check for shutdown signal                 //check for shutdown signal
                 //this only breaks us out of the queue loop, but we will immediately get through the next wait from                  //this only breaks us out of the queue loop, but we will
                 //the shutdown signal itself, at which time we break out of the main loop                  //immediately get through the next wait from
                   //the shutdown signal itself, at which time we break
                   //out of the main loop
                 if (myself->_dieNow)                 if (myself->_dieNow)
                 {                 {
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);                      PEG_TRACE_STRING(
                           TRC_LISTENER,
                           Tracer::LEVEL4,
                           "Received signal to shutdown,"
                               " jumping out of queue loop " + name);
                     break;                     break;
                 }                 }
  
                 //pop next indication off the queue                 //pop next indication off the queue
                 IndicationDispatchEvent* event = 0;                 IndicationDispatchEvent* event = 0;
                 event = myself->_eventqueue.remove_front();  //what exceptions/errors can this throw?                  //what exceptions/errors can this throw?
                   event = myself->_eventqueue.remove_front();
  
                 if (!event)                 if (!event)
                 {                 {
Line 940 
Line 1179 
                     continue;                     continue;
                 }                 }
  
                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);                  PEG_TRACE_STRING(
                       TRC_LISTENER,
                       Tracer::LEVEL4,
                       "_worker_routine::consumeIndication " + name);
  
                 try                 try
                 {                 {
Line 948 
Line 1190 
                                               event->getURL(),                                               event->getURL(),
                                               event->getIndicationInstance());                                               event->getIndicationInstance());
  
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);                      PEG_TRACE_STRING(
                           TRC_LISTENER,
                           Tracer::LEVEL4,
                           "_worker_routine::processed indication successfully. "
                               + name);
  
                     delete event;                     delete event;
                     continue;                     continue;
Line 958 
Line 1204 
                     //check for failure                     //check for failure
                     if (ce.getCode() == CIM_ERR_FAILED)                     if (ce.getCode() == CIM_ERR_FAILED)
                     {                     {
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);                          PEG_TRACE_STRING(
                               TRC_LISTENER,
                               Tracer::LEVEL2,
                               "_worker_routine::consumeIndication() temporary"
                                   " failure: " + ce.getMessage() + " " + name);
   
                           // Here we simply determine if we should increment
                           // the retry count or not.
                           // We don't want to count a forced retry from a new
                           // event to count as a retry.
                           // We just have to do it for order's sake.
                           // If the retry Lapse has lapsed on this event,
                           // then increment the counter.
                           if (event->getRetries() > 0)
                           {
                               Sint64 differenceInMicroseconds =
                                   CIMDateTime::getDifference(
                                       event->getLastAttemptTime(),
                                       CIMDateTime::getCurrentDateTime());
  
                         // Here we simply determine if we should increment the retry count or not.                              if (differenceInMicroseconds >=
                         // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for                                      (DEFAULT_RETRY_LAPSE * 1000))
                         // order's sake. If the retry Lapse has lapsed on this event then increment the counter.                              {
                         if (event->getRetries() > 0) {  
                             Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());  
                             if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))  
                                 event->increaseRetries();                                 event->increaseRetries();
                         } else {                              }
                           }
                           else
                           {
                             event->increaseRetries();                             event->increaseRetries();
                         }                         }
  
                         //determine if we have hit the max retry count                         //determine if we have hit the max retry count
                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
                         {                         {
                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,                              PEG_TRACE_CSTRING(
                                              "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");                                  TRC_LISTENER,
                                   Tracer::LEVEL2,
                                   "Error: the maximum retry count has been "
                                       "exceeded.  Removing the event from "
                                           "the queue.");
  
                             Logger::put(                             Logger::put(
                                        Logger::ERROR_LOG,                                        Logger::ERROR_LOG,
                                        System::CIMLISTENER,                                        System::CIMLISTENER,
                                        Logger::SEVERE,                                        Logger::SEVERE,
                                        "The following indication did not get processed successfully: $0",                                  "The following indication did not get "
                                       "processed successfully: $0",
                                        event->getIndicationInstance().getPath().toString());                                        event->getIndicationInstance().getPath().toString());
  
                             delete event;                             delete event;
Line 989 
Line 1258 
  
                         } else                         } else
                         {                         {
                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");                              PEG_TRACE_CSTRING(
                                   TRC_LISTENER,
                                   Tracer::LEVEL4,
                                   "_worker_routine::placing failed indication "
                                       "back in queue");
                             tmpEventQueue.insert_back(event);                             tmpEventQueue.insert_back(event);
                         }                         }
  
                     } else                     } else
                     {                     {
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());                          PEG_TRACE_STRING(
                               TRC_LISTENER,
                               Tracer::LEVEL2,
                               "Error: consumeIndication() permanent failure: "
                                   + ce.getMessage());
                         delete event;                         delete event;
                         continue;                         continue;
                     }                     }
  
                 } catch (Exception & ex)                 } catch (Exception & ex)
                 {                 {
                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());                      PEG_TRACE_STRING(
                           TRC_LISTENER,
                           Tracer::LEVEL2,
                           "Error: consumeIndication() permanent failure: "
                               + ex.getMessage());
                     delete event;                     delete event;
                     continue;                     continue;
  
                 } catch (...)                 } catch (...)
                 {                 {
                     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");                      PEG_TRACE_CSTRING(
                           TRC_LISTENER,
                           Tracer::LEVEL2,
                           "Error: consumeIndication() failed: "
                               "Unknown exception.");
                     delete event;                     delete event;
                     continue;                     continue;
                 } //end try                 } //end try
Line 1016 
Line 1301 
             } //while eventqueue             } //while eventqueue
  
             // Copy the failed indications back to the main queue             // Copy the failed indications back to the main queue
             // We now lock the queue while adding the retries on to the queue so that new events can't get in in front              // We now lock the queue while adding the retries on to the queue
             // Of those events we are retrying. Retried events happened before any new events coming in.              // so that new events can't get in in front
               // Of those events we are retrying. Retried events happened before
               // any new events coming in.
             IndicationDispatchEvent* tmpEvent = 0;             IndicationDispatchEvent* tmpEvent = 0;
             myself->_eventqueue.try_lock();             myself->_eventqueue.try_lock();
             while (tmpEventQueue.size())             while (tmpEventQueue.size())
Line 1030 
Line 1317 
  
         } catch (TimeOut&)         } catch (TimeOut&)
         {         {
             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");              PEG_TRACE_CSTRING(
                   TRC_LISTENER,
                   Tracer::LEVEL4,
                   "_worker_routine::Time to retry any outstanding indications.");
  
             //signal the queue in the same way we would if we received a new indication              // signal the queue in the same way we would,
               // if we received a new indication
             //this allows the thread to fall into the queue processing code             //this allows the thread to fall into the queue processing code
             myself->_check_queue->signal();             myself->_check_queue->signal();
  
Line 1047 
Line 1338 
  
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END
   
   
   


Legend:
Removed from v.1.20  
changed lines
  Added in v.1.21

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2