1 karl 1.12 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.12 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include <Pegasus/Common/Config.h>
35 //#include <cstdlib>
36 //#include <dlfcn.h>
37 #include <Pegasus/Common/System.h>
38 #include <Pegasus/Common/FileSystem.h>
39 #include <Pegasus/Common/Tracer.h>
40 #include <Pegasus/Common/Logger.h>
41 #include <Pegasus/Common/XmlReader.h>
|
42 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
43 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/XmlWriter.h>
|
45 mike 1.14 #include <Pegasus/Common/List.h>
|
46 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
47 h.sterling 1.1
48 #include "ConsumerManager.h"
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
|
53 marek 1.21 //ATTN: Can we just use a properties file instead?? If we only have one
54 // property, we may want to just parse it ourselves.
55 // We may need to add more properties, however. Potential per consumer
56 // properties: unloadOk, idleTimout, retryCount, etc
|
57 h.sterling 1.1 static struct OptionRow optionsTable[] =
58 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
59 {
|
60 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
61 "location", "library name for the consumer"},
|
62 h.sterling 1.1 };
63
64 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
65
66 //retry settings
67 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
68 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
69 h.sterling 1.1
|
70 marek 1.21 //constant for fake property that is added to instances when
71 // serializing to track the full URL
|
72 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
73 h.sterling 1.1
74
|
75 marek 1.21 ConsumerManager::ConsumerManager(
76 const String& consumerDir,
77 const String& consumerConfigDir,
78 Boolean enableConsumerUnload,
79 Uint32 idleTimeout) :
80 _consumerDir(consumerDir),
81 _consumerConfigDir(consumerConfigDir),
82 _enableConsumerUnload(enableConsumerUnload),
83 _idleTimeout(idleTimeout),
84 _forceShutdown(true)
|
85 h.sterling 1.1 {
86 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
87
|
88 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
89 "Consumer library directory: %s",
90 (const char*)consumerDir.getCString()));
91 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
92 "Consumer configuration directory: %s",
93 (const char*)consumerConfigDir.getCString()));
|
94 h.sterling 1.1
|
95 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
96 thilo.boehm 1.28 "Consumer unload enabled %d: idle timeout %d",
97 enableConsumerUnload,idleTimeout));
|
98 h.sterling 1.1
99
|
100 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
101 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
102 h.sterling 1.1
|
103 kumpf 1.3 struct timeval deallocateWait = {15, 0};
104 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
105 h.sterling 1.1
106 _init();
107
108 PEG_METHOD_EXIT();
109 }
110
111 ConsumerManager::~ConsumerManager()
112 {
113 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
114
115 unloadAllConsumers();
116
|
117 kumpf 1.3 delete _thread_pool;
|
118 h.sterling 1.1
119 ConsumerTable::Iterator i = _consumers.start();
120 for (; i!=0; i++)
121 {
122 DynamicConsumer* consumer = i.value();
123 delete consumer;
124 }
125
|
126 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
127 h.sterling 1.1
128 ModuleTable::Iterator j = _modules.start();
129 for (;j!=0;j++)
130 {
131 ConsumerModule* module = j.value();
132 delete module;
133 }
134
|
135 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
136 h.sterling 1.1
137 PEG_METHOD_EXIT();
138 }
139
140 void ConsumerManager::_init()
141 {
142 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
143
144 //check if there are any outstanding indications
145 Array<String> files;
146 Uint32 pos;
147 String consumerName;
148
149 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
150 {
151 for (Uint32 i = 0; i < files.size(); i++)
152 {
153 pos = files[i].find(".dat");
154 if (pos != PEG_NOT_FOUND)
155 {
156 consumerName = files[i].subString(0, pos);
157 h.sterling 1.1
158 try
159 {
|
160 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
161 "Attempting to load indication for '%s'",
162 (const char*)consumerName.getCString()));
|
163 h.sterling 1.1 getConsumer(consumerName);
164
165 } catch (...)
166 {
|
167 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
168 "Cannot load consumer from file '%s'",
169 (const char*)files[i].getCString()));
|
170 h.sterling 1.1 }
171 }
172 }
173 }
174
175 PEG_METHOD_EXIT();
176 }
177
178 String ConsumerManager::getConsumerDir()
179 {
180 return _consumerDir;
181 }
182
183 String ConsumerManager::getConsumerConfigDir()
184 {
185 return _consumerConfigDir;
186 }
187
188 Boolean ConsumerManager::getEnableConsumerUnload()
189 {
190 return _enableConsumerUnload;
191 h.sterling 1.1 }
192
193 Uint32 ConsumerManager::getIdleTimeout()
194 {
195 return _idleTimeout;
196 }
197
|
198 marek 1.21 /** Retrieves the library name associated with the consumer name.
199 * By default, the library name
200 * is the same as the consumer name. However, you may specify a different
201 * library name in a consumer configuration file. This file must be named
202 * "MyConsumer.txt" and contain the following: location="libraryName"
203 *
204 * The config file is optional and is generally only needed in cases where
205 * there are strict requirements on library naming.
206 *
207 * It is the responsibility of the caller to catch any exceptions
208 * thrown by this method.
209 */
|
210 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
211 {
212 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
213
214 //default library name is consumer name
215 String libraryName = consumerName;
216
|
217 marek 1.21 // check whether an alternative library name was specified in an optional
218 // consumer config file
219 String configFile = FileSystem::getAbsolutePath(
220 (const char*)_consumerConfigDir.getCString(),
221 String(consumerName + ".conf"));
|
222 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
223 "Looking for config file %s",(const char*)configFile.getCString()));
|
224 h.sterling 1.1
225 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
226 {
|
227 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
228 "Found config file for consumer %s",
229 (const char*)consumerName.getCString()));
|
230 h.sterling 1.1
231 try
232 {
|
233 marek 1.21 //Bugzilla 3765 - Change this to use a member var
234 // when OptionManager has a reset option
|
235 h.sterling 1.8 OptionManager _optionMgr;
|
236 marek 1.21 //comment the following line out later
237 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
238 h.sterling 1.1 _optionMgr.mergeFile(configFile);
239 _optionMgr.checkRequiredOptions();
240
|
241 marek 1.21 if (!_optionMgr.lookupValue("location", libraryName) ||
242 (libraryName == String::EMPTY))
|
243 h.sterling 1.1 {
|
244 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
|
245 marek 1.21 "Warning: Using default library name since none was "
|
246 thilo.boehm 1.28 "specified in %s",(const char*)configFile.getCString()));
|
247 h.sterling 1.1 libraryName = consumerName;
248 }
249
250 } catch (Exception & ex)
251 {
|
252 marek 1.21 throw Exception(
253 MessageLoaderParms(
254 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
255 "Error reading $0: $1.",
256 configFile,
257 ex.getMessage()));
|
258 h.sterling 1.1 }
259 } else
260 {
|
261 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
262 "No config file exists for %s",
263 (const char*)consumerName.getCString()));
|
264 h.sterling 1.1 }
265
|
266 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
267 "The library name for %s is %s",
268 (const char*)consumerName.getCString(),
269 (const char*)libraryName.getCString()));
|
270 h.sterling 1.1
271 PEG_METHOD_EXIT();
272 return libraryName;
273 }
274
|
275 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
276 * we return the one in the cache. If it
|
277 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
278 marek 1.21 * @throws Exception if we cannot successfully create and
279 * initialize the consumer
|
280 h.sterling 1.1 */
281 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
282 {
283 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
284
285 DynamicConsumer* consumer = 0;
286 CIMIndicationConsumerProvider* consumerRef = 0;
287 Boolean cached = false;
288 Boolean entryExists = false;
289
290 AutoMutex lock(_consumerTableMutex);
291
292 if (_consumers.lookup(consumerName, consumer))
293 {
294 //why isn't this working??
295 entryExists = true;
296
297 if (consumer && consumer->isLoaded())
298 {
|
299 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
300 "Consumer exists in the cache and is already loaded: %s",
301 (const char*)consumerName.getCString()));
|
302 h.sterling 1.1 cached = true;
303 }
304 } else
305 {
|
306 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
307 "Consumer not found in cache, creating %s",
308 (const char*)consumerName.getCString()));
|
309 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
310 //ATTN: The above is a memory leak if _initConsumer throws an exception
311 //need to delete it in that case
312 }
313
314 if (!cached)
315 {
316 _initConsumer(consumerName, consumer);
317
318 if (!entryExists)
319 {
320 _consumers.insert(consumerName, consumer);
321 }
322 }
323
324 consumer->updateIdleTimer();
325
326 PEG_METHOD_EXIT();
327 return consumer;
328 }
329
330 h.sterling 1.1 /** Initializes a DynamicConsumer.
|
331 marek 1.21 * Caller assumes responsibility for mutexing the operation as well as
332 * ensuring the consumer does not already exist.
|
333 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
334 */
|
335 marek 1.21 void ConsumerManager::_initConsumer(
336 const String& consumerName,
337 DynamicConsumer* consumer)
|
338 h.sterling 1.1 {
339 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
340
341 CIMIndicationConsumerProvider* base = 0;
342 ConsumerModule* module = 0;
343
|
344 marek 1.21 // lookup provider module in cache (if it exists, it returns
345 // the cached module, otherwise it creates and returns a new one)
|
346 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
347 module = _lookupModule(libraryName);
348
349 //build library path
|
350 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
351 (const char*)_consumerDir.getCString(),
352 FileSystem::buildLibraryFileName(libraryName));
|
353 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
354 (const char*)libraryPath.getCString()));
|
355 h.sterling 1.1
356 //load module
357 try
358 {
359 base = module->load(consumerName, libraryPath);
360 consumer->set(module, base);
361
362 } catch (Exception& ex)
363 {
|
364 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
365 "Error loading consumer module: %s",
366 (const char*)ex.getMessage().getCString()));
|
367 marek 1.21
368 throw Exception(
369 MessageLoaderParms(
370 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
371 "Cannot load module ($0:$1): Unknown exception.",
372 consumerName,
373 libraryName));
|
374 h.sterling 1.1 } catch (...)
375 {
|
376 marek 1.21 throw Exception(
377 MessageLoaderParms(
378 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
379 "Cannot load module ($0:$1): Unknown exception.",
380 consumerName,
381 libraryName));
|
382 h.sterling 1.1 }
383
|
384 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
385 "Successfully loaded consumer module %s",
386 (const char*)libraryName.getCString()));
|
387 h.sterling 1.1
388 //initialize consumer
389 try
390 {
|
391 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
392 (const char*)consumerName.getCString()));
|
393 h.sterling 1.1
394 consumer->initialize();
395
396 //ATTN: need to change this
397 Semaphore* semaphore = new Semaphore(0); //blocking
398
399 consumer->setShutdownSemaphore(semaphore);
400
401 //start the worker thread
|
402 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
403 h.sterling 1.1 _worker_routine,
|
404 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
405 marek 1.21 {
406 PEG_TRACE_CSTRING(
407 TRC_LISTENER,
|
408 marek 1.24 Tracer::LEVEL1,
|
409 marek 1.21 "Could not allocate thread for consumer.");
410
411 consumer->setShutdownSemaphore(0);
412 delete semaphore;
413 throw Exception(
414 MessageLoaderParms(
415 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
416 "Not enough threads for consumer worker routine."));
|
417 konrad.r 1.7 }
|
418 h.sterling 1.1
|
419 marek 1.21 //wait until the listening thread has started.
420 // Otherwise, there is a miniscule chance that the first event will
421 // be enqueued before the consumer is waiting for it and the first
422 // indication after loading the consumer will be lost
|
423 h.sterling 1.8 consumer->waitForEventThread();
424
|
425 h.sterling 1.1 //load any outstanding requests
|
426 marek 1.21 Array<IndicationDispatchEvent> outstandingIndications =
427 _deserializeOutstandingIndications(consumerName);
|
428 h.sterling 1.1 if (outstandingIndications.size())
429 {
430 //the consumer will signal itself in _loadOustandingIndications
431 consumer->_loadOutstandingIndications(outstandingIndications);
432 }
433
|
434 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
435 "Successfully initialized consumer %s",
436 (const char*)consumerName.getCString()));
|
437 h.sterling 1.1
438 } catch (...)
439 {
440 module->unloadModule();
441 consumer->reset();
|
442 marek 1.21 throw Exception(
443 MessageLoaderParms(
444 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
445 "Cannot initialize consumer ($0).",
446 consumerName));
|
447 h.sterling 1.1 }
448
449 PEG_METHOD_EXIT();
450 }
451
452
|
453 marek 1.21 /** Returns the ConsumerModule with the given library name.
454 * If it already exists, we return the one in the cache. If it
|
455 h.sterling 1.1 * DNE, we create it and add it to the table.
|
456 marek 1.21 * @throws Exception if we cannot successfully create and
457 * initialize the consumer
|
458 h.sterling 1.1 */
|
459 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
460 h.sterling 1.1 {
461 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
462
463 AutoMutex lock(_moduleTableMutex);
464
465 ConsumerModule* module = 0;
466
467 //see if consumer module is cached
468 if (_modules.lookup(libraryName, module))
469 {
|
470 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
471 "Found Consumer Module %s in Consumer Manager Cache",
472 (const char*)libraryName.getCString()));
|
473 h.sterling 1.1
474 } else
475 {
|
476 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
477 "Creating Consumer Provider Module %s",
478 (const char*)libraryName.getCString()));
|
479 h.sterling 1.1
480 module = new ConsumerModule();
481 _modules.insert(libraryName, module);
482 }
483
484 PEG_METHOD_EXIT();
485 return(module);
486 }
487
488 /** Returns true if there are active consumers
489 */
490 Boolean ConsumerManager::hasActiveConsumers()
491 {
492 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
493
494 AutoMutex lock(_consumerTableMutex);
495 DynamicConsumer* consumer = 0;
496
497 try
498 {
499 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
500 h.sterling 1.1 {
501 consumer = i.value();
502
|
503 marek 1.21 if (consumer &&
504 consumer->isLoaded() &&
505 (consumer->getPendingIndications() > 0))
|
506 h.sterling 1.1 {
|
507 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
508 "Found active consumer: %s" ,
509 (const char*)consumer->_name.getCString()));
|
510 h.sterling 1.1 PEG_METHOD_EXIT();
511 return true;
512 }
513 }
514 } catch (...)
515 {
516 // Unexpected exception; do not assume that no providers are loaded
|
517 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
518 marek 1.21 "Unexpected Exception in hasActiveConsumers.");
|
519 h.sterling 1.1 PEG_METHOD_EXIT();
520 return true;
521 }
522
523 PEG_METHOD_EXIT();
524 return false;
525 }
526
527 /** Returns true if there are loaded consumers
528 */
529 Boolean ConsumerManager::hasLoadedConsumers()
530 {
531 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
532
533 AutoMutex lock(_consumerTableMutex);
534 DynamicConsumer* consumer = 0;
535
536 try
537 {
538 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
539 {
540 h.sterling 1.1 consumer = i.value();
541
542 if (consumer && consumer->isLoaded())
543 {
|
544 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
545 "Found loaded consumer: %s",
546 (const char*)consumer->_name.getCString()));
|
547 h.sterling 1.1 PEG_METHOD_EXIT();
548 return true;
549 }
550 }
551 } catch (...)
552 {
553 // Unexpected exception; do not assume that no providers are loaded
|
554 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
555 marek 1.21 "Unexpected Exception in hasLoadedConsumers.");
|
556 h.sterling 1.1 PEG_METHOD_EXIT();
557 return true;
558 }
559
560 PEG_METHOD_EXIT();
561 return false;
562 }
563
564
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal. This causes the worker routine to break out
 *    of the loop and exit.
 * 2) Wait for the worker thread to end. This may take a while if it's
 *    processing an indication. This is optional in a shutdown scenario.
 *    If the listener is shutdown with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.
 *    Note that a normal shutdown only allows the current consumer indication
 *    to finish. All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when
 *    its refcount == 0)
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal
 * should be sent to all of the consumers so the threads can finish
 * simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be
 * processed? Just new indications to be processed? I am not inclined to this
 * solution since it could take a LOT of time. By serializing and
 * deserializing indications between shutdown and startup, I feel like we do
 * not need to process ALL queued indications on shutdown.
 */
589
590 /** Unloads all consumers.
591 */
592 void ConsumerManager::unloadAllConsumers()
593 {
594 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
595
596 AutoMutex lock(_consumerTableMutex);
597
598 if (!_consumers.size())
599 {
|
600 marek 1.21 PEG_TRACE_CSTRING(
601 TRC_LISTENER,
602 Tracer::LEVEL4,
603 "There are no consumers to unload.");
|
604 h.sterling 1.1 PEG_METHOD_EXIT();
605 return;
606 }
607
608 if (!_forceShutdown)
609 {
|
610 marek 1.21 // wait until all the consumers have finished processing the events in
611 // their queue
612 // ATTN: Should this have a timeout even though it's a force??
|
613 h.sterling 1.1 while (hasActiveConsumers())
614 {
|
615 mike 1.15 Threads::sleep(500);
|
616 h.sterling 1.1 }
617 }
618
619 Array<DynamicConsumer*> loadedConsumers;
620
621 ConsumerTable::Iterator i = _consumers.start();
622 DynamicConsumer* consumer = 0;
623
624 for (; i!=0; i++)
625 {
626 consumer = i.value();
627 if (consumer && consumer->isLoaded())
628 {
629 loadedConsumers.append(consumer);
630 }
631 }
632
633 if (loadedConsumers.size())
634 {
635 try
636 {
637 h.sterling 1.1 _unloadConsumers(loadedConsumers);
638
|
639 kumpf 1.16 } catch (Exception&)
|
640 h.sterling 1.1 {
|
641 marek 1.21 PEG_TRACE_CSTRING(
642 TRC_LISTENER,
|
643 marek 1.25 Tracer::LEVEL2,
|
644 marek 1.21 "Error unloading consumers.");
|
645 h.sterling 1.1 }
646 } else
647 {
|
648 marek 1.21 PEG_TRACE_CSTRING(
649 TRC_LISTENER,
650 Tracer::LEVEL4,
651 "There are no consumers to unload.");
|
652 h.sterling 1.1 }
653
654 PEG_METHOD_EXIT();
655 }
656
657 /** Unloads idle consumers.
658 */
659 void ConsumerManager::unloadIdleConsumers()
660 {
661 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
662
663 AutoMutex lock(_consumerTableMutex);
664
665 if (!_consumers.size())
666 {
|
667 marek 1.21 PEG_TRACE_CSTRING(
668 TRC_LISTENER,
669 Tracer::LEVEL4,
670 "There are no consumers to unload.");
|
671 h.sterling 1.1 PEG_METHOD_EXIT();
672 return;
673 }
674
675 Array<DynamicConsumer*> loadedConsumers;
676
677 ConsumerTable::Iterator i = _consumers.start();
678 DynamicConsumer* consumer = 0;
679
680 for (; i!=0; i++)
681 {
682 consumer = i.value();
683 if (consumer && consumer->isLoaded() && consumer->isIdle())
684 {
685 loadedConsumers.append(consumer);
686 }
687 }
688
689 if (loadedConsumers.size())
690 {
691 try
692 h.sterling 1.1 {
693 _unloadConsumers(loadedConsumers);
694
|
695 kumpf 1.16 } catch (Exception&)
|
696 h.sterling 1.1 {
|
697 marek 1.21 PEG_TRACE_CSTRING(
698 TRC_LISTENER,
|
699 marek 1.25 Tracer::LEVEL2,
|
700 marek 1.21 "Error unloading consumers.");
|
701 h.sterling 1.1 }
702 } else
703 {
|
704 marek 1.21 PEG_TRACE_CSTRING(
705 TRC_LISTENER,
706 Tracer::LEVEL4,
707 "There are no consumers to unload.");
|
708 h.sterling 1.1 }
709
710 PEG_METHOD_EXIT();
711 }
712
713 /** Unloads a single consumer.
714 */
715 void ConsumerManager::unloadConsumer(const String& consumerName)
716 {
717 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
718
719 AutoMutex lock(_consumerTableMutex);
720
721 DynamicConsumer* consumer = 0;
722
723 //check whether the consumer exists
724 if (!_consumers.lookup(consumerName, consumer))
725 {
|
726 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
727 "Error: cannot unload consumer, unknown consumer %s",
728 (const char*)consumerName.getCString()));
|
729 h.sterling 1.1 return;
730 }
731
732 //check whether the consumer is loaded
733 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
734 {
735 //unload the consumer
736 Array<DynamicConsumer*> loadedConsumers;
737 loadedConsumers.append(consumer);
738
739 try
740 {
741 _unloadConsumers(loadedConsumers);
742
|
743 kumpf 1.16 } catch (Exception&)
|
744 h.sterling 1.1 {
|
745 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
|
746 marek 1.21 "Error unloading consumers.");
|
747 h.sterling 1.1 }
748
749 } else
750 {
|
751 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
752 "Error: cannot unload the not loaded consumer %s",
753 (const char*)consumerName.getCString()));
|
754 h.sterling 1.1 }
755
756 PEG_METHOD_EXIT();
757 }
758
759 /** Unloads the consumers in the given array.
760 * The consumerTable mutex MUST be locked prior to entering this method.
761 */
|
762 marek 1.21 void ConsumerManager::_unloadConsumers(
763 Array<DynamicConsumer*> consumersToUnload)
|
764 h.sterling 1.1 {
765 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
766
767 //tell consumers to shutdown
768 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
769 {
770 consumersToUnload[i]->sendShutdownSignal();
771 }
772
|
773 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
|
774 marek 1.21 "Sent shutdown signal to all consumers.");
775
776 // wait for all the consumer worker threads to complete
777 // since we can only shutdown after they are all complete,
778 // it does not matter if the first, fifth, or last
779 // consumer takes the longest; the wait time is equal to the time it takes
780 // for the busiest consumer to stop processing its requests.
|
781 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
782 {
|
783 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
784 (const char*)consumersToUnload[i]->getName().getCString()));
|
785 h.sterling 1.1
786 //wait for the consumer worker thread to end
|
787 kumpf 1.26 Semaphore* _shutdownSemaphore =
788 consumersToUnload[i]->getShutdownSemaphore();
789 if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
|
790 h.sterling 1.1 {
|
791 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
|
792 marek 1.21 "Timed out while attempting to stop consumer thread.");
|
793 h.sterling 1.1 }
794
|
795 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
|
796 h.sterling 1.1
797 try
798 {
799 //terminate consumer provider interface
800 consumersToUnload[i]->terminate();
801
802 //unload consumer provider module
803 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
804 consumersToUnload[i]->_module->unloadModule();
805
806 //serialize outstanding indications
|
807 marek 1.21 _serializeOutstandingIndications(
808 consumersToUnload[i]->getName(),
809 consumersToUnload[i]->_retrieveOutstandingIndications());
|
810 h.sterling 1.1
811 //reset the consumer
812 consumersToUnload[i]->reset();
813
|
814 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
|
815 marek 1.21 "Consumer library successfully unloaded.");
|
816 h.sterling 1.1
817 } catch (Exception& e)
818 {
|
819 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
820 "Error unloading consumer: %s",
821 (const char*)e.getMessage().getCString()));
|
822 h.sterling 1.1 //ATTN: throw exception? log warning?
823 }
824 }
825
826 PEG_METHOD_EXIT();
827 }
828
829 /** Serializes oustanding indications to a <MyConsumer>.dat file
830 */
|
831 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
832 const String& consumerName,
833 Array<IndicationDispatchEvent> indications)
834 {
835 PEG_METHOD_ENTER(
836 TRC_LISTENER,
837 "ConsumerManager::_serializeOutstandingIndications");
|
838 h.sterling 1.1
839 if (!indications.size())
840 {
841 PEG_METHOD_EXIT();
842 return;
843 }
844
|
845 marek 1.21 String fileName = FileSystem::getAbsolutePath(
846 (const char*)_consumerConfigDir.getCString(),
847 String(consumerName + ".dat"));
|
848 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
849 (const char*)fileName.getCString()));
|
850 h.sterling 1.1
|
851 mike 1.9 Buffer buffer;
|
852 h.sterling 1.1
853 // Open the log file and serialize remaining
854 FILE* fileHandle = 0;
855 fileHandle = fopen((const char*)fileName.getCString(), "w");
856
857 if (!fileHandle)
858 {
|
859 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
860 "Unable to open log file for %s",
861 (const char*)consumerName.getCString()));
|
862 h.sterling 1.1
863 } else
864 {
|
865 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
866 h.sterling 1.1 "Serializing %d outstanding requests for %s",
867 indications.size(),
|
868 marek 1.18 (const char*)consumerName.getCString()));
|
869 h.sterling 1.1
|
870 marek 1.21 // we have to put the array of instances under a valid root element
871 // or the parser complains
|
872 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
873
|
874 marek 1.21 CIMInstance cimInstance;
|
875 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
876 {
|
877 marek 1.21 //set the URL string property on the serializable instance
878 CIMValue cimValue(CIMTYPE_STRING, false);
879 cimValue.set(indications[i].getURL());
880 cimInstance = indications[i].getIndicationInstance();
881 CIMProperty cimProperty(URL_PROPERTY, cimValue);
882 cimInstance.addProperty(cimProperty);
|
883 h.sterling 1.11
884 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
885 marek 1.21 }
|
886 h.sterling 1.1
|
887 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
888 h.sterling 1.1
889 fputs((const char*)buffer.getData(), fileHandle);
890
891 fclose(fileHandle);
892 }
893
894 PEG_METHOD_EXIT();
895 }
896
897 /** Reads outstanding indications from a <MyConsumer>.dat file
898 */
|
899 marek 1.21 Array<IndicationDispatchEvent>
900 ConsumerManager::_deserializeOutstandingIndications(
901 const String& consumerName)
902 {
903 PEG_METHOD_ENTER(
904 TRC_LISTENER,
905 "ConsumerManager::_deserializeOutstandingIndications");
906
907 String fileName = FileSystem::getAbsolutePath(
908 (const char*)_consumerConfigDir.getCString(),
909 String(consumerName + ".dat"));
|
910 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
911 "Consumer dat file: %s",(const char*)fileName.getCString()));
|
912 h.sterling 1.1
913 Array<CIMInstance> cimInstances;
|
914 marek 1.21 Array<String> urlStrings;
915 Array<IndicationDispatchEvent> indications;
|
916 h.sterling 1.1
917 // Open the log file and serialize remaining indications
918 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
919 {
|
920 mike 1.9 Buffer text;
|
921 h.sterling 1.1 CIMInstance cimInstance;
|
922 marek 1.21 CIMProperty cimProperty;
923 CIMValue cimValue;
924 String urlString;
|
925 h.sterling 1.1 XmlEntry entry;
926
927 try
928 {
|
929 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
930 FileSystem::loadFileToMemory(text, fileName);
|
931 h.sterling 1.1
932 //parse the file
933 XmlParser parser((char*)text.getData());
934 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
935
936 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
937 {
|
938 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
939 if (index != PEG_NOT_FOUND)
940 {
941 // get the URL string property from the serialized instance
942 // and remove the property
943 cimProperty = cimInstance.getProperty(index);
944 cimValue = cimProperty.getValue();
945 cimValue.get(urlString);
946 cimInstance.removeProperty(index);
947 }
948 IndicationDispatchEvent* indicationEvent =
949 new IndicationDispatchEvent(
950 OperationContext(),
951 urlString,
952 cimInstance);
953
|
954 h.sterling 1.11 indications.append(*indicationEvent);
|
955 h.sterling 1.1 }
956
957 XmlReader::expectEndTag(parser, "IRETURNVALUE");
958
|
959 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
960 h.sterling 1.1 "Consumer %s has %d outstanding indications",
961 (const char*)consumerName.getCString(),
|
962 marek 1.18 indications.size()));
|
963 h.sterling 1.1
964 //delete the file
965 FileSystem::removeFile(fileName);
966
967 } catch (Exception& ex)
968 {
|
969 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
970 "Error parsing dat file for consumer %s: %s",
971 (const char*)consumerName.getCString(),
972 (const char*)ex.getMessage().getCString()));
|
973 h.sterling 1.1
974 } catch (...)
975 {
|
976 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
977 "Error parsing dat file for consumer %s: Unknown Exception",
978 (const char*)consumerName.getCString()));
|
979 h.sterling 1.1 }
980 }
981
982 PEG_METHOD_EXIT();
|
983 h.sterling 1.11 return indications;
|
984 h.sterling 1.1 }
985
986
987
988 /**
|
989 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
990 * per consumer, we eliminate a ton of synchronization issues and make it easy
991 * to prevent the consumer from performing two mutually exclusive operations
992 * at once. This also prevents one bad consumer from taking the entire
993 * listener down. That being said, it is up to the programmer to write smart
994 * consumers, and to ensure that their actions don't deadlock
995 * the worker thread.
|
996 h.sterling 1.1 *
|
997 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
998 * takes a considerable amount of time to complete, it may make sense to make
999 * the consumer multi-threaded. The individual consumer can immediately
1000 * spawn off* new threads to handle indications, and return immediately to
1001 * catch the next indication. In this way, a consumer can attain
1002 * extremely high performance.
|
1003 h.sterling 1.1 *
1004 * There are three different events that can signal us:
1005 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
1006 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1007 * shutdown or an idle consumer state)
|
1008 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
1009 *
|
1010 marek 1.21 * The idea is that all new indications are put on the front of the queue and
1011 * processed first. All of the retry indications are put on the back of the
1012 * queue and are only processed AFTER all new indications are sent.
1013 * Before processing each indication, we check to see whether or not the
1014 * shutdown signal was given.
1015 * If so, we immediately break out of the loop, and another compenent
1016 * serializes the remaining indications to a file.
|
1017 h.sterling 1.1 *
|
1018 marek 1.21 * An indication gets retried
1019 * if the consumer throws a CIM_ERR_FAILED exception.
|
1020 h.sterling 1.1 *
|
1021 marek 1.21 * This function makes sure it waits until the default retry lapse has passed
1022 * to avoid issues with the following scenario:
|
1023 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1024 marek 1.21 * We were signalled 20 times, so we will pass the time_wait 20 times.
1025 * Perceivably, the process time on each indication could be minimal.
1026 * We could potentially proceed to process the retries after a very small time
1027 * interval since we would never hit the wait for the retry timeout.
|
1028 h.sterling 1.1 *
1029 */
|
1030 marek 1.21 ThreadReturnType PEGASUS_THREAD_CDECL
1031 ConsumerManager::_worker_routine(void *param)
|
1032 h.sterling 1.1 {
1033 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1034
1035 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1036 String name = myself->getName();
|
1037 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1038 h.sterling 1.1
|
1039 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1040 "_worker_routine::entering loop for %s",
1041 (const char*)name.getCString()));
|
1042 h.sterling 1.1
|
1043 h.sterling 1.8 myself->_listeningSemaphore->signal();
1044
|
1045 h.sterling 1.1 while (true)
1046 {
|
1047 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s",
1048 (const char*)name.getCString()));
1049
|
1050 kumpf 1.26
1051 //wait to be signalled
1052 if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
|
1053 h.sterling 1.1 {
|
1054 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1055 "_worker_routine::Time to retry any outstanding indications.");
1056
1057 // signal the queue in the same way we would,
1058 // if we received a new indication
1059 // this allows the thread to fall into the queue processing code
1060 myself->_check_queue->signal();
1061
1062 continue;
1063 }
1064
|
1065 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1066 "_worker_routine::signalled %s",(const char*)name.getCString()));
|
1067 h.sterling 1.1
|
1068 kumpf 1.26 //check whether we received the shutdown signal
1069 if (myself->_dieNow)
1070 {
|
1071 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1072 "_worker_routine::shutdown received %s",
1073 (const char*)name.getCString()));
|
1074 kumpf 1.26 break;
1075 }
|
1076 h.sterling 1.1
|
1077 kumpf 1.26 //create a temporary queue to store failed indications
1078 tmpEventQueue.clear();
|
1079 h.sterling 1.1
|
1080 kumpf 1.26 //continue processing events until the queue is empty
1081 //make sure to check for the shutdown signal before every iteration
1082 // Note that any time during our processing of events the Listener
1083 // may be enqueueing NEW events for us to process.
1084 // Because we are popping off the front and new events are being
1085 // thrown on the back if events are failing when we start
1086 // But are succeeding by the end of the processing, events may be
1087 // sent out of chronological order.
1088 // However. Once we complete the current queue of events, we will
1089 // always send old events to be retried before sending any
1090 // new events added afterwards.
1091 while (myself->_eventqueue.size())
1092 {
1093 //check for shutdown signal
1094 //this only breaks us out of the queue loop, but we will
1095 //immediately get through the next wait from
1096 //the shutdown signal itself, at which time we break
1097 //out of the main loop
|
1098 h.sterling 1.1 if (myself->_dieNow)
1099 {
|
1100 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1101 "Received signal to shutdown, jumping out of queue loop %s",
1102 (const char*)name.getCString()));
|
1103 h.sterling 1.1 break;
1104 }
1105
|
1106 kumpf 1.26 //pop next indication off the queue
1107 IndicationDispatchEvent* event = 0;
1108 //what exceptions/errors can this throw?
1109 event = myself->_eventqueue.remove_front();
|
1110 h.sterling 1.1
|
1111 kumpf 1.26 if (!event)
|
1112 h.sterling 1.1 {
|
1113 kumpf 1.26 //this should never happen
1114 continue;
1115 }
|
1116 h.sterling 1.1
|
1117 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1118 "_worker_routine::consumeIndication %s",
1119 (const char*)name.getCString()));
|
1120 h.sterling 1.1
|
1121 kumpf 1.26 try
1122 {
1123 myself->consumeIndication(event->getContext(),
1124 event->getURL(),
1125 event->getIndicationInstance());
1126
|
1127 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1128 "_worker_routine::processed indication successfully. %s",
1129 (const char*)name.getCString()));
|
1130 h.sterling 1.1
|
1131 kumpf 1.26 delete event;
1132 continue;
1133 }
1134 catch (CIMException & ce)
1135 {
1136 //check for failure
1137 if (ce.getCode() == CIM_ERR_FAILED)
|
1138 h.sterling 1.1 {
|
1139 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2,
|
1140 kumpf 1.26 "_worker_routine::consumeIndication() temporary"
|
1141 thilo.boehm 1.28 " failure %s : %s",
1142 (const char*)name.getCString(),
1143 (const char*)ce.getMessage().getCString()));
|
1144 kumpf 1.26
1145 // Here we simply determine if we should increment
1146 // the retry count or not.
1147 // We don't want to count a forced retry from a new
1148 // event to count as a retry.
1149 // We just have to do it for order's sake.
1150 // If the retry Lapse has lapsed on this event,
1151 // then increment the counter.
1152 if (event->getRetries() > 0)
1153 {
1154 Sint64 differenceInMicroseconds =
1155 CIMDateTime::getDifference(
1156 event->getLastAttemptTime(),
1157 CIMDateTime::getCurrentDateTime());
|
1158 h.sterling 1.1
|
1159 kumpf 1.26 if (differenceInMicroseconds >=
1160 (DEFAULT_RETRY_LAPSE * 1000))
|
1161 marek 1.21 {
|
1162 h.sterling 1.10 event->increaseRetries();
1163 }
|
1164 kumpf 1.26 }
1165 else
1166 {
1167 event->increaseRetries();
1168 }
|
1169 h.sterling 1.1
|
1170 kumpf 1.26 //determine if we have hit the max retry count
1171 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1172 {
1173 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1174 "Error: the maximum retry count has been "
1175 "exceeded. Removing the event from "
1176 "the queue.");
1177
1178 Logger::put(
1179 Logger::ERROR_LOG,
1180 System::CIMLISTENER,
1181 Logger::SEVERE,
1182 "The following indication did not get "
1183 "processed successfully: $0",
1184 event->getIndicationInstance().getPath().toString());
|
1185 h.sterling 1.1
1186 delete event;
1187 continue;
1188 }
|
1189 kumpf 1.26 else
1190 {
1191 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1192 "_worker_routine::placing failed indication "
1193 "back in queue");
1194 tmpEventQueue.insert_back(event);
1195 }
1196 }
1197 else
|
1198 h.sterling 1.1 {
|
1199 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1200 "Error: consumeIndication() permanent failure: %s",
1201 (const char*)ce.getMessage().getCString()));
|
1202 h.sterling 1.1 delete event;
1203 continue;
|
1204 kumpf 1.26 }
1205 }
1206 catch (Exception & ex)
|
1207 h.sterling 1.1 {
|
1208 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1209 "Error: consumeIndication() permanent failure: %s",
1210 (const char*)ex.getMessage().getCString()));
|
1211 kumpf 1.26 delete event;
1212 continue;
|
1213 h.sterling 1.1 }
|
1214 kumpf 1.26 catch (...)
1215 {
1216 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1217 "Error: consumeIndication() failed: Unknown exception.");
1218 delete event;
1219 continue;
1220 } //end try
1221
1222 } //while eventqueue
1223
1224 // Copy the failed indications back to the main queue
1225 // We now lock the queue while adding the retries on to the queue
1226 // so that new events can't get in in front
1227 // Of those events we are retrying. Retried events happened before
1228 // any new events coming in.
1229 IndicationDispatchEvent* tmpEvent = 0;
|
1230 kumpf 1.27 if (myself->_eventqueue.try_lock())
|
1231 h.sterling 1.1 {
|
1232 kumpf 1.27 while (tmpEventQueue.size())
1233 {
1234 tmpEvent = tmpEventQueue.remove_front();
1235 myself->_eventqueue.insert_back(tmpEvent);
1236 }
1237
1238 myself->_eventqueue.unlock();
1239 }
1240 else
1241 {
1242 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1243 "Failed to lock _eventqueue");
|
1244 kumpf 1.26 }
|
1245 h.sterling 1.1 } //shutdown
1246
1247 PEG_METHOD_EXIT();
1248 return 0;
1249 }
1250
1251
1252 PEGASUS_NAMESPACE_END
|