(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 // Author: Heather Sterling (hsterl@us.ibm.com)
  33                 //
  34 h.sterling 1.1  // Modified By: 
  35                 //
  36                 //%/////////////////////////////////////////////////////////////////////////////
  37                 
  38                 #include <Pegasus/Common/Config.h>
  39                 //#include <cstdlib>
  40                 //#include <dlfcn.h>
  41                 #include <Pegasus/Common/System.h>
  42                 #include <Pegasus/Common/FileSystem.h>
  43                 #include <Pegasus/Common/Tracer.h>
  44                 #include <Pegasus/Common/Logger.h>
  45                 #include <Pegasus/Common/XmlReader.h>
  46 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  47 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  48                 #include <Pegasus/Common/XmlWriter.h>
  49 mike       1.14 #include <Pegasus/Common/List.h>
  50 mike       1.15 #include <Pegasus/Common/Mutex.h>
  51 h.sterling 1.1  
  52                 #include "ConsumerManager.h"
  53                 
  54                 PEGASUS_NAMESPACE_BEGIN
  55                 PEGASUS_USING_STD;
  56                 
  57                 //ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.
  58                 // We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
  59                 static struct OptionRow optionsTable[] =
  60                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  61                 {
  62                 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
  63                 };
  64                 
  65                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  66                 
  67                 //retry settings
  68                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  69 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  70 h.sterling 1.1  
  71 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
  72                 static const String URL_PROPERTY = "URLString";
  73 h.sterling 1.1  
  74                 
  75                 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  76                 _consumerDir(consumerDir),
  77                 _consumerConfigDir(consumerConfigDir),
  78                 _enableConsumerUnload(enableConsumerUnload),
  79                 _idleTimeout(idleTimeout),
  80                 _forceShutdown(true)
  81                 {
  82                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  83                 
  84                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  85                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  86                 
  87 marek      1.17.12.1     PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  88 h.sterling 1.1                         "Consumer unload enabled %d: idle timeout %d",
  89                                        enableConsumerUnload,
  90 marek      1.17.12.1                   idleTimeout));
  91 h.sterling 1.1       
  92                      
  93 h.sterling 1.8           //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  94 h.sterling 1.6           //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  95 h.sterling 1.1       
  96 kumpf      1.3           struct timeval deallocateWait = {15, 0};
  97                          _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  98 h.sterling 1.1       
  99                          _init();
 100                      
 101                          PEG_METHOD_EXIT();
 102                      }
 103                      
 104                      ConsumerManager::~ConsumerManager()
 105                      {
 106                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 107                      
 108                          unloadAllConsumers();
 109                      
 110 kumpf      1.3           delete _thread_pool;
 111 h.sterling 1.1       
 112                          ConsumerTable::Iterator i = _consumers.start();
 113                          for (; i!=0; i++)
 114                          {
 115                              DynamicConsumer* consumer = i.value();
 116                              delete consumer;
 117                          }
 118                      
 119 marek      1.17.12.1     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 120 h.sterling 1.1       
 121                          ModuleTable::Iterator j = _modules.start();
 122                          for (;j!=0;j++)
 123                          {
 124                              ConsumerModule* module = j.value();
 125                              delete module;
 126                          }
 127                      
 128 marek      1.17.12.1     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 129 h.sterling 1.1       
 130                          PEG_METHOD_EXIT();
 131                      }
 132                      
 133                      void ConsumerManager::_init()
 134                      {
 135                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 136                      
 137                          //check if there are any outstanding indications
 138                          Array<String> files;
 139                          Uint32 pos;
 140                          String consumerName;
 141                      
 142                          if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 143                          {
 144                              for (Uint32 i = 0; i < files.size(); i++)
 145                              {
 146                                  pos = files[i].find(".dat");
 147                                  if (pos != PEG_NOT_FOUND)
 148                                  {
 149                                      consumerName = files[i].subString(0, pos);
 150 h.sterling 1.1       
 151                                      try
 152                                      {
 153                                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 154                                          getConsumer(consumerName);
 155                      
 156                                      } catch (...)
 157                                      {
 158                                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 159                                      }
 160                                  }
 161                              }
 162                          }
 163                      
 164                          PEG_METHOD_EXIT();
 165                      }
 166                      
 167                      String ConsumerManager::getConsumerDir()
 168                      {
 169                          return _consumerDir;
 170                      }
 171 h.sterling 1.1       
 172                      String ConsumerManager::getConsumerConfigDir()
 173                      {
 174                          return _consumerConfigDir;
 175                      }
 176                      
 177                      Boolean ConsumerManager::getEnableConsumerUnload()
 178                      {
 179                          return _enableConsumerUnload;
 180                      }
 181                      
 182                      Uint32 ConsumerManager::getIdleTimeout()
 183                      {
 184                          return _idleTimeout;
 185                      }
 186                      
 187                      /** Retrieves the library name associated with the consumer name.  By default, the library name
 188                        * is the same as the consumer name.  However, you may specify a different library name in a consumer
 189                        * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 190                        *     location="libraryName"
 191                        *
 192 h.sterling 1.1         * The config file is optional and is generally only needed in cases where there are strict requirements
 193                        * on library naming.
 194                        *
 195                        * It is the responsibility of the caller to catch any exceptions thrown by this method.
 196                        */
 197                      String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 198                      {
 199                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 200                      
 201                          //default library name is consumer name
 202                          String libraryName = consumerName;
 203                      
 204                          //check whether an alternative library name was specified in an optional consumer config file
 205                          String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 206                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 207                      
 208                          if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 209                          {
 210                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 211                      
 212                              try
 213 h.sterling 1.1               {
 214 h.sterling 1.6                   //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
 215 h.sterling 1.8                   OptionManager _optionMgr;
 216                                  _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
 217 h.sterling 1.1                   _optionMgr.mergeFile(configFile);
 218                                  _optionMgr.checkRequiredOptions();
 219                      
 220                                  if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 221                                  {
 222                                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 223                                      libraryName = consumerName;
 224                                  }
 225                      
 226                              } catch (Exception & ex)
 227                              {
 228                                  throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 229                                                                     "Error reading $0: $1.",
 230                                                                     configFile,
 231                                                                     ex.getMessage()));
 232                              }
 233                          } else
 234                          {
 235                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 236                          }
 237                      
 238 h.sterling 1.1           PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 239                      
 240                          PEG_METHOD_EXIT();
 241                          return libraryName;
 242                      }
 243                      
 244                      /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 245                       *  DNE, we create it and initialize it, and add it to the table.
 246                       * @throws Exception if we cannot successfully create and initialize the consumer
 247                       */ 
 248                      DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 249                      {
 250                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 251                      
 252                          DynamicConsumer* consumer = 0;
 253                          CIMIndicationConsumerProvider* consumerRef = 0;
 254                          Boolean cached = false;
 255                          Boolean entryExists = false;
 256                      
 257                          AutoMutex lock(_consumerTableMutex);
 258                      
 259 h.sterling 1.1           if (_consumers.lookup(consumerName, consumer))
 260                          {
 261                              //why isn't this working??
 262                              entryExists = true;
 263                      
 264                              if (consumer && consumer->isLoaded())
 265                              {
 266                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 267                                  cached = true;
 268                              }
 269                          } else
 270                          {
 271                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 272                              consumer = new DynamicConsumer(consumerName);
 273                              //ATTN: The above is a memory leak if _initConsumer throws an exception
 274                              //need to delete it in that case
 275                          }
 276                      
 277                          if (!cached)
 278                          {
 279                              _initConsumer(consumerName, consumer);
 280 h.sterling 1.1       
 281                              if (!entryExists)
 282                              {
 283                                  _consumers.insert(consumerName, consumer);
 284                              }
 285                          }
 286                      
 287                          consumer->updateIdleTimer();
 288                      
 289                          PEG_METHOD_EXIT();
 290                          return consumer;
 291                      }
 292                      
 293                      /** Initializes a DynamicConsumer.
 294                       * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 295                       * @throws Exception if the consumer cannot be initialized
 296                       */
 297                      void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
 298                      {
 299                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 300                      
 301 h.sterling 1.1           CIMIndicationConsumerProvider* base = 0;
 302                          ConsumerModule* module = 0;
 303                      
 304                          //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
 305                          String libraryName = _getConsumerLibraryName(consumerName);
 306                          module = _lookupModule(libraryName);
 307                      
 308                          //build library path
 309                          String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
 310                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
 311                      
 312                          //load module
 313                          try
 314                          {
 315                              base = module->load(consumerName, libraryPath);
 316                              consumer->set(module, base);
 317                      
 318                          } catch (Exception& ex)
 319                          {
 320                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
 321                      
 322 h.sterling 1.1               throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 323                                                                 "Cannot load module ($0:$1): Unknown exception.",
 324                                                                 consumerName,
 325                                                                 libraryName));
 326                          } catch (...)
 327                          {
 328                              throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 329                                                                 "Cannot load module ($0:$1): Unknown exception.",
 330                                                                 consumerName,
 331                                                                 libraryName));
 332                          }
 333                      
 334                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
 335                      
 336                          //initialize consumer
 337                          try
 338                          {
 339                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);
 340                      
 341                              consumer->initialize();
 342                      
 343 h.sterling 1.1               //ATTN: need to change this
 344                              Semaphore* semaphore = new Semaphore(0);  //blocking
 345                      
 346                              consumer->setShutdownSemaphore(semaphore);
 347                      
 348                              //start the worker thread
 349 konrad.r   1.7               if (_thread_pool->allocate_and_awaken(consumer,
 350 h.sterling 1.1                                                 _worker_routine,
 351 konrad.r   1.7                                                 semaphore) != PEGASUS_THREAD_OK)
 352 h.sterling 1.8           {
 353 yi.zhou    1.17              Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
 354 h.sterling 1.8               "Not enough threads for consumer.");
 355 konrad.r   1.7        
 356 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 357 h.sterling 1.8               "Could not allocate thread for consumer.");
 358 konrad.r   1.7       
 359 h.sterling 1.8              consumer->setShutdownSemaphore(0);
 360                             delete semaphore;
 361 konrad.r   1.7                  throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 362 h.sterling 1.8                               "Not enough threads for consumer worker routine."));
 363 konrad.r   1.7               }
 364 h.sterling 1.1       
 365 h.sterling 1.8               //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued
 366                              //before the consumer is waiting for it and the first indication after loading the consumer will be lost
 367                              consumer->waitForEventThread();
 368                      
 369 h.sterling 1.1               //load any outstanding requests
 370 h.sterling 1.11              Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
 371 h.sterling 1.1               if (outstandingIndications.size())
 372                              {
 373                                  //the consumer will signal itself in _loadOustandingIndications
 374                                  consumer->_loadOutstandingIndications(outstandingIndications);
 375                              }
 376                      
 377                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
 378                      
 379                          } catch (...)
 380                          {
 381                              module->unloadModule();
 382                              consumer->reset();
 383                              throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 384                                                                 "Cannot initialize consumer ($0).",
 385                                                                 consumerName));        
 386                          }
 387                      
 388                          PEG_METHOD_EXIT();    
 389                      }
 390                      
 391                      
 392 h.sterling 1.1       /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 393                       *  DNE, we create it and add it to the table.
 394                       * @throws Exception if we cannot successfully create and initialize the consumer
 395                       */ 
 396                      ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 397                      {
 398                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 399                      
 400                          AutoMutex lock(_moduleTableMutex);
 401                      
 402                          ConsumerModule* module = 0;
 403                      
 404                          //see if consumer module is cached
 405                          if (_modules.lookup(libraryName, module))
 406                          {
 407                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 408                                               "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 409                      
 410                          } else
 411                          {
 412                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 413 h.sterling 1.1                                "Creating Consumer Provider Module " + libraryName);
 414                      
 415                              module = new ConsumerModule(); 
 416                              _modules.insert(libraryName, module);
 417                          }
 418                      
 419                          PEG_METHOD_EXIT();
 420                          return(module);
 421                      }
 422                      
 423                      /** Returns true if there are active consumers
 424                       */ 
 425                      Boolean ConsumerManager::hasActiveConsumers()
 426                      {
 427                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 428                      
 429                          AutoMutex lock(_consumerTableMutex);
 430                          DynamicConsumer* consumer = 0;
 431                      
 432                          try
 433                          {
 434 h.sterling 1.1               for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 435                              {
 436                                  consumer = i.value();
 437                      
 438                                  if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 439                                  {
 440                                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 441                                      PEG_METHOD_EXIT();
 442                                      return true;
 443                                  }
 444                              }
 445                          } catch (...)
 446                          {
 447                              // Unexpected exception; do not assume that no providers are loaded
 448 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 449 h.sterling 1.1               PEG_METHOD_EXIT();
 450                              return true;
 451                          }
 452                      
 453                          PEG_METHOD_EXIT();
 454                          return false;
 455                      }
 456                      
 457                      /** Returns true if there are loaded consumers
 458                       */ 
 459                      Boolean ConsumerManager::hasLoadedConsumers()
 460                      {
 461                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 462                      
 463                          AutoMutex lock(_consumerTableMutex);
 464                          DynamicConsumer* consumer = 0;
 465                      
 466                          try
 467                          {
 468                              for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 469                              {
 470 h.sterling 1.1                   consumer = i.value();
 471                      
 472                                  if (consumer && consumer->isLoaded())
 473                                  {
 474                                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 475                                      PEG_METHOD_EXIT();
 476                                      return true;
 477                                  }
 478                              }
 479                          } catch (...)
 480                          {
 481                              // Unexpected exception; do not assume that no providers are loaded
 482 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 483 h.sterling 1.1               PEG_METHOD_EXIT();
 484                              return true;
 485                          }
 486                      
 487                          PEG_METHOD_EXIT();
 488                          return false;
 489                      }
 490                      
 491                      
 492                      /** Shutting down a consumer consists of four major steps:
 493                       * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 494                       * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 495                       *    is optional in a shutdown scenario.  If the listener is shutdown with a -f force, the listener
 496                       *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 497                       *    the current consumer indication to finish.  All other queued indications are serialized to a log and 
  498                       *    are sent when the consumer is reloaded.
 499                       * 3) Terminate the consumer provider interface.
 500                       * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)
 501                       * 
  502                       * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 503                       * of the consumers so the threads can finish simultaneously.
 504 h.sterling 1.1        * 
 505                       * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 506                       * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing 
  507                       * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 508                       * queued indications on shutdown.  
 509                       */ 
 510                      
 511                      /** Unloads all consumers.
 512                       */ 
 513                      void ConsumerManager::unloadAllConsumers()
 514                      {
 515                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 516                      
 517                          AutoMutex lock(_consumerTableMutex);
 518                      
 519                          if (!_consumers.size())
 520                          {
 521 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 522 h.sterling 1.1               PEG_METHOD_EXIT();
 523                              return;
 524                          }
 525                      
 526                          if (!_forceShutdown)
 527                          {
 528                              //wait until all the consumers have finished processing the events in their queue
 529                              //ATTN: Should this have a timeout even though it's a force??
 530                              while (hasActiveConsumers())
 531                              {
 532 mike       1.15                  Threads::sleep(500);
 533 h.sterling 1.1               }
 534                          }
 535                      
 536                          Array<DynamicConsumer*> loadedConsumers;
 537                      
 538                          ConsumerTable::Iterator i = _consumers.start();
 539                          DynamicConsumer* consumer = 0;
 540                      
 541                          for (; i!=0; i++)
 542                          {
 543                              consumer = i.value();
 544                              if (consumer && consumer->isLoaded())
 545                              {
 546                                  loadedConsumers.append(consumer);
 547                              }
 548                          }
 549                      
 550                          if (loadedConsumers.size())
 551                          {
 552                              try
 553                              {
 554 h.sterling 1.1                   _unloadConsumers(loadedConsumers);
 555                      
 556 kumpf      1.16              } catch (Exception&)
 557 h.sterling 1.1               {
 558 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 559 h.sterling 1.1               }
 560                          } else
 561                          {
 562 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 563 h.sterling 1.1           }
 564                      
 565                          PEG_METHOD_EXIT();
 566                      }
 567                      
 568                      /** Unloads idle consumers.
 569                       */ 
 570                      void ConsumerManager::unloadIdleConsumers()
 571                      {
 572                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 573                      
 574                          AutoMutex lock(_consumerTableMutex);
 575                      
 576                          if (!_consumers.size())
 577                          {
 578 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 579 h.sterling 1.1               PEG_METHOD_EXIT();
 580                              return;
 581                          }
 582                      
 583                          Array<DynamicConsumer*> loadedConsumers;
 584                      
 585                          ConsumerTable::Iterator i = _consumers.start();
 586                          DynamicConsumer* consumer = 0;
 587                      
 588                          for (; i!=0; i++)
 589                          {
 590                              consumer = i.value();
 591                              if (consumer && consumer->isLoaded() && consumer->isIdle())
 592                              {
 593                                  loadedConsumers.append(consumer);
 594                              }
 595                          }
 596                      
 597                          if (loadedConsumers.size())
 598                          {
 599                              try
 600 h.sterling 1.1               {
 601                                  _unloadConsumers(loadedConsumers);
 602                      
 603 kumpf      1.16              } catch (Exception&)
 604 h.sterling 1.1               {
 605 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 606 h.sterling 1.1               }
 607                          } else
 608                          {
 609 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 610 h.sterling 1.1           }
 611                      
 612                          PEG_METHOD_EXIT();
 613                      }
 614                      
 615                      /** Unloads a single consumer.
 616                       */ 
 617                      void ConsumerManager::unloadConsumer(const String& consumerName)
 618                      {
 619                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 620                      
 621                          AutoMutex lock(_consumerTableMutex);
 622                      
 623                          DynamicConsumer* consumer = 0;
 624                      
 625                          //check whether the consumer exists
 626                          if (!_consumers.lookup(consumerName, consumer))
 627                          {
 628                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
 629                              return;
 630                          }
 631 h.sterling 1.1       
 632                          //check whether the consumer is loaded
 633                          if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 634                          {
 635                              //unload the consumer
 636                              Array<DynamicConsumer*> loadedConsumers;
 637                              loadedConsumers.append(consumer);
 638                      
 639                              try
 640                              {
 641                                  _unloadConsumers(loadedConsumers);
 642                      
 643 kumpf      1.16              } catch (Exception&)
 644 h.sterling 1.1               {
 645 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 646 h.sterling 1.1               }
 647                      
 648                          } else
 649                          {
 650                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
 651                          }
 652                      
 653                          PEG_METHOD_EXIT();
 654                      }
 655                      
 656                      /** Unloads the consumers in the given array.
 657                       *  The consumerTable mutex MUST be locked prior to entering this method.
 658                       */ 
 659                      void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
 660                      {
 661                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 662                      
 663                          //tell consumers to shutdown
 664                          for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 665                          {
 666                              consumersToUnload[i]->sendShutdownSignal();
 667 h.sterling 1.1           }
 668                      
 669 marek      1.17.12.1     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
 670 h.sterling 1.1       
 671                          //wait for all the consumer worker threads to complete
 672                          //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
 673                          //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
 674                          //processing its requests.
 675                          for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 676                          {
 677                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
 678                      
 679                              //wait for the consumer worker thread to end
 680                              try
 681                              {
 682                                  Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
 683                                  if (_shutdownSemaphore)
 684                                  {
 685                                      _shutdownSemaphore->time_wait(10000); 
 686                                  }
 687                      
 688                              } catch (TimeOut &)
 689                              {
 690 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
 691 h.sterling 1.1               }
 692                      
 693 marek      1.17.12.1         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
 694 h.sterling 1.1       
 695                              try
 696                              {
 697                                  //terminate consumer provider interface
 698                                  consumersToUnload[i]->terminate();
 699                      
 700                                  //unload consumer provider module
 701                                  PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 702                                  consumersToUnload[i]->_module->unloadModule();
 703                      
 704                                  //serialize outstanding indications
 705                                  _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
 706                      
 707                                  //reset the consumer
 708                                  consumersToUnload[i]->reset();
 709                      
 710 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
 711 h.sterling 1.1       
 712                              } catch (Exception& e)
 713                              {
 714                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); 
 715                                  //ATTN: throw exception? log warning?
 716                              }
 717                          }
 718                      
 719                          PEG_METHOD_EXIT();
 720                      }
 721                      
 722                      /** Serializes oustanding indications to a <MyConsumer>.dat file
 723                       */
 724 h.sterling 1.11      void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
 725 h.sterling 1.1       {
 726                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
 727                      
 728                          if (!indications.size())
 729                          {
 730                              PEG_METHOD_EXIT();
 731                              return;
 732                          }
 733                      
 734                          String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 735                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 736                      
 737 mike       1.9           Buffer buffer;
 738 h.sterling 1.1       
 739                          // Open the log file and serialize remaining 
 740                          FILE* fileHandle = 0;
 741                          fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 742                      
 743                          if (!fileHandle)
 744                          {
 745                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
 746                      
 747                          } else
 748                          {
 749 marek      1.17.12.1         PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 750 h.sterling 1.1                             "Serializing %d outstanding requests for %s",
 751                                            indications.size(),
 752 marek      1.17.12.1                       (const char*)consumerName.getCString()));
 753 h.sterling 1.1       
 754                              //we have to put the array of instances under a valid root element or the parser complains 
 755                              XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 756                      
 757 h.sterling 1.11      		CIMInstance cimInstance;
 758 h.sterling 1.1               for (Uint32 i = 0; i < indications.size(); i++)
 759                              {
 760 h.sterling 1.11      			//set the URL string property on the serializable instance
 761                      			CIMValue cimValue(CIMTYPE_STRING, false);
 762                      			cimValue.set(indications[i].getURL());
 763                      			cimInstance = indications[i].getIndicationInstance();
 764                      			CIMProperty cimProperty(URL_PROPERTY, cimValue);
 765                      			cimInstance.addProperty(cimProperty);
 766                      
 767                                  XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 768                      		}
 769 h.sterling 1.1       
 770                              XmlWriter::append(buffer, "</IRETURNVALUE>\0");
 771                      
 772                              fputs((const char*)buffer.getData(), fileHandle);
 773                      
 774                              fclose(fileHandle);
 775                          }
 776                      
 777                          PEG_METHOD_EXIT();
 778                      }
 779                      
 780                      /** Reads outstanding indications from a <MyConsumer>.dat file
 781                       */ 
 782 h.sterling 1.11      Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
 783 h.sterling 1.1       {
 784                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
 785                      
 786                          String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 787                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 788                      
 789                          Array<CIMInstance> cimInstances;
 790 h.sterling 1.11      	Array<String>      urlStrings;
 791                      	Array<IndicationDispatchEvent> indications;
 792 h.sterling 1.1       
 793                          // Open the log file and serialize remaining indications
 794                          if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 795                          {
 796 mike       1.9               Buffer text;
 797 h.sterling 1.1               CIMInstance cimInstance;
 798 h.sterling 1.11      		CIMProperty cimProperty;
 799                      		CIMValue cimValue;
 800                      		String urlString;
 801 h.sterling 1.1               XmlEntry entry;
 802                      
 803                              try
 804                              {
 805                                  FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
 806                                  text.append('\0');
 807                      
 808                                  //parse the file
 809                                  XmlParser parser((char*)text.getData());
 810                                  XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 811                      
 812                                  while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 813                                  {
 814 h.sterling 1.11      				Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 815                      				if (index != PEG_NOT_FOUND)
 816                      				{
 817                      					//get the URL string property from the serialized instance and remove the property
 818                      					cimProperty = cimInstance.getProperty(index);
 819                      					cimValue = cimProperty.getValue();
 820                      					cimValue.get(urlString);
 821                      					cimInstance.removeProperty(index);
 822                      				}
 823                      				IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
 824                                      indications.append(*indicationEvent);
 825 h.sterling 1.1                   }
 826                      
 827                                  XmlReader::expectEndTag(parser, "IRETURNVALUE");
 828                      
 829 marek      1.17.12.1             PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 830 h.sterling 1.1                                 "Consumer %s has %d outstanding indications",
 831                                                (const char*)consumerName.getCString(),
 832 marek      1.17.12.1                           indications.size()));
 833 h.sterling 1.1       
 834                                  //delete the file 
 835                                  FileSystem::removeFile(fileName);
 836                      
 837                              } catch (Exception& ex)
 838                              {
 839                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
 840                      
 841                              } catch (...)
 842                              {
 843                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
 844                              }
 845                          }
 846                      
 847                          PEG_METHOD_EXIT();
 848 h.sterling 1.11          return indications;
 849 h.sterling 1.1       }
 850                      
 851                      
 852                      
 853                      /** 
 854                       * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 855                       * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 856                       * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 857                       * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 858                       * 
 859                       * If a consumer receives a lot of traffic, or its consumeIndication() method takes a considerable amount of time to
 860                       * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 861                       * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 862                       * can attain extremely high performance. 
 863                       * 
 864                       * There are three different events that can signal us:
 865                       * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 866                       * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 867                       * 3) A retry signal (signalled from this routine itself)
 868                       * 
 869                       * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 870 h.sterling 1.1        * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 871                       * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 872                       * we immediately break out of the loop, and another component serializes the remaining indications to a file.
 873                       * 
 874                       * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 875                       * 
 876 h.sterling 1.5        * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 877                       * 20 new indications come in, 10 of them are successful, 10 are not.
 878 h.sterling 1.1        * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 879                       * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 880 h.sterling 1.5        * we would never hit the wait for the retry timeout.  
 881 h.sterling 1.1        * 
 882                       */ 
 883 mike       1.15      ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 884 h.sterling 1.1       {
 885                          PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 886                      
 887                          DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 888                          String name = myself->getName();
 889 mike       1.15          List<IndicationDispatchEvent,Mutex> tmpEventQueue;
 890 h.sterling 1.1       
 891                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 892                      
 893 h.sterling 1.8           myself->_listeningSemaphore->signal();
 894                      
 895 h.sterling 1.1           while (true)
 896                          {
 897                              try
 898                              {
 899                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 900                      
 901                                  //wait to be signalled
 902                                  myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 903                      
 904                                  PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 905                      
 906                                  //check whether we received the shutdown signal
 907                                  if (myself->_dieNow)
 908                                  {
 909                                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 910                                      break;
 911                                  }
 912                      
 913                                  //create a temporary queue to store failed indications
 914 mike       1.14                  tmpEventQueue.clear();
 915 h.sterling 1.1       
 916                                  //continue processing events until the queue is empty
 917                                  //make sure to check for the shutdown signal before every iteration
 918 h.sterling 1.10                  // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
 919                                  // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
 920                                  // But are succeeding by the end of the processing, events may be sent out of chronological order.
 921                                  // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
 922                                  // new events added afterwards.
 923 h.sterling 1.1                   while (myself->_eventqueue.size())
 924                                  {
 925                                      //check for shutdown signal
 926                                      //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 927                                      //the shutdown signal itself, at which time we break out of the main loop
 928                                      if (myself->_dieNow)
 929                                      {
 930                                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 931                                          break;
 932                                      }
 933                      
 934                                      //pop next indication off the queue
 935                                      IndicationDispatchEvent* event = 0;
 936 mike       1.14                      event = myself->_eventqueue.remove_front();  //what exceptions/errors can this throw?
 937 h.sterling 1.1       
 938                                      if (!event)
 939                                      {
 940                                          //this should never happen
 941                                          continue;
 942                                      }
 943                      
 944                                      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 945                      
 946                                      try
 947                                      {
 948                                          myself->consumeIndication(event->getContext(),
 949                                                                    event->getURL(),
 950                                                                    event->getIndicationInstance());
 951                      
 952                                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 953                      
 954                                          delete event;
 955                                          continue;
 956                      
 957                                      } catch (CIMException & ce)
 958 h.sterling 1.1                       {
 959                                          //check for failure
 960                                          if (ce.getCode() == CIM_ERR_FAILED)
 961                                          {
 962                                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 963 h.sterling 1.10                              
 964                                              // Here we simply determine if we should increment the retry count or not.
 965                                              // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
 966                                              // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
 967                                              if (event->getRetries() > 0) {
 968                                                  Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
 969                                                  if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
 970                                                      event->increaseRetries();
 971                                              } else {
 972                                                  event->increaseRetries();
 973                                              }
 974 h.sterling 1.1       
 975                                              //determine if we have hit the max retry count
 976                                              if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 977                                              {
 978 marek      1.17.12.1                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 979 h.sterling 1.1                                                    "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 980                      
 981                                                  Logger::put(
 982                                                             Logger::ERROR_LOG,
 983 yi.zhou    1.17                                             System::CIMLISTENER,
 984 h.sterling 1.1                                              Logger::SEVERE,
 985                                                             "The following indication did not get processed successfully: $0", 
 986                                                             event->getIndicationInstance().getPath().toString());
 987                      
 988                                                  delete event;
 989                                                  continue;
 990                      
 991                                              } else
 992                                              {
 993 marek      1.17.12.1                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
 994 mike       1.14                                  tmpEventQueue.insert_back(event);
 995 h.sterling 1.1                               }
 996                      
 997                                          } else
 998                                          {
 999                                              PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
1000                                              delete event;
1001                                              continue;
1002                                          }
1003                      
1004                                      } catch (Exception & ex)
1005                                      {
1006                                          PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1007                                          delete event;
1008                                          continue;
1009                      
1010                                      } catch (...)
1011                                      {
1012 marek      1.17.12.1                     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1013 h.sterling 1.1                           delete event;
1014                                          continue;
1015                                      } //end try
1016                      
1017                                  } //while eventqueue
1018                      
 1019 h.sterling 1.10                  // Copy the failed indications back to the main queue.
 1020                                  // We lock the queue while appending the retries so that new events cannot get in front
 1021                                  // of the events we are retrying: the retried events occurred before any newly arriving events.
1022 h.sterling 1.1                   IndicationDispatchEvent* tmpEvent = 0;
1023 h.sterling 1.10                  myself->_eventqueue.try_lock();
1024 h.sterling 1.1                   while (tmpEventQueue.size())
1025                                  {
1026 mike       1.14                      tmpEvent = tmpEventQueue.remove_front();
1027                                      myself->_eventqueue.insert_back(tmpEvent);
1028 h.sterling 1.10                      
1029 h.sterling 1.1                   }
1030 h.sterling 1.10                  myself->_eventqueue.unlock();
1031 h.sterling 1.1       
1032 kumpf      1.16              } catch (TimeOut&)
1033 h.sterling 1.1               {
1034 marek      1.17.12.1             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
1035 h.sterling 1.1       
1036                                  //signal the queue in the same way we would if we received a new indication
1037                                  //this allows the thread to fall into the queue processing code
1038                                  myself->_check_queue->signal();
1039                      
1040                              } //time_wait
1041                      
1042                      
1043                          } //shutdown
1044                      
1045                          PEG_METHOD_EXIT();
1046                          return 0;
1047                      }
1048                      
1049                      
1050                      PEGASUS_NAMESPACE_END
1051                      
1052                      
1053                      

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2