(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 // Author: Heather Sterling (hsterl@us.ibm.com)
  33                 //
  34 h.sterling 1.1  // Modified By: 
  35                 //
  36                 //%/////////////////////////////////////////////////////////////////////////////
  37                 
  38                 #include <Pegasus/Common/Config.h>
  39                 //#include <cstdlib>
  40                 //#include <dlfcn.h>
  41                 #include <Pegasus/Common/System.h>
  42                 #include <Pegasus/Common/FileSystem.h>
  43                 #include <Pegasus/Common/Tracer.h>
  44                 #include <Pegasus/Common/Logger.h>
  45                 #include <Pegasus/Common/XmlReader.h>
  46 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  47 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  48                 #include <Pegasus/Common/XmlWriter.h>
  49 mike       1.14 #include <Pegasus/Common/List.h>
  50 mike       1.15 #include <Pegasus/Common/Mutex.h>
  51 h.sterling 1.1  
  52                 #include "ConsumerManager.h"
  53                 
  54                 PEGASUS_NAMESPACE_BEGIN
  55                 PEGASUS_USING_STD;
  56                 
  57                 //ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.
  58                 // We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
  59                 static struct OptionRow optionsTable[] =
  60                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  61                 {
  62                 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
  63                 };
  64                 
  65                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  66                 
  67                 //retry settings
  68                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  69 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  70 h.sterling 1.1  
  71 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
  72                 static const String URL_PROPERTY = "URLString";
  73 h.sterling 1.1  
  74                 
  75                 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  76                 _consumerDir(consumerDir),
  77                 _consumerConfigDir(consumerConfigDir),
  78                 _enableConsumerUnload(enableConsumerUnload),
  79                 _idleTimeout(idleTimeout),
  80                 _forceShutdown(true)
  81                 {
  82                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  83                 
  84                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  85                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  86                 
  87                     Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  88                                   "Consumer unload enabled %d: idle timeout %d",
  89                                   enableConsumerUnload,
  90                                   idleTimeout);
  91                 
  92                 
  93 h.sterling 1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  94 h.sterling 1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  95 h.sterling 1.1  
  96 kumpf      1.3      struct timeval deallocateWait = {15, 0};
  97                     _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  98 h.sterling 1.1  
  99                     _init();
 100                 
 101                     PEG_METHOD_EXIT();
 102                 }
 103                 
 104                 ConsumerManager::~ConsumerManager()
 105                 {
 106                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 107                 
 108                     unloadAllConsumers();
 109                 
 110 kumpf      1.3      delete _thread_pool;
 111 h.sterling 1.1  
 112                     ConsumerTable::Iterator i = _consumers.start();
 113                     for (; i!=0; i++)
 114                     {
 115                         DynamicConsumer* consumer = i.value();
 116                         delete consumer;
 117                     }
 118                 
 119                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 120                 
 121                     ModuleTable::Iterator j = _modules.start();
 122                     for (;j!=0;j++)
 123                     {
 124                         ConsumerModule* module = j.value();
 125                         delete module;
 126                     }
 127                 
 128                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 129                 
 130                     PEG_METHOD_EXIT();
 131                 }
 132 h.sterling 1.1  
 133                 void ConsumerManager::_init()
 134                 {
 135                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 136                 
 137                     //check if there are any outstanding indications
 138                     Array<String> files;
 139                     Uint32 pos;
 140                     String consumerName;
 141                 
 142                     if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 143                     {
 144                         for (Uint32 i = 0; i < files.size(); i++)
 145                         {
 146                             pos = files[i].find(".dat");
 147                             if (pos != PEG_NOT_FOUND)
 148                             {
 149                                 consumerName = files[i].subString(0, pos);
 150                 
 151                                 try
 152                                 {
 153 h.sterling 1.1                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 154                                     getConsumer(consumerName);
 155                 
 156                                 } catch (...)
 157                                 {
 158                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 159                                 }
 160                             }
 161                         }
 162                     }
 163                 
 164                     PEG_METHOD_EXIT();
 165                 }
 166                 
 167                 String ConsumerManager::getConsumerDir()
 168                 {
 169                     return _consumerDir;
 170                 }
 171                 
 172                 String ConsumerManager::getConsumerConfigDir()
 173                 {
 174 h.sterling 1.1      return _consumerConfigDir;
 175                 }
 176                 
 177                 Boolean ConsumerManager::getEnableConsumerUnload()
 178                 {
 179                     return _enableConsumerUnload;
 180                 }
 181                 
 182                 Uint32 ConsumerManager::getIdleTimeout()
 183                 {
 184                     return _idleTimeout;
 185                 }
 186                 
 187                 /** Retrieves the library name associated with the consumer name.  By default, the library name
 188                   * is the same as the consumer name.  However, you may specify a different library name in a consumer
 189                   * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 190                   *     location="libraryName"
 191                   *
 192                   * The config file is optional and is generally only needed in cases where there are strict requirements
 193                   * on library naming.
 194                   *
 195 h.sterling 1.1    * It is the responsibility of the caller to catch any exceptions thrown by this method.
 196                   */
 197                 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 198                 {
 199                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 200                 
 201                     //default library name is consumer name
 202                     String libraryName = consumerName;
 203                 
 204                     //check whether an alternative library name was specified in an optional consumer config file
 205                     String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 206                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 207                 
 208                     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 209                     {
 210                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 211                 
 212                         try
 213                         {
 214 h.sterling 1.6              //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
 215 h.sterling 1.8              OptionManager _optionMgr;
 216                             _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
 217 h.sterling 1.1              _optionMgr.mergeFile(configFile);
 218                             _optionMgr.checkRequiredOptions();
 219                 
 220                             if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 221                             {
 222                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 223                                 libraryName = consumerName;
 224                             }
 225                 
 226                         } catch (Exception & ex)
 227                         {
 228                             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 229                                                                "Error reading $0: $1.",
 230                                                                configFile,
 231                                                                ex.getMessage()));
 232                         }
 233                     } else
 234                     {
 235                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 236                     }
 237                 
 238 h.sterling 1.1      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 239                 
 240                     PEG_METHOD_EXIT();
 241                     return libraryName;
 242                 }
 243                 
 244                 /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 245                  *  DNE, we create it and initialize it, and add it to the table.
 246                  * @throws Exception if we cannot successfully create and initialize the consumer
 247                  */ 
 248                 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 249                 {
 250                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 251                 
 252                     DynamicConsumer* consumer = 0;
 253                     CIMIndicationConsumerProvider* consumerRef = 0;
 254                     Boolean cached = false;
 255                     Boolean entryExists = false;
 256                 
 257                     AutoMutex lock(_consumerTableMutex);
 258                 
 259 h.sterling 1.1      if (_consumers.lookup(consumerName, consumer))
 260                     {
 261                         //why isn't this working??
 262                         entryExists = true;
 263                 
 264                         if (consumer && consumer->isLoaded())
 265                         {
 266                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 267                             cached = true;
 268                         }
 269                     } else
 270                     {
 271                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 272                         consumer = new DynamicConsumer(consumerName);
 273                         //ATTN: The above is a memory leak if _initConsumer throws an exception
 274                         //need to delete it in that case
 275                     }
 276                 
 277                     if (!cached)
 278                     {
 279                         _initConsumer(consumerName, consumer);
 280 h.sterling 1.1  
 281                         if (!entryExists)
 282                         {
 283                             _consumers.insert(consumerName, consumer);
 284                         }
 285                     }
 286                 
 287                     consumer->updateIdleTimer();
 288                 
 289                     PEG_METHOD_EXIT();
 290                     return consumer;
 291                 }
 292                 
 293                 /** Initializes a DynamicConsumer.
 294                  * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 295                  * @throws Exception if the consumer cannot be initialized
 296                  */
 297                 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
 298                 {
 299                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 300                 
 301 h.sterling 1.1      CIMIndicationConsumerProvider* base = 0;
 302                     ConsumerModule* module = 0;
 303                 
 304                     //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
 305                     String libraryName = _getConsumerLibraryName(consumerName);
 306                     module = _lookupModule(libraryName);
 307                 
 308                     //build library path
 309                     String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
 310                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
 311                 
 312                     //load module
 313                     try
 314                     {
 315                         base = module->load(consumerName, libraryPath);
 316                         consumer->set(module, base);
 317                 
 318                     } catch (Exception& ex)
 319                     {
 320                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
 321                 
 322 h.sterling 1.1          throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 323                                                            "Cannot load module ($0:$1): Unknown exception.",
 324                                                            consumerName,
 325                                                            libraryName));
 326                     } catch (...)
 327                     {
 328                         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 329                                                            "Cannot load module ($0:$1): Unknown exception.",
 330                                                            consumerName,
 331                                                            libraryName));
 332                     }
 333                 
 334                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
 335                 
 336                     //initialize consumer
 337                     try
 338                     {
 339                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);
 340                 
 341                         consumer->initialize();
 342                 
 343 h.sterling 1.1          //ATTN: need to change this
 344                         Semaphore* semaphore = new Semaphore(0);  //blocking
 345                 
 346                         consumer->setShutdownSemaphore(semaphore);
 347                 
 348                         //start the worker thread
 349 konrad.r   1.7          if (_thread_pool->allocate_and_awaken(consumer,
 350 h.sterling 1.1                                            _worker_routine,
 351 konrad.r   1.7                                            semaphore) != PEGASUS_THREAD_OK)
 352 h.sterling 1.8      {
 353 yi.zhou    1.17         Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
 354 h.sterling 1.8          "Not enough threads for consumer.");
 355 konrad.r   1.7   
 356 h.sterling 1.8          Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
 357                         "Could not allocate thread for consumer.");
 358 konrad.r   1.7  
 359 h.sterling 1.8         consumer->setShutdownSemaphore(0);
 360                        delete semaphore;
 361 konrad.r   1.7             throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 362 h.sterling 1.8                          "Not enough threads for consumer worker routine."));
 363 konrad.r   1.7          }
 364 h.sterling 1.1  
 365 h.sterling 1.8          //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued
 366                         //before the consumer is waiting for it and the first indication after loading the consumer will be lost
 367                         consumer->waitForEventThread();
 368                 
 369 h.sterling 1.1          //load any outstanding requests
 370 h.sterling 1.11         Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
 371 h.sterling 1.1          if (outstandingIndications.size())
 372                         {
 373                             //the consumer will signal itself in _loadOustandingIndications
 374                             consumer->_loadOutstandingIndications(outstandingIndications);
 375                         }
 376                 
 377                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
 378                 
 379                     } catch (...)
 380                     {
 381                         module->unloadModule();
 382                         consumer->reset();
 383                         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 384                                                            "Cannot initialize consumer ($0).",
 385                                                            consumerName));        
 386                     }
 387                 
 388                     PEG_METHOD_EXIT();    
 389                 }
 390                 
 391                 
 392 h.sterling 1.1  /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 393                  *  DNE, we create it and add it to the table.
 394                  * @throws Exception if we cannot successfully create and initialize the consumer
 395                  */ 
 396                 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 397                 {
 398                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 399                 
 400                     AutoMutex lock(_moduleTableMutex);
 401                 
 402                     ConsumerModule* module = 0;
 403                 
 404                     //see if consumer module is cached
 405                     if (_modules.lookup(libraryName, module))
 406                     {
 407                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 408                                          "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 409                 
 410                     } else
 411                     {
 412                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 413 h.sterling 1.1                           "Creating Consumer Provider Module " + libraryName);
 414                 
 415                         module = new ConsumerModule(); 
 416                         _modules.insert(libraryName, module);
 417                     }
 418                 
 419                     PEG_METHOD_EXIT();
 420                     return(module);
 421                 }
 422                 
 423                 /** Returns true if there are active consumers
 424                  */ 
 425                 Boolean ConsumerManager::hasActiveConsumers()
 426                 {
 427                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 428                 
 429                     AutoMutex lock(_consumerTableMutex);
 430                     DynamicConsumer* consumer = 0;
 431                 
 432                     try
 433                     {
 434 h.sterling 1.1          for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 435                         {
 436                             consumer = i.value();
 437                 
 438                             if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 439                             {
 440                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 441                                 PEG_METHOD_EXIT();
 442                                 return true;
 443                             }
 444                         }
 445                     } catch (...)
 446                     {
 447                         // Unexpected exception; do not assume that no providers are loaded
 448                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 449                         PEG_METHOD_EXIT();
 450                         return true;
 451                     }
 452                 
 453                     PEG_METHOD_EXIT();
 454                     return false;
 455 h.sterling 1.1  }
 456                 
 457                 /** Returns true if there are loaded consumers
 458                  */ 
 459                 Boolean ConsumerManager::hasLoadedConsumers()
 460                 {
 461                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 462                 
 463                     AutoMutex lock(_consumerTableMutex);
 464                     DynamicConsumer* consumer = 0;
 465                 
 466                     try
 467                     {
 468                         for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 469                         {
 470                             consumer = i.value();
 471                 
 472                             if (consumer && consumer->isLoaded())
 473                             {
 474                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 475                                 PEG_METHOD_EXIT();
 476 h.sterling 1.1                  return true;
 477                             }
 478                         }
 479                     } catch (...)
 480                     {
 481                         // Unexpected exception; do not assume that no providers are loaded
 482                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 483                         PEG_METHOD_EXIT();
 484                         return true;
 485                     }
 486                 
 487                     PEG_METHOD_EXIT();
 488                     return false;
 489                 }
 490                 
 491                 
 492                 /** Shutting down a consumer consists of four major steps:
 493                  * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 494                  * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 495                  *    is optional in a shutdown scenario.  If the listener is shutdown with a -f force, the listener
 496                  *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 497 h.sterling 1.1   *    the current consumer indication to finish.  All other queued indications are serialized to a log and 
 498                  *    are sent when the consumer is reoaded.
 499                  * 3) Terminate the consumer provider interface.
 500                  * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)
 501                  * 
 502                  * In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all
 503                  * of the consumers so the threads can finish simultaneously.
 504                  * 
 505                  * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 506                  * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing 
 507                  * and deserialing indications between shutdown and startup, I feel like we do not need to process ALL
 508                  * queued indications on shutdown.  
 509                  */ 
 510                 
 511                 /** Unloads all consumers.
 512                  */ 
 513                 void ConsumerManager::unloadAllConsumers()
 514                 {
 515                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 516                 
 517                     AutoMutex lock(_consumerTableMutex);
 518 h.sterling 1.1  
 519                     if (!_consumers.size())
 520                     {
 521                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 522                         PEG_METHOD_EXIT();
 523                         return;
 524                     }
 525                 
 526                     if (!_forceShutdown)
 527                     {
 528                         //wait until all the consumers have finished processing the events in their queue
 529                         //ATTN: Should this have a timeout even though it's a force??
 530                         while (hasActiveConsumers())
 531                         {
 532 mike       1.15             Threads::sleep(500);
 533 h.sterling 1.1          }
 534                     }
 535                 
 536                     Array<DynamicConsumer*> loadedConsumers;
 537                 
 538                     ConsumerTable::Iterator i = _consumers.start();
 539                     DynamicConsumer* consumer = 0;
 540                 
 541                     for (; i!=0; i++)
 542                     {
 543                         consumer = i.value();
 544                         if (consumer && consumer->isLoaded())
 545                         {
 546                             loadedConsumers.append(consumer);
 547                         }
 548                     }
 549                 
 550                     if (loadedConsumers.size())
 551                     {
 552                         try
 553                         {
 554 h.sterling 1.1              _unloadConsumers(loadedConsumers);
 555                 
 556 kumpf      1.16         } catch (Exception&)
 557 h.sterling 1.1          {
 558                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 559                         }
 560                     } else
 561                     {
 562                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 563                     }
 564                 
 565                     PEG_METHOD_EXIT();
 566                 }
 567                 
 568                 /** Unloads idle consumers.
 569                  */ 
 570                 void ConsumerManager::unloadIdleConsumers()
 571                 {
 572                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 573                 
 574                     AutoMutex lock(_consumerTableMutex);
 575                 
 576                     if (!_consumers.size())
 577                     {
 578 h.sterling 1.1          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 579                         PEG_METHOD_EXIT();
 580                         return;
 581                     }
 582                 
 583                     Array<DynamicConsumer*> loadedConsumers;
 584                 
 585                     ConsumerTable::Iterator i = _consumers.start();
 586                     DynamicConsumer* consumer = 0;
 587                 
 588                     for (; i!=0; i++)
 589                     {
 590                         consumer = i.value();
 591                         if (consumer && consumer->isLoaded() && consumer->isIdle())
 592                         {
 593                             loadedConsumers.append(consumer);
 594                         }
 595                     }
 596                 
 597                     if (loadedConsumers.size())
 598                     {
 599 h.sterling 1.1          try
 600                         {
 601                             _unloadConsumers(loadedConsumers);
 602                 
 603 kumpf      1.16         } catch (Exception&)
 604 h.sterling 1.1          {
 605                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 606                         }
 607                     } else
 608                     {
 609                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 610                     }
 611                 
 612                     PEG_METHOD_EXIT();
 613                 }
 614                 
 615                 /** Unloads a single consumer.
 616                  */ 
 617                 void ConsumerManager::unloadConsumer(const String& consumerName)
 618                 {
 619                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 620                 
 621                     AutoMutex lock(_consumerTableMutex);
 622                 
 623                     DynamicConsumer* consumer = 0;
 624                 
 625 h.sterling 1.1      //check whether the consumer exists
 626                     if (!_consumers.lookup(consumerName, consumer))
 627                     {
 628                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
 629                         return;
 630                     }
 631                 
 632                     //check whether the consumer is loaded
 633                     if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 634                     {
 635                         //unload the consumer
 636                         Array<DynamicConsumer*> loadedConsumers;
 637                         loadedConsumers.append(consumer);
 638                 
 639                         try
 640                         {
 641                             _unloadConsumers(loadedConsumers);
 642                 
 643 kumpf      1.16         } catch (Exception&)
 644 h.sterling 1.1          {
 645                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 646                         }
 647                 
 648                     } else
 649                     {
 650                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
 651                     }
 652                 
 653                     PEG_METHOD_EXIT();
 654                 }
 655                 
 656                 /** Unloads the consumers in the given array.
 657                  *  The consumerTable mutex MUST be locked prior to entering this method.
 658                  */ 
 659                 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
 660                 {
 661                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 662                 
 663                     //tell consumers to shutdown
 664                     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 665 h.sterling 1.1      {
 666                         consumersToUnload[i]->sendShutdownSignal();
 667                     }
 668                 
 669                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
 670                 
 671                     //wait for all the consumer worker threads to complete
 672                     //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
 673                     //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
 674                     //processing its requests.
 675                     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 676                     {
 677                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
 678                 
 679                         //wait for the consumer worker thread to end
 680                         try
 681                         {
 682                             Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
 683                             if (_shutdownSemaphore)
 684                             {
 685                                 _shutdownSemaphore->time_wait(10000); 
 686 h.sterling 1.1              }
 687                 
 688                         } catch (TimeOut &)
 689                         {
 690                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
 691                         }
 692                 
 693                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
 694                 
 695                         try
 696                         {
 697                             //terminate consumer provider interface
 698                             consumersToUnload[i]->terminate();
 699                 
 700                             //unload consumer provider module
 701                             PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 702                             consumersToUnload[i]->_module->unloadModule();
 703                 
 704                             //serialize outstanding indications
 705                             _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
 706                 
 707 h.sterling 1.1              //reset the consumer
 708                             consumersToUnload[i]->reset();
 709                 
 710                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
 711                 
 712                         } catch (Exception& e)
 713                         {
 714                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); 
 715                             //ATTN: throw exception? log warning?
 716                         }
 717                     }
 718                 
 719                     PEG_METHOD_EXIT();
 720                 }
 721                 
 722                 /** Serializes oustanding indications to a <MyConsumer>.dat file
 723                  */
 724 h.sterling 1.11 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
 725 h.sterling 1.1  {
 726                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
 727                 
 728                     if (!indications.size())
 729                     {
 730                         PEG_METHOD_EXIT();
 731                         return;
 732                     }
 733                 
 734                     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 735                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 736                 
 737 mike       1.9      Buffer buffer;
 738 h.sterling 1.1  
 739                     // Open the log file and serialize remaining 
 740                     FILE* fileHandle = 0;
 741                     fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 742                 
 743                     if (!fileHandle)
 744                     {
 745                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
 746                 
 747                     } else
 748                     {
 749                         Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 750                                       "Serializing %d outstanding requests for %s",
 751                                       indications.size(),
 752                                       (const char*)consumerName.getCString());
 753                 
 754                         //we have to put the array of instances under a valid root element or the parser complains 
 755                         XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 756                 
 757 h.sterling 1.11 		CIMInstance cimInstance;
 758 h.sterling 1.1          for (Uint32 i = 0; i < indications.size(); i++)
 759                         {
 760 h.sterling 1.11 			//set the URL string property on the serializable instance
 761                 			CIMValue cimValue(CIMTYPE_STRING, false);
 762                 			cimValue.set(indications[i].getURL());
 763                 			cimInstance = indications[i].getIndicationInstance();
 764                 			CIMProperty cimProperty(URL_PROPERTY, cimValue);
 765                 			cimInstance.addProperty(cimProperty);
 766                 
 767                             XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 768                 		}
 769 h.sterling 1.1  
 770                         XmlWriter::append(buffer, "</IRETURNVALUE>\0");
 771                 
 772                         fputs((const char*)buffer.getData(), fileHandle);
 773                 
 774                         fclose(fileHandle);
 775                     }
 776                 
 777                     PEG_METHOD_EXIT();
 778                 }
 779                 
 780                 /** Reads outstanding indications from a <MyConsumer>.dat file
 781                  */ 
 782 h.sterling 1.11 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
 783 h.sterling 1.1  {
 784                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
 785                 
 786                     String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 787                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 788                 
 789                     Array<CIMInstance> cimInstances;
 790 h.sterling 1.11 	Array<String>      urlStrings;
 791                 	Array<IndicationDispatchEvent> indications;
 792 h.sterling 1.1  
 793                     // Open the log file and serialize remaining indications
 794                     if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 795                     {
 796 mike       1.9          Buffer text;
 797 h.sterling 1.1          CIMInstance cimInstance;
 798 h.sterling 1.11 		CIMProperty cimProperty;
 799                 		CIMValue cimValue;
 800                 		String urlString;
 801 h.sterling 1.1          XmlEntry entry;
 802                 
 803                         try
 804                         {
 805                             FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
 806                             text.append('\0');
 807                 
 808                             //parse the file
 809                             XmlParser parser((char*)text.getData());
 810                             XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 811                 
 812                             while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 813                             {
 814 h.sterling 1.11 				Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 815                 				if (index != PEG_NOT_FOUND)
 816                 				{
 817                 					//get the URL string property from the serialized instance and remove the property
 818                 					cimProperty = cimInstance.getProperty(index);
 819                 					cimValue = cimProperty.getValue();
 820                 					cimValue.get(urlString);
 821                 					cimInstance.removeProperty(index);
 822                 				}
 823                 				IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
 824                                 indications.append(*indicationEvent);
 825 h.sterling 1.1              }
 826                 
 827                             XmlReader::expectEndTag(parser, "IRETURNVALUE");
 828                 
 829                             Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 830                                           "Consumer %s has %d outstanding indications",
 831                                           (const char*)consumerName.getCString(),
 832 h.sterling 1.11                           indications.size());
 833 h.sterling 1.1  
 834                             //delete the file 
 835                             FileSystem::removeFile(fileName);
 836                 
 837                         } catch (Exception& ex)
 838                         {
 839                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
 840                 
 841                         } catch (...)
 842                         {
 843                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
 844                         }
 845                     }
 846                 
 847                     PEG_METHOD_EXIT();
 848 h.sterling 1.11     return indications;
 849 h.sterling 1.1  }
 850                 
 851                 
 852                 
 853                 /** 
 854                  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 855                  * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 856                  * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 857                  * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 858                  * 
 859                  * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
 860                  * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 861                  * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 862                  * can attain extremely high performance. 
 863                  * 
 864                  * There are three different events that can signal us:
 865                  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 866                  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 867                  * 3) A retry signal (signalled from this routine itself)
 868                  * 
 869                  * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 870 h.sterling 1.1   * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 871                  * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 872                  * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
 873                  * 
 874                  * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 875                  * 
 876 h.sterling 1.5   * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 877                  * 20 new indications come in, 10 of them are successful, 10 are not.
 878 h.sterling 1.1   * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 879                  * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 880 h.sterling 1.5   * we would never hit the wait for the retry timeout.  
 881 h.sterling 1.1   * 
 882                  */ 
 883 mike       1.15 ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 884 h.sterling 1.1  {
 885                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 886                 
 887                     DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 888                     String name = myself->getName();
 889 mike       1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
 890 h.sterling 1.1  
 891                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 892                 
 893 h.sterling 1.8      myself->_listeningSemaphore->signal();
 894                 
 895 h.sterling 1.1      while (true)
 896                     {
 897                         try
 898                         {
 899                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 900                 
 901                             //wait to be signalled
 902                             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 903                 
 904                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 905                 
 906                             //check whether we received the shutdown signal
 907                             if (myself->_dieNow)
 908                             {
 909                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 910                                 break;
 911                             }
 912                 
 913                             //create a temporary queue to store failed indications
 914 mike       1.14             tmpEventQueue.clear();
 915 h.sterling 1.1  
 916                             //continue processing events until the queue is empty
 917                             //make sure to check for the shutdown signal before every iteration
 918 h.sterling 1.10             // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
 919                             // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
 920                             // But are succeeding by the end of the processing, events may be sent out of chronological order.
 921                             // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
 922                             // new events added afterwards.
 923 h.sterling 1.1              while (myself->_eventqueue.size())
 924                             {
 925                                 //check for shutdown signal
 926                                 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 927                                 //the shutdown signal itself, at which time we break out of the main loop
 928                                 if (myself->_dieNow)
 929                                 {
 930                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 931                                     break;
 932                                 }
 933                 
 934                                 //pop next indication off the queue
 935                                 IndicationDispatchEvent* event = 0;
 936 mike       1.14                 event = myself->_eventqueue.remove_front();  //what exceptions/errors can this throw?
 937 h.sterling 1.1  
 938                                 if (!event)
 939                                 {
 940                                     //this should never happen
 941                                     continue;
 942                                 }
 943                 
 944                                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 945                 
 946                                 try
 947                                 {
 948                                     myself->consumeIndication(event->getContext(),
 949                                                               event->getURL(),
 950                                                               event->getIndicationInstance());
 951                 
 952                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 953                 
 954                                     delete event;
 955                                     continue;
 956                 
 957                                 } catch (CIMException & ce)
 958 h.sterling 1.1                  {
 959                                     //check for failure
 960                                     if (ce.getCode() == CIM_ERR_FAILED)
 961                                     {
 962                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 963 h.sterling 1.10                         
 964                                         // Here we simply determine if we should increment the retry count or not.
 965                                         // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
 966                                         // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
 967                                         if (event->getRetries() > 0) {
 968                                             Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
 969                                             if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
 970                                                 event->increaseRetries();
 971                                         } else {
 972                                             event->increaseRetries();
 973                                         }
 974 h.sterling 1.1  
 975                                         //determine if we have hit the max retry count
 976                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 977                                         {
 978                                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
 979                                                              "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 980                 
 981                                             Logger::put(
 982                                                        Logger::ERROR_LOG,
 983 yi.zhou    1.17                                        System::CIMLISTENER,
 984 h.sterling 1.1                                         Logger::SEVERE,
 985                                                        "The following indication did not get processed successfully: $0", 
 986                                                        event->getIndicationInstance().getPath().toString());
 987                 
 988                                             delete event;
 989                                             continue;
 990                 
 991                                         } else
 992                                         {
 993                                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
 994 mike       1.14                             tmpEventQueue.insert_back(event);
 995 h.sterling 1.1                          }
 996                 
 997                                     } else
 998                                     {
 999                                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
1000                                         delete event;
1001                                         continue;
1002                                     }
1003                 
1004                                 } catch (Exception & ex)
1005                                 {
1006                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1007                                     delete event;
1008                                     continue;
1009                 
1010                                 } catch (...)
1011                                 {
1012                                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1013                                     delete event;
1014                                     continue;
1015                                 } //end try
1016 h.sterling 1.1  
1017                             } //while eventqueue
1018                 
1019 h.sterling 1.10             // Copy the failed indications back to the main queue
1020                             // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1021                             // Of those events we are retrying. Retried events happened before any new events coming in.
1022 h.sterling 1.1              IndicationDispatchEvent* tmpEvent = 0;
1023 h.sterling 1.10             myself->_eventqueue.try_lock();
1024 h.sterling 1.1              while (tmpEventQueue.size())
1025                             {
1026 mike       1.14                 tmpEvent = tmpEventQueue.remove_front();
1027                                 myself->_eventqueue.insert_back(tmpEvent);
1028 h.sterling 1.10                 
1029 h.sterling 1.1              }
1030 h.sterling 1.10             myself->_eventqueue.unlock();
1031 h.sterling 1.1  
1032 kumpf      1.16         } catch (TimeOut&)
1033 h.sterling 1.1          {
1034 h.sterling 1.10             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
1035 h.sterling 1.1  
1036                             //signal the queue in the same way we would if we received a new indication
1037                             //this allows the thread to fall into the queue processing code
1038                             myself->_check_queue->signal();
1039                 
1040                         } //time_wait
1041                 
1042                 
1043                     } //shutdown
1044                 
1045                     PEG_METHOD_EXIT();
1046                     return 0;
1047                 }
1048                 
1049                 
1050                 PEGASUS_NAMESPACE_END
1051                 
1052                 
1053                 

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2