1 h.sterling 1.1 //%2005////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
18 //
19 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 h.sterling 1.1 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Heather Sterling (hsterl@us.ibm.com)
31 //
32 // Modified By:
33 //
34 //%/////////////////////////////////////////////////////////////////////////////
35
36 #include <Pegasus/Common/Config.h>
37 //#include <cstdlib>
38 //#include <dlfcn.h>
39 #include <Pegasus/Common/System.h>
40 #include <Pegasus/Common/FileSystem.h>
41 #include <Pegasus/Common/Tracer.h>
42 #include <Pegasus/Common/Logger.h>
43 h.sterling 1.1 #include <Pegasus/Common/XmlReader.h>
44 #include <Pegasus/Common/XmlParser.h>
45 #include <Pegasus/Common/XmlWriter.h>
46 #include <Pegasus/Common/IPC.h>
47
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
53 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
54 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
55 static struct OptionRow optionsTable[] =
56 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
57 {
58 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
59 };
60
61 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
62
63 //retry settings
64 h.sterling 1.1 //ATTN: Do we want to make these configurable? If so, is a global setting for all the consumers ok?
65 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
66 static const Uint32 DEFAULT_RETRY_LAPSE = 3000; //ms
67
68
69
70 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :
71 _consumerDir(consumerDir),
72 _consumerConfigDir(consumerConfigDir),
73 _enableConsumerUnload(enableConsumerUnload),
74 _idleTimeout(idleTimeout),
75 _forceShutdown(true)
76 {
77 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
78
79 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
80 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
81
82 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
83 "Consumer unload enabled %d: idle timeout %d",
84 enableConsumerUnload,
85 h.sterling 1.1 idleTimeout);
86
87
88 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
89
|
105 h.sterling 1.1
106 ConsumerTable::Iterator i = _consumers.start();
107 for (; i!=0; i++)
108 {
109 DynamicConsumer* consumer = i.value();
110 delete consumer;
111 }
112
113 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
114
115 ModuleTable::Iterator j = _modules.start();
116 for (;j!=0;j++)
117 {
118 ConsumerModule* module = j.value();
119 delete module;
120 }
121
122 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
123
124 PEG_METHOD_EXIT();
125 }
126 h.sterling 1.1
127 void ConsumerManager::_init()
128 {
129 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
130
131 //check if there are any outstanding indications
132 Array<String> files;
133 Uint32 pos;
134 String consumerName;
135
136 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
137 {
138 for (Uint32 i = 0; i < files.size(); i++)
139 {
140 pos = files[i].find(".dat");
141 if (pos != PEG_NOT_FOUND)
142 {
143 consumerName = files[i].subString(0, pos);
144
145 try
146 {
147 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
148 getConsumer(consumerName);
149
150 } catch (...)
151 {
152 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
153 }
154 }
155 }
156 }
157
158 PEG_METHOD_EXIT();
159 }
160
161 String ConsumerManager::getConsumerDir()
162 {
163 return _consumerDir;
164 }
165
166 String ConsumerManager::getConsumerConfigDir()
167 {
168 h.sterling 1.1 return _consumerConfigDir;
169 }
170
171 Boolean ConsumerManager::getEnableConsumerUnload()
172 {
173 return _enableConsumerUnload;
174 }
175
176 Uint32 ConsumerManager::getIdleTimeout()
177 {
178 return _idleTimeout;
179 }
180
181 /** Retrieves the library name associated with the consumer name. By default, the library name
182 * is the same as the consumer name. However, you may specify a different library name in a consumer
183 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
184 * location="libraryName"
185 *
186 * The config file is optional and is generally only needed in cases where there are strict requirements
187 * on library naming.
188 *
189 h.sterling 1.1 * It is the responsibility of the caller to catch any exceptions thrown by this method.
190 */
191 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
192 {
193 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
194
195 //default library name is consumer name
196 String libraryName = consumerName;
197
198 //check whether an alternative library name was specified in an optional consumer config file
199 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
200 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
201
202 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
203 {
204 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
205
206 try
207 {
208 //ATTN: Does the OptionManager need to be reset? There's no method for it.
209 _optionMgr.mergeFile(configFile);
210 h.sterling 1.1 _optionMgr.checkRequiredOptions();
211
212 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
213 {
214 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
215 libraryName = consumerName;
216 }
217
218 } catch (Exception & ex)
219 {
220 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
221 "Error reading $0: $1.",
222 configFile,
223 ex.getMessage()));
224 }
225 } else
226 {
227 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
228 }
229
230 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
231 h.sterling 1.1
232 PEG_METHOD_EXIT();
233 return libraryName;
234 }
235
236 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
237 * DNE, we create it and initialize it, and add it to the table.
238 * @throws Exception if we cannot successfully create and initialize the consumer
239 */
240 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
241 {
242 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
243
244 DynamicConsumer* consumer = 0;
245 CIMIndicationConsumerProvider* consumerRef = 0;
246 Boolean cached = false;
247 Boolean entryExists = false;
248
249 AutoMutex lock(_consumerTableMutex);
250
251 if (_consumers.lookup(consumerName, consumer))
252 h.sterling 1.1 {
253 //why isn't this working??
254 entryExists = true;
255
256 if (consumer && consumer->isLoaded())
257 {
258 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
259 cached = true;
260 }
261 } else
262 {
263 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
264 consumer = new DynamicConsumer(consumerName);
265 //ATTN: The above is a memory leak if _initConsumer throws an exception
266 //need to delete it in that case
267 }
268
269 if (!cached)
270 {
271 _initConsumer(consumerName, consumer);
272
273 h.sterling 1.1 if (!entryExists)
274 {
275 _consumers.insert(consumerName, consumer);
276 }
277 }
278
279 consumer->updateIdleTimer();
280
281 PEG_METHOD_EXIT();
282 return consumer;
283 }
284
285 /** Initializes a DynamicConsumer.
286 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
287 * @throws Exception if the consumer cannot be initialized
288 */
289 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
290 {
291 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
292
293 CIMIndicationConsumerProvider* base = 0;
294 h.sterling 1.1 ConsumerModule* module = 0;
295
296 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
297 String libraryName = _getConsumerLibraryName(consumerName);
298 module = _lookupModule(libraryName);
299
300 //build library path
301 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
302 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
303
304 //load module
305 try
306 {
307 base = module->load(consumerName, libraryPath);
308 consumer->set(module, base);
309
310 } catch (Exception& ex)
311 {
312 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
313
314 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
315 h.sterling 1.1 "Cannot load module ($0:$1): Unknown exception.",
316 consumerName,
317 libraryName));
318 } catch (...)
319 {
320 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
321 "Cannot load module ($0:$1): Unknown exception.",
322 consumerName,
323 libraryName));
324 }
325
326 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
327
328 //initialize consumer
329 try
330 {
331 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
332
333 consumer->initialize();
334
335 //ATTN: need to change this
336 h.sterling 1.1 Semaphore* semaphore = new Semaphore(0); //blocking
337
338 consumer->setShutdownSemaphore(semaphore);
339
340 //start the worker thread
341 _thread_pool->allocate_and_awaken(consumer,
342 _worker_routine,
343 semaphore);
344
345 //load any outstanding requests
346 Array<CIMInstance> outstandingIndications = _deserializeOutstandingIndications(consumerName);
347 if (outstandingIndications.size())
348 {
349 //the consumer will signal itself in _loadOustandingIndications
350 consumer->_loadOutstandingIndications(outstandingIndications);
351 }
352
353 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
354
355 } catch (...)
356 {
357 h.sterling 1.1 module->unloadModule();
358 consumer->reset();
359 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
360 "Cannot initialize consumer ($0).",
361 consumerName));
362 }
363
364 PEG_METHOD_EXIT();
365 }
366
367
368 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
369 * DNE, we create it and add it to the table.
370 * @throws Exception if we cannot successfully create and initialize the consumer
371 */
372 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
373 {
374 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
375
376 AutoMutex lock(_moduleTableMutex);
377
378 h.sterling 1.1 ConsumerModule* module = 0;
379
380 //see if consumer module is cached
381 if (_modules.lookup(libraryName, module))
382 {
383 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
384 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
385
386 } else
387 {
388 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
389 "Creating Consumer Provider Module " + libraryName);
390
391 module = new ConsumerModule();
392 _modules.insert(libraryName, module);
393 }
394
395 PEG_METHOD_EXIT();
396 return(module);
397 }
398
399 h.sterling 1.1 /** Returns true if there are active consumers
400 */
401 Boolean ConsumerManager::hasActiveConsumers()
402 {
403 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
404
405 AutoMutex lock(_consumerTableMutex);
406 DynamicConsumer* consumer = 0;
407
408 try
409 {
410 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
411 {
412 consumer = i.value();
413
414 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
415 {
416 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
417 PEG_METHOD_EXIT();
418 return true;
419 }
420 h.sterling 1.1 }
421 } catch (...)
422 {
423 // Unexpected exception; do not assume that no providers are loaded
424 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
425 PEG_METHOD_EXIT();
426 return true;
427 }
428
429 PEG_METHOD_EXIT();
430 return false;
431 }
432
433 /** Returns true if there are loaded consumers
434 */
435 Boolean ConsumerManager::hasLoadedConsumers()
436 {
437 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
438
439 AutoMutex lock(_consumerTableMutex);
440 DynamicConsumer* consumer = 0;
441 h.sterling 1.1
442 try
443 {
444 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
445 {
446 consumer = i.value();
447
448 if (consumer && consumer->isLoaded())
449 {
450 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
451 PEG_METHOD_EXIT();
452 return true;
453 }
454 }
455 } catch (...)
456 {
457 // Unexpected exception; do not assume that no providers are loaded
458 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
459 PEG_METHOD_EXIT();
460 return true;
461 }
462 h.sterling 1.1
463 PEG_METHOD_EXIT();
464 return false;
465 }
466
467
/** Shutting down a consumer consists of four major steps:
 *  1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit.
 *  2) Wait for the worker thread to end. This may take a while if it's processing an indication. This
 *     is optional in a shutdown scenario. If the listener is shut down with a -f force, the listener
 *     will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows
 *     the current consumer indication to finish. All other queued indications are serialized to a log and
 *     are sent when the consumer is reloaded.
 *  3) Terminate the consumer provider interface.
 *  4) Decrement the module refcount (the module will automatically unload when its refcount == 0).
 *
 *  In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 *  of the consumers so the threads can finish simultaneously.
 *
 *  ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications
 *  to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing
 *  and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
 *  queued indications on shutdown.
 */
486
487 /** Unloads all consumers.
488 */
489 void ConsumerManager::unloadAllConsumers()
490 {
491 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
492
493 AutoMutex lock(_consumerTableMutex);
494
495 if (!_consumers.size())
496 {
497 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
498 PEG_METHOD_EXIT();
499 return;
500 }
501
502 if (!_forceShutdown)
503 {
504 h.sterling 1.1 //wait until all the consumers have finished processing the events in their queue
505 //ATTN: Should this have a timeout even though it's a force??
506 while (hasActiveConsumers())
507 {
508 pegasus_sleep(500);
509 }
510 }
511
512 Array<DynamicConsumer*> loadedConsumers;
513
514 ConsumerTable::Iterator i = _consumers.start();
515 DynamicConsumer* consumer = 0;
516
517 for (; i!=0; i++)
518 {
519 consumer = i.value();
520 if (consumer && consumer->isLoaded())
521 {
522 loadedConsumers.append(consumer);
523 }
524 }
525 h.sterling 1.1
526 if (loadedConsumers.size())
527 {
528 try
529 {
530 _unloadConsumers(loadedConsumers);
531
532 } catch (Exception& ex)
533 {
534 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
535 }
536 } else
537 {
538 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
539 }
540
541 PEG_METHOD_EXIT();
542 }
543
544 /** Unloads idle consumers.
545 */
546 h.sterling 1.1 void ConsumerManager::unloadIdleConsumers()
547 {
548 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
549
550 AutoMutex lock(_consumerTableMutex);
551
552 if (!_consumers.size())
553 {
554 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
555 PEG_METHOD_EXIT();
556 return;
557 }
558
559 Array<DynamicConsumer*> loadedConsumers;
560
561 ConsumerTable::Iterator i = _consumers.start();
562 DynamicConsumer* consumer = 0;
563
564 for (; i!=0; i++)
565 {
566 consumer = i.value();
567 h.sterling 1.1 if (consumer && consumer->isLoaded() && consumer->isIdle())
568 {
569 loadedConsumers.append(consumer);
570 }
571 }
572
573 if (loadedConsumers.size())
574 {
575 try
576 {
577 _unloadConsumers(loadedConsumers);
578
579 } catch (Exception& ex)
580 {
581 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
582 }
583 } else
584 {
585 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
586 }
587
588 h.sterling 1.1 PEG_METHOD_EXIT();
589 }
590
591 /** Unloads a single consumer.
592 */
593 void ConsumerManager::unloadConsumer(const String& consumerName)
594 {
595 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
596
597 AutoMutex lock(_consumerTableMutex);
598
599 DynamicConsumer* consumer = 0;
600
601 //check whether the consumer exists
602 if (!_consumers.lookup(consumerName, consumer))
603 {
604 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
605 return;
606 }
607
608 //check whether the consumer is loaded
609 h.sterling 1.1 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
610 {
611 //unload the consumer
612 Array<DynamicConsumer*> loadedConsumers;
613 loadedConsumers.append(consumer);
614
615 try
616 {
617 _unloadConsumers(loadedConsumers);
618
619 } catch (Exception& ex)
620 {
621 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
622 }
623
624 } else
625 {
626 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
627 }
628
629 PEG_METHOD_EXIT();
630 h.sterling 1.1 }
631
632 /** Unloads the consumers in the given array.
633 * The consumerTable mutex MUST be locked prior to entering this method.
634 */
635 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
636 {
637 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
638
639 //tell consumers to shutdown
640 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
641 {
642 consumersToUnload[i]->sendShutdownSignal();
643 }
644
645 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
646
647 //wait for all the consumer worker threads to complete
648 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
649 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
650 //processing its requests.
651 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
652 {
653 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
654
655 //wait for the consumer worker thread to end
656 try
657 {
658 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
659 if (_shutdownSemaphore)
660 {
661 _shutdownSemaphore->time_wait(10000);
662 }
663
664 } catch (TimeOut &)
665 {
666 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
667 }
668
669 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
670
671 try
672 h.sterling 1.1 {
673 //terminate consumer provider interface
674 consumersToUnload[i]->terminate();
675
676 //unload consumer provider module
677 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
678 consumersToUnload[i]->_module->unloadModule();
679
680 //serialize outstanding indications
681 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
682
683 //reset the consumer
684 consumersToUnload[i]->reset();
685
686 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
687
688 } catch (Exception& e)
689 {
690 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
691 //ATTN: throw exception? log warning?
692 }
693 h.sterling 1.1 }
694
695 PEG_METHOD_EXIT();
696 }
697
698 /** Serializes oustanding indications to a <MyConsumer>.dat file
699 */
700 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<CIMInstance> indications)
701 {
702 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
703
704 if (!indications.size())
705 {
706 PEG_METHOD_EXIT();
707 return;
708 }
709
710 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
711 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
712
713 Array<char> buffer;
714 h.sterling 1.1
715 // Open the log file and serialize remaining
716 FILE* fileHandle = 0;
717 fileHandle = fopen((const char*)fileName.getCString(), "w");
718
719 if (!fileHandle)
720 {
721 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
722
723 } else
724 {
725 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
726 "Serializing %d outstanding requests for %s",
727 indications.size(),
728 (const char*)consumerName.getCString());
729
730 //we have to put the array of instances under a valid root element or the parser complains
731 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
732
733 for (Uint32 i = 0; i < indications.size(); i++)
734 {
735 h.sterling 1.1 XmlWriter::appendValueNamedInstanceElement(buffer, indications[i]);
736 }
737
738 XmlWriter::append(buffer, "</IRETURNVALUE>\0");
739
740 fputs((const char*)buffer.getData(), fileHandle);
741
742 fclose(fileHandle);
743 }
744
745 PEG_METHOD_EXIT();
746 }
747
748 /** Reads outstanding indications from a <MyConsumer>.dat file
749 */
750 Array<CIMInstance> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
751 {
752 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
753
754 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
755 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
756 h.sterling 1.1
757 Array<CIMInstance> cimInstances;
758
759 // Open the log file and serialize remaining indications
760 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
761 {
762 Array<char> text;
763 CIMInstance cimInstance;
764 XmlEntry entry;
765
766 try
767 {
768 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
769 text.append('\0');
770
771 //parse the file
772 XmlParser parser((char*)text.getData());
773 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
774
775 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
776 {
777 h.sterling 1.1 cimInstances.append(cimInstance);
778 }
779
780 XmlReader::expectEndTag(parser, "IRETURNVALUE");
781
782 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
783 "Consumer %s has %d outstanding indications",
784 (const char*)consumerName.getCString(),
785 cimInstances.size());
786
787 //delete the file
788 FileSystem::removeFile(fileName);
789
790 } catch (Exception& ex)
791 {
792 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
793
794 } catch (...)
795 {
796 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
797 }
798 h.sterling 1.1 }
799
800 PEG_METHOD_EXIT();
801 return cimInstances;
802 }
803
804
805
806 /**
807 * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton
808 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
809 * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said,
810 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
811 *
812 * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
813 * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off
814 * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer
815 * can attain extremely high performance.
816 *
817 * There are three different events that can signal us:
818 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
819 h.sterling 1.1 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
820 * 3) A retry signal (signalled from this routine itself)
821 *
822 * The idea is that all new indications are put on the front of the queue and processed first. All of the retry
823 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
824 * Before processing each indication, we check to see whether or not the shutdown signal was given. If so,
825 * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
826 *
827 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
828 *
829 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 10 of them are successful, 10 are not
830 * We were signalled 20 times, so we will pass the time_wait 20 times. Perceivably, the process time on each indication
831 * could be minimal. We could potentially proceed to process the retries after a very small time interval since
832 * we would never hit the wait for the retry timeout. We could solve this by keeping track of the last retry
833 * attempt, and only retry if a certain length of time has passed. Otherwise, the logic of the loop needs to be changed.
834 *
835 * ATTN: Outstanding issue with this strategy -- 20 new indications come in, 19 of them come in before the first one
836 * is processed. Because new indications are first in, first out, the 19 indications will be processed in reverse order.
837 * Is this a problem?
838 *
839 */
840 h.sterling 1.1 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
841 {
842 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
843
844 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
845 String name = myself->getName();
846 DQueue<IndicationDispatchEvent> tmpEventQueue(true);
847
848 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
849
850 PEGASUS_STD(cout) << "Worker thread started for consumer : " << name << endl;
851
852 while (true)
853 {
854 try
855 {
856 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
857
858 //wait to be signalled
859 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
860
861 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
862
863 //check whether we received the shutdown signal
864 if (myself->_dieNow)
865 {
866 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
867 break;
868 }
869
870 //signal must have been due to an incoming event
871 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::indication received " + name);
872
873 //create a temporary queue to store failed indications
874 tmpEventQueue.empty_list();
875
876 //continue processing events until the queue is empty
877 //make sure to check for the shutdown signal before every iteration
878
879 while (myself->_eventqueue.size())
880 {
881 //check for shutdown signal
882 h.sterling 1.1 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
883 //the shutdown signal itself, at which time we break out of the main loop
884 if (myself->_dieNow)
885 {
886 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
887 break;
888 }
889
890 //pop next indication off the queue
891 IndicationDispatchEvent* event = 0;
892 event = myself->_eventqueue.remove_first(); //what exceptions/errors can this throw?
893
894 if (!event)
895 {
896 //this should never happen
897 continue;
898 }
899
900 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
901
902 try
903 h.sterling 1.1 {
904 myself->consumeIndication(event->getContext(),
905 event->getURL(),
906 event->getIndicationInstance());
907
908 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
909
910 delete event;
911 continue;
912
913 } catch (CIMException & ce)
914 {
915 //check for failure
916 if (ce.getCode() == CIM_ERR_FAILED)
917 {
918 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
919
920 //update event parameters
921 event->increaseRetries();
922
923 //determine if we have hit the max retry count
924 h.sterling 1.1 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
925 {
926 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
927 "Error: the maximum retry count has been exceeded. Removing the event from the queue.");
928
929 Logger::put(
930 Logger::ERROR_LOG,
931 "ConsumerManager",
932 Logger::SEVERE,
933 "The following indication did not get processed successfully: $0",
934 event->getIndicationInstance().getPath().toString());
935
936 delete event;
937 continue;
938
939 } else
940 {
941 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
942 tmpEventQueue.insert_last(event);
943 }
944
945 h.sterling 1.1 } else
946 {
947 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
948 delete event;
949 continue;
950 }
951
952 } catch (Exception & ex)
953 {
954 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
955 delete event;
956 continue;
957
958 } catch (...)
959 {
960 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
961 delete event;
962 continue;
963 } //end try
964
965 } //while eventqueue
966 h.sterling 1.1
967 //copy the failed indications back to the main queue
968 //since we are always adding the failed indications to the back, it should not interfere with the
969 //dispatcher adding events to the front
970
971 //there is no = operator for DQueue so we must do it manually
972 IndicationDispatchEvent* tmpEvent = 0;
973 while (tmpEventQueue.size())
974 {
975 tmpEvent = tmpEventQueue.remove_first();
976 myself->_eventqueue.insert_last(tmpEvent);
977 }
978
979 } catch (TimeOut& te)
980 {
981 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::Time to retry any outstanding indications.");
982
983 //signal the queue in the same way we would if we received a new indication
984 //this allows the thread to fall into the queue processing code
985 myself->_check_queue->signal();
986
987 h.sterling 1.1 } //time_wait
988
989
990 } //shutdown
991
992 PEG_METHOD_EXIT();
993 return 0;
994 }
995
996
997 PEGASUS_NAMESPACE_END
998
999
1000
|