1 h.sterling 1.1 //%2005////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
18 //
19 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 h.sterling 1.1 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Heather Sterling (hsterl@us.ibm.com)
31 //
32 // Modified By:
33 //
34 //%/////////////////////////////////////////////////////////////////////////////
35
36 #include <Pegasus/Common/Config.h>
37 //#include <cstdlib>
38 //#include <dlfcn.h>
39 #include <Pegasus/Common/System.h>
40 #include <Pegasus/Common/FileSystem.h>
41 #include <Pegasus/Common/Tracer.h>
42 #include <Pegasus/Common/Logger.h>
43 h.sterling 1.1 #include <Pegasus/Common/XmlReader.h>
44 #include <Pegasus/Common/XmlParser.h>
45 #include <Pegasus/Common/XmlWriter.h>
46 #include <Pegasus/Common/IPC.h>
47
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
53 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
54 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
55 static struct OptionRow optionsTable[] =
56 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
57 {
58 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
59 };
60
61 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
62
63 //retry settings
64 h.sterling 1.1 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
65 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
66 h.sterling 1.1
67
68
69 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :
70 _consumerDir(consumerDir),
71 _consumerConfigDir(consumerConfigDir),
72 _enableConsumerUnload(enableConsumerUnload),
73 _idleTimeout(idleTimeout),
74 _forceShutdown(true)
75 {
76 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
77
78 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
79 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
80
81 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
82 "Consumer unload enabled %d: idle timeout %d",
83 enableConsumerUnload,
84 idleTimeout);
85
86
|
87 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
88 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
89 h.sterling 1.1
|
90 kumpf 1.3 struct timeval deallocateWait = {15, 0};
91 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
92 h.sterling 1.1
93 _init();
94
95 PEG_METHOD_EXIT();
96 }
97
98 ConsumerManager::~ConsumerManager()
99 {
100 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
101
102 unloadAllConsumers();
103
|
104 h.sterling 1.5 if (_thread_pool != NULL)
105 {
|
106 kumpf 1.3 delete _thread_pool;
|
107 h.sterling 1.5 }
|
108 h.sterling 1.1
109 ConsumerTable::Iterator i = _consumers.start();
110 for (; i!=0; i++)
111 {
112 DynamicConsumer* consumer = i.value();
113 delete consumer;
114 }
115
116 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
117
118 ModuleTable::Iterator j = _modules.start();
119 for (;j!=0;j++)
120 {
121 ConsumerModule* module = j.value();
122 delete module;
123 }
124
125 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
126
127 PEG_METHOD_EXIT();
128 }
129 h.sterling 1.1
130 void ConsumerManager::_init()
131 {
132 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
133
134 //check if there are any outstanding indications
135 Array<String> files;
136 Uint32 pos;
137 String consumerName;
138
139 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
140 {
141 for (Uint32 i = 0; i < files.size(); i++)
142 {
143 pos = files[i].find(".dat");
144 if (pos != PEG_NOT_FOUND)
145 {
146 consumerName = files[i].subString(0, pos);
147
148 try
149 {
150 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
151 getConsumer(consumerName);
152
153 } catch (...)
154 {
155 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
156 }
157 }
158 }
159 }
160
161 PEG_METHOD_EXIT();
162 }
163
164 String ConsumerManager::getConsumerDir()
165 {
166 return _consumerDir;
167 }
168
169 String ConsumerManager::getConsumerConfigDir()
170 {
171 h.sterling 1.1 return _consumerConfigDir;
172 }
173
174 Boolean ConsumerManager::getEnableConsumerUnload()
175 {
176 return _enableConsumerUnload;
177 }
178
179 Uint32 ConsumerManager::getIdleTimeout()
180 {
181 return _idleTimeout;
182 }
183
184 /** Retrieves the library name associated with the consumer name. By default, the library name
185 * is the same as the consumer name. However, you may specify a different library name in a consumer
186 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
187 * location="libraryName"
188 *
189 * The config file is optional and is generally only needed in cases where there are strict requirements
190 * on library naming.
191 *
192 h.sterling 1.1 * It is the responsibility of the caller to catch any exceptions thrown by this method.
193 */
194 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
195 {
196 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
197
198 //default library name is consumer name
199 String libraryName = consumerName;
200
201 //check whether an alternative library name was specified in an optional consumer config file
202 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
203 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
204
205 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
206 {
207 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
208
209 try
210 {
|
211 h.sterling 1.6 //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
|
212 h.sterling 1.8 OptionManager _optionMgr;
213 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
|
214 h.sterling 1.1 _optionMgr.mergeFile(configFile);
215 _optionMgr.checkRequiredOptions();
216
217 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
218 {
219 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
220 libraryName = consumerName;
221 }
222
223 } catch (Exception & ex)
224 {
225 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
226 "Error reading $0: $1.",
227 configFile,
228 ex.getMessage()));
229 }
230 } else
231 {
232 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
233 }
234
235 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
236
237 PEG_METHOD_EXIT();
238 return libraryName;
239 }
240
241 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
242 * DNE, we create it and initialize it, and add it to the table.
243 * @throws Exception if we cannot successfully create and initialize the consumer
244 */
245 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
246 {
247 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
248
249 DynamicConsumer* consumer = 0;
250 CIMIndicationConsumerProvider* consumerRef = 0;
251 Boolean cached = false;
252 Boolean entryExists = false;
253
254 AutoMutex lock(_consumerTableMutex);
255
256 h.sterling 1.1 if (_consumers.lookup(consumerName, consumer))
257 {
258 //why isn't this working??
259 entryExists = true;
260
261 if (consumer && consumer->isLoaded())
262 {
263 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
264 cached = true;
265 }
266 } else
267 {
268 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
269 consumer = new DynamicConsumer(consumerName);
270 //ATTN: The above is a memory leak if _initConsumer throws an exception
271 //need to delete it in that case
272 }
273
274 if (!cached)
275 {
276 _initConsumer(consumerName, consumer);
277 h.sterling 1.1
278 if (!entryExists)
279 {
280 _consumers.insert(consumerName, consumer);
281 }
282 }
283
284 consumer->updateIdleTimer();
285
286 PEG_METHOD_EXIT();
287 return consumer;
288 }
289
290 /** Initializes a DynamicConsumer.
291 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
292 * @throws Exception if the consumer cannot be initialized
293 */
294 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
295 {
296 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
297
298 h.sterling 1.1 CIMIndicationConsumerProvider* base = 0;
299 ConsumerModule* module = 0;
300
301 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
302 String libraryName = _getConsumerLibraryName(consumerName);
303 module = _lookupModule(libraryName);
304
305 //build library path
306 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
307 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
308
309 //load module
310 try
311 {
312 base = module->load(consumerName, libraryPath);
313 consumer->set(module, base);
314
315 } catch (Exception& ex)
316 {
317 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
318
319 h.sterling 1.1 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
320 "Cannot load module ($0:$1): Unknown exception.",
321 consumerName,
322 libraryName));
323 } catch (...)
324 {
325 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
326 "Cannot load module ($0:$1): Unknown exception.",
327 consumerName,
328 libraryName));
329 }
330
331 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
332
333 //initialize consumer
334 try
335 {
336 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
337
338 consumer->initialize();
339
340 h.sterling 1.1 //ATTN: need to change this
341 Semaphore* semaphore = new Semaphore(0); //blocking
342
343 consumer->setShutdownSemaphore(semaphore);
344
345 //start the worker thread
|
346 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
347 h.sterling 1.1 _worker_routine,
|
348 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
349 h.sterling 1.8 {
350 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
351 "Not enough threads for consumer.");
|
352 konrad.r 1.7
|
353 h.sterling 1.8 Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
354 "Could not allocate thread for consumer.");
|
355 konrad.r 1.7
|
356 h.sterling 1.8 consumer->setShutdownSemaphore(0);
357 delete semaphore;
|
358 konrad.r 1.7 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
|
359 h.sterling 1.8 "Not enough threads for consumer worker routine."));
|
360 konrad.r 1.7 }
|
361 h.sterling 1.1
|
362 h.sterling 1.8 //wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued
363 //before the consumer is waiting for it and the first indication after loading the consumer will be lost
364 consumer->waitForEventThread();
365
|
366 h.sterling 1.1 //load any outstanding requests
367 Array<CIMInstance> outstandingIndications = _deserializeOutstandingIndications(consumerName);
368 if (outstandingIndications.size())
369 {
370 //the consumer will signal itself in _loadOustandingIndications
371 consumer->_loadOutstandingIndications(outstandingIndications);
372 }
373
374 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
375
376 } catch (...)
377 {
378 module->unloadModule();
379 consumer->reset();
380 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
381 "Cannot initialize consumer ($0).",
382 consumerName));
383 }
384
385 PEG_METHOD_EXIT();
386 }
387 h.sterling 1.1
388
389 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
390 * DNE, we create it and add it to the table.
391 * @throws Exception if we cannot successfully create and initialize the consumer
392 */
393 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
394 {
395 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
396
397 AutoMutex lock(_moduleTableMutex);
398
399 ConsumerModule* module = 0;
400
401 //see if consumer module is cached
402 if (_modules.lookup(libraryName, module))
403 {
404 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
405 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
406
407 } else
408 h.sterling 1.1 {
409 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
410 "Creating Consumer Provider Module " + libraryName);
411
412 module = new ConsumerModule();
413 _modules.insert(libraryName, module);
414 }
415
416 PEG_METHOD_EXIT();
417 return(module);
418 }
419
420 /** Returns true if there are active consumers
421 */
422 Boolean ConsumerManager::hasActiveConsumers()
423 {
424 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
425
426 AutoMutex lock(_consumerTableMutex);
427 DynamicConsumer* consumer = 0;
428
429 h.sterling 1.1 try
430 {
431 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
432 {
433 consumer = i.value();
434
435 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
436 {
437 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
438 PEG_METHOD_EXIT();
439 return true;
440 }
441 }
442 } catch (...)
443 {
444 // Unexpected exception; do not assume that no providers are loaded
445 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
446 PEG_METHOD_EXIT();
447 return true;
448 }
449
450 h.sterling 1.1 PEG_METHOD_EXIT();
451 return false;
452 }
453
454 /** Returns true if there are loaded consumers
455 */
456 Boolean ConsumerManager::hasLoadedConsumers()
457 {
458 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
459
460 AutoMutex lock(_consumerTableMutex);
461 DynamicConsumer* consumer = 0;
462
463 try
464 {
465 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
466 {
467 consumer = i.value();
468
469 if (consumer && consumer->isLoaded())
470 {
471 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
472 PEG_METHOD_EXIT();
473 return true;
474 }
475 }
476 } catch (...)
477 {
478 // Unexpected exception; do not assume that no providers are loaded
479 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
480 PEG_METHOD_EXIT();
481 return true;
482 }
483
484 PEG_METHOD_EXIT();
485 return false;
486 }
487
488
489 /** Shutting down a consumer consists of four major steps:
490 * 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit.
491 * 2) Wait for the worker thread to end. This may take a while if it's processing an indication. This
492 h.sterling 1.1 * is optional in a shutdown scenario. If the listener is shutdown with a -f force, the listener
493 * will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows
494 * the current consumer indication to finish. All other queued indications are serialized to a log and
495 * are sent when the consumer is reoaded.
496 * 3) Terminate the consumer provider interface.
497 * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)
498 *
499 * In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all
500 * of the consumers so the threads can finish simultaneously.
501 *
502 * ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications
503 * to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing
504 * and deserialing indications between shutdown and startup, I feel like we do not need to process ALL
505 * queued indications on shutdown.
506 */
507
508 /** Unloads all consumers.
509 */
510 void ConsumerManager::unloadAllConsumers()
511 {
512 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
513 h.sterling 1.1
514 AutoMutex lock(_consumerTableMutex);
515
516 if (!_consumers.size())
517 {
518 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
519 PEG_METHOD_EXIT();
520 return;
521 }
522
523 if (!_forceShutdown)
524 {
525 //wait until all the consumers have finished processing the events in their queue
526 //ATTN: Should this have a timeout even though it's a force??
527 while (hasActiveConsumers())
528 {
529 pegasus_sleep(500);
530 }
531 }
532
533 Array<DynamicConsumer*> loadedConsumers;
534 h.sterling 1.1
535 ConsumerTable::Iterator i = _consumers.start();
536 DynamicConsumer* consumer = 0;
537
538 for (; i!=0; i++)
539 {
540 consumer = i.value();
541 if (consumer && consumer->isLoaded())
542 {
543 loadedConsumers.append(consumer);
544 }
545 }
546
547 if (loadedConsumers.size())
548 {
549 try
550 {
551 _unloadConsumers(loadedConsumers);
552
553 } catch (Exception& ex)
554 {
555 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
556 }
557 } else
558 {
559 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
560 }
561
562 PEG_METHOD_EXIT();
563 }
564
565 /** Unloads idle consumers.
566 */
567 void ConsumerManager::unloadIdleConsumers()
568 {
569 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
570
571 AutoMutex lock(_consumerTableMutex);
572
573 if (!_consumers.size())
574 {
575 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
576 h.sterling 1.1 PEG_METHOD_EXIT();
577 return;
578 }
579
580 Array<DynamicConsumer*> loadedConsumers;
581
582 ConsumerTable::Iterator i = _consumers.start();
583 DynamicConsumer* consumer = 0;
584
585 for (; i!=0; i++)
586 {
587 consumer = i.value();
588 if (consumer && consumer->isLoaded() && consumer->isIdle())
589 {
590 loadedConsumers.append(consumer);
591 }
592 }
593
594 if (loadedConsumers.size())
595 {
596 try
597 h.sterling 1.1 {
598 _unloadConsumers(loadedConsumers);
599
600 } catch (Exception& ex)
601 {
602 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
603 }
604 } else
605 {
606 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
607 }
608
609 PEG_METHOD_EXIT();
610 }
611
612 /** Unloads a single consumer.
613 */
614 void ConsumerManager::unloadConsumer(const String& consumerName)
615 {
616 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
617
618 h.sterling 1.1 AutoMutex lock(_consumerTableMutex);
619
620 DynamicConsumer* consumer = 0;
621
622 //check whether the consumer exists
623 if (!_consumers.lookup(consumerName, consumer))
624 {
625 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
626 return;
627 }
628
629 //check whether the consumer is loaded
630 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
631 {
632 //unload the consumer
633 Array<DynamicConsumer*> loadedConsumers;
634 loadedConsumers.append(consumer);
635
636 try
637 {
638 _unloadConsumers(loadedConsumers);
639 h.sterling 1.1
640 } catch (Exception& ex)
641 {
642 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
643 }
644
645 } else
646 {
647 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
648 }
649
650 PEG_METHOD_EXIT();
651 }
652
653 /** Unloads the consumers in the given array.
654 * The consumerTable mutex MUST be locked prior to entering this method.
655 */
656 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
657 {
658 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
659
660 h.sterling 1.1 //tell consumers to shutdown
661 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
662 {
663 consumersToUnload[i]->sendShutdownSignal();
664 }
665
666 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
667
668 //wait for all the consumer worker threads to complete
669 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
670 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
671 //processing its requests.
672 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
673 {
674 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
675
676 //wait for the consumer worker thread to end
677 try
678 {
679 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
680 if (_shutdownSemaphore)
681 h.sterling 1.1 {
682 _shutdownSemaphore->time_wait(10000);
683 }
684
685 } catch (TimeOut &)
686 {
687 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
688 }
689
690 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
691
692 try
693 {
694 //terminate consumer provider interface
695 consumersToUnload[i]->terminate();
696
697 //unload consumer provider module
698 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
699 consumersToUnload[i]->_module->unloadModule();
700
701 //serialize outstanding indications
702 h.sterling 1.1 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
703
704 //reset the consumer
705 consumersToUnload[i]->reset();
706
707 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
708
709 } catch (Exception& e)
710 {
711 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
712 //ATTN: throw exception? log warning?
713 }
714 }
715
716 PEG_METHOD_EXIT();
717 }
718
719 /** Serializes oustanding indications to a <MyConsumer>.dat file
720 */
721 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<CIMInstance> indications)
722 {
723 h.sterling 1.1 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
724
725 if (!indications.size())
726 {
727 PEG_METHOD_EXIT();
728 return;
729 }
730
731 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
732 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
733
734 Array<char> buffer;
735
736 // Open the log file and serialize remaining
737 FILE* fileHandle = 0;
738 fileHandle = fopen((const char*)fileName.getCString(), "w");
739
740 if (!fileHandle)
741 {
742 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
743
744 h.sterling 1.1 } else
745 {
746 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
747 "Serializing %d outstanding requests for %s",
748 indications.size(),
749 (const char*)consumerName.getCString());
750
751 //we have to put the array of instances under a valid root element or the parser complains
752 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
753
754 for (Uint32 i = 0; i < indications.size(); i++)
755 {
756 XmlWriter::appendValueNamedInstanceElement(buffer, indications[i]);
757 }
758
759 XmlWriter::append(buffer, "</IRETURNVALUE>\0");
760
761 fputs((const char*)buffer.getData(), fileHandle);
762
763 fclose(fileHandle);
764 }
765 h.sterling 1.1
766 PEG_METHOD_EXIT();
767 }
768
769 /** Reads outstanding indications from a <MyConsumer>.dat file
770 */
771 Array<CIMInstance> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
772 {
773 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
774
775 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
776 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
777
778 Array<CIMInstance> cimInstances;
779
780 // Open the log file and serialize remaining indications
781 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
782 {
783 Array<char> text;
784 CIMInstance cimInstance;
785 XmlEntry entry;
786 h.sterling 1.1
787 try
788 {
789 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
790 text.append('\0');
791
792 //parse the file
793 XmlParser parser((char*)text.getData());
794 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
795
796 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
797 {
798 cimInstances.append(cimInstance);
799 }
800
801 XmlReader::expectEndTag(parser, "IRETURNVALUE");
802
803 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
804 "Consumer %s has %d outstanding indications",
805 (const char*)consumerName.getCString(),
806 cimInstances.size());
807 h.sterling 1.1
808 //delete the file
809 FileSystem::removeFile(fileName);
810
811 } catch (Exception& ex)
812 {
813 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
814
815 } catch (...)
816 {
817 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
818 }
819 }
820
821 PEG_METHOD_EXIT();
822 return cimInstances;
823 }
824
825
826
827 /**
828 h.sterling 1.1 * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton
829 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
830 * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said,
831 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
832 *
833 * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
834 * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off
835 * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer
836 * can attain extremely high performance.
837 *
838 * There are three different events that can signal us:
839 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
840 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
841 * 3) A retry signal (signalled from this routine itself)
842 *
843 * The idea is that all new indications are put on the front of the queue and processed first. All of the retry
844 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
845 * Before processing each indication, we check to see whether or not the shutdown signal was given. If so,
846 * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
847 *
848 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
849 h.sterling 1.1 *
|
850 h.sterling 1.5 * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
851 * 20 new indications come in, 10 of them are successful, 10 are not.
|
852 h.sterling 1.1 * We were signalled 20 times, so we will pass the time_wait 20 times. Perceivably, the process time on each indication
853 * could be minimal. We could potentially proceed to process the retries after a very small time interval since
|
854 h.sterling 1.5 * we would never hit the wait for the retry timeout.
|
855 h.sterling 1.1 *
856 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one
857 * is processed. Because new indications are first in, first out, the 19 indications will be processed in reverse order.
|
858 h.sterling 1.5 * Is this a problem? Short answer - NO. They could arrive in reverse order anyways depending on the network stack.
|
859 h.sterling 1.1 *
860 */
861 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
862 {
863 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
864
865 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
866 String name = myself->getName();
867 DQueue<IndicationDispatchEvent> tmpEventQueue(true);
868
869 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
870
871 PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl;
872
|
873 h.sterling 1.8 myself->_listeningSemaphore->signal();
874
|
875 h.sterling 1.1 while (true)
876 {
877 try
878 {
879 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
880
881 //wait to be signalled
882 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
883
884 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
885
886 //check whether we received the shutdown signal
887 if (myself->_dieNow)
888 {
889 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
890 break;
891 }
892
893 //signal must have been due to an incoming event
894 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name);
895
896 h.sterling 1.1 //create a temporary queue to store failed indications
897 tmpEventQueue.empty_list();
898
899 //continue processing events until the queue is empty
900 //make sure to check for the shutdown signal before every iteration
901
902 while (myself->_eventqueue.size())
903 {
904 //check for shutdown signal
905 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
906 //the shutdown signal itself, at which time we break out of the main loop
907 if (myself->_dieNow)
908 {
909 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
910 break;
911 }
912
913 //pop next indication off the queue
914 IndicationDispatchEvent* event = 0;
915 event = myself->_eventqueue.remove_first(); //what exceptions/errors can this throw?
916
917 h.sterling 1.1 if (!event)
918 {
919 //this should never happen
920 continue;
921 }
922
|
923 h.sterling 1.5 //check retry status. do not retry until the retry time has elapsed
924 if (event->getRetries() > 0)
925 {
926 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time from event is " + event->getLastAttemptTime().toString());
927 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Current time is " + CIMDateTime::getCurrentDateTime().toString());
928 Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(),
929 CIMDateTime::getCurrentDateTime());
930
931 if (differenceInMicroseconds < (DEFAULT_RETRY_LAPSE * 1000))
932 {
933 //do not retry; just add to the retry queue
934 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::cannot retry event until retry time has elapsed");
935 tmpEventQueue.insert_last(event);
936 continue;
937 }
938 }
939
|
940 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
941
942 try
943 {
944 myself->consumeIndication(event->getContext(),
945 event->getURL(),
946 event->getIndicationInstance());
947
948 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
949
950 delete event;
951 continue;
952
953 } catch (CIMException & ce)
954 {
955 //check for failure
956 if (ce.getCode() == CIM_ERR_FAILED)
957 {
958 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
959
960 //update event parameters
961 h.sterling 1.1 event->increaseRetries();
962
963 //determine if we have hit the max retry count
964 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
965 {
966 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
967 "Error: the maximum retry count has been exceeded. Removing the event from the queue.");
968
969 Logger::put(
970 Logger::ERROR_LOG,
971 "ConsumerManager",
972 Logger::SEVERE,
973 "The following indication did not get processed successfully: $0",
974 event->getIndicationInstance().getPath().toString());
975
976 delete event;
977 continue;
978
979 } else
980 {
981 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
982 h.sterling 1.1 tmpEventQueue.insert_last(event);
983 }
984
985 } else
986 {
987 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
988 delete event;
989 continue;
990 }
991
992 } catch (Exception & ex)
993 {
994 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
995 delete event;
996 continue;
997
998 } catch (...)
999 {
1000 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1001 delete event;
1002 continue;
1003 h.sterling 1.1 } //end try
1004
1005 } //while eventqueue
1006
1007 //copy the failed indications back to the main queue
1008 //since we are always adding the failed indications to the back, it should not interfere with the
1009 //dispatcher adding events to the front
1010
1011 //there is no = operator for DQueue so we must do it manually
1012 IndicationDispatchEvent* tmpEvent = 0;
1013 while (tmpEventQueue.size())
1014 {
1015 tmpEvent = tmpEventQueue.remove_first();
1016 myself->_eventqueue.insert_last(tmpEvent);
1017 }
1018
1019 } catch (TimeOut& te)
1020 {
1021 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications.");
1022
1023 //signal the queue in the same way we would if we received a new indication
1024 h.sterling 1.1 //this allows the thread to fall into the queue processing code
1025 myself->_check_queue->signal();
1026
1027 } //time_wait
1028
1029
1030 } //shutdown
1031
1032 PEG_METHOD_EXIT();
1033 return 0;
1034 }
1035
1036
1037 PEGASUS_NAMESPACE_END
1038
1039
1040
|