(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 h.sterling 1.1 //%2005////////////////////////////////////////////////////////////////////////
   2                //
   3                // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                // IBM Corp.; EMC Corporation, The Open Group.
   7                // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11                //
  12                // Permission is hereby granted, free of charge, to any person obtaining a copy
  13                // of this software and associated documentation files (the "Software"), to
  14                // deal in the Software without restriction, including without limitation the
  15                // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16                // sell copies of the Software, and to permit persons to whom the Software is
  17                // furnished to do so, subject to the following conditions:
  18                // 
  19                // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20                // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21                // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22 h.sterling 1.1 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23                // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24                // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25                // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26                // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27                //
  28                //==============================================================================
  29                //
  30                // Author: Heather Sterling (hsterl@us.ibm.com)
  31                //
  32                // Modified By: 
  33                //
  34                //%/////////////////////////////////////////////////////////////////////////////
  35                
  36                #include <Pegasus/Common/Config.h>
  37                //#include <cstdlib>
  38                //#include <dlfcn.h>
  39                #include <Pegasus/Common/System.h>
  40                #include <Pegasus/Common/FileSystem.h>
  41                #include <Pegasus/Common/Tracer.h>
  42                #include <Pegasus/Common/Logger.h>
  43 h.sterling 1.1 #include <Pegasus/Common/XmlReader.h>
  44                #include <Pegasus/Common/XmlParser.h>
  45                #include <Pegasus/Common/XmlWriter.h>
  46                #include <Pegasus/Common/IPC.h>
  47                
  48                #include "ConsumerManager.h"
  49                
  50                PEGASUS_NAMESPACE_BEGIN
  51                PEGASUS_USING_STD;
  52                
  53                //ATTN: Can we just use a properties file instead?  If we only have one property, we may want to just parse it ourselves.
  54                // We may need to add more properties, however.  Potential per-consumer properties: unloadOk, idleTimeout, retryCount, etc.
  55                static struct OptionRow optionsTable[] =
  56                //column order: option name, default value, required (rqd), type, domain, domain size, clname, help message
  57                {
  58                {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
  59                };
  60                
  61                const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  62                
  63                //retry settings
  64 h.sterling 1.1 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  65 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000;  //milliseconds (5 minutes)
  66 h.sterling 1.1 
  67                
  68                
  69                ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  70                _consumerDir(consumerDir),
  71                _consumerConfigDir(consumerConfigDir),
  72                _enableConsumerUnload(enableConsumerUnload),
  73                _idleTimeout(idleTimeout),
  74                _forceShutdown(true)
  75                {
  76                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  77                
  78                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  79                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  80                
  81                    Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  82                                  "Consumer unload enabled %d: idle timeout %d",
  83                                  enableConsumerUnload,
  84                                  idleTimeout);
  85                
  86                
  87 h.sterling 1.8     //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
  88 h.sterling 1.6     //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  89 h.sterling 1.1 
  90 kumpf      1.3     struct timeval deallocateWait = {15, 0};
  91                    _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  92 h.sterling 1.1 
  93                    _init();
  94                
  95                    PEG_METHOD_EXIT();
  96                }
  97                
  98                ConsumerManager::~ConsumerManager()
  99                {
 100                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 101                
 102                    unloadAllConsumers();
 103                
 104 h.sterling 1.5     if (_thread_pool != NULL)
 105                    {
 106 kumpf      1.3     delete _thread_pool;
 107 h.sterling 1.5     }
 108 h.sterling 1.1 
 109                    ConsumerTable::Iterator i = _consumers.start();
 110                    for (; i!=0; i++)
 111                    {
 112                        DynamicConsumer* consumer = i.value();
 113                        delete consumer;
 114                    }
 115                
 116                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 117                
 118                    ModuleTable::Iterator j = _modules.start();
 119                    for (;j!=0;j++)
 120                    {
 121                        ConsumerModule* module = j.value();
 122                        delete module;
 123                    }
 124                
 125                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 126                
 127                    PEG_METHOD_EXIT();
 128                }
 129 h.sterling 1.1 
 130                void ConsumerManager::_init()
 131                {
 132                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 133                
 134                    //check if there are any outstanding indications
 135                    Array<String> files;
 136                    Uint32 pos;
 137                    String consumerName;
 138                
 139                    if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 140                    {
 141                        for (Uint32 i = 0; i < files.size(); i++)
 142                        {
 143                            pos = files[i].find(".dat");
 144                            if (pos != PEG_NOT_FOUND)
 145                            {
 146                                consumerName = files[i].subString(0, pos);
 147                
 148                                try
 149                                {
 150 h.sterling 1.1                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 151                                    getConsumer(consumerName);
 152                
 153                                } catch (...)
 154                                {
 155                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 156                                }
 157                            }
 158                        }
 159                    }
 160                
 161                    PEG_METHOD_EXIT();
 162                }
 163                
 164                String ConsumerManager::getConsumerDir()
 165                {
 166                    return _consumerDir;
 167                }
 168                
 169                String ConsumerManager::getConsumerConfigDir()
 170                {
 171 h.sterling 1.1     return _consumerConfigDir;
 172                }
 173                
 174                Boolean ConsumerManager::getEnableConsumerUnload()
 175                {
 176                    return _enableConsumerUnload;
 177                }
 178                
 179                Uint32 ConsumerManager::getIdleTimeout()
 180                {
 181                    return _idleTimeout;
 182                }
 183                
 184                /** Retrieves the library name associated with the consumer name.  By default, the library name
 185                  * is the same as the consumer name.  However, you may specify a different library name in a consumer
 186                  * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 187                  *     location="libraryName"
 188                  *
 189                  * The config file is optional and is generally only needed in cases where there are strict requirements
 190                  * on library naming.
 191                  *
 192 h.sterling 1.1   * It is the responsibility of the caller to catch any exceptions thrown by this method.
 193                  */
 194                String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 195                {
 196                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 197                
 198                    //default library name is consumer name
 199                    String libraryName = consumerName;
 200                
 201                    //check whether an alternative library name was specified in an optional consumer config file
 202                    String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 203                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 204                
 205                    if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 206                    {
 207                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 208                
 209                        try
 210                        {
 211 h.sterling 1.6             //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
 212 h.sterling 1.8             OptionManager _optionMgr;
 213                            _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
 214 h.sterling 1.1             _optionMgr.mergeFile(configFile);
 215                            _optionMgr.checkRequiredOptions();
 216                
 217                            if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 218                            {
 219                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 220                                libraryName = consumerName;
 221                            }
 222                
 223                        } catch (Exception & ex)
 224                        {
 225                            throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 226                                                               "Error reading $0: $1.",
 227                                                               configFile,
 228                                                               ex.getMessage()));
 229                        }
 230                    } else
 231                    {
 232                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 233                    }
 234                
 235 h.sterling 1.1     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 236                
 237                    PEG_METHOD_EXIT();
 238                    return libraryName;
 239                }
 240                
 241                /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 242                 *  DNE, we create it and initialize it, and add it to the table.
 243                 * @throws Exception if we cannot successfully create and initialize the consumer
 244                 */ 
 245                DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 246                {
 247                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 248                
 249                    DynamicConsumer* consumer = 0;
 250                    CIMIndicationConsumerProvider* consumerRef = 0;
 251                    Boolean cached = false;
 252                    Boolean entryExists = false;
 253                
 254                    AutoMutex lock(_consumerTableMutex);
 255                
 256 h.sterling 1.1     if (_consumers.lookup(consumerName, consumer))
 257                    {
 258                        //why isn't this working??
 259                        entryExists = true;
 260                
 261                        if (consumer && consumer->isLoaded())
 262                        {
 263                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 264                            cached = true;
 265                        }
 266                    } else
 267                    {
 268                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 269                        consumer = new DynamicConsumer(consumerName);
 270                        //ATTN: The above is a memory leak if _initConsumer throws an exception
 271                        //need to delete it in that case
 272                    }
 273                
 274                    if (!cached)
 275                    {
 276                        _initConsumer(consumerName, consumer);
 277 h.sterling 1.1 
 278                        if (!entryExists)
 279                        {
 280                            _consumers.insert(consumerName, consumer);
 281                        }
 282                    }
 283                
 284                    consumer->updateIdleTimer();
 285                
 286                    PEG_METHOD_EXIT();
 287                    return consumer;
 288                }
 289                
 290                /** Initializes a DynamicConsumer.
 291                 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 292                 * @throws Exception if the consumer cannot be initialized
 293                 */
 294                void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
 295                {
 296                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 297                
 298 h.sterling 1.1     CIMIndicationConsumerProvider* base = 0;
 299                    ConsumerModule* module = 0;
 300                
 301                    //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
 302                    String libraryName = _getConsumerLibraryName(consumerName);
 303                    module = _lookupModule(libraryName);
 304                
 305                    //build library path
 306                    String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
 307                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
 308                
 309                    //load module
 310                    try
 311                    {
 312                        base = module->load(consumerName, libraryPath);
 313                        consumer->set(module, base);
 314                
 315                    } catch (Exception& ex)
 316                    {
 317                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
 318                
 319 h.sterling 1.1         throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 320                                                           "Cannot load module ($0:$1): Unknown exception.",
 321                                                           consumerName,
 322                                                           libraryName));
 323                    } catch (...)
 324                    {
 325                        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 326                                                           "Cannot load module ($0:$1): Unknown exception.",
 327                                                           consumerName,
 328                                                           libraryName));
 329                    }
 330                
 331                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
 332                
 333                    //initialize consumer
 334                    try
 335                    {
 336                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);
 337                
 338                        consumer->initialize();
 339                
 340 h.sterling 1.1         //ATTN: need to change this
 341                        Semaphore* semaphore = new Semaphore(0);  //blocking
 342                
 343                        consumer->setShutdownSemaphore(semaphore);
 344                
 345                        //start the worker thread
 346 konrad.r   1.7         if (_thread_pool->allocate_and_awaken(consumer,
 347 h.sterling 1.1                                           _worker_routine,
 348 konrad.r   1.7                                           semaphore) != PEGASUS_THREAD_OK)
 349 h.sterling 1.8     {
 350                        Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 351                        "Not enough threads for consumer.");
 352 konrad.r   1.7  
 353 h.sterling 1.8         Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
 354                        "Could not allocate thread for consumer.");
 355 konrad.r   1.7 
 356 h.sterling 1.8        consumer->setShutdownSemaphore(0);
 357                       delete semaphore;
 358 konrad.r   1.7            throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
 359 h.sterling 1.8                         "Not enough threads for consumer worker routine."));
 360 konrad.r   1.7         }
 361 h.sterling 1.1 
 362 h.sterling 1.8         //wait until the listening thread has started.  Otherwise, there is a miniscule chance that the first event will be enqueued
 363                        //before the consumer is waiting for it and the first indication after loading the consumer will be lost
 364                        consumer->waitForEventThread();
 365                
 366 h.sterling 1.1         //load any outstanding requests
 367                        Array<CIMInstance> outstandingIndications = _deserializeOutstandingIndications(consumerName);
 368                        if (outstandingIndications.size())
 369                        {
 370                            //the consumer will signal itself in _loadOustandingIndications
 371                            consumer->_loadOutstandingIndications(outstandingIndications);
 372                        }
 373                
 374                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
 375                
 376                    } catch (...)
 377                    {
 378                        module->unloadModule();
 379                        consumer->reset();
 380                        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 381                                                           "Cannot initialize consumer ($0).",
 382                                                           consumerName));        
 383                    }
 384                
 385                    PEG_METHOD_EXIT();    
 386                }
 387 h.sterling 1.1 
 388                
 389                /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 390                 *  DNE, we create it and add it to the table.
 391                 * @throws Exception if we cannot successfully create and initialize the consumer
 392                 */ 
 393                ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 394                {
 395                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 396                
 397                    AutoMutex lock(_moduleTableMutex);
 398                
 399                    ConsumerModule* module = 0;
 400                
 401                    //see if consumer module is cached
 402                    if (_modules.lookup(libraryName, module))
 403                    {
 404                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 405                                         "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 406                
 407                    } else
 408 h.sterling 1.1     {
 409                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 410                                         "Creating Consumer Provider Module " + libraryName);
 411                
 412                        module = new ConsumerModule(); 
 413                        _modules.insert(libraryName, module);
 414                    }
 415                
 416                    PEG_METHOD_EXIT();
 417                    return(module);
 418                }
 419                
 420                /** Returns true if there are active consumers
 421                 */ 
 422                Boolean ConsumerManager::hasActiveConsumers()
 423                {
 424                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 425                
 426                    AutoMutex lock(_consumerTableMutex);
 427                    DynamicConsumer* consumer = 0;
 428                
 429 h.sterling 1.1     try
 430                    {
 431                        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 432                        {
 433                            consumer = i.value();
 434                
 435                            if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 436                            {
 437                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 438                                PEG_METHOD_EXIT();
 439                                return true;
 440                            }
 441                        }
 442                    } catch (...)
 443                    {
 444                        // Unexpected exception; do not assume that no providers are loaded
 445                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 446                        PEG_METHOD_EXIT();
 447                        return true;
 448                    }
 449                
 450 h.sterling 1.1     PEG_METHOD_EXIT();
 451                    return false;
 452                }
 453                
 454                /** Returns true if there are loaded consumers
 455                 */ 
 456                Boolean ConsumerManager::hasLoadedConsumers()
 457                {
 458                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 459                
 460                    AutoMutex lock(_consumerTableMutex);
 461                    DynamicConsumer* consumer = 0;
 462                
 463                    try
 464                    {
 465                        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 466                        {
 467                            consumer = i.value();
 468                
 469                            if (consumer && consumer->isLoaded())
 470                            {
 471 h.sterling 1.1                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 472                                PEG_METHOD_EXIT();
 473                                return true;
 474                            }
 475                        }
 476                    } catch (...)
 477                    {
 478                        // Unexpected exception; do not assume that no providers are loaded
 479                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 480                        PEG_METHOD_EXIT();
 481                        return true;
 482                    }
 483                
 484                    PEG_METHOD_EXIT();
 485                    return false;
 486                }
 487                
 488                
 489                /** Shutting down a consumer consists of four major steps:
 490                 * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 491                 * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 492 h.sterling 1.1  *    is optional in a shutdown scenario.  If the listener is shut down with a -f force, the listener
 493                 *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 494                 *    the current consumer indication to finish.  All other queued indications are serialized to a log and 
 495                 *    are sent when the consumer is reloaded.
 496                 * 3) Terminate the consumer provider interface.
 497                 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0)
 498                 * 
 499                 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 500                 * of the consumers so the threads can finish simultaneously.
 501                 * 
 502                 * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 503                 * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing 
 504                 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 505                 * queued indications on shutdown.  
 506                 */ 
 507                
 508                /** Unloads all consumers.
 509                 */ 
 510                void ConsumerManager::unloadAllConsumers()
 511                {
 512                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 513 h.sterling 1.1 
 514                    AutoMutex lock(_consumerTableMutex);
 515                
 516                    if (!_consumers.size())
 517                    {
 518                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 519                        PEG_METHOD_EXIT();
 520                        return;
 521                    }
 522                
 523                    if (!_forceShutdown)
 524                    {
 525                        //wait until all the consumers have finished processing the events in their queue
 526                        //ATTN: Should this have a timeout even though it's a force??
 527                        while (hasActiveConsumers())
 528                        {
 529                            pegasus_sleep(500);
 530                        }
 531                    }
 532                
 533                    Array<DynamicConsumer*> loadedConsumers;
 534 h.sterling 1.1 
 535                    ConsumerTable::Iterator i = _consumers.start();
 536                    DynamicConsumer* consumer = 0;
 537                
 538                    for (; i!=0; i++)
 539                    {
 540                        consumer = i.value();
 541                        if (consumer && consumer->isLoaded())
 542                        {
 543                            loadedConsumers.append(consumer);
 544                        }
 545                    }
 546                
 547                    if (loadedConsumers.size())
 548                    {
 549                        try
 550                        {
 551                            _unloadConsumers(loadedConsumers);
 552                
 553                        } catch (Exception& ex)
 554                        {
 555 h.sterling 1.1             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 556                        }
 557                    } else
 558                    {
 559                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 560                    }
 561                
 562                    PEG_METHOD_EXIT();
 563                }
 564                
 565                /** Unloads idle consumers.
 566                 */ 
 567                void ConsumerManager::unloadIdleConsumers()
 568                {
 569                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 570                
 571                    AutoMutex lock(_consumerTableMutex);
 572                
 573                    if (!_consumers.size())
 574                    {
 575                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 576 h.sterling 1.1         PEG_METHOD_EXIT();
 577                        return;
 578                    }
 579                
 580                    Array<DynamicConsumer*> loadedConsumers;
 581                
 582                    ConsumerTable::Iterator i = _consumers.start();
 583                    DynamicConsumer* consumer = 0;
 584                
 585                    for (; i!=0; i++)
 586                    {
 587                        consumer = i.value();
 588                        if (consumer && consumer->isLoaded() && consumer->isIdle())
 589                        {
 590                            loadedConsumers.append(consumer);
 591                        }
 592                    }
 593                
 594                    if (loadedConsumers.size())
 595                    {
 596                        try
 597 h.sterling 1.1         {
 598                            _unloadConsumers(loadedConsumers);
 599                
 600                        } catch (Exception& ex)
 601                        {
 602                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 603                        }
 604                    } else
 605                    {
 606                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 607                    }
 608                
 609                    PEG_METHOD_EXIT();
 610                }
 611                
 612                /** Unloads a single consumer.
 613                 */ 
 614                void ConsumerManager::unloadConsumer(const String& consumerName)
 615                {
 616                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 617                
 618 h.sterling 1.1     AutoMutex lock(_consumerTableMutex);
 619                
 620                    DynamicConsumer* consumer = 0;
 621                
 622                    //check whether the consumer exists
 623                    if (!_consumers.lookup(consumerName, consumer))
 624                    {
 625                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
 626                        return;
 627                    }
 628                
 629                    //check whether the consumer is loaded
 630                    if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 631                    {
 632                        //unload the consumer
 633                        Array<DynamicConsumer*> loadedConsumers;
 634                        loadedConsumers.append(consumer);
 635                
 636                        try
 637                        {
 638                            _unloadConsumers(loadedConsumers);
 639 h.sterling 1.1 
 640                        } catch (Exception& ex)
 641                        {
 642                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 643                        }
 644                
 645                    } else
 646                    {
 647                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
 648                    }
 649                
 650                    PEG_METHOD_EXIT();
 651                }
 652                
 653                /** Unloads the consumers in the given array.
 654                 *  The consumerTable mutex MUST be locked prior to entering this method.
 655                 */ 
 656                void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
 657                {
 658                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 659                
 660 h.sterling 1.1     //tell consumers to shutdown
 661                    for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 662                    {
 663                        consumersToUnload[i]->sendShutdownSignal();
 664                    }
 665                
 666                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
 667                
 668                    //wait for all the consumer worker threads to complete
 669                    //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
 670                    //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
 671                    //processing its requests.
 672                    for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 673                    {
 674                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
 675                
 676                        //wait for the consumer worker thread to end
 677                        try
 678                        {
 679                            Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
 680                            if (_shutdownSemaphore)
 681 h.sterling 1.1             {
 682                                _shutdownSemaphore->time_wait(10000); 
 683                            }
 684                
 685                        } catch (TimeOut &)
 686                        {
 687                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
 688                        }
 689                
 690                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
 691                
 692                        try
 693                        {
 694                            //terminate consumer provider interface
 695                            consumersToUnload[i]->terminate();
 696                
 697                            //unload consumer provider module
 698                            PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 699                            consumersToUnload[i]->_module->unloadModule();
 700                
 701                            //serialize outstanding indications
 702 h.sterling 1.1             _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
 703                
 704                            //reset the consumer
 705                            consumersToUnload[i]->reset();
 706                
 707                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
 708                
 709                        } catch (Exception& e)
 710                        {
 711                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); 
 712                            //ATTN: throw exception? log warning?
 713                        }
 714                    }
 715                
 716                    PEG_METHOD_EXIT();
 717                }
 718                
 719                /** Serializes oustanding indications to a <MyConsumer>.dat file
 720                 */
 721                void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<CIMInstance> indications)
 722                {
 723 h.sterling 1.1     PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
 724                
 725                    if (!indications.size())
 726                    {
 727                        PEG_METHOD_EXIT();
 728                        return;
 729                    }
 730                
 731                    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 732                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 733                
 734                    Array<char> buffer;
 735                
 736                    // Open the log file and serialize remaining 
 737                    FILE* fileHandle = 0;
 738                    fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 739                
 740                    if (!fileHandle)
 741                    {
 742                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
 743                
 744 h.sterling 1.1     } else
 745                    {
 746                        Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 747                                      "Serializing %d outstanding requests for %s",
 748                                      indications.size(),
 749                                      (const char*)consumerName.getCString());
 750                
 751                        //we have to put the array of instances under a valid root element or the parser complains 
 752                        XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 753                
 754                        for (Uint32 i = 0; i < indications.size(); i++)
 755                        {
 756                            XmlWriter::appendValueNamedInstanceElement(buffer, indications[i]);
 757                        }
 758                
 759                        XmlWriter::append(buffer, "</IRETURNVALUE>\0");
 760                
 761                        fputs((const char*)buffer.getData(), fileHandle);
 762                
 763                        fclose(fileHandle);
 764                    }
 765 h.sterling 1.1 
 766                    PEG_METHOD_EXIT();
 767                }
 768                
 769                /** Reads outstanding indications from a <MyConsumer>.dat file
 770                 */ 
 771                Array<CIMInstance> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
 772                {
 773                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
 774                
 775                    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 776                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 777                
 778                    Array<CIMInstance> cimInstances;
 779                
 780                    // Open the log file and serialize remaining indications
 781                    if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 782                    {
 783                        Array<char> text;
 784                        CIMInstance cimInstance;
 785                        XmlEntry entry;
 786 h.sterling 1.1 
 787                        try
 788                        {
 789                            FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
 790                            text.append('\0');
 791                
 792                            //parse the file
 793                            XmlParser parser((char*)text.getData());
 794                            XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 795                
 796                            while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 797                            {
 798                                cimInstances.append(cimInstance);
 799                            }
 800                
 801                            XmlReader::expectEndTag(parser, "IRETURNVALUE");
 802                
 803                            Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 804                                          "Consumer %s has %d outstanding indications",
 805                                          (const char*)consumerName.getCString(),
 806                                          cimInstances.size());
 807 h.sterling 1.1 
 808                            //delete the file 
 809                            FileSystem::removeFile(fileName);
 810                
 811                        } catch (Exception& ex)
 812                        {
 813                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
 814                
 815                        } catch (...)
 816                        {
 817                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
 818                        }
 819                    }
 820                
 821                    PEG_METHOD_EXIT();
 822                    return cimInstances;
 823                }
 824                
 825                
 826                
 827                /** 
 828 h.sterling 1.1  * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 829                 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 830                 * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 831                 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 832                 * 
 833                 * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
 834                 * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 835                 * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 836                 * can attain extremely high performance. 
 837                 * 
 838                 * There are three different events that can signal us:
 839                 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 840                 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 841                 * 3) A retry signal (signalled from this routine itself)
 842                 * 
 843                 * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 844                 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 845                 * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 846                 * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
 847                 * 
 848                 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 849 h.sterling 1.1  * 
 850 h.sterling 1.5  * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 851                 * 20 new indications come in, 10 of them are successful, 10 are not.
 852 h.sterling 1.1  * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 853                 * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 854 h.sterling 1.5  * we would never hit the wait for the retry timeout.  
 855 h.sterling 1.1  * 
 856                 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one
 857                 * is processed.  Because new indications are first in, first out, the 19 indications will be processed in reverse order.
 858 h.sterling 1.5  * Is this a problem? Short answer - NO.  They could arrive in reverse order anyways depending on the network stack.
 859 h.sterling 1.1  * 
 860                 */ 
 861                PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 862                {
 863                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 864                
 865                    DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 866                    String name = myself->getName();
 867                    DQueue<IndicationDispatchEvent> tmpEventQueue(true);
 868                
 869                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 870                
 871                    PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl;
 872                
 873 h.sterling 1.8     myself->_listeningSemaphore->signal();
 874                
 875 h.sterling 1.1     while (true)
 876                    {
 877                        try
 878                        {
 879                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 880                
 881                            //wait to be signalled
 882                            myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 883                
 884                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 885                
 886                            //check whether we received the shutdown signal
 887                            if (myself->_dieNow)
 888                            {
 889                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 890                                break;
 891                            }
 892                
 893                            //signal must have been due to an incoming event
 894                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name);
 895                
 896 h.sterling 1.1             //create a temporary queue to store failed indications
 897                            tmpEventQueue.empty_list();
 898                
 899                            //continue processing events until the queue is empty
 900                            //make sure to check for the shutdown signal before every iteration
 901                
 902                            while (myself->_eventqueue.size())
 903                            {
 904                                //check for shutdown signal
 905                                //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 906                                //the shutdown signal itself, at which time we break out of the main loop
 907                                if (myself->_dieNow)
 908                                {
 909                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 910                                    break;
 911                                }
 912                
 913                                //pop next indication off the queue
 914                                IndicationDispatchEvent* event = 0;
 915                                event = myself->_eventqueue.remove_first();  //what exceptions/errors can this throw?
 916                
 917 h.sterling 1.1                 if (!event)
 918                                {
 919                                    //this should never happen
 920                                    continue;
 921                                }
 922                
 923 h.sterling 1.5                 //check retry status. do not retry until the retry time has elapsed
 924                                if (event->getRetries() > 0)
 925                                {
 926                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time from event is " + event->getLastAttemptTime().toString());
 927                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Current time is " + CIMDateTime::getCurrentDateTime().toString());
 928                                    Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(),
 929                                                                                                 CIMDateTime::getCurrentDateTime());
 930                
 931                                    if (differenceInMicroseconds < (DEFAULT_RETRY_LAPSE * 1000))
 932                                    {
 933                                        //do not retry; just add to the retry queue
 934                                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::cannot retry event until retry time has elapsed");
 935                                        tmpEventQueue.insert_last(event);
 936                                        continue;
 937                                    }
 938                                }
 939                
 940 h.sterling 1.1                 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 941                
 942                                try
 943                                {
 944                                    myself->consumeIndication(event->getContext(),
 945                                                              event->getURL(),
 946                                                              event->getIndicationInstance());
 947                
 948                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 949                
 950                                    delete event;
 951                                    continue;
 952                
 953                                } catch (CIMException & ce)
 954                                {
 955                                    //check for failure
 956                                    if (ce.getCode() == CIM_ERR_FAILED)
 957                                    {
 958                                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 959                
 960                                        //update event parameters
 961 h.sterling 1.1                         event->increaseRetries();
 962                
 963                                        //determine if we have hit the max retry count
 964                                        if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 965                                        {
 966                                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
 967                                                             "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 968                
 969                                            Logger::put(
 970                                                       Logger::ERROR_LOG,
 971                                                       "ConsumerManager",
 972                                                       Logger::SEVERE,
 973                                                       "The following indication did not get processed successfully: $0", 
 974                                                       event->getIndicationInstance().getPath().toString());
 975                
 976                                            delete event;
 977                                            continue;
 978                
 979                                        } else
 980                                        {
 981                                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
 982 h.sterling 1.1                             tmpEventQueue.insert_last(event);
 983                                        }
 984                
 985                                    } else
 986                                    {
 987                                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
 988                                        delete event;
 989                                        continue;
 990                                    }
 991                
 992                                } catch (Exception & ex)
 993                                {
 994                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
 995                                    delete event;
 996                                    continue;
 997                
 998                                } catch (...)
 999                                {
1000                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1001                                    delete event;
1002                                    continue;
1003 h.sterling 1.1                 } //end try
1004                
1005                            } //while eventqueue
1006                
1007                            //copy the failed indications back to the main queue
1008                            //since we are always adding the failed indications to the back, it should not interfere with the
1009                            //dispatcher adding events to the front
1010                
1011                            //there is no = operator for DQueue so we must do it manually
1012                            IndicationDispatchEvent* tmpEvent = 0;
1013                            while (tmpEventQueue.size())
1014                            {
1015                                tmpEvent = tmpEventQueue.remove_first();
1016                                myself->_eventqueue.insert_last(tmpEvent);
1017                            }
1018                
1019                        } catch (TimeOut& te)
1020                        {
1021                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications.");
1022                
1023                            //signal the queue in the same way we would if we received a new indication
1024 h.sterling 1.1             //this allows the thread to fall into the queue processing code
1025                            myself->_check_queue->signal();
1026                
1027                        } //time_wait
1028                
1029                
1030                    } //shutdown
1031                
1032                    PEG_METHOD_EXIT();
1033                    return 0;
1034                }
1035                
1036                
1037                PEGASUS_NAMESPACE_END
1038                
1039                
1040                

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2