1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include <Pegasus/Common/Config.h>
35 //#include <cstdlib>
36 //#include <dlfcn.h>
37 #include <Pegasus/Common/System.h>
38 #include <Pegasus/Common/FileSystem.h>
39 #include <Pegasus/Common/Tracer.h>
40 #include <Pegasus/Common/Logger.h>
41 #include <Pegasus/Common/XmlReader.h>
|
42 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
43 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/XmlWriter.h>
|
45 mike 1.14 #include <Pegasus/Common/List.h>
|
46 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
47 h.sterling 1.1
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
|
53 marek 1.21 //ATTN: Can we just use a properties file instead?? If we only have one
54 // property, we may want to just parse it ourselves.
55 // We may need to add more properties, however. Potential per consumer
56 // properties: unloadOk, idleTimout, retryCount, etc
|
57 h.sterling 1.1 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
|
60 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
61 "location", "library name for the consumer"},
|
62 h.sterling 1.1 };
63
64 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
65
66 //retry settings
67 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
68 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
69 h.sterling 1.1
|
70 marek 1.21 //constant for fake property that is added to instances when
71 // serializing to track the full URL
|
72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
73 h.sterling 1.1
74
|
75 marek 1.21 ConsumerManager::ConsumerManager(
76 const String& consumerDir,
77 const String& consumerConfigDir,
78 Boolean enableConsumerUnload,
79 Uint32 idleTimeout) :
80 _consumerDir(consumerDir),
81 _consumerConfigDir(consumerConfigDir),
82 _enableConsumerUnload(enableConsumerUnload),
83 _idleTimeout(idleTimeout),
84 _forceShutdown(true)
|
85 h.sterling 1.1 {
86 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
87
|
88 marek 1.21 PEG_TRACE_STRING(
89 TRC_LISTENER,
90 Tracer::LEVEL4,
91 "Consumer library directory: " + consumerDir);
92 PEG_TRACE_STRING(
93 TRC_LISTENER,
94 Tracer::LEVEL4,
95 "Consumer configuration directory: " + consumerConfigDir);
|
96 h.sterling 1.1
|
97 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
98 h.sterling 1.1 "Consumer unload enabled %d: idle timeout %d",
99 enableConsumerUnload,
|
100 marek 1.18 idleTimeout));
|
101 h.sterling 1.1
102
|
103 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
104 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
105 h.sterling 1.1
|
106 kumpf 1.3 struct timeval deallocateWait = {15, 0};
107 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
108 h.sterling 1.1
109 _init();
110
111 PEG_METHOD_EXIT();
112 }
113
114 ConsumerManager::~ConsumerManager()
115 {
116 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
117
118 unloadAllConsumers();
119
|
120 kumpf 1.3 delete _thread_pool;
|
121 h.sterling 1.1
122 ConsumerTable::Iterator i = _consumers.start();
123 for (; i!=0; i++)
124 {
125 DynamicConsumer* consumer = i.value();
126 delete consumer;
127 }
128
|
129 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
130 h.sterling 1.1
131 ModuleTable::Iterator j = _modules.start();
132 for (;j!=0;j++)
133 {
134 ConsumerModule* module = j.value();
135 delete module;
136 }
137
|
138 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
139 h.sterling 1.1
140 PEG_METHOD_EXIT();
141 }
142
143 void ConsumerManager::_init()
144 {
145 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
146
147 //check if there are any outstanding indications
148 Array<String> files;
149 Uint32 pos;
150 String consumerName;
151
152 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
153 {
154 for (Uint32 i = 0; i < files.size(); i++)
155 {
156 pos = files[i].find(".dat");
157 if (pos != PEG_NOT_FOUND)
158 {
159 consumerName = files[i].subString(0, pos);
160 h.sterling 1.1
161 try
162 {
|
163 marek 1.21 PEG_TRACE_STRING(
164 TRC_LISTENER,
165 Tracer::LEVEL4,
166 "Attempting to load indication for!" +
167 consumerName + "!");
|
168 h.sterling 1.1 getConsumer(consumerName);
169
170 } catch (...)
171 {
|
172 marek 1.21 PEG_TRACE_STRING(
173 TRC_LISTENER,
174 Tracer::LEVEL4,
175 "Cannot load consumer from file " + files[i]);
|
176 h.sterling 1.1 }
177 }
178 }
179 }
180
181 PEG_METHOD_EXIT();
182 }
183
184 String ConsumerManager::getConsumerDir()
185 {
186 return _consumerDir;
187 }
188
189 String ConsumerManager::getConsumerConfigDir()
190 {
191 return _consumerConfigDir;
192 }
193
194 Boolean ConsumerManager::getEnableConsumerUnload()
195 {
196 return _enableConsumerUnload;
197 h.sterling 1.1 }
198
199 Uint32 ConsumerManager::getIdleTimeout()
200 {
201 return _idleTimeout;
202 }
203
|
204 marek 1.21 /** Retrieves the library name associated with the consumer name.
205 * By default, the library name
206 * is the same as the consumer name. However, you may specify a different
207 * library name in a consumer configuration file. This file must be named
208 * "MyConsumer.txt" and contain the following: location="libraryName"
209 *
210 * The config file is optional and is generally only needed in cases where
211 * there are strict requirements on library naming.
212 *
213 * It is the responsibility of the caller to catch any exceptions
214 * thrown by this method.
215 */
|
216 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
217 {
218 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
219
220 //default library name is consumer name
221 String libraryName = consumerName;
222
|
223 marek 1.21 // check whether an alternative library name was specified in an optional
224 // consumer config file
225 String configFile = FileSystem::getAbsolutePath(
226 (const char*)_consumerConfigDir.getCString(),
227 String(consumerName + ".conf"));
228 PEG_TRACE_STRING(
229 TRC_LISTENER,
230 Tracer::LEVEL4,
231 "Looking for config file " + configFile);
|
232 h.sterling 1.1
233 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
234 {
|
235 marek 1.21 PEG_TRACE_STRING(
236 TRC_LISTENER,
237 Tracer::LEVEL4,
238 "Found config file for consumer " + consumerName);
|
239 h.sterling 1.1
240 try
241 {
|
242 marek 1.21 //Bugzilla 3765 - Change this to use a member var
243 // when OptionManager has a reset option
|
244 h.sterling 1.8 OptionManager _optionMgr;
|
245 marek 1.21 //comment the following line out later
246 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
247 h.sterling 1.1 _optionMgr.mergeFile(configFile);
248 _optionMgr.checkRequiredOptions();
249
|
250 marek 1.21 if (!_optionMgr.lookupValue("location", libraryName) ||
251 (libraryName == String::EMPTY))
|
252 h.sterling 1.1 {
|
253 marek 1.21 PEG_TRACE_STRING(
254 TRC_LISTENER,
255 Tracer::LEVEL2,
256 "Warning: Using default library name since none was "
257 "specified in " + configFile);
|
258 h.sterling 1.1 libraryName = consumerName;
259 }
260
261 } catch (Exception & ex)
262 {
|
263 marek 1.21 throw Exception(
264 MessageLoaderParms(
265 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
266 "Error reading $0: $1.",
267 configFile,
268 ex.getMessage()));
|
269 h.sterling 1.1 }
270 } else
271 {
|
272 marek 1.21 PEG_TRACE_STRING(
273 TRC_LISTENER,
274 Tracer::LEVEL4,
275 "No config file exists for " + consumerName);
|
276 h.sterling 1.1 }
277
|
278 marek 1.21 PEG_TRACE_STRING(
279 TRC_LISTENER,
280 Tracer::LEVEL4,
281 "The library name for " + consumerName + " is " + libraryName);
|
282 h.sterling 1.1
283 PEG_METHOD_EXIT();
284 return libraryName;
285 }
286
|
287 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
288 * we return the one in the cache. If it
|
289 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
290 marek 1.21 * @throws Exception if we cannot successfully create and
291 * initialize the consumer
|
292 h.sterling 1.1 */
293 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
294 {
295 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
296
297 DynamicConsumer* consumer = 0;
298 CIMIndicationConsumerProvider* consumerRef = 0;
299 Boolean cached = false;
300 Boolean entryExists = false;
301
302 AutoMutex lock(_consumerTableMutex);
303
304 if (_consumers.lookup(consumerName, consumer))
305 {
306 //why isn't this working??
307 entryExists = true;
308
309 if (consumer && consumer->isLoaded())
310 {
|
311 marek 1.21 PEG_TRACE_STRING(
312 TRC_LISTENER,
313 Tracer::LEVEL3,
314 "Consumer exists in the cache and"
315 " is already loaded: " + consumerName);
|
316 h.sterling 1.1 cached = true;
317 }
318 } else
319 {
|
320 marek 1.21 PEG_TRACE_STRING(
321 TRC_LISTENER,
322 Tracer::LEVEL3,
323 "Consumer not found in cache, creating " + consumerName);
|
324 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
325 //ATTN: The above is a memory leak if _initConsumer throws an exception
326 //need to delete it in that case
327 }
328
329 if (!cached)
330 {
331 _initConsumer(consumerName, consumer);
332
333 if (!entryExists)
334 {
335 _consumers.insert(consumerName, consumer);
336 }
337 }
338
339 consumer->updateIdleTimer();
340
341 PEG_METHOD_EXIT();
342 return consumer;
343 }
344
345 h.sterling 1.1 /** Initializes a DynamicConsumer.
|
346 marek 1.21 * Caller assumes responsibility for mutexing the operation as well as
347 * ensuring the consumer does not already exist.
|
348 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
349 */
|
350 marek 1.21 void ConsumerManager::_initConsumer(
351 const String& consumerName,
352 DynamicConsumer* consumer)
|
353 h.sterling 1.1 {
354 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
355
356 CIMIndicationConsumerProvider* base = 0;
357 ConsumerModule* module = 0;
358
|
359 marek 1.21 // lookup provider module in cache (if it exists, it returns
360 // the cached module, otherwise it creates and returns a new one)
|
361 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
362 module = _lookupModule(libraryName);
363
364 //build library path
|
365 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
366 (const char*)_consumerDir.getCString(),
367 FileSystem::buildLibraryFileName(libraryName));
368 PEG_TRACE_STRING(
369 TRC_LISTENER,
370 Tracer::LEVEL4,
371 "Loading library: " + libraryPath);
|
372 h.sterling 1.1
373 //load module
374 try
375 {
376 base = module->load(consumerName, libraryPath);
377 consumer->set(module, base);
378
379 } catch (Exception& ex)
380 {
|
381 marek 1.21 PEG_TRACE_STRING(
382 TRC_LISTENER,
383 Tracer::LEVEL2,
384 "Error loading consumer module: " + ex.getMessage());
385
386 throw Exception(
387 MessageLoaderParms(
388 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
389 "Cannot load module ($0:$1): Unknown exception.",
390 consumerName,
391 libraryName));
|
392 h.sterling 1.1 } catch (...)
393 {
|
394 marek 1.21 throw Exception(
395 MessageLoaderParms(
396 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
397 "Cannot load module ($0:$1): Unknown exception.",
398 consumerName,
399 libraryName));
|
400 h.sterling 1.1 }
401
|
402 marek 1.21 PEG_TRACE_STRING(
403 TRC_LISTENER,
404 Tracer::LEVEL4,
405 "Successfully loaded consumer module " + libraryName);
|
406 h.sterling 1.1
407 //initialize consumer
408 try
409 {
|
410 marek 1.21 PEG_TRACE_STRING(
411 TRC_LISTENER,
412 Tracer::LEVEL4,
413 "Initializing Consumer " + consumerName);
|
414 h.sterling 1.1
415 consumer->initialize();
416
417 //ATTN: need to change this
418 Semaphore* semaphore = new Semaphore(0); //blocking
419
420 consumer->setShutdownSemaphore(semaphore);
421
422 //start the worker thread
|
423 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
424 h.sterling 1.1 _worker_routine,
|
425 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
426 marek 1.21 {
|
427 marek 1.23 // TBD-7646
|
428 marek 1.21 PEG_TRACE_CSTRING(
429 TRC_LISTENER,
430 Tracer::LEVEL2,
431 "Could not allocate thread for consumer.");
432
433 consumer->setShutdownSemaphore(0);
434 delete semaphore;
435 throw Exception(
436 MessageLoaderParms(
437 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
438 "Not enough threads for consumer worker routine."));
|
439 konrad.r 1.7 }
|
440 h.sterling 1.1
|
441 marek 1.21 //wait until the listening thread has started.
442 // Otherwise, there is a miniscule chance that the first event will
443 // be enqueued before the consumer is waiting for it and the first
444 // indication after loading the consumer will be lost
|
445 h.sterling 1.8 consumer->waitForEventThread();
446
|
447 h.sterling 1.1 //load any outstanding requests
|
448 marek 1.21 Array<IndicationDispatchEvent> outstandingIndications =
449 _deserializeOutstandingIndications(consumerName);
|
450 h.sterling 1.1 if (outstandingIndications.size())
451 {
452 //the consumer will signal itself in _loadOustandingIndications
453 consumer->_loadOutstandingIndications(outstandingIndications);
454 }
455
|
456 marek 1.21 PEG_TRACE_STRING(
457 TRC_LISTENER,
458 Tracer::LEVEL4,
459 "Successfully initialized consumer " + consumerName);
|
460 h.sterling 1.1
461 } catch (...)
462 {
463 module->unloadModule();
464 consumer->reset();
|
465 marek 1.21 throw Exception(
466 MessageLoaderParms(
467 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
468 "Cannot initialize consumer ($0).",
469 consumerName));
|
470 h.sterling 1.1 }
471
472 PEG_METHOD_EXIT();
473 }
474
475
|
476 marek 1.21 /** Returns the ConsumerModule with the given library name.
477 * If it already exists, we return the one in the cache. If it
|
478 h.sterling 1.1 * DNE, we create it and add it to the table.
|
479 marek 1.21 * @throws Exception if we cannot successfully create and
480 * initialize the consumer
|
481 h.sterling 1.1 */
|
482 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
483 h.sterling 1.1 {
484 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
485
486 AutoMutex lock(_moduleTableMutex);
487
488 ConsumerModule* module = 0;
489
490 //see if consumer module is cached
491 if (_modules.lookup(libraryName, module))
492 {
|
493 marek 1.21 PEG_TRACE_STRING(
494 TRC_LISTENER,
495 Tracer::LEVEL4,
496 "Found Consumer Module" +
497 libraryName + " in Consumer Manager Cache");
|
498 h.sterling 1.1
499 } else
500 {
|
501 marek 1.21 PEG_TRACE_STRING(
502 TRC_LISTENER,
503 Tracer::LEVEL4,
504 "Creating Consumer Provider Module " + libraryName);
|
505 h.sterling 1.1
506 module = new ConsumerModule();
507 _modules.insert(libraryName, module);
508 }
509
510 PEG_METHOD_EXIT();
511 return(module);
512 }
513
514 /** Returns true if there are active consumers
515 */
516 Boolean ConsumerManager::hasActiveConsumers()
517 {
518 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
519
520 AutoMutex lock(_consumerTableMutex);
521 DynamicConsumer* consumer = 0;
522
523 try
524 {
525 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
526 h.sterling 1.1 {
527 consumer = i.value();
528
|
529 marek 1.21 if (consumer &&
530 consumer->isLoaded() &&
531 (consumer->getPendingIndications() > 0))
|
532 h.sterling 1.1 {
|
533 marek 1.21 PEG_TRACE_STRING(
534 TRC_LISTENER,
535 Tracer::LEVEL4,
536 "Found active consumer: " + consumer->_name);
|
537 h.sterling 1.1 PEG_METHOD_EXIT();
538 return true;
539 }
540 }
541 } catch (...)
542 {
543 // Unexpected exception; do not assume that no providers are loaded
|
544 marek 1.21 PEG_TRACE_CSTRING(
545 TRC_LISTENER,
546 Tracer::LEVEL2,
547 "Unexpected Exception in hasActiveConsumers.");
|
548 h.sterling 1.1 PEG_METHOD_EXIT();
549 return true;
550 }
551
552 PEG_METHOD_EXIT();
553 return false;
554 }
555
556 /** Returns true if there are loaded consumers
557 */
558 Boolean ConsumerManager::hasLoadedConsumers()
559 {
560 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
561
562 AutoMutex lock(_consumerTableMutex);
563 DynamicConsumer* consumer = 0;
564
565 try
566 {
567 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
568 {
569 h.sterling 1.1 consumer = i.value();
570
571 if (consumer && consumer->isLoaded())
572 {
|
573 marek 1.21 PEG_TRACE_STRING(
574 TRC_LISTENER,
575 Tracer::LEVEL4,
576 "Found loaded consumer: " + consumer->_name);
|
577 h.sterling 1.1 PEG_METHOD_EXIT();
578 return true;
579 }
580 }
581 } catch (...)
582 {
583 // Unexpected exception; do not assume that no providers are loaded
|
584 marek 1.21 PEG_TRACE_CSTRING(
585 TRC_LISTENER,
586 Tracer::LEVEL2,
587 "Unexpected Exception in hasLoadedConsumers.");
|
588 h.sterling 1.1 PEG_METHOD_EXIT();
589 return true;
590 }
591
592 PEG_METHOD_EXIT();
593 return false;
594 }
595
596
/** Shutting down a consumer consists of four major steps:
 *  1) Send the shutdown signal. This causes the worker routine to break out
 *     of the loop and exit.
 *  2) Wait for the worker thread to end. This may take a while if it's
 *     processing an indication. This is optional in a shutdown scenario.
 *     If the listener is shut down with a -f force, the listener
 *     will not wait for the consumer to finish before shutting down.
 *     Note that a normal shutdown only allows the current consumer
 *     indication to finish. All other queued indications are serialized to
 *     a log and are sent when the consumer is reloaded.
 *  3) Terminate the consumer provider interface.
 *  4) Decrement the module refcount (the module will automatically unload
 *     when its refcount == 0).
 *
 *  In a scenario where multiple consumers are loaded, the shutdown signal
 *  should be sent to all of the consumers so the threads can finish
 *  simultaneously.
 *
 *  ATTN: Should the normal shutdown wait for everything in the queue to be
 *  processed? Just new indications to be processed? I am not inclined to
 *  this solution since it could take a LOT of time. By serializing and
 *  deserializing indications between shutdown and startup, I feel like we
 *  do not need to process ALL queued indications on shutdown.
 */
621
622 /** Unloads all consumers.
623 */
624 void ConsumerManager::unloadAllConsumers()
625 {
626 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
627
628 AutoMutex lock(_consumerTableMutex);
629
630 if (!_consumers.size())
631 {
|
632 marek 1.21 PEG_TRACE_CSTRING(
633 TRC_LISTENER,
634 Tracer::LEVEL4,
635 "There are no consumers to unload.");
|
636 h.sterling 1.1 PEG_METHOD_EXIT();
637 return;
638 }
639
640 if (!_forceShutdown)
641 {
|
642 marek 1.21 // wait until all the consumers have finished processing the events in
643 // their queue
644 // ATTN: Should this have a timeout even though it's a force??
|
645 h.sterling 1.1 while (hasActiveConsumers())
646 {
|
647 mike 1.15 Threads::sleep(500);
|
648 h.sterling 1.1 }
649 }
650
651 Array<DynamicConsumer*> loadedConsumers;
652
653 ConsumerTable::Iterator i = _consumers.start();
654 DynamicConsumer* consumer = 0;
655
656 for (; i!=0; i++)
657 {
658 consumer = i.value();
659 if (consumer && consumer->isLoaded())
660 {
661 loadedConsumers.append(consumer);
662 }
663 }
664
665 if (loadedConsumers.size())
666 {
667 try
668 {
669 h.sterling 1.1 _unloadConsumers(loadedConsumers);
670
|
671 kumpf 1.16 } catch (Exception&)
|
672 h.sterling 1.1 {
|
673 marek 1.21 PEG_TRACE_CSTRING(
674 TRC_LISTENER,
675 Tracer::LEVEL4,
676 "Error unloading consumers.");
|
677 h.sterling 1.1 }
678 } else
679 {
|
680 marek 1.21 PEG_TRACE_CSTRING(
681 TRC_LISTENER,
682 Tracer::LEVEL4,
683 "There are no consumers to unload.");
|
684 h.sterling 1.1 }
685
686 PEG_METHOD_EXIT();
687 }
688
689 /** Unloads idle consumers.
690 */
691 void ConsumerManager::unloadIdleConsumers()
692 {
693 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
694
695 AutoMutex lock(_consumerTableMutex);
696
697 if (!_consumers.size())
698 {
|
699 marek 1.21 PEG_TRACE_CSTRING(
700 TRC_LISTENER,
701 Tracer::LEVEL4,
702 "There are no consumers to unload.");
|
703 h.sterling 1.1 PEG_METHOD_EXIT();
704 return;
705 }
706
707 Array<DynamicConsumer*> loadedConsumers;
708
709 ConsumerTable::Iterator i = _consumers.start();
710 DynamicConsumer* consumer = 0;
711
712 for (; i!=0; i++)
713 {
714 consumer = i.value();
715 if (consumer && consumer->isLoaded() && consumer->isIdle())
716 {
717 loadedConsumers.append(consumer);
718 }
719 }
720
721 if (loadedConsumers.size())
722 {
723 try
724 h.sterling 1.1 {
725 _unloadConsumers(loadedConsumers);
726
|
727 kumpf 1.16 } catch (Exception&)
|
728 h.sterling 1.1 {
|
729 marek 1.21 PEG_TRACE_CSTRING(
730 TRC_LISTENER,
731 Tracer::LEVEL4,
732 "Error unloading consumers.");
|
733 h.sterling 1.1 }
734 } else
735 {
|
736 marek 1.21 PEG_TRACE_CSTRING(
737 TRC_LISTENER,
738 Tracer::LEVEL4,
739 "There are no consumers to unload.");
|
740 h.sterling 1.1 }
741
742 PEG_METHOD_EXIT();
743 }
744
745 /** Unloads a single consumer.
746 */
747 void ConsumerManager::unloadConsumer(const String& consumerName)
748 {
749 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
750
751 AutoMutex lock(_consumerTableMutex);
752
753 DynamicConsumer* consumer = 0;
754
755 //check whether the consumer exists
756 if (!_consumers.lookup(consumerName, consumer))
757 {
|
758 marek 1.21 PEG_TRACE_STRING(
759 TRC_LISTENER,
760 Tracer::LEVEL3,
761 "Error: cannot unload consumer, unknown consumer " + consumerName);
|
762 h.sterling 1.1 return;
763 }
764
765 //check whether the consumer is loaded
766 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
767 {
768 //unload the consumer
769 Array<DynamicConsumer*> loadedConsumers;
770 loadedConsumers.append(consumer);
771
772 try
773 {
774 _unloadConsumers(loadedConsumers);
775
|
776 kumpf 1.16 } catch (Exception&)
|
777 h.sterling 1.1 {
|
778 marek 1.21 PEG_TRACE_CSTRING(
779 TRC_LISTENER,
780 Tracer::LEVEL4,
781 "Error unloading consumers.");
|
782 h.sterling 1.1 }
783
784 } else
785 {
|
786 marek 1.21 PEG_TRACE_STRING(
787 TRC_LISTENER,
788 Tracer::LEVEL3,
789 "Error: cannot unload consumer " + consumerName);
|
790 h.sterling 1.1 }
791
792 PEG_METHOD_EXIT();
793 }
794
795 /** Unloads the consumers in the given array.
796 * The consumerTable mutex MUST be locked prior to entering this method.
797 */
|
798 marek 1.21 void ConsumerManager::_unloadConsumers(
799 Array<DynamicConsumer*> consumersToUnload)
|
800 h.sterling 1.1 {
801 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
802
803 //tell consumers to shutdown
804 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
805 {
806 consumersToUnload[i]->sendShutdownSignal();
807 }
808
|
809 marek 1.21 PEG_TRACE_CSTRING(
|
810 kumpf 1.22 TRC_LISTENER,
|
811 marek 1.21 Tracer::LEVEL4,
812 "Sent shutdown signal to all consumers.");
813
814 // wait for all the consumer worker threads to complete
815 // since we can only shutdown after they are all complete,
816 // it does not matter if the first, fifth, or last
817 // consumer takes the longest; the wait time is equal to the time it takes
818 // for the busiest consumer to stop processing its requests.
|
819 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
820 {
|
821 marek 1.21 PEG_TRACE_STRING(
822 TRC_LISTENER,
823 Tracer::LEVEL3,
824 "Unloading consumer " + consumersToUnload[i]->getName());
|
825 h.sterling 1.1
826 //wait for the consumer worker thread to end
827 try
828 {
|
829 marek 1.21 Semaphore* _shutdownSemaphore =
830 consumersToUnload[i]->getShutdownSemaphore();
|
831 h.sterling 1.1 if (_shutdownSemaphore)
832 {
833 _shutdownSemaphore->time_wait(10000);
834 }
835
836 } catch (TimeOut &)
837 {
|
838 marek 1.21 PEG_TRACE_CSTRING(
839 TRC_LISTENER,
840 Tracer::LEVEL2,
841 "Timed out while attempting to stop consumer thread.");
|
842 h.sterling 1.1 }
843
|
844 marek 1.21 PEG_TRACE_CSTRING(
845 TRC_LISTENER,
846 Tracer::LEVEL2,
847 "Terminating consumer.");
|
848 h.sterling 1.1
849 try
850 {
851 //terminate consumer provider interface
852 consumersToUnload[i]->terminate();
853
854 //unload consumer provider module
855 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
856 consumersToUnload[i]->_module->unloadModule();
857
858 //serialize outstanding indications
|
859 marek 1.21 _serializeOutstandingIndications(
860 consumersToUnload[i]->getName(),
861 consumersToUnload[i]->_retrieveOutstandingIndications());
|
862 h.sterling 1.1
863 //reset the consumer
864 consumersToUnload[i]->reset();
865
|
866 marek 1.21 PEG_TRACE_CSTRING(
867 TRC_LISTENER,
868 Tracer::LEVEL3,
869 "Consumer library successfully unloaded.");
|
870 h.sterling 1.1
871 } catch (Exception& e)
872 {
|
873 marek 1.21 PEG_TRACE_STRING(
874 TRC_LISTENER,
875 Tracer::LEVEL2,
876 "Error unloading consumer: " + e.getMessage());
|
877 h.sterling 1.1 //ATTN: throw exception? log warning?
878 }
879 }
880
881 PEG_METHOD_EXIT();
882 }
883
884 /** Serializes oustanding indications to a <MyConsumer>.dat file
885 */
|
886 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
887 const String& consumerName,
888 Array<IndicationDispatchEvent> indications)
889 {
890 PEG_METHOD_ENTER(
891 TRC_LISTENER,
892 "ConsumerManager::_serializeOutstandingIndications");
|
893 h.sterling 1.1
894 if (!indications.size())
895 {
896 PEG_METHOD_EXIT();
897 return;
898 }
899
|
900 marek 1.21 String fileName = FileSystem::getAbsolutePath(
901 (const char*)_consumerConfigDir.getCString(),
902 String(consumerName + ".dat"));
903 PEG_TRACE_STRING(
904 TRC_LISTENER,
905 Tracer::LEVEL4,
906 "Consumer dat file: " + fileName);
|
907 h.sterling 1.1
|
908 mike 1.9 Buffer buffer;
|
909 h.sterling 1.1
910 // Open the log file and serialize remaining
911 FILE* fileHandle = 0;
912 fileHandle = fopen((const char*)fileName.getCString(), "w");
913
914 if (!fileHandle)
915 {
|
916 marek 1.21 PEG_TRACE_STRING(
917 TRC_LISTENER,
918 Tracer::LEVEL2,
919 "Unable to open log file for " + consumerName);
|
920 h.sterling 1.1
921 } else
922 {
|
923 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
924 h.sterling 1.1 "Serializing %d outstanding requests for %s",
925 indications.size(),
|
926 marek 1.18 (const char*)consumerName.getCString()));
|
927 h.sterling 1.1
|
928 marek 1.21 // we have to put the array of instances under a valid root element
929 // or the parser complains
|
930 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
931
|
932 marek 1.21 CIMInstance cimInstance;
|
933 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
934 {
|
935 marek 1.21 //set the URL string property on the serializable instance
936 CIMValue cimValue(CIMTYPE_STRING, false);
937 cimValue.set(indications[i].getURL());
938 cimInstance = indications[i].getIndicationInstance();
939 CIMProperty cimProperty(URL_PROPERTY, cimValue);
940 cimInstance.addProperty(cimProperty);
|
941 h.sterling 1.11
942 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
943 marek 1.21 }
|
944 h.sterling 1.1
|
945 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
946 h.sterling 1.1
947 fputs((const char*)buffer.getData(), fileHandle);
948
949 fclose(fileHandle);
950 }
951
952 PEG_METHOD_EXIT();
953 }
954
955 /** Reads outstanding indications from a <MyConsumer>.dat file
956 */
|
957 marek 1.21 Array<IndicationDispatchEvent>
958 ConsumerManager::_deserializeOutstandingIndications(
959 const String& consumerName)
960 {
961 PEG_METHOD_ENTER(
962 TRC_LISTENER,
963 "ConsumerManager::_deserializeOutstandingIndications");
964
965 String fileName = FileSystem::getAbsolutePath(
966 (const char*)_consumerConfigDir.getCString(),
967 String(consumerName + ".dat"));
968 PEG_TRACE_STRING(
969 TRC_LISTENER,
970 Tracer::LEVEL4,
971 "Consumer dat file: " + fileName);
|
972 h.sterling 1.1
973 Array<CIMInstance> cimInstances;
|
974 marek 1.21 Array<String> urlStrings;
975 Array<IndicationDispatchEvent> indications;
|
976 h.sterling 1.1
977 // Open the log file and serialize remaining indications
978 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
979 {
|
980 mike 1.9 Buffer text;
|
981 h.sterling 1.1 CIMInstance cimInstance;
|
982 marek 1.21 CIMProperty cimProperty;
983 CIMValue cimValue;
984 String urlString;
|
985 h.sterling 1.1 XmlEntry entry;
986
987 try
988 {
|
989 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
990 FileSystem::loadFileToMemory(text, fileName);
|
991 h.sterling 1.1
992 //parse the file
993 XmlParser parser((char*)text.getData());
994 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
995
996 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
997 {
|
998 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
999 if (index != PEG_NOT_FOUND)
1000 {
1001 // get the URL string property from the serialized instance
1002 // and remove the property
1003 cimProperty = cimInstance.getProperty(index);
1004 cimValue = cimProperty.getValue();
1005 cimValue.get(urlString);
1006 cimInstance.removeProperty(index);
1007 }
1008 IndicationDispatchEvent* indicationEvent =
1009 new IndicationDispatchEvent(
1010 OperationContext(),
1011 urlString,
1012 cimInstance);
1013
|
1014 h.sterling 1.11 indications.append(*indicationEvent);
|
1015 h.sterling 1.1 }
1016
1017 XmlReader::expectEndTag(parser, "IRETURNVALUE");
1018
|
1019 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
1020 h.sterling 1.1 "Consumer %s has %d outstanding indications",
1021 (const char*)consumerName.getCString(),
|
1022 marek 1.18 indications.size()));
|
1023 h.sterling 1.1
1024 //delete the file
1025 FileSystem::removeFile(fileName);
1026
1027 } catch (Exception& ex)
1028 {
|
1029 marek 1.21 PEG_TRACE_STRING(
1030 TRC_LISTENER,
1031 Tracer::LEVEL3,
1032 "Error parsing dat file: " +
1033 ex.getMessage() + " " + consumerName);
|
1034 h.sterling 1.1
1035 } catch (...)
1036 {
|
1037 marek 1.21 PEG_TRACE_STRING(
1038 TRC_LISTENER,
1039 Tracer::LEVEL2,
1040 "Error parsing dat file: Unknown Exception " + consumerName);
|
1041 h.sterling 1.1 }
1042 }
1043
1044 PEG_METHOD_EXIT();
|
1045 h.sterling 1.11 return indications;
|
1046 h.sterling 1.1 }
1047
1048
1049
1050 /**
|
1051 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
1052 * per consumer, we eliminate a ton of synchronization issues and make it easy
1053 * to prevent the consumer from performing two mutually exclusive operations
1054 * at once. This also prevents one bad consumer from taking the entire
1055 * listener down. That being said, it is up to the programmer to write smart
1056 * consumers, and to ensure that their actions don't deadlock
1057 * the worker thread.
|
1058 h.sterling 1.1 *
|
1059 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
1060 * takes a considerable amount of time to complete, it may make sense to make
1061 * the consumer multi-threaded. The individual consumer can immediately
1062 * spawn off* new threads to handle indications, and return immediately to
1063 * catch the next indication. In this way, a consumer can attain
1064 * extremely high performance.
|
1065 h.sterling 1.1 *
1066 * There are three different events that can signal us:
1067 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
1068 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1069 * shutdown or an idle consumer state)
|
1070 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
1071 *
|
1072 marek 1.21 * The idea is that all new indications are put on the front of the queue and
1073 * processed first. All of the retry indications are put on the back of the
1074 * queue and are only processed AFTER all new indications are sent.
1075 * Before processing each indication, we check to see whether or not the
1076 * shutdown signal was given.
1077 * If so, we immediately break out of the loop, and another compenent
1078 * serializes the remaining indications to a file.
|
1079 h.sterling 1.1 *
|
1080 marek 1.21 * An indication gets retried
1081 * if the consumer throws a CIM_ERR_FAILED exception.
|
1082 h.sterling 1.1 *
|
1083 marek 1.21 * This function makes sure it waits until the default retry lapse has passed
1084 * to avoid issues with the following scenario:
|
1085 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1086 marek 1.21 * We were signalled 20 times, so we will pass the time_wait 20 times.
1087 * Perceivably, the process time on each indication could be minimal.
1088 * We could potentially proceed to process the retries after a very small time
1089 * interval since we would never hit the wait for the retry timeout.
|
1090 h.sterling 1.1 *
1091 */
|
1092 marek 1.21 ThreadReturnType PEGASUS_THREAD_CDECL
1093 ConsumerManager::_worker_routine(void *param)
|
1094 h.sterling 1.1 {
1095 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1096
1097 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1098 String name = myself->getName();
|
1099 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1100 h.sterling 1.1
|
1101 marek 1.21 PEG_TRACE_STRING(
1102 TRC_LISTENER,
1103 Tracer::LEVEL2,
1104 "_worker_routine::entering loop for " + name);
|
1105 h.sterling 1.1
|
1106 h.sterling 1.8 myself->_listeningSemaphore->signal();
1107
|
1108 h.sterling 1.1 while (true)
1109 {
1110 try
1111 {
|
1112 marek 1.21 PEG_TRACE_STRING(
1113 TRC_LISTENER,
1114 Tracer::LEVEL4,
1115 "_worker_routine::waiting " + name);
|
1116 h.sterling 1.1
1117 //wait to be signalled
1118 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
1119
|
1120 marek 1.21 PEG_TRACE_STRING(
1121 TRC_LISTENER,
1122 Tracer::LEVEL4,
1123 "_worker_routine::signalled " + name);
|
1124 h.sterling 1.1
1125 //check whether we received the shutdown signal
1126 if (myself->_dieNow)
1127 {
|
1128 marek 1.21 PEG_TRACE_STRING(
1129 TRC_LISTENER,
1130 Tracer::LEVEL4,
1131 "_worker_routine::shutdown received " + name);
|
1132 h.sterling 1.1 break;
1133 }
1134
1135 //create a temporary queue to store failed indications
|
1136 mike 1.14 tmpEventQueue.clear();
|
1137 h.sterling 1.1
1138 //continue processing events until the queue is empty
1139 //make sure to check for the shutdown signal before every iteration
|
1140 marek 1.21 // Note that any time during our processing of events the Listener
1141 // may be enqueueing NEW events for us to process.
1142 // Because we are popping off the front and new events are being
1143 // thrown on the back if events are failing when we start
1144 // But are succeeding by the end of the processing, events may be
1145 // sent out of chronological order.
1146 // However. Once we complete the current queue of events, we will
1147 // always send old events to be retried before sending any
|
1148 h.sterling 1.10 // new events added afterwards.
|
1149 h.sterling 1.1 while (myself->_eventqueue.size())
1150 {
1151 //check for shutdown signal
|
1152 marek 1.21 //this only breaks us out of the queue loop, but we will
1153 //immediately get through the next wait from
1154 //the shutdown signal itself, at which time we break
1155 //out of the main loop
|
1156 h.sterling 1.1 if (myself->_dieNow)
1157 {
|
1158 marek 1.21 PEG_TRACE_STRING(
1159 TRC_LISTENER,
1160 Tracer::LEVEL4,
1161 "Received signal to shutdown,"
1162 " jumping out of queue loop " + name);
|
1163 h.sterling 1.1 break;
1164 }
1165
1166 //pop next indication off the queue
1167 IndicationDispatchEvent* event = 0;
|
1168 marek 1.21 //what exceptions/errors can this throw?
1169 event = myself->_eventqueue.remove_front();
|
1170 h.sterling 1.1
1171 if (!event)
1172 {
1173 //this should never happen
1174 continue;
1175 }
1176
|
1177 marek 1.21 PEG_TRACE_STRING(
1178 TRC_LISTENER,
1179 Tracer::LEVEL4,
1180 "_worker_routine::consumeIndication " + name);
|
1181 h.sterling 1.1
1182 try
1183 {
1184 myself->consumeIndication(event->getContext(),
1185 event->getURL(),
1186 event->getIndicationInstance());
1187
|
1188 marek 1.21 PEG_TRACE_STRING(
1189 TRC_LISTENER,
1190 Tracer::LEVEL4,
1191 "_worker_routine::processed indication successfully. "
1192 + name);
|
1193 h.sterling 1.1
1194 delete event;
1195 continue;
1196
1197 } catch (CIMException & ce)
1198 {
1199 //check for failure
1200 if (ce.getCode() == CIM_ERR_FAILED)
1201 {
|
1202 marek 1.21 PEG_TRACE_STRING(
1203 TRC_LISTENER,
1204 Tracer::LEVEL2,
1205 "_worker_routine::consumeIndication() temporary"
1206 " failure: " + ce.getMessage() + " " + name);
|
1207 h.sterling 1.10
|
1208 marek 1.21 // Here we simply determine if we should increment
1209 // the retry count or not.
1210 // We don't want to count a forced retry from a new
1211 // event to count as a retry.
1212 // We just have to do it for order's sake.
1213 // If the retry Lapse has lapsed on this event,
1214 // then increment the counter.
1215 if (event->getRetries() > 0)
1216 {
1217 Sint64 differenceInMicroseconds =
1218 CIMDateTime::getDifference(
1219 event->getLastAttemptTime(),
1220 CIMDateTime::getCurrentDateTime());
1221
1222 if (differenceInMicroseconds >=
1223 (DEFAULT_RETRY_LAPSE * 1000))
1224 {
|
1225 h.sterling 1.10 event->increaseRetries();
|
1226 marek 1.21 }
1227 }
1228 else
1229 {
|
1230 h.sterling 1.10 event->increaseRetries();
1231 }
|
1232 h.sterling 1.1
1233 //determine if we have hit the max retry count
1234 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1235 {
|
1236 marek 1.21 PEG_TRACE_CSTRING(
1237 TRC_LISTENER,
1238 Tracer::LEVEL2,
1239 "Error: the maximum retry count has been "
1240 "exceeded. Removing the event from "
1241 "the queue.");
|
1242 h.sterling 1.1
1243 Logger::put(
|
1244 marek 1.21 Logger::ERROR_LOG,
1245 System::CIMLISTENER,
1246 Logger::SEVERE,
1247 "The following indication did not get "
1248 "processed successfully: $0",
1249 event->getIndicationInstance().getPath().toString());
|
1250 h.sterling 1.1
1251 delete event;
1252 continue;
1253
1254 } else
1255 {
|
1256 marek 1.21 PEG_TRACE_CSTRING(
1257 TRC_LISTENER,
1258 Tracer::LEVEL4,
1259 "_worker_routine::placing failed indication "
1260 "back in queue");
|
1261 mike 1.14 tmpEventQueue.insert_back(event);
|
1262 h.sterling 1.1 }
1263
1264 } else
1265 {
|
1266 marek 1.21 PEG_TRACE_STRING(
1267 TRC_LISTENER,
1268 Tracer::LEVEL2,
1269 "Error: consumeIndication() permanent failure: "
1270 + ce.getMessage());
|
1271 h.sterling 1.1 delete event;
1272 continue;
1273 }
1274
1275 } catch (Exception & ex)
1276 {
|
1277 marek 1.21 PEG_TRACE_STRING(
1278 TRC_LISTENER,
1279 Tracer::LEVEL2,
1280 "Error: consumeIndication() permanent failure: "
1281 + ex.getMessage());
|
1282 h.sterling 1.1 delete event;
1283 continue;
1284
1285 } catch (...)
1286 {
|
1287 marek 1.21 PEG_TRACE_CSTRING(
1288 TRC_LISTENER,
1289 Tracer::LEVEL2,
1290 "Error: consumeIndication() failed: "
1291 "Unknown exception.");
|
1292 h.sterling 1.1 delete event;
1293 continue;
1294 } //end try
1295
1296 } //while eventqueue
1297
|
1298 h.sterling 1.10 // Copy the failed indications back to the main queue
|
1299 marek 1.21 // We now lock the queue while adding the retries on to the queue
1300 // so that new events can't get in in front
1301 // Of those events we are retrying. Retried events happened before
1302 // any new events coming in.
|
1303 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1304 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1305 h.sterling 1.1 while (tmpEventQueue.size())
1306 {
|
1307 mike 1.14 tmpEvent = tmpEventQueue.remove_front();
1308 myself->_eventqueue.insert_back(tmpEvent);
|
1309 h.sterling 1.10
|
1310 h.sterling 1.1 }
|
1311 h.sterling 1.10 myself->_eventqueue.unlock();
|
1312 h.sterling 1.1
|
1313 kumpf 1.16 } catch (TimeOut&)
|
1314 h.sterling 1.1 {
|
1315 marek 1.21 PEG_TRACE_CSTRING(
1316 TRC_LISTENER,
1317 Tracer::LEVEL4,
1318 "_worker_routine::Time to retry any outstanding indications.");
1319
1320 // signal the queue in the same way we would,
1321 // if we received a new indication
1322 // this allows the thread to fall into the queue processing code
|
1323 h.sterling 1.1 myself->_check_queue->signal();
1324
1325 } //time_wait
1326
1327
1328 } //shutdown
1329
1330 PEG_METHOD_EXIT();
1331 return 0;
1332 }
1333
1334
1335 PEGASUS_NAMESPACE_END
|