(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 martin 1.29 //%LICENSE////////////////////////////////////////////////////////////////
   2 martin 1.30 //
   3 martin 1.29 // Licensed to The Open Group (TOG) under one or more contributor license
   4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
   5             // this work for additional information regarding copyright ownership.
   6             // Each contributor licenses this file to you under the OpenPegasus Open
   7             // Source License; you may not use this file except in compliance with the
   8             // License.
   9 martin 1.30 //
  10 martin 1.29 // Permission is hereby granted, free of charge, to any person obtaining a
  11             // copy of this software and associated documentation files (the "Software"),
  12             // to deal in the Software without restriction, including without limitation
  13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
  14             // and/or sell copies of the Software, and to permit persons to whom the
  15             // Software is furnished to do so, subject to the following conditions:
  16 martin 1.30 //
  17 martin 1.29 // The above copyright notice and this permission notice shall be included
  18             // in all copies or substantial portions of the Software.
  19 martin 1.30 //
  20 martin 1.29 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  21 martin 1.30 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  22 martin 1.29 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
  23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
  24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
  25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
  26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27 martin 1.30 //
  28 martin 1.29 //////////////////////////////////////////////////////////////////////////
  29 h.sterling 1.1  //
  30                 //%/////////////////////////////////////////////////////////////////////////////
  31                 
  32                 #include <Pegasus/Common/Config.h>
  33                 //#include <cstdlib>
  34                 //#include <dlfcn.h>
  35                 #include <Pegasus/Common/System.h>
  36                 #include <Pegasus/Common/FileSystem.h>
  37                 #include <Pegasus/Common/Tracer.h>
  38                 #include <Pegasus/Common/Logger.h>
  39                 #include <Pegasus/Common/XmlReader.h>
  40 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  41 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  42                 #include <Pegasus/Common/XmlWriter.h>
  43 mike       1.14 #include <Pegasus/Common/List.h>
  44 mike       1.15 #include <Pegasus/Common/Mutex.h>
  45 h.sterling 1.1  
  46                 #include "ConsumerManager.h"
  47                 
  48                 PEGASUS_NAMESPACE_BEGIN
  49                 PEGASUS_USING_STD;
  50                 
  51 kumpf      1.31 //ATTN: Can we just use a properties file instead??  If we only have one
  52 marek      1.21 // property, we may want to just parse it ourselves.
  53                 // We may need to add more properties, however.  Potential per consumer
  54                 // properties: unloadOk, idleTimout, retryCount, etc
  55 h.sterling 1.1  static struct OptionRow optionsTable[] =
  56                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  57                 {
  58 marek      1.21 {"location", "", false, Option::STRING, 0, 0,
  59                  "location", "library name for the consumer"},
  60 h.sterling 1.1  };
  61                 
  62                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  63                 
  64                 //retry settings
  65                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  66 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  67 h.sterling 1.1  
  68 kumpf      1.31 //constant for fake property that is added to instances when
  69 marek      1.21 // serializing to track the full URL
  70 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
  71 h.sterling 1.1  
  72                 
  73 marek      1.21 ConsumerManager::ConsumerManager(
  74                     const String& consumerDir,
  75                     const String& consumerConfigDir,
  76                     Boolean enableConsumerUnload,
  77 kumpf      1.31     Uint32 idleTimeout) :
  78 marek      1.21         _consumerDir(consumerDir),
  79                         _consumerConfigDir(consumerConfigDir),
  80                         _enableConsumerUnload(enableConsumerUnload),
  81                         _idleTimeout(idleTimeout),
  82                         _forceShutdown(true)
  83 h.sterling 1.1  {
  84                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  85                 
  86 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
  87                          "Consumer library directory: %s",
  88                          (const char*)consumerDir.getCString()));
  89                      PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  90                          "Consumer configuration directory: %s",
  91                          (const char*)consumerConfigDir.getCString()));
  92 h.sterling  1.1  
  93 mike        1.20     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  94 thilo.boehm 1.28         "Consumer unload enabled %d: idle timeout %d",
  95                           enableConsumerUnload,idleTimeout));
  96 h.sterling  1.1  
  97                  
  98 h.sterling  1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  99 h.sterling  1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 100 h.sterling  1.1  
 101 kumpf       1.3      struct timeval deallocateWait = {15, 0};
 102                      _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
 103 h.sterling  1.1  
 104                      _init();
 105                  
 106                      PEG_METHOD_EXIT();
 107                  }
 108                  
 109                  ConsumerManager::~ConsumerManager()
 110                  {
 111                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 112                  
 113                      unloadAllConsumers();
 114                  
 115 kumpf       1.3      delete _thread_pool;
 116 h.sterling  1.1  
 117                      ConsumerTable::Iterator i = _consumers.start();
 118                      for (; i!=0; i++)
 119                      {
 120                          DynamicConsumer* consumer = i.value();
 121                          delete consumer;
 122                      }
 123                  
 124 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 125 h.sterling  1.1  
 126                      ModuleTable::Iterator j = _modules.start();
 127                      for (;j!=0;j++)
 128                      {
 129                          ConsumerModule* module = j.value();
 130                          delete module;
 131                      }
 132                  
 133 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 134 h.sterling  1.1  
 135                      PEG_METHOD_EXIT();
 136                  }
 137                  
 138                  void ConsumerManager::_init()
 139                  {
 140                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 141                  
 142                      //check if there are any outstanding indications
 143                      Array<String> files;
 144                      Uint32 pos;
 145                      String consumerName;
 146                  
 147                      if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 148                      {
 149                          for (Uint32 i = 0; i < files.size(); i++)
 150                          {
 151                              pos = files[i].find(".dat");
 152                              if (pos != PEG_NOT_FOUND)
 153                              {
 154                                  consumerName = files[i].subString(0, pos);
 155 h.sterling  1.1  
 156                                  try
 157                                  {
 158 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 159                                          "Attempting to load indication for '%s'",
 160                                          (const char*)consumerName.getCString()));
 161 h.sterling  1.1                      getConsumer(consumerName);
 162                  
 163                                  } catch (...)
 164                                  {
 165 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 166                                          "Cannot load consumer from file '%s'",
 167                                          (const char*)files[i].getCString()));
 168 h.sterling  1.1                  }
 169                              }
 170                          }
 171                      }
 172                  
 173                      PEG_METHOD_EXIT();
 174                  }
 175                  
 176                  String ConsumerManager::getConsumerDir()
 177                  {
 178                      return _consumerDir;
 179                  }
 180                  
 181                  String ConsumerManager::getConsumerConfigDir()
 182                  {
 183                      return _consumerConfigDir;
 184                  }
 185                  
 186                  Boolean ConsumerManager::getEnableConsumerUnload()
 187                  {
 188                      return _enableConsumerUnload;
 189 h.sterling  1.1  }
 190                  
 191                  Uint32 ConsumerManager::getIdleTimeout()
 192                  {
 193                      return _idleTimeout;
 194                  }
 195                  
 196 marek       1.21 /** Retrieves the library name associated with the consumer name.
 197                   *  By default, the library name
 198 kumpf       1.31  * is the same as the consumer name.  However, you may specify a different
 199                   * library name in a consumer configuration file.  This file must be named
 200 marek       1.21  *  "MyConsumer.txt" and contain the following: location="libraryName"
 201                   *
 202 kumpf       1.31  * The config file is optional and is generally only needed in cases where
 203 marek       1.21  * there are strict requirements on library naming.
 204                   *
 205 kumpf       1.31  * It is the responsibility of the caller to catch any exceptions
 206 marek       1.21  * thrown by this method.
 207                   */
 208 h.sterling  1.1  String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 209                  {
 210                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 211                  
 212                      //default library name is consumer name
 213                      String libraryName = consumerName;
 214                  
 215 marek       1.21     // check whether an alternative library name was specified in an optional
 216                      // consumer config file
 217                      String configFile = FileSystem::getAbsolutePath(
 218                                              (const char*)_consumerConfigDir.getCString(),
 219                                              String(consumerName + ".conf"));
 220 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
 221                          "Looking for config file %s",(const char*)configFile.getCString()));
 222 h.sterling  1.1  
 223                      if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 224                      {
 225 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 226                              "Found config file for consumer %s",
 227                              (const char*)consumerName.getCString()));
 228 h.sterling  1.1  
 229                          try
 230                          {
 231 kumpf       1.31             //Bugzilla 3765 - Change this to use a member var
 232 marek       1.21             // when OptionManager has a reset option
 233 h.sterling  1.8              OptionManager _optionMgr;
 234 marek       1.21             //comment the following line out later
 235                              _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 236 h.sterling  1.1              _optionMgr.mergeFile(configFile);
 237                              _optionMgr.checkRequiredOptions();
 238                  
 239 kumpf       1.31             if (!_optionMgr.lookupValue("location", libraryName) ||
 240 marek       1.21                 (libraryName == String::EMPTY))
 241 h.sterling  1.1              {
 242 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 243 marek       1.21                     "Warning: Using default library name since none was "
 244 kumpf       1.31                     "specified in %s",(const char*)configFile.getCString()));
 245 h.sterling  1.1                  libraryName = consumerName;
 246                              }
 247                  
 248                          } catch (Exception & ex)
 249                          {
 250 marek       1.21             throw Exception(
 251                                        MessageLoaderParms(
 252                                            "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 253                                            "Error reading $0: $1.",
 254                                            configFile,
 255                                            ex.getMessage()));
 256 h.sterling  1.1          }
 257                      } else
 258                      {
 259 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 260                              "No config file exists for %s",
 261                              (const char*)consumerName.getCString()));
 262 h.sterling  1.1      }
 263                  
 264 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 265                          "The library name for %s is %s",
 266                          (const char*)consumerName.getCString(),
 267                          (const char*)libraryName.getCString()));
 268 h.sterling  1.1  
 269                      PEG_METHOD_EXIT();
 270                      return libraryName;
 271                  }
 272                  
 273 marek       1.21 /** Returns the DynamicConsumer for the consumerName.  If it already exists,
 274                   *  we return the one in the cache.  If it
 275 h.sterling  1.1   *  DNE, we create it and initialize it, and add it to the table.
 276 marek       1.21  * @throws Exception if we cannot successfully create and
 277                   *  initialize the consumer
 278 kumpf       1.31  */
 279 h.sterling  1.1  DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 280                  {
 281                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 282                  
 283                      DynamicConsumer* consumer = 0;
 284                      CIMIndicationConsumerProvider* consumerRef = 0;
 285                      Boolean cached = false;
 286                      Boolean entryExists = false;
 287                  
 288                      AutoMutex lock(_consumerTableMutex);
 289                  
 290                      if (_consumers.lookup(consumerName, consumer))
 291                      {
 292                          //why isn't this working??
 293                          entryExists = true;
 294                  
 295                          if (consumer && consumer->isLoaded())
 296                          {
 297 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 298                                  "Consumer exists in the cache and is already loaded: %s",
 299                                  (const char*)consumerName.getCString()));
 300 h.sterling  1.1              cached = true;
 301                          }
 302                      } else
 303                      {
 304 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 305                              "Consumer not found in cache, creating %s",
 306                              (const char*)consumerName.getCString()));
 307 h.sterling  1.1          consumer = new DynamicConsumer(consumerName);
 308                          //ATTN: The above is a memory leak if _initConsumer throws an exception
 309                          //need to delete it in that case
 310                      }
 311                  
 312                      if (!cached)
 313                      {
 314                          _initConsumer(consumerName, consumer);
 315                  
 316                          if (!entryExists)
 317                          {
 318                              _consumers.insert(consumerName, consumer);
 319                          }
 320                      }
 321                  
 322                      consumer->updateIdleTimer();
 323                  
 324                      PEG_METHOD_EXIT();
 325                      return consumer;
 326                  }
 327                  
 328 h.sterling  1.1  /** Initializes a DynamicConsumer.
 329 kumpf       1.31  * Caller assumes responsibility for mutexing the operation as well as
 330 marek       1.21  * ensuring the consumer does not already exist.
 331 h.sterling  1.1   * @throws Exception if the consumer cannot be initialized
 332                   */
 333 marek       1.21 void ConsumerManager::_initConsumer(
 334                           const String& consumerName,
 335                           DynamicConsumer* consumer)
 336 h.sterling  1.1  {
 337                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 338                  
 339                      CIMIndicationConsumerProvider* base = 0;
 340                      ConsumerModule* module = 0;
 341                  
 342 kumpf       1.31     // lookup provider module in cache (if it exists, it returns
 343 marek       1.21     // the cached module, otherwise it creates and returns a new one)
 344 h.sterling  1.1      String libraryName = _getConsumerLibraryName(consumerName);
 345                      module = _lookupModule(libraryName);
 346                  
 347                      //build library path
 348 marek       1.21     String libraryPath = FileSystem::getAbsolutePath(
 349                                               (const char*)_consumerDir.getCString(),
 350                                               FileSystem::buildLibraryFileName(libraryName));
 351 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
 352                          (const char*)libraryPath.getCString()));
 353 h.sterling  1.1  
 354                      //load module
 355                      try
 356                      {
 357                          base = module->load(consumerName, libraryPath);
 358                          consumer->set(module, base);
 359                  
 360                      } catch (Exception& ex)
 361                      {
 362 kumpf       1.31         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 363 thilo.boehm 1.28             "Error loading consumer module: %s",
 364                              (const char*)ex.getMessage().getCString()));
 365 marek       1.21 
 366                          throw Exception(
 367                                    MessageLoaderParms(
 368                                        "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 369                                        "Cannot load module ($0:$1): Unknown exception.",
 370                                        consumerName,
 371                                        libraryName));
 372 h.sterling  1.1      } catch (...)
 373                      {
 374 marek       1.21         throw Exception(
 375                                    MessageLoaderParms(
 376                                        "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 377                                        "Cannot load module ($0:$1): Unknown exception.",
 378                                        consumerName,
 379                                        libraryName));
 380 h.sterling  1.1      }
 381                  
 382 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 383                          "Successfully loaded consumer module %s",
 384                          (const char*)libraryName.getCString()));
 385 h.sterling  1.1  
 386                      //initialize consumer
 387                      try
 388                      {
 389 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
 390                              (const char*)consumerName.getCString()));
 391 h.sterling  1.1  
 392                          consumer->initialize();
 393                  
 394 venkat.puvvada 1.32         Semaphore* semaphore = consumer->getShutdownSemaphore();
 395 h.sterling     1.1  
 396                             //start the worker thread
 397 konrad.r       1.7          if (_thread_pool->allocate_and_awaken(consumer,
 398 h.sterling     1.1                                            _worker_routine,
 399 konrad.r       1.7                                            semaphore) != PEGASUS_THREAD_OK)
 400 marek          1.21         {
 401                                 PEG_TRACE_CSTRING(
 402                                     TRC_LISTENER,
 403 marek          1.24                 Tracer::LEVEL1,
 404 marek          1.21                 "Could not allocate thread for consumer.");
 405                     
 406                                 throw Exception(
 407                                     MessageLoaderParms(
 408                                         "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 409                                         "Not enough threads for consumer worker routine."));
 410 konrad.r       1.7          }
 411 h.sterling     1.1  
 412 marek          1.21         //wait until the listening thread has started.
 413                             // Otherwise, there is a miniscule chance that the first event will
 414                             // be enqueued before the consumer is waiting for it and the first
 415                             // indication after loading the consumer will be lost
 416 h.sterling     1.8          consumer->waitForEventThread();
 417                     
 418 h.sterling     1.1          //load any outstanding requests
 419 kumpf          1.31         Array<IndicationDispatchEvent> outstandingIndications =
 420 marek          1.21             _deserializeOutstandingIndications(consumerName);
 421 h.sterling     1.1          if (outstandingIndications.size())
 422                             {
 423                                 //the consumer will signal itself in _loadOustandingIndications
 424                                 consumer->_loadOutstandingIndications(outstandingIndications);
 425                             }
 426                     
 427 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 428                                 "Successfully initialized consumer %s",
 429                                 (const char*)consumerName.getCString()));
 430 h.sterling     1.1  
 431                         } catch (...)
 432                         {
 433                             module->unloadModule();
 434                             consumer->reset();
 435 marek          1.21         throw Exception(
 436                                 MessageLoaderParms(
 437                                     "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 438                                     "Cannot initialize consumer ($0).",
 439 kumpf          1.31                 consumerName));
 440 h.sterling     1.1      }
 441                     
 442 kumpf          1.31     PEG_METHOD_EXIT();
 443 h.sterling     1.1  }
 444                     
 445                     
 446 marek          1.21 /** Returns the ConsumerModule with the given library name.
 447                      *  If it already exists, we return the one in the cache.  If it
 448 h.sterling     1.1   *  DNE, we create it and add it to the table.
 449 kumpf          1.31  * @throws Exception if we cannot successfully create and
 450 marek          1.21  *  initialize the consumer
 451 kumpf          1.31  */
 452 marek          1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
 453 h.sterling     1.1  {
 454                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 455                     
 456                         AutoMutex lock(_moduleTableMutex);
 457                     
 458                         ConsumerModule* module = 0;
 459                     
 460                         //see if consumer module is cached
 461                         if (_modules.lookup(libraryName, module))
 462                         {
 463 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 464 kumpf          1.31             "Found Consumer Module %s in Consumer Manager Cache",
 465 thilo.boehm    1.28             (const char*)libraryName.getCString()));
 466 h.sterling     1.1  
 467                         } else
 468                         {
 469 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 470                                 "Creating Consumer Provider Module %s",
 471                                 (const char*)libraryName.getCString()));
 472 h.sterling     1.1  
 473 kumpf          1.31         module = new ConsumerModule();
 474 h.sterling     1.1          _modules.insert(libraryName, module);
 475                         }
 476                     
 477                         PEG_METHOD_EXIT();
 478                         return(module);
 479                     }
 480                     
 481                     /** Returns true if there are active consumers
 482 kumpf          1.31  */
 483 h.sterling     1.1  Boolean ConsumerManager::hasActiveConsumers()
 484                     {
 485                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 486                     
 487                         AutoMutex lock(_consumerTableMutex);
 488                         DynamicConsumer* consumer = 0;
 489                     
 490                         try
 491                         {
 492                             for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 493                             {
 494                                 consumer = i.value();
 495                     
 496 kumpf          1.31             if (consumer &&
 497                                     consumer->isLoaded() &&
 498 marek          1.21                 (consumer->getPendingIndications() > 0))
 499 h.sterling     1.1              {
 500 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 501                                         "Found active consumer: %s" ,
 502                                         (const char*)consumer->_name.getCString()));
 503 h.sterling     1.1                  PEG_METHOD_EXIT();
 504                                     return true;
 505                                 }
 506                             }
 507                         } catch (...)
 508                         {
 509                             // Unexpected exception; do not assume that no providers are loaded
 510 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 511 marek          1.21             "Unexpected Exception in hasActiveConsumers.");
 512 h.sterling     1.1          PEG_METHOD_EXIT();
 513                             return true;
 514                         }
 515                     
 516                         PEG_METHOD_EXIT();
 517                         return false;
 518                     }
 519                     
 520                     /** Returns true if there are loaded consumers
 521 kumpf          1.31  */
 522 h.sterling     1.1  Boolean ConsumerManager::hasLoadedConsumers()
 523                     {
 524                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 525                     
 526                         AutoMutex lock(_consumerTableMutex);
 527                         DynamicConsumer* consumer = 0;
 528                     
 529                         try
 530                         {
 531                             for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 532                             {
 533                                 consumer = i.value();
 534                     
 535                                 if (consumer && consumer->isLoaded())
 536                                 {
 537 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 538                                          "Found loaded consumer: %s",
 539                                          (const char*)consumer->_name.getCString()));
 540 h.sterling     1.1                  PEG_METHOD_EXIT();
 541                                     return true;
 542                                 }
 543                             }
 544                         } catch (...)
 545                         {
 546                             // Unexpected exception; do not assume that no providers are loaded
 547 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 548 marek          1.21             "Unexpected Exception in hasLoadedConsumers.");
 549 h.sterling     1.1          PEG_METHOD_EXIT();
 550                             return true;
 551                         }
 552                     
 553                         PEG_METHOD_EXIT();
 554                         return false;
 555                     }
 556                     
 557                     
 558                     /** Shutting down a consumer consists of four major steps:
 559 kumpf          1.31  * 1) Send the shutdown signal.  This causes the worker routine to break out
 560 marek          1.21  *    of the loop and exit.
 561                      * 2) Wait for the worker thread to end.  This may take a while if it's
 562                      *    processing an indication.  This is optional in a shutdown scenario.
 563                      *    If the listener is shutdown with a -f force, the listener
 564                      *    will not wait for the consumer to finish before shutting down.
 565                      *    Note that a normal shutdown only allows the current consumer indication
 566                      *    to finish.  All other queued indications are serialized to a log and
 567 h.sterling     1.1   *    are sent when the consumer is reoaded.
 568                      * 3) Terminate the consumer provider interface.
 569 marek          1.21  * 4) Decrement the module refcount (the module will automatically unload when
 570                      *    it's refcount == 0)
 571 kumpf          1.31  *
 572 marek          1.21  * In a scenario where multiple consumers are loaded, the shutdown signal
 573                      * should be sent to all of the consumers so the threads can finish
 574                      * simultaneously.
 575 kumpf          1.31  *
 576 marek          1.21  * ATTN: Should the normal shutdown wait for everything in the queue to be
 577                      * processed?  Just new indications to be processed?  I am not inclined to this
 578                      * solution since it could take a LOT of time.  By serializing and deserializing
 579 kumpf          1.31  * indications between shutdown and startup, I feel like we do not need to
 580 marek          1.21  * process ALL queued indications on shutdown.
 581 kumpf          1.31  */
 582 h.sterling     1.1  
 583                     /** Unloads all consumers.
 584 kumpf          1.31  */
 585 h.sterling     1.1  void ConsumerManager::unloadAllConsumers()
 586                     {
 587                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 588                     
 589                         AutoMutex lock(_consumerTableMutex);
 590                     
 591                         if (!_consumers.size())
 592                         {
 593 marek          1.21         PEG_TRACE_CSTRING(
 594                                 TRC_LISTENER,
 595                                 Tracer::LEVEL4,
 596                                 "There are no consumers to unload.");
 597 h.sterling     1.1          PEG_METHOD_EXIT();
 598                             return;
 599                         }
 600                     
 601                         if (!_forceShutdown)
 602                         {
 603 marek          1.21         // wait until all the consumers have finished processing the events in
 604                             // their queue
 605                             // ATTN: Should this have a timeout even though it's a force??
 606 h.sterling     1.1          while (hasActiveConsumers())
 607                             {
 608 mike           1.15             Threads::sleep(500);
 609 h.sterling     1.1          }
 610                         }
 611                     
 612                         Array<DynamicConsumer*> loadedConsumers;
 613                     
 614                         ConsumerTable::Iterator i = _consumers.start();
 615                         DynamicConsumer* consumer = 0;
 616                     
 617                         for (; i!=0; i++)
 618                         {
 619                             consumer = i.value();
 620                             if (consumer && consumer->isLoaded())
 621                             {
 622                                 loadedConsumers.append(consumer);
 623                             }
 624                         }
 625                     
 626                         if (loadedConsumers.size())
 627                         {
 628                             try
 629                             {
 630 h.sterling     1.1              _unloadConsumers(loadedConsumers);
 631                     
 632 kumpf          1.16         } catch (Exception&)
 633 h.sterling     1.1          {
 634 marek          1.21             PEG_TRACE_CSTRING(
 635                                     TRC_LISTENER,
 636 marek          1.25                 Tracer::LEVEL2,
 637 marek          1.21                 "Error unloading consumers.");
 638 h.sterling     1.1          }
 639                         } else
 640                         {
 641 marek          1.21         PEG_TRACE_CSTRING(
 642                                 TRC_LISTENER,
 643                                 Tracer::LEVEL4,
 644                                 "There are no consumers to unload.");
 645 h.sterling     1.1      }
 646                     
 647                         PEG_METHOD_EXIT();
 648                     }
 649                     
 650                     /** Unloads idle consumers.
 651 kumpf          1.31  */
 652 h.sterling     1.1  void ConsumerManager::unloadIdleConsumers()
 653                     {
 654                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 655                     
 656                         AutoMutex lock(_consumerTableMutex);
 657                     
 658                         if (!_consumers.size())
 659                         {
 660 marek          1.21         PEG_TRACE_CSTRING(
 661                                 TRC_LISTENER,
 662                                 Tracer::LEVEL4,
 663                                 "There are no consumers to unload.");
 664 h.sterling     1.1          PEG_METHOD_EXIT();
 665                             return;
 666                         }
 667                     
 668                         Array<DynamicConsumer*> loadedConsumers;
 669                     
 670                         ConsumerTable::Iterator i = _consumers.start();
 671                         DynamicConsumer* consumer = 0;
 672                     
 673                         for (; i!=0; i++)
 674                         {
 675                             consumer = i.value();
 676                             if (consumer && consumer->isLoaded() && consumer->isIdle())
 677                             {
 678                                 loadedConsumers.append(consumer);
 679                             }
 680                         }
 681                     
 682                         if (loadedConsumers.size())
 683                         {
 684                             try
 685 h.sterling     1.1          {
 686                                 _unloadConsumers(loadedConsumers);
 687                     
 688 kumpf          1.16         } catch (Exception&)
 689 h.sterling     1.1          {
 690 marek          1.21             PEG_TRACE_CSTRING(
 691                                     TRC_LISTENER,
 692 marek          1.25                 Tracer::LEVEL2,
 693 marek          1.21                 "Error unloading consumers.");
 694 h.sterling     1.1          }
 695                         } else
 696                         {
 697 marek          1.21         PEG_TRACE_CSTRING(
 698                                 TRC_LISTENER,
 699                                 Tracer::LEVEL4,
 700                                 "There are no consumers to unload.");
 701 h.sterling     1.1      }
 702                     
 703                         PEG_METHOD_EXIT();
 704                     }
 705                     
 706                     /** Unloads a single consumer.
 707 kumpf          1.31  */
 708 h.sterling     1.1  void ConsumerManager::unloadConsumer(const String& consumerName)
 709                     {
 710                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 711                     
 712                         AutoMutex lock(_consumerTableMutex);
 713                     
 714                         DynamicConsumer* consumer = 0;
 715                     
 716                         //check whether the consumer exists
 717                         if (!_consumers.lookup(consumerName, consumer))
 718                         {
 719 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 720                                 "Error: cannot unload consumer, unknown consumer %s",
 721                                 (const char*)consumerName.getCString()));
 722 h.sterling     1.1          return;
 723                         }
 724                     
 725                         //check whether the consumer is loaded
 726                         if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 727                         {
 728                             //unload the consumer
 729                             Array<DynamicConsumer*> loadedConsumers;
 730                             loadedConsumers.append(consumer);
 731                     
 732                             try
 733                             {
 734                                 _unloadConsumers(loadedConsumers);
 735                     
 736 kumpf          1.16         } catch (Exception&)
 737 h.sterling     1.1          {
 738 thilo.boehm    1.28             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
 739 marek          1.21                 "Error unloading consumers.");
 740 h.sterling     1.1          }
 741                     
 742                         } else
 743                         {
 744 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 745                                 "Error: cannot unload the not loaded consumer %s",
 746                                 (const char*)consumerName.getCString()));
 747 h.sterling     1.1      }
 748                     
 749                         PEG_METHOD_EXIT();
 750                     }
 751                     
 752                     /** Unloads the consumers in the given array.
 753                      *  The consumerTable mutex MUST be locked prior to entering this method.
 754 kumpf          1.31  */
 755 marek          1.21 void ConsumerManager::_unloadConsumers(
 756                         Array<DynamicConsumer*> consumersToUnload)
 757 h.sterling     1.1  {
 758                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 759                     
 760                         //tell consumers to shutdown
 761                         for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 762                         {
 763                             consumersToUnload[i]->sendShutdownSignal();
 764                         }
 765                     
 766 thilo.boehm    1.28     PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
 767 marek          1.21         "Sent shutdown signal to all consumers.");
 768                     
 769                         // wait for all the consumer worker threads to complete
 770 kumpf          1.31     // since we can only shutdown after they are all complete,
 771 marek          1.21     // it does not matter if the first, fifth, or last
 772                         // consumer takes the longest; the wait time is equal to the time it takes
 773                         // for the busiest consumer to stop processing its requests.
 774 h.sterling     1.1      for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 775                         {
 776 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
 777                                 (const char*)consumersToUnload[i]->getName().getCString()));
 778 h.sterling     1.1  
 779                             //wait for the consumer worker thread to end
 780 kumpf          1.31         Semaphore* _shutdownSemaphore =
 781 kumpf          1.26             consumersToUnload[i]->getShutdownSemaphore();
 782                             if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
 783 h.sterling     1.1          {
 784 kumpf          1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 785 marek          1.21                 "Timed out while attempting to stop consumer thread.");
 786 h.sterling     1.1          }
 787                     
 788 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
 789 h.sterling     1.1  
 790                             try
 791                             {
 792                                 //terminate consumer provider interface
 793                                 consumersToUnload[i]->terminate();
 794                     
 795                                 //unload consumer provider module
 796                                 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 797                                 consumersToUnload[i]->_module->unloadModule();
 798                     
 799                                 //serialize outstanding indications
 800 marek          1.21             _serializeOutstandingIndications(
 801                                     consumersToUnload[i]->getName(),
 802                                     consumersToUnload[i]->_retrieveOutstandingIndications());
 803 h.sterling     1.1  
 804                                 //reset the consumer
 805                                 consumersToUnload[i]->reset();
 806                     
 807 thilo.boehm    1.28             PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
 808 marek          1.21                 "Consumer library successfully unloaded.");
 809 h.sterling     1.1  
 810                             } catch (Exception& e)
 811                             {
 812 kumpf          1.31             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
 813 thilo.boehm    1.28                 "Error unloading consumer: %s",
 814 kumpf          1.31                 (const char*)e.getMessage().getCString()));
 815 h.sterling     1.1              //ATTN: throw exception? log warning?
 816                             }
 817                         }
 818                     
 819                         PEG_METHOD_EXIT();
 820                     }
 821                     
 822                     /** Serializes oustanding indications to a <MyConsumer>.dat file
 823                      */
 824 marek          1.21 void ConsumerManager::_serializeOutstandingIndications(
 825 kumpf          1.31     const String& consumerName,
 826 marek          1.21     Array<IndicationDispatchEvent> indications)
 827                     {
 828                         PEG_METHOD_ENTER(
 829                             TRC_LISTENER,
 830                             "ConsumerManager::_serializeOutstandingIndications");
 831 h.sterling     1.1  
 832                         if (!indications.size())
 833                         {
 834                             PEG_METHOD_EXIT();
 835                             return;
 836                         }
 837                     
 838 marek          1.21     String fileName = FileSystem::getAbsolutePath(
 839                                               (const char*)_consumerConfigDir.getCString(),
 840                                               String(consumerName + ".dat"));
 841 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
 842                             (const char*)fileName.getCString()));
 843 h.sterling     1.1  
 844 mike           1.9      Buffer buffer;
 845 h.sterling     1.1  
 846 kumpf          1.31     // Open the log file and serialize remaining
 847 h.sterling     1.1      FILE* fileHandle = 0;
 848 kumpf          1.31     fileHandle = fopen((const char*)fileName.getCString(), "w");
 849 h.sterling     1.1  
 850                         if (!fileHandle)
 851                         {
 852 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 853                                 "Unable to open log file for %s",
 854                                 (const char*)consumerName.getCString()));
 855 h.sterling     1.1  
 856                         } else
 857                         {
 858 mike           1.20         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 859 h.sterling     1.1                        "Serializing %d outstanding requests for %s",
 860                                           indications.size(),
 861 marek          1.18                       (const char*)consumerName.getCString()));
 862 h.sterling     1.1  
 863 marek          1.21         // we have to put the array of instances under a valid root element
 864 kumpf          1.31         // or the parser complains
 865 h.sterling     1.1          XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 866                     
 867 marek          1.21         CIMInstance cimInstance;
 868 h.sterling     1.1          for (Uint32 i = 0; i < indications.size(); i++)
 869                             {
 870 marek          1.21             //set the URL string property on the serializable instance
 871                                 CIMValue cimValue(CIMTYPE_STRING, false);
 872                                 cimValue.set(indications[i].getURL());
 873                                 cimInstance = indications[i].getIndicationInstance();
 874                                 CIMProperty cimProperty(URL_PROPERTY, cimValue);
 875                                 cimInstance.addProperty(cimProperty);
 876 h.sterling     1.11 
 877                                 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 878 marek          1.21         }
 879 h.sterling     1.1  
 880 kumpf          1.19         XmlWriter::append(buffer, "</IRETURNVALUE>");
 881 h.sterling     1.1  
 882                             fputs((const char*)buffer.getData(), fileHandle);
 883                     
 884                             fclose(fileHandle);
 885                         }
 886                     
 887                         PEG_METHOD_EXIT();
 888                     }
 889                     
 890                     /** Reads outstanding indications from a <MyConsumer>.dat file
 891 kumpf          1.31  */
 892                     Array<IndicationDispatchEvent>
 893 marek          1.21     ConsumerManager::_deserializeOutstandingIndications(
 894                             const String& consumerName)
 895                     {
 896                         PEG_METHOD_ENTER(
 897                             TRC_LISTENER,
 898                             "ConsumerManager::_deserializeOutstandingIndications");
 899                     
 900                         String fileName = FileSystem::getAbsolutePath(
 901                                               (const char*)_consumerConfigDir.getCString(),
 902                                               String(consumerName + ".dat"));
 903 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 904                             "Consumer dat file: %s",(const char*)fileName.getCString()));
 905 h.sterling     1.1  
 906                         Array<CIMInstance> cimInstances;
 907 marek          1.21     Array<String>      urlStrings;
 908                         Array<IndicationDispatchEvent> indications;
 909 h.sterling     1.1  
 910                         // Open the log file and serialize remaining indications
 911                         if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 912                         {
 913 mike           1.9          Buffer text;
 914 h.sterling     1.1          CIMInstance cimInstance;
 915 marek          1.21         CIMProperty cimProperty;
 916                             CIMValue cimValue;
 917                             String urlString;
 918 h.sterling     1.1          XmlEntry entry;
 919                     
 920                             try
 921                             {
 922 marek          1.21             //ATTN: Is this safe to use; what about CRLFs?
 923                                 FileSystem::loadFileToMemory(text, fileName);
 924 h.sterling     1.1  
 925                                 //parse the file
 926                                 XmlParser parser((char*)text.getData());
 927                                 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 928                     
 929                                 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 930                                 {
 931 marek          1.21                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 932                                     if (index != PEG_NOT_FOUND)
 933                                     {
 934 kumpf          1.31                     // get the URL string property from the serialized instance
 935 marek          1.21                     // and remove the property
 936                                         cimProperty = cimInstance.getProperty(index);
 937                                         cimValue = cimProperty.getValue();
 938                                         cimValue.get(urlString);
 939                                         cimInstance.removeProperty(index);
 940                                     }
 941 kumpf          1.31                 IndicationDispatchEvent* indicationEvent =
 942 marek          1.21                     new IndicationDispatchEvent(
 943 kumpf          1.31                         OperationContext(),
 944                                             urlString,
 945 marek          1.21                         cimInstance);
 946                     
 947 h.sterling     1.11                 indications.append(*indicationEvent);
 948 h.sterling     1.1              }
 949                     
 950                                 XmlReader::expectEndTag(parser, "IRETURNVALUE");
 951                     
 952 mike           1.20             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 953 h.sterling     1.1                            "Consumer %s has %d outstanding indications",
 954                                               (const char*)consumerName.getCString(),
 955 marek          1.18                           indications.size()));
 956 h.sterling     1.1  
 957 kumpf          1.31             //delete the file
 958 h.sterling     1.1              FileSystem::removeFile(fileName);
 959                     
 960                             } catch (Exception& ex)
 961                             {
 962 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 963                                     "Error parsing dat file for consumer %s: %s",
 964                                     (const char*)consumerName.getCString(),
 965                                     (const char*)ex.getMessage().getCString()));
 966 h.sterling     1.1  
 967                             } catch (...)
 968                             {
 969 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 970                                     "Error parsing dat file for consumer %s: Unknown Exception",
 971                                     (const char*)consumerName.getCString()));
 972 h.sterling     1.1          }
 973                         }
 974                     
 975                         PEG_METHOD_EXIT();
 976 h.sterling     1.11     return indications;
 977 h.sterling     1.1  }
 978                     
 979                     
 980                     
 981 kumpf          1.31 /**
 982 marek          1.21  * This is the main worker thread of the consumer.  By having only one thread
 983 kumpf          1.31  * per consumer, we eliminate a ton of synchronization issues and make it easy
 984 marek          1.21  * to prevent the consumer from performing two mutually exclusive operations
 985 kumpf          1.31  * at once.  This also prevents one bad consumer from taking the entire
 986 marek          1.21  * listener down.  That being said, it is up to the programmer to write smart
 987 kumpf          1.31  * consumers, and to ensure that their actions don't deadlock
 988 marek          1.21  * the worker thread.
 989 kumpf          1.31  *
 990 marek          1.21  * If a consumer receives a lot of traffic, or it's consumeIndication() method
 991                      * takes a considerable amount of time to complete, it may make sense to make
 992                      * the consumer multi-threaded.  The individual consumer can immediately
 993                      * spawn off new threads to handle indications, and return immediately to
 994 kumpf          1.31  * catch the next indication.  In this way, a consumer can attain
 995                      * extremely high performance.
 996                      *
 997 h.sterling     1.1   * There are three different events that can signal us:
 998                      * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 999 marek          1.21  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1000                      *    shutdown or an idle consumer state)
1001 h.sterling     1.1   * 3) A retry signal (signalled from this routine itself)
1002 kumpf          1.31  *
1003 marek          1.21  * The idea is that all new indications are put on the front of the queue and
1004 kumpf          1.31  * processed first.  All of the retry indications are put on the back of the
1005 marek          1.21  * queue and are only processed AFTER all new indications are sent.
1006                      * Before processing each indication, we check to see whether or not the
1007 kumpf          1.31  * shutdown signal was given.
1008                      * If so, we immediately break out of the loop, and another component
1009 marek          1.21  * serializes the remaining indications to a file.
1010 kumpf          1.31  *
1011                      * An indication gets retried
1012 marek          1.21  *     if the consumer throws a CIM_ERR_FAILED exception.
1013 kumpf          1.31  *
1014                      * This function makes sure it waits until the default retry lapse has passed
1015 marek          1.21  * to avoid issues with the following scenario:
1016 h.sterling     1.5   * 20 new indications come in, 10 of them are successful, 10 are not.
1017 kumpf          1.31  * We were signalled 20 times, so we will pass the time_wait 20 times.
1018 marek          1.21  * Perceivably, the process time on each indication could be minimal.
1019                      * We could potentially proceed to process the retries after a very small time
1020                      * interval since we would never hit the wait for the retry timeout.
1021 kumpf          1.31  *
1022                      */
1023                     ThreadReturnType PEGASUS_THREAD_CDECL
1024 marek          1.21     ConsumerManager::_worker_routine(void *param)
1025 h.sterling     1.1  {
1026                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1027                     
1028                         DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1029                         String name = myself->getName();
1030 mike           1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
1031 h.sterling     1.1  
1032 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1033                             "_worker_routine::entering loop for %s",
1034                             (const char*)name.getCString()));
1035 h.sterling     1.1  
1036 h.sterling     1.8      myself->_listeningSemaphore->signal();
1037                     
1038 h.sterling     1.1      while (true)
1039                         {
1040 kumpf          1.31         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s",
1041 thilo.boehm    1.28             (const char*)name.getCString()));
1042                     
1043 kumpf          1.26 
1044                             //wait to be signalled
1045                             if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
1046 h.sterling     1.1          {
1047 kumpf          1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1048                                     "_worker_routine::Time to retry any outstanding indications.");
1049                     
1050                                 // signal the queue in the same way we would,
1051                                 // if we received a new indication
1052                                 // this allows the thread to fall into the queue processing code
1053                                 myself->_check_queue->signal();
1054                     
1055                                 continue;
1056                             }
1057                     
1058 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1059                                 "_worker_routine::signalled %s",(const char*)name.getCString()));
1060 h.sterling     1.1  
1061 kumpf          1.26         //check whether we received the shutdown signal
1062                             if (myself->_dieNow)
1063                             {
1064 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1065                                     "_worker_routine::shutdown received %s",
1066                                     (const char*)name.getCString()));
1067 kumpf          1.26             break;
1068                             }
1069 h.sterling     1.1  
1070 kumpf          1.26         //create a temporary queue to store failed indications
1071                             tmpEventQueue.clear();
1072 h.sterling     1.1  
1073 kumpf          1.26         //continue processing events until the queue is empty
1074                             //make sure to check for the shutdown signal before every iteration
1075                             // Note that any time during our processing of events the Listener
1076                             // may be enqueueing NEW events for us to process.
1077                             // Because we are popping off the front and new events are being
1078                             // thrown on the back if events are failing when we start
1079                             // But are succeeding by the end of the processing, events may be
1080                             // sent out of chronological order.
1081                             // However. Once we complete the current queue of events, we will
1082                             // always send old events to be retried before sending any
1083                             // new events added afterwards.
1084                             while (myself->_eventqueue.size())
1085                             {
1086                                 //check for shutdown signal
1087                                 //this only breaks us out of the queue loop, but we will
1088                                 //immediately get through the next wait from
1089                                 //the shutdown signal itself, at which time we break
1090                                 //out of the main loop
1091 h.sterling     1.1              if (myself->_dieNow)
1092                                 {
1093 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1094                                         "Received signal to shutdown, jumping out of queue loop %s",
1095                                         (const char*)name.getCString()));
1096 h.sterling     1.1                  break;
1097                                 }
1098                     
1099 kumpf          1.26             //pop next indication off the queue
1100                                 IndicationDispatchEvent* event = 0;
1101                                 //what exceptions/errors can this throw?
1102                                 event = myself->_eventqueue.remove_front();
1103 h.sterling     1.1  
1104 kumpf          1.26             if (!event)
1105 h.sterling     1.1              {
1106 kumpf          1.26                 //this should never happen
1107                                     continue;
1108                                 }
1109 h.sterling     1.1  
1110 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1111                                     "_worker_routine::consumeIndication %s",
1112                                     (const char*)name.getCString()));
1113 h.sterling     1.1  
1114 kumpf          1.26             try
1115                                 {
1116                                     myself->consumeIndication(event->getContext(),
1117                                                               event->getURL(),
1118                                                               event->getIndicationInstance());
1119                     
1120 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1121                                         "_worker_routine::processed indication successfully. %s",
1122                                         (const char*)name.getCString()));
1123 h.sterling     1.1  
1124 kumpf          1.26                 delete event;
1125                                     continue;
1126                                 }
1127                                 catch (CIMException & ce)
1128                                 {
1129                                     //check for failure
1130                                     if (ce.getCode() == CIM_ERR_FAILED)
1131 h.sterling     1.1                  {
1132 kumpf          1.31                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2,
1133 kumpf          1.26                         "_worker_routine::consumeIndication() temporary"
1134 thilo.boehm    1.28                             " failure %s : %s",
1135                                             (const char*)name.getCString(),
1136                                             (const char*)ce.getMessage().getCString()));
1137 kumpf          1.31 
1138 kumpf          1.26                     // Here we simply determine if we should increment
1139                                         // the retry count or not.
1140                                         // We don't want a forced retry caused by a new
1141 kumpf          1.31                     // event to count as a retry.
1142 kumpf          1.26                     // We just have to do it for order's sake.
1143                                         // If the retry Lapse has lapsed on this event,
1144                                         // then increment the counter.
1145                                         if (event->getRetries() > 0)
1146                                         {
1147 kumpf          1.31                         Sint64 differenceInMicroseconds =
1148 kumpf          1.26                             CIMDateTime::getDifference(
1149                                                     event->getLastAttemptTime(),
1150                                                     CIMDateTime::getCurrentDateTime());
1151 h.sterling     1.1  
1152 kumpf          1.31                         if (differenceInMicroseconds >=
1153 kumpf          1.26                                 (DEFAULT_RETRY_LAPSE * 1000))
1154 marek          1.21                         {
1155 h.sterling     1.10                             event->increaseRetries();
1156                                             }
1157 kumpf          1.26                     }
1158                                         else
1159                                         {
1160                                             event->increaseRetries();
1161                                         }
1162 h.sterling     1.1  
1163 kumpf          1.26                     //determine if we have hit the max retry count
1164                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1165                                         {
1166                                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1167                                                 "Error: the maximum retry count has been "
1168                                                     "exceeded.  Removing the event from "
1169                                                     "the queue.");
1170                     
1171                                             Logger::put(
1172                                                 Logger::ERROR_LOG,
1173                                                 System::CIMLISTENER,
1174                                                 Logger::SEVERE,
1175                                                 "The following indication did not get "
1176                                                     "processed successfully: $0",
1177                                             event->getIndicationInstance().getPath().toString());
1178 h.sterling     1.1  
1179                                             delete event;
1180                                             continue;
1181                                         }
1182 kumpf          1.26                     else
1183                                         {
1184                                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1185                                                 "_worker_routine::placing failed indication "
1186                                                     "back in queue");
1187                                             tmpEventQueue.insert_back(event);
1188                                         }
1189                                     }
1190                                     else
1191 h.sterling     1.1                  {
1192 thilo.boehm    1.28                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1193                                             "Error: consumeIndication() permanent failure: %s",
1194                                             (const char*)ce.getMessage().getCString()));
1195 h.sterling     1.1                      delete event;
1196                                         continue;
1197 kumpf          1.26                 }
1198                                 }
1199                                 catch (Exception & ex)
1200 h.sterling     1.1              {
1201 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1202                                         "Error: consumeIndication() permanent failure: %s",
1203                                         (const char*)ex.getMessage().getCString()));
1204 kumpf          1.26                 delete event;
1205                                     continue;
1206 h.sterling     1.1              }
1207 kumpf          1.26             catch (...)
1208                                 {
1209                                     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1210                                         "Error: consumeIndication() failed: Unknown exception.");
1211                                     delete event;
1212                                     continue;
1213                                 } //end try
1214                     
1215                             } //while eventqueue
1216                     
1217                             // Copy the failed indications back to the main queue
1218                             // We now lock the queue while adding the retries back onto it
1219                             // so that new events cannot get in ahead of
1220                             // the events we are retrying. Retried events occurred before
1221                             // any new events coming in.
1222                             IndicationDispatchEvent* tmpEvent = 0;
1223 kumpf          1.27         if (myself->_eventqueue.try_lock())
1224 h.sterling     1.1          {
1225 kumpf          1.27             while (tmpEventQueue.size())
1226                                 {
1227                                     tmpEvent = tmpEventQueue.remove_front();
1228                                     myself->_eventqueue.insert_back(tmpEvent);
1229                                 }
1230                     
1231                                 myself->_eventqueue.unlock();
1232                             }
1233                             else
1234                             {
1235                                 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1236                                     "Failed to lock _eventqueue");
1237 kumpf          1.26         }
1238 h.sterling     1.1      } //shutdown
1239                     
1240                         PEG_METHOD_EXIT();
1241                         return 0;
1242                     }
1243                     
1244                     
1245                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2