(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 karl  1.12 //%2006////////////////////////////////////////////////////////////////////////
   2 h.sterling 1.1  //
   3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                 // IBM Corp.; EMC Corporation, The Open Group.
   7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl       1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12                 // EMC Corporation; Symantec Corporation; The Open Group.
  13 h.sterling 1.1  //
  14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
  15                 // of this software and associated documentation files (the "Software"), to
  16                 // deal in the Software without restriction, including without limitation the
  17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18                 // sell copies of the Software, and to permit persons to whom the Software is
  19                 // furnished to do so, subject to the following conditions:
  20                 // 
  21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29                 //
  30                 //==============================================================================
  31                 //
  32                 //%/////////////////////////////////////////////////////////////////////////////
  33                 
  34 h.sterling 1.1  #include <Pegasus/Common/Config.h>
  35                 //#include <cstdlib>
  36                 //#include <dlfcn.h>
  37                 #include <Pegasus/Common/System.h>
  38                 #include <Pegasus/Common/FileSystem.h>
  39                 #include <Pegasus/Common/Tracer.h>
  40                 #include <Pegasus/Common/Logger.h>
  41                 #include <Pegasus/Common/XmlReader.h>
  42 mike       1.15 #include <Pegasus/Common/ThreadPool.h>
  43 h.sterling 1.1  #include <Pegasus/Common/XmlParser.h>
  44                 #include <Pegasus/Common/XmlWriter.h>
  45 mike       1.14 #include <Pegasus/Common/List.h>
  46 mike       1.15 #include <Pegasus/Common/Mutex.h>
  47 h.sterling 1.1  
  48                 #include "ConsumerManager.h"
  49                 
  50                 PEGASUS_NAMESPACE_BEGIN
  51                 PEGASUS_USING_STD;
  52                 
  53 marek      1.21 //ATTN: Can we just use a properties file instead??  If we only have one 
  54                 // property, we may want to just parse it ourselves.
  55                 // We may need to add more properties, however.  Potential per consumer
  56                 // properties: unloadOk, idleTimout, retryCount, etc
  57 h.sterling 1.1  static struct OptionRow optionsTable[] =
  58                 //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  59                 {
  60 marek      1.21 {"location", "", false, Option::STRING, 0, 0,
  61                  "location", "library name for the consumer"},
  62 h.sterling 1.1  };
  63                 
  64                 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  65                 
  66                 //retry settings
  67                 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  68 h.sterling 1.6  static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //ms = 5 minutes
  69 h.sterling 1.1  
  70 marek      1.21 //constant for fake property that is added to instances when 
  71                 // serializing to track the full URL
  72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
  73 h.sterling 1.1  
  74                 
  75 marek      1.21 ConsumerManager::ConsumerManager(
  76                     const String& consumerDir,
  77                     const String& consumerConfigDir,
  78                     Boolean enableConsumerUnload,
  79                     Uint32 idleTimeout) : 
  80                         _consumerDir(consumerDir),
  81                         _consumerConfigDir(consumerConfigDir),
  82                         _enableConsumerUnload(enableConsumerUnload),
  83                         _idleTimeout(idleTimeout),
  84                         _forceShutdown(true)
  85 h.sterling 1.1  {
  86                     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  87                 
  88 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
  89                          "Consumer library directory: %s",
  90                          (const char*)consumerDir.getCString()));
  91                      PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  92                          "Consumer configuration directory: %s",
  93                          (const char*)consumerConfigDir.getCString()));
  94 h.sterling  1.1  
  95 mike        1.20     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
  96 thilo.boehm 1.28         "Consumer unload enabled %d: idle timeout %d",
  97                           enableConsumerUnload,idleTimeout));
  98 h.sterling  1.1  
  99                  
 100 h.sterling  1.8      //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
 101 h.sterling  1.6      //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 102 h.sterling  1.1  
 103 kumpf       1.3      struct timeval deallocateWait = {15, 0};
 104                      _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
 105 h.sterling  1.1  
 106                      _init();
 107                  
 108                      PEG_METHOD_EXIT();
 109                  }
 110                  
 111                  ConsumerManager::~ConsumerManager()
 112                  {
 113                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 114                  
 115                      unloadAllConsumers();
 116                  
 117 kumpf       1.3      delete _thread_pool;
 118 h.sterling  1.1  
 119                      ConsumerTable::Iterator i = _consumers.start();
 120                      for (; i!=0; i++)
 121                      {
 122                          DynamicConsumer* consumer = i.value();
 123                          delete consumer;
 124                      }
 125                  
 126 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 127 h.sterling  1.1  
 128                      ModuleTable::Iterator j = _modules.start();
 129                      for (;j!=0;j++)
 130                      {
 131                          ConsumerModule* module = j.value();
 132                          delete module;
 133                      }
 134                  
 135 marek       1.18     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 136 h.sterling  1.1  
 137                      PEG_METHOD_EXIT();
 138                  }
 139                  
 140                  void ConsumerManager::_init()
 141                  {
 142                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 143                  
 144                      //check if there are any outstanding indications
 145                      Array<String> files;
 146                      Uint32 pos;
 147                      String consumerName;
 148                  
 149                      if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 150                      {
 151                          for (Uint32 i = 0; i < files.size(); i++)
 152                          {
 153                              pos = files[i].find(".dat");
 154                              if (pos != PEG_NOT_FOUND)
 155                              {
 156                                  consumerName = files[i].subString(0, pos);
 157 h.sterling  1.1  
 158                                  try
 159                                  {
 160 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 161                                          "Attempting to load indication for '%s'",
 162                                          (const char*)consumerName.getCString()));
 163 h.sterling  1.1                      getConsumer(consumerName);
 164                  
 165                                  } catch (...)
 166                                  {
 167 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 168                                          "Cannot load consumer from file '%s'",
 169                                          (const char*)files[i].getCString()));
 170 h.sterling  1.1                  }
 171                              }
 172                          }
 173                      }
 174                  
 175                      PEG_METHOD_EXIT();
 176                  }
 177                  
 178                  String ConsumerManager::getConsumerDir()
 179                  {
 180                      return _consumerDir;
 181                  }
 182                  
 183                  String ConsumerManager::getConsumerConfigDir()
 184                  {
 185                      return _consumerConfigDir;
 186                  }
 187                  
 188                  Boolean ConsumerManager::getEnableConsumerUnload()
 189                  {
 190                      return _enableConsumerUnload;
 191 h.sterling  1.1  }
 192                  
 193                  Uint32 ConsumerManager::getIdleTimeout()
 194                  {
 195                      return _idleTimeout;
 196                  }
 197                  
 198 marek       1.21 /** Retrieves the library name associated with the consumer name.
 199                   *  By default, the library name
 200                   * is the same as the consumer name.  However, you may specify a different 
 201                   * library name in a consumer configuration file.  This file must be named 
 202                   *  "MyConsumer.txt" and contain the following: location="libraryName"
 203                   *
 204                   * The config file is optional and is generally only needed in cases where 
 205                   * there are strict requirements on library naming.
 206                   *
 207                   * It is the responsibility of the caller to catch any exceptions 
 208                   * thrown by this method.
 209                   */
 210 h.sterling  1.1  String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 211                  {
 212                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 213                  
 214                      //default library name is consumer name
 215                      String libraryName = consumerName;
 216                  
 217 marek       1.21     // check whether an alternative library name was specified in an optional
 218                      // consumer config file
 219                      String configFile = FileSystem::getAbsolutePath(
 220                                              (const char*)_consumerConfigDir.getCString(),
 221                                              String(consumerName + ".conf"));
 222 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
 223                          "Looking for config file %s",(const char*)configFile.getCString()));
 224 h.sterling  1.1  
 225                      if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 226                      {
 227 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 228                              "Found config file for consumer %s",
 229                              (const char*)consumerName.getCString()));
 230 h.sterling  1.1  
 231                          try
 232                          {
 233 marek       1.21             //Bugzilla 3765 - Change this to use a member var 
 234                              // when OptionManager has a reset option
 235 h.sterling  1.8              OptionManager _optionMgr;
 236 marek       1.21             //comment the following line out later
 237                              _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
 238 h.sterling  1.1              _optionMgr.mergeFile(configFile);
 239                              _optionMgr.checkRequiredOptions();
 240                  
 241 marek       1.21             if (!_optionMgr.lookupValue("location", libraryName) || 
 242                                  (libraryName == String::EMPTY))
 243 h.sterling  1.1              {
 244 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 245 marek       1.21                     "Warning: Using default library name since none was "
 246 thilo.boehm 1.28                     "specified in %s",(const char*)configFile.getCString())); 
 247 h.sterling  1.1                  libraryName = consumerName;
 248                              }
 249                  
 250                          } catch (Exception & ex)
 251                          {
 252 marek       1.21             throw Exception(
 253                                        MessageLoaderParms(
 254                                            "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 255                                            "Error reading $0: $1.",
 256                                            configFile,
 257                                            ex.getMessage()));
 258 h.sterling  1.1          }
 259                      } else
 260                      {
 261 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 262                              "No config file exists for %s",
 263                              (const char*)consumerName.getCString()));
 264 h.sterling  1.1      }
 265                  
 266 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 267                          "The library name for %s is %s",
 268                          (const char*)consumerName.getCString(),
 269                          (const char*)libraryName.getCString()));
 270 h.sterling  1.1  
 271                      PEG_METHOD_EXIT();
 272                      return libraryName;
 273                  }
 274                  
 275 marek       1.21 /** Returns the DynamicConsumer for the consumerName.  If it already exists,
 276                   *  we return the one in the cache.  If it
 277 h.sterling  1.1   *  DNE, we create it and initialize it, and add it to the table.
 278 marek       1.21  * @throws Exception if we cannot successfully create and
 279                   *  initialize the consumer
 280 h.sterling  1.1   */ 
 281                  DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 282                  {
 283                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 284                  
 285                      DynamicConsumer* consumer = 0;
 286                      CIMIndicationConsumerProvider* consumerRef = 0;
 287                      Boolean cached = false;
 288                      Boolean entryExists = false;
 289                  
 290                      AutoMutex lock(_consumerTableMutex);
 291                  
 292                      if (_consumers.lookup(consumerName, consumer))
 293                      {
 294                          //why isn't this working??
 295                          entryExists = true;
 296                  
 297                          if (consumer && consumer->isLoaded())
 298                          {
 299 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 300                                  "Consumer exists in the cache and is already loaded: %s",
 301                                  (const char*)consumerName.getCString()));
 302 h.sterling  1.1              cached = true;
 303                          }
 304                      } else
 305                      {
 306 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 307                              "Consumer not found in cache, creating %s",
 308                              (const char*)consumerName.getCString()));
 309 h.sterling  1.1          consumer = new DynamicConsumer(consumerName);
 310                          //ATTN: The above is a memory leak if _initConsumer throws an exception
 311                          //need to delete it in that case
 312                      }
 313                  
 314                      if (!cached)
 315                      {
 316                          _initConsumer(consumerName, consumer);
 317                  
 318                          if (!entryExists)
 319                          {
 320                              _consumers.insert(consumerName, consumer);
 321                          }
 322                      }
 323                  
 324                      consumer->updateIdleTimer();
 325                  
 326                      PEG_METHOD_EXIT();
 327                      return consumer;
 328                  }
 329                  
 330 h.sterling  1.1  /** Initializes a DynamicConsumer.
 331 marek       1.21  * Caller assumes responsibility for mutexing the operation as well as 
 332                   * ensuring the consumer does not already exist.
 333 h.sterling  1.1   * @throws Exception if the consumer cannot be initialized
 334                   */
 335 marek       1.21 void ConsumerManager::_initConsumer(
 336                           const String& consumerName,
 337                           DynamicConsumer* consumer)
 338 h.sterling  1.1  {
 339                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 340                  
 341                      CIMIndicationConsumerProvider* base = 0;
 342                      ConsumerModule* module = 0;
 343                  
 344 marek       1.21     // lookup provider module in cache (if it exists, it returns 
 345                      // the cached module, otherwise it creates and returns a new one)
 346 h.sterling  1.1      String libraryName = _getConsumerLibraryName(consumerName);
 347                      module = _lookupModule(libraryName);
 348                  
 349                      //build library path
 350 marek       1.21     String libraryPath = FileSystem::getAbsolutePath(
 351                                               (const char*)_consumerDir.getCString(),
 352                                               FileSystem::buildLibraryFileName(libraryName));
 353 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
 354                          (const char*)libraryPath.getCString()));
 355 h.sterling  1.1  
 356                      //load module
 357                      try
 358                      {
 359                          base = module->load(consumerName, libraryPath);
 360                          consumer->set(module, base);
 361                  
 362                      } catch (Exception& ex)
 363                      {
 364 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1, 
 365                              "Error loading consumer module: %s",
 366                              (const char*)ex.getMessage().getCString()));
 367 marek       1.21 
 368                          throw Exception(
 369                                    MessageLoaderParms(
 370                                        "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 371                                        "Cannot load module ($0:$1): Unknown exception.",
 372                                        consumerName,
 373                                        libraryName));
 374 h.sterling  1.1      } catch (...)
 375                      {
 376 marek       1.21         throw Exception(
 377                                    MessageLoaderParms(
 378                                        "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 379                                        "Cannot load module ($0:$1): Unknown exception.",
 380                                        consumerName,
 381                                        libraryName));
 382 h.sterling  1.1      }
 383                  
 384 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 385                          "Successfully loaded consumer module %s",
 386                          (const char*)libraryName.getCString()));
 387 h.sterling  1.1  
 388                      //initialize consumer
 389                      try
 390                      {
 391 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
 392                              (const char*)consumerName.getCString()));
 393 h.sterling  1.1  
 394                          consumer->initialize();
 395                  
 396                          //ATTN: need to change this
 397                          Semaphore* semaphore = new Semaphore(0);  //blocking
 398                  
 399                          consumer->setShutdownSemaphore(semaphore);
 400                  
 401                          //start the worker thread
 402 konrad.r    1.7          if (_thread_pool->allocate_and_awaken(consumer,
 403 h.sterling  1.1                                            _worker_routine,
 404 konrad.r    1.7                                            semaphore) != PEGASUS_THREAD_OK)
 405 marek       1.21         {
 406                              PEG_TRACE_CSTRING(
 407                                  TRC_LISTENER,
 408 marek       1.24                 Tracer::LEVEL1,
 409 marek       1.21                 "Could not allocate thread for consumer.");
 410                  
 411                              consumer->setShutdownSemaphore(0);
 412                              delete semaphore;
 413                              throw Exception(
 414                                  MessageLoaderParms(
 415                                      "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 416                                      "Not enough threads for consumer worker routine."));
 417 konrad.r    1.7          }
 418 h.sterling  1.1  
 419 marek       1.21         //wait until the listening thread has started.
 420                          // Otherwise, there is a miniscule chance that the first event will
 421                          // be enqueued before the consumer is waiting for it and the first
 422                          // indication after loading the consumer will be lost
 423 h.sterling  1.8          consumer->waitForEventThread();
 424                  
 425 h.sterling  1.1          //load any outstanding requests
 426 marek       1.21         Array<IndicationDispatchEvent> outstandingIndications = 
 427                              _deserializeOutstandingIndications(consumerName);
 428 h.sterling  1.1          if (outstandingIndications.size())
 429                          {
 430                              //the consumer will signal itself in _loadOustandingIndications
 431                              consumer->_loadOutstandingIndications(outstandingIndications);
 432                          }
 433                  
 434 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 435                              "Successfully initialized consumer %s",
 436                              (const char*)consumerName.getCString()));
 437 h.sterling  1.1  
 438                      } catch (...)
 439                      {
 440                          module->unloadModule();
 441                          consumer->reset();
 442 marek       1.21         throw Exception(
 443                              MessageLoaderParms(
 444                                  "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 445                                  "Cannot initialize consumer ($0).",
 446                                  consumerName));        
 447 h.sterling  1.1      }
 448                  
 449                      PEG_METHOD_EXIT();    
 450                  }
 451                  
 452                  
 453 marek       1.21 /** Returns the ConsumerModule with the given library name.
 454                   *  If it already exists, we return the one in the cache.  If it
 455 h.sterling  1.1   *  DNE, we create it and add it to the table.
 456 marek       1.21  * @throws Exception if we cannot successfully create and 
 457                   *  initialize the consumer
 458 h.sterling  1.1   */ 
 459 marek       1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
 460 h.sterling  1.1  {
 461                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 462                  
 463                      AutoMutex lock(_moduleTableMutex);
 464                  
 465                      ConsumerModule* module = 0;
 466                  
 467                      //see if consumer module is cached
 468                      if (_modules.lookup(libraryName, module))
 469                      {
 470 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 471                              "Found Consumer Module %s in Consumer Manager Cache", 
 472                              (const char*)libraryName.getCString()));
 473 h.sterling  1.1  
 474                      } else
 475                      {
 476 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 477                              "Creating Consumer Provider Module %s",
 478                              (const char*)libraryName.getCString()));
 479 h.sterling  1.1  
 480                          module = new ConsumerModule(); 
 481                          _modules.insert(libraryName, module);
 482                      }
 483                  
 484                      PEG_METHOD_EXIT();
 485                      return(module);
 486                  }
 487                  
 488                  /** Returns true if there are active consumers
 489                   */ 
 490                  Boolean ConsumerManager::hasActiveConsumers()
 491                  {
 492                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 493                  
 494                      AutoMutex lock(_consumerTableMutex);
 495                      DynamicConsumer* consumer = 0;
 496                  
 497                      try
 498                      {
 499                          for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 500 h.sterling  1.1          {
 501                              consumer = i.value();
 502                  
 503 marek       1.21             if (consumer && 
 504                                  consumer->isLoaded() && 
 505                                  (consumer->getPendingIndications() > 0))
 506 h.sterling  1.1              {
 507 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 508                                      "Found active consumer: %s" ,
 509                                      (const char*)consumer->_name.getCString()));
 510 h.sterling  1.1                  PEG_METHOD_EXIT();
 511                                  return true;
 512                              }
 513                          }
 514                      } catch (...)
 515                      {
 516                          // Unexpected exception; do not assume that no providers are loaded
 517 thilo.boehm 1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 518 marek       1.21             "Unexpected Exception in hasActiveConsumers.");
 519 h.sterling  1.1          PEG_METHOD_EXIT();
 520                          return true;
 521                      }
 522                  
 523                      PEG_METHOD_EXIT();
 524                      return false;
 525                  }
 526                  
 527                  /** Returns true if there are loaded consumers
 528                   */ 
 529                  Boolean ConsumerManager::hasLoadedConsumers()
 530                  {
 531                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 532                  
 533                      AutoMutex lock(_consumerTableMutex);
 534                      DynamicConsumer* consumer = 0;
 535                  
 536                      try
 537                      {
 538                          for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 539                          {
 540 h.sterling  1.1              consumer = i.value();
 541                  
 542                              if (consumer && consumer->isLoaded())
 543                              {
 544 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 545                                       "Found loaded consumer: %s",
 546                                       (const char*)consumer->_name.getCString()));
 547 h.sterling  1.1                  PEG_METHOD_EXIT();
 548                                  return true;
 549                              }
 550                          }
 551                      } catch (...)
 552                      {
 553                          // Unexpected exception; do not assume that no providers are loaded
 554 thilo.boehm 1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
 555 marek       1.21             "Unexpected Exception in hasLoadedConsumers.");
 556 h.sterling  1.1          PEG_METHOD_EXIT();
 557                          return true;
 558                      }
 559                  
 560                      PEG_METHOD_EXIT();
 561                      return false;
 562                  }
 563                  
 564                  
 565                  /** Shutting down a consumer consists of four major steps:
 566 marek       1.21  * 1) Send the shutdown signal.  This causes the worker routine to break out 
 567                   *    of the loop and exit.
 568                   * 2) Wait for the worker thread to end.  This may take a while if it's
 569                   *    processing an indication.  This is optional in a shutdown scenario.
 570                   *    If the listener is shutdown with a -f force, the listener
 571                   *    will not wait for the consumer to finish before shutting down.
 572                   *    Note that a normal shutdown only allows the current consumer indication
 573                   *    to finish.  All other queued indications are serialized to a log and
 574 h.sterling  1.1   *    are sent when the consumer is reoaded.
 575                   * 3) Terminate the consumer provider interface.
 576 marek       1.21  * 4) Decrement the module refcount (the module will automatically unload when
 577                   *    it's refcount == 0)
 578 h.sterling  1.1   * 
 579 marek       1.21  * In a scenario where more multiple consumers are loaded, the shutdown signal
 580                   * should be sent to all of the consumers so the threads can finish
 581                   * simultaneously.
 582 h.sterling  1.1   * 
 583 marek       1.21  * ATTN: Should the normal shutdown wait for everything in the queue to be
 584                   * processed?  Just new indications to be processed?  I am not inclined to this
 585                   * solution since it could take a LOT of time.  By serializing and deserialing
 586                   * indications between shutdown and startup, I feel like we do not need to 
 587                   * process ALL queued indications on shutdown.
 588 h.sterling  1.1   */ 
 589                  
 590                  /** Unloads all consumers.
 591                   */ 
 592                  void ConsumerManager::unloadAllConsumers()
 593                  {
 594                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 595                  
 596                      AutoMutex lock(_consumerTableMutex);
 597                  
 598                      if (!_consumers.size())
 599                      {
 600 marek       1.21         PEG_TRACE_CSTRING(
 601                              TRC_LISTENER,
 602                              Tracer::LEVEL4,
 603                              "There are no consumers to unload.");
 604 h.sterling  1.1          PEG_METHOD_EXIT();
 605                          return;
 606                      }
 607                  
 608                      if (!_forceShutdown)
 609                      {
 610 marek       1.21         // wait until all the consumers have finished processing the events in
 611                          // their queue
 612                          // ATTN: Should this have a timeout even though it's a force??
 613 h.sterling  1.1          while (hasActiveConsumers())
 614                          {
 615 mike        1.15             Threads::sleep(500);
 616 h.sterling  1.1          }
 617                      }
 618                  
 619                      Array<DynamicConsumer*> loadedConsumers;
 620                  
 621                      ConsumerTable::Iterator i = _consumers.start();
 622                      DynamicConsumer* consumer = 0;
 623                  
 624                      for (; i!=0; i++)
 625                      {
 626                          consumer = i.value();
 627                          if (consumer && consumer->isLoaded())
 628                          {
 629                              loadedConsumers.append(consumer);
 630                          }
 631                      }
 632                  
 633                      if (loadedConsumers.size())
 634                      {
 635                          try
 636                          {
 637 h.sterling  1.1              _unloadConsumers(loadedConsumers);
 638                  
 639 kumpf       1.16         } catch (Exception&)
 640 h.sterling  1.1          {
 641 marek       1.21             PEG_TRACE_CSTRING(
 642                                  TRC_LISTENER,
 643 marek       1.25                 Tracer::LEVEL2,
 644 marek       1.21                 "Error unloading consumers.");
 645 h.sterling  1.1          }
 646                      } else
 647                      {
 648 marek       1.21         PEG_TRACE_CSTRING(
 649                              TRC_LISTENER,
 650                              Tracer::LEVEL4,
 651                              "There are no consumers to unload.");
 652 h.sterling  1.1      }
 653                  
 654                      PEG_METHOD_EXIT();
 655                  }
 656                  
 657                  /** Unloads idle consumers.
 658                   */ 
 659                  void ConsumerManager::unloadIdleConsumers()
 660                  {
 661                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 662                  
 663                      AutoMutex lock(_consumerTableMutex);
 664                  
 665                      if (!_consumers.size())
 666                      {
 667 marek       1.21         PEG_TRACE_CSTRING(
 668                              TRC_LISTENER,
 669                              Tracer::LEVEL4,
 670                              "There are no consumers to unload.");
 671 h.sterling  1.1          PEG_METHOD_EXIT();
 672                          return;
 673                      }
 674                  
 675                      Array<DynamicConsumer*> loadedConsumers;
 676                  
 677                      ConsumerTable::Iterator i = _consumers.start();
 678                      DynamicConsumer* consumer = 0;
 679                  
 680                      for (; i!=0; i++)
 681                      {
 682                          consumer = i.value();
 683                          if (consumer && consumer->isLoaded() && consumer->isIdle())
 684                          {
 685                              loadedConsumers.append(consumer);
 686                          }
 687                      }
 688                  
 689                      if (loadedConsumers.size())
 690                      {
 691                          try
 692 h.sterling  1.1          {
 693                              _unloadConsumers(loadedConsumers);
 694                  
 695 kumpf       1.16         } catch (Exception&)
 696 h.sterling  1.1          {
 697 marek       1.21             PEG_TRACE_CSTRING(
 698                                  TRC_LISTENER,
 699 marek       1.25                 Tracer::LEVEL2,
 700 marek       1.21                 "Error unloading consumers.");
 701 h.sterling  1.1          }
 702                      } else
 703                      {
 704 marek       1.21         PEG_TRACE_CSTRING(
 705                              TRC_LISTENER,
 706                              Tracer::LEVEL4,
 707                              "There are no consumers to unload.");
 708 h.sterling  1.1      }
 709                  
 710                      PEG_METHOD_EXIT();
 711                  }
 712                  
 713                  /** Unloads a single consumer.
 714                   */ 
 715                  void ConsumerManager::unloadConsumer(const String& consumerName)
 716                  {
 717                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 718                  
 719                      AutoMutex lock(_consumerTableMutex);
 720                  
 721                      DynamicConsumer* consumer = 0;
 722                  
 723                      //check whether the consumer exists
 724                      if (!_consumers.lookup(consumerName, consumer))
 725                      {
 726 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 727                              "Error: cannot unload consumer, unknown consumer %s",
 728                              (const char*)consumerName.getCString()));
 729 h.sterling  1.1          return;
 730                      }
 731                  
 732                      //check whether the consumer is loaded
 733                      if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 734                      {
 735                          //unload the consumer
 736                          Array<DynamicConsumer*> loadedConsumers;
 737                          loadedConsumers.append(consumer);
 738                  
 739                          try
 740                          {
 741                              _unloadConsumers(loadedConsumers);
 742                  
 743 kumpf       1.16         } catch (Exception&)
 744 h.sterling  1.1          {
 745 thilo.boehm 1.28             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
 746 marek       1.21                 "Error unloading consumers.");
 747 h.sterling  1.1          }
 748                  
 749                      } else
 750                      {
 751 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
 752                              "Error: cannot unload the not loaded consumer %s",
 753                              (const char*)consumerName.getCString()));
 754 h.sterling  1.1      }
 755                  
 756                      PEG_METHOD_EXIT();
 757                  }
 758                  
 759                  /** Unloads the consumers in the given array.
 760                   *  The consumerTable mutex MUST be locked prior to entering this method.
 761                   */ 
 762 marek       1.21 void ConsumerManager::_unloadConsumers(
 763                      Array<DynamicConsumer*> consumersToUnload)
 764 h.sterling  1.1  {
 765                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 766                  
 767                      //tell consumers to shutdown
 768                      for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 769                      {
 770                          consumersToUnload[i]->sendShutdownSignal();
 771                      }
 772                  
 773 thilo.boehm 1.28     PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
 774 marek       1.21         "Sent shutdown signal to all consumers.");
 775                  
 776                      // wait for all the consumer worker threads to complete
 777                      // since we can only shutdown after they are all complete, 
 778                      // it does not matter if the first, fifth, or last
 779                      // consumer takes the longest; the wait time is equal to the time it takes
 780                      // for the busiest consumer to stop processing its requests.
 781 h.sterling  1.1      for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 782                      {
 783 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
 784                              (const char*)consumersToUnload[i]->getName().getCString()));
 785 h.sterling  1.1  
 786                          //wait for the consumer worker thread to end
 787 kumpf       1.26         Semaphore* _shutdownSemaphore = 
 788                              consumersToUnload[i]->getShutdownSemaphore();
 789                          if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
 790 h.sterling  1.1          {
 791 kumpf       1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
 792 marek       1.21                 "Timed out while attempting to stop consumer thread.");
 793 h.sterling  1.1          }
 794                  
 795 thilo.boehm 1.28         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
 796 h.sterling  1.1  
 797                          try
 798                          {
 799                              //terminate consumer provider interface
 800                              consumersToUnload[i]->terminate();
 801                  
 802                              //unload consumer provider module
 803                              PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 804                              consumersToUnload[i]->_module->unloadModule();
 805                  
 806                              //serialize outstanding indications
 807 marek       1.21             _serializeOutstandingIndications(
 808                                  consumersToUnload[i]->getName(),
 809                                  consumersToUnload[i]->_retrieveOutstandingIndications());
 810 h.sterling  1.1  
 811                              //reset the consumer
 812                              consumersToUnload[i]->reset();
 813                  
 814 thilo.boehm 1.28             PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
 815 marek       1.21                 "Consumer library successfully unloaded.");
 816 h.sterling  1.1  
 817                          } catch (Exception& e)
 818                          {
 819 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1, 
 820                                  "Error unloading consumer: %s",
 821                                  (const char*)e.getMessage().getCString())); 
 822 h.sterling  1.1              //ATTN: throw exception? log warning?
 823                          }
 824                      }
 825                  
 826                      PEG_METHOD_EXIT();
 827                  }
 828                  
 829                  /** Serializes oustanding indications to a <MyConsumer>.dat file
 830                   */
 831 marek       1.21 void ConsumerManager::_serializeOutstandingIndications(
 832                      const String& consumerName, 
 833                      Array<IndicationDispatchEvent> indications)
 834                  {
 835                      PEG_METHOD_ENTER(
 836                          TRC_LISTENER,
 837                          "ConsumerManager::_serializeOutstandingIndications");
 838 h.sterling  1.1  
 839                      if (!indications.size())
 840                      {
 841                          PEG_METHOD_EXIT();
 842                          return;
 843                      }
 844                  
 845 marek       1.21     String fileName = FileSystem::getAbsolutePath(
 846                                            (const char*)_consumerConfigDir.getCString(),
 847                                            String(consumerName + ".dat"));
 848 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
 849                          (const char*)fileName.getCString()));
 850 h.sterling  1.1  
 851 mike        1.9      Buffer buffer;
 852 h.sterling  1.1  
 853                      // Open the log file and serialize remaining 
 854                      FILE* fileHandle = 0;
 855                      fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 856                  
 857                      if (!fileHandle)
 858                      {
 859 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 860                              "Unable to open log file for %s",
 861                              (const char*)consumerName.getCString()));
 862 h.sterling  1.1  
 863                      } else
 864                      {
 865 mike        1.20         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 866 h.sterling  1.1                        "Serializing %d outstanding requests for %s",
 867                                        indications.size(),
 868 marek       1.18                       (const char*)consumerName.getCString()));
 869 h.sterling  1.1  
 870 marek       1.21         // we have to put the array of instances under a valid root element
 871                          // or the parser complains 
 872 h.sterling  1.1          XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 873                  
 874 marek       1.21         CIMInstance cimInstance;
 875 h.sterling  1.1          for (Uint32 i = 0; i < indications.size(); i++)
 876                          {
 877 marek       1.21             //set the URL string property on the serializable instance
 878                              CIMValue cimValue(CIMTYPE_STRING, false);
 879                              cimValue.set(indications[i].getURL());
 880                              cimInstance = indications[i].getIndicationInstance();
 881                              CIMProperty cimProperty(URL_PROPERTY, cimValue);
 882                              cimInstance.addProperty(cimProperty);
 883 h.sterling  1.11 
 884                              XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
 885 marek       1.21         }
 886 h.sterling  1.1  
 887 kumpf       1.19         XmlWriter::append(buffer, "</IRETURNVALUE>");
 888 h.sterling  1.1  
 889                          fputs((const char*)buffer.getData(), fileHandle);
 890                  
 891                          fclose(fileHandle);
 892                      }
 893                  
 894                      PEG_METHOD_EXIT();
 895                  }
 896                  
 897                  /** Reads outstanding indications from a <MyConsumer>.dat file
 898                   */ 
 899 marek       1.21 Array<IndicationDispatchEvent> 
 900                      ConsumerManager::_deserializeOutstandingIndications(
 901                          const String& consumerName)
 902                  {
 903                      PEG_METHOD_ENTER(
 904                          TRC_LISTENER,
 905                          "ConsumerManager::_deserializeOutstandingIndications");
 906                  
 907                      String fileName = FileSystem::getAbsolutePath(
 908                                            (const char*)_consumerConfigDir.getCString(),
 909                                            String(consumerName + ".dat"));
 910 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
 911                          "Consumer dat file: %s",(const char*)fileName.getCString()));
 912 h.sterling  1.1  
 913                      Array<CIMInstance> cimInstances;
 914 marek       1.21     Array<String>      urlStrings;
 915                      Array<IndicationDispatchEvent> indications;
 916 h.sterling  1.1  
 917                      // Open the log file and serialize remaining indications
 918                      if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 919                      {
 920 mike        1.9          Buffer text;
 921 h.sterling  1.1          CIMInstance cimInstance;
 922 marek       1.21         CIMProperty cimProperty;
 923                          CIMValue cimValue;
 924                          String urlString;
 925 h.sterling  1.1          XmlEntry entry;
 926                  
 927                          try
 928                          {
 929 marek       1.21             //ATTN: Is this safe to use; what about CRLFs?
 930                              FileSystem::loadFileToMemory(text, fileName);
 931 h.sterling  1.1  
 932                              //parse the file
 933                              XmlParser parser((char*)text.getData());
 934                              XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 935                  
 936                              while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 937                              {
 938 marek       1.21                 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
 939                                  if (index != PEG_NOT_FOUND)
 940                                  {
 941                                      // get the URL string property from the serialized instance 
 942                                      // and remove the property
 943                                      cimProperty = cimInstance.getProperty(index);
 944                                      cimValue = cimProperty.getValue();
 945                                      cimValue.get(urlString);
 946                                      cimInstance.removeProperty(index);
 947                                  }
 948                                  IndicationDispatchEvent* indicationEvent = 
 949                                      new IndicationDispatchEvent(
 950                                          OperationContext(), 
 951                                          urlString, 
 952                                          cimInstance);
 953                  
 954 h.sterling  1.11                 indications.append(*indicationEvent);
 955 h.sterling  1.1              }
 956                  
 957                              XmlReader::expectEndTag(parser, "IRETURNVALUE");
 958                  
 959 mike        1.20             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
 960 h.sterling  1.1                            "Consumer %s has %d outstanding indications",
 961                                            (const char*)consumerName.getCString(),
 962 marek       1.18                           indications.size()));
 963 h.sterling  1.1  
 964                              //delete the file 
 965                              FileSystem::removeFile(fileName);
 966                  
 967                          } catch (Exception& ex)
 968                          {
 969 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 970                                  "Error parsing dat file for consumer %s: %s",
 971                                  (const char*)consumerName.getCString(),
 972                                  (const char*)ex.getMessage().getCString()));
 973 h.sterling  1.1  
 974                          } catch (...)
 975                          {
 976 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
 977                                  "Error parsing dat file for consumer %s: Unknown Exception",
 978                                  (const char*)consumerName.getCString()));
 979 h.sterling  1.1          }
 980                      }
 981                  
 982                      PEG_METHOD_EXIT();
 983 h.sterling  1.11     return indications;
 984 h.sterling  1.1  }
 985                  
 986                  
 987                  
 988                  /** 
 989 marek       1.21  * This is the main worker thread of the consumer.  By having only one thread
 990                   * per consumer, we eliminate a ton of synchronization issues and make it easy 
 991                   * to prevent the consumer from performing two mutually exclusive operations
 992                   * at once.  This also prevents one bad consumer from taking the entire 
 993                   * listener down.  That being said, it is up to the programmer to write smart
 994                   * consumers, and to ensure that their actions don't deadlock 
 995                   * the worker thread.
 996 h.sterling  1.1   * 
 997 marek       1.21  * If a consumer receives a lot of traffic, or it's consumeIndication() method
 998                   * takes a considerable amount of time to complete, it may make sense to make
 999                   * the consumer multi-threaded.  The individual consumer can immediately
1000                   * spawn off* new threads to handle indications, and return immediately to
1001                   * catch the next indication.  In this way, a consumer can attain 
1002                   * extremely high performance. 
1003 h.sterling  1.1   * 
1004                   * There are three different events that can signal us:
1005                   * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
1006 marek       1.21  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1007                   *    shutdown or an idle consumer state)
1008 h.sterling  1.1   * 3) A retry signal (signalled from this routine itself)
1009                   * 
1010 marek       1.21  * The idea is that all new indications are put on the front of the queue and
1011                   * processed first.  All of the retry indications are put on the back of the 
1012                   * queue and are only processed AFTER all new indications are sent.
1013                   * Before processing each indication, we check to see whether or not the
1014                   * shutdown signal was given.  
1015                   * If so, we immediately break out of the loop, and another compenent 
1016                   * serializes the remaining indications to a file.
1017 h.sterling  1.1   * 
1018 marek       1.21  * An indication gets retried 
1019                   *     if the consumer throws a CIM_ERR_FAILED exception.
1020 h.sterling  1.1   * 
1021 marek       1.21  * This function makes sure it waits until the default retry lapse has passed 
1022                   * to avoid issues with the following scenario:
1023 h.sterling  1.5   * 20 new indications come in, 10 of them are successful, 10 are not.
1024 marek       1.21  * We were signalled 20 times, so we will pass the time_wait 20 times. 
1025                   * Perceivably, the process time on each indication could be minimal.
1026                   * We could potentially proceed to process the retries after a very small time
1027                   * interval since we would never hit the wait for the retry timeout.
1028 h.sterling  1.1   * 
1029                   */ 
1030 marek       1.21 ThreadReturnType PEGASUS_THREAD_CDECL 
1031                      ConsumerManager::_worker_routine(void *param)
1032 h.sterling  1.1  {
1033                      PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1034                  
1035                      DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1036                      String name = myself->getName();
1037 mike        1.15     List<IndicationDispatchEvent,Mutex> tmpEventQueue;
1038 h.sterling  1.1  
1039 thilo.boehm 1.28     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1040                          "_worker_routine::entering loop for %s",
1041                          (const char*)name.getCString()));
1042 h.sterling  1.1  
1043 h.sterling  1.8      myself->_listeningSemaphore->signal();
1044                  
1045 h.sterling  1.1      while (true)
1046                      {
1047 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s", 
1048                              (const char*)name.getCString()));
1049                  
1050 kumpf       1.26 
1051                          //wait to be signalled
1052                          if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
1053 h.sterling  1.1          {
1054 kumpf       1.26             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1055                                  "_worker_routine::Time to retry any outstanding indications.");
1056                  
1057                              // signal the queue in the same way we would,
1058                              // if we received a new indication
1059                              // this allows the thread to fall into the queue processing code
1060                              myself->_check_queue->signal();
1061                  
1062                              continue;
1063                          }
1064                  
1065 thilo.boehm 1.28         PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1066                              "_worker_routine::signalled %s",(const char*)name.getCString()));
1067 h.sterling  1.1  
1068 kumpf       1.26         //check whether we received the shutdown signal
1069                          if (myself->_dieNow)
1070                          {
1071 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1072                                  "_worker_routine::shutdown received %s",
1073                                  (const char*)name.getCString()));
1074 kumpf       1.26             break;
1075                          }
1076 h.sterling  1.1  
1077 kumpf       1.26         //create a temporary queue to store failed indications
1078                          tmpEventQueue.clear();
1079 h.sterling  1.1  
1080 kumpf       1.26         //continue processing events until the queue is empty
1081                          //make sure to check for the shutdown signal before every iteration
1082                          // Note that any time during our processing of events the Listener
1083                          // may be enqueueing NEW events for us to process.
1084                          // Because we are popping off the front and new events are being
1085                          // thrown on the back if events are failing when we start
1086                          // But are succeeding by the end of the processing, events may be
1087                          // sent out of chronological order.
1088                          // However. Once we complete the current queue of events, we will
1089                          // always send old events to be retried before sending any
1090                          // new events added afterwards.
1091                          while (myself->_eventqueue.size())
1092                          {
1093                              //check for shutdown signal
1094                              //this only breaks us out of the queue loop, but we will
1095                              //immediately get through the next wait from
1096                              //the shutdown signal itself, at which time we break
1097                              //out of the main loop
1098 h.sterling  1.1              if (myself->_dieNow)
1099                              {
1100 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1101                                      "Received signal to shutdown, jumping out of queue loop %s",
1102                                      (const char*)name.getCString()));
1103 h.sterling  1.1                  break;
1104                              }
1105                  
1106 kumpf       1.26             //pop next indication off the queue
1107                              IndicationDispatchEvent* event = 0;
1108                              //what exceptions/errors can this throw?
1109                              event = myself->_eventqueue.remove_front();
1110 h.sterling  1.1  
1111 kumpf       1.26             if (!event)
1112 h.sterling  1.1              {
1113 kumpf       1.26                 //this should never happen
1114                                  continue;
1115                              }
1116 h.sterling  1.1  
1117 thilo.boehm 1.28             PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1118                                  "_worker_routine::consumeIndication %s",
1119                                  (const char*)name.getCString()));
1120 h.sterling  1.1  
1121 kumpf       1.26             try
1122                              {
1123                                  myself->consumeIndication(event->getContext(),
1124                                                            event->getURL(),
1125                                                            event->getIndicationInstance());
1126                  
1127 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1128                                      "_worker_routine::processed indication successfully. %s",
1129                                      (const char*)name.getCString()));
1130 h.sterling  1.1  
1131 kumpf       1.26                 delete event;
1132                                  continue;
1133                              }
1134                              catch (CIMException & ce)
1135                              {
1136                                  //check for failure
1137                                  if (ce.getCode() == CIM_ERR_FAILED)
1138 h.sterling  1.1                  {
1139 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2, 
1140 kumpf       1.26                         "_worker_routine::consumeIndication() temporary"
1141 thilo.boehm 1.28                             " failure %s : %s",
1142                                          (const char*)name.getCString(),
1143                                          (const char*)ce.getMessage().getCString()));
1144 kumpf       1.26                     
1145                                      // Here we simply determine if we should increment
1146                                      // the retry count or not.
1147                                      // We don't want to count a forced retry from a new
1148                                      // event to count as a retry. 
1149                                      // We just have to do it for order's sake.
1150                                      // If the retry Lapse has lapsed on this event,
1151                                      // then increment the counter.
1152                                      if (event->getRetries() > 0)
1153                                      {
1154                                          Sint64 differenceInMicroseconds = 
1155                                              CIMDateTime::getDifference(
1156                                                  event->getLastAttemptTime(),
1157                                                  CIMDateTime::getCurrentDateTime());
1158 h.sterling  1.1  
1159 kumpf       1.26                         if (differenceInMicroseconds >= 
1160                                                  (DEFAULT_RETRY_LAPSE * 1000))
1161 marek       1.21                         {
1162 h.sterling  1.10                             event->increaseRetries();
1163                                          }
1164 kumpf       1.26                     }
1165                                      else
1166                                      {
1167                                          event->increaseRetries();
1168                                      }
1169 h.sterling  1.1  
1170 kumpf       1.26                     //determine if we have hit the max retry count
1171                                      if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1172                                      {
1173                                          PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1174                                              "Error: the maximum retry count has been "
1175                                                  "exceeded.  Removing the event from "
1176                                                  "the queue.");
1177                  
1178                                          Logger::put(
1179                                              Logger::ERROR_LOG,
1180                                              System::CIMLISTENER,
1181                                              Logger::SEVERE,
1182                                              "The following indication did not get "
1183                                                  "processed successfully: $0",
1184                                          event->getIndicationInstance().getPath().toString());
1185 h.sterling  1.1  
1186                                          delete event;
1187                                          continue;
1188                                      }
1189 kumpf       1.26                     else
1190                                      {
1191                                          PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1192                                              "_worker_routine::placing failed indication "
1193                                                  "back in queue");
1194                                          tmpEventQueue.insert_back(event);
1195                                      }
1196                                  }
1197                                  else
1198 h.sterling  1.1                  {
1199 thilo.boehm 1.28                     PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1200                                          "Error: consumeIndication() permanent failure: %s",
1201                                          (const char*)ce.getMessage().getCString()));
1202 h.sterling  1.1                      delete event;
1203                                      continue;
1204 kumpf       1.26                 }
1205                              }
1206                              catch (Exception & ex)
1207 h.sterling  1.1              {
1208 thilo.boehm 1.28                 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1209                                      "Error: consumeIndication() permanent failure: %s",
1210                                      (const char*)ex.getMessage().getCString()));
1211 kumpf       1.26                 delete event;
1212                                  continue;
1213 h.sterling  1.1              }
1214 kumpf       1.26             catch (...)
1215                              {
1216                                  PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1217                                      "Error: consumeIndication() failed: Unknown exception.");
1218                                  delete event;
1219                                  continue;
1220                              } //end try
1221                  
1222                          } //while eventqueue
1223                  
1224                          // Copy the failed indications back to the main queue
1225                          // We now lock the queue while adding the retries on to the queue
1226                          // so that new events can't get in in front
1227                          // Of those events we are retrying. Retried events happened before
1228                          // any new events coming in.
1229                          IndicationDispatchEvent* tmpEvent = 0;
1230 kumpf       1.27         if (myself->_eventqueue.try_lock())
1231 h.sterling  1.1          {
1232 kumpf       1.27             while (tmpEventQueue.size())
1233                              {
1234                                  tmpEvent = tmpEventQueue.remove_front();
1235                                  myself->_eventqueue.insert_back(tmpEvent);
1236                              }
1237                  
1238                              myself->_eventqueue.unlock();
1239                          }
1240                          else
1241                          {
1242                              PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1243                                  "Failed to lock _eventqueue");
1244 kumpf       1.26         }
1245 h.sterling  1.1      } //shutdown
1246                  
1247                      PEG_METHOD_EXIT();
1248                      return 0;
1249                  }
1250                  
1251                  
1252                  PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2