1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include <Pegasus/Common/Config.h>
35 //#include <cstdlib>
36 //#include <dlfcn.h>
37 #include <Pegasus/Common/System.h>
38 #include <Pegasus/Common/FileSystem.h>
39 #include <Pegasus/Common/Tracer.h>
40 #include <Pegasus/Common/Logger.h>
41 #include <Pegasus/Common/XmlReader.h>
|
42 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
43 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/XmlWriter.h>
|
45 mike 1.14 #include <Pegasus/Common/List.h>
|
46 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
47 h.sterling 1.1
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
|
53 marek 1.21 //ATTN: Can we just use a properties file instead?? If we only have one
54 // property, we may want to just parse it ourselves.
55 // We may need to add more properties, however. Potential per consumer
56 // properties: unloadOk, idleTimout, retryCount, etc
|
57 h.sterling 1.1 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
|
60 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
61 "location", "library name for the consumer"},
|
62 h.sterling 1.1 };
63
64 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
65
66 //retry settings
67 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
68 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
69 h.sterling 1.1
|
70 marek 1.21 //constant for fake property that is added to instances when
71 // serializing to track the full URL
|
72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
73 h.sterling 1.1
74
|
75 marek 1.21 ConsumerManager::ConsumerManager(
76 const String& consumerDir,
77 const String& consumerConfigDir,
78 Boolean enableConsumerUnload,
79 Uint32 idleTimeout) :
80 _consumerDir(consumerDir),
81 _consumerConfigDir(consumerConfigDir),
82 _enableConsumerUnload(enableConsumerUnload),
83 _idleTimeout(idleTimeout),
84 _forceShutdown(true)
|
85 h.sterling 1.1 {
86 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
87
|
88 marek 1.21 PEG_TRACE_STRING(
89 TRC_LISTENER,
90 Tracer::LEVEL4,
91 "Consumer library directory: " + consumerDir);
92 PEG_TRACE_STRING(
93 TRC_LISTENER,
94 Tracer::LEVEL4,
95 "Consumer configuration directory: " + consumerConfigDir);
|
96 h.sterling 1.1
|
97 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
98 h.sterling 1.1 "Consumer unload enabled %d: idle timeout %d",
99 enableConsumerUnload,
|
100 marek 1.18 idleTimeout));
|
101 h.sterling 1.1
102
|
103 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
104 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
105 h.sterling 1.1
|
106 kumpf 1.3 struct timeval deallocateWait = {15, 0};
107 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
108 h.sterling 1.1
109 _init();
110
111 PEG_METHOD_EXIT();
112 }
113
114 ConsumerManager::~ConsumerManager()
115 {
116 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
117
118 unloadAllConsumers();
119
|
120 kumpf 1.3 delete _thread_pool;
|
121 h.sterling 1.1
122 ConsumerTable::Iterator i = _consumers.start();
123 for (; i!=0; i++)
124 {
125 DynamicConsumer* consumer = i.value();
126 delete consumer;
127 }
128
|
129 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
130 h.sterling 1.1
131 ModuleTable::Iterator j = _modules.start();
132 for (;j!=0;j++)
133 {
134 ConsumerModule* module = j.value();
135 delete module;
136 }
137
|
138 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
139 h.sterling 1.1
140 PEG_METHOD_EXIT();
141 }
142
143 void ConsumerManager::_init()
144 {
145 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
146
147 //check if there are any outstanding indications
148 Array<String> files;
149 Uint32 pos;
150 String consumerName;
151
152 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
153 {
154 for (Uint32 i = 0; i < files.size(); i++)
155 {
156 pos = files[i].find(".dat");
157 if (pos != PEG_NOT_FOUND)
158 {
159 consumerName = files[i].subString(0, pos);
160 h.sterling 1.1
161 try
162 {
|
163 marek 1.21 PEG_TRACE_STRING(
164 TRC_LISTENER,
165 Tracer::LEVEL4,
166 "Attempting to load indication for!" +
167 consumerName + "!");
|
168 h.sterling 1.1 getConsumer(consumerName);
169
170 } catch (...)
171 {
|
172 marek 1.21 PEG_TRACE_STRING(
173 TRC_LISTENER,
174 Tracer::LEVEL4,
175 "Cannot load consumer from file " + files[i]);
|
176 h.sterling 1.1 }
177 }
178 }
179 }
180
181 PEG_METHOD_EXIT();
182 }
183
184 String ConsumerManager::getConsumerDir()
185 {
186 return _consumerDir;
187 }
188
189 String ConsumerManager::getConsumerConfigDir()
190 {
191 return _consumerConfigDir;
192 }
193
194 Boolean ConsumerManager::getEnableConsumerUnload()
195 {
196 return _enableConsumerUnload;
197 h.sterling 1.1 }
198
199 Uint32 ConsumerManager::getIdleTimeout()
200 {
201 return _idleTimeout;
202 }
203
|
204 marek 1.21 /** Retrieves the library name associated with the consumer name.
205 * By default, the library name
206 * is the same as the consumer name. However, you may specify a different
207 * library name in a consumer configuration file. This file must be named
208 * "MyConsumer.txt" and contain the following: location="libraryName"
209 *
210 * The config file is optional and is generally only needed in cases where
211 * there are strict requirements on library naming.
212 *
213 * It is the responsibility of the caller to catch any exceptions
214 * thrown by this method.
215 */
|
216 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
217 {
218 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
219
220 //default library name is consumer name
221 String libraryName = consumerName;
222
|
223 marek 1.21 // check whether an alternative library name was specified in an optional
224 // consumer config file
225 String configFile = FileSystem::getAbsolutePath(
226 (const char*)_consumerConfigDir.getCString(),
227 String(consumerName + ".conf"));
228 PEG_TRACE_STRING(
229 TRC_LISTENER,
230 Tracer::LEVEL4,
231 "Looking for config file " + configFile);
|
232 h.sterling 1.1
233 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
234 {
|
235 marek 1.21 PEG_TRACE_STRING(
236 TRC_LISTENER,
237 Tracer::LEVEL4,
238 "Found config file for consumer " + consumerName);
|
239 h.sterling 1.1
240 try
241 {
|
242 marek 1.21 //Bugzilla 3765 - Change this to use a member var
243 // when OptionManager has a reset option
|
244 h.sterling 1.8 OptionManager _optionMgr;
|
245 marek 1.21 //comment the following line out later
246 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
247 h.sterling 1.1 _optionMgr.mergeFile(configFile);
248 _optionMgr.checkRequiredOptions();
249
|
250 marek 1.21 if (!_optionMgr.lookupValue("location", libraryName) ||
251 (libraryName == String::EMPTY))
|
252 h.sterling 1.1 {
|
253 marek 1.21 PEG_TRACE_STRING(
254 TRC_LISTENER,
255 Tracer::LEVEL2,
256 "Warning: Using default library name since none was "
257 "specified in " + configFile);
|
258 h.sterling 1.1 libraryName = consumerName;
259 }
260
261 } catch (Exception & ex)
262 {
|
263 marek 1.21 throw Exception(
264 MessageLoaderParms(
265 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
266 "Error reading $0: $1.",
267 configFile,
268 ex.getMessage()));
|
269 h.sterling 1.1 }
270 } else
271 {
|
272 marek 1.21 PEG_TRACE_STRING(
273 TRC_LISTENER,
274 Tracer::LEVEL4,
275 "No config file exists for " + consumerName);
|
276 h.sterling 1.1 }
277
|
278 marek 1.21 PEG_TRACE_STRING(
279 TRC_LISTENER,
280 Tracer::LEVEL4,
281 "The library name for " + consumerName + " is " + libraryName);
|
282 h.sterling 1.1
283 PEG_METHOD_EXIT();
284 return libraryName;
285 }
286
|
287 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
288 * we return the one in the cache. If it
|
289 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
290 marek 1.21 * @throws Exception if we cannot successfully create and
291 * initialize the consumer
|
292 h.sterling 1.1 */
293 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
294 {
295 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
296
297 DynamicConsumer* consumer = 0;
298 CIMIndicationConsumerProvider* consumerRef = 0;
299 Boolean cached = false;
300 Boolean entryExists = false;
301
302 AutoMutex lock(_consumerTableMutex);
303
304 if (_consumers.lookup(consumerName, consumer))
305 {
306 //why isn't this working??
307 entryExists = true;
308
309 if (consumer && consumer->isLoaded())
310 {
|
311 marek 1.21 PEG_TRACE_STRING(
312 TRC_LISTENER,
313 Tracer::LEVEL3,
314 "Consumer exists in the cache and"
315 " is already loaded: " + consumerName);
|
316 h.sterling 1.1 cached = true;
317 }
318 } else
319 {
|
320 marek 1.21 PEG_TRACE_STRING(
321 TRC_LISTENER,
322 Tracer::LEVEL3,
323 "Consumer not found in cache, creating " + consumerName);
|
324 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
325 //ATTN: The above is a memory leak if _initConsumer throws an exception
326 //need to delete it in that case
327 }
328
329 if (!cached)
330 {
331 _initConsumer(consumerName, consumer);
332
333 if (!entryExists)
334 {
335 _consumers.insert(consumerName, consumer);
336 }
337 }
338
339 consumer->updateIdleTimer();
340
341 PEG_METHOD_EXIT();
342 return consumer;
343 }
344
345 h.sterling 1.1 /** Initializes a DynamicConsumer.
|
346 marek 1.21 * Caller assumes responsibility for mutexing the operation as well as
347 * ensuring the consumer does not already exist.
|
348 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
349 */
|
350 marek 1.21 void ConsumerManager::_initConsumer(
351 const String& consumerName,
352 DynamicConsumer* consumer)
|
353 h.sterling 1.1 {
354 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
355
356 CIMIndicationConsumerProvider* base = 0;
357 ConsumerModule* module = 0;
358
|
359 marek 1.21 // lookup provider module in cache (if it exists, it returns
360 // the cached module, otherwise it creates and returns a new one)
|
361 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
362 module = _lookupModule(libraryName);
363
364 //build library path
|
365 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
366 (const char*)_consumerDir.getCString(),
367 FileSystem::buildLibraryFileName(libraryName));
368 PEG_TRACE_STRING(
369 TRC_LISTENER,
370 Tracer::LEVEL4,
371 "Loading library: " + libraryPath);
|
372 h.sterling 1.1
373 //load module
374 try
375 {
376 base = module->load(consumerName, libraryPath);
377 consumer->set(module, base);
378
379 } catch (Exception& ex)
380 {
|
381 marek 1.21 PEG_TRACE_STRING(
382 TRC_LISTENER,
383 Tracer::LEVEL2,
384 "Error loading consumer module: " + ex.getMessage());
385
386 throw Exception(
387 MessageLoaderParms(
388 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
389 "Cannot load module ($0:$1): Unknown exception.",
390 consumerName,
391 libraryName));
|
392 h.sterling 1.1 } catch (...)
393 {
|
394 marek 1.21 throw Exception(
395 MessageLoaderParms(
396 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
397 "Cannot load module ($0:$1): Unknown exception.",
398 consumerName,
399 libraryName));
|
400 h.sterling 1.1 }
401
|
402 marek 1.21 PEG_TRACE_STRING(
403 TRC_LISTENER,
404 Tracer::LEVEL4,
405 "Successfully loaded consumer module " + libraryName);
|
406 h.sterling 1.1
407 //initialize consumer
408 try
409 {
|
410 marek 1.21 PEG_TRACE_STRING(
411 TRC_LISTENER,
412 Tracer::LEVEL4,
413 "Initializing Consumer " + consumerName);
|
414 h.sterling 1.1
415 consumer->initialize();
416
417 //ATTN: need to change this
418 Semaphore* semaphore = new Semaphore(0); //blocking
419
420 consumer->setShutdownSemaphore(semaphore);
421
422 //start the worker thread
|
423 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
424 h.sterling 1.1 _worker_routine,
|
425 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
426 marek 1.21 {
427 Logger::put(
428 Logger::STANDARD_LOG,
429 System::CIMLISTENER,
430 Logger::TRACE,
431 "Not enough threads for consumer.");
|
432 konrad.r 1.7
|
433 marek 1.21 PEG_TRACE_CSTRING(
434 TRC_LISTENER,
435 Tracer::LEVEL2,
436 "Could not allocate thread for consumer.");
437
438 consumer->setShutdownSemaphore(0);
439 delete semaphore;
440 throw Exception(
441 MessageLoaderParms(
442 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
443 "Not enough threads for consumer worker routine."));
|
444 konrad.r 1.7 }
|
445 h.sterling 1.1
|
446 marek 1.21 //wait until the listening thread has started.
447 // Otherwise, there is a miniscule chance that the first event will
448 // be enqueued before the consumer is waiting for it and the first
449 // indication after loading the consumer will be lost
|
450 h.sterling 1.8 consumer->waitForEventThread();
451
|
452 h.sterling 1.1 //load any outstanding requests
|
453 marek 1.21 Array<IndicationDispatchEvent> outstandingIndications =
454 _deserializeOutstandingIndications(consumerName);
|
455 h.sterling 1.1 if (outstandingIndications.size())
456 {
457 //the consumer will signal itself in _loadOustandingIndications
458 consumer->_loadOutstandingIndications(outstandingIndications);
459 }
460
|
461 marek 1.21 PEG_TRACE_STRING(
462 TRC_LISTENER,
463 Tracer::LEVEL4,
464 "Successfully initialized consumer " + consumerName);
|
465 h.sterling 1.1
466 } catch (...)
467 {
468 module->unloadModule();
469 consumer->reset();
|
470 marek 1.21 throw Exception(
471 MessageLoaderParms(
472 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
473 "Cannot initialize consumer ($0).",
474 consumerName));
|
475 h.sterling 1.1 }
476
477 PEG_METHOD_EXIT();
478 }
479
480
|
481 marek 1.21 /** Returns the ConsumerModule with the given library name.
482 * If it already exists, we return the one in the cache. If it
|
483 h.sterling 1.1 * DNE, we create it and add it to the table.
|
484 marek 1.21 * @throws Exception if we cannot successfully create and
485 * initialize the consumer
|
486 h.sterling 1.1 */
|
487 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
488 h.sterling 1.1 {
489 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
490
491 AutoMutex lock(_moduleTableMutex);
492
493 ConsumerModule* module = 0;
494
495 //see if consumer module is cached
496 if (_modules.lookup(libraryName, module))
497 {
|
498 marek 1.21 PEG_TRACE_STRING(
499 TRC_LISTENER,
500 Tracer::LEVEL4,
501 "Found Consumer Module" +
502 libraryName + " in Consumer Manager Cache");
|
503 h.sterling 1.1
504 } else
505 {
|
506 marek 1.21 PEG_TRACE_STRING(
507 TRC_LISTENER,
508 Tracer::LEVEL4,
509 "Creating Consumer Provider Module " + libraryName);
|
510 h.sterling 1.1
511 module = new ConsumerModule();
512 _modules.insert(libraryName, module);
513 }
514
515 PEG_METHOD_EXIT();
516 return(module);
517 }
518
519 /** Returns true if there are active consumers
520 */
521 Boolean ConsumerManager::hasActiveConsumers()
522 {
523 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
524
525 AutoMutex lock(_consumerTableMutex);
526 DynamicConsumer* consumer = 0;
527
528 try
529 {
530 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
531 h.sterling 1.1 {
532 consumer = i.value();
533
|
534 marek 1.21 if (consumer &&
535 consumer->isLoaded() &&
536 (consumer->getPendingIndications() > 0))
|
537 h.sterling 1.1 {
|
538 marek 1.21 PEG_TRACE_STRING(
539 TRC_LISTENER,
540 Tracer::LEVEL4,
541 "Found active consumer: " + consumer->_name);
|
542 h.sterling 1.1 PEG_METHOD_EXIT();
543 return true;
544 }
545 }
546 } catch (...)
547 {
548 // Unexpected exception; do not assume that no providers are loaded
|
549 marek 1.21 PEG_TRACE_CSTRING(
550 TRC_LISTENER,
551 Tracer::LEVEL2,
552 "Unexpected Exception in hasActiveConsumers.");
|
553 h.sterling 1.1 PEG_METHOD_EXIT();
554 return true;
555 }
556
557 PEG_METHOD_EXIT();
558 return false;
559 }
560
561 /** Returns true if there are loaded consumers
562 */
563 Boolean ConsumerManager::hasLoadedConsumers()
564 {
565 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
566
567 AutoMutex lock(_consumerTableMutex);
568 DynamicConsumer* consumer = 0;
569
570 try
571 {
572 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
573 {
574 h.sterling 1.1 consumer = i.value();
575
576 if (consumer && consumer->isLoaded())
577 {
|
578 marek 1.21 PEG_TRACE_STRING(
579 TRC_LISTENER,
580 Tracer::LEVEL4,
581 "Found loaded consumer: " + consumer->_name);
|
582 h.sterling 1.1 PEG_METHOD_EXIT();
583 return true;
584 }
585 }
586 } catch (...)
587 {
588 // Unexpected exception; do not assume that no providers are loaded
|
589 marek 1.21 PEG_TRACE_CSTRING(
590 TRC_LISTENER,
591 Tracer::LEVEL2,
592 "Unexpected Exception in hasLoadedConsumers.");
|
593 h.sterling 1.1 PEG_METHOD_EXIT();
594 return true;
595 }
596
597 PEG_METHOD_EXIT();
598 return false;
599 }
600
601
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal. This causes the worker routine to break out
 *    of the loop and exit.
 * 2) Wait for the worker thread to end. This may take a while if it's
 *    processing an indication. This is optional in a shutdown scenario.
 *    If the listener is shut down with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.
 *    Note that a normal shutdown only allows the current consumer indication
 *    to finish. All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when
 *    its refcount == 0)
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal
 * should be sent to all of the consumers so the threads can finish
 * simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be
 * processed? Just new indications to be processed? I am not inclined to this
 * solution since it could take a LOT of time. By serializing and deserializing
 * indications between shutdown and startup, I feel like we do not need to
 * process ALL queued indications on shutdown.
 */
626
627 /** Unloads all consumers.
628 */
629 void ConsumerManager::unloadAllConsumers()
630 {
631 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
632
633 AutoMutex lock(_consumerTableMutex);
634
635 if (!_consumers.size())
636 {
|
637 marek 1.21 PEG_TRACE_CSTRING(
638 TRC_LISTENER,
639 Tracer::LEVEL4,
640 "There are no consumers to unload.");
|
641 h.sterling 1.1 PEG_METHOD_EXIT();
642 return;
643 }
644
645 if (!_forceShutdown)
646 {
|
647 marek 1.21 // wait until all the consumers have finished processing the events in
648 // their queue
649 // ATTN: Should this have a timeout even though it's a force??
|
650 h.sterling 1.1 while (hasActiveConsumers())
651 {
|
652 mike 1.15 Threads::sleep(500);
|
653 h.sterling 1.1 }
654 }
655
656 Array<DynamicConsumer*> loadedConsumers;
657
658 ConsumerTable::Iterator i = _consumers.start();
659 DynamicConsumer* consumer = 0;
660
661 for (; i!=0; i++)
662 {
663 consumer = i.value();
664 if (consumer && consumer->isLoaded())
665 {
666 loadedConsumers.append(consumer);
667 }
668 }
669
670 if (loadedConsumers.size())
671 {
672 try
673 {
674 h.sterling 1.1 _unloadConsumers(loadedConsumers);
675
|
676 kumpf 1.16 } catch (Exception&)
|
677 h.sterling 1.1 {
|
678 marek 1.21 PEG_TRACE_CSTRING(
679 TRC_LISTENER,
680 Tracer::LEVEL4,
681 "Error unloading consumers.");
|
682 h.sterling 1.1 }
683 } else
684 {
|
685 marek 1.21 PEG_TRACE_CSTRING(
686 TRC_LISTENER,
687 Tracer::LEVEL4,
688 "There are no consumers to unload.");
|
689 h.sterling 1.1 }
690
691 PEG_METHOD_EXIT();
692 }
693
694 /** Unloads idle consumers.
695 */
696 void ConsumerManager::unloadIdleConsumers()
697 {
698 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
699
700 AutoMutex lock(_consumerTableMutex);
701
702 if (!_consumers.size())
703 {
|
704 marek 1.21 PEG_TRACE_CSTRING(
705 TRC_LISTENER,
706 Tracer::LEVEL4,
707 "There are no consumers to unload.");
|
708 h.sterling 1.1 PEG_METHOD_EXIT();
709 return;
710 }
711
712 Array<DynamicConsumer*> loadedConsumers;
713
714 ConsumerTable::Iterator i = _consumers.start();
715 DynamicConsumer* consumer = 0;
716
717 for (; i!=0; i++)
718 {
719 consumer = i.value();
720 if (consumer && consumer->isLoaded() && consumer->isIdle())
721 {
722 loadedConsumers.append(consumer);
723 }
724 }
725
726 if (loadedConsumers.size())
727 {
728 try
729 h.sterling 1.1 {
730 _unloadConsumers(loadedConsumers);
731
|
732 kumpf 1.16 } catch (Exception&)
|
733 h.sterling 1.1 {
|
734 marek 1.21 PEG_TRACE_CSTRING(
735 TRC_LISTENER,
736 Tracer::LEVEL4,
737 "Error unloading consumers.");
|
738 h.sterling 1.1 }
739 } else
740 {
|
741 marek 1.21 PEG_TRACE_CSTRING(
742 TRC_LISTENER,
743 Tracer::LEVEL4,
744 "There are no consumers to unload.");
|
745 h.sterling 1.1 }
746
747 PEG_METHOD_EXIT();
748 }
749
750 /** Unloads a single consumer.
751 */
752 void ConsumerManager::unloadConsumer(const String& consumerName)
753 {
754 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
755
756 AutoMutex lock(_consumerTableMutex);
757
758 DynamicConsumer* consumer = 0;
759
760 //check whether the consumer exists
761 if (!_consumers.lookup(consumerName, consumer))
762 {
|
763 marek 1.21 PEG_TRACE_STRING(
764 TRC_LISTENER,
765 Tracer::LEVEL3,
766 "Error: cannot unload consumer, unknown consumer " + consumerName);
|
767 h.sterling 1.1 return;
768 }
769
770 //check whether the consumer is loaded
771 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
772 {
773 //unload the consumer
774 Array<DynamicConsumer*> loadedConsumers;
775 loadedConsumers.append(consumer);
776
777 try
778 {
779 _unloadConsumers(loadedConsumers);
780
|
781 kumpf 1.16 } catch (Exception&)
|
782 h.sterling 1.1 {
|
783 marek 1.21 PEG_TRACE_CSTRING(
784 TRC_LISTENER,
785 Tracer::LEVEL4,
786 "Error unloading consumers.");
|
787 h.sterling 1.1 }
788
789 } else
790 {
|
791 marek 1.21 PEG_TRACE_STRING(
792 TRC_LISTENER,
793 Tracer::LEVEL3,
794 "Error: cannot unload consumer " + consumerName);
|
795 h.sterling 1.1 }
796
797 PEG_METHOD_EXIT();
798 }
799
800 /** Unloads the consumers in the given array.
801 * The consumerTable mutex MUST be locked prior to entering this method.
802 */
|
803 marek 1.21 void ConsumerManager::_unloadConsumers(
804 Array<DynamicConsumer*> consumersToUnload)
|
805 h.sterling 1.1 {
806 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
807
808 //tell consumers to shutdown
809 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
810 {
811 consumersToUnload[i]->sendShutdownSignal();
812 }
813
|
814 marek 1.21 PEG_TRACE_CSTRING(
|
815 kumpf 1.22 TRC_LISTENER,
|
816 marek 1.21 Tracer::LEVEL4,
817 "Sent shutdown signal to all consumers.");
818
819 // wait for all the consumer worker threads to complete
820 // since we can only shutdown after they are all complete,
821 // it does not matter if the first, fifth, or last
822 // consumer takes the longest; the wait time is equal to the time it takes
823 // for the busiest consumer to stop processing its requests.
|
824 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
825 {
|
826 marek 1.21 PEG_TRACE_STRING(
827 TRC_LISTENER,
828 Tracer::LEVEL3,
829 "Unloading consumer " + consumersToUnload[i]->getName());
|
830 h.sterling 1.1
831 //wait for the consumer worker thread to end
832 try
833 {
|
834 marek 1.21 Semaphore* _shutdownSemaphore =
835 consumersToUnload[i]->getShutdownSemaphore();
|
836 h.sterling 1.1 if (_shutdownSemaphore)
837 {
838 _shutdownSemaphore->time_wait(10000);
839 }
840
841 } catch (TimeOut &)
842 {
|
843 marek 1.21 PEG_TRACE_CSTRING(
844 TRC_LISTENER,
845 Tracer::LEVEL2,
846 "Timed out while attempting to stop consumer thread.");
|
847 h.sterling 1.1 }
848
|
849 marek 1.21 PEG_TRACE_CSTRING(
850 TRC_LISTENER,
851 Tracer::LEVEL2,
852 "Terminating consumer.");
|
853 h.sterling 1.1
854 try
855 {
856 //terminate consumer provider interface
857 consumersToUnload[i]->terminate();
858
859 //unload consumer provider module
860 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
861 consumersToUnload[i]->_module->unloadModule();
862
863 //serialize outstanding indications
|
864 marek 1.21 _serializeOutstandingIndications(
865 consumersToUnload[i]->getName(),
866 consumersToUnload[i]->_retrieveOutstandingIndications());
|
867 h.sterling 1.1
868 //reset the consumer
869 consumersToUnload[i]->reset();
870
|
871 marek 1.21 PEG_TRACE_CSTRING(
872 TRC_LISTENER,
873 Tracer::LEVEL3,
874 "Consumer library successfully unloaded.");
|
875 h.sterling 1.1
876 } catch (Exception& e)
877 {
|
878 marek 1.21 PEG_TRACE_STRING(
879 TRC_LISTENER,
880 Tracer::LEVEL2,
881 "Error unloading consumer: " + e.getMessage());
|
882 h.sterling 1.1 //ATTN: throw exception? log warning?
883 }
884 }
885
886 PEG_METHOD_EXIT();
887 }
888
889 /** Serializes oustanding indications to a <MyConsumer>.dat file
890 */
|
891 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
892 const String& consumerName,
893 Array<IndicationDispatchEvent> indications)
894 {
895 PEG_METHOD_ENTER(
896 TRC_LISTENER,
897 "ConsumerManager::_serializeOutstandingIndications");
|
898 h.sterling 1.1
899 if (!indications.size())
900 {
901 PEG_METHOD_EXIT();
902 return;
903 }
904
|
905 marek 1.21 String fileName = FileSystem::getAbsolutePath(
906 (const char*)_consumerConfigDir.getCString(),
907 String(consumerName + ".dat"));
908 PEG_TRACE_STRING(
909 TRC_LISTENER,
910 Tracer::LEVEL4,
911 "Consumer dat file: " + fileName);
|
912 h.sterling 1.1
|
913 mike 1.9 Buffer buffer;
|
914 h.sterling 1.1
915 // Open the log file and serialize remaining
916 FILE* fileHandle = 0;
917 fileHandle = fopen((const char*)fileName.getCString(), "w");
918
919 if (!fileHandle)
920 {
|
921 marek 1.21 PEG_TRACE_STRING(
922 TRC_LISTENER,
923 Tracer::LEVEL2,
924 "Unable to open log file for " + consumerName);
|
925 h.sterling 1.1
926 } else
927 {
|
928 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
929 h.sterling 1.1 "Serializing %d outstanding requests for %s",
930 indications.size(),
|
931 marek 1.18 (const char*)consumerName.getCString()));
|
932 h.sterling 1.1
|
933 marek 1.21 // we have to put the array of instances under a valid root element
934 // or the parser complains
|
935 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
936
|
937 marek 1.21 CIMInstance cimInstance;
|
938 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
939 {
|
940 marek 1.21 //set the URL string property on the serializable instance
941 CIMValue cimValue(CIMTYPE_STRING, false);
942 cimValue.set(indications[i].getURL());
943 cimInstance = indications[i].getIndicationInstance();
944 CIMProperty cimProperty(URL_PROPERTY, cimValue);
945 cimInstance.addProperty(cimProperty);
|
946 h.sterling 1.11
947 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
948 marek 1.21 }
|
949 h.sterling 1.1
|
950 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
951 h.sterling 1.1
952 fputs((const char*)buffer.getData(), fileHandle);
953
954 fclose(fileHandle);
955 }
956
957 PEG_METHOD_EXIT();
958 }
959
960 /** Reads outstanding indications from a <MyConsumer>.dat file
961 */
|
962 marek 1.21 Array<IndicationDispatchEvent>
963 ConsumerManager::_deserializeOutstandingIndications(
964 const String& consumerName)
965 {
966 PEG_METHOD_ENTER(
967 TRC_LISTENER,
968 "ConsumerManager::_deserializeOutstandingIndications");
969
970 String fileName = FileSystem::getAbsolutePath(
971 (const char*)_consumerConfigDir.getCString(),
972 String(consumerName + ".dat"));
973 PEG_TRACE_STRING(
974 TRC_LISTENER,
975 Tracer::LEVEL4,
976 "Consumer dat file: " + fileName);
|
977 h.sterling 1.1
978 Array<CIMInstance> cimInstances;
|
979 marek 1.21 Array<String> urlStrings;
980 Array<IndicationDispatchEvent> indications;
|
981 h.sterling 1.1
982 // Open the log file and serialize remaining indications
983 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
984 {
|
985 mike 1.9 Buffer text;
|
986 h.sterling 1.1 CIMInstance cimInstance;
|
987 marek 1.21 CIMProperty cimProperty;
988 CIMValue cimValue;
989 String urlString;
|
990 h.sterling 1.1 XmlEntry entry;
991
992 try
993 {
|
994 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
995 FileSystem::loadFileToMemory(text, fileName);
|
996 h.sterling 1.1
997 //parse the file
998 XmlParser parser((char*)text.getData());
999 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
1000
1001 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
1002 {
|
1003 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
1004 if (index != PEG_NOT_FOUND)
1005 {
1006 // get the URL string property from the serialized instance
1007 // and remove the property
1008 cimProperty = cimInstance.getProperty(index);
1009 cimValue = cimProperty.getValue();
1010 cimValue.get(urlString);
1011 cimInstance.removeProperty(index);
1012 }
1013 IndicationDispatchEvent* indicationEvent =
1014 new IndicationDispatchEvent(
1015 OperationContext(),
1016 urlString,
1017 cimInstance);
1018
|
1019 h.sterling 1.11 indications.append(*indicationEvent);
|
1020 h.sterling 1.1 }
1021
1022 XmlReader::expectEndTag(parser, "IRETURNVALUE");
1023
|
1024 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
1025 h.sterling 1.1 "Consumer %s has %d outstanding indications",
1026 (const char*)consumerName.getCString(),
|
1027 marek 1.18 indications.size()));
|
1028 h.sterling 1.1
1029 //delete the file
1030 FileSystem::removeFile(fileName);
1031
1032 } catch (Exception& ex)
1033 {
|
1034 marek 1.21 PEG_TRACE_STRING(
1035 TRC_LISTENER,
1036 Tracer::LEVEL3,
1037 "Error parsing dat file: " +
1038 ex.getMessage() + " " + consumerName);
|
1039 h.sterling 1.1
1040 } catch (...)
1041 {
|
1042 marek 1.21 PEG_TRACE_STRING(
1043 TRC_LISTENER,
1044 Tracer::LEVEL2,
1045 "Error parsing dat file: Unknown Exception " + consumerName);
|
1046 h.sterling 1.1 }
1047 }
1048
1049 PEG_METHOD_EXIT();
|
1050 h.sterling 1.11 return indications;
|
1051 h.sterling 1.1 }
1052
1053
1054
1055 /**
|
1056 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
1057 * per consumer, we eliminate a ton of synchronization issues and make it easy
1058 * to prevent the consumer from performing two mutually exclusive operations
1059 * at once. This also prevents one bad consumer from taking the entire
1060 * listener down. That being said, it is up to the programmer to write smart
1061 * consumers, and to ensure that their actions don't deadlock
1062 * the worker thread.
|
1063 h.sterling 1.1 *
|
1064 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
1065 * takes a considerable amount of time to complete, it may make sense to make
1066 * the consumer multi-threaded. The individual consumer can immediately
1067 * spawn off* new threads to handle indications, and return immediately to
1068 * catch the next indication. In this way, a consumer can attain
1069 * extremely high performance.
|
1070 h.sterling 1.1 *
1071 * There are three different events that can signal us:
1072 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
1073 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1074 * shutdown or an idle consumer state)
|
1075 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
1076 *
|
1077 marek 1.21 * The idea is that all new indications are put on the front of the queue and
1078 * processed first. All of the retry indications are put on the back of the
1079 * queue and are only processed AFTER all new indications are sent.
1080 * Before processing each indication, we check to see whether or not the
1081 * shutdown signal was given.
1082 * If so, we immediately break out of the loop, and another compenent
1083 * serializes the remaining indications to a file.
|
1084 h.sterling 1.1 *
|
1085 marek 1.21 * An indication gets retried
1086 * if the consumer throws a CIM_ERR_FAILED exception.
|
1087 h.sterling 1.1 *
|
1088 marek 1.21 * This function makes sure it waits until the default retry lapse has passed
1089 * to avoid issues with the following scenario:
|
1090 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1091 marek 1.21 * We were signalled 20 times, so we will pass the time_wait 20 times.
1092 * Perceivably, the process time on each indication could be minimal.
1093 * We could potentially proceed to process the retries after a very small time
1094 * interval since we would never hit the wait for the retry timeout.
|
1095 h.sterling 1.1 *
1096 */
|
1097 marek 1.21 ThreadReturnType PEGASUS_THREAD_CDECL
1098 ConsumerManager::_worker_routine(void *param)
|
1099 h.sterling 1.1 {
1100 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1101
1102 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1103 String name = myself->getName();
|
1104 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1105 h.sterling 1.1
|
1106 marek 1.21 PEG_TRACE_STRING(
1107 TRC_LISTENER,
1108 Tracer::LEVEL2,
1109 "_worker_routine::entering loop for " + name);
|
1110 h.sterling 1.1
|
1111 h.sterling 1.8 myself->_listeningSemaphore->signal();
1112
|
1113 h.sterling 1.1 while (true)
1114 {
1115 try
1116 {
|
1117 marek 1.21 PEG_TRACE_STRING(
1118 TRC_LISTENER,
1119 Tracer::LEVEL4,
1120 "_worker_routine::waiting " + name);
|
1121 h.sterling 1.1
1122 //wait to be signalled
1123 myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE);
1124
|
1125 marek 1.21 PEG_TRACE_STRING(
1126 TRC_LISTENER,
1127 Tracer::LEVEL4,
1128 "_worker_routine::signalled " + name);
|
1129 h.sterling 1.1
1130 //check whether we received the shutdown signal
1131 if (myself->_dieNow)
1132 {
|
1133 marek 1.21 PEG_TRACE_STRING(
1134 TRC_LISTENER,
1135 Tracer::LEVEL4,
1136 "_worker_routine::shutdown received " + name);
|
1137 h.sterling 1.1 break;
1138 }
1139
1140 //create a temporary queue to store failed indications
|
1141 mike 1.14 tmpEventQueue.clear();
|
1142 h.sterling 1.1
1143 //continue processing events until the queue is empty
1144 //make sure to check for the shutdown signal before every iteration
|
1145 marek 1.21 // Note that any time during our processing of events the Listener
1146 // may be enqueueing NEW events for us to process.
1147 // Because we are popping off the front and new events are being
1148 // thrown on the back if events are failing when we start
1149 // But are succeeding by the end of the processing, events may be
1150 // sent out of chronological order.
1151 // However. Once we complete the current queue of events, we will
1152 // always send old events to be retried before sending any
|
1153 h.sterling 1.10 // new events added afterwards.
|
1154 h.sterling 1.1 while (myself->_eventqueue.size())
1155 {
1156 //check for shutdown signal
|
1157 marek 1.21 //this only breaks us out of the queue loop, but we will
1158 //immediately get through the next wait from
1159 //the shutdown signal itself, at which time we break
1160 //out of the main loop
|
1161 h.sterling 1.1 if (myself->_dieNow)
1162 {
|
1163 marek 1.21 PEG_TRACE_STRING(
1164 TRC_LISTENER,
1165 Tracer::LEVEL4,
1166 "Received signal to shutdown,"
1167 " jumping out of queue loop " + name);
|
1168 h.sterling 1.1 break;
1169 }
1170
1171 //pop next indication off the queue
1172 IndicationDispatchEvent* event = 0;
|
1173 marek 1.21 //what exceptions/errors can this throw?
1174 event = myself->_eventqueue.remove_front();
|
1175 h.sterling 1.1
1176 if (!event)
1177 {
1178 //this should never happen
1179 continue;
1180 }
1181
|
1182 marek 1.21 PEG_TRACE_STRING(
1183 TRC_LISTENER,
1184 Tracer::LEVEL4,
1185 "_worker_routine::consumeIndication " + name);
|
1186 h.sterling 1.1
1187 try
1188 {
1189 myself->consumeIndication(event->getContext(),
1190 event->getURL(),
1191 event->getIndicationInstance());
1192
|
1193 marek 1.21 PEG_TRACE_STRING(
1194 TRC_LISTENER,
1195 Tracer::LEVEL4,
1196 "_worker_routine::processed indication successfully. "
1197 + name);
|
1198 h.sterling 1.1
1199 delete event;
1200 continue;
1201
1202 } catch (CIMException & ce)
1203 {
1204 //check for failure
1205 if (ce.getCode() == CIM_ERR_FAILED)
1206 {
|
1207 marek 1.21 PEG_TRACE_STRING(
1208 TRC_LISTENER,
1209 Tracer::LEVEL2,
1210 "_worker_routine::consumeIndication() temporary"
1211 " failure: " + ce.getMessage() + " " + name);
|
1212 h.sterling 1.10
|
1213 marek 1.21 // Here we simply determine if we should increment
1214 // the retry count or not.
1215 // We don't want to count a forced retry from a new
1216 // event to count as a retry.
1217 // We just have to do it for order's sake.
1218 // If the retry Lapse has lapsed on this event,
1219 // then increment the counter.
1220 if (event->getRetries() > 0)
1221 {
1222 Sint64 differenceInMicroseconds =
1223 CIMDateTime::getDifference(
1224 event->getLastAttemptTime(),
1225 CIMDateTime::getCurrentDateTime());
1226
1227 if (differenceInMicroseconds >=
1228 (DEFAULT_RETRY_LAPSE * 1000))
1229 {
|
1230 h.sterling 1.10 event->increaseRetries();
|
1231 marek 1.21 }
1232 }
1233 else
1234 {
|
1235 h.sterling 1.10 event->increaseRetries();
1236 }
|
1237 h.sterling 1.1
1238 //determine if we have hit the max retry count
1239 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1240 {
|
1241 marek 1.21 PEG_TRACE_CSTRING(
1242 TRC_LISTENER,
1243 Tracer::LEVEL2,
1244 "Error: the maximum retry count has been "
1245 "exceeded. Removing the event from "
1246 "the queue.");
|
1247 h.sterling 1.1
1248 Logger::put(
|
1249 marek 1.21 Logger::ERROR_LOG,
1250 System::CIMLISTENER,
1251 Logger::SEVERE,
1252 "The following indication did not get "
1253 "processed successfully: $0",
1254 event->getIndicationInstance().getPath().toString());
|
1255 h.sterling 1.1
1256 delete event;
1257 continue;
1258
1259 } else
1260 {
|
1261 marek 1.21 PEG_TRACE_CSTRING(
1262 TRC_LISTENER,
1263 Tracer::LEVEL4,
1264 "_worker_routine::placing failed indication "
1265 "back in queue");
|
1266 mike 1.14 tmpEventQueue.insert_back(event);
|
1267 h.sterling 1.1 }
1268
1269 } else
1270 {
|
1271 marek 1.21 PEG_TRACE_STRING(
1272 TRC_LISTENER,
1273 Tracer::LEVEL2,
1274 "Error: consumeIndication() permanent failure: "
1275 + ce.getMessage());
|
1276 h.sterling 1.1 delete event;
1277 continue;
1278 }
1279
1280 } catch (Exception & ex)
1281 {
|
1282 marek 1.21 PEG_TRACE_STRING(
1283 TRC_LISTENER,
1284 Tracer::LEVEL2,
1285 "Error: consumeIndication() permanent failure: "
1286 + ex.getMessage());
|
1287 h.sterling 1.1 delete event;
1288 continue;
1289
1290 } catch (...)
1291 {
|
1292 marek 1.21 PEG_TRACE_CSTRING(
1293 TRC_LISTENER,
1294 Tracer::LEVEL2,
1295 "Error: consumeIndication() failed: "
1296 "Unknown exception.");
|
1297 h.sterling 1.1 delete event;
1298 continue;
1299 } //end try
1300
1301 } //while eventqueue
1302
|
1303 h.sterling 1.10 // Copy the failed indications back to the main queue
|
1304 marek 1.21 // We now lock the queue while adding the retries on to the queue
1305 // so that new events can't get in in front
1306 // Of those events we are retrying. Retried events happened before
1307 // any new events coming in.
|
1308 h.sterling 1.1 IndicationDispatchEvent* tmpEvent = 0;
|
1309 h.sterling 1.10 myself->_eventqueue.try_lock();
|
1310 h.sterling 1.1 while (tmpEventQueue.size())
1311 {
|
1312 mike 1.14 tmpEvent = tmpEventQueue.remove_front();
1313 myself->_eventqueue.insert_back(tmpEvent);
|
1314 h.sterling 1.10
|
1315 h.sterling 1.1 }
|
1316 h.sterling 1.10 myself->_eventqueue.unlock();
|
1317 h.sterling 1.1
|
1318 kumpf 1.16 } catch (TimeOut&)
|
1319 h.sterling 1.1 {
|
1320 marek 1.21 PEG_TRACE_CSTRING(
1321 TRC_LISTENER,
1322 Tracer::LEVEL4,
1323 "_worker_routine::Time to retry any outstanding indications.");
1324
1325 // signal the queue in the same way we would,
1326 // if we received a new indication
1327 // this allows the thread to fall into the queue processing code
|
1328 h.sterling 1.1 myself->_check_queue->signal();
1329
1330 } //time_wait
1331
1332
1333 } //shutdown
1334
1335 PEG_METHOD_EXIT();
1336 return 0;
1337 }
1338
1339
1340 PEGASUS_NAMESPACE_END
|