(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 // Author: Heather Sterling (hsterl@us.ibm.com)
  33                 //
  34 h.sterling 1.1  // Modified By: 
  35                 //
  36                 //%/////////////////////////////////////////////////////////////////////////////
  37                 
  38                 #include <Pegasus/Common/Config.h>
  39                 //#include <cstdlib>
  40                 //#include <dlfcn.h>
  41                 #include <Pegasus/Common/System.h>
  42                 #include <Pegasus/Common/FileSystem.h>
  43                 #include <Pegasus/Common/Tracer.h>
  44                 #include <Pegasus/Common/Logger.h>
  45                 #include <Pegasus/Common/XmlReader.h>
  46 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  47 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  48                 #include <Pegasus/Common/XmlWriter.h>
  49 mike       1.14 #include <Pegasus/Common/List.h>
  50 mike       1.15 #include <Pegasus/Common/Mutex.h>
  51 h.sterling 1.1  
  52                 #include "ConsumerManager.h"
  53                 
  54                 PEGASUS_NAMESPACE_BEGIN
  55                 PEGASUS_USING_STD;
  56                 
  57                 //ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.
  58                 // We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
  59                 static struct OptionRow optionsTable[] =
  60                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  61                 {
  62                 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
  63                 };
  64                 
  65                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  66                 
  67                 //retry settings
  68                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  69 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  70 h.sterling 1.1  
  71 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
  72                 static const String URL_PROPERTY = "URLString";
  73 h.sterling 1.1  
  74                 
  75                 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  76                 _consumerDir(consumerDir),
  77                 _consumerConfigDir(consumerConfigDir),
  78                 _enableConsumerUnload(enableConsumerUnload),
  79                 _idleTimeout(idleTimeout),
  80                 _forceShutdown(true)
  81                 {
  82                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  83                 
  84                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  85                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  86                 
  87 marek      1.18     PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  88 h.sterling 1.1                    "Consumer unload enabled %d: idle timeout %d",
  89                                   enableConsumerUnload,
  90 marek      1.18                   idleTimeout));
  91 h.sterling 1.1  
  92                 
  93 h.sterling 1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  94 h.sterling 1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  95 h.sterling 1.1  
  96 kumpf      1.3      struct timeval deallocateWait = {15, 0};
  97                     _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  98 h.sterling 1.1  
  99                     _init();
 100                 
 101                     PEG_METHOD_EXIT();
 102                 }
 103                 
 104                 ConsumerManager::~ConsumerManager()
 105                 {
 106                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 107                 
 108                     unloadAllConsumers();
 109                 
 110 kumpf      1.3      delete _thread_pool;
 111 h.sterling 1.1  
 112                     ConsumerTable::Iterator i = _consumers.start();
 113                     for (; i!=0; i++)
 114                     {
 115                         DynamicConsumer* consumer = i.value();
 116                         delete consumer;
 117                     }
 118                 
 119 marek      1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 120 h.sterling 1.1  
 121                     ModuleTable::Iterator j = _modules.start();
 122                     for (;j!=0;j++)
 123                     {
 124                         ConsumerModule* module = j.value();
 125                         delete module;
 126                     }
 127                 
 128 marek      1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 129 h.sterling 1.1  
 130                     PEG_METHOD_EXIT();
 131                 }
 132                 
 133                 void ConsumerManager::_init()
 134                 {
 135                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 136                 
 137                     //check if there are any outstanding indications
 138                     Array<String> files;
 139                     Uint32 pos;
 140                     String consumerName;
 141                 
 142                     if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 143                     {
 144                         for (Uint32 i = 0; i < files.size(); i++)
 145                         {
 146                             pos = files[i].find(".dat");
 147                             if (pos != PEG_NOT_FOUND)
 148                             {
 149                                 consumerName = files[i].subString(0, pos);
 150 h.sterling 1.1  
 151                                 try
 152                                 {
 153                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 154                                     getConsumer(consumerName);
 155                 
 156                                 } catch (...)
 157                                 {
 158                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 159                                 }
 160                             }
 161                         }
 162                     }
 163                 
 164                     PEG_METHOD_EXIT();
 165                 }
 166                 
 167                 String ConsumerManager::getConsumerDir()
 168                 {
 169                     return _consumerDir;
 170                 }
 171 h.sterling 1.1  
 172                 String ConsumerManager::getConsumerConfigDir()
 173                 {
 174                     return _consumerConfigDir;
 175                 }
 176                 
 177                 Boolean ConsumerManager::getEnableConsumerUnload()
 178                 {
 179                     return _enableConsumerUnload;
 180                 }
 181                 
 182                 Uint32 ConsumerManager::getIdleTimeout()
 183                 {
 184                     return _idleTimeout;
 185                 }
 186                 
 187                 /** Retrieves the library name associated with the consumer name.  By default, the library name
 188                   * is the same as the consumer name.  However, you may specify a different library name in a consumer
 189                   * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 190                   *     location="libraryName"
 191                   *
 192 h.sterling 1.1    * The config file is optional and is generally only needed in cases where there are strict requirements
 193                   * on library naming.
 194                   *
 195                   * It is the responsibility of the caller to catch any exceptions thrown by this method.
 196                   */
 197                 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 198                 {
 199                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 200                 
 201                     //default library name is consumer name
 202                     String libraryName = consumerName;
 203                 
 204                     //check whether an alternative library name was specified in an optional consumer config file
 205                     String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 206                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 207                 
 208                     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 209                     {
 210                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 211                 
 212                         try
 213 h.sterling 1.1          {
 214 h.sterling 1.6              //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
 215 h.sterling 1.8              OptionManager _optionMgr;
 216                             _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
 217 h.sterling 1.1              _optionMgr.mergeFile(configFile);
 218                             _optionMgr.checkRequiredOptions();
 219                 
 220                             if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 221                             {
 222                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 223                                 libraryName = consumerName;
 224                             }
 225                 
 226                         } catch (Exception & ex)
 227                         {
 228                             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 229                                                                "Error reading $0: $1.",
 230                                                                configFile,
 231                                                                ex.getMessage()));
 232                         }
 233                     } else
 234                     {
 235                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 236                     }
 237                 
 238 h.sterling 1.1      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 239                 
 240                     PEG_METHOD_EXIT();
 241                     return libraryName;
 242                 }
 243                 
 244                 /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 245                  *  DNE, we create it and initialize it, and add it to the table.
 246                  * @throws Exception if we cannot successfully create and initialize the consumer
 247                  */ 
 248                 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 249                 {
 250                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 251                 
 252                     DynamicConsumer* consumer = 0;
 253                     CIMIndicationConsumerProvider* consumerRef = 0;
 254                     Boolean cached = false;
 255                     Boolean entryExists = false;
 256                 
 257                     AutoMutex lock(_consumerTableMutex);
 258                 
 259 h.sterling 1.1      if (_consumers.lookup(consumerName, consumer))
 260                     {
 261                         //why isn't this working??
 262                         entryExists = true;
 263                 
 264                         if (consumer && consumer->isLoaded())
 265                         {
 266                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 267                             cached = true;
 268                         }
 269                     } else
 270                     {
 271                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 272                         consumer = new DynamicConsumer(consumerName);
 273                         //ATTN: The above is a memory leak if _initConsumer throws an exception
 274                         //need to delete it in that case
 275                     }
 276                 
 277                     if (!cached)
 278                     {
 279                         _initConsumer(consumerName, consumer);
 280 h.sterling 1.1  
 281                         if (!entryExists)
 282                         {
 283                             _consumers.insert(consumerName, consumer);
 284                         }
 285                     }
 286                 
 287                     consumer->updateIdleTimer();
 288                 
 289                     PEG_METHOD_EXIT();
 290                     return consumer;
 291                 }
 292                 
 293                 /** Initializes a DynamicConsumer.
 294                  * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 295                  * @throws Exception if the consumer cannot be initialized
 296                  */
 297                 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
 298                 {
 299                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 300                 
 301 h.sterling 1.1      CIMIndicationConsumerProvider* base = 0;
 302                     ConsumerModule* module = 0;
 303                 
 304                     //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
 305                     String libraryName = _getConsumerLibraryName(consumerName);
 306                     module = _lookupModule(libraryName);
 307                 
 308                     //build library path
 309                     String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
 310                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
 311                 
 312                     //load module
 313                     try
 314                     {
 315                         base = module->load(consumerName, libraryPath);
 316                         consumer->set(module, base);
 317                 
 318                     } catch (Exception& ex)
 319                     {
 320                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
 321                 
 322 h.sterling 1.1          throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 323                                                            "Cannot load module ($0:$1): Unknown exception.",
 324                                                            consumerName,
 325                                                            libraryName));
 326                     } catch (...)
 327                     {
 328                         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 329                                                            "Cannot load module ($0:$1): Unknown exception.",
 330                                                            consumerName,
 331                                                            libraryName));
 332                     }
 333                 
 334                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
 335                 
 336                     //initialize consumer
 337                     try
 338                     {
 339                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);
 340                 
 341                         consumer->initialize();
 342                 
 343 h.sterling 1.1          //ATTN: need to change this
 344                         Semaphore* semaphore = new Semaphore(0);  //blocking
 345                 
 346                         consumer->setShutdownSemaphore(semaphore);
 347                 
 348                         //start the worker thread
 349 konrad.r   1.7          if (_thread_pool->allocate_and_awaken(consumer,
 350 h.sterling 1.1                                            _worker_routine,
 351 konrad.r   1.7                                            semaphore) != PEGASUS_THREAD_OK)
 352 h.sterling 1.8      {
 353 yi.zhou    1.17         Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
 354 h.sterling 1.8          "Not enough threads for consumer.");
 355 konrad.r   1.7   
 356 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 357 h.sterling 1.8          "Could not allocate thread for consumer.");
 358 konrad.r   1.7  
 359 h.sterling 1.8         consumer->setShutdownSemaphore(0);
 360                        delete semaphore;
 361 konrad.r   1.7             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 362 h.sterling 1.8                          "Not enough threads for consumer worker routine."));
 363 konrad.r   1.7          }
 364 h.sterling 1.1  
 365 h.sterling 1.8          //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued
 366                         //before the consumer is waiting for it and the first indication after loading the consumer will be lost
 367                         consumer->waitForEventThread();
 368                 
 369 h.sterling 1.1          //load any outstanding requests
 370 h.sterling 1.11         Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
 371 h.sterling 1.1          if (outstandingIndications.size())
 372                         {
 373                             //the consumer will signal itself in _loadOustandingIndications
 374                             consumer->_loadOutstandingIndications(outstandingIndications);
 375                         }
 376                 
 377                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
 378                 
 379                     } catch (...)
 380                     {
 381                         module->unloadModule();
 382                         consumer->reset();
 383                         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 384                                                            "Cannot initialize consumer ($0).",
 385                                                            consumerName));        
 386                     }
 387                 
 388                     PEG_METHOD_EXIT();    
 389                 }
 390                 
 391                 
 392 h.sterling 1.1  /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 393                  *  DNE, we create it and add it to the table.
 394                  * @throws Exception if we cannot successfully create and initialize the consumer
 395                  */ 
 396                 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 397                 {
 398                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 399                 
 400                     AutoMutex lock(_moduleTableMutex);
 401                 
 402                     ConsumerModule* module = 0;
 403                 
 404                     //see if consumer module is cached
 405                     if (_modules.lookup(libraryName, module))
 406                     {
 407                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 408                                          "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 409                 
 410                     } else
 411                     {
 412                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 413 h.sterling 1.1                           "Creating Consumer Provider Module " + libraryName);
 414                 
 415                         module = new ConsumerModule(); 
 416                         _modules.insert(libraryName, module);
 417                     }
 418                 
 419                     PEG_METHOD_EXIT();
 420                     return(module);
 421                 }
 422                 
 423                 /** Returns true if there are active consumers
 424                  */ 
 425                 Boolean ConsumerManager::hasActiveConsumers()
 426                 {
 427                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 428                 
 429                     AutoMutex lock(_consumerTableMutex);
 430                     DynamicConsumer* consumer = 0;
 431                 
 432                     try
 433                     {
 434 h.sterling 1.1          for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 435                         {
 436                             consumer = i.value();
 437                 
 438                             if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 439                             {
 440                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 441                                 PEG_METHOD_EXIT();
 442                                 return true;
 443                             }
 444                         }
 445                     } catch (...)
 446                     {
 447                         // Unexpected exception; do not assume that no providers are loaded
 448 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 449 h.sterling 1.1          PEG_METHOD_EXIT();
 450                         return true;
 451                     }
 452                 
 453                     PEG_METHOD_EXIT();
 454                     return false;
 455                 }
 456                 
 457                 /** Returns true if there are loaded consumers
 458                  */ 
 459                 Boolean ConsumerManager::hasLoadedConsumers()
 460                 {
 461                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 462                 
 463                     AutoMutex lock(_consumerTableMutex);
 464                     DynamicConsumer* consumer = 0;
 465                 
 466                     try
 467                     {
 468                         for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 469                         {
 470 h.sterling 1.1              consumer = i.value();
 471                 
 472                             if (consumer && consumer->isLoaded())
 473                             {
 474                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 475                                 PEG_METHOD_EXIT();
 476                                 return true;
 477                             }
 478                         }
 479                     } catch (...)
 480                     {
 481                         // Unexpected exception; do not assume that no providers are loaded
 482 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 483 h.sterling 1.1          PEG_METHOD_EXIT();
 484                         return true;
 485                     }
 486                 
 487                     PEG_METHOD_EXIT();
 488                     return false;
 489                 }
 490                 
 491                 
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out
 *    of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it is
 *    processing an indication.  This is optional in a shutdown scenario.  If
 *    the listener is shut down with a -f force, the listener will not wait
 *    for the consumer to finish before shutting down.  Note that a normal
 *    shutdown only allows the current consumer indication to finish.  All
 *    other queued indications are serialized to a log and are sent when the
 *    consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload
 *    when its refcount == 0).
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal
 * should be sent to all of the consumers so the threads can finish
 * simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be
 * processed?  Just new indications to be processed?  I am not inclined to
 * this solution since it could take a LOT of time.  By serializing and
 * deserializing indications between shutdown and startup, I feel like we do
 * not need to process ALL queued indications on shutdown.
 */
 510                 
 511                 /** Unloads all consumers.
 512                  */ 
 513                 void ConsumerManager::unloadAllConsumers()
 514                 {
 515                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 516                 
 517                     AutoMutex lock(_consumerTableMutex);
 518                 
 519                     if (!_consumers.size())
 520                     {
 521 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 522 h.sterling 1.1          PEG_METHOD_EXIT();
 523                         return;
 524                     }
 525                 
 526                     if (!_forceShutdown)
 527                     {
 528                         //wait until all the consumers have finished processing the events in their queue
 529                         //ATTN: Should this have a timeout even though it's a force??
 530                         while (hasActiveConsumers())
 531                         {
 532 mike       1.15             Threads::sleep(500);
 533 h.sterling 1.1          }
 534                     }
 535                 
 536                     Array<DynamicConsumer*> loadedConsumers;
 537                 
 538                     ConsumerTable::Iterator i = _consumers.start();
 539                     DynamicConsumer* consumer = 0;
 540                 
 541                     for (; i!=0; i++)
 542                     {
 543                         consumer = i.value();
 544                         if (consumer && consumer->isLoaded())
 545                         {
 546                             loadedConsumers.append(consumer);
 547                         }
 548                     }
 549                 
 550                     if (loadedConsumers.size())
 551                     {
 552                         try
 553                         {
 554 h.sterling 1.1              _unloadConsumers(loadedConsumers);
 555                 
 556 kumpf      1.16         } catch (Exception&)
 557 h.sterling 1.1          {
 558 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 559 h.sterling 1.1          }
 560                     } else
 561                     {
 562 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 563 h.sterling 1.1      }
 564                 
 565                     PEG_METHOD_EXIT();
 566                 }
 567                 
 568                 /** Unloads idle consumers.
 569                  */ 
 570                 void ConsumerManager::unloadIdleConsumers()
 571                 {
 572                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 573                 
 574                     AutoMutex lock(_consumerTableMutex);
 575                 
 576                     if (!_consumers.size())
 577                     {
 578 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 579 h.sterling 1.1          PEG_METHOD_EXIT();
 580                         return;
 581                     }
 582                 
 583                     Array<DynamicConsumer*> loadedConsumers;
 584                 
 585                     ConsumerTable::Iterator i = _consumers.start();
 586                     DynamicConsumer* consumer = 0;
 587                 
 588                     for (; i!=0; i++)
 589                     {
 590                         consumer = i.value();
 591                         if (consumer && consumer->isLoaded() && consumer->isIdle())
 592                         {
 593                             loadedConsumers.append(consumer);
 594                         }
 595                     }
 596                 
 597                     if (loadedConsumers.size())
 598                     {
 599                         try
 600 h.sterling 1.1          {
 601                             _unloadConsumers(loadedConsumers);
 602                 
 603 kumpf      1.16         } catch (Exception&)
 604 h.sterling 1.1          {
 605 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 606 h.sterling 1.1          }
 607                     } else
 608                     {
 609 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 610 h.sterling 1.1      }
 611                 
 612                     PEG_METHOD_EXIT();
 613                 }
 614                 
 615                 /** Unloads a single consumer.
 616                  */ 
 617                 void ConsumerManager::unloadConsumer(const String& consumerName)
 618                 {
 619                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 620                 
 621                     AutoMutex lock(_consumerTableMutex);
 622                 
 623                     DynamicConsumer* consumer = 0;
 624                 
 625                     //check whether the consumer exists
 626                     if (!_consumers.lookup(consumerName, consumer))
 627                     {
 628                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
 629                         return;
 630                     }
 631 h.sterling 1.1  
 632                     //check whether the consumer is loaded
 633                     if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 634                     {
 635                         //unload the consumer
 636                         Array<DynamicConsumer*> loadedConsumers;
 637                         loadedConsumers.append(consumer);
 638                 
 639                         try
 640                         {
 641                             _unloadConsumers(loadedConsumers);
 642                 
 643 kumpf      1.16         } catch (Exception&)
 644 h.sterling 1.1          {
 645 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 646 h.sterling 1.1          }
 647                 
 648                     } else
 649                     {
 650                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
 651                     }
 652                 
 653                     PEG_METHOD_EXIT();
 654                 }
 655                 
 656                 /** Unloads the consumers in the given array.
 657                  *  The consumerTable mutex MUST be locked prior to entering this method.
 658                  */ 
 659                 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
 660                 {
 661                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 662                 
 663                     //tell consumers to shutdown
 664                     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 665                     {
 666                         consumersToUnload[i]->sendShutdownSignal();
 667 h.sterling 1.1      }
 668                 
 669 marek      1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
 670 h.sterling 1.1  
 671                     //wait for all the consumer worker threads to complete
 672                     //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
 673                     //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
 674                     //processing its requests.
 675                     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 676                     {
 677                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
 678                 
 679                         //wait for the consumer worker thread to end
 680                         try
 681                         {
 682                             Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
 683                             if (_shutdownSemaphore)
 684                             {
 685                                 _shutdownSemaphore->time_wait(10000); 
 686                             }
 687                 
 688                         } catch (TimeOut &)
 689                         {
 690 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
 691 h.sterling 1.1          }
 692                 
 693 marek      1.18         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
 694 h.sterling 1.1  
 695                         try
 696                         {
 697                             //terminate consumer provider interface
 698                             consumersToUnload[i]->terminate();
 699                 
 700                             //unload consumer provider module
 701                             PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 702                             consumersToUnload[i]->_module->unloadModule();
 703                 
 704                             //serialize outstanding indications
 705                             _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
 706                 
 707                             //reset the consumer
 708                             consumersToUnload[i]->reset();
 709                 
 710 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
 711 h.sterling 1.1  
 712                         } catch (Exception& e)
 713                         {
 714                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); 
 715                             //ATTN: throw exception? log warning?
 716                         }
 717                     }
 718                 
 719                     PEG_METHOD_EXIT();
 720                 }
 721                 
 722                 /** Serializes oustanding indications to a <MyConsumer>.dat file
 723                  */
 724 h.sterling 1.11 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
 725 h.sterling 1.1  {
 726                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
 727                 
 728                     if (!indications.size())
 729                     {
 730                         PEG_METHOD_EXIT();
 731                         return;
 732                     }
 733                 
 734                     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 735                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 736                 
 737 mike       1.9      Buffer buffer;
 738 h.sterling 1.1  
 739                     // Open the log file and serialize remaining 
 740                     FILE* fileHandle = 0;
 741                     fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 742                 
 743                     if (!fileHandle)
 744                     {
 745                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
 746                 
 747                     } else
 748                     {
 749 marek      1.18         PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 750 h.sterling 1.1                        "Serializing %d outstanding requests for %s",
 751                                       indications.size(),
 752 marek      1.18                       (const char*)consumerName.getCString()));
 753 h.sterling 1.1  
 754                         //we have to put the array of instances under a valid root element or the parser complains 
 755                         XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 756                 
 757 h.sterling 1.11 		CIMInstance cimInstance;
 758 h.sterling 1.1          for (Uint32 i = 0; i < indications.size(); i++)
 759                         {
 760 h.sterling 1.11 			//set the URL string property on the serializable instance
 761                 			CIMValue cimValue(CIMTYPE_STRING, false);
 762                 			cimValue.set(indications[i].getURL());
 763                 			cimInstance = indications[i].getIndicationInstance();
 764                 			CIMProperty cimProperty(URL_PROPERTY, cimValue);
 765                 			cimInstance.addProperty(cimProperty);
 766                 
 767                             XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 768                 		}
 769 h.sterling 1.1  
 770 kumpf      1.19         XmlWriter::append(buffer, "</IRETURNVALUE>");
 771 h.sterling 1.1  
 772                         fputs((const char*)buffer.getData(), fileHandle);
 773                 
 774                         fclose(fileHandle);
 775                     }
 776                 
 777                     PEG_METHOD_EXIT();
 778                 }
 779                 
 780                 /** Reads outstanding indications from a <MyConsumer>.dat file
 781                  */ 
 782 h.sterling 1.11 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
 783 h.sterling 1.1  {
 784                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
 785                 
 786                     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 787                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 788                 
 789                     Array<CIMInstance> cimInstances;
 790 h.sterling 1.11 	Array<String>      urlStrings;
 791                 	Array<IndicationDispatchEvent> indications;
 792 h.sterling 1.1  
 793                     // Open the log file and serialize remaining indications
 794                     if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 795                     {
 796 mike       1.9          Buffer text;
 797 h.sterling 1.1          CIMInstance cimInstance;
 798 h.sterling 1.11 		CIMProperty cimProperty;
 799                 		CIMValue cimValue;
 800                 		String urlString;
 801 h.sterling 1.1          XmlEntry entry;
 802                 
 803                         try
 804                         {
 805                             FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
 806                 
 807                             //parse the file
 808                             XmlParser parser((char*)text.getData());
 809                             XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 810                 
 811                             while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 812                             {
 813 h.sterling 1.11 				Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 814                 				if (index != PEG_NOT_FOUND)
 815                 				{
 816                 					//get the URL string property from the serialized instance and remove the property
 817                 					cimProperty = cimInstance.getProperty(index);
 818                 					cimValue = cimProperty.getValue();
 819                 					cimValue.get(urlString);
 820                 					cimInstance.removeProperty(index);
 821                 				}
 822                 				IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
 823                                 indications.append(*indicationEvent);
 824 h.sterling 1.1              }
 825                 
 826                             XmlReader::expectEndTag(parser, "IRETURNVALUE");
 827                 
 828 marek      1.18             PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 829 h.sterling 1.1                            "Consumer %s has %d outstanding indications",
 830                                           (const char*)consumerName.getCString(),
 831 marek      1.18                           indications.size()));
 832 h.sterling 1.1  
 833                             //delete the file 
 834                             FileSystem::removeFile(fileName);
 835                 
 836                         } catch (Exception& ex)
 837                         {
 838                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
 839                 
 840                         } catch (...)
 841                         {
 842                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
 843                         }
 844                     }
 845                 
 846                     PEG_METHOD_EXIT();
 847 h.sterling 1.11     return indications;
 848 h.sterling 1.1  }
 849                 
 850                 
 851                 
 852                 /** 
 853                  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 854                  * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 855                  * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 856                  * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 857                  * 
 858                  * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
 859                  * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 860                  * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 861                  * can attain extremely high performance. 
 862                  * 
 863                  * There are three different events that can signal us:
 864                  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 865                  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 866                  * 3) A retry signal (signalled from this routine itself)
 867                  * 
 868                  * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 869 h.sterling 1.1   * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 870                  * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 871                  * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
 872                  * 
 873                  * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 874                  * 
 875 h.sterling 1.5   * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 876                  * 20 new indications come in, 10 of them are successful, 10 are not.
 877 h.sterling 1.1   * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 878                  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 879 h.sterling 1.5   * we would never hit the wait for the retry timeout.  
 880 h.sterling 1.1   * 
 881                  */ 
 882 mike       1.15 ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 883 h.sterling 1.1  {
 884                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 885                 
 886                     DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 887                     String name = myself->getName();
 888 mike       1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
 889 h.sterling 1.1  
 890                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 891                 
 892 h.sterling 1.8      myself->_listeningSemaphore->signal();
 893                 
 894 h.sterling 1.1      while (true)
 895                     {
 896                         try
 897                         {
 898                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 899                 
 900                             //wait to be signalled
 901                             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 902                 
 903                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 904                 
 905                             //check whether we received the shutdown signal
 906                             if (myself->_dieNow)
 907                             {
 908                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 909                                 break;
 910                             }
 911                 
 912                             //create a temporary queue to store failed indications
 913 mike       1.14             tmpEventQueue.clear();
 914 h.sterling 1.1  
 915                             //continue processing events until the queue is empty
 916                             //make sure to check for the shutdown signal before every iteration
 917 h.sterling 1.10             // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
 918                             // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
 919                             // But are succeeding by the end of the processing, events may be sent out of chronological order.
 920                             // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
 921                             // new events added afterwards.
 922 h.sterling 1.1              while (myself->_eventqueue.size())
 923                             {
 924                                 //check for shutdown signal
 925                                 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 926                                 //the shutdown signal itself, at which time we break out of the main loop
 927                                 if (myself->_dieNow)
 928                                 {
 929                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 930                                     break;
 931                                 }
 932                 
 933                                 //pop next indication off the queue
 934                                 IndicationDispatchEvent* event = 0;
 935 mike       1.14                 event = myself->_eventqueue.remove_front();  //what exceptions/errors can this throw?
 936 h.sterling 1.1  
 937                                 if (!event)
 938                                 {
 939                                     //this should never happen
 940                                     continue;
 941                                 }
 942                 
 943                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 944                 
 945                                 try
 946                                 {
 947                                     myself->consumeIndication(event->getContext(),
 948                                                               event->getURL(),
 949                                                               event->getIndicationInstance());
 950                 
 951                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 952                 
 953                                     delete event;
 954                                     continue;
 955                 
 956                                 } catch (CIMException & ce)
 957 h.sterling 1.1                  {
 958                                     //check for failure
 959                                     if (ce.getCode() == CIM_ERR_FAILED)
 960                                     {
 961                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 962 h.sterling 1.10                         
 963                                         // Here we simply determine if we should increment the retry count or not.
 964                                         // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
 965                                         // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
 966                                         if (event->getRetries() > 0) {
 967                                             Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
 968                                             if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
 969                                                 event->increaseRetries();
 970                                         } else {
 971                                             event->increaseRetries();
 972                                         }
 973 h.sterling 1.1  
 974                                         //determine if we have hit the max retry count
 975                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 976                                         {
 977 marek      1.18                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 978 h.sterling 1.1                                               "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 979                 
 980                                             Logger::put(
 981                                                        Logger::ERROR_LOG,
 982 yi.zhou    1.17                                        System::CIMLISTENER,
 983 h.sterling 1.1                                         Logger::SEVERE,
 984                                                        "The following indication did not get processed successfully: $0", 
 985                                                        event->getIndicationInstance().getPath().toString());
 986                 
 987                                             delete event;
 988                                             continue;
 989                 
 990                                         } else
 991                                         {
 992 marek      1.18                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
 993 mike       1.14                             tmpEventQueue.insert_back(event);
 994 h.sterling 1.1                          }
 995                 
 996                                     } else
 997                                     {
 998                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
 999                                         delete event;
1000                                         continue;
1001                                     }
1002                 
1003                                 } catch (Exception & ex)
1004                                 {
1005                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1006                                     delete event;
1007                                     continue;
1008                 
1009                                 } catch (...)
1010                                 {
1011 marek      1.18                     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1012 h.sterling 1.1                      delete event;
1013                                     continue;
1014                                 } //end try
1015                 
1016                             } //while eventqueue
1017                 
1018 h.sterling 1.10             // Copy the failed indications back to the main queue
1019                             // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1020                             // Of those events we are retrying. Retried events happened before any new events coming in.
1021 h.sterling 1.1              IndicationDispatchEvent* tmpEvent = 0;
1022 h.sterling 1.10             myself->_eventqueue.try_lock();
1023 h.sterling 1.1              while (tmpEventQueue.size())
1024                             {
1025 mike       1.14                 tmpEvent = tmpEventQueue.remove_front();
1026                                 myself->_eventqueue.insert_back(tmpEvent);
1027 h.sterling 1.10                 
1028 h.sterling 1.1              }
1029 h.sterling 1.10             myself->_eventqueue.unlock();
1030 h.sterling 1.1  
1031 kumpf      1.16         } catch (TimeOut&)
1032 h.sterling 1.1          {
1033 marek      1.18             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
1034 h.sterling 1.1  
1035                             //signal the queue in the same way we would if we received a new indication
1036                             //this allows the thread to fall into the queue processing code
1037                             myself->_check_queue->signal();
1038                 
1039                         } //time_wait
1040                 
1041                 
1042                     } //shutdown
1043                 
1044                     PEG_METHOD_EXIT();
1045                     return 0;
1046                 }
1047                 
1048                 
1049                 PEGASUS_NAMESPACE_END
1050                 
1051                 
1052                 

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2