1 martin 1.29 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.30 //
|
3 martin 1.29 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.30 //
|
10 martin 1.29 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.30 //
|
17 martin 1.29 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.30 //
|
20 martin 1.29 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.30 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.29 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.30 //
|
28 martin 1.29 //////////////////////////////////////////////////////////////////////////
|
29 h.sterling 1.1 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Config.h>
33 //#include <cstdlib>
34 //#include <dlfcn.h>
35 #include <Pegasus/Common/System.h>
36 #include <Pegasus/Common/FileSystem.h>
37 #include <Pegasus/Common/Tracer.h>
38 #include <Pegasus/Common/Logger.h>
39 #include <Pegasus/Common/XmlReader.h>
|
40 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
41 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
42 #include <Pegasus/Common/XmlWriter.h>
|
43 mike 1.14 #include <Pegasus/Common/List.h>
|
44 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
45 h.sterling 1.1
46 #include "ConsumerManager.h"
47
48 PEGASUS_NAMESPACE_BEGIN
49 PEGASUS_USING_STD;
50
|
51 kumpf 1.31 //ATTN: Can we just use a properties file instead?? If we only have one
|
52 marek 1.21 // property, we may want to just parse it ourselves.
53 // We may need to add more properties, however. Potential per consumer
54 // properties: unloadOk, idleTimout, retryCount, etc
|
55 h.sterling 1.1 static struct OptionRow optionsTable[] =
56 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
57 {
|
58 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
59 "location", "library name for the consumer"},
|
60 h.sterling 1.1 };
61
62 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
63
64 //retry settings
65 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
66 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
67 h.sterling 1.1
|
68 kumpf 1.31 //constant for fake property that is added to instances when
|
69 marek 1.21 // serializing to track the full URL
|
70 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
71 h.sterling 1.1
72
|
73 marek 1.21 ConsumerManager::ConsumerManager(
74 const String& consumerDir,
75 const String& consumerConfigDir,
76 Boolean enableConsumerUnload,
|
77 kumpf 1.31 Uint32 idleTimeout) :
|
78 marek 1.21 _consumerDir(consumerDir),
79 _consumerConfigDir(consumerConfigDir),
80 _enableConsumerUnload(enableConsumerUnload),
81 _idleTimeout(idleTimeout),
82 _forceShutdown(true)
|
83 h.sterling 1.1 {
84 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
85
|
86 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
87 "Consumer library directory: %s",
88 (const char*)consumerDir.getCString()));
89 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
90 "Consumer configuration directory: %s",
91 (const char*)consumerConfigDir.getCString()));
|
92 h.sterling 1.1
|
93 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
94 thilo.boehm 1.28 "Consumer unload enabled %d: idle timeout %d",
95 enableConsumerUnload,idleTimeout));
|
96 h.sterling 1.1
97
|
98 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
99 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
100 h.sterling 1.1
|
101 kumpf 1.3 struct timeval deallocateWait = {15, 0};
102 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
103 h.sterling 1.1
104 _init();
105
106 PEG_METHOD_EXIT();
107 }
108
109 ConsumerManager::~ConsumerManager()
110 {
111 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
112
113 unloadAllConsumers();
114
|
115 kumpf 1.3 delete _thread_pool;
|
116 h.sterling 1.1
117 ConsumerTable::Iterator i = _consumers.start();
118 for (; i!=0; i++)
119 {
120 DynamicConsumer* consumer = i.value();
121 delete consumer;
122 }
123
|
124 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
125 h.sterling 1.1
126 ModuleTable::Iterator j = _modules.start();
127 for (;j!=0;j++)
128 {
129 ConsumerModule* module = j.value();
130 delete module;
131 }
132
|
133 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
134 h.sterling 1.1
135 PEG_METHOD_EXIT();
136 }
137
138 void ConsumerManager::_init()
139 {
140 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
141
142 //check if there are any outstanding indications
143 Array<String> files;
144 Uint32 pos;
145 String consumerName;
146
147 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
148 {
149 for (Uint32 i = 0; i < files.size(); i++)
150 {
151 pos = files[i].find(".dat");
152 if (pos != PEG_NOT_FOUND)
153 {
154 consumerName = files[i].subString(0, pos);
155 h.sterling 1.1
156 try
157 {
|
158 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
159 "Attempting to load indication for '%s'",
160 (const char*)consumerName.getCString()));
|
161 h.sterling 1.1 getConsumer(consumerName);
162
163 } catch (...)
164 {
|
165 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
166 "Cannot load consumer from file '%s'",
167 (const char*)files[i].getCString()));
|
168 h.sterling 1.1 }
169 }
170 }
171 }
172
173 PEG_METHOD_EXIT();
174 }
175
176 String ConsumerManager::getConsumerDir()
177 {
178 return _consumerDir;
179 }
180
181 String ConsumerManager::getConsumerConfigDir()
182 {
183 return _consumerConfigDir;
184 }
185
186 Boolean ConsumerManager::getEnableConsumerUnload()
187 {
188 return _enableConsumerUnload;
189 h.sterling 1.1 }
190
191 Uint32 ConsumerManager::getIdleTimeout()
192 {
193 return _idleTimeout;
194 }
195
|
196 marek 1.21 /** Retrieves the library name associated with the consumer name.
197 * By default, the library name
|
198 kumpf 1.31 * is the same as the consumer name. However, you may specify a different
199 * library name in a consumer configuration file. This file must be named
|
200 marek 1.21 * "MyConsumer.txt" and contain the following: location="libraryName"
201 *
|
202 kumpf 1.31 * The config file is optional and is generally only needed in cases where
|
203 marek 1.21 * there are strict requirements on library naming.
204 *
|
205 kumpf 1.31 * It is the responsibility of the caller to catch any exceptions
|
206 marek 1.21 * thrown by this method.
207 */
|
208 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
209 {
210 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
211
212 //default library name is consumer name
213 String libraryName = consumerName;
214
|
215 marek 1.21 // check whether an alternative library name was specified in an optional
216 // consumer config file
217 String configFile = FileSystem::getAbsolutePath(
218 (const char*)_consumerConfigDir.getCString(),
219 String(consumerName + ".conf"));
|
220 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
221 "Looking for config file %s",(const char*)configFile.getCString()));
|
222 h.sterling 1.1
223 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
224 {
|
225 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
226 "Found config file for consumer %s",
227 (const char*)consumerName.getCString()));
|
228 h.sterling 1.1
229 try
230 {
|
231 kumpf 1.31 //Bugzilla 3765 - Change this to use a member var
|
232 marek 1.21 // when OptionManager has a reset option
|
233 h.sterling 1.8 OptionManager _optionMgr;
|
234 marek 1.21 //comment the following line out later
235 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
236 h.sterling 1.1 _optionMgr.mergeFile(configFile);
237 _optionMgr.checkRequiredOptions();
238
|
239 kumpf 1.31 if (!_optionMgr.lookupValue("location", libraryName) ||
|
240 marek 1.21 (libraryName == String::EMPTY))
|
241 h.sterling 1.1 {
|
242 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
|
243 marek 1.21 "Warning: Using default library name since none was "
|
244 kumpf 1.31 "specified in %s",(const char*)configFile.getCString()));
|
245 h.sterling 1.1 libraryName = consumerName;
246 }
247
248 } catch (Exception & ex)
249 {
|
250 marek 1.21 throw Exception(
251 MessageLoaderParms(
252 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
253 "Error reading $0: $1.",
254 configFile,
255 ex.getMessage()));
|
256 h.sterling 1.1 }
257 } else
258 {
|
259 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
260 "No config file exists for %s",
261 (const char*)consumerName.getCString()));
|
262 h.sterling 1.1 }
263
|
264 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
265 "The library name for %s is %s",
266 (const char*)consumerName.getCString(),
267 (const char*)libraryName.getCString()));
|
268 h.sterling 1.1
269 PEG_METHOD_EXIT();
270 return libraryName;
271 }
272
|
273 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
274 * we return the one in the cache. If it
|
275 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
276 marek 1.21 * @throws Exception if we cannot successfully create and
277 * initialize the consumer
|
278 kumpf 1.31 */
|
279 h.sterling 1.1 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
280 {
281 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
282
283 DynamicConsumer* consumer = 0;
284 CIMIndicationConsumerProvider* consumerRef = 0;
285 Boolean cached = false;
286 Boolean entryExists = false;
287
288 AutoMutex lock(_consumerTableMutex);
289
290 if (_consumers.lookup(consumerName, consumer))
291 {
292 //why isn't this working??
293 entryExists = true;
294
295 if (consumer && consumer->isLoaded())
296 {
|
297 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
298 "Consumer exists in the cache and is already loaded: %s",
299 (const char*)consumerName.getCString()));
|
300 h.sterling 1.1 cached = true;
301 }
302 } else
303 {
|
304 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
305 "Consumer not found in cache, creating %s",
306 (const char*)consumerName.getCString()));
|
307 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
308 //ATTN: The above is a memory leak if _initConsumer throws an exception
309 //need to delete it in that case
310 }
311
312 if (!cached)
313 {
314 _initConsumer(consumerName, consumer);
315
316 if (!entryExists)
317 {
318 _consumers.insert(consumerName, consumer);
319 }
320 }
321
322 consumer->updateIdleTimer();
323
324 PEG_METHOD_EXIT();
325 return consumer;
326 }
327
328 h.sterling 1.1 /** Initializes a DynamicConsumer.
|
329 kumpf 1.31 * Caller assumes responsibility for mutexing the operation as well as
|
330 marek 1.21 * ensuring the consumer does not already exist.
|
331 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
332 */
|
333 marek 1.21 void ConsumerManager::_initConsumer(
334 const String& consumerName,
335 DynamicConsumer* consumer)
|
336 h.sterling 1.1 {
337 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
338
339 CIMIndicationConsumerProvider* base = 0;
340 ConsumerModule* module = 0;
341
|
342 kumpf 1.31 // lookup provider module in cache (if it exists, it returns
|
343 marek 1.21 // the cached module, otherwise it creates and returns a new one)
|
344 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
345 module = _lookupModule(libraryName);
346
347 //build library path
|
348 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
349 (const char*)_consumerDir.getCString(),
350 FileSystem::buildLibraryFileName(libraryName));
|
351 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
352 (const char*)libraryPath.getCString()));
|
353 h.sterling 1.1
354 //load module
355 try
356 {
357 base = module->load(consumerName, libraryPath);
358 consumer->set(module, base);
359
360 } catch (Exception& ex)
361 {
|
362 kumpf 1.31 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
|
363 thilo.boehm 1.28 "Error loading consumer module: %s",
364 (const char*)ex.getMessage().getCString()));
|
365 marek 1.21
366 throw Exception(
367 MessageLoaderParms(
368 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
369 "Cannot load module ($0:$1): Unknown exception.",
370 consumerName,
371 libraryName));
|
372 h.sterling 1.1 } catch (...)
373 {
|
374 marek 1.21 throw Exception(
375 MessageLoaderParms(
376 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
377 "Cannot load module ($0:$1): Unknown exception.",
378 consumerName,
379 libraryName));
|
380 h.sterling 1.1 }
381
|
382 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
383 "Successfully loaded consumer module %s",
384 (const char*)libraryName.getCString()));
|
385 h.sterling 1.1
386 //initialize consumer
387 try
388 {
|
389 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
390 (const char*)consumerName.getCString()));
|
391 h.sterling 1.1
392 consumer->initialize();
393
|
394 venkat.puvvada 1.32 Semaphore* semaphore = consumer->getShutdownSemaphore();
|
395 h.sterling 1.1
396 //start the worker thread
|
397 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
398 h.sterling 1.1 _worker_routine,
|
399 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
400 marek 1.21 {
401 PEG_TRACE_CSTRING(
402 TRC_LISTENER,
|
403 marek 1.24 Tracer::LEVEL1,
|
404 marek 1.21 "Could not allocate thread for consumer.");
405
406 throw Exception(
407 MessageLoaderParms(
408 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
409 "Not enough threads for consumer worker routine."));
|
410 konrad.r 1.7 }
|
411 h.sterling 1.1
|
412 marek 1.21 //wait until the listening thread has started.
413 // Otherwise, there is a miniscule chance that the first event will
414 // be enqueued before the consumer is waiting for it and the first
415 // indication after loading the consumer will be lost
|
416 h.sterling 1.8 consumer->waitForEventThread();
417
|
418 h.sterling 1.1 //load any outstanding requests
|
419 kumpf 1.31 Array<IndicationDispatchEvent> outstandingIndications =
|
420 marek 1.21 _deserializeOutstandingIndications(consumerName);
|
421 h.sterling 1.1 if (outstandingIndications.size())
422 {
423 //the consumer will signal itself in _loadOustandingIndications
424 consumer->_loadOutstandingIndications(outstandingIndications);
425 }
426
|
427 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
428 "Successfully initialized consumer %s",
429 (const char*)consumerName.getCString()));
|
430 h.sterling 1.1
431 } catch (...)
432 {
433 module->unloadModule();
434 consumer->reset();
|
435 marek 1.21 throw Exception(
436 MessageLoaderParms(
437 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
438 "Cannot initialize consumer ($0).",
|
439 kumpf 1.31 consumerName));
|
440 h.sterling 1.1 }
441
|
442 kumpf 1.31 PEG_METHOD_EXIT();
|
443 h.sterling 1.1 }
444
445
|
446 marek 1.21 /** Returns the ConsumerModule with the given library name.
447 * If it already exists, we return the one in the cache. If it
|
448 h.sterling 1.1 * DNE, we create it and add it to the table.
|
449 kumpf 1.31 * @throws Exception if we cannot successfully create and
|
450 marek 1.21 * initialize the consumer
|
451 kumpf 1.31 */
|
452 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
453 h.sterling 1.1 {
454 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
455
456 AutoMutex lock(_moduleTableMutex);
457
458 ConsumerModule* module = 0;
459
460 //see if consumer module is cached
461 if (_modules.lookup(libraryName, module))
462 {
|
463 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
464 kumpf 1.31 "Found Consumer Module %s in Consumer Manager Cache",
|
465 thilo.boehm 1.28 (const char*)libraryName.getCString()));
|
466 h.sterling 1.1
467 } else
468 {
|
469 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
470 "Creating Consumer Provider Module %s",
471 (const char*)libraryName.getCString()));
|
472 h.sterling 1.1
|
473 kumpf 1.31 module = new ConsumerModule();
|
474 h.sterling 1.1 _modules.insert(libraryName, module);
475 }
476
477 PEG_METHOD_EXIT();
478 return(module);
479 }
480
481 /** Returns true if there are active consumers
|
482 kumpf 1.31 */
|
483 h.sterling 1.1 Boolean ConsumerManager::hasActiveConsumers()
484 {
485 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
486
487 AutoMutex lock(_consumerTableMutex);
488 DynamicConsumer* consumer = 0;
489
490 try
491 {
492 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
493 {
494 consumer = i.value();
495
|
496 kumpf 1.31 if (consumer &&
497 consumer->isLoaded() &&
|
498 marek 1.21 (consumer->getPendingIndications() > 0))
|
499 h.sterling 1.1 {
|
500 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
501 "Found active consumer: %s" ,
502 (const char*)consumer->_name.getCString()));
|
503 h.sterling 1.1 PEG_METHOD_EXIT();
504 return true;
505 }
506 }
507 } catch (...)
508 {
509 // Unexpected exception; do not assume that no providers are loaded
|
510 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
511 marek 1.21 "Unexpected Exception in hasActiveConsumers.");
|
512 h.sterling 1.1 PEG_METHOD_EXIT();
513 return true;
514 }
515
516 PEG_METHOD_EXIT();
517 return false;
518 }
519
520 /** Returns true if there are loaded consumers
|
521 kumpf 1.31 */
|
522 h.sterling 1.1 Boolean ConsumerManager::hasLoadedConsumers()
523 {
524 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
525
526 AutoMutex lock(_consumerTableMutex);
527 DynamicConsumer* consumer = 0;
528
529 try
530 {
531 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
532 {
533 consumer = i.value();
534
535 if (consumer && consumer->isLoaded())
536 {
|
537 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
538 "Found loaded consumer: %s",
539 (const char*)consumer->_name.getCString()));
|
540 h.sterling 1.1 PEG_METHOD_EXIT();
541 return true;
542 }
543 }
544 } catch (...)
545 {
546 // Unexpected exception; do not assume that no providers are loaded
|
547 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
548 marek 1.21 "Unexpected Exception in hasLoadedConsumers.");
|
549 h.sterling 1.1 PEG_METHOD_EXIT();
550 return true;
551 }
552
553 PEG_METHOD_EXIT();
554 return false;
555 }
556
557
558 /** Shutting down a consumer consists of four major steps:
|
559 kumpf 1.31 * 1) Send the shutdown signal. This causes the worker routine to break out
|
560 marek 1.21 * of the loop and exit.
561 * 2) Wait for the worker thread to end. This may take a while if it's
562 * processing an indication. This is optional in a shutdown scenario.
563 * If the listener is shutdown with a -f force, the listener
564 * will not wait for the consumer to finish before shutting down.
565 * Note that a normal shutdown only allows the current consumer indication
566 * to finish. All other queued indications are serialized to a log and
|
567 h.sterling 1.1 * are sent when the consumer is reoaded.
568 * 3) Terminate the consumer provider interface.
|
569 marek 1.21 * 4) Decrement the module refcount (the module will automatically unload when
570 * it's refcount == 0)
|
571 kumpf 1.31 *
|
572 marek 1.21 * In a scenario where more multiple consumers are loaded, the shutdown signal
573 * should be sent to all of the consumers so the threads can finish
574 * simultaneously.
|
575 kumpf 1.31 *
|
576 marek 1.21 * ATTN: Should the normal shutdown wait for everything in the queue to be
577 * processed? Just new indications to be processed? I am not inclined to this
578 * solution since it could take a LOT of time. By serializing and deserialing
|
579 kumpf 1.31 * indications between shutdown and startup, I feel like we do not need to
|
580 marek 1.21 * process ALL queued indications on shutdown.
|
581 kumpf 1.31 */
|
582 h.sterling 1.1
583 /** Unloads all consumers.
|
584 kumpf 1.31 */
|
585 h.sterling 1.1 void ConsumerManager::unloadAllConsumers()
586 {
587 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
588
589 AutoMutex lock(_consumerTableMutex);
590
591 if (!_consumers.size())
592 {
|
593 marek 1.21 PEG_TRACE_CSTRING(
594 TRC_LISTENER,
595 Tracer::LEVEL4,
596 "There are no consumers to unload.");
|
597 h.sterling 1.1 PEG_METHOD_EXIT();
598 return;
599 }
600
601 if (!_forceShutdown)
602 {
|
603 marek 1.21 // wait until all the consumers have finished processing the events in
604 // their queue
605 // ATTN: Should this have a timeout even though it's a force??
|
606 h.sterling 1.1 while (hasActiveConsumers())
607 {
|
608 mike 1.15 Threads::sleep(500);
|
609 h.sterling 1.1 }
610 }
611
612 Array<DynamicConsumer*> loadedConsumers;
613
614 ConsumerTable::Iterator i = _consumers.start();
615 DynamicConsumer* consumer = 0;
616
617 for (; i!=0; i++)
618 {
619 consumer = i.value();
620 if (consumer && consumer->isLoaded())
621 {
622 loadedConsumers.append(consumer);
623 }
624 }
625
626 if (loadedConsumers.size())
627 {
628 try
629 {
630 h.sterling 1.1 _unloadConsumers(loadedConsumers);
631
|
632 kumpf 1.16 } catch (Exception&)
|
633 h.sterling 1.1 {
|
634 marek 1.21 PEG_TRACE_CSTRING(
635 TRC_LISTENER,
|
636 marek 1.25 Tracer::LEVEL2,
|
637 marek 1.21 "Error unloading consumers.");
|
638 h.sterling 1.1 }
639 } else
640 {
|
641 marek 1.21 PEG_TRACE_CSTRING(
642 TRC_LISTENER,
643 Tracer::LEVEL4,
644 "There are no consumers to unload.");
|
645 h.sterling 1.1 }
646
647 PEG_METHOD_EXIT();
648 }
649
650 /** Unloads idle consumers.
|
651 kumpf 1.31 */
|
652 h.sterling 1.1 void ConsumerManager::unloadIdleConsumers()
653 {
654 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
655
656 AutoMutex lock(_consumerTableMutex);
657
658 if (!_consumers.size())
659 {
|
660 marek 1.21 PEG_TRACE_CSTRING(
661 TRC_LISTENER,
662 Tracer::LEVEL4,
663 "There are no consumers to unload.");
|
664 h.sterling 1.1 PEG_METHOD_EXIT();
665 return;
666 }
667
668 Array<DynamicConsumer*> loadedConsumers;
669
670 ConsumerTable::Iterator i = _consumers.start();
671 DynamicConsumer* consumer = 0;
672
673 for (; i!=0; i++)
674 {
675 consumer = i.value();
676 if (consumer && consumer->isLoaded() && consumer->isIdle())
677 {
678 loadedConsumers.append(consumer);
679 }
680 }
681
682 if (loadedConsumers.size())
683 {
684 try
685 h.sterling 1.1 {
686 _unloadConsumers(loadedConsumers);
687
|
688 kumpf 1.16 } catch (Exception&)
|
689 h.sterling 1.1 {
|
690 marek 1.21 PEG_TRACE_CSTRING(
691 TRC_LISTENER,
|
692 marek 1.25 Tracer::LEVEL2,
|
693 marek 1.21 "Error unloading consumers.");
|
694 h.sterling 1.1 }
695 } else
696 {
|
697 marek 1.21 PEG_TRACE_CSTRING(
698 TRC_LISTENER,
699 Tracer::LEVEL4,
700 "There are no consumers to unload.");
|
701 h.sterling 1.1 }
702
703 PEG_METHOD_EXIT();
704 }
705
706 /** Unloads a single consumer.
|
707 kumpf 1.31 */
|
708 h.sterling 1.1 void ConsumerManager::unloadConsumer(const String& consumerName)
709 {
710 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
711
712 AutoMutex lock(_consumerTableMutex);
713
714 DynamicConsumer* consumer = 0;
715
716 //check whether the consumer exists
717 if (!_consumers.lookup(consumerName, consumer))
718 {
|
719 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
720 "Error: cannot unload consumer, unknown consumer %s",
721 (const char*)consumerName.getCString()));
|
722 h.sterling 1.1 return;
723 }
724
725 //check whether the consumer is loaded
726 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
727 {
728 //unload the consumer
729 Array<DynamicConsumer*> loadedConsumers;
730 loadedConsumers.append(consumer);
731
732 try
733 {
734 _unloadConsumers(loadedConsumers);
735
|
736 kumpf 1.16 } catch (Exception&)
|
737 h.sterling 1.1 {
|
738 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
|
739 marek 1.21 "Error unloading consumers.");
|
740 h.sterling 1.1 }
741
742 } else
743 {
|
744 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
745 "Error: cannot unload the not loaded consumer %s",
746 (const char*)consumerName.getCString()));
|
747 h.sterling 1.1 }
748
749 PEG_METHOD_EXIT();
750 }
751
752 /** Unloads the consumers in the given array.
753 * The consumerTable mutex MUST be locked prior to entering this method.
|
754 kumpf 1.31 */
|
755 marek 1.21 void ConsumerManager::_unloadConsumers(
756 Array<DynamicConsumer*> consumersToUnload)
|
757 h.sterling 1.1 {
758 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
759
760 //tell consumers to shutdown
761 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
762 {
763 consumersToUnload[i]->sendShutdownSignal();
764 }
765
|
766 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
|
767 marek 1.21 "Sent shutdown signal to all consumers.");
768
769 // wait for all the consumer worker threads to complete
|
770 kumpf 1.31 // since we can only shutdown after they are all complete,
|
771 marek 1.21 // it does not matter if the first, fifth, or last
772 // consumer takes the longest; the wait time is equal to the time it takes
773 // for the busiest consumer to stop processing its requests.
|
774 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
775 {
|
776 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
777 (const char*)consumersToUnload[i]->getName().getCString()));
|
778 h.sterling 1.1
779 //wait for the consumer worker thread to end
|
780 kumpf 1.31 Semaphore* _shutdownSemaphore =
|
781 kumpf 1.26 consumersToUnload[i]->getShutdownSemaphore();
782 if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
|
783 h.sterling 1.1 {
|
784 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
|
785 marek 1.21 "Timed out while attempting to stop consumer thread.");
|
786 h.sterling 1.1 }
787
|
788 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
|
789 h.sterling 1.1
790 try
791 {
792 //terminate consumer provider interface
793 consumersToUnload[i]->terminate();
794
795 //unload consumer provider module
796 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
797 consumersToUnload[i]->_module->unloadModule();
798
799 //serialize outstanding indications
|
800 marek 1.21 _serializeOutstandingIndications(
801 consumersToUnload[i]->getName(),
802 consumersToUnload[i]->_retrieveOutstandingIndications());
|
803 h.sterling 1.1
804 //reset the consumer
805 consumersToUnload[i]->reset();
806
|
807 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
|
808 marek 1.21 "Consumer library successfully unloaded.");
|
809 h.sterling 1.1
810 } catch (Exception& e)
811 {
|
812 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
|
813 thilo.boehm 1.28 "Error unloading consumer: %s",
|
814 kumpf 1.31 (const char*)e.getMessage().getCString()));
|
815 h.sterling 1.1 //ATTN: throw exception? log warning?
816 }
817 }
818
819 PEG_METHOD_EXIT();
820 }
821
822 /** Serializes oustanding indications to a <MyConsumer>.dat file
823 */
|
824 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
|
825 kumpf 1.31 const String& consumerName,
|
826 marek 1.21 Array<IndicationDispatchEvent> indications)
827 {
828 PEG_METHOD_ENTER(
829 TRC_LISTENER,
830 "ConsumerManager::_serializeOutstandingIndications");
|
831 h.sterling 1.1
832 if (!indications.size())
833 {
834 PEG_METHOD_EXIT();
835 return;
836 }
837
|
838 marek 1.21 String fileName = FileSystem::getAbsolutePath(
839 (const char*)_consumerConfigDir.getCString(),
840 String(consumerName + ".dat"));
|
841 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
842 (const char*)fileName.getCString()));
|
843 h.sterling 1.1
|
844 mike 1.9 Buffer buffer;
|
845 h.sterling 1.1
|
846 kumpf 1.31 // Open the log file and serialize remaining
|
847 h.sterling 1.1 FILE* fileHandle = 0;
|
848 kumpf 1.31 fileHandle = fopen((const char*)fileName.getCString(), "w");
|
849 h.sterling 1.1
850 if (!fileHandle)
851 {
|
852 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
853 "Unable to open log file for %s",
854 (const char*)consumerName.getCString()));
|
855 h.sterling 1.1
856 } else
857 {
|
858 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
859 h.sterling 1.1 "Serializing %d outstanding requests for %s",
860 indications.size(),
|
861 marek 1.18 (const char*)consumerName.getCString()));
|
862 h.sterling 1.1
|
863 marek 1.21 // we have to put the array of instances under a valid root element
|
864 kumpf 1.31 // or the parser complains
|
865 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
866
|
867 marek 1.21 CIMInstance cimInstance;
|
868 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
869 {
|
870 marek 1.21 //set the URL string property on the serializable instance
871 CIMValue cimValue(CIMTYPE_STRING, false);
872 cimValue.set(indications[i].getURL());
873 cimInstance = indications[i].getIndicationInstance();
874 CIMProperty cimProperty(URL_PROPERTY, cimValue);
875 cimInstance.addProperty(cimProperty);
|
876 h.sterling 1.11
877 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
878 marek 1.21 }
|
879 h.sterling 1.1
|
880 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
881 h.sterling 1.1
882 fputs((const char*)buffer.getData(), fileHandle);
883
884 fclose(fileHandle);
885 }
886
887 PEG_METHOD_EXIT();
888 }
889
890 /** Reads outstanding indications from a <MyConsumer>.dat file
|
891 kumpf 1.31 */
892 Array<IndicationDispatchEvent>
|
893 marek 1.21 ConsumerManager::_deserializeOutstandingIndications(
894 const String& consumerName)
895 {
896 PEG_METHOD_ENTER(
897 TRC_LISTENER,
898 "ConsumerManager::_deserializeOutstandingIndications");
899
900 String fileName = FileSystem::getAbsolutePath(
901 (const char*)_consumerConfigDir.getCString(),
902 String(consumerName + ".dat"));
|
903 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
904 "Consumer dat file: %s",(const char*)fileName.getCString()));
|
905 h.sterling 1.1
906 Array<CIMInstance> cimInstances;
|
907 marek 1.21 Array<String> urlStrings;
908 Array<IndicationDispatchEvent> indications;
|
909 h.sterling 1.1
910 // Open the log file and serialize remaining indications
911 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
912 {
|
913 mike 1.9 Buffer text;
|
914 h.sterling 1.1 CIMInstance cimInstance;
|
915 marek 1.21 CIMProperty cimProperty;
916 CIMValue cimValue;
917 String urlString;
|
918 h.sterling 1.1 XmlEntry entry;
919
920 try
921 {
|
922 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
923 FileSystem::loadFileToMemory(text, fileName);
|
924 h.sterling 1.1
925 //parse the file
926 XmlParser parser((char*)text.getData());
927 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
928
929 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
930 {
|
931 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
932 if (index != PEG_NOT_FOUND)
933 {
|
934 kumpf 1.31 // get the URL string property from the serialized instance
|
935 marek 1.21 // and remove the property
936 cimProperty = cimInstance.getProperty(index);
937 cimValue = cimProperty.getValue();
938 cimValue.get(urlString);
939 cimInstance.removeProperty(index);
940 }
|
941 kumpf 1.31 IndicationDispatchEvent* indicationEvent =
|
942 marek 1.21 new IndicationDispatchEvent(
|
943 kumpf 1.31 OperationContext(),
944 urlString,
|
945 marek 1.21 cimInstance);
946
|
947 h.sterling 1.11 indications.append(*indicationEvent);
|
948 h.sterling 1.1 }
949
950 XmlReader::expectEndTag(parser, "IRETURNVALUE");
951
|
952 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
953 h.sterling 1.1 "Consumer %s has %d outstanding indications",
954 (const char*)consumerName.getCString(),
|
955 marek 1.18 indications.size()));
|
956 h.sterling 1.1
|
957 kumpf 1.31 //delete the file
|
958 h.sterling 1.1 FileSystem::removeFile(fileName);
959
960 } catch (Exception& ex)
961 {
|
962 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
963 "Error parsing dat file for consumer %s: %s",
964 (const char*)consumerName.getCString(),
965 (const char*)ex.getMessage().getCString()));
|
966 h.sterling 1.1
967 } catch (...)
968 {
|
969 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
970 "Error parsing dat file for consumer %s: Unknown Exception",
971 (const char*)consumerName.getCString()));
|
972 h.sterling 1.1 }
973 }
974
975 PEG_METHOD_EXIT();
|
976 h.sterling 1.11 return indications;
|
977 h.sterling 1.1 }
978
979
980
|
981 kumpf 1.31 /**
|
982 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
|
983 kumpf 1.31 * per consumer, we eliminate a ton of synchronization issues and make it easy
|
984 marek 1.21 * to prevent the consumer from performing two mutually exclusive operations
|
985 kumpf 1.31 * at once. This also prevents one bad consumer from taking the entire
|
986 marek 1.21 * listener down. That being said, it is up to the programmer to write smart
|
987 kumpf 1.31 * consumers, and to ensure that their actions don't deadlock
|
988 marek 1.21 * the worker thread.
|
989 kumpf 1.31 *
|
990 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
991 * takes a considerable amount of time to complete, it may make sense to make
992 * the consumer multi-threaded. The individual consumer can immediately
993 * spawn off* new threads to handle indications, and return immediately to
|
994 kumpf 1.31 * catch the next indication. In this way, a consumer can attain
995 * extremely high performance.
996 *
|
997 h.sterling 1.1 * There are three different events that can signal us:
998 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
999 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
1000 * shutdown or an idle consumer state)
|
1001 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
|
1002 kumpf 1.31 *
|
1003 marek 1.21 * The idea is that all new indications are put on the front of the queue and
|
1004 kumpf 1.31 * processed first. All of the retry indications are put on the back of the
|
1005 marek 1.21 * queue and are only processed AFTER all new indications are sent.
1006 * Before processing each indication, we check to see whether or not the
|
1007 kumpf 1.31 * shutdown signal was given.
1008 * If so, we immediately break out of the loop, and another compenent
|
1009 marek 1.21 * serializes the remaining indications to a file.
|
1010 kumpf 1.31 *
1011 * An indication gets retried
|
1012 marek 1.21 * if the consumer throws a CIM_ERR_FAILED exception.
|
1013 kumpf 1.31 *
1014 * This function makes sure it waits until the default retry lapse has passed
|
1015 marek 1.21 * to avoid issues with the following scenario:
|
1016 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1017 kumpf 1.31 * We were signalled 20 times, so we will pass the time_wait 20 times.
|
1018 marek 1.21 * Perceivably, the process time on each indication could be minimal.
1019 * We could potentially proceed to process the retries after a very small time
1020 * interval since we would never hit the wait for the retry timeout.
|
1021 kumpf 1.31 *
1022 */
1023 ThreadReturnType PEGASUS_THREAD_CDECL
|
1024 marek 1.21 ConsumerManager::_worker_routine(void *param)
|
1025 h.sterling 1.1 {
1026 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1027
1028 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1029 String name = myself->getName();
|
1030 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1031 h.sterling 1.1
|
1032 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1033 "_worker_routine::entering loop for %s",
1034 (const char*)name.getCString()));
|
1035 h.sterling 1.1
|
1036 h.sterling 1.8 myself->_listeningSemaphore->signal();
1037
|
1038 h.sterling 1.1 while (true)
1039 {
|
1040 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s",
|
1041 thilo.boehm 1.28 (const char*)name.getCString()));
1042
|
1043 kumpf 1.26
1044 //wait to be signalled
1045 if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
|
1046 h.sterling 1.1 {
|
1047 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1048 "_worker_routine::Time to retry any outstanding indications.");
1049
1050 // signal the queue in the same way we would,
1051 // if we received a new indication
1052 // this allows the thread to fall into the queue processing code
1053 myself->_check_queue->signal();
1054
1055 continue;
1056 }
1057
|
1058 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1059 "_worker_routine::signalled %s",(const char*)name.getCString()));
|
1060 h.sterling 1.1
|
1061 kumpf 1.26 //check whether we received the shutdown signal
1062 if (myself->_dieNow)
1063 {
|
1064 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1065 "_worker_routine::shutdown received %s",
1066 (const char*)name.getCString()));
|
1067 kumpf 1.26 break;
1068 }
|
1069 h.sterling 1.1
|
1070 kumpf 1.26 //create a temporary queue to store failed indications
1071 tmpEventQueue.clear();
|
1072 h.sterling 1.1
|
1073 kumpf 1.26 //continue processing events until the queue is empty
1074 //make sure to check for the shutdown signal before every iteration
1075 // Note that any time during our processing of events the Listener
1076 // may be enqueueing NEW events for us to process.
1077 // Because we are popping off the front and new events are being
1078 // thrown on the back if events are failing when we start
1079 // But are succeeding by the end of the processing, events may be
1080 // sent out of chronological order.
1081 // However. Once we complete the current queue of events, we will
1082 // always send old events to be retried before sending any
1083 // new events added afterwards.
1084 while (myself->_eventqueue.size())
1085 {
1086 //check for shutdown signal
1087 //this only breaks us out of the queue loop, but we will
1088 //immediately get through the next wait from
1089 //the shutdown signal itself, at which time we break
1090 //out of the main loop
|
1091 h.sterling 1.1 if (myself->_dieNow)
1092 {
|
1093 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1094 "Received signal to shutdown, jumping out of queue loop %s",
1095 (const char*)name.getCString()));
|
1096 h.sterling 1.1 break;
1097 }
1098
|
1099 kumpf 1.26 //pop next indication off the queue
1100 IndicationDispatchEvent* event = 0;
1101 //what exceptions/errors can this throw?
1102 event = myself->_eventqueue.remove_front();
|
1103 h.sterling 1.1
|
1104 kumpf 1.26 if (!event)
|
1105 h.sterling 1.1 {
|
1106 kumpf 1.26 //this should never happen
1107 continue;
1108 }
|
1109 h.sterling 1.1
|
1110 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1111 "_worker_routine::consumeIndication %s",
1112 (const char*)name.getCString()));
|
1113 h.sterling 1.1
|
1114 kumpf 1.26 try
1115 {
1116 myself->consumeIndication(event->getContext(),
1117 event->getURL(),
1118 event->getIndicationInstance());
1119
|
1120 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1121 "_worker_routine::processed indication successfully. %s",
1122 (const char*)name.getCString()));
|
1123 h.sterling 1.1
|
1124 kumpf 1.26 delete event;
1125 continue;
1126 }
1127 catch (CIMException & ce)
1128 {
1129 //check for failure
1130 if (ce.getCode() == CIM_ERR_FAILED)
|
1131 h.sterling 1.1 {
|
1132 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2,
|
1133 kumpf 1.26 "_worker_routine::consumeIndication() temporary"
|
1134 thilo.boehm 1.28 " failure %s : %s",
1135 (const char*)name.getCString(),
1136 (const char*)ce.getMessage().getCString()));
|
1137 kumpf 1.31
|
1138 kumpf 1.26 // Here we simply determine if we should increment
1139 // the retry count or not.
1140 // We don't want to count a forced retry from a new
|
1141 kumpf 1.31 // event to count as a retry.
|
1142 kumpf 1.26 // We just have to do it for order's sake.
1143 // If the retry Lapse has lapsed on this event,
1144 // then increment the counter.
1145 if (event->getRetries() > 0)
1146 {
|
1147 kumpf 1.31 Sint64 differenceInMicroseconds =
|
1148 kumpf 1.26 CIMDateTime::getDifference(
1149 event->getLastAttemptTime(),
1150 CIMDateTime::getCurrentDateTime());
|
1151 h.sterling 1.1
|
1152 kumpf 1.31 if (differenceInMicroseconds >=
|
1153 kumpf 1.26 (DEFAULT_RETRY_LAPSE * 1000))
|
1154 marek 1.21 {
|
1155 h.sterling 1.10 event->increaseRetries();
1156 }
|
1157 kumpf 1.26 }
1158 else
1159 {
1160 event->increaseRetries();
1161 }
|
1162 h.sterling 1.1
|
1163 kumpf 1.26 //determine if we have hit the max retry count
1164 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1165 {
1166 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1167 "Error: the maximum retry count has been "
1168 "exceeded. Removing the event from "
1169 "the queue.");
1170
1171 Logger::put(
1172 Logger::ERROR_LOG,
1173 System::CIMLISTENER,
1174 Logger::SEVERE,
1175 "The following indication did not get "
1176 "processed successfully: $0",
1177 event->getIndicationInstance().getPath().toString());
|
1178 h.sterling 1.1
1179 delete event;
1180 continue;
1181 }
|
1182 kumpf 1.26 else
1183 {
1184 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1185 "_worker_routine::placing failed indication "
1186 "back in queue");
1187 tmpEventQueue.insert_back(event);
1188 }
1189 }
1190 else
|
1191 h.sterling 1.1 {
|
1192 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1193 "Error: consumeIndication() permanent failure: %s",
1194 (const char*)ce.getMessage().getCString()));
|
1195 h.sterling 1.1 delete event;
1196 continue;
|
1197 kumpf 1.26 }
1198 }
1199 catch (Exception & ex)
|
1200 h.sterling 1.1 {
|
1201 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1202 "Error: consumeIndication() permanent failure: %s",
1203 (const char*)ex.getMessage().getCString()));
|
1204 kumpf 1.26 delete event;
1205 continue;
|
1206 h.sterling 1.1 }
|
1207 kumpf 1.26 catch (...)
1208 {
1209 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1210 "Error: consumeIndication() failed: Unknown exception.");
1211 delete event;
1212 continue;
1213 } //end try
1214
1215 } //while eventqueue
1216
1217 // Copy the failed indications back to the main queue
1218 // We now lock the queue while adding the retries on to the queue
1219 // so that new events can't get in in front
1220 // Of those events we are retrying. Retried events happened before
1221 // any new events coming in.
1222 IndicationDispatchEvent* tmpEvent = 0;
|
1223 kumpf 1.27 if (myself->_eventqueue.try_lock())
|
1224 h.sterling 1.1 {
|
1225 kumpf 1.27 while (tmpEventQueue.size())
1226 {
1227 tmpEvent = tmpEventQueue.remove_front();
1228 myself->_eventqueue.insert_back(tmpEvent);
1229 }
1230
1231 myself->_eventqueue.unlock();
1232 }
1233 else
1234 {
1235 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1236 "Failed to lock _eventqueue");
|
1237 kumpf 1.26 }
|
1238 h.sterling 1.1 } //shutdown
1239
1240 PEG_METHOD_EXIT();
1241 return 0;
1242 }
1243
1244
1245 PEGASUS_NAMESPACE_END
|