1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Heather Sterling (hsterl@us.ibm.com)
33 //
34 h.sterling 1.1 // Modified By:
35 //
36 //%/////////////////////////////////////////////////////////////////////////////
37
38 #include <Pegasus/Common/Config.h>
39 //#include <cstdlib>
40 //#include <dlfcn.h>
41 #include <Pegasus/Common/System.h>
42 #include <Pegasus/Common/FileSystem.h>
43 #include <Pegasus/Common/Tracer.h>
44 #include <Pegasus/Common/Logger.h>
45 #include <Pegasus/Common/XmlReader.h>
|
46 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
47 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
48 #include <Pegasus/Common/XmlWriter.h>
|
49 mike 1.14 #include <Pegasus/Common/List.h>
|
50 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
51 h.sterling 1.1
52 #include "ConsumerManager.h"
53
54 PEGASUS_NAMESPACE_BEGIN
55 PEGASUS_USING_STD;
56
57 //ATTN: Can we just use a properties file instead?? If we only have one property, we may want to just parse it ourselves.
58 // We may need to add more properties, however. Potential per consumer properties: unloadOk, idleTimout, retryCount, etc
59 static struct OptionRow optionsTable[] =
60 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
61 {
62 {"location", "", false, Option::STRING, 0, 0, "location", "library name for the consumer"},
63 };
64
65 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
66
67 //retry settings
68 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
69 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
70 h.sterling 1.1
|
71 h.sterling 1.11 //constant for fake property that is added to instances when serializing to track the full URL
72 static const String URL_PROPERTY = "URLString";
|
73 h.sterling 1.1
74
75 ConsumerManager::ConsumerManager(const String& consumerDir, const String& consumerConfigDir, Boolean enableConsumerUnload, Uint32 idleTimeout) :
76 _consumerDir(consumerDir),
77 _consumerConfigDir(consumerConfigDir),
78 _enableConsumerUnload(enableConsumerUnload),
79 _idleTimeout(idleTimeout),
80 _forceShutdown(true)
81 {
82 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
83
84 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer library directory: " + consumerDir);
85 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer configuration directory: " + consumerConfigDir);
86
|
87 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
88 h.sterling 1.1 "Consumer unload enabled %d: idle timeout %d",
89 enableConsumerUnload,
|
90 marek 1.18 idleTimeout));
|
91 h.sterling 1.1
92
|
93 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
94 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
95 h.sterling 1.1
|
96 kumpf 1.3 struct timeval deallocateWait = {15, 0};
97 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
98 h.sterling 1.1
99 _init();
100
101 PEG_METHOD_EXIT();
102 }
103
104 ConsumerManager::~ConsumerManager()
105 {
106 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
107
108 unloadAllConsumers();
109
|
110 kumpf 1.3 delete _thread_pool;
|
111 h.sterling 1.1
112 ConsumerTable::Iterator i = _consumers.start();
113 for (; i!=0; i++)
114 {
115 DynamicConsumer* consumer = i.value();
116 delete consumer;
117 }
118
|
119 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
120 h.sterling 1.1
121 ModuleTable::Iterator j = _modules.start();
122 for (;j!=0;j++)
123 {
124 ConsumerModule* module = j.value();
125 delete module;
126 }
127
|
128 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
129 h.sterling 1.1
130 PEG_METHOD_EXIT();
131 }
132
133 void ConsumerManager::_init()
134 {
135 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
136
137 //check if there are any outstanding indications
138 Array<String> files;
139 Uint32 pos;
140 String consumerName;
141
142 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
143 {
144 for (Uint32 i = 0; i < files.size(); i++)
145 {
146 pos = files[i].find(".dat");
147 if (pos != PEG_NOT_FOUND)
148 {
149 consumerName = files[i].subString(0, pos);
150 h.sterling 1.1
151 try
152 {
153 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Attempting to load indication for!" + consumerName + "!");
154 getConsumer(consumerName);
155
156 } catch (...)
157 {
158 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Cannot load consumer from file " + files[i]);
159 }
160 }
161 }
162 }
163
164 PEG_METHOD_EXIT();
165 }
166
167 String ConsumerManager::getConsumerDir()
168 {
169 return _consumerDir;
170 }
171 h.sterling 1.1
172 String ConsumerManager::getConsumerConfigDir()
173 {
174 return _consumerConfigDir;
175 }
176
177 Boolean ConsumerManager::getEnableConsumerUnload()
178 {
179 return _enableConsumerUnload;
180 }
181
182 Uint32 ConsumerManager::getIdleTimeout()
183 {
184 return _idleTimeout;
185 }
186
187 /** Retrieves the library name associated with the consumer name. By default, the library name
188 * is the same as the consumer name. However, you may specify a different library name in a consumer
189 * configuration file. This file must be named "MyConsumer.txt" and contain the following:
190 * location="libraryName"
191 *
192 h.sterling 1.1 * The config file is optional and is generally only needed in cases where there are strict requirements
193 * on library naming.
194 *
195 * It is the responsibility of the caller to catch any exceptions thrown by this method.
196 */
197 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
198 {
199 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
200
201 //default library name is consumer name
202 String libraryName = consumerName;
203
204 //check whether an alternative library name was specified in an optional consumer config file
205 String configFile = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".conf"));
206 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Looking for config file " + configFile);
207
208 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
209 {
210 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found config file for consumer " + consumerName);
211
212 try
213 h.sterling 1.1 {
|
214 h.sterling 1.6 //Bugzilla 3765 - Change this to use a member var when OptionManager has a reset option
|
215 h.sterling 1.8 OptionManager _optionMgr;
216 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS); //comment this line out later
|
217 h.sterling 1.1 _optionMgr.mergeFile(configFile);
218 _optionMgr.checkRequiredOptions();
219
220 if (!_optionMgr.lookupValue("location", libraryName) || (libraryName == String::EMPTY))
221 {
222 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Warning: Using default library name since none was specified in " + configFile);
223 libraryName = consumerName;
224 }
225
226 } catch (Exception & ex)
227 {
228 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.INVALID_CONFIG_FILE",
229 "Error reading $0: $1.",
230 configFile,
231 ex.getMessage()));
232 }
233 } else
234 {
235 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "No config file exists for " + consumerName);
236 }
237
238 h.sterling 1.1 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "The library name for " + consumerName + " is " + libraryName);
239
240 PEG_METHOD_EXIT();
241 return libraryName;
242 }
243
244 /** Returns the DynamicConsumer for the consumerName. If it already exists, we return the one in the cache. If it
245 * DNE, we create it and initialize it, and add it to the table.
246 * @throws Exception if we cannot successfully create and initialize the consumer
247 */
248 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
249 {
250 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
251
252 DynamicConsumer* consumer = 0;
253 CIMIndicationConsumerProvider* consumerRef = 0;
254 Boolean cached = false;
255 Boolean entryExists = false;
256
257 AutoMutex lock(_consumerTableMutex);
258
259 h.sterling 1.1 if (_consumers.lookup(consumerName, consumer))
260 {
261 //why isn't this working??
262 entryExists = true;
263
264 if (consumer && consumer->isLoaded())
265 {
266 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer exists in the cache and is already loaded: " + consumerName);
267 cached = true;
268 }
269 } else
270 {
271 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer not found in cache, creating " + consumerName);
272 consumer = new DynamicConsumer(consumerName);
273 //ATTN: The above is a memory leak if _initConsumer throws an exception
274 //need to delete it in that case
275 }
276
277 if (!cached)
278 {
279 _initConsumer(consumerName, consumer);
280 h.sterling 1.1
281 if (!entryExists)
282 {
283 _consumers.insert(consumerName, consumer);
284 }
285 }
286
287 consumer->updateIdleTimer();
288
289 PEG_METHOD_EXIT();
290 return consumer;
291 }
292
293 /** Initializes a DynamicConsumer.
294 * Caller assumes responsibility for mutexing the operation as well as ensuring the consumer does not already exist.
295 * @throws Exception if the consumer cannot be initialized
296 */
297 void ConsumerManager::_initConsumer(const String& consumerName, DynamicConsumer* consumer)
298 {
299 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
300
301 h.sterling 1.1 CIMIndicationConsumerProvider* base = 0;
302 ConsumerModule* module = 0;
303
304 //lookup provider module in cache (if it exists, it returns the cached module, otherwise it creates and returns a new one)
305 String libraryName = _getConsumerLibraryName(consumerName);
306 module = _lookupModule(libraryName);
307
308 //build library path
309 String libraryPath = FileSystem::getAbsolutePath((const char*)_consumerDir.getCString(), FileSystem::buildLibraryFileName(libraryName));
310 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Loading library: " + libraryPath);
311
312 //load module
313 try
314 {
315 base = module->load(consumerName, libraryPath);
316 consumer->set(module, base);
317
318 } catch (Exception& ex)
319 {
320 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error loading consumer module: " + ex.getMessage());
321
322 h.sterling 1.1 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
323 "Cannot load module ($0:$1): Unknown exception.",
324 consumerName,
325 libraryName));
326 } catch (...)
327 {
328 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
329 "Cannot load module ($0:$1): Unknown exception.",
330 consumerName,
331 libraryName));
332 }
333
334 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully loaded consumer module " + libraryName);
335
336 //initialize consumer
337 try
338 {
339 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Initializing Consumer " + consumerName);
340
341 consumer->initialize();
342
343 h.sterling 1.1 //ATTN: need to change this
344 Semaphore* semaphore = new Semaphore(0); //blocking
345
346 consumer->setShutdownSemaphore(semaphore);
347
348 //start the worker thread
|
349 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
350 h.sterling 1.1 _worker_routine,
|
351 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
352 h.sterling 1.8 {
|
353 yi.zhou 1.17 Logger::put(Logger::STANDARD_LOG, System::CIMLISTENER, Logger::TRACE,
|
354 h.sterling 1.8 "Not enough threads for consumer.");
|
355 konrad.r 1.7
|
356 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
|
357 h.sterling 1.8 "Could not allocate thread for consumer.");
|
358 konrad.r 1.7
|
359 h.sterling 1.8 consumer->setShutdownSemaphore(0);
360 delete semaphore;
|
361 konrad.r 1.7 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
|
362 h.sterling 1.8 "Not enough threads for consumer worker routine."));
|
363 konrad.r 1.7 }
|
364 h.sterling 1.1
|
365 h.sterling 1.8 //wait until the listening thread has started. Otherwise, there is a miniscule chance that the first event will be enqueued
366 //before the consumer is waiting for it and the first indication after loading the consumer will be lost
367 consumer->waitForEventThread();
368
|
369 h.sterling 1.1 //load any outstanding requests
|
370 h.sterling 1.11 Array<IndicationDispatchEvent> outstandingIndications = _deserializeOutstandingIndications(consumerName);
|
371 h.sterling 1.1 if (outstandingIndications.size())
372 {
373 //the consumer will signal itself in _loadOustandingIndications
374 consumer->_loadOutstandingIndications(outstandingIndications);
375 }
376
377 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Successfully initialized consumer " + consumerName);
378
379 } catch (...)
380 {
381 module->unloadModule();
382 consumer->reset();
383 throw Exception(MessageLoaderParms("DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
384 "Cannot initialize consumer ($0).",
385 consumerName));
386 }
387
388 PEG_METHOD_EXIT();
389 }
390
391
392 h.sterling 1.1 /** Returns the ConsumerModule with the given library name. If it already exists, we return the one in the cache. If it
393 * DNE, we create it and add it to the table.
394 * @throws Exception if we cannot successfully create and initialize the consumer
395 */
396 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
397 {
398 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
399
400 AutoMutex lock(_moduleTableMutex);
401
402 ConsumerModule* module = 0;
403
404 //see if consumer module is cached
405 if (_modules.lookup(libraryName, module))
406 {
407 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
408 "Found Consumer Module" + libraryName + " in Consumer Manager Cache");
409
410 } else
411 {
412 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
413 h.sterling 1.1 "Creating Consumer Provider Module " + libraryName);
414
415 module = new ConsumerModule();
416 _modules.insert(libraryName, module);
417 }
418
419 PEG_METHOD_EXIT();
420 return(module);
421 }
422
423 /** Returns true if there are active consumers
424 */
425 Boolean ConsumerManager::hasActiveConsumers()
426 {
427 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
428
429 AutoMutex lock(_consumerTableMutex);
430 DynamicConsumer* consumer = 0;
431
432 try
433 {
434 h.sterling 1.1 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
435 {
436 consumer = i.value();
437
438 if (consumer && consumer->isLoaded() && (consumer->getPendingIndications() > 0))
439 {
440 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found active consumer: " + consumer->_name);
441 PEG_METHOD_EXIT();
442 return true;
443 }
444 }
445 } catch (...)
446 {
447 // Unexpected exception; do not assume that no providers are loaded
|
448 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasActiveConsumers.");
|
449 h.sterling 1.1 PEG_METHOD_EXIT();
450 return true;
451 }
452
453 PEG_METHOD_EXIT();
454 return false;
455 }
456
457 /** Returns true if there are loaded consumers
458 */
459 Boolean ConsumerManager::hasLoadedConsumers()
460 {
461 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
462
463 AutoMutex lock(_consumerTableMutex);
464 DynamicConsumer* consumer = 0;
465
466 try
467 {
468 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
469 {
470 h.sterling 1.1 consumer = i.value();
471
472 if (consumer && consumer->isLoaded())
473 {
474 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Found loaded consumer: " + consumer->_name);
475 PEG_METHOD_EXIT();
476 return true;
477 }
478 }
479 } catch (...)
480 {
481 // Unexpected exception; do not assume that no providers are loaded
|
482 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Unexpected Exception in hasLoadedConsumers.");
|
483 h.sterling 1.1 PEG_METHOD_EXIT();
484 return true;
485 }
486
487 PEG_METHOD_EXIT();
488 return false;
489 }
490
491
492 /** Shutting down a consumer consists of four major steps:
493 * 1) Send the shutdown signal. This causes the worker routine to break out of the loop and exit.
494 * 2) Wait for the worker thread to end. This may take a while if it's processing an indication. This
495 * is optional in a shutdown scenario. If the listener is shutdown with a -f force, the listener
496 * will not wait for the consumer to finish before shutting down. Note that a normal shutdown only allows
497 * the current consumer indication to finish. All other queued indications are serialized to a log and
498 * are sent when the consumer is reoaded.
499 * 3) Terminate the consumer provider interface.
500 * 4) Decrement the module refcount (the module will automatically unload when it's refcount == 0)
501 *
502 * In a scenario where more multiple consumers are loaded, the shutdown signal should be sent to all
503 * of the consumers so the threads can finish simultaneously.
504 h.sterling 1.1 *
505 * ATTN: Should the normal shutdown wait for everything in the queue to be processed? Just new indications
506 * to be processed? I am not inclined to this solution since it could take a LOT of time. By serializing
507 * and deserialing indications between shutdown and startup, I feel like we do not need to process ALL
508 * queued indications on shutdown.
509 */
510
511 /** Unloads all consumers.
512 */
513 void ConsumerManager::unloadAllConsumers()
514 {
515 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
516
517 AutoMutex lock(_consumerTableMutex);
518
519 if (!_consumers.size())
520 {
|
521 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
|
522 h.sterling 1.1 PEG_METHOD_EXIT();
523 return;
524 }
525
526 if (!_forceShutdown)
527 {
528 //wait until all the consumers have finished processing the events in their queue
529 //ATTN: Should this have a timeout even though it's a force??
530 while (hasActiveConsumers())
531 {
|
532 mike 1.15 Threads::sleep(500);
|
533 h.sterling 1.1 }
534 }
535
536 Array<DynamicConsumer*> loadedConsumers;
537
538 ConsumerTable::Iterator i = _consumers.start();
539 DynamicConsumer* consumer = 0;
540
541 for (; i!=0; i++)
542 {
543 consumer = i.value();
544 if (consumer && consumer->isLoaded())
545 {
546 loadedConsumers.append(consumer);
547 }
548 }
549
550 if (loadedConsumers.size())
551 {
552 try
553 {
554 h.sterling 1.1 _unloadConsumers(loadedConsumers);
555
|
556 kumpf 1.16 } catch (Exception&)
|
557 h.sterling 1.1 {
|
558 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
|
559 h.sterling 1.1 }
560 } else
561 {
|
562 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
|
563 h.sterling 1.1 }
564
565 PEG_METHOD_EXIT();
566 }
567
568 /** Unloads idle consumers.
569 */
570 void ConsumerManager::unloadIdleConsumers()
571 {
572 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
573
574 AutoMutex lock(_consumerTableMutex);
575
576 if (!_consumers.size())
577 {
|
578 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
|
579 h.sterling 1.1 PEG_METHOD_EXIT();
580 return;
581 }
582
583 Array<DynamicConsumer*> loadedConsumers;
584
585 ConsumerTable::Iterator i = _consumers.start();
586 DynamicConsumer* consumer = 0;
587
588 for (; i!=0; i++)
589 {
590 consumer = i.value();
591 if (consumer && consumer->isLoaded() && consumer->isIdle())
592 {
593 loadedConsumers.append(consumer);
594 }
595 }
596
597 if (loadedConsumers.size())
598 {
599 try
600 h.sterling 1.1 {
601 _unloadConsumers(loadedConsumers);
602
|
603 kumpf 1.16 } catch (Exception&)
|
604 h.sterling 1.1 {
|
605 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
|
606 h.sterling 1.1 }
607 } else
608 {
|
609 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "There are no consumers to unload.");
|
610 h.sterling 1.1 }
611
612 PEG_METHOD_EXIT();
613 }
614
615 /** Unloads a single consumer.
616 */
617 void ConsumerManager::unloadConsumer(const String& consumerName)
618 {
619 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
620
621 AutoMutex lock(_consumerTableMutex);
622
623 DynamicConsumer* consumer = 0;
624
625 //check whether the consumer exists
626 if (!_consumers.lookup(consumerName, consumer))
627 {
628 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer, unknown consumer " + consumerName);
629 return;
630 }
631 h.sterling 1.1
632 //check whether the consumer is loaded
633 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
634 {
635 //unload the consumer
636 Array<DynamicConsumer*> loadedConsumers;
637 loadedConsumers.append(consumer);
638
639 try
640 {
641 _unloadConsumers(loadedConsumers);
642
|
643 kumpf 1.16 } catch (Exception&)
|
644 h.sterling 1.1 {
|
645 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Error unloading consumers.");
|
646 h.sterling 1.1 }
647
648 } else
649 {
650 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: cannot unload consumer " + consumerName);
651 }
652
653 PEG_METHOD_EXIT();
654 }
655
656 /** Unloads the consumers in the given array.
657 * The consumerTable mutex MUST be locked prior to entering this method.
658 */
659 void ConsumerManager::_unloadConsumers(Array<DynamicConsumer*> consumersToUnload)
660 {
661 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
662
663 //tell consumers to shutdown
664 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
665 {
666 consumersToUnload[i]->sendShutdownSignal();
667 h.sterling 1.1 }
668
|
669 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Sent shutdown signal to all consumers.");
|
670 h.sterling 1.1
671 //wait for all the consumer worker threads to complete
672 //since we can only shutdown after they are all complete, it does not matter if the first, fifth, or last
673 //consumer takes the longest; the wait time is equal to the time it takes for the busiest consumer to stop
674 //processing its requests.
675 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
676 {
677 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Unloading consumer " + consumersToUnload[i]->getName());
678
679 //wait for the consumer worker thread to end
680 try
681 {
682 Semaphore* _shutdownSemaphore = consumersToUnload[i]->getShutdownSemaphore();
683 if (_shutdownSemaphore)
684 {
685 _shutdownSemaphore->time_wait(10000);
686 }
687
688 } catch (TimeOut &)
689 {
|
690 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Timed out while attempting to stop consumer thread.");
|
691 h.sterling 1.1 }
692
|
693 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Terminating consumer.");
|
694 h.sterling 1.1
695 try
696 {
697 //terminate consumer provider interface
698 consumersToUnload[i]->terminate();
699
700 //unload consumer provider module
701 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
702 consumersToUnload[i]->_module->unloadModule();
703
704 //serialize outstanding indications
705 _serializeOutstandingIndications(consumersToUnload[i]->getName(), consumersToUnload[i]->_retrieveOutstandingIndications());
706
707 //reset the consumer
708 consumersToUnload[i]->reset();
709
|
710 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Consumer library successfully unloaded.");
|
711 h.sterling 1.1
712 } catch (Exception& e)
713 {
714 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error unloading consumer: " + e.getMessage());
715 //ATTN: throw exception? log warning?
716 }
717 }
718
719 PEG_METHOD_EXIT();
720 }
721
722 /** Serializes oustanding indications to a <MyConsumer>.dat file
723 */
|
724 h.sterling 1.11 void ConsumerManager::_serializeOutstandingIndications(const String& consumerName, Array<IndicationDispatchEvent> indications)
|
725 h.sterling 1.1 {
726 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_serializeOutstandingIndications");
727
728 if (!indications.size())
729 {
730 PEG_METHOD_EXIT();
731 return;
732 }
733
734 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
735 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
736
|
737 mike 1.9 Buffer buffer;
|
738 h.sterling 1.1
739 // Open the log file and serialize remaining
740 FILE* fileHandle = 0;
741 fileHandle = fopen((const char*)fileName.getCString(), "w");
742
743 if (!fileHandle)
744 {
745 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Unable to open log file for " + consumerName);
746
747 } else
748 {
|
749 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
750 h.sterling 1.1 "Serializing %d outstanding requests for %s",
751 indications.size(),
|
752 marek 1.18 (const char*)consumerName.getCString()));
|
753 h.sterling 1.1
754 //we have to put the array of instances under a valid root element or the parser complains
755 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
756
|
757 h.sterling 1.11 CIMInstance cimInstance;
|
758 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
759 {
|
760 h.sterling 1.11 //set the URL string property on the serializable instance
761 CIMValue cimValue(CIMTYPE_STRING, false);
762 cimValue.set(indications[i].getURL());
763 cimInstance = indications[i].getIndicationInstance();
764 CIMProperty cimProperty(URL_PROPERTY, cimValue);
765 cimInstance.addProperty(cimProperty);
766
767 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
768 }
|
769 h.sterling 1.1
|
770 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
771 h.sterling 1.1
772 fputs((const char*)buffer.getData(), fileHandle);
773
774 fclose(fileHandle);
775 }
776
777 PEG_METHOD_EXIT();
778 }
779
780 /** Reads outstanding indications from a <MyConsumer>.dat file
781 */
|
782 h.sterling 1.11 Array<IndicationDispatchEvent> ConsumerManager::_deserializeOutstandingIndications(const String& consumerName)
|
783 h.sterling 1.1 {
784 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_deserializeOutstandingIndications");
785
786 String fileName = FileSystem::getAbsolutePath((const char*)_consumerConfigDir.getCString(), String(consumerName + ".dat"));
787 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer dat file: " + fileName);
788
789 Array<CIMInstance> cimInstances;
|
790 h.sterling 1.11 Array<String> urlStrings;
791 Array<IndicationDispatchEvent> indications;
|
792 h.sterling 1.1
793 // Open the log file and serialize remaining indications
794 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
795 {
|
796 mike 1.9 Buffer text;
|
797 h.sterling 1.1 CIMInstance cimInstance;
|
798 h.sterling 1.11 CIMProperty cimProperty;
799 CIMValue cimValue;
800 String urlString;
|
801 h.sterling 1.1 XmlEntry entry;
802
803 try
804 {
805 FileSystem::loadFileToMemory(text, fileName); //ATTN: Is this safe to use; what about CRLFs?
806
807 //parse the file
808 XmlParser parser((char*)text.getData());
809 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
810
811 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
812 {
|
813 h.sterling 1.11 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
814 if (index != PEG_NOT_FOUND)
815 {
816 //get the URL string property from the serialized instance and remove the property
817 cimProperty = cimInstance.getProperty(index);
818 cimValue = cimProperty.getValue();
819 cimValue.get(urlString);
820 cimInstance.removeProperty(index);
821 }
822 IndicationDispatchEvent* indicationEvent = new IndicationDispatchEvent(OperationContext(), urlString, cimInstance);
823 indications.append(*indicationEvent);
|
824 h.sterling 1.1 }
825
826 XmlReader::expectEndTag(parser, "IRETURNVALUE");
827
|
828 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
829 h.sterling 1.1 "Consumer %s has %d outstanding indications",
830 (const char*)consumerName.getCString(),
|
831 marek 1.18 indications.size()));
|
832 h.sterling 1.1
833 //delete the file
834 FileSystem::removeFile(fileName);
835
836 } catch (Exception& ex)
837 {
838 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error parsing dat file: " + ex.getMessage() + " " + consumerName);
839
840 } catch (...)
841 {
842 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error parsing dat file: Unknown Exception " + consumerName);
843 }
844 }
845
846 PEG_METHOD_EXIT();
|
847 h.sterling 1.11 return indications;
|
848 h.sterling 1.1 }
849
850
851
852 /**
853 * This is the main worker thread of the consumer. By having only one thread per consumer, we eliminate a ton
854 * of synchronization issues and make it easy to prevent the consumer from performing two mutually exclusive
855 * operations at once. This also prevents one bad consumer from taking the entire listener down. That being said,
856 * it is up to the programmer to write smart consumers, and to ensure that their actions don't deadlock the worker thread.
857 *
858 * If a consumer receives a lot of traffic, or it's consumeIndication() method takes a considerable amount of time to
859 * complete, it may make sense to make the consumer multi-threaded. The individual consumer can immediately spawn off
860 * new threads to handle indications, and return immediately to catch the next indication. In this way, a consumer
861 * can attain extremely high performance.
862 *
863 * There are three different events that can signal us:
864 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
865 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener shutdown or an idle consumer state)
866 * 3) A retry signal (signalled from this routine itself)
867 *
868 * The idea is that all new indications are put on the front of the queue and processed first. All of the retry
869 h.sterling 1.1 * indications are put on the back of the queue and are only processed AFTER all new indications are sent.
870 * Before processing each indication, we check to see whether or not the shutdown signal was given. If so,
871 * we immediately break out of the loop, and another compenent serializes the remaining indications to a file.
872 *
873 * An indication gets retried if the consumer throws a CIM_ERR_FAILED exception.
874 *
|
875 h.sterling 1.5 * This function makes sure it waits until the default retry lapse has passed to avoid issues with the following scenario:
876 * 20 new indications come in, 10 of them are successful, 10 are not.
|
877 h.sterling 1.1 * We were signalled 20 times, so we will pass the time_wait 20 times. Perceivably, the process time on each indication
878 * could be minimal. We could potentially proceed to process the retries after a very small time interval since
|
879 h.sterling 1.5 * we would never hit the wait for the retry timeout.
|
880 h.sterling 1.1 *
881 */
|
882 mike 1.15 ThreadReturnType PEGASUS_THREAD_CDECL ConsumerManager::_worker_routine(void *param)
|
883 h.sterling 1.1 {
884 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
885
886 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
887 String name = myself->getName();
|
888 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
889 h.sterling 1.1
890 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::entering loop for " + name);
891
|
892 h.sterling 1.8 myself->_listeningSemaphore->signal();
893
|
894 h.sterling 1.1 while (true)
895 {
896 try
897 {
898 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::waiting " + name);
899
900 //wait to be signalled
901 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
902
903 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::signalled " + name);
904
905 //check whether we received the shutdown signal
906 if (myself->_dieNow)
907 {
908 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::shutdown received " + name);
909 break;
910 }
911
912 //create a temporary queue to store failed indications
|
913 mike 1.14 tmpEventQueue.clear();
|
914 h.sterling 1.1
915 //continue processing events until the queue is empty
916 //make sure to check for the shutdown signal before every iteration
|
917 h.sterling 1.10 // Note that any time during our processing of events the Listener may be enqueueing NEW events for us to process.
918 // Because we are popping off the front and new events are being thrown on the back if events are failing when we start
919 // But are succeeding by the end of the processing, events may be sent out of chronological order.
920 // However. Once we complete the current queue of events, we will always send old events to be retried before sending any
921 // new events added afterwards.
|
922 h.sterling 1.1 while (myself->_eventqueue.size())
923 {
924 //check for shutdown signal
925 //this only breaks us out of the queue loop, but we will immediately get through the next wait from
926 //the shutdown signal itself, at which time we break out of the main loop
927 if (myself->_dieNow)
928 {
929 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Received signal to shutdown, jumping out of queue loop " + name);
930 break;
931 }
932
933 //pop next indication off the queue
934 IndicationDispatchEvent* event = 0;
|
935 mike 1.14 event = myself->_eventqueue.remove_front(); //what exceptions/errors can this throw?
|
936 h.sterling 1.1
937 if (!event)
938 {
939 //this should never happen
940 continue;
941 }
942
943 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::consumeIndication " + name);
944
945 try
946 {
947 myself->consumeIndication(event->getContext(),
948 event->getURL(),
949 event->getIndicationInstance());
950
951 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::processed indication successfully. " + name);
952
953 delete event;
954 continue;
955
956 } catch (CIMException & ce)
957 h.sterling 1.1 {
958 //check for failure
959 if (ce.getCode() == CIM_ERR_FAILED)
960 {
961 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "_worker_routine::consumeIndication() temporary failure: " + ce.getMessage() + " " + name);
|
962 h.sterling 1.10
963 // Here we simply determine if we should increment the retry count or not.
964 // We don't want to count a forced retry from a new event to count as a retry. We just have to do it for
965 // order's sake. If the retry Lapse has lapsed on this event then increment the counter.
966 if (event->getRetries() > 0) {
967 Sint64 differenceInMicroseconds = CIMDateTime::getDifference(event->getLastAttemptTime(), CIMDateTime::getCurrentDateTime());
968 if (differenceInMicroseconds >= (DEFAULT_RETRY_LAPSE * 1000))
969 event->increaseRetries();
970 } else {
971 event->increaseRetries();
972 }
|
973 h.sterling 1.1
974 //determine if we have hit the max retry count
975 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
976 {
|
977 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
|
978 h.sterling 1.1 "Error: the maximum retry count has been exceeded. Removing the event from the queue.");
979
980 Logger::put(
981 Logger::ERROR_LOG,
|
982 yi.zhou 1.17 System::CIMLISTENER,
|
983 h.sterling 1.1 Logger::SEVERE,
984 "The following indication did not get processed successfully: $0",
985 event->getIndicationInstance().getPath().toString());
986
987 delete event;
988 continue;
989
990 } else
991 {
|
992 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::placing failed indication back in queue");
|
993 mike 1.14 tmpEventQueue.insert_back(event);
|
994 h.sterling 1.1 }
995
996 } else
997 {
998 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ce.getMessage());
999 delete event;
1000 continue;
1001 }
1002
1003 } catch (Exception & ex)
1004 {
1005 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() permanent failure: " + ex.getMessage());
1006 delete event;
1007 continue;
1008
1009 } catch (...)
1010 {
|
1011 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2, "Error: consumeIndication() failed: Unknown exception.");
|
1012 h.sterling 1.1 delete event;
1013 continue;
1014 } //end try
1015
1016 } //while eventqueue
1017
|
1018 h.sterling 1.10 // Copy the failed indications back to the main queue
1019 // We now lock the queue while adding the retries on to the queue so that new events can't get in in front
1020 // Of those events we are retrying. Retried events happened before any new events coming in.
|
1021 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1022 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1023 h.sterling 1.1 while (tmpEventQueue.size())
1024 {
|
1025 mike 1.14 tmpEvent = tmpEventQueue.remove_front();
1026 myself->_eventqueue.insert_back(tmpEvent);
|
1027 h.sterling 1.10
|
1028 h.sterling 1.1 }
|
1029 h.sterling 1.10 myself->_eventqueue.unlock();
|
1030 h.sterling 1.1
|
1031 kumpf 1.16 } catch (TimeOut&)
|
1032 h.sterling 1.1 {
|
1033 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "_worker_routine::Time to retry any outstanding indications.");
|
1034 h.sterling 1.1
1035 //signal the queue in the same way we would if we received a new indication
1036 //this allows the thread to fall into the queue processing code
1037 myself->_check_queue->signal();
1038
1039 } //time_wait
1040
1041
1042 } //shutdown
1043
1044 PEG_METHOD_EXIT();
1045 return 0;
1046 }
1047
1048
1049 PEGASUS_NAMESPACE_END
1050
1051
1052
|