(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 // Author: Heather Sterling (hsterl@us.ibm.com)
  33                 //
  34 h.sterling 1.1  // Modified By: 
  35                 //
  36                 //%/////////////////////////////////////////////////////////////////////////////
  37                 
  38                 #include <Pegasus/Common/Config.h>
  39                 //#include <cstdlib>
  40                 //#include <dlfcn.h>
  41                 #include <Pegasus/Common/System.h>
  42                 #include <Pegasus/Common/FileSystem.h>
  43                 #include <Pegasus/Common/Tracer.h>
  44                 #include <Pegasus/Common/Logger.h>
  45                 #include <Pegasus/Common/XmlReader.h>
  46                 #include <Pegasus/Common/XmlParser.h>
  47                 #include <Pegasus/Common/XmlWriter.h>
  48                 #include <Pegasus/Common/IPC.h>
  49                 
  50                 #include "ConsumerManager.h"
  51                 
  52                 PEGASUS_NAMESPACE_BEGIN
  53                 PEGASUS_USING_STD;
  54                 
//ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.
// We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimeout, retryCount, etc
//
// Options recognized in an optional per-consumer configuration file.  The table is
// registered with an OptionManager in _getConsumerLibraryName().
static struct OptionRow optionsTable[] =
//optionname defaultvalue rqd  type domain domainsize clname hlpmsg
{
{"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
};

// Number of rows in optionsTable, derived from the array size so it tracks additions.
const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);

//retry settings
// NOTE(review): neither retry constant is referenced in the portion of the file visible
// here; presumably they are used by the worker routine further down -- confirm.
static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes

//constant for fake property that is added to instances when serializing to track the full URL
static const String URL_PROPERTY = "URLString";
  71 h.sterling 1.1  
  72                 
  73                 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  74                 _consumerDir(consumerDir),
  75                 _consumerConfigDir(consumerConfigDir),
  76                 _enableConsumerUnload(enableConsumerUnload),
  77                 _idleTimeout(idleTimeout),
  78                 _forceShutdown(true)
  79                 {
  80                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  81                 
  82                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  83                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  84                 
  85                     Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  86                                   "Consumer unload enabled %d: idle timeout %d",
  87                                   enableConsumerUnload,
  88                                   idleTimeout);
  89                 
  90                 
  91 h.sterling 1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  92 h.sterling 1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  93 h.sterling 1.1  
  94 kumpf      1.3      struct timeval deallocateWait = {15, 0};
  95                     _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  96 h.sterling 1.1  
  97                     _init();
  98                 
  99                     PEG_METHOD_EXIT();
 100                 }
 101                 
 102                 ConsumerManager::~ConsumerManager()
 103                 {
 104                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 105                 
 106                     unloadAllConsumers();
 107                 
 108 h.sterling 1.5      if (_thread_pool != NULL)
 109                     {
 110 kumpf      1.3      delete _thread_pool;
 111 h.sterling 1.5      }
 112 h.sterling 1.1  
 113                     ConsumerTable::Iterator i = _consumers.start();
 114                     for (; i!=0; i++)
 115                     {
 116                         DynamicConsumer* consumer = i.value();
 117                         delete consumer;
 118                     }
 119                 
 120                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 121                 
 122                     ModuleTable::Iterator j = _modules.start();
 123                     for (;j!=0;j++)
 124                     {
 125                         ConsumerModule* module = j.value();
 126                         delete module;
 127                     }
 128                 
 129                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 130                 
 131                     PEG_METHOD_EXIT();
 132                 }
 133 h.sterling 1.1  
 134                 void ConsumerManager::_init()
 135                 {
 136                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 137                 
 138                     //check if there are any outstanding indications
 139                     Array<String> files;
 140                     Uint32 pos;
 141                     String consumerName;
 142                 
 143                     if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 144                     {
 145                         for (Uint32 i = 0; i < files.size(); i++)
 146                         {
 147                             pos = files[i].find(".dat");
 148                             if (pos != PEG_NOT_FOUND)
 149                             {
 150                                 consumerName = files[i].subString(0, pos);
 151                 
 152                                 try
 153                                 {
 154 h.sterling 1.1                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 155                                     getConsumer(consumerName);
 156                 
 157                                 } catch (...)
 158                                 {
 159                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 160                                 }
 161                             }
 162                         }
 163                     }
 164                 
 165                     PEG_METHOD_EXIT();
 166                 }
 167                 
 168                 String ConsumerManager::getConsumerDir()
 169                 {
 170                     return _consumerDir;
 171                 }
 172                 
 173                 String ConsumerManager::getConsumerConfigDir()
 174                 {
 175 h.sterling 1.1      return _consumerConfigDir;
 176                 }
 177                 
 178                 Boolean ConsumerManager::getEnableConsumerUnload()
 179                 {
 180                     return _enableConsumerUnload;
 181                 }
 182                 
 183                 Uint32 ConsumerManager::getIdleTimeout()
 184                 {
 185                     return _idleTimeout;
 186                 }
 187                 
 188                 /** Retrieves the library name associated with the consumer name.  By default, the library name
 189                   * is the same as the consumer name.  However, you may specify a different library name in a consumer
 190                   * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 191                   *     location="libraryName"
 192                   *
 193                   * The config file is optional and is generally only needed in cases where there are strict requirements
 194                   * on library naming.
 195                   *
 196 h.sterling 1.1    * It is the responsibility of the caller to catch any exceptions thrown by this method.
 197                   */
 198                 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 199                 {
 200                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 201                 
 202                     //default library name is consumer name
 203                     String libraryName = consumerName;
 204                 
 205                     //check whether an alternative library name was specified in an optional consumer config file
 206                     String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 207                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 208                 
 209                     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 210                     {
 211                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 212                 
 213                         try
 214                         {
 215 h.sterling 1.6              //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
 216 h.sterling 1.8              OptionManager _optionMgr;
 217                             _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
 218 h.sterling 1.1              _optionMgr.mergeFile(configFile);
 219                             _optionMgr.checkRequiredOptions();
 220                 
 221                             if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 222                             {
 223                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 224                                 libraryName = consumerName;
 225                             }
 226                 
 227                         } catch (Exception & ex)
 228                         {
 229                             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 230                                                                "Error reading $0: $1.",
 231                                                                configFile,
 232                                                                ex.getMessage()));
 233                         }
 234                     } else
 235                     {
 236                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 237                     }
 238                 
 239 h.sterling 1.1      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 240                 
 241                     PEG_METHOD_EXIT();
 242                     return libraryName;
 243                 }
 244                 
 245                 /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 246                  *  DNE, we create it and initialize it, and add it to the table.
 247                  * @throws Exception if we cannot successfully create and initialize the consumer
 248                  */ 
 249                 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 250                 {
 251                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 252                 
 253                     DynamicConsumer* consumer = 0;
 254                     CIMIndicationConsumerProvider* consumerRef = 0;
 255                     Boolean cached = false;
 256                     Boolean entryExists = false;
 257                 
 258                     AutoMutex lock(_consumerTableMutex);
 259                 
 260 h.sterling 1.1      if (_consumers.lookup(consumerName, consumer))
 261                     {
 262                         //why isn't this working??
 263                         entryExists = true;
 264                 
 265                         if (consumer && consumer->isLoaded())
 266                         {
 267                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 268                             cached = true;
 269                         }
 270                     } else
 271                     {
 272                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 273                         consumer = new DynamicConsumer(consumerName);
 274                         //ATTN: The above is a memory leak if _initConsumer throws an exception
 275                         //need to delete it in that case
 276                     }
 277                 
 278                     if (!cached)
 279                     {
 280                         _initConsumer(consumerName, consumer);
 281 h.sterling 1.1  
 282                         if (!entryExists)
 283                         {
 284                             _consumers.insert(consumerName, consumer);
 285                         }
 286                     }
 287                 
 288                     consumer->updateIdleTimer();
 289                 
 290                     PEG_METHOD_EXIT();
 291                     return consumer;
 292                 }
 293                 
/** Initializes a DynamicConsumer.
 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 *
 * Steps, in order:
 *   1. resolve the library name and obtain (or create) its ConsumerModule;
 *   2. load the module and attach it to the consumer;
 *   3. call the consumer's initialize();
 *   4. give the consumer a shutdown semaphore and start its worker routine on the thread pool;
 *   5. wait for the consumer's event thread, then hand it any deserialized outstanding indications.
 *
 * @param consumerName name of the consumer being initialized
 * @param consumer     consumer object to initialize; dereferenced without a null check
 * @throws Exception if the consumer cannot be initialized
 */
void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
{
    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");

    CIMIndicationConsumerProvider* base = 0;
    ConsumerModule* module = 0;

    //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
    // Note: _getConsumerLibraryName may throw; that exception propagates unchanged
    // because it happens outside the try blocks below.
    String libraryName = _getConsumerLibraryName(consumerName);
    module = _lookupModule(libraryName);

    //build library path
    String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);

    //load module
    try
    {
        base = module->load(consumerName, libraryPath);
        consumer->set(module, base);

    } catch (Exception& ex)
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());

        // NOTE(review): the rethrown message says "Unknown exception" even though
        // ex.getMessage() is available here; the detail only reaches the trace.
        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
                                           "Cannot load module ($0:$1): Unknown exception.",
                                           consumerName,
                                           libraryName));
    } catch (...)
    {
        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
                                           "Cannot load module ($0:$1): Unknown exception.",
                                           consumerName,
                                           libraryName));
    }

    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);

    //initialize consumer
    try
    {
        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);

        consumer->initialize();

        //ATTN: need to change this
        // The semaphore is handed to the consumer and to the worker routine; this
        // function only deletes it itself when the thread cannot be allocated.
        Semaphore* semaphore = new Semaphore(0);  //blocking

        consumer->setShutdownSemaphore(semaphore);

        //start the worker thread
        if (_thread_pool->allocate_and_awaken(consumer,
                                          _worker_routine,
                                          semaphore) != PEGASUS_THREAD_OK)
    {
        Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
        "Not enough threads for consumer.");
 
        Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
        "Could not allocate thread for consumer.");

       // Detach the semaphore from the consumer before freeing it.
       consumer->setShutdownSemaphore(0);
       delete semaphore;
           // NOTE(review): this exception is caught by the catch (...) below and
           // replaced by CANNOT_INITIALIZE_CONSUMER, so callers never see this message.
           throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
                        "Not enough threads for consumer worker routine."));
        }

        //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued
        //before the consumer is waiting for it and the first indication after loading the consumer will be lost
        consumer->waitForEventThread();

        //load any outstanding requests
        Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
        if (outstandingIndications.size())
        {
            //the consumer will signal itself in _loadOustandingIndications
            consumer->_loadOutstandingIndications(outstandingIndications);
        }

        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);

    } catch (...)
    {
        // Any failure after the module was loaded: undo the load and reset the consumer.
        // NOTE(review): if the worker thread was already started when the failure
        // occurred, it is not signalled here -- confirm reset() copes with that.
        // NOTE(review): the throw paths in this function do not call PEG_METHOD_EXIT().
        module->unloadModule();
        consumer->reset();
        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
                                           "Cannot initialize consumer ($0).",
                                           consumerName));        
    }

    PEG_METHOD_EXIT();    
}
 391                 
 392                 
 393 h.sterling 1.1  /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 394                  *  DNE, we create it and add it to the table.
 395                  * @throws Exception if we cannot successfully create and initialize the consumer
 396                  */ 
 397                 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 398                 {
 399                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 400                 
 401                     AutoMutex lock(_moduleTableMutex);
 402                 
 403                     ConsumerModule* module = 0;
 404                 
 405                     //see if consumer module is cached
 406                     if (_modules.lookup(libraryName, module))
 407                     {
 408                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 409                                          "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 410                 
 411                     } else
 412                     {
 413                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 414 h.sterling 1.1                           "Creating Consumer Provider Module " + libraryName);
 415                 
 416                         module = new ConsumerModule(); 
 417                         _modules.insert(libraryName, module);
 418                     }
 419                 
 420                     PEG_METHOD_EXIT();
 421                     return(module);
 422                 }
 423                 
 424                 /** Returns true if there are active consumers
 425                  */ 
 426                 Boolean ConsumerManager::hasActiveConsumers()
 427                 {
 428                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 429                 
 430                     AutoMutex lock(_consumerTableMutex);
 431                     DynamicConsumer* consumer = 0;
 432                 
 433                     try
 434                     {
 435 h.sterling 1.1          for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 436                         {
 437                             consumer = i.value();
 438                 
 439                             if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 440                             {
 441                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 442                                 PEG_METHOD_EXIT();
 443                                 return true;
 444                             }
 445                         }
 446                     } catch (...)
 447                     {
 448                         // Unexpected exception; do not assume that no providers are loaded
 449                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 450                         PEG_METHOD_EXIT();
 451                         return true;
 452                     }
 453                 
 454                     PEG_METHOD_EXIT();
 455                     return false;
 456 h.sterling 1.1  }
 457                 
 458                 /** Returns true if there are loaded consumers
 459                  */ 
 460                 Boolean ConsumerManager::hasLoadedConsumers()
 461                 {
 462                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 463                 
 464                     AutoMutex lock(_consumerTableMutex);
 465                     DynamicConsumer* consumer = 0;
 466                 
 467                     try
 468                     {
 469                         for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 470                         {
 471                             consumer = i.value();
 472                 
 473                             if (consumer && consumer->isLoaded())
 474                             {
 475                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 476                                 PEG_METHOD_EXIT();
 477 h.sterling 1.1                  return true;
 478                             }
 479                         }
 480                     } catch (...)
 481                     {
 482                         // Unexpected exception; do not assume that no providers are loaded
 483                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 484                         PEG_METHOD_EXIT();
 485                         return true;
 486                     }
 487                 
 488                     PEG_METHOD_EXIT();
 489                     return false;
 490                 }
 491                 
 492                 
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 *    is optional in a shutdown scenario.  If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 *    the current consumer indication to finish.  All other queued indications are serialized to a log and 
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0)
 * 
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 * of the consumers so the threads can finish simultaneously.
 * 
 * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing 
 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 * queued indications on shutdown.  
 */ 
 511                 
 512                 /** Unloads all consumers.
 513                  */ 
 514                 void ConsumerManager::unloadAllConsumers()
 515                 {
 516                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 517                 
 518                     AutoMutex lock(_consumerTableMutex);
 519 h.sterling 1.1  
 520                     if (!_consumers.size())
 521                     {
 522                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 523                         PEG_METHOD_EXIT();
 524                         return;
 525                     }
 526                 
 527                     if (!_forceShutdown)
 528                     {
 529                         //wait until all the consumers have finished processing the events in their queue
 530                         //ATTN: Should this have a timeout even though it's a force??
 531                         while (hasActiveConsumers())
 532                         {
 533                             pegasus_sleep(500);
 534                         }
 535                     }
 536                 
 537                     Array<DynamicConsumer*> loadedConsumers;
 538                 
 539                     ConsumerTable::Iterator i = _consumers.start();
 540 h.sterling 1.1      DynamicConsumer* consumer = 0;
 541                 
 542                     for (; i!=0; i++)
 543                     {
 544                         consumer = i.value();
 545                         if (consumer && consumer->isLoaded())
 546                         {
 547                             loadedConsumers.append(consumer);
 548                         }
 549                     }
 550                 
 551                     if (loadedConsumers.size())
 552                     {
 553                         try
 554                         {
 555                             _unloadConsumers(loadedConsumers);
 556                 
 557                         } catch (Exception& ex)
 558                         {
 559                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 560                         }
 561 h.sterling 1.1      } else
 562                     {
 563                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 564                     }
 565                 
 566                     PEG_METHOD_EXIT();
 567                 }
 568                 
 569                 /** Unloads every consumer that is currently both loaded and idle.
                      *
                      *  The consumer table mutex is held for the whole call.  Consumers for which
                      *  isLoaded() and isIdle() both return true are collected and handed to
                      *  _unloadConsumers().  An Exception escaping _unloadConsumers() is traced
                      *  and swallowed.
                      */
                     void ConsumerManager::unloadIdleConsumers()
                     {
                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");

                         AutoMutex lock(_consumerTableMutex);

                         //nothing registered at all: nothing to do
                         if (_consumers.size() == 0)
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
                             PEG_METHOD_EXIT();
                             return;
                         }

                         //gather the candidates: non-null, loaded and idle (checked in that order)
                         Array<DynamicConsumer*> idleConsumers;

                         for (ConsumerTable::Iterator entry = _consumers.start(); entry != 0; entry++)
                         {
                             DynamicConsumer* candidate = entry.value();

                             if (!candidate || !candidate->isLoaded() || !candidate->isIdle())
                             {
                                 continue;
                             }

                             idleConsumers.append(candidate);
                         }

                         //registered consumers exist, but none of them qualifies
                         if (idleConsumers.size() == 0)
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
                             PEG_METHOD_EXIT();
                             return;
                         }

                         try
                         {
                             _unloadConsumers(idleConsumers);
                         }
                         catch (Exception&)
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
                         }

                         PEG_METHOD_EXIT();
                     }
 615                 
 616                 /** Unloads a single consumer, identified by name.
                      *
                      *  The consumer table mutex is held for the whole call.  If the name is not
                      *  present in the consumer table, or the consumer is not currently loaded,
                      *  the problem is traced and nothing is unloaded.  An Exception escaping
                      *  _unloadConsumers() is traced and swallowed.
                      *
                      *  @param consumerName  Key used to look the consumer up in the consumer table.
                      */
                     void ConsumerManager::unloadConsumer(const String& consumerName)
                     {
                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");

                         AutoMutex lock(_consumerTableMutex);

                         DynamicConsumer* consumer = 0;

                         //check whether the consumer exists
                         if (!_consumers.lookup(consumerName, consumer))
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
                             //every exit path must balance PEG_METHOD_ENTER, including this early one
                             PEG_METHOD_EXIT();
                             return;
                         }

                         //check whether the consumer is loaded
                         if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
                         {
                             //_unloadConsumers() works on an array, so wrap the single consumer
                             Array<DynamicConsumer*> loadedConsumers;
                             loadedConsumers.append(consumer);

                             try
                             {
                                 _unloadConsumers(loadedConsumers);
                             }
                             catch (Exception&)
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
                             }
                         }
                         else
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
                         }

                         PEG_METHOD_EXIT();
                     }
 656                 
 657                 /** Shuts down and unloads each consumer in the given array.
                      *  The consumerTable mutex MUST be locked prior to entering this method.
                      *
                      *  Works in two passes: first every consumer is sent the shutdown signal,
                      *  then each one in turn is waited on (up to 10000 per consumer, in the units
                      *  of Semaphore::time_wait), terminated, has its module unloaded, has its
                      *  outstanding indications serialized, and is reset.  A TimeOut while waiting
                      *  and an Exception while unloading are both traced and do not stop the
                      *  remaining consumers from being processed.
                      *
                      *  @param consumersToUnload  The consumers to unload.
                      */
                     void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
                     {
                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");

                         //pass 1: signal all consumers up front so they can wind down concurrently
                         for (Uint32 n = 0; n < consumersToUnload.size(); n++)
                         {
                             DynamicConsumer* signalled = consumersToUnload[n];
                             signalled->sendShutdownSignal();
                         }

                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");

                         //pass 2: wait for each worker thread, then tear the consumer down.
                         //Because everyone was signalled above, the order in which we wait does not matter;
                         //the total wait is governed by whichever consumer takes longest to stop.
                         for (Uint32 i = 0; i < consumersToUnload.size(); i++)
                         {
                             DynamicConsumer* target = consumersToUnload[i];

                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + target->getName());

                             //give the worker thread a bounded amount of time to finish
                             try
                             {
                                 Semaphore* workerFinished = target->getShutdownSemaphore();
                                 if (workerFinished != 0)
                                 {
                                     workerFinished->time_wait(10000);
                                 }
                             }
                             catch (TimeOut &)
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
                             }

                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");

                             try
                             {
                                 //terminate consumer provider interface
                                 target->terminate();

                                 //unload consumer provider module
                                 //(assert expression kept verbatim so any failure message is unchanged)
                                 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
                                 target->_module->unloadModule();

                                 //persist whatever is still queued for this consumer
                                 _serializeOutstandingIndications(target->getName(), target->_retrieveOutstandingIndications());

                                 //reset the consumer
                                 target->reset();

                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
                             }
                             catch (Exception& unloadError)
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + unloadError.getMessage());
                                 //ATTN: throw exception? log warning?
                             }
                         }

                         PEG_METHOD_EXIT();
                     }
 722                 
 723                 /** Serializes outstanding indications to a <consumerName>.dat file.
                      *
                      *  The file is created in _consumerConfigDir.  For every event, the event's URL
                      *  is added to its indication instance as a string property named URL_PROPERTY,
                      *  and the instance is appended with XmlWriter::appendValueNamedInstanceElement().
                      *  All instances are wrapped in a single IRETURNVALUE root element so the result
                      *  can be parsed back.  Nothing is written when the array is empty.
                      *
                      *  @param consumerName  Name of the consumer; used to build the file name.
                      *  @param indications   The events still waiting to be delivered.
                      *
                      *  Any exception raised while building the XML propagates to the caller; the
                      *  file handle is closed first.
                      */
                     void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
                     {
                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");

                         if (!indications.size())
                         {
                             PEG_METHOD_EXIT();
                             return;
                         }

                         String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);

                         // Open the dat file; "w" truncates any previous content
                         FILE* fileHandle = fopen((const char*)fileName.getCString(), "w");

                         if (!fileHandle)
                         {
                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
                             PEG_METHOD_EXIT();
                             return;
                         }

                         try
                         {
                             Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
                                           "Serializing %d outstanding requests for %s",
                                           indications.size(),
                                           (const char*)consumerName.getCString());

                             Buffer buffer;

                             //we have to put the array of instances under a valid root element or the parser complains
                             XmlWriter::append(buffer, "<IRETURNVALUE>\n");

                             for (Uint32 i = 0; i < indications.size(); i++)
                             {
                                 //set the URL string property on the serializable instance
                                 CIMValue cimValue(CIMTYPE_STRING, false);
                                 cimValue.set(indications[i].getURL());

                                 CIMInstance cimInstance = indications[i].getIndicationInstance();
                                 CIMProperty cimProperty(URL_PROPERTY, cimValue);
                                 cimInstance.addProperty(cimProperty);

                                 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
                             }

                             XmlWriter::append(buffer, "</IRETURNVALUE>");

                             //write exactly buffer.size() bytes instead of relying on the buffer being
                             //NUL-terminated (a trailing "\0" inside a C string literal is never appended)
                             if (fwrite(buffer.getData(), 1, buffer.size(), fileHandle) != buffer.size())
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to write all outstanding indications for " + consumerName);
                             }
                         }
                         catch (...)
                         {
                             //do not leak the file handle if building the XML throws
                             fclose(fileHandle);
                             PEG_METHOD_EXIT();
                             throw;
                         }

                         fclose(fileHandle);

                         PEG_METHOD_EXIT();
                     }
 780                 
 781                 /** Reads outstanding indications back from a <consumerName>.dat file.
                      *
                      *  The file is looked up in _consumerConfigDir.  If it exists and is readable it
                      *  is parsed as a sequence of named instances inside an IRETURNVALUE element.
                      *  For each instance the URL_PROPERTY string property (if present) is extracted
                      *  as the event URL and removed from the instance, and an
                      *  IndicationDispatchEvent is built with an empty OperationContext.  After a
                      *  fully successful parse the file is removed.
                      *
                      *  Parse errors are traced and swallowed; whatever events were read before the
                      *  error are still returned and the file is left in place.
                      *
                      *  @param consumerName  Name of the consumer; used to build the file name.
                      *  @return The events read from the file (empty if there is no readable file).
                      */
                     Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
                     {
                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");

                         String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);

                         Array<IndicationDispatchEvent> indications;

                         // Load the dat file and rebuild the outstanding indications from it
                         if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
                         {
                             Buffer text;
                             CIMInstance cimInstance;
                             XmlEntry entry;

                             try
                             {
                                 FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
                                 text.append('\0');

                                 //parse the file
                                 XmlParser parser((char*)text.getData());
                                 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");

                                 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
                                 {
                                     //scoped per instance so an instance without the URL property
                                     //does not inherit the URL of the previous one
                                     String urlString;

                                     Uint32 index = cimInstance.findProperty(URL_PROPERTY);
                                     if (index != PEG_NOT_FOUND)
                                     {
                                         //get the URL string property from the serialized instance and remove the property
                                         CIMProperty cimProperty = cimInstance.getProperty(index);
                                         CIMValue cimValue = cimProperty.getValue();
                                         cimValue.get(urlString);
                                         cimInstance.removeProperty(index);
                                     }

                                     //append a temporary; the array stores its own copy, so nothing is heap-allocated here
                                     indications.append(IndicationDispatchEvent(OperationContext(), urlString, cimInstance));
                                 }

                                 XmlReader::expectEndTag(parser, "IRETURNVALUE");

                                 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
                                               "Consumer %s has %d outstanding indications",
                                               (const char*)consumerName.getCString(),
                                               indications.size());

                                 //delete the file
                                 FileSystem::removeFile(fileName);

                             } catch (Exception& ex)
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);

                             } catch (...)
                             {
                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
                             }
                         }

                         PEG_METHOD_EXIT();
                         return indications;
                     }
 851                 
 852                 
 853                 
 854                 /** 
 855                  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 856                  * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 857                  * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 858                  * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 859                  * 
 860                  * If a consumer receives a lot of traffic, or its consumeIndication() method takes a considerable amount of time to
 861                  * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 862                  * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 863                  * can attain extremely high performance. 
 864                  * 
 865                  * There are three different events that can signal us:
 866                  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 867                  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 868                  * 3) A retry signal (signalled from this routine itself)
 869                  * 
 870                  * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 871 h.sterling 1.1   * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 872                  * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 873                  * we immediately break out of the loop, and another component serializes the remaining indications to a file.
 874                  * 
 875                  * An indication gets retried if the consumer throws a CIMException whose code is CIM_ERR_FAILED.
 876                  * Any other exception from consumeIndication() is treated as a permanent failure and the event is deleted.
 877 h.sterling 1.5   * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 878                  * 20 new indications come in, 10 of them are successful, 10 are not.
 879 h.sterling 1.1   * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 880                  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 881 h.sterling 1.5   * we would never hit the wait for the retry timeout.  
 882 h.sterling 1.1   * 
                      * NOTE(review): the paragraph above says new indications go on the FRONT of the queue, while the comment inside
                      * the processing loop says new events are added on the BACK.  This routine only removes from the front and
                      * re-inserts retries at the back; the enqueue side is not visible here -- confirm which description is current.
                      * 
                      * @param param  The DynamicConsumer served by this thread, passed as void* and cast back with static_cast.
                      * @return Always 0; the routine returns only after the _dieNow flag has been observed.
 883                  */ 
 884                 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 885                 {
 886                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 887                 
 888                     DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 889                     String name = myself->getName();
                         //holds events that failed temporarily during one pass over the main queue
                         //NOTE(review): the meaning of the 'true' constructor argument is not visible here -- confirm against DQueue
 890                     DQueue<IndicationDispatchEvent> tmpEventQueue(true);
 891                 
 892                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 893                 
                         //signalled once, before the loop is entered
                         //NOTE(review): presumably lets whoever started this thread know it is now running -- confirm against the waiter
 894 h.sterling 1.8      myself->_listeningSemaphore->signal();
 895                 
                         //main loop: left only via the _dieNow check directly after the wait
 896 h.sterling 1.1      while (true)
 897                     {
 898                         try
 899                         {
 900                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 901                 
 902                             //wait to be signalled
                                 //a TimeOut thrown here is handled by the catch block at the bottom of the loop
 903                             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 904                 
 905                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 906                 
 907                             //check whether we received the shutdown signal
 908                             if (myself->_dieNow)
 909                             {
 910                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 911                                 break;
 912                             }
 913                 
 914                             //create a temporary queue to store failed indications
                                 //(the queue object is reused across iterations; it is emptied here rather than re-created)
 915                             tmpEventQueue.empty_list();
 916                 
 917 h.sterling 1.1              //continue processing events until the queue is empty
 918                             //make sure to check for the shutdown signal before every iteration
 919 h.sterling 1.10             // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
 920                             // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
 921                             // But are succeeding by the end of the processing, events may be sent out of chronological order.
 922                             // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
 923                             // new events added afterwards.
 924 h.sterling 1.1              while (myself->_eventqueue.size())
 925                             {
 926                                 //check for shutdown signal
 927                                 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 928                                 //the shutdown signal itself, at which time we break out of the main loop
 929                                 if (myself->_dieNow)
 930                                 {
 931                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 932                                     break;
 933                                 }
 934                 
 935                                 //pop next indication off the queue
                                     //from here on this routine decides the event's fate: it is either deleted or moved to tmpEventQueue
 936                                 IndicationDispatchEvent* event = 0;
 937                                 event = myself->_eventqueue.remove_first();  //what exceptions/errors can this throw?
 938                 
 939                                 if (!event)
 940                                 {
 941                                     //this should never happen
 942                                     continue;
 943                                 }
 944                 
 945 h.sterling 1.1                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 946                 
 947                                 try
 948                                 {
 949                                     myself->consumeIndication(event->getContext(),
 950                                                               event->getURL(),
 951                                                               event->getIndicationInstance());
 952                 
 953                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 954                 
                                         //delivered: the event is finished with
 955                                     delete event;
 956                                     continue;
 957                 
 958                                 } catch (CIMException & ce)
 959                                 {
 960                                     //check for failure
                                         //CIM_ERR_FAILED is the only code treated as temporary (retryable)
 961                                     if (ce.getCode() == CIM_ERR_FAILED)
 962                                     {
 963                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 964 h.sterling 1.10                         
 965                                         // Here we simply determine if we should increment the retry count or not.
 966                                         // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
 967                                         // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
                                             // A first failure (retry count 0) is always counted.
                                             // NOTE(review): the comparison multiplies DEFAULT_RETRY_LAPSE by 1000 against a microsecond difference,
                                             // so DEFAULT_RETRY_LAPSE is presumably in milliseconds -- confirm against its definition.
 968                                         if (event->getRetries() > 0) {
 969                                             Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
 970                                             if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
 971                                                 event->increaseRetries();
 972                                         } else {
 973                                             event->increaseRetries();
 974                                         }
 975 h.sterling 1.1  
 976                                         //determine if we have hit the max retry count
 977                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 978                                         {
 979                                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
 980                                                              "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 981                 
 982                                             Logger::put(
 983                                                        Logger::ERROR_LOG,
 984                                                        "ConsumerManager",
 985                                                        Logger::SEVERE,
 986                                                        "The following indication did not get processed successfully: $0", 
 987                                                        event->getIndicationInstance().getPath().toString());
 988                 
                                                 //given up on: the event is dropped for good
 989                                             delete event;
 990                                             continue;
 991                 
 992                                         } else
 993                                         {
 994                                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
                                                 //parked in the temporary queue; moved back to the main queue after this pass
 995                                             tmpEventQueue.insert_last(event);
 996 h.sterling 1.1                          }
 997                 
 998                                     } else
 999                                     {
                                             //any other CIM status code is treated as permanent: no retry
1000                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
1001                                         delete event;
1002                                         continue;
1003                                     }
1004                 
1005                                 } catch (Exception & ex)
1006                                 {
                                         //non-CIM Pegasus exception: permanent, no retry
1007                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1008                                     delete event;
1009                                     continue;
1010                 
1011                                 } catch (...)
1012                                 {
                                         //anything else thrown by the consumer: the event is dropped as well
1013                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1014                                     delete event;
1015                                     continue;
1016                                 } //end try
1017 h.sterling 1.1  
1018                             } //while eventqueue
1019                 
1020 h.sterling 1.10             // Copy the failed indications back to the main queue
1021                             // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1022                             // Of those events we are retrying. Retried events happened before any new events coming in.
                                 // NOTE(review): the outcome of try_lock() is not checked here; what it does when the lock is
                                 // already held (return vs. throw) is not visible in this file -- confirm against DQueue.
1023 h.sterling 1.1              IndicationDispatchEvent* tmpEvent = 0;
1024 h.sterling 1.10             myself->_eventqueue.try_lock();
1025 h.sterling 1.1              while (tmpEventQueue.size())
1026                             {
1027 h.sterling 1.10                 //there is no = operator for DQueue so we must do it manually
1028 h.sterling 1.1                  tmpEvent = tmpEventQueue.remove_first();
1029 h.sterling 1.10                 myself->_eventqueue.insert_last_no_lock(tmpEvent);
1030                                 
1031 h.sterling 1.1              }
1032 h.sterling 1.10             myself->_eventqueue.unlock();
1033 h.sterling 1.1  
                             //the TimeOut object itself is not used in the handler
1034                         } catch (TimeOut& te)
1035                         {
1036 h.sterling 1.10             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
1037 h.sterling 1.1  
1038                             //signal the queue in the same way we would if we received a new indication
1039                             //this allows the thread to fall into the queue processing code
                                 //(the signal is picked up by the time_wait at the top of the next loop iteration)
1040                             myself->_check_queue->signal();
1041                 
1042                         } //time_wait
1043                 
1044                 
1045                     } //shutdown
1046                 
1047                     PEG_METHOD_EXIT();
1048                     return 0;
1049                 }
1050                 
1051                 
1052                 PEGASUS_NAMESPACE_END
1053                 
1054                 
1055                 

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2