1 h.sterling 1.1 //%2005////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
18 //
19 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 h.sterling 1.1 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Heather Sterling (hsterl@us.ibm.com)
31 //
32 // Modified By:
33 //
34 //%/////////////////////////////////////////////////////////////////////////////
35
36 #include <Pegasus/Common/Config.h>
37 //#include <cstdlib>
38 //#include <dlfcn.h>
39 #include <Pegasus/Common/System.h>
40 #include <Pegasus/Common/FileSystem.h>
41 #include <Pegasus/Common/Tracer.h>
42 #include <Pegasus/Common/Logger.h>
43 h.sterling 1.1 #include <Pegasus/Common/XmlReader.h>
44 #include <Pegasus/Common/XmlParser.h>
45 #include <Pegasus/Common/XmlWriter.h>
46 #include <Pegasus/Common/IPC.h>
47
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
53 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
54 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
55 static struct OptionRow optionsTable[] =
56 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
57 {
58 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
59 };
60
61 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
62
63 //retry settings
64 h.sterling 1.1 //ATTN: Do we want to make these configurable? If so, is a global setting for all the consumers ok?
65 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
108 h.sterling 1.1
109 ConsumerTable::Iterator i = _consumers.start();
110 for (; i!=0; i++)
111 {
112 DynamicConsumer* consumer = i.value();
113 delete consumer;
114 }
115
116 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
117
118 ModuleTable::Iterator j = _modules.start();
119 for (;j!=0;j++)
120 {
121 ConsumerModule* module = j.value();
122 delete module;
123 }
124
125 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
126
127 PEG_METHOD_EXIT();
128 }
129 h.sterling 1.1
130 void ConsumerManager::_init()
131 {
132 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
133
134 //check if there are any outstanding indications
135 Array<String> files;
136 Uint32 pos;
137 String consumerName;
138
139 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
140 {
141 for (Uint32 i = 0; i < files.size(); i++)
142 {
143 pos = files[i].find(".dat");
144 if (pos != PEG_NOT_FOUND)
145 {
146 consumerName = files[i].subString(0, pos);
147
148 try
149 {
150 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
151 getConsumer(consumerName);
152
153 } catch (...)
154 {
155 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
156 }
157 }
158 }
159 }
160
161 PEG_METHOD_EXIT();
162 }
163
164 String ConsumerManager::getConsumerDir()
165 {
166 return _consumerDir;
167 }
168
169 String ConsumerManager::getConsumerConfigDir()
170 {
171 h.sterling 1.1 return _consumerConfigDir;
172 }
173
174 Boolean ConsumerManager::getEnableConsumerUnload()
175 {
176 return _enableConsumerUnload;
177 }
178
179 Uint32 ConsumerManager::getIdleTimeout()
180 {
181 return _idleTimeout;
182 }
183
184 /** Retrieves the library name associated with the consumer name. By default, the library name
185 * is the same as the consumer name. However, you may specify a different library name in a consumer
186 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
187 * location="libraryName"
188 *
189 * The config file is optional and is generally only needed in cases where there are strict requirements
190 * on library naming.
191 *
192 h.sterling 1.1 * It is the responsibility of the caller to catch any exceptions thrown by this method.
193 */
194 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
195 {
196 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
197
198 //default library name is consumer name
199 String libraryName = consumerName;
200
201 //check whether an alternative library name was specified in an optional consumer config file
202 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
203 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
204
205 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
206 {
207 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
208
209 try
210 {
211 //ATTN: Does the OptionManager need to be reset? There's no method for it.
212 _optionMgr.mergeFile(configFile);
213 h.sterling 1.1 _optionMgr.checkRequiredOptions();
214
215 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
216 {
217 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
218 libraryName = consumerName;
219 }
220
221 } catch (Exception & ex)
222 {
223 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
224 "Error reading $0: $1.",
225 configFile,
226 ex.getMessage()));
227 }
228 } else
229 {
230 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
231 }
232
233 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
234 h.sterling 1.1
235 PEG_METHOD_EXIT();
236 return libraryName;
237 }
238
239 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
240 * DNE, we create it and initialize it, and add it to the table.
241 * @throws Exception if we cannot successfully create and initialize the consumer
242 */
243 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
244 {
245 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
246
247 DynamicConsumer* consumer = 0;
248 CIMIndicationConsumerProvider* consumerRef = 0;
249 Boolean cached = false;
250 Boolean entryExists = false;
251
252 AutoMutex lock(_consumerTableMutex);
253
254 if (_consumers.lookup(consumerName, consumer))
255 h.sterling 1.1 {
256 //why isn't this working??
257 entryExists = true;
258
259 if (consumer && consumer->isLoaded())
260 {
261 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
262 cached = true;
263 }
264 } else
265 {
266 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
267 consumer = new DynamicConsumer(consumerName);
268 //ATTN: The above is a memory leak if _initConsumer throws an exception
269 //need to delete it in that case
270 }
271
272 if (!cached)
273 {
274 _initConsumer(consumerName, consumer);
275
276 h.sterling 1.1 if (!entryExists)
277 {
278 _consumers.insert(consumerName, consumer);
279 }
280 }
281
282 consumer->updateIdleTimer();
283
284 PEG_METHOD_EXIT();
285 return consumer;
286 }
287
288 /** Initializes a DynamicConsumer.
289 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
290 * @throws Exception if the consumer cannot be initialized
291 */
292 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
293 {
294 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
295
296 CIMIndicationConsumerProvider* base = 0;
297 h.sterling 1.1 ConsumerModule* module = 0;
298
299 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
300 String libraryName = _getConsumerLibraryName(consumerName);
301 module = _lookupModule(libraryName);
302
303 //build library path
304 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
305 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
306
307 //load module
308 try
309 {
310 base = module->load(consumerName, libraryPath);
311 consumer->set(module, base);
312
313 } catch (Exception& ex)
314 {
315 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
316
317 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
318 h.sterling 1.1 "Cannot load module ($0:$1): Unknown exception.",
319 consumerName,
320 libraryName));
321 } catch (...)
322 {
323 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
324 "Cannot load module ($0:$1): Unknown exception.",
325 consumerName,
326 libraryName));
327 }
328
329 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
330
331 //initialize consumer
332 try
333 {
334 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
335
336 consumer->initialize();
337
338 //ATTN: need to change this
339 h.sterling 1.1 Semaphore* semaphore = new Semaphore(0); //blocking
340
341 consumer->setShutdownSemaphore(semaphore);
342
343 //start the worker thread
344 _thread_pool->allocate_and_awaken(consumer,
345 _worker_routine,
346 semaphore);
347
348 //load any outstanding requests
349 Array<CIMInstance> outstandingIndications = _deserializeOutstandingIndications(consumerName);
350 if (outstandingIndications.size())
351 {
352 //the consumer will signal itself in _loadOustandingIndications
353 consumer->_loadOutstandingIndications(outstandingIndications);
354 }
355
356 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
357
358 } catch (...)
359 {
360 h.sterling 1.1 module->unloadModule();
361 consumer->reset();
362 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
363 "Cannot initialize consumer ($0).",
364 consumerName));
365 }
366
367 PEG_METHOD_EXIT();
368 }
369
370
371 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
372 * DNE, we create it and add it to the table.
373 * @throws Exception if we cannot successfully create and initialize the consumer
374 */
375 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
376 {
377 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
378
379 AutoMutex lock(_moduleTableMutex);
380
381 h.sterling 1.1 ConsumerModule* module = 0;
382
383 //see if consumer module is cached
384 if (_modules.lookup(libraryName, module))
385 {
386 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
387 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
388
389 } else
390 {
391 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
392 "Creating Consumer Provider Module " + libraryName);
393
394 module = new ConsumerModule();
395 _modules.insert(libraryName, module);
396 }
397
398 PEG_METHOD_EXIT();
399 return(module);
400 }
401
402 h.sterling 1.1 /** Returns true if there are active consumers
403 */
404 Boolean ConsumerManager::hasActiveConsumers()
405 {
406 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
407
408 AutoMutex lock(_consumerTableMutex);
409 DynamicConsumer* consumer = 0;
410
411 try
412 {
413 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
414 {
415 consumer = i.value();
416
417 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
418 {
419 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
420 PEG_METHOD_EXIT();
421 return true;
422 }
423 h.sterling 1.1 }
424 } catch (...)
425 {
426 // Unexpected exception; do not assume that no providers are loaded
427 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
428 PEG_METHOD_EXIT();
429 return true;
430 }
431
432 PEG_METHOD_EXIT();
433 return false;
434 }
435
436 /** Returns true if there are loaded consumers
437 */
438 Boolean ConsumerManager::hasLoadedConsumers()
439 {
440 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
441
442 AutoMutex lock(_consumerTableMutex);
443 DynamicConsumer* consumer = 0;
444 h.sterling 1.1
445 try
446 {
447 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
448 {
449 consumer = i.value();
450
451 if (consumer && consumer->isLoaded())
452 {
453 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
454 PEG_METHOD_EXIT();
455 return true;
456 }
457 }
458 } catch (...)
459 {
460 // Unexpected exception; do not assume that no providers are loaded
461 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
462 PEG_METHOD_EXIT();
463 return true;
464 }
465 h.sterling 1.1
466 PEG_METHOD_EXIT();
467 return false;
468 }
469
470
471 /** Shutting down a consumer consists of four major steps:
472 * 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit.
473 * 2) Wait for the worker thread to end. This may take a while if it's processing an indication. This
474 * is optional in a shutdown scenario. If the listener is shutdown with a -f force, the listener
475 * will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows
476 * the current consumer indication to finish. All other queued indications are serialized to a log and
 *       are sent when the consumer is reloaded.
478 * 3) Terminate the consumer provider interface.
479 * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)
480 *
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
482 * of the consumers so the threads can finish simultaneously.
483 *
484 * ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications
485 * to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing
 * and deserializing indications between shutdown and startup, I feel like we do not need to process ALL
487 * queued indications on shutdown.
488 */
489
490 /** Unloads all consumers.
491 */
492 void ConsumerManager::unloadAllConsumers()
493 {
494 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
495
496 AutoMutex lock(_consumerTableMutex);
497
498 if (!_consumers.size())
499 {
500 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
501 PEG_METHOD_EXIT();
502 return;
503 }
504
505 if (!_forceShutdown)
506 {
507 h.sterling 1.1 //wait until all the consumers have finished processing the events in their queue
508 //ATTN: Should this have a timeout even though it's a force??
509 while (hasActiveConsumers())
510 {
511 pegasus_sleep(500);
512 }
513 }
514
515 Array<DynamicConsumer*> loadedConsumers;
516
517 ConsumerTable::Iterator i = _consumers.start();
518 DynamicConsumer* consumer = 0;
519
520 for (; i!=0; i++)
521 {
522 consumer = i.value();
523 if (consumer && consumer->isLoaded())
524 {
525 loadedConsumers.append(consumer);
526 }
527 }
528 h.sterling 1.1
529 if (loadedConsumers.size())
530 {
531 try
532 {
533 _unloadConsumers(loadedConsumers);
534
535 } catch (Exception& ex)
536 {
537 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
538 }
539 } else
540 {
541 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
542 }
543
544 PEG_METHOD_EXIT();
545 }
546
547 /** Unloads idle consumers.
548 */
549 h.sterling 1.1 void ConsumerManager::unloadIdleConsumers()
550 {
551 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
552
553 AutoMutex lock(_consumerTableMutex);
554
555 if (!_consumers.size())
556 {
557 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
558 PEG_METHOD_EXIT();
559 return;
560 }
561
562 Array<DynamicConsumer*> loadedConsumers;
563
564 ConsumerTable::Iterator i = _consumers.start();
565 DynamicConsumer* consumer = 0;
566
567 for (; i!=0; i++)
568 {
569 consumer = i.value();
570 h.sterling 1.1 if (consumer && consumer->isLoaded() && consumer->isIdle())
571 {
572 loadedConsumers.append(consumer);
573 }
574 }
575
576 if (loadedConsumers.size())
577 {
578 try
579 {
580 _unloadConsumers(loadedConsumers);
581
582 } catch (Exception& ex)
583 {
584 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
585 }
586 } else
587 {
588 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
589 }
590
591 h.sterling 1.1 PEG_METHOD_EXIT();
592 }
593
594 /** Unloads a single consumer.
595 */
596 void ConsumerManager::unloadConsumer(const String& consumerName)
597 {
598 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
599
600 AutoMutex lock(_consumerTableMutex);
601
602 DynamicConsumer* consumer = 0;
603
604 //check whether the consumer exists
605 if (!_consumers.lookup(consumerName, consumer))
606 {
607 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
608 return;
609 }
610
611 //check whether the consumer is loaded
612 h.sterling 1.1 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
613 {
614 //unload the consumer
615 Array<DynamicConsumer*> loadedConsumers;
616 loadedConsumers.append(consumer);
617
618 try
619 {
620 _unloadConsumers(loadedConsumers);
621
622 } catch (Exception& ex)
623 {
624 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
625 }
626
627 } else
628 {
629 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
630 }
631
632 PEG_METHOD_EXIT();
633 h.sterling 1.1 }
634
635 /** Unloads the consumers in the given array.
636 * The consumerTable mutex MUST be locked prior to entering this method.
637 */
638 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
639 {
640 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
641
642 //tell consumers to shutdown
643 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
644 {
645 consumersToUnload[i]->sendShutdownSignal();
646 }
647
648 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
649
650 //wait for all the consumer worker threads to complete
651 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
652 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
653 //processing its requests.
654 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
655 {
656 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
657
658 //wait for the consumer worker thread to end
659 try
660 {
661 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
662 if (_shutdownSemaphore)
663 {
664 _shutdownSemaphore->time_wait(10000);
665 }
666
667 } catch (TimeOut &)
668 {
669 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
670 }
671
672 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
673
674 try
675 h.sterling 1.1 {
676 //terminate consumer provider interface
677 consumersToUnload[i]->terminate();
678
679 //unload consumer provider module
680 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
681 consumersToUnload[i]->_module->unloadModule();
682
683 //serialize outstanding indications
684 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
685
686 //reset the consumer
687 consumersToUnload[i]->reset();
688
689 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
690
691 } catch (Exception& e)
692 {
693 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
694 //ATTN: throw exception? log warning?
695 }
696 h.sterling 1.1 }
697
698 PEG_METHOD_EXIT();
699 }
700
701 /** Serializes oustanding indications to a <MyConsumer>.dat file
702 */
703 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<CIMInstance> indications)
704 {
705 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
706
707 if (!indications.size())
708 {
709 PEG_METHOD_EXIT();
710 return;
711 }
712
713 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
714 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
715
716 Array<char> buffer;
717 h.sterling 1.1
718 // Open the log file and serialize remaining
719 FILE* fileHandle = 0;
720 fileHandle = fopen((const char*)fileName.getCString(), "w");
721
722 if (!fileHandle)
723 {
724 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
725
726 } else
727 {
728 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
729 "Serializing %d outstanding requests for %s",
730 indications.size(),
731 (const char*)consumerName.getCString());
732
733 //we have to put the array of instances under a valid root element or the parser complains
734 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
735
736 for (Uint32 i = 0; i < indications.size(); i++)
737 {
738 h.sterling 1.1 XmlWriter::appendValueNamedInstanceElement(buffer, indications[i]);
739 }
740
741 XmlWriter::append(buffer, "</IRETURNVALUE>\0");
742
743 fputs((const char*)buffer.getData(), fileHandle);
744
745 fclose(fileHandle);
746 }
747
748 PEG_METHOD_EXIT();
749 }
750
751 /** Reads outstanding indications from a <MyConsumer>.dat file
752 */
753 Array<CIMInstance> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
754 {
755 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
756
757 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
758 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
759 h.sterling 1.1
760 Array<CIMInstance> cimInstances;
761
762 // Open the log file and serialize remaining indications
763 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
764 {
765 Array<char> text;
766 CIMInstance cimInstance;
767 XmlEntry entry;
768
769 try
770 {
771 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
772 text.append('\0');
773
774 //parse the file
775 XmlParser parser((char*)text.getData());
776 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
777
778 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
779 {
780 h.sterling 1.1 cimInstances.append(cimInstance);
781 }
782
783 XmlReader::expectEndTag(parser, "IRETURNVALUE");
784
785 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
786 "Consumer %s has %d outstanding indications",
787 (const char*)consumerName.getCString(),
788 cimInstances.size());
789
790 //delete the file
791 FileSystem::removeFile(fileName);
792
793 } catch (Exception& ex)
794 {
795 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
796
797 } catch (...)
798 {
799 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
800 }
801 h.sterling 1.1 }
802
803 PEG_METHOD_EXIT();
804 return cimInstances;
805 }
806
807
808
809 /**
810 * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton
811 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
812 * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said,
813 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
814 *
 * If a consumer receives a lot of traffic, or its consumeIndication() method takes a considerable amount of time to
816 * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off
817 * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer
818 * can attain extremely high performance.
819 *
820 * There are three different events that can signal us:
821 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
822 h.sterling 1.1 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
823 * 3) A retry signal (signalled from this routine itself)
824 *
825 * The idea is that all new indications are put on the front of the queue and processed first. All of the retry
826 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
827 * Before processing each indication, we check to see whether or not the shutdown signal was given. If so,
 * we immediately break out of the loop, and another component serializes the remaining indications to a file.
829 *
830 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
831 *
|