(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 //%/////////////////////////////////////////////////////////////////////////////
  33                 
  34 h.sterling 1.1  #include <Pegasus/Common/Config.h>
  35                 //#include <cstdlib>
  36                 //#include <dlfcn.h>
  37                 #include <Pegasus/Common/System.h>
  38                 #include <Pegasus/Common/FileSystem.h>
  39                 #include <Pegasus/Common/Tracer.h>
  40                 #include <Pegasus/Common/Logger.h>
  41                 #include <Pegasus/Common/XmlReader.h>
  42 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  43 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  44                 #include <Pegasus/Common/XmlWriter.h>
  45 mike       1.14 #include <Pegasus/Common/List.h>
  46 mike       1.15 #include <Pegasus/Common/Mutex.h>
  47 h.sterling 1.1  
  48                 #include "ConsumerManager.h"
  49                 
  50                 PEGASUS_NAMESPACE_BEGIN
  51                 PEGASUS_USING_STD;
  52                 
  53 marek      1.21 //ATTN: Can we just use a properties file instead??  If we only have one 
  54                 // property, we may want to just parse it ourselves.
  55                 // We may need to add more properties, however.  Potential per consumer
  56                 // properties: unloadOk, idleTimout, retryCount, etc
  57 h.sterling 1.1  static struct OptionRow optionsTable[] =
  58                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  59                 {
  60 marek      1.21 {"location", "", false, Option::STRING, 0, 0,
  61                  "location", "library name for the consumer"},
  62 h.sterling 1.1  };
  63                 
  64                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  65                 
  66                 //retry settings
  67                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  68 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  69 h.sterling 1.1  
  70 marek      1.21 //constant for fake property that is added to instances when 
  71                 // serializing to track the full URL
  72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
  73 h.sterling 1.1  
  74                 
  75 marek      1.21 ConsumerManager::ConsumerManager(
  76                     const String& consumerDir,
  77                     const String& consumerConfigDir,
  78                     Boolean enableConsumerUnload,
  79                     Uint32 idleTimeout) : 
  80                         _consumerDir(consumerDir),
  81                         _consumerConfigDir(consumerConfigDir),
  82                         _enableConsumerUnload(enableConsumerUnload),
  83                         _idleTimeout(idleTimeout),
  84                         _forceShutdown(true)
  85 h.sterling 1.1  {
  86                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  87                 
  88 marek      1.21     PEG_TRACE_STRING(
  89                         TRC_LISTENER, 
  90                         Tracer::LEVEL4,
  91                         "Consumer library directory: " + consumerDir);
  92                     PEG_TRACE_STRING(
  93                         TRC_LISTENER,
  94                         Tracer::LEVEL4,
  95                         "Consumer configuration directory: " + consumerConfigDir);
  96 h.sterling 1.1  
  97 mike       1.20     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  98 h.sterling 1.1                    "Consumer unload enabled %d: idle timeout %d",
  99                                   enableConsumerUnload,
 100 marek      1.18                   idleTimeout));
 101 h.sterling 1.1  
 102                 
 103 h.sterling 1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
 104 h.sterling 1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 105 h.sterling 1.1  
 106 kumpf      1.3      struct timeval deallocateWait = {15, 0};
 107                     _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
 108 h.sterling 1.1  
 109                     _init();
 110                 
 111                     PEG_METHOD_EXIT();
 112                 }
 113                 
 114                 ConsumerManager::~ConsumerManager()
 115                 {
 116                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 117                 
 118                     unloadAllConsumers();
 119                 
 120 kumpf      1.3      delete _thread_pool;
 121 h.sterling 1.1  
 122                     ConsumerTable::Iterator i = _consumers.start();
 123                     for (; i!=0; i++)
 124                     {
 125                         DynamicConsumer* consumer = i.value();
 126                         delete consumer;
 127                     }
 128                 
 129 marek      1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 130 h.sterling 1.1  
 131                     ModuleTable::Iterator j = _modules.start();
 132                     for (;j!=0;j++)
 133                     {
 134                         ConsumerModule* module = j.value();
 135                         delete module;
 136                     }
 137                 
 138 marek      1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 139 h.sterling 1.1  
 140                     PEG_METHOD_EXIT();
 141                 }
 142                 
 143                 void ConsumerManager::_init()
 144                 {
 145                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 146                 
 147                     //check if there are any outstanding indications
 148                     Array<String> files;
 149                     Uint32 pos;
 150                     String consumerName;
 151                 
 152                     if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 153                     {
 154                         for (Uint32 i = 0; i < files.size(); i++)
 155                         {
 156                             pos = files[i].find(".dat");
 157                             if (pos != PEG_NOT_FOUND)
 158                             {
 159                                 consumerName = files[i].subString(0, pos);
 160 h.sterling 1.1  
 161                                 try
 162                                 {
 163 marek      1.21                     PEG_TRACE_STRING(
 164                                         TRC_LISTENER,
 165                                         Tracer::LEVEL4,
 166                                         "Attempting to load indication for!" + 
 167                                             consumerName + "!");
 168 h.sterling 1.1                      getConsumer(consumerName);
 169                 
 170                                 } catch (...)
 171                                 {
 172 marek      1.21                     PEG_TRACE_STRING(
 173                                         TRC_LISTENER,
 174                                         Tracer::LEVEL4,
 175                                         "Cannot load consumer from file " + files[i]);
 176 h.sterling 1.1                  }
 177                             }
 178                         }
 179                     }
 180                 
 181                     PEG_METHOD_EXIT();
 182                 }
 183                 
 184                 String ConsumerManager::getConsumerDir()
 185                 {
 186                     return _consumerDir;
 187                 }
 188                 
 189                 String ConsumerManager::getConsumerConfigDir()
 190                 {
 191                     return _consumerConfigDir;
 192                 }
 193                 
 194                 Boolean ConsumerManager::getEnableConsumerUnload()
 195                 {
 196                     return _enableConsumerUnload;
 197 h.sterling 1.1  }
 198                 
 199                 Uint32 ConsumerManager::getIdleTimeout()
 200                 {
 201                     return _idleTimeout;
 202                 }
 203                 
 204 marek      1.21 /** Retrieves the library name associated with the consumer name.
 205                  *  By default, the library name
 206                  * is the same as the consumer name.  However, you may specify a different 
 207                  * library name in a consumer configuration file.  This file must be named 
 208                  *  "MyConsumer.txt" and contain the following: location="libraryName"
 209                  *
 210                  * The config file is optional and is generally only needed in cases where 
 211                  * there are strict requirements on library naming.
 212                  *
 213                  * It is the responsibility of the caller to catch any exceptions 
 214                  * thrown by this method.
 215                  */
 216 h.sterling 1.1  String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 217                 {
 218                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 219                 
 220                     //default library name is consumer name
 221                     String libraryName = consumerName;
 222                 
 223 marek      1.21     // check whether an alternative library name was specified in an optional
 224                     // consumer config file
 225                     String configFile = FileSystem::getAbsolutePath(
 226                                             (const char*)_consumerConfigDir.getCString(),
 227                                             String(consumerName + ".conf"));
 228                     PEG_TRACE_STRING(
 229                         TRC_LISTENER, 
 230                         Tracer::LEVEL4,
 231                         "Looking for config file " + configFile);
 232 h.sterling 1.1  
 233                     if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 234                     {
 235 marek      1.21         PEG_TRACE_STRING(
 236                             TRC_LISTENER,
 237                             Tracer::LEVEL4,
 238                             "Found config file for consumer " + consumerName);
 239 h.sterling 1.1  
 240                         try
 241                         {
 242 marek      1.21             //Bugzilla 3765 - Change this to use a member var 
 243                             // when OptionManager has a reset option
 244 h.sterling 1.8              OptionManager _optionMgr;
 245 marek      1.21             //comment the following line out later
 246                             _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 247 h.sterling 1.1              _optionMgr.mergeFile(configFile);
 248                             _optionMgr.checkRequiredOptions();
 249                 
 250 marek      1.21             if (!_optionMgr.lookupValue("location", libraryName) || 
 251                                 (libraryName == String::EMPTY))
 252 h.sterling 1.1              {
 253 marek      1.21                 PEG_TRACE_STRING(
 254                                     TRC_LISTENER,
 255                                     Tracer::LEVEL2,
 256                                     "Warning: Using default library name since none was "
 257                                         "specified in " + configFile); 
 258 h.sterling 1.1                  libraryName = consumerName;
 259                             }
 260                 
 261                         } catch (Exception & ex)
 262                         {
 263 marek      1.21             throw Exception(
 264                                       MessageLoaderParms(
 265                                           "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 266                                           "Error reading $0: $1.",
 267                                           configFile,
 268                                           ex.getMessage()));
 269 h.sterling 1.1          }
 270                     } else
 271                     {
 272 marek      1.21         PEG_TRACE_STRING(
 273                             TRC_LISTENER,
 274                             Tracer::LEVEL4,
 275                             "No config file exists for " + consumerName);
 276 h.sterling 1.1      }
 277                 
 278 marek      1.21     PEG_TRACE_STRING(
 279                         TRC_LISTENER,
 280                         Tracer::LEVEL4,
 281                         "The library name for " + consumerName + " is " + libraryName);
 282 h.sterling 1.1  
 283                     PEG_METHOD_EXIT();
 284                     return libraryName;
 285                 }
 286                 
 287 marek      1.21 /** Returns the DynamicConsumer for the consumerName.  If it already exists,
 288                  *  we return the one in the cache.  If it
 289 h.sterling 1.1   *  DNE, we create it and initialize it, and add it to the table.
 290 marek      1.21  * @throws Exception if we cannot successfully create and
 291                  *  initialize the consumer
 292 h.sterling 1.1   */ 
 293                 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 294                 {
 295                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 296                 
 297                     DynamicConsumer* consumer = 0;
 298                     CIMIndicationConsumerProvider* consumerRef = 0;
 299                     Boolean cached = false;
 300                     Boolean entryExists = false;
 301                 
 302                     AutoMutex lock(_consumerTableMutex);
 303                 
 304                     if (_consumers.lookup(consumerName, consumer))
 305                     {
 306                         //why isn't this working??
 307                         entryExists = true;
 308                 
 309                         if (consumer && consumer->isLoaded())
 310                         {
 311 marek      1.21             PEG_TRACE_STRING(
 312                                 TRC_LISTENER,
 313                                 Tracer::LEVEL3,
 314                                 "Consumer exists in the cache and"
 315                                     " is already loaded: " + consumerName);
 316 h.sterling 1.1              cached = true;
 317                         }
 318                     } else
 319                     {
 320 marek      1.21         PEG_TRACE_STRING(
 321                             TRC_LISTENER,
 322                             Tracer::LEVEL3,
 323                             "Consumer not found in cache, creating " + consumerName);
 324 h.sterling 1.1          consumer = new DynamicConsumer(consumerName);
 325                         //ATTN: The above is a memory leak if _initConsumer throws an exception
 326                         //need to delete it in that case
 327                     }
 328                 
 329                     if (!cached)
 330                     {
 331                         _initConsumer(consumerName, consumer);
 332                 
 333                         if (!entryExists)
 334                         {
 335                             _consumers.insert(consumerName, consumer);
 336                         }
 337                     }
 338                 
 339                     consumer->updateIdleTimer();
 340                 
 341                     PEG_METHOD_EXIT();
 342                     return consumer;
 343                 }
 344                 
 345 h.sterling 1.1  /** Initializes a DynamicConsumer.
 346 marek      1.21  * Caller assumes responsibility for mutexing the operation as well as 
 347                  * ensuring the consumer does not already exist.
 348 h.sterling 1.1   * @throws Exception if the consumer cannot be initialized
 349                  */
 350 marek      1.21 void ConsumerManager::_initConsumer(
 351                          const String& consumerName,
 352                          DynamicConsumer* consumer)
 353 h.sterling 1.1  {
 354                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 355                 
 356                     CIMIndicationConsumerProvider* base = 0;
 357                     ConsumerModule* module = 0;
 358                 
 359 marek      1.21     // lookup provider module in cache (if it exists, it returns 
 360                     // the cached module, otherwise it creates and returns a new one)
 361 h.sterling 1.1      String libraryName = _getConsumerLibraryName(consumerName);
 362                     module = _lookupModule(libraryName);
 363                 
 364                     //build library path
 365 marek      1.21     String libraryPath = FileSystem::getAbsolutePath(
 366                                              (const char*)_consumerDir.getCString(),
 367                                              FileSystem::buildLibraryFileName(libraryName));
 368                     PEG_TRACE_STRING(
 369                         TRC_LISTENER,
 370                         Tracer::LEVEL4,
 371                         "Loading library: " + libraryPath);
 372 h.sterling 1.1  
 373                     //load module
 374                     try
 375                     {
 376                         base = module->load(consumerName, libraryPath);
 377                         consumer->set(module, base);
 378                 
 379                     } catch (Exception& ex)
 380                     {
 381 marek      1.21         PEG_TRACE_STRING(
 382                             TRC_LISTENER,
 383                             Tracer::LEVEL2, 
 384                             "Error loading consumer module: " + ex.getMessage());
 385                 
 386                         throw Exception(
 387                                   MessageLoaderParms(
 388                                       "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 389                                       "Cannot load module ($0:$1): Unknown exception.",
 390                                       consumerName,
 391                                       libraryName));
 392 h.sterling 1.1      } catch (...)
 393                     {
 394 marek      1.21         throw Exception(
 395                                   MessageLoaderParms(
 396                                       "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 397                                       "Cannot load module ($0:$1): Unknown exception.",
 398                                       consumerName,
 399                                       libraryName));
 400 h.sterling 1.1      }
 401                 
 402 marek      1.21     PEG_TRACE_STRING(
 403                         TRC_LISTENER,
 404                         Tracer::LEVEL4,
 405                         "Successfully loaded consumer module " + libraryName);
 406 h.sterling 1.1  
 407                     //initialize consumer
 408                     try
 409                     {
 410 marek      1.21         PEG_TRACE_STRING(
 411                             TRC_LISTENER,
 412                             Tracer::LEVEL4,
 413                             "Initializing Consumer " +  consumerName);
 414 h.sterling 1.1  
 415                         consumer->initialize();
 416                 
 417                         //ATTN: need to change this
 418                         Semaphore* semaphore = new Semaphore(0);  //blocking
 419                 
 420                         consumer->setShutdownSemaphore(semaphore);
 421                 
 422                         //start the worker thread
 423 konrad.r   1.7          if (_thread_pool->allocate_and_awaken(consumer,
 424 h.sterling 1.1                                            _worker_routine,
 425 konrad.r   1.7                                            semaphore) != PEGASUS_THREAD_OK)
 426 marek      1.21         {
 427                             Logger::put(
 428                                 Logger::STANDARD_LOG,
 429                                 System::CIMLISTENER,
 430                                 Logger::TRACE,
 431                                 "Not enough threads for consumer.");
 432 konrad.r   1.7   
 433 marek      1.21             PEG_TRACE_CSTRING(
 434                                 TRC_LISTENER,
 435                                 Tracer::LEVEL2,
 436                                 "Could not allocate thread for consumer.");
 437                 
 438                             consumer->setShutdownSemaphore(0);
 439                             delete semaphore;
 440                             throw Exception(
 441                                 MessageLoaderParms(
 442                                     "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 443                                     "Not enough threads for consumer worker routine."));
 444 konrad.r   1.7          }
 445 h.sterling 1.1  
 446 marek      1.21         //wait until the listening thread has started.
 447                         // Otherwise, there is a miniscule chance that the first event will
 448                         // be enqueued before the consumer is waiting for it and the first
 449                         // indication after loading the consumer will be lost
 450 h.sterling 1.8          consumer->waitForEventThread();
 451                 
 452 h.sterling 1.1          //load any outstanding requests
 453 marek      1.21         Array<IndicationDispatchEvent> outstandingIndications = 
 454                             _deserializeOutstandingIndications(consumerName);
 455 h.sterling 1.1          if (outstandingIndications.size())
 456                         {
 457                             //the consumer will signal itself in _loadOustandingIndications
 458                             consumer->_loadOutstandingIndications(outstandingIndications);
 459                         }
 460                 
 461 marek      1.21         PEG_TRACE_STRING(
 462                             TRC_LISTENER,
 463                             Tracer::LEVEL4,
 464                             "Successfully initialized consumer " + consumerName);
 465 h.sterling 1.1  
 466                     } catch (...)
 467                     {
 468                         module->unloadModule();
 469                         consumer->reset();
 470 marek      1.21         throw Exception(
 471                             MessageLoaderParms(
 472                                 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 473                                 "Cannot initialize consumer ($0).",
 474                                 consumerName));        
 475 h.sterling 1.1      }
 476                 
 477                     PEG_METHOD_EXIT();    
 478                 }
 479                 
 480                 
 481 marek      1.21 /** Returns the ConsumerModule with the given library name.
 482                  *  If it already exists, we return the one in the cache.  If it
 483 h.sterling 1.1   *  DNE, we create it and add it to the table.
 484 marek      1.21  * @throws Exception if we cannot successfully create and 
 485                  *  initialize the consumer
 486 h.sterling 1.1   */ 
 487 marek      1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
 488 h.sterling 1.1  {
 489                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 490                 
 491                     AutoMutex lock(_moduleTableMutex);
 492                 
 493                     ConsumerModule* module = 0;
 494                 
 495                     //see if consumer module is cached
 496                     if (_modules.lookup(libraryName, module))
 497                     {
 498 marek      1.21         PEG_TRACE_STRING(
 499                             TRC_LISTENER,
 500                             Tracer::LEVEL4,
 501                             "Found Consumer Module" + 
 502                                 libraryName + " in Consumer Manager Cache");
 503 h.sterling 1.1  
 504                     } else
 505                     {
 506 marek      1.21         PEG_TRACE_STRING(
 507                             TRC_LISTENER,
 508                             Tracer::LEVEL4,
 509                             "Creating Consumer Provider Module " + libraryName);
 510 h.sterling 1.1  
 511                         module = new ConsumerModule(); 
 512                         _modules.insert(libraryName, module);
 513                     }
 514                 
 515                     PEG_METHOD_EXIT();
 516                     return(module);
 517                 }
 518                 
 519                 /** Returns true if there are active consumers
 520                  */ 
 521                 Boolean ConsumerManager::hasActiveConsumers()
 522                 {
 523                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 524                 
 525                     AutoMutex lock(_consumerTableMutex);
 526                     DynamicConsumer* consumer = 0;
 527                 
 528                     try
 529                     {
 530                         for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 531 h.sterling 1.1          {
 532                             consumer = i.value();
 533                 
 534 marek      1.21             if (consumer && 
 535                                 consumer->isLoaded() && 
 536                                 (consumer->getPendingIndications() > 0))
 537 h.sterling 1.1              {
 538 marek      1.21                 PEG_TRACE_STRING(
 539                                     TRC_LISTENER,
 540                                     Tracer::LEVEL4,
 541                                     "Found active consumer: " + consumer->_name);
 542 h.sterling 1.1                  PEG_METHOD_EXIT();
 543                                 return true;
 544                             }
 545                         }
 546                     } catch (...)
 547                     {
 548                         // Unexpected exception; do not assume that no providers are loaded
 549 marek      1.21         PEG_TRACE_CSTRING(
 550                             TRC_LISTENER,
 551                             Tracer::LEVEL2,
 552                             "Unexpected Exception in hasActiveConsumers.");
 553 h.sterling 1.1          PEG_METHOD_EXIT();
 554                         return true;
 555                     }
 556                 
 557                     PEG_METHOD_EXIT();
 558                     return false;
 559                 }
 560                 
 561                 /** Returns true if there are loaded consumers
 562                  */ 
 563                 Boolean ConsumerManager::hasLoadedConsumers()
 564                 {
 565                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 566                 
 567                     AutoMutex lock(_consumerTableMutex);
 568                     DynamicConsumer* consumer = 0;
 569                 
 570                     try
 571                     {
 572                         for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 573                         {
 574 h.sterling 1.1              consumer = i.value();
 575                 
 576                             if (consumer && consumer->isLoaded())
 577                             {
 578 marek      1.21                 PEG_TRACE_STRING(
 579                                     TRC_LISTENER,
 580                                     Tracer::LEVEL4,
 581                                     "Found loaded consumer: " + consumer->_name);
 582 h.sterling 1.1                  PEG_METHOD_EXIT();
 583                                 return true;
 584                             }
 585                         }
 586                     } catch (...)
 587                     {
 588                         // Unexpected exception; do not assume that no providers are loaded
 589 marek      1.21         PEG_TRACE_CSTRING(
 590                             TRC_LISTENER,
 591                             Tracer::LEVEL2,
 592                             "Unexpected Exception in hasLoadedConsumers.");
 593 h.sterling 1.1          PEG_METHOD_EXIT();
 594                         return true;
 595                     }
 596                 
 597                     PEG_METHOD_EXIT();
 598                     return false;
 599                 }
 600                 
 601                 
 602                 /** Shutting down a consumer consists of four major steps:
 603 marek      1.21  * 1) Send the shutdown signal.  This causes the worker routine to break out 
 604                  *    of the loop and exit.
 605                  * 2) Wait for the worker thread to end.  This may take a while if it's
 606                  *    processing an indication.  This is optional in a shutdown scenario.
 607                  *    If the listener is shutdown with a -f force, the listener
 608                  *    will not wait for the consumer to finish before shutting down.
 609                  *    Note that a normal shutdown only allows the current consumer indication
 610                  *    to finish.  All other queued indications are serialized to a log and
 611 h.sterling 1.1   *    are sent when the consumer is reloaded.
 612                  * 3) Terminate the consumer provider interface.
 613 marek      1.21  * 4) Decrement the module refcount (the module will automatically unload when
 614                  *    its refcount == 0)
 615 h.sterling 1.1   * 
 616 marek      1.21  * In a scenario where multiple consumers are loaded, the shutdown signal
 617                  * should be sent to all of the consumers so the threads can finish
 618                  * simultaneously.
 619 h.sterling 1.1   * 
 620 marek      1.21  * ATTN: Should the normal shutdown wait for everything in the queue to be
 621                  * processed?  Just new indications to be processed?  I am not inclined to this
 622                  * solution since it could take a LOT of time.  By serializing and deserializing
 623                  * indications between shutdown and startup, I feel like we do not need to 
 624                  * process ALL queued indications on shutdown.
 625 h.sterling 1.1   */ 
 626                 
 627                 /** Unloads all consumers.
 628                  */ 
 629                 void ConsumerManager::unloadAllConsumers()
 630                 {
 631                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 632                 
 633                     AutoMutex lock(_consumerTableMutex);
 634                 
 635                     if (!_consumers.size())
 636                     {
 637 marek      1.21         PEG_TRACE_CSTRING(
 638                             TRC_LISTENER,
 639                             Tracer::LEVEL4,
 640                             "There are no consumers to unload.");
 641 h.sterling 1.1          PEG_METHOD_EXIT();
 642                         return;
 643                     }
 644                 
 645                     if (!_forceShutdown)
 646                     {
 647 marek      1.21         // wait until all the consumers have finished processing the events in
 648                         // their queue
 649                         // ATTN: Should this have a timeout even though it's a force??
 650 h.sterling 1.1          while (hasActiveConsumers())
 651                         {
 652 mike       1.15             Threads::sleep(500);
 653 h.sterling 1.1          }
 654                     }
 655                 
 656                     Array<DynamicConsumer*> loadedConsumers;
 657                 
 658                     ConsumerTable::Iterator i = _consumers.start();
 659                     DynamicConsumer* consumer = 0;
 660                 
 661                     for (; i!=0; i++)
 662                     {
 663                         consumer = i.value();
 664                         if (consumer && consumer->isLoaded())
 665                         {
 666                             loadedConsumers.append(consumer);
 667                         }
 668                     }
 669                 
 670                     if (loadedConsumers.size())
 671                     {
 672                         try
 673                         {
 674 h.sterling 1.1              _unloadConsumers(loadedConsumers);
 675                 
 676 kumpf      1.16         } catch (Exception&)
 677 h.sterling 1.1          {
 678 marek      1.21             PEG_TRACE_CSTRING(
 679                                 TRC_LISTENER,
 680                                 Tracer::LEVEL4,
 681                                 "Error unloading consumers.");
 682 h.sterling 1.1          }
 683                     } else
 684                     {
 685 marek      1.21         PEG_TRACE_CSTRING(
 686                             TRC_LISTENER,
 687                             Tracer::LEVEL4,
 688                             "There are no consumers to unload.");
 689 h.sterling 1.1      }
 690                 
 691                     PEG_METHOD_EXIT();
 692                 }
 693                 
 694                 /** Unloads idle consumers.
 695                  */ 
 696                 void ConsumerManager::unloadIdleConsumers()
 697                 {
 698                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 699                 
 700                     AutoMutex lock(_consumerTableMutex);
 701                 
 702                     if (!_consumers.size())
 703                     {
 704 marek      1.21         PEG_TRACE_CSTRING(
 705                             TRC_LISTENER,
 706                             Tracer::LEVEL4,
 707                             "There are no consumers to unload.");
 708 h.sterling 1.1          PEG_METHOD_EXIT();
 709                         return;
 710                     }
 711                 
 712                     Array<DynamicConsumer*> loadedConsumers;
 713                 
 714                     ConsumerTable::Iterator i = _consumers.start();
 715                     DynamicConsumer* consumer = 0;
 716                 
 717                     for (; i!=0; i++)
 718                     {
 719                         consumer = i.value();
 720                         if (consumer && consumer->isLoaded() && consumer->isIdle())
 721                         {
 722                             loadedConsumers.append(consumer);
 723                         }
 724                     }
 725                 
 726                     if (loadedConsumers.size())
 727                     {
 728                         try
 729 h.sterling 1.1          {
 730                             _unloadConsumers(loadedConsumers);
 731                 
 732 kumpf      1.16         } catch (Exception&)
 733 h.sterling 1.1          {
 734 marek      1.21             PEG_TRACE_CSTRING(
 735                                 TRC_LISTENER,
 736                                 Tracer::LEVEL4,
 737                                 "Error unloading consumers.");
 738 h.sterling 1.1          }
 739                     } else
 740                     {
 741 marek      1.21         PEG_TRACE_CSTRING(
 742                             TRC_LISTENER,
 743                             Tracer::LEVEL4,
 744                             "There are no consumers to unload.");
 745 h.sterling 1.1      }
 746                 
 747                     PEG_METHOD_EXIT();
 748                 }
 749                 
 750                 /** Unloads a single consumer.
 751                  */ 
 752                 void ConsumerManager::unloadConsumer(const String& consumerName)
 753                 {
 754                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 755                 
 756                     AutoMutex lock(_consumerTableMutex);
 757                 
 758                     DynamicConsumer* consumer = 0;
 759                 
 760                     //check whether the consumer exists
 761                     if (!_consumers.lookup(consumerName, consumer))
 762                     {
 763 marek      1.21         PEG_TRACE_STRING(
 764                             TRC_LISTENER,
 765                             Tracer::LEVEL3,
 766                             "Error: cannot unload consumer, unknown consumer " + consumerName);
 767 h.sterling 1.1          return;
 768                     }
 769                 
 770                     //check whether the consumer is loaded
 771                     if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 772                     {
 773                         //unload the consumer
 774                         Array<DynamicConsumer*> loadedConsumers;
 775                         loadedConsumers.append(consumer);
 776                 
 777                         try
 778                         {
 779                             _unloadConsumers(loadedConsumers);
 780                 
 781 kumpf      1.16         } catch (Exception&)
 782 h.sterling 1.1          {
 783 marek      1.21             PEG_TRACE_CSTRING(
 784                                 TRC_LISTENER, 
 785                                 Tracer::LEVEL4, 
 786                                 "Error unloading consumers.");
 787 h.sterling 1.1          }
 788                 
 789                     } else
 790                     {
 791 marek      1.21         PEG_TRACE_STRING(
 792                             TRC_LISTENER,
 793                             Tracer::LEVEL3,
 794                             "Error: cannot unload consumer " + consumerName);
 795 h.sterling 1.1      }
 796                 
 797                     PEG_METHOD_EXIT();
 798                 }
 799                 
 800                 /** Unloads the consumers in the given array.
 801                  *  The consumerTable mutex MUST be locked prior to entering this method.
 802                  */ 
 803 marek      1.21 void ConsumerManager::_unloadConsumers(
 804                     Array<DynamicConsumer*> consumersToUnload)
 805 h.sterling 1.1  {
 806                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 807                 
 808                     //tell consumers to shutdown
 809                     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 810                     {
 811                         consumersToUnload[i]->sendShutdownSignal();
 812                     }
 813                 
 814 marek      1.21     PEG_TRACE_CSTRING(
 815 kumpf      1.22         TRC_LISTENER,
 816 marek      1.21         Tracer::LEVEL4,
 817                         "Sent shutdown signal to all consumers.");
 818                 
 819                     // wait for all the consumer worker threads to complete
 820                     // since we can only shutdown after they are all complete, 
 821                     // it does not matter if the first, fifth, or last
 822                     // consumer takes the longest; the wait time is equal to the time it takes
 823                     // for the busiest consumer to stop processing its requests.
 824 h.sterling 1.1      for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 825                     {
 826 marek      1.21         PEG_TRACE_STRING(
 827                             TRC_LISTENER, 
 828                             Tracer::LEVEL3, 
 829                             "Unloading consumer " + consumersToUnload[i]->getName());
 830 h.sterling 1.1  
 831                         //wait for the consumer worker thread to end
 832                         try
 833                         {
 834 marek      1.21             Semaphore* _shutdownSemaphore = 
 835                                 consumersToUnload[i]->getShutdownSemaphore();
 836 h.sterling 1.1              if (_shutdownSemaphore)
 837                             {
 838                                 _shutdownSemaphore->time_wait(10000); 
 839                             }
 840                 
 841                         } catch (TimeOut &)
 842                         {
 843 marek      1.21             PEG_TRACE_CSTRING(
 844                                 TRC_LISTENER,
 845                                 Tracer::LEVEL2,
 846                                 "Timed out while attempting to stop consumer thread.");
 847 h.sterling 1.1          }
 848                 
 849 marek      1.21         PEG_TRACE_CSTRING(
 850                             TRC_LISTENER,
 851                             Tracer::LEVEL2, 
 852                             "Terminating consumer.");
 853 h.sterling 1.1  
 854                         try
 855                         {
 856                             //terminate consumer provider interface
 857                             consumersToUnload[i]->terminate();
 858                 
 859                             //unload consumer provider module
 860                             PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 861                             consumersToUnload[i]->_module->unloadModule();
 862                 
 863                             //serialize outstanding indications
 864 marek      1.21             _serializeOutstandingIndications(
 865                                 consumersToUnload[i]->getName(),
 866                                 consumersToUnload[i]->_retrieveOutstandingIndications());
 867 h.sterling 1.1  
 868                             //reset the consumer
 869                             consumersToUnload[i]->reset();
 870                 
 871 marek      1.21             PEG_TRACE_CSTRING(
 872                                 TRC_LISTENER,
 873                                 Tracer::LEVEL3,
 874                                 "Consumer library successfully unloaded.");
 875 h.sterling 1.1  
 876                         } catch (Exception& e)
 877                         {
 878 marek      1.21             PEG_TRACE_STRING(
 879                                 TRC_LISTENER, 
 880                                 Tracer::LEVEL2, 
 881                                 "Error unloading consumer: " + e.getMessage()); 
 882 h.sterling 1.1              //ATTN: throw exception? log warning?
 883                         }
 884                     }
 885                 
 886                     PEG_METHOD_EXIT();
 887                 }
 888                 
 889                 /** Serializes oustanding indications to a <MyConsumer>.dat file
 890                  */
 891 marek      1.21 void ConsumerManager::_serializeOutstandingIndications(
 892                     const String& consumerName, 
 893                     Array<IndicationDispatchEvent> indications)
 894                 {
 895                     PEG_METHOD_ENTER(
 896                         TRC_LISTENER,
 897                         "ConsumerManager::_serializeOutstandingIndications");
 898 h.sterling 1.1  
 899                     if (!indications.size())
 900                     {
 901                         PEG_METHOD_EXIT();
 902                         return;
 903                     }
 904                 
 905 marek      1.21     String fileName = FileSystem::getAbsolutePath(
 906                                           (const char*)_consumerConfigDir.getCString(),
 907                                           String(consumerName + ".dat"));
 908                     PEG_TRACE_STRING(
 909                         TRC_LISTENER,
 910                         Tracer::LEVEL4,
 911                         "Consumer dat file: " + fileName);
 912 h.sterling 1.1  
 913 mike       1.9      Buffer buffer;
 914 h.sterling 1.1  
 915                     // Open the log file and serialize remaining 
 916                     FILE* fileHandle = 0;
 917                     fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 918                 
 919                     if (!fileHandle)
 920                     {
 921 marek      1.21         PEG_TRACE_STRING(
 922                             TRC_LISTENER,
 923                             Tracer::LEVEL2,
 924                             "Unable to open log file for " + consumerName);
 925 h.sterling 1.1  
 926                     } else
 927                     {
 928 mike       1.20         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 929 h.sterling 1.1                        "Serializing %d outstanding requests for %s",
 930                                       indications.size(),
 931 marek      1.18                       (const char*)consumerName.getCString()));
 932 h.sterling 1.1  
 933 marek      1.21         // we have to put the array of instances under a valid root element
 934                         // or the parser complains 
 935 h.sterling 1.1          XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 936                 
 937 marek      1.21         CIMInstance cimInstance;
 938 h.sterling 1.1          for (Uint32 i = 0; i < indications.size(); i++)
 939                         {
 940 marek      1.21             //set the URL string property on the serializable instance
 941                             CIMValue cimValue(CIMTYPE_STRING, false);
 942                             cimValue.set(indications[i].getURL());
 943                             cimInstance = indications[i].getIndicationInstance();
 944                             CIMProperty cimProperty(URL_PROPERTY, cimValue);
 945                             cimInstance.addProperty(cimProperty);
 946 h.sterling 1.11 
 947                             XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 948 marek      1.21         }
 949 h.sterling 1.1  
 950 kumpf      1.19         XmlWriter::append(buffer, "</IRETURNVALUE>");
 951 h.sterling 1.1  
 952                         fputs((const char*)buffer.getData(), fileHandle);
 953                 
 954                         fclose(fileHandle);
 955                     }
 956                 
 957                     PEG_METHOD_EXIT();
 958                 }
 959                 
 960                 /** Reads outstanding indications from a <MyConsumer>.dat file
 961                  */ 
 962 marek      1.21 Array<IndicationDispatchEvent> 
 963                     ConsumerManager::_deserializeOutstandingIndications(
 964                         const String& consumerName)
 965                 {
 966                     PEG_METHOD_ENTER(
 967                         TRC_LISTENER,
 968                         "ConsumerManager::_deserializeOutstandingIndications");
 969                 
 970                     String fileName = FileSystem::getAbsolutePath(
 971                                           (const char*)_consumerConfigDir.getCString(),
 972                                           String(consumerName + ".dat"));
 973                     PEG_TRACE_STRING(
 974                         TRC_LISTENER,
 975                         Tracer::LEVEL4,
 976                         "Consumer dat file: " + fileName);
 977 h.sterling 1.1  
 978                     Array<CIMInstance> cimInstances;
 979 marek      1.21     Array<String>      urlStrings;
 980                     Array<IndicationDispatchEvent> indications;
 981 h.sterling 1.1  
 982                     // Open the log file and serialize remaining indications
 983                     if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 984                     {
 985 mike       1.9          Buffer text;
 986 h.sterling 1.1          CIMInstance cimInstance;
 987 marek      1.21         CIMProperty cimProperty;
 988                         CIMValue cimValue;
 989                         String urlString;
 990 h.sterling 1.1          XmlEntry entry;
 991                 
 992                         try
 993                         {
 994 marek      1.21             //ATTN: Is this safe to use; what about CRLFs?
 995                             FileSystem::loadFileToMemory(text, fileName);
 996 h.sterling 1.1  
 997                             //parse the file
 998                             XmlParser parser((char*)text.getData());
 999                             XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
1000                 
1001                             while (XmlReader::getNamedInstanceElement(parser, cimInstance))
1002                             {
1003 marek      1.21                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
1004                                 if (index != PEG_NOT_FOUND)
1005                                 {
1006                                     // get the URL string property from the serialized instance 
1007                                     // and remove the property
1008                                     cimProperty = cimInstance.getProperty(index);
1009                                     cimValue = cimProperty.getValue();
1010                                     cimValue.get(urlString);
1011                                     cimInstance.removeProperty(index);
1012                                 }
1013                                 IndicationDispatchEvent* indicationEvent = 
1014                                     new IndicationDispatchEvent(
1015                                         OperationContext(), 
1016                                         urlString, 
1017                                         cimInstance);
1018                 
1019 h.sterling 1.11                 indications.append(*indicationEvent);
1020 h.sterling 1.1              }
1021                 
1022                             XmlReader::expectEndTag(parser, "IRETURNVALUE");
1023                 
1024 mike       1.20             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
1025 h.sterling 1.1                            "Consumer %s has %d outstanding indications",
1026                                           (const char*)consumerName.getCString(),
1027 marek      1.18                           indications.size()));
1028 h.sterling 1.1  
1029                             //delete the file 
1030                             FileSystem::removeFile(fileName);
1031                 
1032                         } catch (Exception& ex)
1033                         {
1034 marek      1.21             PEG_TRACE_STRING(
1035                                 TRC_LISTENER,
1036                                 Tracer::LEVEL3,
1037                                 "Error parsing dat file: " + 
1038                                     ex.getMessage() + " " + consumerName);
1039 h.sterling 1.1  
1040                         } catch (...)
1041                         {
1042 marek      1.21             PEG_TRACE_STRING(
1043                                 TRC_LISTENER,
1044                                 Tracer::LEVEL2,
1045                                 "Error parsing dat file: Unknown Exception " + consumerName);
1046 h.sterling 1.1          }
1047                     }
1048                 
1049                     PEG_METHOD_EXIT();
1050 h.sterling 1.11     return indications;
1051 h.sterling 1.1  }
1052                 
1053                 
1054                 
1055                 /** 
1056 marek      1.21  * This is the main worker thread of the consumer.  By having only one thread
1057                  * per consumer, we eliminate a ton of synchronization issues and make it easy 
1058                  * to prevent the consumer from performing two mutually exclusive operations
1059                  * at once.  This also prevents one bad consumer from taking the entire 
1060                  * listener down.  That being said, it is up to the programmer to write smart
1061                  * consumers, and to ensure that their actions don't deadlock 
1062                  * the worker thread.
1063 h.sterling 1.1   * 
1064 marek      1.21  * If a consumer receives a lot of traffic, or its consumeIndication() method
1065                  * takes a considerable amount of time to complete, it may make sense to make
1066                  * the consumer multi-threaded.  The individual consumer can immediately
1067                  * spawn off new threads to handle indications, and return immediately to
1068                  * catch the next indication.  In this way, a consumer can attain 
1069                  * extremely high performance. 
1070 h.sterling 1.1   * 
1071                  * There are three different events that can signal us:
1072                  * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
1073 marek      1.21  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1074                  *    shutdown or an idle consumer state)
1075 h.sterling 1.1   * 3) A retry signal (signalled from this routine itself)
1076                  * 
1077 marek      1.21  * The idea is that all new indications are put on the front of the queue and
1078                  * processed first.  All of the retry indications are put on the back of the 
1079                  * queue and are only processed AFTER all new indications are sent.
1080                  * Before processing each indication, we check to see whether or not the
1081                  * shutdown signal was given.  
1082                  * If so, we immediately break out of the loop, and another component 
1083                  * serializes the remaining indications to a file.
1084 h.sterling 1.1   * 
1085 marek      1.21  * An indication gets retried 
1086                  *     if the consumer throws a CIM_ERR_FAILED exception.
1087 h.sterling 1.1   * 
1088 marek      1.21  * This function makes sure it waits until the default retry lapse has passed 
1089                  * to avoid issues with the following scenario:
1090 h.sterling 1.5   * 20 new indications come in, 10 of them are successful, 10 are not.
1091 marek      1.21  * We were signalled 20 times, so we will pass the time_wait 20 times. 
1092                  * Perceivably, the process time on each indication could be minimal.
1093                  * We could potentially proceed to process the retries after a very small time
1094                  * interval since we would never hit the wait for the retry timeout.
1095 h.sterling 1.1   * 
1096                  */ 
1097 marek      1.21 ThreadReturnType PEGASUS_THREAD_CDECL 
1098                     ConsumerManager::_worker_routine(void *param)
1099 h.sterling 1.1  {
1100                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1101                 
1102                     DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1103                     String name = myself->getName();
1104 mike       1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
1105 h.sterling 1.1  
1106 marek      1.21     PEG_TRACE_STRING(
1107                         TRC_LISTENER,
1108                         Tracer::LEVEL2,
1109                         "_worker_routine::entering loop for " + name);
1110 h.sterling 1.1  
1111 h.sterling 1.8      myself->_listeningSemaphore->signal();
1112                 
1113 h.sterling 1.1      while (true)
1114                     {
1115                         try
1116                         {
1117 marek      1.21             PEG_TRACE_STRING(
1118                                 TRC_LISTENER,
1119                                 Tracer::LEVEL4,
1120                                 "_worker_routine::waiting " + name);
1121 h.sterling 1.1  
1122                             //wait to be signalled
1123                             myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
1124                 
1125 marek      1.21             PEG_TRACE_STRING(
1126                                 TRC_LISTENER,
1127                                 Tracer::LEVEL4,
1128                                 "_worker_routine::signalled " + name);
1129 h.sterling 1.1  
1130                             //check whether we received the shutdown signal
1131                             if (myself->_dieNow)
1132                             {
1133 marek      1.21                 PEG_TRACE_STRING(
1134                                     TRC_LISTENER,
1135                                     Tracer::LEVEL4,
1136                                     "_worker_routine::shutdown received " + name);
1137 h.sterling 1.1                  break;
1138                             }
1139                 
1140                             //create a temporary queue to store failed indications
1141 mike       1.14             tmpEventQueue.clear();
1142 h.sterling 1.1  
1143                             //continue processing events until the queue is empty
1144                             //make sure to check for the shutdown signal before every iteration
1145 marek      1.21             // Note that any time during our processing of events the Listener
1146                             // may be enqueueing NEW events for us to process.
1147                             // Because we are popping off the front and new events are being
1148                             // thrown on the back if events are failing when we start
1149                             // But are succeeding by the end of the processing, events may be
1150                             // sent out of chronological order.
1151                             // However. Once we complete the current queue of events, we will
1152                             // always send old events to be retried before sending any
1153 h.sterling 1.10             // new events added afterwards.
1154 h.sterling 1.1              while (myself->_eventqueue.size())
1155                             {
1156                                 //check for shutdown signal
1157 marek      1.21                 //this only breaks us out of the queue loop, but we will
1158                                 //immediately get through the next wait from
1159                                 //the shutdown signal itself, at which time we break
1160                                 //out of the main loop
1161 h.sterling 1.1                  if (myself->_dieNow)
1162                                 {
1163 marek      1.21                     PEG_TRACE_STRING(
1164                                         TRC_LISTENER,
1165                                         Tracer::LEVEL4,
1166                                         "Received signal to shutdown,"
1167                                             " jumping out of queue loop " + name);
1168 h.sterling 1.1                      break;
1169                                 }
1170                 
1171                                 //pop next indication off the queue
1172                                 IndicationDispatchEvent* event = 0;
1173 marek      1.21                 //what exceptions/errors can this throw?
1174                                 event = myself->_eventqueue.remove_front();
1175 h.sterling 1.1  
1176                                 if (!event)
1177                                 {
1178                                     //this should never happen
1179                                     continue;
1180                                 }
1181                 
1182 marek      1.21                 PEG_TRACE_STRING(
1183                                     TRC_LISTENER,
1184                                     Tracer::LEVEL4,
1185                                     "_worker_routine::consumeIndication " + name);
1186 h.sterling 1.1  
1187                                 try
1188                                 {
1189                                     myself->consumeIndication(event->getContext(),
1190                                                               event->getURL(),
1191                                                               event->getIndicationInstance());
1192                 
1193 marek      1.21                     PEG_TRACE_STRING(
1194                                         TRC_LISTENER,
1195                                         Tracer::LEVEL4,
1196                                         "_worker_routine::processed indication successfully. "
1197                                             + name);
1198 h.sterling 1.1  
1199                                     delete event;
1200                                     continue;
1201                 
1202                                 } catch (CIMException & ce)
1203                                 {
1204                                     //check for failure
1205                                     if (ce.getCode() == CIM_ERR_FAILED)
1206                                     {
1207 marek      1.21                         PEG_TRACE_STRING(
1208                                             TRC_LISTENER,
1209                                             Tracer::LEVEL2, 
1210                                             "_worker_routine::consumeIndication() temporary"
1211                                                 " failure: " + ce.getMessage() + " " + name);
1212 h.sterling 1.10                         
1213 marek      1.21                         // Here we simply determine if we should increment
1214                                         // the retry count or not.
1215                                         // We don't want a forced retry from a new event
1216                                         // to count as a retry.
1217                                         // We just have to do it for order's sake.
1218                                         // If the retry lapse has elapsed on this event,
1219                                         // then increment the counter.
1220                                         if (event->getRetries() > 0)
1221                                         {
1222                                             Sint64 differenceInMicroseconds = 
1223                                                 CIMDateTime::getDifference(
1224                                                     event->getLastAttemptTime(),
1225                                                     CIMDateTime::getCurrentDateTime());
1226                 
1227                                             if (differenceInMicroseconds >= 
1228                                                     (DEFAULT_RETRY_LAPSE * 1000))
1229                                             {
1230 h.sterling 1.10                                 event->increaseRetries();
1231 marek      1.21                             }
1232                                         }
1233                                         else
1234                                         {
1235 h.sterling 1.10                             event->increaseRetries();
1236                                         }
1237 h.sterling 1.1  
1238                                         //determine if we have hit the max retry count
1239                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1240                                         {
1241 marek      1.21                             PEG_TRACE_CSTRING(
1242                                                 TRC_LISTENER,
1243                                                 Tracer::LEVEL2,
1244                                                 "Error: the maximum retry count has been "
1245                                                     "exceeded.  Removing the event from "
1246                                                         "the queue.");
1247 h.sterling 1.1  
1248                                             Logger::put(
1249 marek      1.21                                 Logger::ERROR_LOG,
1250                                                 System::CIMLISTENER,
1251                                                 Logger::SEVERE,
1252                                                 "The following indication did not get "
1253                                                     "processed successfully: $0",
1254                                           event->getIndicationInstance().getPath().toString());
1255 h.sterling 1.1  
1256                                             delete event;
1257                                             continue;
1258                 
1259                                         } else
1260                                         {
1261 marek      1.21                             PEG_TRACE_CSTRING(
1262                                                 TRC_LISTENER,
1263                                                 Tracer::LEVEL4,
1264                                                 "_worker_routine::placing failed indication "
1265                                                     "back in queue");
1266 mike       1.14                             tmpEventQueue.insert_back(event);
1267 h.sterling 1.1                          }
1268                 
1269                                     } else
1270                                     {
1271 marek      1.21                         PEG_TRACE_STRING(
1272                                             TRC_LISTENER,
1273                                             Tracer::LEVEL2,
1274                                             "Error: consumeIndication() permanent failure: "
1275                                                 + ce.getMessage());
1276 h.sterling 1.1                          delete event;
1277                                         continue;
1278                                     }
1279                 
1280                                 } catch (Exception & ex)
1281                                 {
1282 marek      1.21                     PEG_TRACE_STRING(
1283                                         TRC_LISTENER,
1284                                         Tracer::LEVEL2,
1285                                         "Error: consumeIndication() permanent failure: "
1286                                             + ex.getMessage());
1287 h.sterling 1.1                      delete event;
1288                                     continue;
1289                 
1290                                 } catch (...)
1291                                 {
1292 marek      1.21                     PEG_TRACE_CSTRING(
1293                                         TRC_LISTENER,
1294                                         Tracer::LEVEL2,
1295                                         "Error: consumeIndication() failed: "
1296                                             "Unknown exception.");
1297 h.sterling 1.1                      delete event;
1298                                     continue;
1299                                 } //end try
1300                 
1301                             } //while eventqueue
1302                 
1303 h.sterling 1.10             // Copy the failed indications back to the main queue
1304 marek      1.21             // We now lock the queue while adding the retries onto the queue
1305                             // so that new events can't get in front
1306                             // of those events we are retrying. Retried events happened before
1307                             // any new events coming in.
1308 h.sterling 1.1              IndicationDispatchEvent* tmpEvent = 0;
1309 h.sterling 1.10             myself->_eventqueue.try_lock();
1310 h.sterling 1.1              while (tmpEventQueue.size())
1311                             {
1312 mike       1.14                 tmpEvent = tmpEventQueue.remove_front();
1313                                 myself->_eventqueue.insert_back(tmpEvent);
1314 h.sterling 1.10                 
1315 h.sterling 1.1              }
1316 h.sterling 1.10             myself->_eventqueue.unlock();
1317 h.sterling 1.1  
1318 kumpf      1.16         } catch (TimeOut&)
1319 h.sterling 1.1          {
1320 marek      1.21             PEG_TRACE_CSTRING(
1321                                 TRC_LISTENER,
1322                                 Tracer::LEVEL4,
1323                                 "_worker_routine::Time to retry any outstanding indications.");
1324                 
1325                             // signal the queue in the same way we would,
1326                             // if we received a new indication
1327                             // this allows the thread to fall into the queue processing code
1328 h.sterling 1.1              myself->_check_queue->signal();
1329                 
1330                         } //time_wait
1331                 
1332                 
1333                     } //shutdown
1334                 
1335                     PEG_METHOD_EXIT();
1336                     return 0;
1337                 }
1338                 
1339                 
1340                 PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2