1 martin 1.29 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.30 //
|
3 martin 1.29 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.30 //
|
10 martin 1.29 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.30 //
|
17 martin 1.29 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.30 //
|
20 martin 1.29 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.30 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.29 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.30 //
|
28 martin 1.29 //////////////////////////////////////////////////////////////////////////
|
29 h.sterling 1.1 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include <Pegasus/Common/Config.h>
33 //#include <cstdlib>
34 //#include <dlfcn.h>
35 #include <Pegasus/Common/System.h>
36 #include <Pegasus/Common/FileSystem.h>
37 #include <Pegasus/Common/Tracer.h>
38 #include <Pegasus/Common/Logger.h>
39 #include <Pegasus/Common/XmlReader.h>
|
40 mike 1.15 #include <Pegasus/Common/ThreadPool.h>
|
41 h.sterling 1.1 #include <Pegasus/Common/XmlParser.h>
42 #include <Pegasus/Common/XmlWriter.h>
|
43 mike 1.14 #include <Pegasus/Common/List.h>
|
44 mike 1.15 #include <Pegasus/Common/Mutex.h>
|
45 h.sterling 1.1
46 #include "ConsumerManager.h"
47
48 PEGASUS_NAMESPACE_BEGIN
49 PEGASUS_USING_STD;
50
|
51 kumpf 1.31 //ATTN: Can we just use a properties file instead?? If we only have one
|
52 marek 1.21 // property, we may want to just parse it ourselves.
53 // We may need to add more properties, however. Potential per consumer
54 // properties: unloadOk, idleTimout, retryCount, etc
|
55 h.sterling 1.1 static struct OptionRow optionsTable[] =
56 //optionname defaultvalue rqd type domain domainsize clname hlpmsg
57 {
|
58 marek 1.21 {"location", "", false, Option::STRING, 0, 0,
59 "location", "library name for the consumer"},
|
60 h.sterling 1.1 };
61
62 const Uint32 NUM_OPTIONS = sizeof(optionsTable) / sizeof(optionsTable[0]);
63
64 //retry settings
65 static const Uint32 DEFAULT_MAX_RETRY_COUNT = 5;
|
66 h.sterling 1.6 static const Uint32 DEFAULT_RETRY_LAPSE = 300000; //ms = 5 minutes
|
67 h.sterling 1.1
|
68 kumpf 1.31 //constant for fake property that is added to instances when
|
69 marek 1.21 // serializing to track the full URL
|
70 h.sterling 1.11 static const String URL_PROPERTY = "URLString";
|
71 h.sterling 1.1
72
|
73 marek 1.21 ConsumerManager::ConsumerManager(
74 const String& consumerDir,
75 const String& consumerConfigDir,
76 Boolean enableConsumerUnload,
|
77 kumpf 1.31 Uint32 idleTimeout) :
|
78 marek 1.21 _consumerDir(consumerDir),
79 _consumerConfigDir(consumerConfigDir),
80 _enableConsumerUnload(enableConsumerUnload),
81 _idleTimeout(idleTimeout),
82 _forceShutdown(true)
|
83 h.sterling 1.1 {
84 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::ConsumerManager");
85
|
86 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
87 "Consumer library directory: %s",
88 (const char*)consumerDir.getCString()));
89 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
90 "Consumer configuration directory: %s",
91 (const char*)consumerConfigDir.getCString()));
|
92 h.sterling 1.1
|
93 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
94 thilo.boehm 1.28 "Consumer unload enabled %d: idle timeout %d",
95 enableConsumerUnload,idleTimeout));
|
96 h.sterling 1.1
97
|
98 h.sterling 1.8 //ATTN: Bugzilla 3765 - Uncomment when OptionManager has a reset capability
|
99 h.sterling 1.6 //_optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
100 h.sterling 1.1
|
101 kumpf 1.3 struct timeval deallocateWait = {15, 0};
102 _thread_pool = new ThreadPool(0, "ConsumerManager", 0, 0, deallocateWait);
|
103 h.sterling 1.1
104 _init();
105
106 PEG_METHOD_EXIT();
107 }
108
109 ConsumerManager::~ConsumerManager()
110 {
111 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::~ConsumerManager");
112
113 unloadAllConsumers();
114
|
115 kumpf 1.3 delete _thread_pool;
|
116 h.sterling 1.1
117 ConsumerTable::Iterator i = _consumers.start();
118 for (; i!=0; i++)
119 {
120 DynamicConsumer* consumer = i.value();
121 delete consumer;
122 }
123
|
124 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all consumers");
|
125 h.sterling 1.1
126 ModuleTable::Iterator j = _modules.start();
127 for (;j!=0;j++)
128 {
129 ConsumerModule* module = j.value();
130 delete module;
131 }
132
|
133 marek 1.18 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Deleted all modules");
|
134 h.sterling 1.1
135 PEG_METHOD_EXIT();
136 }
137
138 void ConsumerManager::_init()
139 {
140 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_init");
141
142 //check if there are any outstanding indications
143 Array<String> files;
144 Uint32 pos;
145 String consumerName;
146
147 if (FileSystem::getDirectoryContents(_consumerConfigDir, files))
148 {
149 for (Uint32 i = 0; i < files.size(); i++)
150 {
151 pos = files[i].find(".dat");
152 if (pos != PEG_NOT_FOUND)
153 {
154 consumerName = files[i].subString(0, pos);
155 h.sterling 1.1
156 try
157 {
|
158 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
159 "Attempting to load indication for '%s'",
160 (const char*)consumerName.getCString()));
|
161 h.sterling 1.1 getConsumer(consumerName);
162
163 } catch (...)
164 {
|
165 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
166 "Cannot load consumer from file '%s'",
167 (const char*)files[i].getCString()));
|
168 h.sterling 1.1 }
169 }
170 }
171 }
172
173 PEG_METHOD_EXIT();
174 }
175
176 String ConsumerManager::getConsumerDir()
177 {
178 return _consumerDir;
179 }
180
181 String ConsumerManager::getConsumerConfigDir()
182 {
183 return _consumerConfigDir;
184 }
185
186 Boolean ConsumerManager::getEnableConsumerUnload()
187 {
188 return _enableConsumerUnload;
189 h.sterling 1.1 }
190
191 Uint32 ConsumerManager::getIdleTimeout()
192 {
193 return _idleTimeout;
194 }
195
|
196 marek 1.21 /** Retrieves the library name associated with the consumer name.
197 * By default, the library name
|
198 kumpf 1.31 * is the same as the consumer name. However, you may specify a different
199 * library name in a consumer configuration file. This file must be named
|
200 marek 1.21 * "MyConsumer.txt" and contain the following: location="libraryName"
201 *
|
202 kumpf 1.31 * The config file is optional and is generally only needed in cases where
|
203 marek 1.21 * there are strict requirements on library naming.
204 *
|
205 kumpf 1.31 * It is the responsibility of the caller to catch any exceptions
|
206 marek 1.21 * thrown by this method.
207 */
|
208 h.sterling 1.1 String ConsumerManager::_getConsumerLibraryName(const String & consumerName)
209 {
210 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumerLibraryName");
211
212 //default library name is consumer name
213 String libraryName = consumerName;
214
|
215 marek 1.21 // check whether an alternative library name was specified in an optional
216 // consumer config file
217 String configFile = FileSystem::getAbsolutePath(
218 (const char*)_consumerConfigDir.getCString(),
219 String(consumerName + ".conf"));
|
220 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
221 "Looking for config file %s",(const char*)configFile.getCString()));
|
222 h.sterling 1.1
223 if (FileSystem::exists(configFile) && FileSystem::canRead(configFile))
224 {
|
225 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
226 "Found config file for consumer %s",
227 (const char*)consumerName.getCString()));
|
228 h.sterling 1.1
229 try
230 {
|
231 kumpf 1.31 //Bugzilla 3765 - Change this to use a member var
|
232 marek 1.21 // when OptionManager has a reset option
|
233 h.sterling 1.8 OptionManager _optionMgr;
|
234 marek 1.21 //comment the following line out later
235 _optionMgr.registerOptions(optionsTable, NUM_OPTIONS);
|
236 h.sterling 1.1 _optionMgr.mergeFile(configFile);
237 _optionMgr.checkRequiredOptions();
238
|
239 kumpf 1.31 if (!_optionMgr.lookupValue("location", libraryName) ||
|
240 marek 1.21 (libraryName == String::EMPTY))
|
241 h.sterling 1.1 {
|
242 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
|
243 marek 1.21 "Warning: Using default library name since none was "
|
244 kumpf 1.31 "specified in %s",(const char*)configFile.getCString()));
|
245 h.sterling 1.1 libraryName = consumerName;
246 }
247
248 } catch (Exception & ex)
249 {
|
250 marek 1.21 throw Exception(
251 MessageLoaderParms(
252 "DynListener.ConsumerManager.INVALID_CONFIG_FILE",
253 "Error reading $0: $1.",
254 configFile,
255 ex.getMessage()));
|
256 h.sterling 1.1 }
257 } else
258 {
|
259 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
260 "No config file exists for %s",
261 (const char*)consumerName.getCString()));
|
262 h.sterling 1.1 }
263
|
264 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
265 "The library name for %s is %s",
266 (const char*)consumerName.getCString(),
267 (const char*)libraryName.getCString()));
|
268 h.sterling 1.1
269 PEG_METHOD_EXIT();
270 return libraryName;
271 }
272
|
273 marek 1.21 /** Returns the DynamicConsumer for the consumerName. If it already exists,
274 * we return the one in the cache. If it
|
275 h.sterling 1.1 * DNE, we create it and initialize it, and add it to the table.
|
276 marek 1.21 * @throws Exception if we cannot successfully create and
277 * initialize the consumer
|
278 kumpf 1.31 */
|
279 h.sterling 1.1 DynamicConsumer* ConsumerManager::getConsumer(const String& consumerName)
280 {
281 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::getConsumer");
282
283 DynamicConsumer* consumer = 0;
284 Boolean cached = false;
285 Boolean entryExists = false;
286
287 AutoMutex lock(_consumerTableMutex);
288
289 if (_consumers.lookup(consumerName, consumer))
290 {
291 //why isn't this working??
292 entryExists = true;
293
294 if (consumer && consumer->isLoaded())
295 {
|
296 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
297 "Consumer exists in the cache and is already loaded: %s",
298 (const char*)consumerName.getCString()));
|
299 h.sterling 1.1 cached = true;
300 }
301 } else
302 {
|
303 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
304 "Consumer not found in cache, creating %s",
305 (const char*)consumerName.getCString()));
|
306 h.sterling 1.1 consumer = new DynamicConsumer(consumerName);
307 }
308
309 if (!cached)
310 {
|
311 venkat.puvvada 1.33 AutoPtr<DynamicConsumer> destroyer(consumer);
|
312 h.sterling 1.1 _initConsumer(consumerName, consumer);
|
313 venkat.puvvada 1.33 destroyer.release();
|
314 h.sterling 1.1
315 if (!entryExists)
316 {
317 _consumers.insert(consumerName, consumer);
318 }
319 }
320
321 consumer->updateIdleTimer();
322
323 PEG_METHOD_EXIT();
324 return consumer;
325 }
326
327 /** Initializes a DynamicConsumer.
|
328 kumpf 1.31 * Caller assumes responsibility for mutexing the operation as well as
|
329 marek 1.21 * ensuring the consumer does not already exist.
|
330 h.sterling 1.1 * @throws Exception if the consumer cannot be initialized
331 */
|
332 marek 1.21 void ConsumerManager::_initConsumer(
333 const String& consumerName,
334 DynamicConsumer* consumer)
|
335 h.sterling 1.1 {
336 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_initConsumer");
337
338 CIMIndicationConsumerProvider* base = 0;
339 ConsumerModule* module = 0;
340
|
341 kumpf 1.31 // lookup provider module in cache (if it exists, it returns
|
342 marek 1.21 // the cached module, otherwise it creates and returns a new one)
|
343 h.sterling 1.1 String libraryName = _getConsumerLibraryName(consumerName);
344 module = _lookupModule(libraryName);
345
346 //build library path
|
347 marek 1.21 String libraryPath = FileSystem::getAbsolutePath(
348 (const char*)_consumerDir.getCString(),
349 FileSystem::buildLibraryFileName(libraryName));
|
350 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Loading library: %s",
351 (const char*)libraryPath.getCString()));
|
352 h.sterling 1.1
353 //load module
354 try
355 {
356 base = module->load(consumerName, libraryPath);
357 consumer->set(module, base);
358
359 } catch (Exception& ex)
360 {
|
361 kumpf 1.31 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
|
362 thilo.boehm 1.28 "Error loading consumer module: %s",
363 (const char*)ex.getMessage().getCString()));
|
364 marek 1.21
365 throw Exception(
366 MessageLoaderParms(
367 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
368 "Cannot load module ($0:$1): Unknown exception.",
369 consumerName,
370 libraryName));
|
371 h.sterling 1.1 } catch (...)
372 {
|
373 marek 1.21 throw Exception(
374 MessageLoaderParms(
375 "DynListener.ConsumerManager.CANNOT_LOAD_MODULE",
376 "Cannot load module ($0:$1): Unknown exception.",
377 consumerName,
378 libraryName));
|
379 h.sterling 1.1 }
380
|
381 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
382 "Successfully loaded consumer module %s",
383 (const char*)libraryName.getCString()));
|
384 h.sterling 1.1
385 //initialize consumer
386 try
387 {
|
388 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Initializing Consumer %s",
389 (const char*)consumerName.getCString()));
|
390 h.sterling 1.1
391 consumer->initialize();
392
|
393 venkat.puvvada 1.32 Semaphore* semaphore = consumer->getShutdownSemaphore();
|
394 h.sterling 1.1
395 //start the worker thread
|
396 konrad.r 1.7 if (_thread_pool->allocate_and_awaken(consumer,
|
397 h.sterling 1.1 _worker_routine,
|
398 konrad.r 1.7 semaphore) != PEGASUS_THREAD_OK)
|
399 marek 1.21 {
400 PEG_TRACE_CSTRING(
401 TRC_LISTENER,
|
402 marek 1.24 Tracer::LEVEL1,
|
403 marek 1.21 "Could not allocate thread for consumer.");
404
405 throw Exception(
406 MessageLoaderParms(
407 "DynListener.ConsumerManager.CANNOT_ALLOCATE_THREAD",
408 "Not enough threads for consumer worker routine."));
|
409 konrad.r 1.7 }
|
410 h.sterling 1.1
|
411 marek 1.21 //wait until the listening thread has started.
412 // Otherwise, there is a miniscule chance that the first event will
413 // be enqueued before the consumer is waiting for it and the first
414 // indication after loading the consumer will be lost
|
415 h.sterling 1.8 consumer->waitForEventThread();
416
|
417 h.sterling 1.1 //load any outstanding requests
|
418 kumpf 1.31 Array<IndicationDispatchEvent> outstandingIndications =
|
419 marek 1.21 _deserializeOutstandingIndications(consumerName);
|
420 h.sterling 1.1 if (outstandingIndications.size())
421 {
422 //the consumer will signal itself in _loadOustandingIndications
423 consumer->_loadOutstandingIndications(outstandingIndications);
424 }
425
|
426 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
427 "Successfully initialized consumer %s",
428 (const char*)consumerName.getCString()));
|
429 h.sterling 1.1
430 } catch (...)
431 {
432 module->unloadModule();
433 consumer->reset();
|
434 marek 1.21 throw Exception(
435 MessageLoaderParms(
436 "DynListener.ConsumerManager.CANNOT_INITIALIZE_CONSUMER",
437 "Cannot initialize consumer ($0).",
|
438 kumpf 1.31 consumerName));
|
439 h.sterling 1.1 }
440
|
441 kumpf 1.31 PEG_METHOD_EXIT();
|
442 h.sterling 1.1 }
443
444
|
445 marek 1.21 /** Returns the ConsumerModule with the given library name.
446 * If it already exists, we return the one in the cache. If it
|
447 h.sterling 1.1 * DNE, we create it and add it to the table.
|
448 kumpf 1.31 * @throws Exception if we cannot successfully create and
|
449 marek 1.21 * initialize the consumer
|
450 kumpf 1.31 */
|
451 marek 1.21 ConsumerModule* ConsumerManager::_lookupModule(const String & libraryName)
|
452 h.sterling 1.1 {
453 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_lookupModule");
454
455 AutoMutex lock(_moduleTableMutex);
456
457 ConsumerModule* module = 0;
458
459 //see if consumer module is cached
460 if (_modules.lookup(libraryName, module))
461 {
|
462 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
463 kumpf 1.31 "Found Consumer Module %s in Consumer Manager Cache",
|
464 thilo.boehm 1.28 (const char*)libraryName.getCString()));
|
465 h.sterling 1.1
466 } else
467 {
|
468 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
469 "Creating Consumer Provider Module %s",
470 (const char*)libraryName.getCString()));
|
471 h.sterling 1.1
|
472 kumpf 1.31 module = new ConsumerModule();
|
473 h.sterling 1.1 _modules.insert(libraryName, module);
474 }
475
476 PEG_METHOD_EXIT();
477 return(module);
478 }
479
480 /** Returns true if there are active consumers
|
481 kumpf 1.31 */
|
482 h.sterling 1.1 Boolean ConsumerManager::hasActiveConsumers()
483 {
484 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasActiveConsumers");
485
486 AutoMutex lock(_consumerTableMutex);
487 DynamicConsumer* consumer = 0;
488
489 try
490 {
491 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
492 {
493 consumer = i.value();
494
|
495 kumpf 1.31 if (consumer &&
496 consumer->isLoaded() &&
|
497 marek 1.21 (consumer->getPendingIndications() > 0))
|
498 h.sterling 1.1 {
|
499 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
500 "Found active consumer: %s" ,
501 (const char*)consumer->_name.getCString()));
|
502 h.sterling 1.1 PEG_METHOD_EXIT();
503 return true;
504 }
505 }
506 } catch (...)
507 {
508 // Unexpected exception; do not assume that no providers are loaded
|
509 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
510 marek 1.21 "Unexpected Exception in hasActiveConsumers.");
|
511 h.sterling 1.1 PEG_METHOD_EXIT();
512 return true;
513 }
514
515 PEG_METHOD_EXIT();
516 return false;
517 }
518
519 /** Returns true if there are loaded consumers
|
520 kumpf 1.31 */
|
521 h.sterling 1.1 Boolean ConsumerManager::hasLoadedConsumers()
522 {
523 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::hasLoadedConsumers");
524
525 AutoMutex lock(_consumerTableMutex);
526 DynamicConsumer* consumer = 0;
527
528 try
529 {
530 for (ConsumerTable::Iterator i = _consumers.start(); i != 0; i++)
531 {
532 consumer = i.value();
533
534 if (consumer && consumer->isLoaded())
535 {
|
536 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
537 "Found loaded consumer: %s",
538 (const char*)consumer->_name.getCString()));
|
539 h.sterling 1.1 PEG_METHOD_EXIT();
540 return true;
541 }
542 }
543 } catch (...)
544 {
545 // Unexpected exception; do not assume that no providers are loaded
|
546 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
|
547 marek 1.21 "Unexpected Exception in hasLoadedConsumers.");
|
548 h.sterling 1.1 PEG_METHOD_EXIT();
549 return true;
550 }
551
552 PEG_METHOD_EXIT();
553 return false;
554 }
555
556
/** Shutting down a consumer consists of four major steps:
 * 1) Send the shutdown signal.  This causes the worker routine to break out
 *    of the loop and exit.
 * 2) Wait for the worker thread to end.  This may take a while if it's
 *    processing an indication.  This is optional in a shutdown scenario.
 *    If the listener is shutdown with a -f force, the listener
 *    will not wait for the consumer to finish before shutting down.
 *    Note that a normal shutdown only allows the current consumer indication
 *    to finish.  All other queued indications are serialized to a log and
 *    are sent when the consumer is reloaded.
 * 3) Terminate the consumer provider interface.
 * 4) Decrement the module refcount (the module will automatically unload when
 *    its refcount == 0)
 *
 * In a scenario where multiple consumers are loaded, the shutdown signal
 * should be sent to all of the consumers so the threads can finish
 * simultaneously.
 *
 * ATTN: Should the normal shutdown wait for everything in the queue to be
 * processed?  Just new indications to be processed?  I am not inclined to this
 * solution since it could take a LOT of time.  By serializing and
 * deserializing indications between shutdown and startup, I feel like we do
 * not need to process ALL queued indications on shutdown.
 */
|
581 h.sterling 1.1
582 /** Unloads all consumers.
|
583 kumpf 1.31 */
|
584 h.sterling 1.1 void ConsumerManager::unloadAllConsumers()
585 {
586 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadAllConsumers");
587
588 AutoMutex lock(_consumerTableMutex);
589
590 if (!_consumers.size())
591 {
|
592 marek 1.21 PEG_TRACE_CSTRING(
593 TRC_LISTENER,
594 Tracer::LEVEL4,
595 "There are no consumers to unload.");
|
596 h.sterling 1.1 PEG_METHOD_EXIT();
597 return;
598 }
599
600 if (!_forceShutdown)
601 {
|
602 marek 1.21 // wait until all the consumers have finished processing the events in
603 // their queue
604 // ATTN: Should this have a timeout even though it's a force??
|
605 h.sterling 1.1 while (hasActiveConsumers())
606 {
|
607 mike 1.15 Threads::sleep(500);
|
608 h.sterling 1.1 }
609 }
610
611 Array<DynamicConsumer*> loadedConsumers;
612
613 ConsumerTable::Iterator i = _consumers.start();
614 DynamicConsumer* consumer = 0;
615
616 for (; i!=0; i++)
617 {
618 consumer = i.value();
619 if (consumer && consumer->isLoaded())
620 {
621 loadedConsumers.append(consumer);
622 }
623 }
624
625 if (loadedConsumers.size())
626 {
627 try
628 {
629 h.sterling 1.1 _unloadConsumers(loadedConsumers);
630
|
631 kumpf 1.16 } catch (Exception&)
|
632 h.sterling 1.1 {
|
633 marek 1.21 PEG_TRACE_CSTRING(
634 TRC_LISTENER,
|
635 marek 1.25 Tracer::LEVEL2,
|
636 marek 1.21 "Error unloading consumers.");
|
637 h.sterling 1.1 }
638 } else
639 {
|
640 marek 1.21 PEG_TRACE_CSTRING(
641 TRC_LISTENER,
642 Tracer::LEVEL4,
643 "There are no consumers to unload.");
|
644 h.sterling 1.1 }
645
646 PEG_METHOD_EXIT();
647 }
648
649 /** Unloads idle consumers.
|
650 kumpf 1.31 */
|
651 h.sterling 1.1 void ConsumerManager::unloadIdleConsumers()
652 {
653 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadIdleConsumers");
654
655 AutoMutex lock(_consumerTableMutex);
656
657 if (!_consumers.size())
658 {
|
659 marek 1.21 PEG_TRACE_CSTRING(
660 TRC_LISTENER,
661 Tracer::LEVEL4,
662 "There are no consumers to unload.");
|
663 h.sterling 1.1 PEG_METHOD_EXIT();
664 return;
665 }
666
667 Array<DynamicConsumer*> loadedConsumers;
668
669 ConsumerTable::Iterator i = _consumers.start();
670 DynamicConsumer* consumer = 0;
671
672 for (; i!=0; i++)
673 {
674 consumer = i.value();
675 if (consumer && consumer->isLoaded() && consumer->isIdle())
676 {
677 loadedConsumers.append(consumer);
678 }
679 }
680
681 if (loadedConsumers.size())
682 {
683 try
684 h.sterling 1.1 {
685 _unloadConsumers(loadedConsumers);
686
|
687 kumpf 1.16 } catch (Exception&)
|
688 h.sterling 1.1 {
|
689 marek 1.21 PEG_TRACE_CSTRING(
690 TRC_LISTENER,
|
691 marek 1.25 Tracer::LEVEL2,
|
692 marek 1.21 "Error unloading consumers.");
|
693 h.sterling 1.1 }
694 } else
695 {
|
696 marek 1.21 PEG_TRACE_CSTRING(
697 TRC_LISTENER,
698 Tracer::LEVEL4,
699 "There are no consumers to unload.");
|
700 h.sterling 1.1 }
701
702 PEG_METHOD_EXIT();
703 }
704
705 /** Unloads a single consumer.
|
706 kumpf 1.31 */
|
707 h.sterling 1.1 void ConsumerManager::unloadConsumer(const String& consumerName)
708 {
709 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::unloadConsumer");
710
711 AutoMutex lock(_consumerTableMutex);
712
713 DynamicConsumer* consumer = 0;
714
715 //check whether the consumer exists
716 if (!_consumers.lookup(consumerName, consumer))
717 {
|
718 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
719 "Error: cannot unload consumer, unknown consumer %s",
720 (const char*)consumerName.getCString()));
|
721 h.sterling 1.1 return;
722 }
723
724 //check whether the consumer is loaded
725 if (consumer && consumer->isLoaded()) //ATTN: forceShutdown?
726 {
727 //unload the consumer
728 Array<DynamicConsumer*> loadedConsumers;
729 loadedConsumers.append(consumer);
730
731 try
732 {
733 _unloadConsumers(loadedConsumers);
734
|
735 kumpf 1.16 } catch (Exception&)
|
736 h.sterling 1.1 {
|
737 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
|
738 marek 1.21 "Error unloading consumers.");
|
739 h.sterling 1.1 }
740
741 } else
742 {
|
743 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL2,
744 "Error: cannot unload the not loaded consumer %s",
745 (const char*)consumerName.getCString()));
|
746 h.sterling 1.1 }
747
748 PEG_METHOD_EXIT();
749 }
750
751 /** Unloads the consumers in the given array.
752 * The consumerTable mutex MUST be locked prior to entering this method.
|
753 kumpf 1.31 */
|
754 marek 1.21 void ConsumerManager::_unloadConsumers(
755 Array<DynamicConsumer*> consumersToUnload)
|
756 h.sterling 1.1 {
757 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_unloadConsumers");
758
759 //tell consumers to shutdown
760 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
761 {
762 consumersToUnload[i]->sendShutdownSignal();
763 }
764
|
765 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL3,
|
766 marek 1.21 "Sent shutdown signal to all consumers.");
767
768 // wait for all the consumer worker threads to complete
|
769 kumpf 1.31 // since we can only shutdown after they are all complete,
|
770 marek 1.21 // it does not matter if the first, fifth, or last
771 // consumer takes the longest; the wait time is equal to the time it takes
772 // for the busiest consumer to stop processing its requests.
|
773 h.sterling 1.1 for (Uint32 i = 0; i < consumersToUnload.size(); i++)
774 {
|
775 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"Unloading consumer %s",
776 (const char*)consumersToUnload[i]->getName().getCString()));
|
777 h.sterling 1.1
778 //wait for the consumer worker thread to end
|
779 kumpf 1.31 Semaphore* _shutdownSemaphore =
|
780 kumpf 1.26 consumersToUnload[i]->getShutdownSemaphore();
781 if (_shutdownSemaphore && !_shutdownSemaphore->time_wait(10000))
|
782 h.sterling 1.1 {
|
783 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL2,
|
784 marek 1.21 "Timed out while attempting to stop consumer thread.");
|
785 h.sterling 1.1 }
786
|
787 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,"Terminating consumer.");
|
788 h.sterling 1.1
789 try
790 {
791 //terminate consumer provider interface
792 consumersToUnload[i]->terminate();
793
794 //unload consumer provider module
795 PEGASUS_ASSERT(consumersToUnload[i]->_module != 0);
796 consumersToUnload[i]->_module->unloadModule();
797
798 //serialize outstanding indications
|
799 marek 1.21 _serializeOutstandingIndications(
800 consumersToUnload[i]->getName(),
801 consumersToUnload[i]->_retrieveOutstandingIndications());
|
802 h.sterling 1.1
803 //reset the consumer
804 consumersToUnload[i]->reset();
805
|
806 thilo.boehm 1.28 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL2,
|
807 marek 1.21 "Consumer library successfully unloaded.");
|
808 h.sterling 1.1
809 } catch (Exception& e)
810 {
|
811 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
|
812 thilo.boehm 1.28 "Error unloading consumer: %s",
|
813 kumpf 1.31 (const char*)e.getMessage().getCString()));
|
814 h.sterling 1.1 //ATTN: throw exception? log warning?
815 }
816 }
817
818 PEG_METHOD_EXIT();
819 }
820
821 /** Serializes oustanding indications to a <MyConsumer>.dat file
822 */
|
823 marek 1.21 void ConsumerManager::_serializeOutstandingIndications(
|
824 kumpf 1.31 const String& consumerName,
|
825 marek 1.21 Array<IndicationDispatchEvent> indications)
826 {
827 PEG_METHOD_ENTER(
828 TRC_LISTENER,
829 "ConsumerManager::_serializeOutstandingIndications");
|
830 h.sterling 1.1
831 if (!indications.size())
832 {
833 PEG_METHOD_EXIT();
834 return;
835 }
836
|
837 marek 1.21 String fileName = FileSystem::getAbsolutePath(
838 (const char*)_consumerConfigDir.getCString(),
839 String(consumerName + ".dat"));
|
840 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Consumer dat file: %s",
841 (const char*)fileName.getCString()));
|
842 h.sterling 1.1
|
843 mike 1.9 Buffer buffer;
|
844 h.sterling 1.1
|
845 kumpf 1.31 // Open the log file and serialize remaining
|
846 h.sterling 1.1 FILE* fileHandle = 0;
|
847 kumpf 1.31 fileHandle = fopen((const char*)fileName.getCString(), "w");
|
848 h.sterling 1.1
849 if (!fileHandle)
850 {
|
851 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
852 "Unable to open log file for %s",
853 (const char*)consumerName.getCString()));
|
854 h.sterling 1.1
855 } else
856 {
|
857 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
858 h.sterling 1.1 "Serializing %d outstanding requests for %s",
859 indications.size(),
|
860 marek 1.18 (const char*)consumerName.getCString()));
|
861 h.sterling 1.1
|
862 marek 1.21 // we have to put the array of instances under a valid root element
|
863 kumpf 1.31 // or the parser complains
|
864 h.sterling 1.1 XmlWriter::append(buffer, "<IRETURNVALUE>\n");
865
|
866 marek 1.21 CIMInstance cimInstance;
|
867 h.sterling 1.1 for (Uint32 i = 0; i < indications.size(); i++)
868 {
|
869 marek 1.21 //set the URL string property on the serializable instance
870 CIMValue cimValue(CIMTYPE_STRING, false);
871 cimValue.set(indications[i].getURL());
872 cimInstance = indications[i].getIndicationInstance();
873 CIMProperty cimProperty(URL_PROPERTY, cimValue);
874 cimInstance.addProperty(cimProperty);
|
875 h.sterling 1.11
876 XmlWriter::appendValueNamedInstanceElement(buffer, cimInstance);
|
877 marek 1.21 }
|
878 h.sterling 1.1
|
879 kumpf 1.19 XmlWriter::append(buffer, "</IRETURNVALUE>");
|
880 h.sterling 1.1
881 fputs((const char*)buffer.getData(), fileHandle);
882
883 fclose(fileHandle);
884 }
885
886 PEG_METHOD_EXIT();
887 }
888
889 /** Reads outstanding indications from a <MyConsumer>.dat file
|
890 kumpf 1.31 */
891 Array<IndicationDispatchEvent>
|
892 marek 1.21 ConsumerManager::_deserializeOutstandingIndications(
893 const String& consumerName)
894 {
895 PEG_METHOD_ENTER(
896 TRC_LISTENER,
897 "ConsumerManager::_deserializeOutstandingIndications");
898
899 String fileName = FileSystem::getAbsolutePath(
900 (const char*)_consumerConfigDir.getCString(),
901 String(consumerName + ".dat"));
|
902 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
903 "Consumer dat file: %s",(const char*)fileName.getCString()));
|
904 h.sterling 1.1
905 Array<CIMInstance> cimInstances;
|
906 marek 1.21 Array<String> urlStrings;
907 Array<IndicationDispatchEvent> indications;
|
908 h.sterling 1.1
909 // Open the log file and serialize remaining indications
910 if (FileSystem::exists(fileName) && FileSystem::canRead(fileName))
911 {
|
912 mike 1.9 Buffer text;
|
913 h.sterling 1.1 CIMInstance cimInstance;
|
914 marek 1.21 CIMProperty cimProperty;
915 CIMValue cimValue;
916 String urlString;
|
917 h.sterling 1.1 XmlEntry entry;
918
919 try
920 {
|
921 marek 1.21 //ATTN: Is this safe to use; what about CRLFs?
922 FileSystem::loadFileToMemory(text, fileName);
|
923 h.sterling 1.1
924 //parse the file
925 XmlParser parser((char*)text.getData());
926 XmlReader::expectStartTag(parser, entry, "IRETURNVALUE");
927
928 while (XmlReader::getNamedInstanceElement(parser, cimInstance))
929 {
|
930 marek 1.21 Uint32 index = cimInstance.findProperty(URL_PROPERTY);
931 if (index != PEG_NOT_FOUND)
932 {
|
933 kumpf 1.31 // get the URL string property from the serialized instance
|
934 marek 1.21 // and remove the property
935 cimProperty = cimInstance.getProperty(index);
936 cimValue = cimProperty.getValue();
937 cimValue.get(urlString);
938 cimInstance.removeProperty(index);
939 }
|
940 kumpf 1.31 IndicationDispatchEvent* indicationEvent =
|
941 marek 1.21 new IndicationDispatchEvent(
|
942 kumpf 1.31 OperationContext(),
943 urlString,
|
944 marek 1.21 cimInstance);
945
|
946 h.sterling 1.11 indications.append(*indicationEvent);
|
947 h.sterling 1.1 }
948
949 XmlReader::expectEndTag(parser, "IRETURNVALUE");
950
|
951 mike 1.20 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL3,
|
952 h.sterling 1.1 "Consumer %s has %d outstanding indications",
953 (const char*)consumerName.getCString(),
|
954 marek 1.18 indications.size()));
|
955 h.sterling 1.1
|
956 kumpf 1.31 //delete the file
|
957 h.sterling 1.1 FileSystem::removeFile(fileName);
958
959 } catch (Exception& ex)
960 {
|
961 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
962 "Error parsing dat file for consumer %s: %s",
963 (const char*)consumerName.getCString(),
964 (const char*)ex.getMessage().getCString()));
|
965 h.sterling 1.1
966 } catch (...)
967 {
|
968 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
969 "Error parsing dat file for consumer %s: Unknown Exception",
970 (const char*)consumerName.getCString()));
|
971 h.sterling 1.1 }
972 }
973
974 PEG_METHOD_EXIT();
|
975 h.sterling 1.11 return indications;
|
976 h.sterling 1.1 }
977
978
979
|
980 kumpf 1.31 /**
|
981 marek 1.21 * This is the main worker thread of the consumer. By having only one thread
|
982 kumpf 1.31 * per consumer, we eliminate a ton of synchronization issues and make it easy
|
983 marek 1.21 * to prevent the consumer from performing two mutually exclusive operations
|
984 kumpf 1.31 * at once. This also prevents one bad consumer from taking the entire
|
985 marek 1.21 * listener down. That being said, it is up to the programmer to write smart
|
986 kumpf 1.31 * consumers, and to ensure that their actions don't deadlock
|
987 marek 1.21 * the worker thread.
|
988 kumpf 1.31 *
|
989 marek 1.21 * If a consumer receives a lot of traffic, or it's consumeIndication() method
990 * takes a considerable amount of time to complete, it may make sense to make
991 * the consumer multi-threaded. The individual consumer can immediately
992 * spawn off* new threads to handle indications, and return immediately to
|
993 kumpf 1.31 * catch the next indication. In this way, a consumer can attain
994 * extremely high performance.
995 *
|
996 h.sterling 1.1 * There are three different events that can signal us:
997 * 1) A new indication (signalled by DynamicListenerIndicationDispatcher)
|
998 marek 1.21 * 2) A shutdown signal (signalled from ConsumerManager, due to a listener
999 * shutdown or an idle consumer state)
|
1000 h.sterling 1.1 * 3) A retry signal (signalled from this routine itself)
|
1001 kumpf 1.31 *
|
1002 marek 1.21 * The idea is that all new indications are put on the front of the queue and
|
1003 kumpf 1.31 * processed first. All of the retry indications are put on the back of the
|
1004 marek 1.21 * queue and are only processed AFTER all new indications are sent.
1005 * Before processing each indication, we check to see whether or not the
|
1006 kumpf 1.31 * shutdown signal was given.
1007 * If so, we immediately break out of the loop, and another compenent
|
1008 marek 1.21 * serializes the remaining indications to a file.
|
1009 kumpf 1.31 *
1010 * An indication gets retried
|
1011 marek 1.21 * if the consumer throws a CIM_ERR_FAILED exception.
|
1012 kumpf 1.31 *
1013 * This function makes sure it waits until the default retry lapse has passed
|
1014 marek 1.21 * to avoid issues with the following scenario:
|
1015 h.sterling 1.5 * 20 new indications come in, 10 of them are successful, 10 are not.
|
1016 kumpf 1.31 * We were signalled 20 times, so we will pass the time_wait 20 times.
|
1017 marek 1.21 * Perceivably, the process time on each indication could be minimal.
1018 * We could potentially proceed to process the retries after a very small time
1019 * interval since we would never hit the wait for the retry timeout.
|
1020 kumpf 1.31 *
1021 */
1022 ThreadReturnType PEGASUS_THREAD_CDECL
|
1023 marek 1.21 ConsumerManager::_worker_routine(void *param)
|
1024 h.sterling 1.1 {
1025 PEG_METHOD_ENTER(TRC_LISTENER, "ConsumerManager::_worker_routine");
1026
1027 DynamicConsumer* myself = static_cast<DynamicConsumer*>(param);
1028 String name = myself->getName();
|
1029 mike 1.15 List<IndicationDispatchEvent,Mutex> tmpEventQueue;
|
1030 h.sterling 1.1
|
1031 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1032 "_worker_routine::entering loop for %s",
1033 (const char*)name.getCString()));
|
1034 h.sterling 1.1
|
1035 h.sterling 1.8 myself->_listeningSemaphore->signal();
1036
|
1037 h.sterling 1.1 while (true)
1038 {
|
1039 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,"_worker_routine::waiting %s",
|
1040 thilo.boehm 1.28 (const char*)name.getCString()));
1041
|
1042 kumpf 1.26
1043 //wait to be signalled
1044 if (!myself->_check_queue->time_wait(DEFAULT_RETRY_LAPSE))
|
1045 h.sterling 1.1 {
|
1046 kumpf 1.26 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1047 "_worker_routine::Time to retry any outstanding indications.");
1048
1049 // signal the queue in the same way we would,
1050 // if we received a new indication
1051 // this allows the thread to fall into the queue processing code
1052 myself->_check_queue->signal();
1053
1054 continue;
1055 }
1056
|
1057 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1058 "_worker_routine::signalled %s",(const char*)name.getCString()));
|
1059 h.sterling 1.1
|
1060 kumpf 1.26 //check whether we received the shutdown signal
1061 if (myself->_dieNow)
1062 {
|
1063 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1064 "_worker_routine::shutdown received %s",
1065 (const char*)name.getCString()));
|
1066 kumpf 1.26 break;
1067 }
|
1068 h.sterling 1.1
|
1069 kumpf 1.26 //create a temporary queue to store failed indications
1070 tmpEventQueue.clear();
|
1071 h.sterling 1.1
|
1072 kumpf 1.26 //continue processing events until the queue is empty
1073 //make sure to check for the shutdown signal before every iteration
1074 // Note that any time during our processing of events the Listener
1075 // may be enqueueing NEW events for us to process.
1076 // Because we are popping off the front and new events are being
1077 // thrown on the back if events are failing when we start
1078 // But are succeeding by the end of the processing, events may be
1079 // sent out of chronological order.
1080 // However. Once we complete the current queue of events, we will
1081 // always send old events to be retried before sending any
1082 // new events added afterwards.
1083 while (myself->_eventqueue.size())
1084 {
1085 //check for shutdown signal
1086 //this only breaks us out of the queue loop, but we will
1087 //immediately get through the next wait from
1088 //the shutdown signal itself, at which time we break
1089 //out of the main loop
|
1090 h.sterling 1.1 if (myself->_dieNow)
1091 {
|
1092 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
1093 "Received signal to shutdown, jumping out of queue loop %s",
1094 (const char*)name.getCString()));
|
1095 h.sterling 1.1 break;
1096 }
1097
|
1098 kumpf 1.26 //pop next indication off the queue
1099 IndicationDispatchEvent* event = 0;
1100 //what exceptions/errors can this throw?
1101 event = myself->_eventqueue.remove_front();
|
1102 h.sterling 1.1
|
1103 kumpf 1.26 if (!event)
|
1104 h.sterling 1.1 {
|
1105 kumpf 1.26 //this should never happen
1106 continue;
1107 }
|
1108 h.sterling 1.1
|
1109 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1110 "_worker_routine::consumeIndication %s",
1111 (const char*)name.getCString()));
|
1112 h.sterling 1.1
|
1113 kumpf 1.26 try
1114 {
1115 myself->consumeIndication(event->getContext(),
1116 event->getURL(),
1117 event->getIndicationInstance());
1118
|
1119 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL4,
1120 "_worker_routine::processed indication successfully. %s",
1121 (const char*)name.getCString()));
|
1122 h.sterling 1.1
|
1123 kumpf 1.26 delete event;
1124 continue;
1125 }
1126 catch (CIMException & ce)
1127 {
1128 //check for failure
1129 if (ce.getCode() == CIM_ERR_FAILED)
|
1130 h.sterling 1.1 {
|
1131 kumpf 1.31 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL2,
|
1132 kumpf 1.26 "_worker_routine::consumeIndication() temporary"
|
1133 thilo.boehm 1.28 " failure %s : %s",
1134 (const char*)name.getCString(),
1135 (const char*)ce.getMessage().getCString()));
|
1136 kumpf 1.31
|
1137 kumpf 1.26 // Here we simply determine if we should increment
1138 // the retry count or not.
1139 // We don't want to count a forced retry from a new
|
1140 kumpf 1.31 // event to count as a retry.
|
1141 kumpf 1.26 // We just have to do it for order's sake.
1142 // If the retry Lapse has lapsed on this event,
1143 // then increment the counter.
1144 if (event->getRetries() > 0)
1145 {
|
1146 kumpf 1.31 Sint64 differenceInMicroseconds =
|
1147 kumpf 1.26 CIMDateTime::getDifference(
1148 event->getLastAttemptTime(),
1149 CIMDateTime::getCurrentDateTime());
|
1150 h.sterling 1.1
|
1151 kumpf 1.31 if (differenceInMicroseconds >=
|
1152 kumpf 1.26 (DEFAULT_RETRY_LAPSE * 1000))
|
1153 marek 1.21 {
|
1154 h.sterling 1.10 event->increaseRetries();
1155 }
|
1156 kumpf 1.26 }
1157 else
1158 {
1159 event->increaseRetries();
1160 }
|
1161 h.sterling 1.1
|
1162 kumpf 1.26 //determine if we have hit the max retry count
1163 if (event->getRetries() >= DEFAULT_MAX_RETRY_COUNT)
1164 {
1165 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1166 "Error: the maximum retry count has been "
1167 "exceeded. Removing the event from "
1168 "the queue.");
1169
1170 Logger::put(
1171 Logger::ERROR_LOG,
1172 System::CIMLISTENER,
1173 Logger::SEVERE,
1174 "The following indication did not get "
1175 "processed successfully: $0",
1176 event->getIndicationInstance().getPath().toString());
|
1177 h.sterling 1.1
1178 delete event;
1179 continue;
1180 }
|
1181 kumpf 1.26 else
1182 {
1183 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4,
1184 "_worker_routine::placing failed indication "
1185 "back in queue");
1186 tmpEventQueue.insert_back(event);
1187 }
1188 }
1189 else
|
1190 h.sterling 1.1 {
|
1191 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1192 "Error: consumeIndication() permanent failure: %s",
1193 (const char*)ce.getMessage().getCString()));
|
1194 h.sterling 1.1 delete event;
1195 continue;
|
1196 kumpf 1.26 }
1197 }
1198 catch (Exception & ex)
|
1199 h.sterling 1.1 {
|
1200 thilo.boehm 1.28 PEG_TRACE((TRC_LISTENER, Tracer::LEVEL1,
1201 "Error: consumeIndication() permanent failure: %s",
1202 (const char*)ex.getMessage().getCString()));
|
1203 kumpf 1.26 delete event;
1204 continue;
|
1205 h.sterling 1.1 }
|
1206 kumpf 1.26 catch (...)
1207 {
1208 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL1,
1209 "Error: consumeIndication() failed: Unknown exception.");
1210 delete event;
1211 continue;
1212 } //end try
1213
1214 } //while eventqueue
1215
1216 // Copy the failed indications back to the main queue
1217 // We now lock the queue while adding the retries on to the queue
1218 // so that new events can't get in in front
1219 // Of those events we are retrying. Retried events happened before
1220 // any new events coming in.
1221 IndicationDispatchEvent* tmpEvent = 0;
|
1222 kumpf 1.27 if (myself->_eventqueue.try_lock())
|
1223 h.sterling 1.1 {
|
1224 kumpf 1.27 while (tmpEventQueue.size())
1225 {
1226 tmpEvent = tmpEventQueue.remove_front();
1227 myself->_eventqueue.insert_back(tmpEvent);
1228 }
1229
1230 myself->_eventqueue.unlock();
1231 }
1232 else
1233 {
1234 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
1235 "Failed to lock _eventqueue");
|
1236 kumpf 1.26 }
|
1237 h.sterling 1.1 } //shutdown
1238
1239 PEG_METHOD_EXIT();
1240 return 0;
1241 }
1242
1243
1244 PEGASUS_NAMESPACE_END
|