1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Heather Sterling (hsterl@us.ibm.com)
33 //
34 h.sterling 1.1 // Modified By:
35 //
36 //%/////////////////////////////////////////////////////////////////////////////
37
38 #include <Pegasus/Common/Config.h>
39 //#include <cstdlib>
40 //#include <dlfcn.h>
41 #include <Pegasus/Common/System.h>
42 #include <Pegasus/Common/FileSystem.h>
43 #include <Pegasus/Common/Tracer.h>
44 #include <Pegasus/Common/Logger.h>
45 #include <Pegasus/Common/XmlReader.h>
46 #include <Pegasus/Common/XmlParser.h>
47 #include <Pegasus/Common/XmlWriter.h>
48 #include <Pegasus/Common/IPC.h>
49
50 #include "ConsumerManager.h"
51
52 PEGASUS_NAMESPACE_BEGIN
53 PEGASUS_USING_STD;
54
55 h.sterling 1.1 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
56 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
57 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
60 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
61 };
62
63 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
64
65 //retry settings
66 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
67 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
68 h.sterling 1.1
|
69 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
70 static const String URL_PROPERTY = "URLString";
|
71 h.sterling 1.1
72
73 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :
74 _consumerDir(consumerDir),
75 _consumerConfigDir(consumerConfigDir),
76 _enableConsumerUnload(enableConsumerUnload),
77 _idleTimeout(idleTimeout),
78 _forceShutdown(true)
79 {
80 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
81
82 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
83 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
84
85 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
86 "Consumer unload enabled %d: idle timeout %d",
87 enableConsumerUnload,
88 idleTimeout);
89
90
|
91 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
92 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
93 h.sterling 1.1
|
94 kumpf 1.3 struct timeval deallocateWait = {15, 0};
95 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
96 h.sterling 1.1
97 _init();
98
99 PEG_METHOD_EXIT();
100 }
101
102 ConsumerManager::~ConsumerManager()
103 {
104 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
105
106 unloadAllConsumers();
107
|
108 h.sterling 1.5 if (_thread_pool != NULL)
109 {
|
110 kumpf 1.3 delete _thread_pool;
|
111 h.sterling 1.5 }
|
112 h.sterling 1.1
113 ConsumerTable::Iterator i = _consumers.start();
114 for (; i!=0; i++)
115 {
116 DynamicConsumer* consumer = i.value();
117 delete consumer;
118 }
119
120 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
121
122 ModuleTable::Iterator j = _modules.start();
123 for (;j!=0;j++)
124 {
125 ConsumerModule* module = j.value();
126 delete module;
127 }
128
129 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
130
131 PEG_METHOD_EXIT();
132 }
133 h.sterling 1.1
134 void ConsumerManager::_init()
135 {
136 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
137
138 //check if there are any outstanding indications
139 Array<String> files;
140 Uint32 pos;
141 String consumerName;
142
143 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
144 {
145 for (Uint32 i = 0; i < files.size(); i++)
146 {
147 pos = files[i].find(".dat");
148 if (pos != PEG_NOT_FOUND)
149 {
150 consumerName = files[i].subString(0, pos);
151
152 try
153 {
154 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
155 getConsumer(consumerName);
156
157 } catch (...)
158 {
159 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
160 }
161 }
162 }
163 }
164
165 PEG_METHOD_EXIT();
166 }
167
168 String ConsumerManager::getConsumerDir()
169 {
170 return _consumerDir;
171 }
172
173 String ConsumerManager::getConsumerConfigDir()
174 {
175 h.sterling 1.1 return _consumerConfigDir;
176 }
177
178 Boolean ConsumerManager::getEnableConsumerUnload()
179 {
180 return _enableConsumerUnload;
181 }
182
183 Uint32 ConsumerManager::getIdleTimeout()
184 {
185 return _idleTimeout;
186 }
187
188 /** Retrieves the library name associated with the consumer name. By default, the library name
189 * is the same as the consumer name. However, you may specify a different library name in a consumer
190 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
191 * location="libraryName"
192 *
193 * The config file is optional and is generally only needed in cases where there are strict requirements
194 * on library naming.
195 *
196 h.sterling 1.1 * It is the responsibility of the caller to catch any exceptions thrown by this method.
197 */
198 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
199 {
200 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
201
202 //default library name is consumer name
203 String libraryName = consumerName;
204
205 //check whether an alternative library name was specified in an optional consumer config file
206 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
207 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
208
209 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
210 {
211 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
212
213 try
214 {
|
215 h.sterling 1.6 //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
|
216 h.sterling 1.8 OptionManager _optionMgr;
217 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
|
218 h.sterling 1.1 _optionMgr.mergeFile(configFile);
219 _optionMgr.checkRequiredOptions();
220
221 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
222 {
223 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
224 libraryName = consumerName;
225 }
226
227 } catch (Exception & ex)
228 {
229 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
230 "Error reading $0: $1.",
231 configFile,
232 ex.getMessage()));
233 }
234 } else
235 {
236 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
237 }
238
239 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
240
241 PEG_METHOD_EXIT();
242 return libraryName;
243 }
244
245 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
246 * DNE, we create it and initialize it, and add it to the table.
247 * @throws Exception if we cannot successfully create and initialize the consumer
248 */
249 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
250 {
251 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
252
253 DynamicConsumer* consumer = 0;
254 CIMIndicationConsumerProvider* consumerRef = 0;
255 Boolean cached = false;
256 Boolean entryExists = false;
257
258 AutoMutex lock(_consumerTableMutex);
259
260 h.sterling 1.1 if (_consumers.lookup(consumerName, consumer))
261 {
262 //why isn't this working??
263 entryExists = true;
264
265 if (consumer && consumer->isLoaded())
266 {
267 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
268 cached = true;
269 }
270 } else
271 {
272 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
273 consumer = new DynamicConsumer(consumerName);
274 //ATTN: The above is a memory leak if _initConsumer throws an exception
275 //need to delete it in that case
276 }
277
278 if (!cached)
279 {
280 _initConsumer(consumerName, consumer);
281 h.sterling 1.1
282 if (!entryExists)
283 {
284 _consumers.insert(consumerName, consumer);
285 }
286 }
287
288 consumer->updateIdleTimer();
289
290 PEG_METHOD_EXIT();
291 return consumer;
292 }
293
294 /** Initializes a DynamicConsumer.
295 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
296 * @throws Exception if the consumer cannot be initialized
297 */
298 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
299 {
300 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
301
302 h.sterling 1.1 CIMIndicationConsumerProvider* base = 0;
303 ConsumerModule* module = 0;
304
305 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
306 String libraryName = _getConsumerLibraryName(consumerName);
307 module = _lookupModule(libraryName);
308
309 //build library path
310 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
311 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
312
313 //load module
314 try
315 {
316 base = module->load(consumerName, libraryPath);
317 consumer->set(module, base);
318
319 } catch (Exception& ex)
320 {
321 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
322
323 h.sterling 1.1 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
324 "Cannot load module ($0:$1): Unknown exception.",
325 consumerName,
326 libraryName));
327 } catch (...)
328 {
329 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
330 "Cannot load module ($0:$1): Unknown exception.",
331 consumerName,
332 libraryName));
333 }
334
335 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
336
337 //initialize consumer
338 try
339 {
340 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
341
342 consumer->initialize();
343
344 h.sterling 1.1 //ATTN: need to change this
345 Semaphore* semaphore = new Semaphore(0); //blocking
346
347 consumer->setShutdownSemaphore(semaphore);
348
349 //start the worker thread
|
350 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
351 h.sterling 1.1 _worker_routine,
|
352 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
353 h.sterling 1.8 {
354 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
355 "Not enough threads for consumer.");
|
356 konrad.r 1.7
|
357 h.sterling 1.8 Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
358 "Could not allocate thread for consumer.");
|
359 konrad.r 1.7
|
360 h.sterling 1.8 consumer->setShutdownSemaphore(0);
361 delete semaphore;
|
362 konrad.r 1.7 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
|
363 h.sterling 1.8 "Not enough threads for consumer worker routine."));
|
364 konrad.r 1.7 }
|
365 h.sterling 1.1
|
366 h.sterling 1.8 //wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued
367 //before the consumer is waiting for it and the first indication after loading the consumer will be lost
368 consumer->waitForEventThread();
369
|
370 h.sterling 1.1 //load any outstanding requests
|
371 h.sterling 1.11 Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
|
372 h.sterling 1.1 if (outstandingIndications.size())
373 {
374 //the consumer will signal itself in _loadOustandingIndications
375 consumer->_loadOutstandingIndications(outstandingIndications);
376 }
377
378 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
379
380 } catch (...)
381 {
382 module->unloadModule();
383 consumer->reset();
384 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
385 "Cannot initialize consumer ($0).",
386 consumerName));
387 }
388
389 PEG_METHOD_EXIT();
390 }
391
392
393 h.sterling 1.1 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
394 * DNE, we create it and add it to the table.
395 * @throws Exception if we cannot successfully create and initialize the consumer
396 */
397 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
398 {
399 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
400
401 AutoMutex lock(_moduleTableMutex);
402
403 ConsumerModule* module = 0;
404
405 //see if consumer module is cached
406 if (_modules.lookup(libraryName, module))
407 {
408 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
409 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
410
411 } else
412 {
413 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
414 h.sterling 1.1 "Creating Consumer Provider Module " + libraryName);
415
416 module = new ConsumerModule();
417 _modules.insert(libraryName, module);
418 }
419
420 PEG_METHOD_EXIT();
421 return(module);
422 }
423
424 /** Returns true if there are active consumers
425 */
426 Boolean ConsumerManager::hasActiveConsumers()
427 {
428 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
429
430 AutoMutex lock(_consumerTableMutex);
431 DynamicConsumer* consumer = 0;
432
433 try
434 {
435 h.sterling 1.1 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
436 {
437 consumer = i.value();
438
439 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
440 {
441 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
442 PEG_METHOD_EXIT();
443 return true;
444 }
445 }
446 } catch (...)
447 {
448 // Unexpected exception; do not assume that no providers are loaded
449 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
450 PEG_METHOD_EXIT();
451 return true;
452 }
453
454 PEG_METHOD_EXIT();
455 return false;
456 h.sterling 1.1 }
457
458 /** Returns true if there are loaded consumers
459 */
460 Boolean ConsumerManager::hasLoadedConsumers()
461 {
462 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
463
464 AutoMutex lock(_consumerTableMutex);
465 DynamicConsumer* consumer = 0;
466
467 try
468 {
469 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
470 {
471 consumer = i.value();
472
473 if (consumer && consumer->isLoaded())
474 {
475 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
476 PEG_METHOD_EXIT();
477 h.sterling 1.1 return true;
478 }
479 }
480 } catch (...)
481 {
482 // Unexpected exception; do not assume that no providers are loaded
483 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
484 PEG_METHOD_EXIT();
485 return true;
486 }
487
488 PEG_METHOD_EXIT();
489 return false;
490 }
491
492
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it's processing an indication.  This
 *    is optional in a shutdown scenario.  If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.  Note that a normal shutdown only allows
 *    the current consumer indication to finish.  All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0)
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 * of the consumers so the threads can finish simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be processed?  Just new indications
 * to be processed?  I am not inclined to this solution since it could take a LOT of time.  By serializing
 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 * queued indications on shutdown.
 */
511
512 /** Unloads all consumers.
513 */
514 void ConsumerManager::unloadAllConsumers()
515 {
516 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
517
518 AutoMutex lock(_consumerTableMutex);
519 h.sterling 1.1
520 if (!_consumers.size())
521 {
522 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
523 PEG_METHOD_EXIT();
524 return;
525 }
526
527 if (!_forceShutdown)
528 {
529 //wait until all the consumers have finished processing the events in their queue
530 //ATTN: Should this have a timeout even though it's a force??
531 while (hasActiveConsumers())
532 {
533 pegasus_sleep(500);
534 }
535 }
536
537 Array<DynamicConsumer*> loadedConsumers;
538
539 ConsumerTable::Iterator i = _consumers.start();
540 h.sterling 1.1 DynamicConsumer* consumer = 0;
541
542 for (; i!=0; i++)
543 {
544 consumer = i.value();
545 if (consumer && consumer->isLoaded())
546 {
547 loadedConsumers.append(consumer);
548 }
549 }
550
551 if (loadedConsumers.size())
552 {
553 try
554 {
555 _unloadConsumers(loadedConsumers);
556
557 } catch (Exception& ex)
558 {
559 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
560 }
561 h.sterling 1.1 } else
562 {
563 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
564 }
565
566 PEG_METHOD_EXIT();
567 }
568
569 /** Unloads idle consumers.
570 */
571 void ConsumerManager::unloadIdleConsumers()
572 {
573 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
574
575 AutoMutex lock(_consumerTableMutex);
576
577 if (!_consumers.size())
578 {
579 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
580 PEG_METHOD_EXIT();
581 return;
582 h.sterling 1.1 }
583
584 Array<DynamicConsumer*> loadedConsumers;
585
586 ConsumerTable::Iterator i = _consumers.start();
587 DynamicConsumer* consumer = 0;
588
589 for (; i!=0; i++)
590 {
591 consumer = i.value();
592 if (consumer && consumer->isLoaded() && consumer->isIdle())
593 {
594 loadedConsumers.append(consumer);
595 }
596 }
597
598 if (loadedConsumers.size())
599 {
600 try
601 {
602 _unloadConsumers(loadedConsumers);
603 h.sterling 1.1
604 } catch (Exception& ex)
605 {
606 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
607 }
608 } else
609 {
610 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
611 }
612
613 PEG_METHOD_EXIT();
614 }
615
616 /** Unloads a single consumer.
617 */
618 void ConsumerManager::unloadConsumer(const String& consumerName)
619 {
620 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
621
622 AutoMutex lock(_consumerTableMutex);
623
624 h.sterling 1.1 DynamicConsumer* consumer = 0;
625
626 //check whether the consumer exists
627 if (!_consumers.lookup(consumerName, consumer))
628 {
629 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
630 return;
631 }
632
633 //check whether the consumer is loaded
634 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
635 {
636 //unload the consumer
637 Array<DynamicConsumer*> loadedConsumers;
638 loadedConsumers.append(consumer);
639
640 try
641 {
642 _unloadConsumers(loadedConsumers);
643
644 } catch (Exception& ex)
645 h.sterling 1.1 {
646 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
647 }
648
649 } else
650 {
651 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
652 }
653
654 PEG_METHOD_EXIT();
655 }
656
657 /** Unloads the consumers in the given array.
658 * The consumerTable mutex MUST be locked prior to entering this method.
659 */
660 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
661 {
662 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
663
664 //tell consumers to shutdown
665 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
666 h.sterling 1.1 {
667 consumersToUnload[i]->sendShutdownSignal();
668 }
669
670 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
671
672 //wait for all the consumer worker threads to complete
673 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
674 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
675 //processing its requests.
676 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
677 {
678 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
679
680 //wait for the consumer worker thread to end
681 try
682 {
683 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
684 if (_shutdownSemaphore)
685 {
686 _shutdownSemaphore->time_wait(10000);
687 h.sterling 1.1 }
688
689 } catch (TimeOut &)
690 {
691 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
692 }
693
694 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
695
696 try
697 {
698 //terminate consumer provider interface
699 consumersToUnload[i]->terminate();
700
701 //unload consumer provider module
702 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
703 consumersToUnload[i]->_module->unloadModule();
704
705 //serialize outstanding indications
706 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
707
708 h.sterling 1.1 //reset the consumer
709 consumersToUnload[i]->reset();
710
711 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
712
713 } catch (Exception& e)
714 {
715 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
716 //ATTN: throw exception? log warning?
717 }
718 }
719
720 PEG_METHOD_EXIT();
721 }
722
723 /** Serializes oustanding indications to a <MyConsumer>.dat file
724 */
|
725 h.sterling 1.11 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
|
726 h.sterling 1.1 {
727 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
728
729 if (!indications.size())
730 {
731 PEG_METHOD_EXIT();
732 return;
733 }
734
735 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
736 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
737
|
738 mike 1.9 Buffer buffer;
|
739 h.sterling 1.1
740 // Open the log file and serialize remaining
741 FILE* fileHandle = 0;
742 fileHandle = fopen((const char*)fileName.getCString(), "w");
743
744 if (!fileHandle)
745 {
746 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
747
748 } else
749 {
750 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
751 "Serializing %d outstanding requests for %s",
752 indications.size(),
753 (const char*)consumerName.getCString());
754
755 //we have to put the array of instances under a valid root element or the parser complains
756 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
757
|
758 h.sterling 1.11 CIMInstance cimInstance;
|
759 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
760 {
|
761 h.sterling 1.11 //set the URL string property on the serializable instance
762 CIMValue cimValue(CIMTYPE_STRING, false);
763 cimValue.set(indications[i].getURL());
764 cimInstance = indications[i].getIndicationInstance();
765 CIMProperty cimProperty(URL_PROPERTY, cimValue);
766 cimInstance.addProperty(cimProperty);
767
768 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
769 }
|
770 h.sterling 1.1
771 XmlWriter::append(buffer, "</IRETURNVALUE>\0");
772
773 fputs((const char*)buffer.getData(), fileHandle);
774
775 fclose(fileHandle);
776 }
777
778 PEG_METHOD_EXIT();
779 }
780
781 /** Reads outstanding indications from a <MyConsumer>.dat file
782 */
|
783 h.sterling 1.11 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
|
784 h.sterling 1.1 {
785 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
786
787 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
788 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
789
790 Array<CIMInstance> cimInstances;
|
791 h.sterling 1.11 Array<String> urlStrings;
792 Array<IndicationDispatchEvent> indications;
|
793 h.sterling 1.1
794 // Open the log file and serialize remaining indications
795 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
796 {
|
797 mike 1.9 Buffer text;
|
798 h.sterling 1.1 CIMInstance cimInstance;
|
799 h.sterling 1.11 CIMProperty cimProperty;
800 CIMValue cimValue;
801 String urlString;
|
802 h.sterling 1.1 XmlEntry entry;
803
804 try
805 {
806 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
807 text.append('\0');
808
809 //parse the file
810 XmlParser parser((char*)text.getData());
811 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
812
813 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
814 {
|
815 h.sterling 1.11 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
816 if (index != PEG_NOT_FOUND)
817 {
818 //get the URL string property from the serialized instance and remove the property
819 cimProperty = cimInstance.getProperty(index);
820 cimValue = cimProperty.getValue();
821 cimValue.get(urlString);
822 cimInstance.removeProperty(index);
823 }
824 IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
825 indications.append(*indicationEvent);
|
826 h.sterling 1.1 }
827
828 XmlReader::expectEndTag(parser, "IRETURNVALUE");
829
830 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
831 "Consumer %s has %d outstanding indications",
832 (const char*)consumerName.getCString(),
|
833 h.sterling 1.11 indications.size());
|
834 h.sterling 1.1
835 //delete the file
836 FileSystem::removeFile(fileName);
837
838 } catch (Exception& ex)
839 {
840 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
841
842 } catch (...)
843 {
844 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
845 }
846 }
847
848 PEG_METHOD_EXIT();
|
849 h.sterling 1.11 return indications;
|
850 h.sterling 1.1 }
851
852
853
/**
 * This is the main worker thread of the consumer.  By having only one thread per consumer, we eliminate a ton
 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 * operations at once.  This also prevents one bad consumer from taking the entire listener down.  That being said,
 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
 *
 * If a consumer receives a lot of traffic, or its consumeIndication() method takes a considerable amount of time to
 * complete, it may make sense to make the consumer multi-threaded.  The individual consumer can immediately spawn off
 * new threads to handle indications, and return immediately to catch the next indication.  In this way, a consumer
 * can attain extremely high performance.
 *
 * There are three different events that can signal us:
 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 * 3) A retry signal (signalled from this routine itself)
 *
 * The idea is that all new indications are put on the front of the queue and processed first.  All of the retry
 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 * Before processing each indication, we check to see whether or not the shutdown signal was given.  If so,
 * we immediately break out of the loop, and another component serializes the remaining indications to a file.
 *
 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 *
 * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 * 20 new indications come in, 10 of them are successful, 10 are not.
 * We were signalled 20 times, so we will pass the time_wait 20 times.  Conceivably, the process time on each indication
 * could be minimal.  We could potentially proceed to process the retries after a very small time interval since
 * we would never hit the wait for the retry timeout.
 *
 */
884 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
885 {
886 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
887
888 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
889 String name = myself->getName();
890 DQueue<IndicationDispatchEvent> tmpEventQueue(true);
891
892 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
893
|
894 h.sterling 1.8 myself->_listeningSemaphore->signal();
895
|
896 h.sterling 1.1 while (true)
897 {
898 try
899 {
900 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
901
902 //wait to be signalled
903 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
904
905 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
906
907 //check whether we received the shutdown signal
908 if (myself->_dieNow)
909 {
910 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
911 break;
912 }
913
914 //create a temporary queue to store failed indications
915 tmpEventQueue.empty_list();
916
917 h.sterling 1.1 //continue processing events until the queue is empty
918 //make sure to check for the shutdown signal before every iteration
|
919 h.sterling 1.10 // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
920 // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
921 // But are succeeding by the end of the processing, events may be sent out of chronological order.
922 // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
923 // new events added afterwards.
|
924 h.sterling 1.1 while (myself->_eventqueue.size())
925 {
926 //check for shutdown signal
927 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
928 //the shutdown signal itself, at which time we break out of the main loop
929 if (myself->_dieNow)
930 {
931 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
932 break;
933 }
934
935 //pop next indication off the queue
936 IndicationDispatchEvent* event = 0;
937 event = myself->_eventqueue.remove_first(); //what exceptions/errors can this throw?
938
939 if (!event)
940 {
941 //this should never happen
942 continue;
943 }
944
945 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
946
947 try
948 {
949 myself->consumeIndication(event->getContext(),
950 event->getURL(),
951 event->getIndicationInstance());
952
953 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
954
955 delete event;
956 continue;
957
958 } catch (CIMException & ce)
959 {
960 //check for failure
961 if (ce.getCode() == CIM_ERR_FAILED)
962 {
963 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
|
964 h.sterling 1.10
965 // Here we simply determine if we should increment the retry count or not.
966 // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
967 // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
968 if (event->getRetries() > 0) {
969 Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
970 if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
971 event->increaseRetries();
972 } else {
973 event->increaseRetries();
974 }
|
975 h.sterling 1.1
976 //determine if we have hit the max retry count
977 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
978 {
979 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
980 "Error: the maximum retry count has been exceeded. Removing the event from the queue.");
981
982 Logger::put(
983 Logger::ERROR_LOG,
984 "ConsumerManager",
985 Logger::SEVERE,
986 "The following indication did not get processed successfully: $0",
987 event->getIndicationInstance().getPath().toString());
988
989 delete event;
990 continue;
991
992 } else
993 {
994 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
995 tmpEventQueue.insert_last(event);
996 h.sterling 1.1 }
997
998 } else
999 {
1000 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
1001 delete event;
1002 continue;
1003 }
1004
1005 } catch (Exception & ex)
1006 {
1007 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1008 delete event;
1009 continue;
1010
1011 } catch (...)
1012 {
1013 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1014 delete event;
1015 continue;
1016 } //end try
1017 h.sterling 1.1
1018 } //while eventqueue
1019
|
1020 h.sterling 1.10 // Copy the failed indications back to the main queue
1021 // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1022 // Of those events we are retrying. Retried events happened before any new events coming in.
|
1023 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1024 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1025 h.sterling 1.1 while (tmpEventQueue.size())
1026 {
|
1027 h.sterling 1.10 //there is no = operator for DQueue so we must do it manually
|
1028 h.sterling 1.1 tmpEvent = tmpEventQueue.remove_first();
|
1029 h.sterling 1.10 myself->_eventqueue.insert_last_no_lock(tmpEvent);
1030
|
1031 h.sterling 1.1 }
|
1032 h.sterling 1.10 myself->_eventqueue.unlock();
|
1033 h.sterling 1.1
1034 } catch (TimeOut& te)
1035 {
|
1036 h.sterling 1.10 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
|
1037 h.sterling 1.1
1038 //signal the queue in the same way we would if we received a new indication
1039 //this allows the thread to fall into the queue processing code
1040 myself->_check_queue->signal();
1041
1042 } //time_wait
1043
1044
1045 } //shutdown
1046
1047 PEG_METHOD_EXIT();
1048 return 0;
1049 }
1050
1051
1052 PEGASUS_NAMESPACE_END
1053
1054
1055
|