(file) Return to ConsumerManager.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

   1 h.sterling 1.1 //%2005////////////////////////////////////////////////////////////////////////
   2                //
   3                // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4                // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5                // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6                // IBM Corp.; EMC Corporation, The Open Group.
   7                // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8                // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9                // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10                // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11                //
  12                // Permission is hereby granted, free of charge, to any person obtaining a copy
  13                // of this software and associated documentation files (the "Software"), to
  14                // deal in the Software without restriction, including without limitation the
  15                // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16                // sell copies of the Software, and to permit persons to whom the Software is
  17                // furnished to do so, subject to the following conditions:
  18                // 
  19                // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20                // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21                // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22 h.sterling 1.1 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23                // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24                // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25                // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26                // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27                //
  28                //==============================================================================
  29                //
  30                // Author: Heather Sterling (hsterl@us.ibm.com)
  31                //
  32                // Modified By: 
  33                //
  34                //%/////////////////////////////////////////////////////////////////////////////
  35                
  36                #include <Pegasus/Common/Config.h>
  37                //#include <cstdlib>
  38                //#include <dlfcn.h>
  39                #include <Pegasus/Common/System.h>
  40                #include <Pegasus/Common/FileSystem.h>
  41                #include <Pegasus/Common/Tracer.h>
  42                #include <Pegasus/Common/Logger.h>
  43 h.sterling 1.1 #include <Pegasus/Common/XmlReader.h>
  44                #include <Pegasus/Common/XmlParser.h>
  45                #include <Pegasus/Common/XmlWriter.h>
  46                #include <Pegasus/Common/IPC.h>
  47                
  48                #include "ConsumerManager.h"
  49                
  50                PEGASUS_NAMESPACE_BEGIN
  51                PEGASUS_USING_STD;
  52                
  53                //ATTN: Can we just use a properties file instead??  If we only have one property, we may want to just parse it ourselves.
  54                // We may need to add more properties, however.  Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
  55                static struct OptionRow optionsTable[] =
  56                //optionname defaultvalue rqd  type domain domainsize clname hlpmsg
  57                {
  58                {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
  59                };
  60                
  61                const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
  62                
  63                //retry settings
  64 h.sterling 1.1 //ATTN: Do we want to make these configurable?  If so, is a global setting for all the consumers ok?
  65                static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
  66                static const Uint32 DEFAULT_RETRY_LAPSE = 3000;  //ms
  67                
  68                
  69                
  70                ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) : 
  71                _consumerDir(consumerDir),
  72                _consumerConfigDir(consumerConfigDir),
  73                _enableConsumerUnload(enableConsumerUnload),
  74                _idleTimeout(idleTimeout),
  75                _forceShutdown(true)
  76                {
  77                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
  78                
  79                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
  80                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
  81                
  82                    Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
  83                                  "Consumer unload enabled %d: idle timeout %d",
  84                                  enableConsumerUnload,
  85 h.sterling 1.1                   idleTimeout);
  86                
  87                
  88                    _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
  89                
  90 kumpf      1.3     struct timeval deallocateWait = {15, 0};
  91                    _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
  92 h.sterling 1.1 
  93                    _init();
  94                
  95                    PEG_METHOD_EXIT();
  96                }
  97                
  98                ConsumerManager::~ConsumerManager()
  99                {
 100                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
 101                
 102                    unloadAllConsumers();
 103                
 104 kumpf      1.3     delete _thread_pool;
 105 h.sterling 1.1 
 106                    ConsumerTable::Iterator i = _consumers.start();
 107                    for (; i!=0; i++)
 108                    {
 109                        DynamicConsumer* consumer = i.value();
 110                        delete consumer;
 111                    }
 112                
 113                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
 114                
 115                    ModuleTable::Iterator j = _modules.start();
 116                    for (;j!=0;j++)
 117                    {
 118                        ConsumerModule* module = j.value();
 119                        delete module;
 120                    }
 121                
 122                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
 123                
 124                    PEG_METHOD_EXIT();
 125                }
 126 h.sterling 1.1 
 127                void ConsumerManager::_init()
 128                {
 129                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
 130                
 131                    //check if there are any outstanding indications
 132                    Array<String> files;
 133                    Uint32 pos;
 134                    String consumerName;
 135                
 136                    if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
 137                    {
 138                        for (Uint32 i = 0; i < files.size(); i++)
 139                        {
 140                            pos = files[i].find(".dat");
 141                            if (pos != PEG_NOT_FOUND)
 142                            {
 143                                consumerName = files[i].subString(0, pos);
 144                
 145                                try
 146                                {
 147 h.sterling 1.1                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
 148                                    getConsumer(consumerName);
 149                
 150                                } catch (...)
 151                                {
 152                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
 153                                }
 154                            }
 155                        }
 156                    }
 157                
 158                    PEG_METHOD_EXIT();
 159                }
 160                
 161                String ConsumerManager::getConsumerDir()
 162                {
 163                    return _consumerDir;
 164                }
 165                
 166                String ConsumerManager::getConsumerConfigDir()
 167                {
 168 h.sterling 1.1     return _consumerConfigDir;
 169                }
 170                
 171                Boolean ConsumerManager::getEnableConsumerUnload()
 172                {
 173                    return _enableConsumerUnload;
 174                }
 175                
 176                Uint32 ConsumerManager::getIdleTimeout()
 177                {
 178                    return _idleTimeout;
 179                }
 180                
 181                /** Retrieves the library name associated with the consumer name.  By default, the library name
 182                  * is the same as the consumer name.  However, you may specify a different library name in a consumer
 183                  * configuration file.  This file must be named "MyConsumer.txt" and contain the following:
 184                  *     location="libraryName"
 185                  *
 186                  * The config file is optional and is generally only needed in cases where there are strict requirements
 187                  * on library naming.
 188                  *
 189 h.sterling 1.1   * It is the responsibility of the caller to catch any exceptions thrown by this method.
 190                  */
 191                String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
 192                {
 193                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
 194                
 195                    //default library name is consumer name
 196                    String libraryName = consumerName;
 197                
 198                    //check whether an alternative library name was specified in an optional consumer config file
 199                    String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
 200                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
 201                
 202                    if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
 203                    {
 204                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
 205                
 206                        try
 207                        {
 208                            //ATTN: Does the OptionManager need to be reset?  There's no method for it.
 209                            _optionMgr.mergeFile(configFile);
 210 h.sterling 1.1             _optionMgr.checkRequiredOptions();
 211                
 212                            if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
 213                            {
 214                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile); 
 215                                libraryName = consumerName;
 216                            }
 217                
 218                        } catch (Exception & ex)
 219                        {
 220                            throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
 221                                                               "Error reading $0: $1.",
 222                                                               configFile,
 223                                                               ex.getMessage()));
 224                        }
 225                    } else
 226                    {
 227                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
 228                    }
 229                
 230                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
 231 h.sterling 1.1 
 232                    PEG_METHOD_EXIT();
 233                    return libraryName;
 234                }
 235                
 236                /** Returns the DynamicConsumer for the consumerName.  If it already exists, we return the one in the cache.  If it
 237                 *  DNE, we create it and initialize it, and add it to the table.
 238                 * @throws Exception if we cannot successfully create and initialize the consumer
 239                 */ 
 240                DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
 241                {
 242                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
 243                
 244                    DynamicConsumer* consumer = 0;
 245                    CIMIndicationConsumerProvider* consumerRef = 0;
 246                    Boolean cached = false;
 247                    Boolean entryExists = false;
 248                
 249                    AutoMutex lock(_consumerTableMutex);
 250                
 251                    if (_consumers.lookup(consumerName, consumer))
 252 h.sterling 1.1     {
 253                        //why isn't this working??
 254                        entryExists = true;
 255                
 256                        if (consumer && consumer->isLoaded())
 257                        {
 258                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
 259                            cached = true;
 260                        }
 261                    } else
 262                    {
 263                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
 264                        consumer = new DynamicConsumer(consumerName);
 265                        //ATTN: The above is a memory leak if _initConsumer throws an exception
 266                        //need to delete it in that case
 267                    }
 268                
 269                    if (!cached)
 270                    {
 271                        _initConsumer(consumerName, consumer);
 272                
 273 h.sterling 1.1         if (!entryExists)
 274                        {
 275                            _consumers.insert(consumerName, consumer);
 276                        }
 277                    }
 278                
 279                    consumer->updateIdleTimer();
 280                
 281                    PEG_METHOD_EXIT();
 282                    return consumer;
 283                }
 284                
 285                /** Initializes a DynamicConsumer.
 286                 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
 287                 * @throws Exception if the consumer cannot be initialized
 288                 */
 289                void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
 290                {
 291                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
 292                
 293                    CIMIndicationConsumerProvider* base = 0;
 294 h.sterling 1.1     ConsumerModule* module = 0;
 295                
 296                    //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
 297                    String libraryName = _getConsumerLibraryName(consumerName);
 298                    module = _lookupModule(libraryName);
 299                
 300                    //build library path
 301                    String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
 302                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
 303                
 304                    //load module
 305                    try
 306                    {
 307                        base = module->load(consumerName, libraryPath);
 308                        consumer->set(module, base);
 309                
 310                    } catch (Exception& ex)
 311                    {
 312                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
 313                
 314                        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 315 h.sterling 1.1                                            "Cannot load module ($0:$1): Unknown exception.",
 316                                                           consumerName,
 317                                                           libraryName));
 318                    } catch (...)
 319                    {
 320                        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
 321                                                           "Cannot load module ($0:$1): Unknown exception.",
 322                                                           consumerName,
 323                                                           libraryName));
 324                    }
 325                
 326                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
 327                
 328                    //initialize consumer
 329                    try
 330                    {
 331                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " +  consumerName);
 332                
 333                        consumer->initialize();
 334                
 335                        //ATTN: need to change this
 336 h.sterling 1.1         Semaphore* semaphore = new Semaphore(0);  //blocking
 337                
 338                        consumer->setShutdownSemaphore(semaphore);
 339                
 340                        //start the worker thread
 341                        _thread_pool->allocate_and_awaken(consumer,
 342                                                          _worker_routine,
 343                                                          semaphore);
 344                
 345                        //load any outstanding requests
 346                        Array<CIMInstance> outstandingIndications = _deserializeOutstandingIndications(consumerName);
 347                        if (outstandingIndications.size())
 348                        {
 349                            //the consumer will signal itself in _loadOustandingIndications
 350                            consumer->_loadOutstandingIndications(outstandingIndications);
 351                        }
 352                
 353                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
 354                
 355                    } catch (...)
 356                    {
 357 h.sterling 1.1         module->unloadModule();
 358                        consumer->reset();
 359                        throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
 360                                                           "Cannot initialize consumer ($0).",
 361                                                           consumerName));        
 362                    }
 363                
 364                    PEG_METHOD_EXIT();    
 365                }
 366                
 367                
 368                /** Returns the ConsumerModule with the given library name.  If it already exists, we return the one in the cache.  If it
 369                 *  DNE, we create it and add it to the table.
 370                 * @throws Exception if we cannot successfully create and initialize the consumer
 371                 */ 
 372                ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName) 
 373                {
 374                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
 375                
 376                    AutoMutex lock(_moduleTableMutex);
 377                
 378 h.sterling 1.1     ConsumerModule* module = 0;
 379                
 380                    //see if consumer module is cached
 381                    if (_modules.lookup(libraryName, module))
 382                    {
 383                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 384                                         "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
 385                
 386                    } else
 387                    {
 388                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
 389                                         "Creating Consumer Provider Module " + libraryName);
 390                
 391                        module = new ConsumerModule(); 
 392                        _modules.insert(libraryName, module);
 393                    }
 394                
 395                    PEG_METHOD_EXIT();
 396                    return(module);
 397                }
 398                
 399 h.sterling 1.1 /** Returns true if there are active consumers
 400                 */ 
 401                Boolean ConsumerManager::hasActiveConsumers()
 402                {
 403                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
 404                
 405                    AutoMutex lock(_consumerTableMutex);
 406                    DynamicConsumer* consumer = 0;
 407                
 408                    try
 409                    {
 410                        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 411                        {
 412                            consumer = i.value();
 413                
 414                            if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
 415                            {
 416                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
 417                                PEG_METHOD_EXIT();
 418                                return true;
 419                            }
 420 h.sterling 1.1         }
 421                    } catch (...)
 422                    {
 423                        // Unexpected exception; do not assume that no providers are loaded
 424                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
 425                        PEG_METHOD_EXIT();
 426                        return true;
 427                    }
 428                
 429                    PEG_METHOD_EXIT();
 430                    return false;
 431                }
 432                
 433                /** Returns true if there are loaded consumers
 434                 */ 
 435                Boolean ConsumerManager::hasLoadedConsumers()
 436                {
 437                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
 438                
 439                    AutoMutex lock(_consumerTableMutex);
 440                    DynamicConsumer* consumer = 0;
 441 h.sterling 1.1 
 442                    try
 443                    {
 444                        for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
 445                        {
 446                            consumer = i.value();
 447                
 448                            if (consumer && consumer->isLoaded())
 449                            {
 450                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
 451                                PEG_METHOD_EXIT();
 452                                return true;
 453                            }
 454                        }
 455                    } catch (...)
 456                    {
 457                        // Unexpected exception; do not assume that no providers are loaded
 458                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
 459                        PEG_METHOD_EXIT();
 460                        return true;
 461                    }
 462 h.sterling 1.1 
 463                    PEG_METHOD_EXIT();
 464                    return false;
 465                }
 466                
 467                
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 *    is optional in a shutdown scenario.  If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 *    the current consumer indication to finish.  All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0).
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 * of the consumers so the threads can finish simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing
 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 * queued indications on shutdown.
 */
 486                
 487                /** Unloads all consumers.
 488                 */ 
 489                void ConsumerManager::unloadAllConsumers()
 490                {
 491                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
 492                
 493                    AutoMutex lock(_consumerTableMutex);
 494                
 495                    if (!_consumers.size())
 496                    {
 497                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 498                        PEG_METHOD_EXIT();
 499                        return;
 500                    }
 501                
 502                    if (!_forceShutdown)
 503                    {
 504 h.sterling 1.1         //wait until all the consumers have finished processing the events in their queue
 505                        //ATTN: Should this have a timeout even though it's a force??
 506                        while (hasActiveConsumers())
 507                        {
 508                            pegasus_sleep(500);
 509                        }
 510                    }
 511                
 512                    Array<DynamicConsumer*> loadedConsumers;
 513                
 514                    ConsumerTable::Iterator i = _consumers.start();
 515                    DynamicConsumer* consumer = 0;
 516                
 517                    for (; i!=0; i++)
 518                    {
 519                        consumer = i.value();
 520                        if (consumer && consumer->isLoaded())
 521                        {
 522                            loadedConsumers.append(consumer);
 523                        }
 524                    }
 525 h.sterling 1.1 
 526                    if (loadedConsumers.size())
 527                    {
 528                        try
 529                        {
 530                            _unloadConsumers(loadedConsumers);
 531                
 532                        } catch (Exception& ex)
 533                        {
 534                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 535                        }
 536                    } else
 537                    {
 538                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 539                    }
 540                
 541                    PEG_METHOD_EXIT();
 542                }
 543                
 544                /** Unloads idle consumers.
 545                 */ 
 546 h.sterling 1.1 void ConsumerManager::unloadIdleConsumers()
 547                {
 548                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
 549                
 550                    AutoMutex lock(_consumerTableMutex);
 551                
 552                    if (!_consumers.size())
 553                    {
 554                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 555                        PEG_METHOD_EXIT();
 556                        return;
 557                    }
 558                
 559                    Array<DynamicConsumer*> loadedConsumers;
 560                
 561                    ConsumerTable::Iterator i = _consumers.start();
 562                    DynamicConsumer* consumer = 0;
 563                
 564                    for (; i!=0; i++)
 565                    {
 566                        consumer = i.value();
 567 h.sterling 1.1         if (consumer && consumer->isLoaded() && consumer->isIdle())
 568                        {
 569                            loadedConsumers.append(consumer);
 570                        }
 571                    }
 572                
 573                    if (loadedConsumers.size())
 574                    {
 575                        try
 576                        {
 577                            _unloadConsumers(loadedConsumers);
 578                
 579                        } catch (Exception& ex)
 580                        {
 581                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 582                        }
 583                    } else
 584                    {
 585                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
 586                    }
 587                
 588 h.sterling 1.1     PEG_METHOD_EXIT();
 589                }
 590                
 591                /** Unloads a single consumer.
 592                 */ 
 593                void ConsumerManager::unloadConsumer(const String& consumerName)
 594                {
 595                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
 596                
 597                    AutoMutex lock(_consumerTableMutex);
 598                
 599                    DynamicConsumer* consumer = 0;
 600                
 601                    //check whether the consumer exists
 602                    if (!_consumers.lookup(consumerName, consumer))
 603                    {
 604                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
 605                        return;
 606                    }
 607                
 608                    //check whether the consumer is loaded
 609 h.sterling 1.1     if (consumer && consumer->isLoaded())  //ATTN: forceShutdown?
 610                    {
 611                        //unload the consumer
 612                        Array<DynamicConsumer*> loadedConsumers;
 613                        loadedConsumers.append(consumer);
 614                
 615                        try
 616                        {
 617                            _unloadConsumers(loadedConsumers);
 618                
 619                        } catch (Exception& ex)
 620                        {
 621                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
 622                        }
 623                
 624                    } else
 625                    {
 626                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
 627                    }
 628                
 629                    PEG_METHOD_EXIT();
 630 h.sterling 1.1 }
 631                
 632                /** Unloads the consumers in the given array.
 633                 *  The consumerTable mutex MUST be locked prior to entering this method.
 634                 */ 
 635                void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
 636                {
 637                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
 638                
 639                    //tell consumers to shutdown
 640                    for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 641                    {
 642                        consumersToUnload[i]->sendShutdownSignal();
 643                    }
 644                
 645                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
 646                
 647                    //wait for all the consumer worker threads to complete
 648                    //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
 649                    //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
 650                    //processing its requests.
 651 h.sterling 1.1     for (Uint32 i = 0; i < consumersToUnload.size(); i++)
 652                    {
 653                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
 654                
 655                        //wait for the consumer worker thread to end
 656                        try
 657                        {
 658                            Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
 659                            if (_shutdownSemaphore)
 660                            {
 661                                _shutdownSemaphore->time_wait(10000); 
 662                            }
 663                
 664                        } catch (TimeOut &)
 665                        {
 666                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
 667                        }
 668                
 669                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
 670                
 671                        try
 672 h.sterling 1.1         {
 673                            //terminate consumer provider interface
 674                            consumersToUnload[i]->terminate();
 675                
 676                            //unload consumer provider module
 677                            PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
 678                            consumersToUnload[i]->_module->unloadModule();
 679                
 680                            //serialize outstanding indications
 681                            _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
 682                
 683                            //reset the consumer
 684                            consumersToUnload[i]->reset();
 685                
 686                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
 687                
 688                        } catch (Exception& e)
 689                        {
 690                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage()); 
 691                            //ATTN: throw exception? log warning?
 692                        }
 693 h.sterling 1.1     }
 694                
 695                    PEG_METHOD_EXIT();
 696                }
 697                
 698                /** Serializes oustanding indications to a <MyConsumer>.dat file
 699                 */
 700                void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<CIMInstance> indications)
 701                {
 702                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
 703                
 704                    if (!indications.size())
 705                    {
 706                        PEG_METHOD_EXIT();
 707                        return;
 708                    }
 709                
 710                    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 711                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 712                
 713                    Array<char> buffer;
 714 h.sterling 1.1 
 715                    // Open the log file and serialize remaining 
 716                    FILE* fileHandle = 0;
 717                    fileHandle = fopen((const char*)fileName.getCString(), "w"); 
 718                
 719                    if (!fileHandle)
 720                    {
 721                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
 722                
 723                    } else
 724                    {
 725                        Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 726                                      "Serializing %d outstanding requests for %s",
 727                                      indications.size(),
 728                                      (const char*)consumerName.getCString());
 729                
 730                        //we have to put the array of instances under a valid root element or the parser complains 
 731                        XmlWriter::append(buffer, "<IRETURNVALUE>\n");
 732                
 733                        for (Uint32 i = 0; i < indications.size(); i++)
 734                        {
 735 h.sterling 1.1             XmlWriter::appendValueNamedInstanceElement(buffer, indications[i]);
 736                        }
 737                
 738                        XmlWriter::append(buffer, "</IRETURNVALUE>\0");
 739                
 740                        fputs((const char*)buffer.getData(), fileHandle);
 741                
 742                        fclose(fileHandle);
 743                    }
 744                
 745                    PEG_METHOD_EXIT();
 746                }
 747                
 748                /** Reads outstanding indications from a <MyConsumer>.dat file
 749                 */ 
 750                Array<CIMInstance> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
 751                {
 752                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
 753                
 754                    String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
 755                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
 756 h.sterling 1.1 
 757                    Array<CIMInstance> cimInstances;
 758                
 759                    // Open the log file and serialize remaining indications
 760                    if (FileSystem::exists(fileName)  && FileSystem::canRead(fileName))
 761                    {
 762                        Array<char> text;
 763                        CIMInstance cimInstance;
 764                        XmlEntry entry;
 765                
 766                        try
 767                        {
 768                            FileSystem::loadFileToMemory(text, fileName);  //ATTN: Is this safe to use; what about CRLFs?
 769                            text.append('\0');
 770                
 771                            //parse the file
 772                            XmlParser parser((char*)text.getData());
 773                            XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
 774                
 775                            while (XmlReader::getNamedInstanceElement(parser, cimInstance))
 776                            {
 777 h.sterling 1.1                 cimInstances.append(cimInstance);
 778                            }
 779                
 780                            XmlReader::expectEndTag(parser, "IRETURNVALUE");
 781                
 782                            Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
 783                                          "Consumer %s has %d outstanding indications",
 784                                          (const char*)consumerName.getCString(),
 785                                          cimInstances.size());
 786                
 787                            //delete the file 
 788                            FileSystem::removeFile(fileName);
 789                
 790                        } catch (Exception& ex)
 791                        {
 792                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
 793                
 794                        } catch (...)
 795                        {
 796                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
 797                        }
 798 h.sterling 1.1     }
 799                
 800                    PEG_METHOD_EXIT();
 801                    return cimInstances;
 802                }
 803                
 804                
 805                
 806                /** 
 807                 * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 808                 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 809                 * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 810                 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread. 
 811                 * 
 812                 * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
 813                 * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 814                 * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 815                 * can attain extremely high performance. 
 816                 * 
 817                 * There are three different events that can signal us:
 818                 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 819 h.sterling 1.1  * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 820                 * 3) A retry signal (signalled from this routine itself)
 821                 * 
 822                 * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 823                 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 824                 * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 825                 * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
 826                 * 
 827                 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 828                 * 
 829                 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 10 of them are successful, 10 are not
 830                 * We were signalled 20 times, so we will pass the time_wait 20 times.  Perceivably, the process time on each indication
 831                 * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 832                 * we would never hit the wait for the retry timeout.  We could solve this by keeping track of the last retry
 833                 * attempt, and only retry if a certain length of time has passed.  Otherwise, the logic of the loop needs to be changed.
 834                 * 
 835                 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one
 836                 * is processed.  Because new indications are first in, first out, the 19 indications will be processed in reverse order.
 837                 * Is this a problem?
 838                 * 
 839                 */ 
 840 h.sterling 1.1 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
 841                {
 842                    PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
 843                
 844                    DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
 845                    String name = myself->getName();
 846                    DQueue<IndicationDispatchEvent> tmpEventQueue(true);
 847                
 848                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
 849                
 850                    PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl;
 851                
 852                    while (true)
 853                    {
 854                        try
 855                        {
 856                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
 857                
 858                            //wait to be signalled
 859                            myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
 860                
 861 h.sterling 1.1             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
 862                
 863                            //check whether we received the shutdown signal
 864                            if (myself->_dieNow)
 865                            {
 866                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
 867                                break;
 868                            }
 869                
 870                            //signal must have been due to an incoming event
 871                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name);
 872                
 873                            //create a temporary queue to store failed indications
 874                            tmpEventQueue.empty_list();
 875                
 876                            //continue processing events until the queue is empty
 877                            //make sure to check for the shutdown signal before every iteration
 878                
 879                            while (myself->_eventqueue.size())
 880                            {
 881                                //check for shutdown signal
 882 h.sterling 1.1                 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
 883                                //the shutdown signal itself, at which time we break out of the main loop
 884                                if (myself->_dieNow)
 885                                {
 886                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
 887                                    break;
 888                                }
 889                
 890                                //pop next indication off the queue
 891                                IndicationDispatchEvent* event = 0;
 892                                event = myself->_eventqueue.remove_first();  //what exceptions/errors can this throw?
 893                
 894                                if (!event)
 895                                {
 896                                    //this should never happen
 897                                    continue;
 898                                }
 899                
 900                                PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
 901                
 902                                try
 903 h.sterling 1.1                 {
 904                                    myself->consumeIndication(event->getContext(),
 905                                                              event->getURL(),
 906                                                              event->getIndicationInstance());
 907                
 908                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
 909                
 910                                    delete event;
 911                                    continue;
 912                
 913                                } catch (CIMException & ce)
 914                                {
 915                                    //check for failure
 916                                    if (ce.getCode() == CIM_ERR_FAILED)
 917                                    {
 918                                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
 919                
 920                                        //update event parameters
 921                                        event->increaseRetries();
 922                
 923                                        //determine if we have hit the max retry count
 924 h.sterling 1.1                         if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
 925                                        {
 926                                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
 927                                                             "Error: the maximum retry count has been exceeded.  Removing the event from the queue.");
 928                
 929                                            Logger::put(
 930                                                       Logger::ERROR_LOG,
 931                                                       "ConsumerManager",
 932                                                       Logger::SEVERE,
 933                                                       "The following indication did not get processed successfully: $0", 
 934                                                       event->getIndicationInstance().getPath().toString());
 935                
 936                                            delete event;
 937                                            continue;
 938                
 939                                        } else
 940                                        {
 941                                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
 942                                            tmpEventQueue.insert_last(event);
 943                                        }
 944                
 945 h.sterling 1.1                     } else
 946                                    {
 947                                        PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
 948                                        delete event;
 949                                        continue;
 950                                    }
 951                
 952                                } catch (Exception & ex)
 953                                {
 954                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
 955                                    delete event;
 956                                    continue;
 957                
 958                                } catch (...)
 959                                {
 960                                    PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
 961                                    delete event;
 962                                    continue;
 963                                } //end try
 964                
 965                            } //while eventqueue
 966 h.sterling 1.1 
 967                            //copy the failed indications back to the main queue
 968                            //since we are always adding the failed indications to the back, it should not interfere with the
 969                            //dispatcher adding events to the front
 970                
 971                            //there is no = operator for DQueue so we must do it manually
 972                            IndicationDispatchEvent* tmpEvent = 0;
 973                            while (tmpEventQueue.size())
 974                            {
 975                                tmpEvent = tmpEventQueue.remove_first();
 976                                myself->_eventqueue.insert_last(tmpEvent);
 977                            }
 978                
 979                        } catch (TimeOut& te)
 980                        {
 981                            PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications.");
 982                
 983                            //signal the queue in the same way we would if we received a new indication
 984                            //this allows the thread to fall into the queue processing code
 985                            myself->_check_queue->signal();
 986                
 987 h.sterling 1.1         } //time_wait
 988                
 989                
 990                    } //shutdown
 991                
 992                    PEG_METHOD_EXIT();
 993                    return 0;
 994                }
 995                
 996                
 997                PEGASUS_NAMESPACE_END
 998                
 999                
1000                

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2