(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 martin 1.29 //%LICENSE////////////////////////////////////////////////////////////////
   2 martin 1.30 //
   3 martin 1.29 // Licensed to The Open Group (TOG) under one or more contributor license
   4             // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
   5             // this work for additional information regarding copyright ownership.
   6             // Each contributor licenses this file to you under the OpenPegasus Open
   7             // Source License; you may not use this file except in compliance with the
   8             // License.
   9 martin 1.30 //
  10 martin 1.29 // Permission is hereby granted, free of charge, to any person obtaining a
  11             // copy of this software and associated documentation files (the "Software"),
  12             // to deal in the Software without restriction, including without limitation
  13             // the rights to use, copy, modify, merge, publish, distribute, sublicense,
  14             // and/or sell copies of the Software, and to permit persons to whom the
  15             // Software is furnished to do so, subject to the following conditions:
  16 martin 1.30 //
  17 martin 1.29 // The above copyright notice and this permission notice shall be included
  18             // in all copies or substantial portions of the Software.
  19 martin 1.30 //
  20 martin 1.29 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
  21 martin 1.30 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  22 martin 1.29 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
  23             // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
  24             // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
  25             // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
  26             // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27 martin 1.30 //
  28 martin 1.29 //////////////////////////////////////////////////////////////////////////
  29 h.sterling 1.1  //
  30                 //%/////////////////////////////////////////////////////////////////////////////
  31                 
  32                 #include <Pegasus/Common/Config.h>
  33                 //#include <cstdlib>
  34                 //#include <dlfcn.h>
  35                 #include <Pegasus/Common/System.h>
  36                 #include <Pegasus/Common/FileSystem.h>
  37                 #include <Pegasus/Common/Tracer.h>
  38                 #include <Pegasus/Common/Logger.h>
  39                 #include <Pegasus/Common/XmlReader.h>
  40 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  41 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  42                 #include <Pegasus/Common/XmlWriter.h>
  43 mike       1.14 #include <Pegasus/Common/List.h>
  44 mike       1.15 #include <Pegasus/Common/Mutex.h>
  45 h.sterling 1.1  
  46                 #include "ConsumerManager.h"
  47                 
  48                 PEGASUS_NAMESPACE_BEGIN
  49                 PEGASUS_USING_STD;
  50                 
  51 kumpf      1.31 //ATTN: Can we just use a properties file instead??  If we only have one
  52 marek      1.21 // property, we may want to just parse it ourselves.
  53                 // We may need to add more properties, however.  Potential per consumer
  54                 // properties: unloadOk, idleTimout, retryCount, etc
  55 h.sterling 1.1  static struct OptionRow optionsTable[] =
  56                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  57                 {
  58 marek      1.21 {"location", "", false, Option::STRING, 0, 0,
  59                  "location", "library name for the consumer"},
  60 h.sterling 1.1  };
  61                 
  62                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  63                 
  64                 //retry settings
  65                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  66 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  67 h.sterling 1.1  
  68 kumpf      1.31 //constant for fake property that is added to instances when
  69 marek      1.21 // serializing to track the full URL
  70 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
  71 h.sterling 1.1  
  72                 
  73 marek      1.21 ConsumerManager::ConsumerManager(
  74                     const String& consumerDir,
  75                     const String& consumerConfigDir,
  76                     Boolean enableConsumerUnload,
  77 kumpf      1.31     Uint32 idleTimeout) :
  78 marek      1.21         _consumerDir(consumerDir),
  79                         _consumerConfigDir(consumerConfigDir),
  80                         _enableConsumerUnload(enableConsumerUnload),
  81                         _idleTimeout(idleTimeout),
  82                         _forceShutdown(true)
  83 h.sterling 1.1  {
  84                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  85                 
  86 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
  87                          "Consumer library directory: %s",
  88                          (const char*)consumerDir.getCString()));
  89                      PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  90                          "Consumer configuration directory: %s",
  91                          (const char*)consumerConfigDir.getCString()));
  92 h.sterling  1.1  
  93 mike        1.20     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  94 thilo.boehm 1.28         "Consumer unload enabled %d: idle timeout %d",
  95                           enableConsumerUnload,idleTimeout));
  96 h.sterling  1.1  
  97                  
  98 h.sterling  1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  99 h.sterling  1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 100 h.sterling  1.1  
 101 kumpf       1.3      struct timeval deallocateWait = {15, 0};
 102                      _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
 103 h.sterling  1.1  
 104                      _init();
 105                  
 106                      PEG_METHOD_EXIT();
 107                  }
 108                  
 109                  ConsumerManager::~ConsumerManager()
 110                  {
 111                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 112                  
 113                      unloadAllConsumers();
 114                  
 115 kumpf       1.3      delete _thread_pool;
 116 h.sterling  1.1  
 117                      ConsumerTable::Iterator i = _consumers.start();
 118                      for (; i!=0; i++)
 119                      {
 120                          DynamicConsumer* consumer = i.value();
 121                          delete consumer;
 122                      }
 123                  
 124 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 125 h.sterling  1.1  
 126                      ModuleTable::Iterator j = _modules.start();
 127                      for (;j!=0;j++)
 128                      {
 129                          ConsumerModule* module = j.value();
 130                          delete module;
 131                      }
 132                  
 133 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 134 h.sterling  1.1  
 135                      PEG_METHOD_EXIT();
 136                  }
 137                  
 138                  void ConsumerManager::_init()
 139                  {
 140                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 141                  
 142                      //check if there are any outstanding indications
 143                      Array<String> files;
 144                      Uint32 pos;
 145                      String consumerName;
 146                  
 147                      if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 148                      {
 149                          for (Uint32 i = 0; i < files.size(); i++)
 150                          {
 151                              pos = files[i].find(".dat");
 152                              if (pos != PEG_NOT_FOUND)
 153                              {
 154                                  consumerName = files[i].subString(0, pos);
 155 h.sterling  1.1  
 156                                  try
 157                                  {
 158 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 159                                          "Attempting to load indication for '%s'",
 160                                          (const char*)consumerName.getCString()));
 161 h.sterling  1.1                      getConsumer(consumerName);
 162                  
 163                                  } catch (...)
 164                                  {
 165 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 166                                          "Cannot load consumer from file '%s'",
 167                                          (const char*)files[i].getCString()));
 168 h.sterling  1.1                  }
 169                              }
 170                          }
 171                      }
 172                  
 173                      PEG_METHOD_EXIT();
 174                  }
 175                  
 176                  String ConsumerManager::getConsumerDir()
 177                  {
 178                      return _consumerDir;
 179                  }
 180                  
 181                  String ConsumerManager::getConsumerConfigDir()
 182                  {
 183                      return _consumerConfigDir;
 184                  }
 185                  
 186                  Boolean ConsumerManager::getEnableConsumerUnload()
 187                  {
 188                      return _enableConsumerUnload;
 189 h.sterling  1.1  }
 190                  
 191                  Uint32 ConsumerManager::getIdleTimeout()
 192                  {
 193                      return _idleTimeout;
 194                  }
 195                  
 196 marek       1.21 /** Retrieves the library name associated with the consumer name.
 197                   *  By default, the library name
 198 kumpf       1.31  * is the same as the consumer name.  However, you may specify a different
 199                   * library name in a consumer configuration file.  This file must be named
 200 marek       1.21  *  "MyConsumer.txt" and contain the following: location="libraryName"
 201                   *
 202 kumpf       1.31  * The config file is optional and is generally only needed in cases where
 203 marek       1.21  * there are strict requirements on library naming.
 204                   *
 205 kumpf       1.31  * It is the responsibility of the caller to catch any exceptions
 206 marek       1.21  * thrown by this method.
 207                   */
 208 h.sterling  1.1  String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 209                  {
 210                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 211                  
 212                      //default library name is consumer name
 213                      String libraryName = consumerName;
 214                  
 215 marek       1.21     // check whether an alternative library name was specified in an optional
 216                      // consumer config file
 217                      String configFile = FileSystem::getAbsolutePath(
 218                                              (const char*)_consumerConfigDir.getCString(),
 219                                              String(consumerName + ".conf"));
 220 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
 221                          "Looking for config file %s",(const char*)configFile.getCString()));
 222 h.sterling  1.1  
 223                      if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 224                      {
 225 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 226                              "Found config file for consumer %s",
 227                              (const char*)consumerName.getCString()));
 228 h.sterling  1.1  
 229                          try
 230                          {
 231 kumpf       1.31             //Bugzilla 3765 - Change this to use a member var
 232 marek       1.21             // when OptionManager has a reset option
 233 h.sterling  1.8              OptionManager _optionMgr;
 234 marek       1.21             //comment the following line out later
 235                              _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 236 h.sterling  1.1              _optionMgr.mergeFile(configFile);
 237                              _optionMgr.checkRequiredOptions();
 238                  
 239 kumpf       1.31             if (!_optionMgr.lookupValue("location", libraryName) ||
 240 marek       1.21                 (libraryName == String::EMPTY))
 241 h.sterling  1.1              {
 242 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 243 marek       1.21                     "Warning: Using default library name since none was "
 244 kumpf       1.31                     "specified in %s",(const char*)configFile.getCString()));
 245 h.sterling  1.1                  libraryName = consumerName;
 246                              }
 247                  
 248                          } catch (Exception & ex)
 249                          {
 250 marek       1.21             throw Exception(
 251                                        MessageLoaderParms(
 252                                            "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 253                                            "Error reading $0: $1.",
 254                                            configFile,
 255                                            ex.getMessage()));
 256 h.sterling  1.1          }
 257                      } else
 258                      {
 259 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 260                              "No config file exists for %s",
 261                              (const char*)consumerName.getCString()));
 262 h.sterling  1.1      }
 263                  
 264 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 265                          "The library name for %s is %s",
 266                          (const char*)consumerName.getCString(),
 267                          (const char*)libraryName.getCString()));
 268 h.sterling  1.1  
 269                      PEG_METHOD_EXIT();
 270                      return libraryName;
 271                  }
 272                  
 273 marek       1.21 /** Returns the DynamicConsumer for the consumerName.  If it already exists,
 274                   *  we return the one in the cache.  If it
 275 h.sterling  1.1   *  DNE, we create it and initialize it, and add it to the table.
 276 marek       1.21  * @throws Exception if we cannot successfully create and
 277                   *  initialize the consumer
 278 kumpf       1.31  */
 279 h.sterling  1.1  DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 280                  {
 281                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 282                  
 283                      DynamicConsumer* consumer = 0;
 284                      Boolean cached = false;
 285                      Boolean entryExists = false;
 286                  
 287                      AutoMutex lock(_consumerTableMutex);
 288                  
 289                      if (_consumers.lookup(consumerName, consumer))
 290                      {
 291                          //why isn't this working??
 292                          entryExists = true;
 293                  
 294                          if (consumer && consumer->isLoaded())
 295                          {
 296 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 297                                  "Consumer exists in the cache and is already loaded: %s",
 298                                  (const char*)consumerName.getCString()));
 299 h.sterling  1.1              cached = true;
 300                          }
 301                      } else
 302                      {
 303 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 304                              "Consumer not found in cache, creating %s",
 305                              (const char*)consumerName.getCString()));
 306 h.sterling  1.1          consumer = new DynamicConsumer(consumerName);
 307                      }
 308                  
 309                      if (!cached)
 310                      {
 311 venkat.puvvada 1.33         AutoPtr<DynamicConsumer> destroyer(consumer);
 312 h.sterling     1.1          _initConsumer(consumerName, consumer);
 313 venkat.puvvada 1.33         destroyer.release();
 314 h.sterling     1.1  
 315                             if (!entryExists)
 316                             {
 317                                 _consumers.insert(consumerName, consumer);
 318                             }
 319                         }
 320                     
 321                         consumer->updateIdleTimer();
 322                     
 323                         PEG_METHOD_EXIT();
 324                         return consumer;
 325                     }
 326                     
 327                     /** Initializes a DynamicConsumer.
 328 kumpf          1.31  * Caller assumes responsibility for mutexing the operation as well as
 329 marek          1.21  * ensuring the consumer does not already exist.
 330 h.sterling     1.1   * @throws Exception if the consumer cannot be initialized
 331                      */
 332 marek          1.21 void ConsumerManager::_initConsumer(
 333                              const String& consumerName,
 334                              DynamicConsumer* consumer)
 335 h.sterling     1.1  {
 336                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 337                     
 338                         CIMIndicationConsumerProvider* base = 0;
 339                         ConsumerModule* module = 0;
 340                     
 341 kumpf          1.31     // lookup provider module in cache (if it exists, it returns
 342 marek          1.21     // the cached module, otherwise it creates and returns a new one)
 343 h.sterling     1.1      String libraryName = _getConsumerLibraryName(consumerName);
 344                         module = _lookupModule(libraryName);
 345                     
 346                         //build library path
 347 marek          1.21     String libraryPath = FileSystem::getAbsolutePath(
 348                                                  (const char*)_consumerDir.getCString(),
 349                                                  FileSystem::buildLibraryFileName(libraryName));
 350 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
 351                             (const char*)libraryPath.getCString()));
 352 h.sterling     1.1  
 353                         //load module
 354                         try
 355                         {
 356                             base = module->load(consumerName, libraryPath);
 357                             consumer->set(module, base);
 358                     
 359                         } catch (Exception& ex)
 360                         {
 361 kumpf          1.31         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 362 thilo.boehm    1.28             "Error loading consumer module: %s",
 363                                 (const char*)ex.getMessage().getCString()));
 364 marek          1.21 
 365                             throw Exception(
 366                                       MessageLoaderParms(
 367                                           "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 368                                           "Cannot load module ($0:$1): Unknown exception.",
 369                                           consumerName,
 370                                           libraryName));
 371 h.sterling     1.1      } catch (...)
 372                         {
 373 marek          1.21         throw Exception(
 374                                       MessageLoaderParms(
 375                                           "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 376                                           "Cannot load module ($0:$1): Unknown exception.",
 377                                           consumerName,
 378                                           libraryName));
 379 h.sterling     1.1      }
 380                     
 381 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 382                             "Successfully loaded consumer module %s",
 383                             (const char*)libraryName.getCString()));
 384 h.sterling     1.1  
 385                         //initialize consumer
 386                         try
 387                         {
 388 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
 389                                 (const char*)consumerName.getCString()));
 390 h.sterling     1.1  
 391                             consumer->initialize();
 392                     
 393 venkat.puvvada 1.32         Semaphore* semaphore = consumer->getShutdownSemaphore();
 394 h.sterling     1.1  
 395                             //start the worker thread
 396 konrad.r       1.7          if (_thread_pool->allocate_and_awaken(consumer,
 397 h.sterling     1.1                                            _worker_routine,
 398 konrad.r       1.7                                            semaphore) != PEGASUS_THREAD_OK)
 399 marek          1.21         {
 400                                 PEG_TRACE_CSTRING(
 401                                     TRC_LISTENER,
 402 marek          1.24                 Tracer::LEVEL1,
 403 marek          1.21                 "Could not allocate thread for consumer.");
 404                     
 405                                 throw Exception(
 406                                     MessageLoaderParms(
 407                                         "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 408                                         "Not enough threads for consumer worker routine."));
 409 konrad.r       1.7          }
 410 h.sterling     1.1  
 411 marek          1.21         //wait until the listening thread has started.
 412                             // Otherwise, there is a miniscule chance that the first event will
 413                             // be enqueued before the consumer is waiting for it and the first
 414                             // indication after loading the consumer will be lost
 415 h.sterling     1.8          consumer->waitForEventThread();
 416                     
 417 h.sterling     1.1          //load any outstanding requests
 418 kumpf          1.31         Array<IndicationDispatchEvent> outstandingIndications =
 419 marek          1.21             _deserializeOutstandingIndications(consumerName);
 420 h.sterling     1.1          if (outstandingIndications.size())
 421                             {
 422                                 //the consumer will signal itself in _loadOustandingIndications
 423                                 consumer->_loadOutstandingIndications(outstandingIndications);
 424                             }
 425                     
 426 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 427                                 "Successfully initialized consumer %s",
 428                                 (const char*)consumerName.getCString()));
 429 h.sterling     1.1  
 430                         } catch (...)
 431                         {
 432                             module->unloadModule();
 433                             consumer->reset();
 434 marek          1.21         throw Exception(
 435                                 MessageLoaderParms(
 436                                     "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 437                                     "Cannot initialize consumer ($0).",
 438 kumpf          1.31                 consumerName));
 439 h.sterling     1.1      }
 440                     
 441 kumpf          1.31     PEG_METHOD_EXIT();
 442 h.sterling     1.1  }
 443                     
 444                     
 445 marek          1.21 /** Returns the ConsumerModule with the given library name.
 446                      *  If it already exists, we return the one in the cache.  If it
 447 h.sterling     1.1   *  DNE, we create it and add it to the table.
 448 kumpf          1.31  * @throws Exception if we cannot successfully create and
 449 marek          1.21  *  initialize the consumer
 450 kumpf          1.31  */
 451 marek          1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
 452 h.sterling     1.1  {
 453                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 454                     
 455                         AutoMutex lock(_moduleTableMutex);
 456                     
 457                         ConsumerModule* module = 0;
 458                     
 459                         //see if consumer module is cached
 460                         if (_modules.lookup(libraryName, module))
 461                         {
 462 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 463 kumpf          1.31             "Found Consumer Module %s in Consumer Manager Cache",
 464 thilo.boehm    1.28             (const char*)libraryName.getCString()));
 465 h.sterling     1.1  
 466                         } else
 467                         {
 468 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 469                                 "Creating Consumer Provider Module %s",
 470                                 (const char*)libraryName.getCString()));
 471 h.sterling     1.1  
 472 kumpf          1.31         module = new ConsumerModule();
 473 h.sterling     1.1          _modules.insert(libraryName, module);
 474                         }
 475                     
 476                         PEG_METHOD_EXIT();
 477                         return(module);
 478                     }
 479                     
 480                     /** Returns true if there are active consumers
 481 kumpf          1.31  */
 482 h.sterling     1.1  Boolean ConsumerManager::hasActiveConsumers()
 483                     {
 484                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 485                     
 486                         AutoMutex lock(_consumerTableMutex);
 487                         DynamicConsumer* consumer = 0;
 488                     
 489                         try
 490                         {
 491                             for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 492                             {
 493                                 consumer = i.value();
 494                     
 495 kumpf          1.31             if (consumer &&
 496                                     consumer->isLoaded() &&
 497 marek          1.21                 (consumer->getPendingIndications() > 0))
 498 h.sterling     1.1              {
 499 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 500                                         "Found active consumer: %s" ,
 501                                         (const char*)consumer->_name.getCString()));
 502 h.sterling     1.1                  PEG_METHOD_EXIT();
 503                                     return true;
 504                                 }
 505                             }
 506                         } catch (...)
 507                         {
 508                             // Unexpected exception; do not assume that no providers are loaded
 509 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 510 marek          1.21             "Unexpected Exception in hasActiveConsumers.");
 511 h.sterling     1.1          PEG_METHOD_EXIT();
 512                             return true;
 513                         }
 514                     
 515                         PEG_METHOD_EXIT();
 516                         return false;
 517                     }
 518                     
 519                     /** Returns true if there are loaded consumers
 520 kumpf          1.31  */
 521 h.sterling     1.1  Boolean ConsumerManager::hasLoadedConsumers()
 522                     {
 523                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 524                     
 525                         AutoMutex lock(_consumerTableMutex);
 526                         DynamicConsumer* consumer = 0;
 527                     
 528                         try
 529                         {
 530                             for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 531                             {
 532                                 consumer = i.value();
 533                     
 534                                 if (consumer && consumer->isLoaded())
 535                                 {
 536 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 537                                          "Found loaded consumer: %s",
 538                                          (const char*)consumer->_name.getCString()));
 539 h.sterling     1.1                  PEG_METHOD_EXIT();
 540                                     return true;
 541                                 }
 542                             }
 543                         } catch (...)
 544                         {
 545                             // Unexpected exception; do not assume that no providers are loaded
 546 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 547 marek          1.21             "Unexpected Exception in hasLoadedConsumers.");
 548 h.sterling     1.1          PEG_METHOD_EXIT();
 549                             return true;
 550                         }
 551                     
 552                         PEG_METHOD_EXIT();
 553                         return false;
 554                     }
 555                     
 556                     
 557                     /** Shutting down a consumer consists of four major steps:
 558 kumpf          1.31  * 1) Send the shutdown signal.  This causes the worker routine to break out
 559 marek          1.21  *    of the loop and exit.
 560                      * 2) Wait for the worker thread to end.  This may take a while if it's
 561                      *    processing an indication.  This is optional in a shutdown scenario.
 562                      *    If the listener is shutdown with a -f force, the listener
 563                      *    will not wait for the consumer to finish before shutting down.
 564                      *    Note that a normal shutdown only allows the current consumer indication
 565                      *    to finish.  All other queued indications are serialized to a log and
 566 h.sterling     1.1   *    are sent when the consumer is reoaded.
 567                      * 3) Terminate the consumer provider interface.
 568 marek          1.21  * 4) Decrement the module refcount (the module will automatically unload when
 569                      *    its refcount == 0)
 570 kumpf          1.31  *
 571 marek          1.21  * In a scenario where multiple consumers are loaded, the shutdown signal
 572                      * should be sent to all of the consumers so the threads can finish
 573                      * simultaneously.
 574 kumpf          1.31  *
 575 marek          1.21  * ATTN: Should the normal shutdown wait for everything in the queue to be
 576                      * processed?  Just new indications to be processed?  I am not inclined to this
 577                      * solution since it could take a LOT of time.  By serializing and deserializing
 578 kumpf          1.31  * indications between shutdown and startup, I feel like we do not need to
 579 marek          1.21  * process ALL queued indications on shutdown.
 580 kumpf          1.31  */
 581 h.sterling     1.1  
 582                     /** Unloads all consumers.
 583 kumpf          1.31  */
 584 h.sterling     1.1  void ConsumerManager::unloadAllConsumers()
 585                     {
 586                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 587                     
 588                         AutoMutex lock(_consumerTableMutex);
 589                     
 590                         if (!_consumers.size())
 591                         {
 592 marek          1.21         PEG_TRACE_CSTRING(
 593                                 TRC_LISTENER,
 594                                 Tracer::LEVEL4,
 595                                 "There are no consumers to unload.");
 596 h.sterling     1.1          PEG_METHOD_EXIT();
 597                             return;
 598                         }
 599                     
 600                         if (!_forceShutdown)
 601                         {
 602 marek          1.21         // wait until all the consumers have finished processing the events in
 603                             // their queue
 604                             // ATTN: Should this have a timeout even though it's a force??
 605 h.sterling     1.1          while (hasActiveConsumers())
 606                             {
 607 mike           1.15             Threads::sleep(500);
 608 h.sterling     1.1          }
 609                         }
 610                     
 611                         Array<DynamicConsumer*> loadedConsumers;
 612                     
 613                         ConsumerTable::Iterator i = _consumers.start();
 614                         DynamicConsumer* consumer = 0;
 615                     
 616                         for (; i!=0; i++)
 617                         {
 618                             consumer = i.value();
 619                             if (consumer && consumer->isLoaded())
 620                             {
 621                                 loadedConsumers.append(consumer);
 622                             }
 623                         }
 624                     
 625                         if (loadedConsumers.size())
 626                         {
 627                             try
 628                             {
 629 h.sterling     1.1              _unloadConsumers(loadedConsumers);
 630                     
 631 kumpf          1.16         } catch (Exception&)
 632 h.sterling     1.1          {
 633 marek          1.21             PEG_TRACE_CSTRING(
 634                                     TRC_LISTENER,
 635 marek          1.25                 Tracer::LEVEL2,
 636 marek          1.21                 "Error unloading consumers.");
 637 h.sterling     1.1          }
 638                         } else
 639                         {
 640 marek          1.21         PEG_TRACE_CSTRING(
 641                                 TRC_LISTENER,
 642                                 Tracer::LEVEL4,
 643                                 "There are no consumers to unload.");
 644 h.sterling     1.1      }
 645                     
 646                         PEG_METHOD_EXIT();
 647                     }
 648                     
 649                     /** Unloads idle consumers.
 650 kumpf          1.31  */
 651 h.sterling     1.1  void ConsumerManager::unloadIdleConsumers()
 652                     {
 653                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 654                     
 655                         AutoMutex lock(_consumerTableMutex);
 656                     
 657                         if (!_consumers.size())
 658                         {
 659 marek          1.21         PEG_TRACE_CSTRING(
 660                                 TRC_LISTENER,
 661                                 Tracer::LEVEL4,
 662                                 "There are no consumers to unload.");
 663 h.sterling     1.1          PEG_METHOD_EXIT();
 664                             return;
 665                         }
 666                     
 667                         Array<DynamicConsumer*> loadedConsumers;
 668                     
 669                         ConsumerTable::Iterator i = _consumers.start();
 670                         DynamicConsumer* consumer = 0;
 671                     
 672                         for (; i!=0; i++)
 673                         {
 674                             consumer = i.value();
 675                             if (consumer && consumer->isLoaded() && consumer->isIdle())
 676                             {
 677                                 loadedConsumers.append(consumer);
 678                             }
 679                         }
 680                     
 681                         if (loadedConsumers.size())
 682                         {
 683                             try
 684 h.sterling     1.1          {
 685                                 _unloadConsumers(loadedConsumers);
 686                     
 687 kumpf          1.16         } catch (Exception&)
 688 h.sterling     1.1          {
 689 marek          1.21             PEG_TRACE_CSTRING(
 690                                     TRC_LISTENER,
 691 marek          1.25                 Tracer::LEVEL2,
 692 marek          1.21                 "Error unloading consumers.");
 693 h.sterling     1.1          }
 694                         } else
 695                         {
 696 marek          1.21         PEG_TRACE_CSTRING(
 697                                 TRC_LISTENER,
 698                                 Tracer::LEVEL4,
 699                                 "There are no consumers to unload.");
 700 h.sterling     1.1      }
 701                     
 702                         PEG_METHOD_EXIT();
 703                     }
 704                     
 705                     /** Unloads a single consumer.
 706 kumpf          1.31  */
 707 h.sterling     1.1  void ConsumerManager::unloadConsumer(const String& consumerName)
 708                     {
 709                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 710                     
 711                         AutoMutex lock(_consumerTableMutex);
 712                     
 713                         DynamicConsumer* consumer = 0;
 714                     
 715                         //check whether the consumer exists
 716                         if (!_consumers.lookup(consumerName, consumer))
 717                         {
 718 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 719                                 "Error: cannot unload consumer, unknown consumer %s",
 720                                 (const char*)consumerName.getCString()));
 721 h.sterling     1.1          return;
 722                         }
 723                     
 724                         //check whether the consumer is loaded
 725                         if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 726                         {
 727                             //unload the consumer
 728                             Array<DynamicConsumer*> loadedConsumers;
 729                             loadedConsumers.append(consumer);
 730                     
 731                             try
 732                             {
 733                                 _unloadConsumers(loadedConsumers);
 734                     
 735 kumpf          1.16         } catch (Exception&)
 736 h.sterling     1.1          {
 737 thilo.boehm    1.28             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
 738 marek          1.21                 "Error unloading consumers.");
 739 h.sterling     1.1          }
 740                     
 741                         } else
 742                         {
 743 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 744                                 "Error: cannot unload the not loaded consumer %s",
 745                                 (const char*)consumerName.getCString()));
 746 h.sterling     1.1      }
 747                     
 748                         PEG_METHOD_EXIT();
 749                     }
 750                     
 751                     /** Unloads the consumers in the given array.
 752                      *  The consumerTable mutex MUST be locked prior to entering this method.
 753 kumpf          1.31  */
 754 marek          1.21 void ConsumerManager::_unloadConsumers(
 755                         Array<DynamicConsumer*> consumersToUnload)
 756 h.sterling     1.1  {
 757                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 758                     
 759                         //tell consumers to shutdown
 760                         for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 761                         {
 762                             consumersToUnload[i]->sendShutdownSignal();
 763                         }
 764                     
 765 thilo.boehm    1.28     PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
 766 marek          1.21         "Sent shutdown signal to all consumers.");
 767                     
 768                         // wait for all the consumer worker threads to complete
 769 kumpf          1.31     // since we can only shutdown after they are all complete,
 770 marek          1.21     // it does not matter if the first, fifth, or last
 771                         // consumer takes the longest; the wait time is equal to the time it takes
 772                         // for the busiest consumer to stop processing its requests.
 773 h.sterling     1.1      for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 774                         {
 775 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
 776                                 (const char*)consumersToUnload[i]->getName().getCString()));
 777 h.sterling     1.1  
 778                             //wait for the consumer worker thread to end
 779 kumpf          1.31         Semaphore* _shutdownSemaphore =
 780 kumpf          1.26             consumersToUnload[i]->getShutdownSemaphore();
 781                             if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
 782 h.sterling     1.1          {
 783 kumpf          1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 784 marek          1.21                 "Timed out while attempting to stop consumer thread.");
 785 h.sterling     1.1          }
 786                     
 787 thilo.boehm    1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
 788 h.sterling     1.1  
 789                             try
 790                             {
 791                                 //terminate consumer provider interface
 792                                 consumersToUnload[i]->terminate();
 793                     
 794                                 //unload consumer provider module
 795                                 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 796                                 consumersToUnload[i]->_module->unloadModule();
 797                     
 798                                 //serialize outstanding indications
 799 marek          1.21             _serializeOutstandingIndications(
 800                                     consumersToUnload[i]->getName(),
 801                                     consumersToUnload[i]->_retrieveOutstandingIndications());
 802 h.sterling     1.1  
 803                                 //reset the consumer
 804                                 consumersToUnload[i]->reset();
 805                     
 806 thilo.boehm    1.28             PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
 807 marek          1.21                 "Consumer library successfully unloaded.");
 808 h.sterling     1.1  
 809                             } catch (Exception& e)
 810                             {
 811 kumpf          1.31             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
 812 thilo.boehm    1.28                 "Error unloading consumer: %s",
 813 kumpf          1.31                 (const char*)e.getMessage().getCString()));
 814 h.sterling     1.1              //ATTN: throw exception? log warning?
 815                             }
 816                         }
 817                     
 818                         PEG_METHOD_EXIT();
 819                     }
 820                     
 821                     /** Serializes oustanding indications to a <MyConsumer>.dat file
 822                      */
 823 marek          1.21 void ConsumerManager::_serializeOutstandingIndications(
 824 kumpf          1.31     const String& consumerName,
 825 marek          1.21     Array<IndicationDispatchEvent> indications)
 826                     {
 827                         PEG_METHOD_ENTER(
 828                             TRC_LISTENER,
 829                             "ConsumerManager::_serializeOutstandingIndications");
 830 h.sterling     1.1  
 831                         if (!indications.size())
 832                         {
 833                             PEG_METHOD_EXIT();
 834                             return;
 835                         }
 836                     
 837 marek          1.21     String fileName = FileSystem::getAbsolutePath(
 838                                               (const char*)_consumerConfigDir.getCString(),
 839                                               String(consumerName + ".dat"));
 840 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
 841                             (const char*)fileName.getCString()));
 842 h.sterling     1.1  
 843 mike           1.9      Buffer buffer;
 844 h.sterling     1.1  
 845 kumpf          1.31     // Open the log file and serialize remaining
 846 h.sterling     1.1      FILE* fileHandle = 0;
 847 kumpf          1.31     fileHandle = fopen((const char*)fileName.getCString(), "w");
 848 h.sterling     1.1  
 849                         if (!fileHandle)
 850                         {
 851 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 852                                 "Unable to open log file for %s",
 853                                 (const char*)consumerName.getCString()));
 854 h.sterling     1.1  
 855                         } else
 856                         {
 857 mike           1.20         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 858 h.sterling     1.1                        "Serializing %d outstanding requests for %s",
 859                                           indications.size(),
 860 marek          1.18                       (const char*)consumerName.getCString()));
 861 h.sterling     1.1  
 862 marek          1.21         // we have to put the array of instances under a valid root element
 863 kumpf          1.31         // or the parser complains
 864 h.sterling     1.1          XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 865                     
 866 marek          1.21         CIMInstance cimInstance;
 867 h.sterling     1.1          for (Uint32 i = 0; i < indications.size(); i++)
 868                             {
 869 marek          1.21             //set the URL string property on the serializable instance
 870                                 CIMValue cimValue(CIMTYPE_STRING, false);
 871                                 cimValue.set(indications[i].getURL());
 872                                 cimInstance = indications[i].getIndicationInstance();
 873                                 CIMProperty cimProperty(URL_PROPERTY, cimValue);
 874                                 cimInstance.addProperty(cimProperty);
 875 h.sterling     1.11 
 876                                 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 877 marek          1.21         }
 878 h.sterling     1.1  
 879 kumpf          1.19         XmlWriter::append(buffer, "</IRETURNVALUE>");
 880 h.sterling     1.1  
 881                             fputs((const char*)buffer.getData(), fileHandle);
 882                     
 883                             fclose(fileHandle);
 884                         }
 885                     
 886                         PEG_METHOD_EXIT();
 887                     }
 888                     
 889                     /** Reads outstanding indications from a <MyConsumer>.dat file
 890 kumpf          1.31  */
 891                     Array<IndicationDispatchEvent>
 892 marek          1.21     ConsumerManager::_deserializeOutstandingIndications(
 893                             const String& consumerName)
 894                     {
 895                         PEG_METHOD_ENTER(
 896                             TRC_LISTENER,
 897                             "ConsumerManager::_deserializeOutstandingIndications");
 898                     
 899                         String fileName = FileSystem::getAbsolutePath(
 900                                               (const char*)_consumerConfigDir.getCString(),
 901                                               String(consumerName + ".dat"));
 902 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 903                             "Consumer dat file: %s",(const char*)fileName.getCString()));
 904 h.sterling     1.1  
 905                         Array<CIMInstance> cimInstances;
 906 marek          1.21     Array<String>      urlStrings;
 907                         Array<IndicationDispatchEvent> indications;
 908 h.sterling     1.1  
 909                         // Open the log file and serialize remaining indications
 910                         if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 911                         {
 912 mike           1.9          Buffer text;
 913 h.sterling     1.1          CIMInstance cimInstance;
 914 marek          1.21         CIMProperty cimProperty;
 915                             CIMValue cimValue;
 916                             String urlString;
 917 h.sterling     1.1          XmlEntry entry;
 918                     
 919                             try
 920                             {
 921 marek          1.21             //ATTN: Is this safe to use; what about CRLFs?
 922                                 FileSystem::loadFileToMemory(text, fileName);
 923 h.sterling     1.1  
 924                                 //parse the file
 925                                 XmlParser parser((char*)text.getData());
 926                                 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 927                     
 928                                 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 929                                 {
 930 marek          1.21                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 931                                     if (index != PEG_NOT_FOUND)
 932                                     {
 933 kumpf          1.31                     // get the URL string property from the serialized instance
 934 marek          1.21                     // and remove the property
 935                                         cimProperty = cimInstance.getProperty(index);
 936                                         cimValue = cimProperty.getValue();
 937                                         cimValue.get(urlString);
 938                                         cimInstance.removeProperty(index);
 939                                     }
 940 kumpf          1.31                 IndicationDispatchEvent* indicationEvent =
 941 marek          1.21                     new IndicationDispatchEvent(
 942 kumpf          1.31                         OperationContext(),
 943                                             urlString,
 944 marek          1.21                         cimInstance);
 945                     
 946 h.sterling     1.11                 indications.append(*indicationEvent);
 947 h.sterling     1.1              }
 948                     
 949                                 XmlReader::expectEndTag(parser, "IRETURNVALUE");
 950                     
 951 mike           1.20             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 952 h.sterling     1.1                            "Consumer %s has %d outstanding indications",
 953                                               (const char*)consumerName.getCString(),
 954 marek          1.18                           indications.size()));
 955 h.sterling     1.1  
 956 kumpf          1.31             //delete the file
 957 h.sterling     1.1              FileSystem::removeFile(fileName);
 958                     
 959                             } catch (Exception& ex)
 960                             {
 961 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 962                                     "Error parsing dat file for consumer %s: %s",
 963                                     (const char*)consumerName.getCString(),
 964                                     (const char*)ex.getMessage().getCString()));
 965 h.sterling     1.1  
 966                             } catch (...)
 967                             {
 968 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 969                                     "Error parsing dat file for consumer %s: Unknown Exception",
 970                                     (const char*)consumerName.getCString()));
 971 h.sterling     1.1          }
 972                         }
 973                     
 974                         PEG_METHOD_EXIT();
 975 h.sterling     1.11     return indications;
 976 h.sterling     1.1  }
 977                     
 978                     
 979                     
 980 kumpf          1.31 /**
 981 marek          1.21  * This is the main worker thread of the consumer.  By having only one thread
 982 kumpf          1.31  * per consumer, we eliminate a ton of synchronization issues and make it easy
 983 marek          1.21  * to prevent the consumer from performing two mutually exclusive operations
 984 kumpf          1.31  * at once.  This also prevents one bad consumer from taking the entire
 985 marek          1.21  * listener down.  That being said, it is up to the programmer to write smart
 986 kumpf          1.31  * consumers, and to ensure that their actions don't deadlock
 987 marek          1.21  * the worker thread.
 988 kumpf          1.31  *
 989 marek          1.21  * If a consumer receives a lot of traffic, or its consumeIndication() method
 990                      * takes a considerable amount of time to complete, it may make sense to make
 991                      * the consumer multi-threaded.  The individual consumer can immediately
 992                      * spawn off new threads to handle indications, and return immediately to
 993 kumpf          1.31  * catch the next indication.  In this way, a consumer can attain
 994                      * extremely high performance.
 995                      *
 996 h.sterling     1.1   * There are three different events that can signal us:
 997                      * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 998 marek          1.21  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
 999                      *    shutdown or an idle consumer state)
1000 h.sterling     1.1   * 3) A retry signal (signalled from this routine itself)
1001 kumpf          1.31  *
1002 marek          1.21  * The idea is that all new indications are put on the front of the queue and
1003 kumpf          1.31  * processed first.  All of the retry indications are put on the back of the
1004 marek          1.21  * queue and are only processed AFTER all new indications are sent.
1005                      * Before processing each indication, we check to see whether or not the
1006 kumpf          1.31  * shutdown signal was given.
1007                      * If so, we immediately break out of the loop, and another component
1008 marek          1.21  * serializes the remaining indications to a file.
1009 kumpf          1.31  *
1010                      * An indication gets retried
1011 marek          1.21  *     if the consumer throws a CIM_ERR_FAILED exception.
1012 kumpf          1.31  *
1013                      * This function makes sure it waits until the default retry lapse has passed
1014 marek          1.21  * to avoid issues with the following scenario:
1015 h.sterling     1.5   * 20 new indications come in, 10 of them are successful, 10 are not.
1016 kumpf          1.31  * We were signalled 20 times, so we will pass the time_wait 20 times.
1017 marek          1.21  * Perceivably, the process time on each indication could be minimal.
1018                      * We could potentially proceed to process the retries after a very small time
1019                      * interval since we would never hit the wait for the retry timeout.
1020 kumpf          1.31  *
1021                      */
1022                     ThreadReturnType PEGASUS_THREAD_CDECL
1023 marek          1.21     ConsumerManager::_worker_routine(void *param)
1024 h.sterling     1.1  {
1025                         PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1026                     
1027                         DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1028                         String name = myself->getName();
1029 mike           1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
1030 h.sterling     1.1  
1031 thilo.boehm    1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1032                             "_worker_routine::entering loop for %s",
1033                             (const char*)name.getCString()));
1034 h.sterling     1.1  
1035 h.sterling     1.8      myself->_listeningSemaphore->signal();
1036                     
1037 h.sterling     1.1      while (true)
1038                         {
1039 kumpf          1.31         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s",
1040 thilo.boehm    1.28             (const char*)name.getCString()));
1041                     
1042 kumpf          1.26 
1043                             //wait to be signalled
1044                             if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
1045 h.sterling     1.1          {
1046 kumpf          1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1047                                     "_worker_routine::Time to retry any outstanding indications.");
1048                     
1049                                 // signal the queue in the same way we would,
1050                                 // if we received a new indication
1051                                 // this allows the thread to fall into the queue processing code
1052                                 myself->_check_queue->signal();
1053                     
1054                                 continue;
1055                             }
1056                     
1057 thilo.boehm    1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1058                                 "_worker_routine::signalled %s",(const char*)name.getCString()));
1059 h.sterling     1.1  
1060 kumpf          1.26         //check whether we received the shutdown signal
1061                             if (myself->_dieNow)
1062                             {
1063 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1064                                     "_worker_routine::shutdown received %s",
1065                                     (const char*)name.getCString()));
1066 kumpf          1.26             break;
1067                             }
1068 h.sterling     1.1  
1069 kumpf          1.26         //create a temporary queue to store failed indications
1070                             tmpEventQueue.clear();
1071 h.sterling     1.1  
1072 kumpf          1.26         //continue processing events until the queue is empty
1073                             //make sure to check for the shutdown signal before every iteration
1074                             // Note that any time during our processing of events the Listener
1075                             // may be enqueueing NEW events for us to process.
1076                             // Because we are popping off the front and new events are being
1077                             // thrown on the back if events are failing when we start
1078                             // But are succeeding by the end of the processing, events may be
1079                             // sent out of chronological order.
1080                             // However. Once we complete the current queue of events, we will
1081                             // always send old events to be retried before sending any
1082                             // new events added afterwards.
1083                             while (myself->_eventqueue.size())
1084                             {
1085                                 //check for shutdown signal
1086                                 //this only breaks us out of the queue loop, but we will
1087                                 //immediately get through the next wait from
1088                                 //the shutdown signal itself, at which time we break
1089                                 //out of the main loop
1090 h.sterling     1.1              if (myself->_dieNow)
1091                                 {
1092 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1093                                         "Received signal to shutdown, jumping out of queue loop %s",
1094                                         (const char*)name.getCString()));
1095 h.sterling     1.1                  break;
1096                                 }
1097                     
1098 kumpf          1.26             //pop next indication off the queue
1099                                 IndicationDispatchEvent* event = 0;
1100                                 //what exceptions/errors can this throw?
1101                                 event = myself->_eventqueue.remove_front();
1102 h.sterling     1.1  
1103 kumpf          1.26             if (!event)
1104 h.sterling     1.1              {
1105 kumpf          1.26                 //this should never happen
1106                                     continue;
1107                                 }
1108 h.sterling     1.1  
1109 thilo.boehm    1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1110                                     "_worker_routine::consumeIndication %s",
1111                                     (const char*)name.getCString()));
1112 h.sterling     1.1  
1113 kumpf          1.26             try
1114                                 {
1115                                     myself->consumeIndication(event->getContext(),
1116                                                               event->getURL(),
1117                                                               event->getIndicationInstance());
1118                     
1119 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1120                                         "_worker_routine::processed indication successfully. %s",
1121                                         (const char*)name.getCString()));
1122 h.sterling     1.1  
1123 kumpf          1.26                 delete event;
1124                                     continue;
1125                                 }
1126                                 catch (CIMException & ce)
1127                                 {
1128                                     //check for failure
1129                                     if (ce.getCode() == CIM_ERR_FAILED)
1130 h.sterling     1.1                  {
1131 kumpf          1.31                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2,
1132 kumpf          1.26                         "_worker_routine::consumeIndication() temporary"
1133 thilo.boehm    1.28                             " failure %s : %s",
1134                                             (const char*)name.getCString(),
1135                                             (const char*)ce.getMessage().getCString()));
1136 kumpf          1.31 
1137 kumpf          1.26                     // Here we simply determine if we should increment
1138                                         // the retry count or not.
1139                                         // We don't want a forced retry triggered by a new
1140 kumpf          1.31                     // event to be counted as a retry.
1141 kumpf          1.26                     // We just have to do it for order's sake.
1142                                         // If the retry Lapse has lapsed on this event,
1143                                         // then increment the counter.
1144                                         if (event->getRetries() > 0)
1145                                         {
1146 kumpf          1.31                         Sint64 differenceInMicroseconds =
1147 kumpf          1.26                             CIMDateTime::getDifference(
1148                                                     event->getLastAttemptTime(),
1149                                                     CIMDateTime::getCurrentDateTime());
1150 h.sterling     1.1  
1151 kumpf          1.31                         if (differenceInMicroseconds >=
1152 kumpf          1.26                                 (DEFAULT_RETRY_LAPSE * 1000))
1153 marek          1.21                         {
1154 h.sterling     1.10                             event->increaseRetries();
1155                                             }
1156 kumpf          1.26                     }
1157                                         else
1158                                         {
1159                                             event->increaseRetries();
1160                                         }
1161 h.sterling     1.1  
1162 kumpf          1.26                     //determine if we have hit the max retry count
1163                                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1164                                         {
1165                                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1166                                                 "Error: the maximum retry count has been "
1167                                                     "exceeded.  Removing the event from "
1168                                                     "the queue.");
1169                     
1170                                             Logger::put(
1171                                                 Logger::ERROR_LOG,
1172                                                 System::CIMLISTENER,
1173                                                 Logger::SEVERE,
1174                                                 "The following indication did not get "
1175                                                     "processed successfully: $0",
1176                                             event->getIndicationInstance().getPath().toString());
1177 h.sterling     1.1  
1178                                             delete event;
1179                                             continue;
1180                                         }
1181 kumpf          1.26                     else
1182                                         {
1183                                             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1184                                                 "_worker_routine::placing failed indication "
1185                                                     "back in queue");
1186                                             tmpEventQueue.insert_back(event);
1187                                         }
1188                                     }
1189                                     else
1190 h.sterling     1.1                  {
1191 thilo.boehm    1.28                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1192                                             "Error: consumeIndication() permanent failure: %s",
1193                                             (const char*)ce.getMessage().getCString()));
1194 h.sterling     1.1                      delete event;
1195                                         continue;
1196 kumpf          1.26                 }
1197                                 }
1198                                 catch (Exception & ex)
1199 h.sterling     1.1              {
1200 thilo.boehm    1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1201                                         "Error: consumeIndication() permanent failure: %s",
1202                                         (const char*)ex.getMessage().getCString()));
1203 kumpf          1.26                 delete event;
1204                                     continue;
1205 h.sterling     1.1              }
1206 kumpf          1.26             catch (...)
1207                                 {
1208                                     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1209                                         "Error: consumeIndication() failed: Unknown exception.");
1210                                     delete event;
1211                                     continue;
1212                                 } //end try
1213                     
1214                             } //while eventqueue
1215                     
1216                             // Copy the failed indications back to the main queue.
1217                             // We lock the queue while adding the retries onto it
1218                             // so that new events cannot get in front of the
1219                             // events we are retrying. Retried events happened before
1220                             // any new events coming in.
1221                             IndicationDispatchEvent* tmpEvent = 0;
1222 kumpf          1.27         if (myself->_eventqueue.try_lock())
1223 h.sterling     1.1          {
1224 kumpf          1.27             while (tmpEventQueue.size())
1225                                 {
1226                                     tmpEvent = tmpEventQueue.remove_front();
1227                                     myself->_eventqueue.insert_back(tmpEvent);
1228                                 }
1229                     
1230                                 myself->_eventqueue.unlock();
1231                             }
1232                             else
1233                             {
1234                                 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1235                                     "Failed to lock _eventqueue");
1236 kumpf          1.26         }
1237 h.sterling     1.1      } //shutdown
1238                     
1239                         PEG_METHOD_EXIT();
1240                         return 0;
1241                     }
1242                     
1243                     
1244                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2