1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Heather Sterling (hsterl@us.ibm.com)
33 //
34 h.sterling 1.1 // Modified By:
35 //
36 //%/////////////////////////////////////////////////////////////////////////////
37
38 #include <Pegasus/Common/Config.h>
39 //#include <cstdlib>
40 //#include <dlfcn.h>
41 #include <Pegasus/Common/System.h>
42 #include <Pegasus/Common/FileSystem.h>
43 #include <Pegasus/Common/Tracer.h>
44 #include <Pegasus/Common/Logger.h>
45 #include <Pegasus/Common/XmlReader.h>
46 #include <Pegasus/Common/XmlParser.h>
47 #include <Pegasus/Common/XmlWriter.h>
48 #include <Pegasus/Common/IPC.h>
49
50 #include "ConsumerManager.h"
51
52 PEGASUS_NAMESPACE_BEGIN
53 PEGASUS_USING_STD;
54
55 h.sterling 1.1 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
56 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
57 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
60 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
61 };
62
63 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
64
65 //retry settings
66 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
67 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
68 h.sterling 1.1
|
69 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
70 static const String URL_PROPERTY = "URLString";
|
71 h.sterling 1.1
72
73 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :
74 _consumerDir(consumerDir),
75 _consumerConfigDir(consumerConfigDir),
76 _enableConsumerUnload(enableConsumerUnload),
77 _idleTimeout(idleTimeout),
78 _forceShutdown(true)
79 {
80 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
81
82 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
83 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
84
85 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
86 "Consumer unload enabled %d: idle timeout %d",
87 enableConsumerUnload,
88 idleTimeout);
89
90
|
91 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
92 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
93 h.sterling 1.1
|
94 kumpf 1.3 struct timeval deallocateWait = {15, 0};
95 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
96 h.sterling 1.1
97 _init();
98
99 PEG_METHOD_EXIT();
100 }
101
102 ConsumerManager::~ConsumerManager()
103 {
104 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
105
106 unloadAllConsumers();
107
|
108 kumpf 1.3 delete _thread_pool;
|
109 h.sterling 1.1
110 ConsumerTable::Iterator i = _consumers.start();
111 for (; i!=0; i++)
112 {
113 DynamicConsumer* consumer = i.value();
114 delete consumer;
115 }
116
117 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
118
119 ModuleTable::Iterator j = _modules.start();
120 for (;j!=0;j++)
121 {
122 ConsumerModule* module = j.value();
123 delete module;
124 }
125
126 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
127
128 PEG_METHOD_EXIT();
129 }
130 h.sterling 1.1
131 void ConsumerManager::_init()
132 {
133 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
134
135 //check if there are any outstanding indications
136 Array<String> files;
137 Uint32 pos;
138 String consumerName;
139
140 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
141 {
142 for (Uint32 i = 0; i < files.size(); i++)
143 {
144 pos = files[i].find(".dat");
145 if (pos != PEG_NOT_FOUND)
146 {
147 consumerName = files[i].subString(0, pos);
148
149 try
150 {
151 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
152 getConsumer(consumerName);
153
154 } catch (...)
155 {
156 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
157 }
158 }
159 }
160 }
161
162 PEG_METHOD_EXIT();
163 }
164
165 String ConsumerManager::getConsumerDir()
166 {
167 return _consumerDir;
168 }
169
170 String ConsumerManager::getConsumerConfigDir()
171 {
172 h.sterling 1.1 return _consumerConfigDir;
173 }
174
175 Boolean ConsumerManager::getEnableConsumerUnload()
176 {
177 return _enableConsumerUnload;
178 }
179
180 Uint32 ConsumerManager::getIdleTimeout()
181 {
182 return _idleTimeout;
183 }
184
185 /** Retrieves the library name associated with the consumer name. By default, the library name
186 * is the same as the consumer name. However, you may specify a different library name in a consumer
187 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
188 * location="libraryName"
189 *
190 * The config file is optional and is generally only needed in cases where there are strict requirements
191 * on library naming.
192 *
193 h.sterling 1.1 * It is the responsibility of the caller to catch any exceptions thrown by this method.
194 */
195 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
196 {
197 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
198
199 //default library name is consumer name
200 String libraryName = consumerName;
201
202 //check whether an alternative library name was specified in an optional consumer config file
203 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
204 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
205
206 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
207 {
208 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
209
210 try
211 {
|
212 h.sterling 1.6 //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
|
213 h.sterling 1.8 OptionManager _optionMgr;
214 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
|
215 h.sterling 1.1 _optionMgr.mergeFile(configFile);
216 _optionMgr.checkRequiredOptions();
217
218 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
219 {
220 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
221 libraryName = consumerName;
222 }
223
224 } catch (Exception & ex)
225 {
226 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
227 "Error reading $0: $1.",
228 configFile,
229 ex.getMessage()));
230 }
231 } else
232 {
233 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
234 }
235
236 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
237
238 PEG_METHOD_EXIT();
239 return libraryName;
240 }
241
242 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
243 * DNE, we create it and initialize it, and add it to the table.
244 * @throws Exception if we cannot successfully create and initialize the consumer
245 */
246 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
247 {
248 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
249
250 DynamicConsumer* consumer = 0;
251 CIMIndicationConsumerProvider* consumerRef = 0;
252 Boolean cached = false;
253 Boolean entryExists = false;
254
255 AutoMutex lock(_consumerTableMutex);
256
257 h.sterling 1.1 if (_consumers.lookup(consumerName, consumer))
258 {
259 //why isn't this working??
260 entryExists = true;
261
262 if (consumer && consumer->isLoaded())
263 {
264 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
265 cached = true;
266 }
267 } else
268 {
269 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
270 consumer = new DynamicConsumer(consumerName);
271 //ATTN: The above is a memory leak if _initConsumer throws an exception
272 //need to delete it in that case
273 }
274
275 if (!cached)
276 {
277 _initConsumer(consumerName, consumer);
278 h.sterling 1.1
279 if (!entryExists)
280 {
281 _consumers.insert(consumerName, consumer);
282 }
283 }
284
285 consumer->updateIdleTimer();
286
287 PEG_METHOD_EXIT();
288 return consumer;
289 }
290
291 /** Initializes a DynamicConsumer.
292 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
293 * @throws Exception if the consumer cannot be initialized
294 */
295 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
296 {
297 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
298
299 h.sterling 1.1 CIMIndicationConsumerProvider* base = 0;
300 ConsumerModule* module = 0;
301
302 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
303 String libraryName = _getConsumerLibraryName(consumerName);
304 module = _lookupModule(libraryName);
305
306 //build library path
307 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
308 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
309
310 //load module
311 try
312 {
313 base = module->load(consumerName, libraryPath);
314 consumer->set(module, base);
315
316 } catch (Exception& ex)
317 {
318 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
319
320 h.sterling 1.1 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
321 "Cannot load module ($0:$1): Unknown exception.",
322 consumerName,
323 libraryName));
324 } catch (...)
325 {
326 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
327 "Cannot load module ($0:$1): Unknown exception.",
328 consumerName,
329 libraryName));
330 }
331
332 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
333
334 //initialize consumer
335 try
336 {
337 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
338
339 consumer->initialize();
340
341 h.sterling 1.1 //ATTN: need to change this
342 Semaphore* semaphore = new Semaphore(0); //blocking
343
344 consumer->setShutdownSemaphore(semaphore);
345
346 //start the worker thread
|
347 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
348 h.sterling 1.1 _worker_routine,
|
349 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
350 h.sterling 1.8 {
351 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
352 "Not enough threads for consumer.");
|
353 konrad.r 1.7
|
354 h.sterling 1.8 Tracer::trace(TRC_LISTENER, Tracer::LEVEL2,
355 "Could not allocate thread for consumer.");
|
356 konrad.r 1.7
|
357 h.sterling 1.8 consumer->setShutdownSemaphore(0);
358 delete semaphore;
|
359 konrad.r 1.7 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
|
360 h.sterling 1.8 "Not enough threads for consumer worker routine."));
|
361 konrad.r 1.7 }
|
362 h.sterling 1.1
|
363 h.sterling 1.8 //wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued
364 //before the consumer is waiting for it and the first indication after loading the consumer will be lost
365 consumer->waitForEventThread();
366
|
367 h.sterling 1.1 //load any outstanding requests
|
368 h.sterling 1.11 Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
|
369 h.sterling 1.1 if (outstandingIndications.size())
370 {
371 //the consumer will signal itself in _loadOustandingIndications
372 consumer->_loadOutstandingIndications(outstandingIndications);
373 }
374
375 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
376
377 } catch (...)
378 {
379 module->unloadModule();
380 consumer->reset();
381 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
382 "Cannot initialize consumer ($0).",
383 consumerName));
384 }
385
386 PEG_METHOD_EXIT();
387 }
388
389
390 h.sterling 1.1 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
391 * DNE, we create it and add it to the table.
392 * @throws Exception if we cannot successfully create and initialize the consumer
393 */
394 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
395 {
396 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
397
398 AutoMutex lock(_moduleTableMutex);
399
400 ConsumerModule* module = 0;
401
402 //see if consumer module is cached
403 if (_modules.lookup(libraryName, module))
404 {
405 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
406 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
407
408 } else
409 {
410 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
411 h.sterling 1.1 "Creating Consumer Provider Module " + libraryName);
412
413 module = new ConsumerModule();
414 _modules.insert(libraryName, module);
415 }
416
417 PEG_METHOD_EXIT();
418 return(module);
419 }
420
421 /** Returns true if there are active consumers
422 */
423 Boolean ConsumerManager::hasActiveConsumers()
424 {
425 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
426
427 AutoMutex lock(_consumerTableMutex);
428 DynamicConsumer* consumer = 0;
429
430 try
431 {
432 h.sterling 1.1 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
433 {
434 consumer = i.value();
435
436 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
437 {
438 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
439 PEG_METHOD_EXIT();
440 return true;
441 }
442 }
443 } catch (...)
444 {
445 // Unexpected exception; do not assume that no providers are loaded
446 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
447 PEG_METHOD_EXIT();
448 return true;
449 }
450
451 PEG_METHOD_EXIT();
452 return false;
453 h.sterling 1.1 }
454
455 /** Returns true if there are loaded consumers
456 */
457 Boolean ConsumerManager::hasLoadedConsumers()
458 {
459 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
460
461 AutoMutex lock(_consumerTableMutex);
462 DynamicConsumer* consumer = 0;
463
464 try
465 {
466 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
467 {
468 consumer = i.value();
469
470 if (consumer && consumer->isLoaded())
471 {
472 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
473 PEG_METHOD_EXIT();
474 h.sterling 1.1 return true;
475 }
476 }
477 } catch (...)
478 {
479 // Unexpected exception; do not assume that no providers are loaded
480 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
481 PEG_METHOD_EXIT();
482 return true;
483 }
484
485 PEG_METHOD_EXIT();
486 return false;
487 }
488
489
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit.
 * 2) Wait for the worker thread to end. This may take a while if it is processing an indication. This
 *    is optional in a shutdown scenario. If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows
 *    the current consumer indication to finish. All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when its refcount == 0).
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal should be sent to all
 * of the consumers so the threads can finish simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be processed? Or just for new
 * indications to be processed? I am not inclined to this solution since it could take a LOT of time. By
 * serializing and deserializing indications between shutdown and startup, I feel like we do not need to
 * process ALL queued indications on shutdown.
 */
508
509 /** Unloads all consumers.
510 */
511 void ConsumerManager::unloadAllConsumers()
512 {
513 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
514
515 AutoMutex lock(_consumerTableMutex);
516 h.sterling 1.1
517 if (!_consumers.size())
518 {
519 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
520 PEG_METHOD_EXIT();
521 return;
522 }
523
524 if (!_forceShutdown)
525 {
526 //wait until all the consumers have finished processing the events in their queue
527 //ATTN: Should this have a timeout even though it's a force??
528 while (hasActiveConsumers())
529 {
530 pegasus_sleep(500);
531 }
532 }
533
534 Array<DynamicConsumer*> loadedConsumers;
535
536 ConsumerTable::Iterator i = _consumers.start();
537 h.sterling 1.1 DynamicConsumer* consumer = 0;
538
539 for (; i!=0; i++)
540 {
541 consumer = i.value();
542 if (consumer && consumer->isLoaded())
543 {
544 loadedConsumers.append(consumer);
545 }
546 }
547
548 if (loadedConsumers.size())
549 {
550 try
551 {
552 _unloadConsumers(loadedConsumers);
553
554 } catch (Exception& ex)
555 {
556 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
557 }
558 h.sterling 1.1 } else
559 {
560 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
561 }
562
563 PEG_METHOD_EXIT();
564 }
565
566 /** Unloads idle consumers.
567 */
568 void ConsumerManager::unloadIdleConsumers()
569 {
570 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
571
572 AutoMutex lock(_consumerTableMutex);
573
574 if (!_consumers.size())
575 {
576 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
577 PEG_METHOD_EXIT();
578 return;
579 h.sterling 1.1 }
580
581 Array<DynamicConsumer*> loadedConsumers;
582
583 ConsumerTable::Iterator i = _consumers.start();
584 DynamicConsumer* consumer = 0;
585
586 for (; i!=0; i++)
587 {
588 consumer = i.value();
589 if (consumer && consumer->isLoaded() && consumer->isIdle())
590 {
591 loadedConsumers.append(consumer);
592 }
593 }
594
595 if (loadedConsumers.size())
596 {
597 try
598 {
599 _unloadConsumers(loadedConsumers);
600 h.sterling 1.1
601 } catch (Exception& ex)
602 {
603 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
604 }
605 } else
606 {
607 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
608 }
609
610 PEG_METHOD_EXIT();
611 }
612
613 /** Unloads a single consumer.
614 */
615 void ConsumerManager::unloadConsumer(const String& consumerName)
616 {
617 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
618
619 AutoMutex lock(_consumerTableMutex);
620
621 h.sterling 1.1 DynamicConsumer* consumer = 0;
622
623 //check whether the consumer exists
624 if (!_consumers.lookup(consumerName, consumer))
625 {
626 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
627 return;
628 }
629
630 //check whether the consumer is loaded
631 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
632 {
633 //unload the consumer
634 Array<DynamicConsumer*> loadedConsumers;
635 loadedConsumers.append(consumer);
636
637 try
638 {
639 _unloadConsumers(loadedConsumers);
640
641 } catch (Exception& ex)
642 h.sterling 1.1 {
643 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
644 }
645
646 } else
647 {
648 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
649 }
650
651 PEG_METHOD_EXIT();
652 }
653
654 /** Unloads the consumers in the given array.
655 * The consumerTable mutex MUST be locked prior to entering this method.
656 */
657 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
658 {
659 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
660
661 //tell consumers to shutdown
662 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
663 h.sterling 1.1 {
664 consumersToUnload[i]->sendShutdownSignal();
665 }
666
667 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
668
669 //wait for all the consumer worker threads to complete
670 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
671 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
672 //processing its requests.
673 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
674 {
675 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
676
677 //wait for the consumer worker thread to end
678 try
679 {
680 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
681 if (_shutdownSemaphore)
682 {
683 _shutdownSemaphore->time_wait(10000);
684 h.sterling 1.1 }
685
686 } catch (TimeOut &)
687 {
688 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
689 }
690
691 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
692
693 try
694 {
695 //terminate consumer provider interface
696 consumersToUnload[i]->terminate();
697
698 //unload consumer provider module
699 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
700 consumersToUnload[i]->_module->unloadModule();
701
702 //serialize outstanding indications
703 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
704
705 h.sterling 1.1 //reset the consumer
706 consumersToUnload[i]->reset();
707
708 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
709
710 } catch (Exception& e)
711 {
712 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
713 //ATTN: throw exception? log warning?
714 }
715 }
716
717 PEG_METHOD_EXIT();
718 }
719
720 /** Serializes oustanding indications to a <MyConsumer>.dat file
721 */
|
722 h.sterling 1.11 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
|
723 h.sterling 1.1 {
724 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
725
726 if (!indications.size())
727 {
728 PEG_METHOD_EXIT();
729 return;
730 }
731
732 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
733 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
734
|
735 mike 1.9 Buffer buffer;
|
736 h.sterling 1.1
737 // Open the log file and serialize remaining
738 FILE* fileHandle = 0;
739 fileHandle = fopen((const char*)fileName.getCString(), "w");
740
741 if (!fileHandle)
742 {
743 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
744
745 } else
746 {
747 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
748 "Serializing %d outstanding requests for %s",
749 indications.size(),
750 (const char*)consumerName.getCString());
751
752 //we have to put the array of instances under a valid root element or the parser complains
753 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
754
|
755 h.sterling 1.11 CIMInstance cimInstance;
|
756 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
757 {
|
758 h.sterling 1.11 //set the URL string property on the serializable instance
759 CIMValue cimValue(CIMTYPE_STRING, false);
760 cimValue.set(indications[i].getURL());
761 cimInstance = indications[i].getIndicationInstance();
762 CIMProperty cimProperty(URL_PROPERTY, cimValue);
763 cimInstance.addProperty(cimProperty);
764
765 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
766 }
|
767 h.sterling 1.1
768 XmlWriter::append(buffer, "</IRETURNVALUE>\0");
769
770 fputs((const char*)buffer.getData(), fileHandle);
771
772 fclose(fileHandle);
773 }
774
775 PEG_METHOD_EXIT();
776 }
777
778 /** Reads outstanding indications from a <MyConsumer>.dat file
779 */
|
780 h.sterling 1.11 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
|
781 h.sterling 1.1 {
782 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
783
784 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
785 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
786
787 Array<CIMInstance> cimInstances;
|
788 h.sterling 1.11 Array<String> urlStrings;
789 Array<IndicationDispatchEvent> indications;
|
790 h.sterling 1.1
791 // Open the log file and serialize remaining indications
792 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
793 {
|
794 mike 1.9 Buffer text;
|
795 h.sterling 1.1 CIMInstance cimInstance;
|
796 h.sterling 1.11 CIMProperty cimProperty;
797 CIMValue cimValue;
798 String urlString;
|
799 h.sterling 1.1 XmlEntry entry;
800
801 try
802 {
803 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
804 text.append('\0');
805
806 //parse the file
807 XmlParser parser((char*)text.getData());
808 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
809
810 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
811 {
|
812 h.sterling 1.11 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
813 if (index != PEG_NOT_FOUND)
814 {
815 //get the URL string property from the serialized instance and remove the property
816 cimProperty = cimInstance.getProperty(index);
817 cimValue = cimProperty.getValue();
818 cimValue.get(urlString);
819 cimInstance.removeProperty(index);
820 }
821 IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
822 indications.append(*indicationEvent);
|
823 h.sterling 1.1 }
824
825 XmlReader::expectEndTag(parser, "IRETURNVALUE");
826
827 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL3,
828 "Consumer %s has %d outstanding indications",
829 (const char*)consumerName.getCString(),
|
830 h.sterling 1.11 indications.size());
|
831 h.sterling 1.1
832 //delete the file
833 FileSystem::removeFile(fileName);
834
835 } catch (Exception& ex)
836 {
837 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
838
839 } catch (...)
840 {
841 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
842 }
843 }
844
845 PEG_METHOD_EXIT();
|
846 h.sterling 1.11 return indications;
|
847 h.sterling 1.1 }
848
849
850
/**
 * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton
 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
 * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said,
 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
 *
 * If a consumer receives a lot of traffic, or its consumeIndication() method takes a considerable amount of time to
 * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off
 * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer
 * can attain extremely high performance.
 *
 * There are three different events that can signal us:
 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
 * 3) A retry signal (signalled from this routine itself)
 *
 * The idea is that all new indications are put on the front of the queue and processed first. All of the retry
 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
 * Before processing each indication, we check to see whether or not the shutdown signal was given. If so,
 * we immediately break out of the loop, and another component serializes the remaining indications to a file.
 *
 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
 *
 * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
 * 20 new indications come in, 10 of them are successful, 10 are not.
 * We were signalled 20 times, so we will pass the time_wait 20 times. Conceivably, the process time on each indication
 * could be minimal. We could potentially proceed to process the retries after a very small time interval since
 * we would never hit the wait for the retry timeout.
 *
 */
881 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
882 {
883 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
884
885 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
886 String name = myself->getName();
887 DQueue<IndicationDispatchEvent> tmpEventQueue(true);
888
889 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
890
|
891 h.sterling 1.8 myself->_listeningSemaphore->signal();
892
|
893 h.sterling 1.1 while (true)
894 {
895 try
896 {
897 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
898
899 //wait to be signalled
900 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
901
902 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
903
904 //check whether we received the shutdown signal
905 if (myself->_dieNow)
906 {
907 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
908 break;
909 }
910
911 //create a temporary queue to store failed indications
912 tmpEventQueue.empty_list();
913
914 h.sterling 1.1 //continue processing events until the queue is empty
915 //make sure to check for the shutdown signal before every iteration
|
916 h.sterling 1.10 // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
917 // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
918 // But are succeeding by the end of the processing, events may be sent out of chronological order.
919 // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
920 // new events added afterwards.
|
921 h.sterling 1.1 while (myself->_eventqueue.size())
922 {
923 //check for shutdown signal
924 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
925 //the shutdown signal itself, at which time we break out of the main loop
926 if (myself->_dieNow)
927 {
928 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
929 break;
930 }
931
932 //pop next indication off the queue
933 IndicationDispatchEvent* event = 0;
934 event = myself->_eventqueue.remove_first(); //what exceptions/errors can this throw?
935
936 if (!event)
937 {
938 //this should never happen
939 continue;
940 }
941
942 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
943
944 try
945 {
946 myself->consumeIndication(event->getContext(),
947 event->getURL(),
948 event->getIndicationInstance());
949
950 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
951
952 delete event;
953 continue;
954
955 } catch (CIMException & ce)
956 {
957 //check for failure
958 if (ce.getCode() == CIM_ERR_FAILED)
959 {
960 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
|
961 h.sterling 1.10
962 // Here we simply determine if we should increment the retry count or not.
963 // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
964 // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
965 if (event->getRetries() > 0) {
966 Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
967 if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
968 event->increaseRetries();
969 } else {
970 event->increaseRetries();
971 }
|
972 h.sterling 1.1
973 //determine if we have hit the max retry count
974 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
975 {
976 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2,
977 "Error: the maximum retry count has been exceeded. Removing the event from the queue.");
978
979 Logger::put(
980 Logger::ERROR_LOG,
981 "ConsumerManager",
982 Logger::SEVERE,
983 "The following indication did not get processed successfully: $0",
984 event->getIndicationInstance().getPath().toString());
985
986 delete event;
987 continue;
988
989 } else
990 {
991 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
992 tmpEventQueue.insert_last(event);
993 h.sterling 1.1 }
994
995 } else
996 {
997 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
998 delete event;
999 continue;
1000 }
1001
1002 } catch (Exception & ex)
1003 {
1004 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1005 delete event;
1006 continue;
1007
1008 } catch (...)
1009 {
1010 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
1011 delete event;
1012 continue;
1013 } //end try
1014 h.sterling 1.1
1015 } //while eventqueue
1016
|
1017 h.sterling 1.10 // Copy the failed indications back to the main queue
1018 // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1019 // Of those events we are retrying. Retried events happened before any new events coming in.
|
1020 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1021 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1022 h.sterling 1.1 while (tmpEventQueue.size())
1023 {
|
1024 h.sterling 1.10 //there is no = operator for DQueue so we must do it manually
|
1025 h.sterling 1.1 tmpEvent = tmpEventQueue.remove_first();
|
1026 h.sterling 1.10 myself->_eventqueue.insert_last_no_lock(tmpEvent);
1027
|
1028 h.sterling 1.1 }
|
1029 h.sterling 1.10 myself->_eventqueue.unlock();
|
1030 h.sterling 1.1
1031 } catch (TimeOut& te)
1032 {
|
1033 h.sterling 1.10 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
|
1034 h.sterling 1.1
1035 //signal the queue in the same way we would if we received a new indication
1036 //this allows the thread to fall into the queue processing code
1037 myself->_check_queue->signal();
1038
1039 } //time_wait
1040
1041
1042 } //shutdown
1043
1044 PEG_METHOD_EXIT();
1045 return 0;
1046 }
1047
1048
1049 PEGASUS_NAMESPACE_END
1050
1051
1052
|