1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include <Pegasus/Common/Config.h>
35 //#include <cstdlib>
36 //#include <dlfcn.h>
37 #include <Pegasus/Common/System.h>
38 #include <Pegasus/Common/FileSystem.h>
39 #include <Pegasus/Common/Tracer.h>
40 #include <Pegasus/Common/Logger.h>
41 #include <Pegasus/Common/XmlReader.h>
|
42 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
43 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/XmlWriter.h>
|
45 mike 1.14 #include <Pegasus/Common/List.h>
|
46 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
47 h.sterling 1.1
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
|
53 marek 1.21 //ATTN: Can we just use a properties file instead?? If we only have one
54 // property, we may want to just parse it ourselves.
55 // We may need to add more properties, however. Potential per consumer
56 // properties: unloadOk, idleTimout, retryCount, etc
|
57 h.sterling 1.1 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
|
60 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
61 "location", "library name for the consumer"},
|
62 h.sterling 1.1 };
63
64 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
65
66 //retry settings
67 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
68 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
69 h.sterling 1.1
|
70 marek 1.21 //constant for fake property that is added to instances when
71 // serializing to track the full URL
|
72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
73 h.sterling 1.1
74
|
75 marek 1.21 ConsumerManager::ConsumerManager(
76 const String& consumerDir,
77 const String& consumerConfigDir,
78 Boolean enableConsumerUnload,
79 Uint32 idleTimeout) :
80 _consumerDir(consumerDir),
81 _consumerConfigDir(consumerConfigDir),
82 _enableConsumerUnload(enableConsumerUnload),
83 _idleTimeout(idleTimeout),
84 _forceShutdown(true)
|
85 h.sterling 1.1 {
86 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
87
|
88 marek 1.21 PEG_TRACE_STRING(
89 TRC_LISTENER,
90 Tracer::LEVEL4,
91 "Consumer library directory: " + consumerDir);
92 PEG_TRACE_STRING(
93 TRC_LISTENER,
94 Tracer::LEVEL4,
95 "Consumer configuration directory: " + consumerConfigDir);
|
96 h.sterling 1.1
|
97 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
98 h.sterling 1.1 "Consumer unload enabled %d: idle timeout %d",
99 enableConsumerUnload,
|
100 marek 1.18 idleTimeout));
|
101 h.sterling 1.1
102
|
103 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
104 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
105 h.sterling 1.1
|
106 kumpf 1.3 struct timeval deallocateWait = {15, 0};
107 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
108 h.sterling 1.1
109 _init();
110
111 PEG_METHOD_EXIT();
112 }
113
114 ConsumerManager::~ConsumerManager()
115 {
116 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
117
118 unloadAllConsumers();
119
|
120 kumpf 1.3 delete _thread_pool;
|
121 h.sterling 1.1
122 ConsumerTable::Iterator i = _consumers.start();
123 for (; i!=0; i++)
124 {
125 DynamicConsumer* consumer = i.value();
126 delete consumer;
127 }
128
|
129 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
130 h.sterling 1.1
131 ModuleTable::Iterator j = _modules.start();
132 for (;j!=0;j++)
133 {
134 ConsumerModule* module = j.value();
135 delete module;
136 }
137
|
138 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
139 h.sterling 1.1
140 PEG_METHOD_EXIT();
141 }
142
143 void ConsumerManager::_init()
144 {
145 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
146
147 //check if there are any outstanding indications
148 Array<String> files;
149 Uint32 pos;
150 String consumerName;
151
152 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
153 {
154 for (Uint32 i = 0; i < files.size(); i++)
155 {
156 pos = files[i].find(".dat");
157 if (pos != PEG_NOT_FOUND)
158 {
159 consumerName = files[i].subString(0, pos);
160 h.sterling 1.1
161 try
162 {
|
163 marek 1.21 PEG_TRACE_STRING(
164 TRC_LISTENER,
165 Tracer::LEVEL4,
166 "Attempting to load indication for!" +
167 consumerName + "!");
|
168 h.sterling 1.1 getConsumer(consumerName);
169
170 } catch (...)
171 {
|
172 marek 1.21 PEG_TRACE_STRING(
173 TRC_LISTENER,
|
174 marek 1.25 Tracer::LEVEL1,
|
175 marek 1.21 "Cannot load consumer from file " + files[i]);
|
176 h.sterling 1.1 }
177 }
178 }
179 }
180
181 PEG_METHOD_EXIT();
182 }
183
184 String ConsumerManager::getConsumerDir()
185 {
186 return _consumerDir;
187 }
188
189 String ConsumerManager::getConsumerConfigDir()
190 {
191 return _consumerConfigDir;
192 }
193
194 Boolean ConsumerManager::getEnableConsumerUnload()
195 {
196 return _enableConsumerUnload;
197 h.sterling 1.1 }
198
199 Uint32 ConsumerManager::getIdleTimeout()
200 {
201 return _idleTimeout;
202 }
203
|
204 marek 1.21 /** Retrieves the library name associated with the consumer name.
205 * By default, the library name
206 * is the same as the consumer name. However, you may specify a different
207 * library name in a consumer configuration file. This file must be named
208 * "MyConsumer.txt" and contain the following: location="libraryName"
209 *
210 * The config file is optional and is generally only needed in cases where
211 * there are strict requirements on library naming.
212 *
213 * It is the responsibility of the caller to catch any exceptions
214 * thrown by this method.
215 */
|
216 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
217 {
218 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
219
220 //default library name is consumer name
221 String libraryName = consumerName;
222
|
223 marek 1.21 // check whether an alternative library name was specified in an optional
224 // consumer config file
225 String configFile = FileSystem::getAbsolutePath(
226 (const char*)_consumerConfigDir.getCString(),
227 String(consumerName + ".conf"));
228 PEG_TRACE_STRING(
229 TRC_LISTENER,
230 Tracer::LEVEL4,
231 "Looking for config file " + configFile);
|
232 h.sterling 1.1
233 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
234 {
|
235 marek 1.21 PEG_TRACE_STRING(
236 TRC_LISTENER,
237 Tracer::LEVEL4,
238 "Found config file for consumer " + consumerName);
|
239 h.sterling 1.1
240 try
241 {
|
242 marek 1.21 //Bugzilla 3765 - Change this to use a member var
243 // when OptionManager has a reset option
|
244 h.sterling 1.8 OptionManager _optionMgr;
|
245 marek 1.21 //comment the following line out later
246 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
247 h.sterling 1.1 _optionMgr.mergeFile(configFile);
248 _optionMgr.checkRequiredOptions();
249
|
250 marek 1.21 if (!_optionMgr.lookupValue("location", libraryName) ||
251 (libraryName == String::EMPTY))
|
252 h.sterling 1.1 {
|
253 marek 1.21 PEG_TRACE_STRING(
254 TRC_LISTENER,
255 Tracer::LEVEL2,
256 "Warning: Using default library name since none was "
257 "specified in " + configFile);
|
258 h.sterling 1.1 libraryName = consumerName;
259 }
260
261 } catch (Exception & ex)
262 {
|
263 marek 1.21 throw Exception(
264 MessageLoaderParms(
265 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
266 "Error reading $0: $1.",
267 configFile,
268 ex.getMessage()));
|
269 h.sterling 1.1 }
270 } else
271 {
|
272 marek 1.21 PEG_TRACE_STRING(
273 TRC_LISTENER,
274 Tracer::LEVEL4,
275 "No config file exists for " + consumerName);
|
276 h.sterling 1.1 }
277
|
278 marek 1.21 PEG_TRACE_STRING(
279 TRC_LISTENER,
280 Tracer::LEVEL4,
281 "The library name for " + consumerName + " is " + libraryName);
|
282 h.sterling 1.1
283 PEG_METHOD_EXIT();
284 return libraryName;
285 }
286
|
287 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
288 * we return the one in the cache. If it
|
289 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
290 marek 1.21 * @throws Exception if we cannot successfully create and
291 * initialize the consumer
|
292 h.sterling 1.1 */
293 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
294 {
295 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
296
297 DynamicConsumer* consumer = 0;
298 CIMIndicationConsumerProvider* consumerRef = 0;
299 Boolean cached = false;
300 Boolean entryExists = false;
301
302 AutoMutex lock(_consumerTableMutex);
303
304 if (_consumers.lookup(consumerName, consumer))
305 {
306 //why isn't this working??
307 entryExists = true;
308
309 if (consumer && consumer->isLoaded())
310 {
|
311 marek 1.21 PEG_TRACE_STRING(
312 TRC_LISTENER,
|
313 marek 1.25 Tracer::LEVEL4,
|
314 marek 1.21 "Consumer exists in the cache and"
315 " is already loaded: " + consumerName);
|
316 h.sterling 1.1 cached = true;
317 }
318 } else
319 {
|
320 marek 1.21 PEG_TRACE_STRING(
321 TRC_LISTENER,
322 Tracer::LEVEL3,
323 "Consumer not found in cache, creating " + consumerName);
|
324 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
325 //ATTN: The above is a memory leak if _initConsumer throws an exception
326 //need to delete it in that case
327 }
328
329 if (!cached)
330 {
331 _initConsumer(consumerName, consumer);
332
333 if (!entryExists)
334 {
335 _consumers.insert(consumerName, consumer);
336 }
337 }
338
339 consumer->updateIdleTimer();
340
341 PEG_METHOD_EXIT();
342 return consumer;
343 }
344
345 h.sterling 1.1 /** Initializes a DynamicConsumer.
|
346 marek 1.21 * Caller assumes responsibility for mutexing the operation as well as
347 * ensuring the consumer does not already exist.
|
348 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
349 */
|
350 marek 1.21 void ConsumerManager::_initConsumer(
351 const String& consumerName,
352 DynamicConsumer* consumer)
|
353 h.sterling 1.1 {
354 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
355
356 CIMIndicationConsumerProvider* base = 0;
357 ConsumerModule* module = 0;
358
|
359 marek 1.21 // lookup provider module in cache (if it exists, it returns
360 // the cached module, otherwise it creates and returns a new one)
|
361 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
362 module = _lookupModule(libraryName);
363
364 //build library path
|
365 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
366 (const char*)_consumerDir.getCString(),
367 FileSystem::buildLibraryFileName(libraryName));
368 PEG_TRACE_STRING(
369 TRC_LISTENER,
370 Tracer::LEVEL4,
371 "Loading library: " + libraryPath);
|
372 h.sterling 1.1
373 //load module
374 try
375 {
376 base = module->load(consumerName, libraryPath);
377 consumer->set(module, base);
378
379 } catch (Exception& ex)
380 {
|
381 marek 1.21 PEG_TRACE_STRING(
382 TRC_LISTENER,
|
383 marek 1.25 Tracer::LEVEL1,
|
384 marek 1.21 "Error loading consumer module: " + ex.getMessage());
385
386 throw Exception(
387 MessageLoaderParms(
388 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
389 "Cannot load module ($0:$1): Unknown exception.",
390 consumerName,
391 libraryName));
|
392 h.sterling 1.1 } catch (...)
393 {
|
394 marek 1.21 throw Exception(
395 MessageLoaderParms(
396 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
397 "Cannot load module ($0:$1): Unknown exception.",
398 consumerName,
399 libraryName));
|
400 h.sterling 1.1 }
401
|
402 marek 1.21 PEG_TRACE_STRING(
403 TRC_LISTENER,
404 Tracer::LEVEL4,
405 "Successfully loaded consumer module " + libraryName);
|
406 h.sterling 1.1
407 //initialize consumer
408 try
409 {
|
410 marek 1.21 PEG_TRACE_STRING(
411 TRC_LISTENER,
412 Tracer::LEVEL4,
413 "Initializing Consumer " + consumerName);
|
414 h.sterling 1.1
415 consumer->initialize();
416
417 //ATTN: need to change this
418 Semaphore* semaphore = new Semaphore(0); //blocking
419
420 consumer->setShutdownSemaphore(semaphore);
421
422 //start the worker thread
|
423 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
424 h.sterling 1.1 _worker_routine,
|
425 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
426 marek 1.21 {
427 PEG_TRACE_CSTRING(
428 TRC_LISTENER,
|
429 marek 1.24 Tracer::LEVEL1,
|
430 marek 1.21 "Could not allocate thread for consumer.");
431
432 consumer->setShutdownSemaphore(0);
433 delete semaphore;
434 throw Exception(
435 MessageLoaderParms(
436 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
437 "Not enough threads for consumer worker routine."));
|
438 konrad.r 1.7 }
|
439 h.sterling 1.1
|
440 marek 1.21 //wait until the listening thread has started.
441 // Otherwise, there is a miniscule chance that the first event will
442 // be enqueued before the consumer is waiting for it and the first
443 // indication after loading the consumer will be lost
|
444 h.sterling 1.8 consumer->waitForEventThread();
445
|
446 h.sterling 1.1 //load any outstanding requests
|
447 marek 1.21 Array<IndicationDispatchEvent> outstandingIndications =
448 _deserializeOutstandingIndications(consumerName);
|
449 h.sterling 1.1 if (outstandingIndications.size())
450 {
451 //the consumer will signal itself in _loadOustandingIndications
452 consumer->_loadOutstandingIndications(outstandingIndications);
453 }
454
|
455 marek 1.21 PEG_TRACE_STRING(
456 TRC_LISTENER,
457 Tracer::LEVEL4,
458 "Successfully initialized consumer " + consumerName);
|
459 h.sterling 1.1
460 } catch (...)
461 {
462 module->unloadModule();
463 consumer->reset();
|
464 marek 1.21 throw Exception(
465 MessageLoaderParms(
466 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
467 "Cannot initialize consumer ($0).",
468 consumerName));
|
469 h.sterling 1.1 }
470
471 PEG_METHOD_EXIT();
472 }
473
474
|
475 marek 1.21 /** Returns the ConsumerModule with the given library name.
476 * If it already exists, we return the one in the cache. If it
|
477 h.sterling 1.1 * DNE, we create it and add it to the table.
|
478 marek 1.21 * @throws Exception if we cannot successfully create and
479 * initialize the consumer
|
480 h.sterling 1.1 */
|
481 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
482 h.sterling 1.1 {
483 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
484
485 AutoMutex lock(_moduleTableMutex);
486
487 ConsumerModule* module = 0;
488
489 //see if consumer module is cached
490 if (_modules.lookup(libraryName, module))
491 {
|
492 marek 1.21 PEG_TRACE_STRING(
493 TRC_LISTENER,
494 Tracer::LEVEL4,
495 "Found Consumer Module" +
496 libraryName + " in Consumer Manager Cache");
|
497 h.sterling 1.1
498 } else
499 {
|
500 marek 1.21 PEG_TRACE_STRING(
501 TRC_LISTENER,
502 Tracer::LEVEL4,
503 "Creating Consumer Provider Module " + libraryName);
|
504 h.sterling 1.1
505 module = new ConsumerModule();
506 _modules.insert(libraryName, module);
507 }
508
509 PEG_METHOD_EXIT();
510 return(module);
511 }
512
513 /** Returns true if there are active consumers
514 */
515 Boolean ConsumerManager::hasActiveConsumers()
516 {
517 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
518
519 AutoMutex lock(_consumerTableMutex);
520 DynamicConsumer* consumer = 0;
521
522 try
523 {
524 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
525 h.sterling 1.1 {
526 consumer = i.value();
527
|
528 marek 1.21 if (consumer &&
529 consumer->isLoaded() &&
530 (consumer->getPendingIndications() > 0))
|
531 h.sterling 1.1 {
|
532 marek 1.21 PEG_TRACE_STRING(
533 TRC_LISTENER,
534 Tracer::LEVEL4,
535 "Found active consumer: " + consumer->_name);
|
536 h.sterling 1.1 PEG_METHOD_EXIT();
537 return true;
538 }
539 }
540 } catch (...)
541 {
542 // Unexpected exception; do not assume that no providers are loaded
|
543 marek 1.21 PEG_TRACE_CSTRING(
544 TRC_LISTENER,
|
545 marek 1.25 Tracer::LEVEL1,
|
546 marek 1.21 "Unexpected Exception in hasActiveConsumers.");
|
547 h.sterling 1.1 PEG_METHOD_EXIT();
548 return true;
549 }
550
551 PEG_METHOD_EXIT();
552 return false;
553 }
554
555 /** Returns true if there are loaded consumers
556 */
557 Boolean ConsumerManager::hasLoadedConsumers()
558 {
559 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
560
561 AutoMutex lock(_consumerTableMutex);
562 DynamicConsumer* consumer = 0;
563
564 try
565 {
566 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
567 {
568 h.sterling 1.1 consumer = i.value();
569
570 if (consumer && consumer->isLoaded())
571 {
|
572 marek 1.21 PEG_TRACE_STRING(
573 TRC_LISTENER,
574 Tracer::LEVEL4,
575 "Found loaded consumer: " + consumer->_name);
|
576 h.sterling 1.1 PEG_METHOD_EXIT();
577 return true;
578 }
579 }
580 } catch (...)
581 {
582 // Unexpected exception; do not assume that no providers are loaded
|
583 marek 1.21 PEG_TRACE_CSTRING(
584 TRC_LISTENER,
|
585 marek 1.25 Tracer::LEVEL1,
|
586 marek 1.21 "Unexpected Exception in hasLoadedConsumers.");
|
587 h.sterling 1.1 PEG_METHOD_EXIT();
588 return true;
589 }
590
591 PEG_METHOD_EXIT();
592 return false;
593 }
594
595
596 /** Shutting down a consumer consists of four major steps:
|
597 marek 1.21 * 1) Send the shutdown signal. This causes the worker routine to break out
598 * of the loop and exit.
599 * 2) Wait for the worker thread to end. This may take a while if it's
600 * processing an indication. This is optional in a shutdown scenario.
601 * If the listener is shutdown with a -f force, the listener
602 * will not wait for the consumer to finish before shutting down.
603 * Note that a normal shutdown only allows the current consumer indication
604 * to finish. All other queued indications are serialized to a log and
|
605 h.sterling 1.1 * are sent when the consumer is reoaded.
606 * 3) Terminate the consumer provider interface.
|
607 marek 1.21 * 4) Decrement the module refcount (the module will automatically unload when
608 * it's refcount == 0)
|
609 h.sterling 1.1 *
|
610 marek 1.21 * In a scenario where more multiple consumers are loaded, the shutdown signal
611 * should be sent to all of the consumers so the threads can finish
612 * simultaneously.
|
613 h.sterling 1.1 *
|
614 marek 1.21 * ATTN: Should the normal shutdown wait for everything in the queue to be
615 * processed? Just new indications to be processed? I am not inclined to this
616 * solution since it could take a LOT of time. By serializing and deserialing
617 * indications between shutdown and startup, I feel like we do not need to
618 * process ALL queued indications on shutdown.
|
619 h.sterling 1.1 */
620
621 /** Unloads all consumers.
622 */
623 void ConsumerManager::unloadAllConsumers()
624 {
625 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
626
627 AutoMutex lock(_consumerTableMutex);
628
629 if (!_consumers.size())
630 {
|
631 marek 1.21 PEG_TRACE_CSTRING(
632 TRC_LISTENER,
633 Tracer::LEVEL4,
634 "There are no consumers to unload.");
|
635 h.sterling 1.1 PEG_METHOD_EXIT();
636 return;
637 }
638
639 if (!_forceShutdown)
640 {
|
641 marek 1.21 // wait until all the consumers have finished processing the events in
642 // their queue
643 // ATTN: Should this have a timeout even though it's a force??
|
644 h.sterling 1.1 while (hasActiveConsumers())
645 {
|
646 mike 1.15 Threads::sleep(500);
|
647 h.sterling 1.1 }
648 }
649
650 Array<DynamicConsumer*> loadedConsumers;
651
652 ConsumerTable::Iterator i = _consumers.start();
653 DynamicConsumer* consumer = 0;
654
655 for (; i!=0; i++)
656 {
657 consumer = i.value();
658 if (consumer && consumer->isLoaded())
659 {
660 loadedConsumers.append(consumer);
661 }
662 }
663
664 if (loadedConsumers.size())
665 {
666 try
667 {
668 h.sterling 1.1 _unloadConsumers(loadedConsumers);
669
|
670 kumpf 1.16 } catch (Exception&)
|
671 h.sterling 1.1 {
|
672 marek 1.21 PEG_TRACE_CSTRING(
673 TRC_LISTENER,
|
674 marek 1.25 Tracer::LEVEL2,
|
675 marek 1.21 "Error unloading consumers.");
|
676 h.sterling 1.1 }
677 } else
678 {
|
679 marek 1.21 PEG_TRACE_CSTRING(
680 TRC_LISTENER,
681 Tracer::LEVEL4,
682 "There are no consumers to unload.");
|
683 h.sterling 1.1 }
684
685 PEG_METHOD_EXIT();
686 }
687
688 /** Unloads idle consumers.
689 */
690 void ConsumerManager::unloadIdleConsumers()
691 {
692 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
693
694 AutoMutex lock(_consumerTableMutex);
695
696 if (!_consumers.size())
697 {
|
698 marek 1.21 PEG_TRACE_CSTRING(
699 TRC_LISTENER,
700 Tracer::LEVEL4,
701 "There are no consumers to unload.");
|
702 h.sterling 1.1 PEG_METHOD_EXIT();
703 return;
704 }
705
706 Array<DynamicConsumer*> loadedConsumers;
707
708 ConsumerTable::Iterator i = _consumers.start();
709 DynamicConsumer* consumer = 0;
710
711 for (; i!=0; i++)
712 {
713 consumer = i.value();
714 if (consumer && consumer->isLoaded() && consumer->isIdle())
715 {
716 loadedConsumers.append(consumer);
717 }
718 }
719
720 if (loadedConsumers.size())
721 {
722 try
723 h.sterling 1.1 {
724 _unloadConsumers(loadedConsumers);
725
|
726 kumpf 1.16 } catch (Exception&)
|
727 h.sterling 1.1 {
|
728 marek 1.21 PEG_TRACE_CSTRING(
729 TRC_LISTENER,
|
730 marek 1.25 Tracer::LEVEL2,
|
731 marek 1.21 "Error unloading consumers.");
|
732 h.sterling 1.1 }
733 } else
734 {
|
735 marek 1.21 PEG_TRACE_CSTRING(
736 TRC_LISTENER,
737 Tracer::LEVEL4,
738 "There are no consumers to unload.");
|
739 h.sterling 1.1 }
740
741 PEG_METHOD_EXIT();
742 }
743
744 /** Unloads a single consumer.
745 */
746 void ConsumerManager::unloadConsumer(const String& consumerName)
747 {
748 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
749
750 AutoMutex lock(_consumerTableMutex);
751
752 DynamicConsumer* consumer = 0;
753
754 //check whether the consumer exists
755 if (!_consumers.lookup(consumerName, consumer))
756 {
|
757 marek 1.21 PEG_TRACE_STRING(
758 TRC_LISTENER,
|
759 marek 1.25 Tracer::LEVEL2,
|
760 marek 1.21 "Error: cannot unload consumer, unknown consumer " + consumerName);
|
761 h.sterling 1.1 return;
762 }
763
764 //check whether the consumer is loaded
765 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
766 {
767 //unload the consumer
768 Array<DynamicConsumer*> loadedConsumers;
769 loadedConsumers.append(consumer);
770
771 try
772 {
773 _unloadConsumers(loadedConsumers);
774
|
775 kumpf 1.16 } catch (Exception&)
|
776 h.sterling 1.1 {
|
777 marek 1.21 PEG_TRACE_CSTRING(
778 TRC_LISTENER,
|
779 marek 1.25 Tracer::LEVEL1,
|
780 marek 1.21 "Error unloading consumers.");
|
781 h.sterling 1.1 }
782
783 } else
784 {
|
785 marek 1.21 PEG_TRACE_STRING(
786 TRC_LISTENER,
|
787 marek 1.25 Tracer::LEVEL2,
788 "Error: cannot unload the not loaded consumer " + consumerName);
|
789 h.sterling 1.1 }
790
791 PEG_METHOD_EXIT();
792 }
793
794 /** Unloads the consumers in the given array.
795 * The consumerTable mutex MUST be locked prior to entering this method.
796 */
|
797 marek 1.21 void ConsumerManager::_unloadConsumers(
798 Array<DynamicConsumer*> consumersToUnload)
|
799 h.sterling 1.1 {
800 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
801
802 //tell consumers to shutdown
803 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
804 {
805 consumersToUnload[i]->sendShutdownSignal();
806 }
807
|
808 marek 1.21 PEG_TRACE_CSTRING(
|
809 kumpf 1.22 TRC_LISTENER,
|
810 marek 1.25 Tracer::LEVEL3,
|
811 marek 1.21 "Sent shutdown signal to all consumers.");
812
813 // wait for all the consumer worker threads to complete
814 // since we can only shutdown after they are all complete,
815 // it does not matter if the first, fifth, or last
816 // consumer takes the longest; the wait time is equal to the time it takes
817 // for the busiest consumer to stop processing its requests.
|
818 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
819 {
|
820 marek 1.21 PEG_TRACE_STRING(
821 TRC_LISTENER,
|
822 marek 1.25 Tracer::LEVEL4,
|
823 marek 1.21 "Unloading consumer " + consumersToUnload[i]->getName());
|
824 h.sterling 1.1
825 //wait for the consumer worker thread to end
826 try
827 {
|
828 marek 1.21 Semaphore* _shutdownSemaphore =
829 consumersToUnload[i]->getShutdownSemaphore();
|
830 h.sterling 1.1 if (_shutdownSemaphore)
831 {
832 _shutdownSemaphore->time_wait(10000);
833 }
834
835 } catch (TimeOut &)
836 {
|
837 marek 1.21 PEG_TRACE_CSTRING(
838 TRC_LISTENER,
839 Tracer::LEVEL2,
840 "Timed out while attempting to stop consumer thread.");
|
841 h.sterling 1.1 }
842
|
843 marek 1.21 PEG_TRACE_CSTRING(
844 TRC_LISTENER,
845 Tracer::LEVEL2,
846 "Terminating consumer.");
|
847 h.sterling 1.1
848 try
849 {
850 //terminate consumer provider interface
851 consumersToUnload[i]->terminate();
852
853 //unload consumer provider module
854 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
855 consumersToUnload[i]->_module->unloadModule();
856
857 //serialize outstanding indications
|
858 marek 1.21 _serializeOutstandingIndications(
859 consumersToUnload[i]->getName(),
860 consumersToUnload[i]->_retrieveOutstandingIndications());
|
861 h.sterling 1.1
862 //reset the consumer
863 consumersToUnload[i]->reset();
864
|
865 marek 1.21 PEG_TRACE_CSTRING(
866 TRC_LISTENER,
|
867 marek 1.25 Tracer::LEVEL2,
|
868 marek 1.21 "Consumer library successfully unloaded.");
|
869 h.sterling 1.1
870 } catch (Exception& e)
871 {
|
872 marek 1.21 PEG_TRACE_STRING(
873 TRC_LISTENER,
|
874 marek 1.25 Tracer::LEVEL1,
|
875 marek 1.21 "Error unloading consumer: " + e.getMessage());
|
876 h.sterling 1.1 //ATTN: throw exception? log warning?
877 }
878 }
879
880 PEG_METHOD_EXIT();
881 }
882
883 /** Serializes oustanding indications to a <MyConsumer>.dat file
884 */
|
885 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
886 const String& consumerName,
887 Array<IndicationDispatchEvent> indications)
888 {
889 PEG_METHOD_ENTER(
890 TRC_LISTENER,
891 "ConsumerManager::_serializeOutstandingIndications");
|
892 h.sterling 1.1
893 if (!indications.size())
894 {
895 PEG_METHOD_EXIT();
896 return;
897 }
898
|
899 marek 1.21 String fileName = FileSystem::getAbsolutePath(
900 (const char*)_consumerConfigDir.getCString(),
901 String(consumerName + ".dat"));
902 PEG_TRACE_STRING(
903 TRC_LISTENER,
904 Tracer::LEVEL4,
905 "Consumer dat file: " + fileName);
|
906 h.sterling 1.1
|
907 mike 1.9 Buffer buffer;
|
908 h.sterling 1.1
909 // Open the log file and serialize remaining
910 FILE* fileHandle = 0;
911 fileHandle = fopen((const char*)fileName.getCString(), "w");
912
913 if (!fileHandle)
914 {
|
915 marek 1.21 PEG_TRACE_STRING(
916 TRC_LISTENER,
|
917 marek 1.25 Tracer::LEVEL1,
|
918 marek 1.21 "Unable to open log file for " + consumerName);
|
919 h.sterling 1.1
920 } else
921 {
|
922 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
923 h.sterling 1.1 "Serializing %d outstanding requests for %s",
924 indications.size(),
|
925 marek 1.18 (const char*)consumerName.getCString()));
|
926 h.sterling 1.1
|
927 marek 1.21 // we have to put the array of instances under a valid root element
928 // or the parser complains
|
929 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
930
|
931 marek 1.21 CIMInstance cimInstance;
|
932 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
933 {
|
934 marek 1.21 //set the URL string property on the serializable instance
935 CIMValue cimValue(CIMTYPE_STRING, false);
936 cimValue.set(indications[i].getURL());
937 cimInstance = indications[i].getIndicationInstance();
938 CIMProperty cimProperty(URL_PROPERTY, cimValue);
939 cimInstance.addProperty(cimProperty);
|
940 h.sterling 1.11
941 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
942 marek 1.21 }
|
943 h.sterling 1.1
|
944 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
945 h.sterling 1.1
946 fputs((const char*)buffer.getData(), fileHandle);
947
948 fclose(fileHandle);
949 }
950
951 PEG_METHOD_EXIT();
952 }
953
954 /** Reads outstanding indications from a <MyConsumer>.dat file
955 */
|
956 marek 1.21 Array<IndicationDispatchEvent>
957 ConsumerManager::_deserializeOutstandingIndications(
958 const String& consumerName)
959 {
960 PEG_METHOD_ENTER(
961 TRC_LISTENER,
962 "ConsumerManager::_deserializeOutstandingIndications");
963
964 String fileName = FileSystem::getAbsolutePath(
965 (const char*)_consumerConfigDir.getCString(),
966 String(consumerName + ".dat"));
967 PEG_TRACE_STRING(
968 TRC_LISTENER,
969 Tracer::LEVEL4,
970 "Consumer dat file: " + fileName);
|
971 h.sterling 1.1
972 Array<CIMInstance> cimInstances;
|
973 marek 1.21 Array<String> urlStrings;
974 Array<IndicationDispatchEvent> indications;
|
975 h.sterling 1.1
976 // Open the log file and serialize remaining indications
977 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
978 {
|
979 mike 1.9 Buffer text;
|
980 h.sterling 1.1 CIMInstance cimInstance;
|
981 marek 1.21 CIMProperty cimProperty;
982 CIMValue cimValue;
983 String urlString;
|
984 h.sterling 1.1 XmlEntry entry;
985
986 try
987 {
|
988 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
989 FileSystem::loadFileToMemory(text, fileName);
|
990 h.sterling 1.1
991 //parse the file
992 XmlParser parser((char*)text.getData());
993 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
994
995 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
996 {
|
997 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
998 if (index != PEG_NOT_FOUND)
999 {
1000 // get the URL string property from the serialized instance
1001 // and remove the property
1002 cimProperty = cimInstance.getProperty(index);
1003 cimValue = cimProperty.getValue();
1004 cimValue.get(urlString);
1005 cimInstance.removeProperty(index);
1006 }
1007 IndicationDispatchEvent* indicationEvent =
1008 new IndicationDispatchEvent(
1009 OperationContext(),
1010 urlString,
1011 cimInstance);
1012
|
1013 h.sterling 1.11 indications.append(*indicationEvent);
|
1014 h.sterling 1.1 }
1015
1016 XmlReader::expectEndTag(parser, "IRETURNVALUE");
1017
|
1018 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
1019 h.sterling 1.1 "Consumer %s has %d outstanding indications",
1020 (const char*)consumerName.getCString(),
|
1021 marek 1.18 indications.size()));
|
1022 h.sterling 1.1
1023 //delete the file
1024 FileSystem::removeFile(fileName);
1025
1026 } catch (Exception& ex)
1027 {
|
1028 marek 1.21 PEG_TRACE_STRING(
1029 TRC_LISTENER,
|
1030 marek 1.25 Tracer::LEVEL1,
|
1031 marek 1.21 "Error parsing dat file: " +
1032 ex.getMessage() + " " + consumerName);
|
1033 h.sterling 1.1
1034 } catch (...)
1035 {
|
1036 marek 1.21 PEG_TRACE_STRING(
1037 TRC_LISTENER,
|
1038 marek 1.25 Tracer::LEVEL1,
|
1039 marek 1.21 "Error parsing dat file: Unknown Exception " + consumerName);
|
1040 h.sterling 1.1 }
1041 }
1042
1043 PEG_METHOD_EXIT();
|
1044 h.sterling 1.11 return indications;
|
1045 h.sterling 1.1 }
1046
1047
1048
1049 /**
|
1050 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
1051 * per consumer, we eliminate a ton of synchronization issues and make it easy
1052 * to prevent the consumer from performing two mutually exclusive operations
1053 * at once. This also prevents one bad consumer from taking the entire
1054 * listener down. That being said, it is up to the programmer to write smart
1055 * consumers, and to ensure that their actions don't deadlock
1056 * the worker thread.
|
1057 h.sterling 1.1 *
|
1058 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
1059 * takes a considerable amount of time to complete, it may make sense to make
1060 * the consumer multi-threaded. The individual consumer can immediately
1061 * spawn off* new threads to handle indications, and return immediately to
1062 * catch the next indication. In this way, a consumer can attain
1063 * extremely high performance.
|
1064 h.sterling 1.1 *
1065 * There are three different events that can signal us:
1066 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
1067 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1068 * shutdown or an idle consumer state)
|
1069 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
1070 *
|
1071 marek 1.21 * The idea is that all new indications are put on the front of the queue and
1072 * processed first. All of the retry indications are put on the back of the
1073 * queue and are only processed AFTER all new indications are sent.
1074 * Before processing each indication, we check to see whether or not the
1075 * shutdown signal was given.
1076 * If so, we immediately break out of the loop, and another compenent
1077 * serializes the remaining indications to a file.
|
1078 h.sterling 1.1 *
|
1079 marek 1.21 * An indication gets retried
1080 * if the consumer throws a CIM_ERR_FAILED exception.
|
1081 h.sterling 1.1 *
|
1082 marek 1.21 * This function makes sure it waits until the default retry lapse has passed
1083 * to avoid issues with the following scenario:
|
1084 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1085 marek 1.21 * We were signalled 20 times, so we will pass the time_wait 20 times.
1086 * Perceivably, the process time on each indication could be minimal.
1087 * We could potentially proceed to process the retries after a very small time
1088 * interval since we would never hit the wait for the retry timeout.
|
1089 h.sterling 1.1 *
1090 */
|
1091 marek 1.21 ThreadReturnType PEGASUS_THREAD_CDECL
1092 ConsumerManager::_worker_routine(void *param)
|
1093 h.sterling 1.1 {
1094 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1095
1096 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1097 String name = myself->getName();
|
1098 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1099 h.sterling 1.1
|
1100 marek 1.21 PEG_TRACE_STRING(
1101 TRC_LISTENER,
|
1102 marek 1.25 Tracer::LEVEL3,
|
1103 marek 1.21 "_worker_routine::entering loop for " + name);
|
1104 h.sterling 1.1
|
1105 h.sterling 1.8 myself->_listeningSemaphore->signal();
1106
|
1107 h.sterling 1.1 while (true)
1108 {
1109 try
1110 {
|
1111 marek 1.21 PEG_TRACE_STRING(
1112 TRC_LISTENER,
1113 Tracer::LEVEL4,
1114 "_worker_routine::waiting " + name);
|
1115 h.sterling 1.1
1116 //wait to be signalled
1117 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
1118
|
1119 marek 1.21 PEG_TRACE_STRING(
1120 TRC_LISTENER,
1121 Tracer::LEVEL4,
1122 "_worker_routine::signalled " + name);
|
1123 h.sterling 1.1
1124 //check whether we received the shutdown signal
1125 if (myself->_dieNow)
1126 {
|
1127 marek 1.21 PEG_TRACE_STRING(
1128 TRC_LISTENER,
1129 Tracer::LEVEL4,
1130 "_worker_routine::shutdown received " + name);
|
1131 h.sterling 1.1 break;
1132 }
1133
1134 //create a temporary queue to store failed indications
|
1135 mike 1.14 tmpEventQueue.clear();
|
1136 h.sterling 1.1
1137 //continue processing events until the queue is empty
1138 //make sure to check for the shutdown signal before every iteration
|
1139 marek 1.21 // Note that any time during our processing of events the Listener
1140 // may be enqueueing NEW events for us to process.
1141 // Because we are popping off the front and new events are being
1142 // thrown on the back if events are failing when we start
1143 // But are succeeding by the end of the processing, events may be
1144 // sent out of chronological order.
1145 // However. Once we complete the current queue of events, we will
1146 // always send old events to be retried before sending any
|
1147 h.sterling 1.10 // new events added afterwards.
|
1148 h.sterling 1.1 while (myself->_eventqueue.size())
1149 {
1150 //check for shutdown signal
|
1151 marek 1.21 //this only breaks us out of the queue loop, but we will
1152 //immediately get through the next wait from
1153 //the shutdown signal itself, at which time we break
1154 //out of the main loop
|
1155 h.sterling 1.1 if (myself->_dieNow)
1156 {
|
1157 marek 1.21 PEG_TRACE_STRING(
1158 TRC_LISTENER,
1159 Tracer::LEVEL4,
1160 "Received signal to shutdown,"
1161 " jumping out of queue loop " + name);
|
1162 h.sterling 1.1 break;
1163 }
1164
1165 //pop next indication off the queue
1166 IndicationDispatchEvent* event = 0;
|
1167 marek 1.21 //what exceptions/errors can this throw?
1168 event = myself->_eventqueue.remove_front();
|
1169 h.sterling 1.1
1170 if (!event)
1171 {
1172 //this should never happen
1173 continue;
1174 }
1175
|
1176 marek 1.21 PEG_TRACE_STRING(
1177 TRC_LISTENER,
1178 Tracer::LEVEL4,
1179 "_worker_routine::consumeIndication " + name);
|
1180 h.sterling 1.1
1181 try
1182 {
1183 myself->consumeIndication(event->getContext(),
1184 event->getURL(),
1185 event->getIndicationInstance());
1186
|
1187 marek 1.21 PEG_TRACE_STRING(
1188 TRC_LISTENER,
1189 Tracer::LEVEL4,
1190 "_worker_routine::processed indication successfully. "
1191 + name);
|
1192 h.sterling 1.1
1193 delete event;
1194 continue;
1195
1196 } catch (CIMException & ce)
1197 {
1198 //check for failure
1199 if (ce.getCode() == CIM_ERR_FAILED)
1200 {
|
1201 marek 1.21 PEG_TRACE_STRING(
1202 TRC_LISTENER,
1203 Tracer::LEVEL2,
1204 "_worker_routine::consumeIndication() temporary"
1205 " failure: " + ce.getMessage() + " " + name);
|
1206 h.sterling 1.10
|
1207 marek 1.21 // Here we simply determine if we should increment
1208 // the retry count or not.
1209 // We don't want to count a forced retry from a new
1210 // event to count as a retry.
1211 // We just have to do it for order's sake.
1212 // If the retry Lapse has lapsed on this event,
1213 // then increment the counter.
1214 if (event->getRetries() > 0)
1215 {
1216 Sint64 differenceInMicroseconds =
1217 CIMDateTime::getDifference(
1218 event->getLastAttemptTime(),
1219 CIMDateTime::getCurrentDateTime());
1220
1221 if (differenceInMicroseconds >=
1222 (DEFAULT_RETRY_LAPSE * 1000))
1223 {
|
1224 h.sterling 1.10 event->increaseRetries();
|
1225 marek 1.21 }
1226 }
1227 else
1228 {
|
1229 h.sterling 1.10 event->increaseRetries();
1230 }
|
1231 h.sterling 1.1
1232 //determine if we have hit the max retry count
1233 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1234 {
|
1235 marek 1.21 PEG_TRACE_CSTRING(
1236 TRC_LISTENER,
|
1237 marek 1.25 Tracer::LEVEL1,
|
1238 marek 1.21 "Error: the maximum retry count has been "
1239 "exceeded. Removing the event from "
1240 "the queue.");
|
1241 h.sterling 1.1
1242 Logger::put(
|
1243 marek 1.21 Logger::ERROR_LOG,
1244 System::CIMLISTENER,
1245 Logger::SEVERE,
1246 "The following indication did not get "
1247 "processed successfully: $0",
1248 event->getIndicationInstance().getPath().toString());
|
1249 h.sterling 1.1
1250 delete event;
1251 continue;
1252
1253 } else
1254 {
|
1255 marek 1.21 PEG_TRACE_CSTRING(
1256 TRC_LISTENER,
1257 Tracer::LEVEL4,
1258 "_worker_routine::placing failed indication "
1259 "back in queue");
|
1260 mike 1.14 tmpEventQueue.insert_back(event);
|
1261 h.sterling 1.1 }
1262
1263 } else
1264 {
|
1265 marek 1.21 PEG_TRACE_STRING(
1266 TRC_LISTENER,
|
1267 marek 1.25 Tracer::LEVEL1,
|
1268 marek 1.21 "Error: consumeIndication() permanent failure: "
1269 + ce.getMessage());
|
1270 h.sterling 1.1 delete event;
1271 continue;
1272 }
1273
1274 } catch (Exception & ex)
1275 {
|
1276 marek 1.21 PEG_TRACE_STRING(
1277 TRC_LISTENER,
|
1278 marek 1.25 Tracer::LEVEL1,
|
1279 marek 1.21 "Error: consumeIndication() permanent failure: "
1280 + ex.getMessage());
|
1281 h.sterling 1.1 delete event;
1282 continue;
1283
1284 } catch (...)
1285 {
|
1286 marek 1.21 PEG_TRACE_CSTRING(
1287 TRC_LISTENER,
|
1288 marek 1.25 Tracer::LEVEL1,
|
1289 marek 1.21 "Error: consumeIndication() failed: "
1290 "Unknown exception.");
|
1291 h.sterling 1.1 delete event;
1292 continue;
1293 } //end try
1294
1295 } //while eventqueue
1296
|
1297 h.sterling 1.10 // Copy the failed indications back to the main queue
|
1298 marek 1.21 // We now lock the queue while adding the retries on to the queue
1299 // so that new events can't get in in front
1300 // Of those events we are retrying. Retried events happened before
1301 // any new events coming in.
|
1302 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1303 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1304 h.sterling 1.1 while (tmpEventQueue.size())
1305 {
|
1306 mike 1.14 tmpEvent = tmpEventQueue.remove_front();
1307 myself->_eventqueue.insert_back(tmpEvent);
|
1308 h.sterling 1.10
|
1309 h.sterling 1.1 }
|
1310 h.sterling 1.10 myself->_eventqueue.unlock();
|
1311 h.sterling 1.1
|
1312 kumpf 1.16 } catch (TimeOut&)
|
1313 h.sterling 1.1 {
|
1314 marek 1.21 PEG_TRACE_CSTRING(
1315 TRC_LISTENER,
1316 Tracer::LEVEL4,
1317 "_worker_routine::Time to retry any outstanding indications.");
1318
1319 // signal the queue in the same way we would,
1320 // if we received a new indication
1321 // this allows the thread to fall into the queue processing code
|
1322 h.sterling 1.1 myself->_check_queue->signal();
1323
1324 } //time_wait
1325
1326
1327 } //shutdown
1328
1329 PEG_METHOD_EXIT();
1330 return 0;
1331 }
1332
1333
1334 PEGASUS_NAMESPACE_END
|