1 karl 1.11 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include "DynamicConsumer.h"
35 #include "DynamicConsumerFacade.h"
36
37 #include <Pegasus/Common/Config.h>
|
38 mike 1.14 #include <Pegasus/Common/Time.h>
|
39 h.sterling 1.1 #include <Pegasus/Common/System.h>
40 #include <Pegasus/Common/Tracer.h>
41 #include <Pegasus/Common/XmlWriter.h>
42 #include <Pegasus/Common/XmlReader.h>
43 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/FileSystem.h>
45
46 PEGASUS_NAMESPACE_BEGIN
47 PEGASUS_USING_STD;
48
49
50 DynamicConsumer::DynamicConsumer(): Base(0)
51 {
52 }
53
54 DynamicConsumer::DynamicConsumer(const String& name):
55 Base(0),
56 _module(0),
|
57 mike 1.13 _eventqueue(),
|
58 h.sterling 1.1 _name(name),
59 _initialized(false),
60 _dieNow(0),
61 _no_unload(0)
62 {
63 _check_queue = new Semaphore(0);
64 _shutdownSemaphore = new Semaphore(0);
|
65 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
66 h.sterling 1.1 }
67
68 //ATTN: For migration from old listener -- do we want to support it?
69 DynamicConsumer::DynamicConsumer(const String & name,
70 ConsumerModule* consumerModule,
71 CIMIndicationConsumerProvider* consumerRef) :
72 Base(consumerRef),
73 _module(consumerModule),
|
74 mike 1.13 _eventqueue(),
|
75 h.sterling 1.1 _name(name),
76 _initialized(false),
77 _dieNow(0),
78 _no_unload(0)
79 {
80 _check_queue = new Semaphore(0);
81 _shutdownSemaphore = new Semaphore(0);
|
82 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
83 h.sterling 1.1 }
84
85 DynamicConsumer::~DynamicConsumer(void)
86 {
87 //delete any outstanding events
88 IndicationDispatchEvent* event;
89 while (_eventqueue.size())
90 {
|
91 mike 1.13 event = _eventqueue.remove_front();
|
92 h.sterling 1.1 delete event;
93 }
94
95 //delete semaphores
|
96 kumpf 1.12 delete _check_queue;
|
97 h.sterling 1.1
|
98 kumpf 1.12 delete _shutdownSemaphore;
|
99 h.sterling 1.6
|
100 kumpf 1.12 delete _listeningSemaphore;
|
101 h.sterling 1.1 }
102
103 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
104 {
105 return(_consumer);
106 }
107
108 ConsumerModule* DynamicConsumer::getModule(void) const
109 {
110 return(_module);
111 }
112
113 String DynamicConsumer::getName(void) const
114 {
115 return(_name);
116 }
117
118 Boolean DynamicConsumer::isLoaded(void) const
119 {
120 return(_module == 0 ? false : true);
121 }
122 h.sterling 1.1
123 Boolean DynamicConsumer::isInitialized(void) const
124 {
125 return(_initialized);
126 }
127
128 /** Initializes the consumer.
129 * Caller assumes responsibility for catching exceptions thrown by this method.
130 */
131 void DynamicConsumer::initialize()
132 {
133 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
134
135 if (!_initialized)
136 {
137 try
138 {
139 //there is no cimom handle in the listener, so pass null
140 CIMOMHandle* handle = 0;
141 DynamicConsumerFacade::initialize(*(handle));
142
143 h.sterling 1.1 updateIdleTimer();
144 _initialized = true;
145
|
146 marek 1.19 PEG_TRACE_CSTRING(
147 TRC_LISTENER,
148 Tracer::LEVEL3,
149 "Successfully initialized consumer.");
|
150 h.sterling 1.1
151 } catch (...)
152 {
|
153 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
154 "Exception caught in DynamicConsumerFacade::initialize for %s",
155 (const char*)_name.getCString()));
|
156 h.sterling 1.1 throw;
157 }
158 }
159
160 PEG_METHOD_EXIT();
161 }
162
163 void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
164 {
165 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
166
167 _shutdownSemaphore = shutdownSemaphore;
168
169 PEG_METHOD_EXIT();
170 }
171
172 Semaphore* DynamicConsumer::getShutdownSemaphore()
173 {
174 return _shutdownSemaphore;
175 }
176
177 h.sterling 1.1 void DynamicConsumer::sendShutdownSignal()
178 {
179 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
180
181 _dieNow = true;
182 _check_queue->signal();
183
184 PEG_METHOD_EXIT();
185 }
186
187 void DynamicConsumer::terminate(void)
188 {
189 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
190
191 if (_initialized)
192 {
193 //terminate consumer
194 try
195 {
196 DynamicConsumerFacade::terminate();
197
198 h.sterling 1.1 } catch (...)
199 {
|
200 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
201 "Exception caught in DynamicConsumerFacade::Terminate for %s",
202 (const char*)_name.getCString()));
|
203 h.sterling 1.1 throw;
204 }
205
206 //update status
207 _initialized = false;
208 _dieNow = false;
209 }
210
211 PEG_METHOD_EXIT();
212 }
213
|
214 marek 1.19 /** This method should be called after the physical consumer
215 * is loaded and before initialization.
|
216 h.sterling 1.1 */
217 void DynamicConsumer::set(ConsumerModule* consumerModule,
218 CIMIndicationConsumerProvider* consumerRef)
219 {
220 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
221
222 if (_initialized)
223 {
|
224 marek 1.19 throw Exception(
225 MessageLoaderParms(
226 "DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
227 "Error: The consumer is not in the correct state"
228 " to perform the operation."));
|
229 h.sterling 1.1 }
230
231 _module = consumerModule;
232 _consumer = consumerRef;
233
234 PEG_METHOD_EXIT();
235 }
236
|
237 marek 1.19 /** This method should be called after the consumer is terminated and the
238 * module is unloaded. Note that we cannot test for a loaded condition,
239 * since the _module reference here may still exist (if more than one
240 * consumer is using the module).
241 * Simply test whether the consumer is initialized.
242 * If it was terminated properly, initialized will be false and the _module
|
243 h.sterling 1.1 * ref count will be decremented.
244 */
245 void DynamicConsumer::reset()
246 {
247 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
248
249 if (_initialized)
250 {
|
251 marek 1.19 throw Exception(
252 MessageLoaderParms(
253 "DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
254 "Error: The consumer is not in the correct state to "
255 "perform the operation."));
|
256 h.sterling 1.1 }
257
|
258 marek 1.19 // do not delete it, that is taken care of in ConsumerModule itself
259 _module = 0;
260 // ATTN: attempting to delete this causes an exception -- why??
261 _consumer = 0;
|
262 h.sterling 1.1
|
263 mike 1.18 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
|
264 h.sterling 1.1 "Deleting %d outstanding requests for %s",
265 _eventqueue.size(),
|
266 marek 1.16 (const char*)_name.getCString()));
|
267 h.sterling 1.1
268 //delete outstanding requests
269 IndicationDispatchEvent* event = 0;
270 for (Uint32 i = 0; i < _eventqueue.size(); i++)
271 {
|
272 mike 1.13 event = _eventqueue.remove_front();
|
273 h.sterling 1.1 delete event;
274 }
275
276 PEG_METHOD_EXIT();
277 }
278
279 void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
280 {
281 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
282
283 if (!isLoaded())
284 {
|
285 marek 1.19 PEG_TRACE_CSTRING(
286 TRC_LISTENER,
|
287 marek 1.20 Tracer::LEVEL1,
|
288 marek 1.19 "Error: The consumer is not loaded and "
289 "therefore cannot handle events.");
|
290 h.sterling 1.1 return;
291 }
292
293 try
294 {
|
295 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
296 "enqueueEvent before %s",(const char*)_name.getCString()));
297
|
298 h.sterling 1.8 // Our event queue is first in first out.
|
299 mike 1.13 _eventqueue.insert_back(event);
|
300 h.sterling 1.1 _check_queue->signal();
301
|
302 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
303 "enqueueEvent after %s",(const char*)_name.getCString()));
|
304 h.sterling 1.1
305 } catch (Exception& ex)
306 {
307 //ATTN: Log missed indication
|
308 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
309 "Exception at enqueueingEvent: %s",
310 (const char*)ex.getMessage().getCString()));
|
311 h.sterling 1.1
312 } catch (...)
313 {
314 //ATTN: Log missed indication
|
315 thilo.boehm 1.22 PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
316 "Unknow exception at enqueueingEvent!");
|
317 h.sterling 1.1 }
318
319 PEG_METHOD_EXIT();
320 }
321
322 void DynamicConsumer::getIdleTimer(struct timeval *tv)
323 {
324 if (tv == 0)
325 {
326 return;
327 }
328
329 try
330 {
331 AutoMutex lock(_idleTimeMutex);
332 memcpy(tv, &_idleTime, sizeof(struct timeval));
333 } catch (...)
334 {
|
335 mike 1.14 Time::gettimeofday(tv);
|
336 h.sterling 1.1 }
337 }
338
339 void DynamicConsumer::updateIdleTimer()
340 {
341 try
342 {
343 AutoMutex lock(_idleTimeMutex);
|
344 mike 1.14 Time::gettimeofday(&_idleTime);
|
345 h.sterling 1.1
346 } catch (...)
347 {
348 }
349 }
350
351 Uint32 DynamicConsumer::getPendingIndications()
352 {
353 return _eventqueue.size();
354 }
355
356 String DynamicConsumer::toString()
357 {
358 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
359
|
360 kumpf 1.17 String buffer;
|
361 h.sterling 1.1 if (_initialized)
362 {
363 buffer.append("Consumer " + _name + " is initialized.\n");
364 buffer.append("Module name " + _module->getFileName() + "\n");
365 }
366
367 PEG_METHOD_EXIT();
368 return buffer;
369 }
370
|
371 marek 1.19 /** Returns true if the consumer has been inactive for
372 * longer than the idle period.
|
373 h.sterling 1.1 */
374 Boolean DynamicConsumer::isIdle()
375 {
376 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
377
378 if (!isLoaded())
379 {
|
380 marek 1.19 PEG_TRACE_CSTRING(
381 TRC_LISTENER,
|
382 marek 1.20 Tracer::LEVEL2,
|
383 marek 1.19 "Consumer is not loaded.");
|
384 h.sterling 1.1 return false;
385 }
386
387 struct timeval now;
|
388 mike 1.14 Time::gettimeofday(&now);
|
389 h.sterling 1.1
390 struct timeval timeout = {0,0};
391 getIdleTimer(&timeout);
392
|
393 marek 1.19 // if no consumer is currently being served and there's no consumer that
394 // has pending indications, we are idle
|
395 h.sterling 1.10 if (!_current_operations.get() && !getPendingIndications())
|
396 h.sterling 1.1 {
397 PEG_METHOD_EXIT();
398 return true;
399 }
400
401 PEG_METHOD_EXIT();
402 return false;
403 }
404
|
405 marek 1.19 /** This method waits until the event thread is ready to accept incoming
406 * indications. Otherwise, there is a miniscule chance that
407 * the first event will be enqueued before the consumer is waiting for it
408 * and the first indication after loading the consumer will be lost.
|
409 h.sterling 1.6 */
410 void DynamicConsumer::waitForEventThread()
411 {
412 _listeningSemaphore->wait();
413 }
414
|
415 h.sterling 1.1 /** This method is called when the consumer is initialized for the first time.
416 * It reads the outstanding requests from the dat file and enqueues them.
417 *
|
418 marek 1.19 * ATTN: This method will only get called when a consumer is initialized.
419 * Therefore, when the listener starts, the outstanding indications for this
420 * consumer will not get sent UNTIL a new indication comes in. This is not
421 * really an acceptable scenario. Maybe the consumer manager needs to check
422 * the .dat files upon startup and load if they are not empty.
|
423 h.sterling 1.1 */
|
424 marek 1.19 void DynamicConsumer::_loadOutstandingIndications(
425 Array<IndicationDispatchEvent> indications)
|
426 h.sterling 1.1 {
|
427 marek 1.19 PEG_METHOD_ENTER(
428 TRC_LISTENER,
429 "DynamicConsumer::_loadOutstandingIndications");
|
430 h.sterling 1.1
431 //create dispatch events from the instances
|
432 h.sterling 1.9 IndicationDispatchEvent* event = 0;
|
433 h.sterling 1.1 for (Uint32 i=0; i < indications.size(); i++)
434 {
|
435 marek 1.19
436 event = new IndicationDispatchEvent(
437 OperationContext(), //ATTN: Do we need to store this?
438 indications[i].getURL(),
439 indications[i].getIndicationInstance());
440
|
441 mike 1.13 _eventqueue.insert_back(event);
|
442 h.sterling 1.1 }
443
444 //signal the worker thread so it falls into the queue processing code
445 if (_eventqueue.size())
446 {
447 _check_queue->signal();
448 }
449
450 PEG_METHOD_EXIT();
451 }
452
|
453 marek 1.19 /** This method serializes the remaining indications in the queue.
454 * It should be called when the consumer is shutting down. Each time the
455 * consumer is loaded, these indications will be reloaded into the queue.
456 * Therefore, the file should be overwritten each time to eliminate
457 * duplicating outstanding indications.
|
458 h.sterling 1.1 *
459 * ATTN: Should we let another method delete the instances?
460 */
|
461 marek 1.19 Array<IndicationDispatchEvent>
462 DynamicConsumer::_retrieveOutstandingIndications()
|
463 h.sterling 1.1 {
|
464 marek 1.19 PEG_METHOD_ENTER(
465 TRC_LISTENER,
466 "DynamicConsumer::_retrieveOutstandingIndications");
|
467 h.sterling 1.1
|
468 h.sterling 1.9 Array<IndicationDispatchEvent> indications;
|
469 h.sterling 1.1 IndicationDispatchEvent* temp = 0;
470
471 try
472 {
|
473 kumpf 1.21 if (_eventqueue.try_lock())
|
474 h.sterling 1.1 {
|
475 kumpf 1.21 temp = _eventqueue.front();
476 while (temp)
477 {
478 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
479 indications.append(*temp);
480 temp = _eventqueue.next_of(temp);
481 }
482 _eventqueue.unlock();
|
483 h.sterling 1.1 }
|
484 kumpf 1.21 else
485 {
486 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
487 "Failed to lock _eventqueue");
488 }
489 }
490 catch (...)
|
491 h.sterling 1.1 {
|
492 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
|
493 h.sterling 1.1 }
494
495 PEG_METHOD_EXIT();
496 return indications;
497 }
498
499
500 ////////////////////////////////
501 // IndicationDispatchEvent
502 ////////////////////////////////
503
|
504 h.sterling 1.9 IndicationDispatchEvent::IndicationDispatchEvent()
505 {
506 }
507
|
508 h.sterling 1.1 IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
509 String url,
510 CIMInstance instance) :
511 _context(context),
512 _url(url),
513 _instance(instance),
|
514 h.sterling 1.4 _retries(0),
515 _lastAttemptTime(CIMDateTime())
|
516 h.sterling 1.1 {
517 }
518
|
519 mike 1.13 IndicationDispatchEvent::IndicationDispatchEvent(
520 const IndicationDispatchEvent &event) : Linkable(event)
|
521 h.sterling 1.10 {
522 _context = event._context;
523 _url = event._url;
524 _instance = event._instance;
525 _retries = event._retries.get();
526 _lastAttemptTime = event._lastAttemptTime;
527 }
528
|
529 h.sterling 1.1 IndicationDispatchEvent::~IndicationDispatchEvent()
530 {
531 }
532
533 OperationContext IndicationDispatchEvent::getContext() const
534 {
535 return _context;
536 }
537
538 String IndicationDispatchEvent::getURL() const
539 {
540 return _url;
541 }
542
543 CIMInstance IndicationDispatchEvent::getIndicationInstance() const
544 {
545 return _instance;
546 }
547
|
548 mike 1.7 Uint32 IndicationDispatchEvent::getRetries()
|
549 h.sterling 1.1 {
|
550 mike 1.7 return _retries.get();
|
551 h.sterling 1.1 }
552
553 void IndicationDispatchEvent::increaseRetries()
554 {
|
555 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
|
556 h.sterling 1.1 _retries++;
|
557 h.sterling 1.4 _lastAttemptTime = CIMDateTime::getCurrentDateTime();
|
558 thilo.boehm 1.22 PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Last attempt time %s",
559 (const char*)_lastAttemptTime.toString().getCString()));
|
560 h.sterling 1.4 }
561
562 CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
563 {
564 return _lastAttemptTime;
|
565 h.sterling 1.1 }
566
|
567 h.sterling 1.9
|
568 marek 1.19 IndicationDispatchEvent&
569 IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
|
570 h.sterling 1.9 {
|
571 marek 1.19 _context = event._context;
572 _url = event._url;
573 _instance = event._instance;
574 _retries = event._retries.get();
575 _lastAttemptTime = event._lastAttemptTime;
|
576 h.sterling 1.9
577 return *this;
578 }
579
|
580 marek 1.19 Boolean IndicationDispatchEvent::operator==
581 (const IndicationDispatchEvent& event) const
|
582 h.sterling 1.1 {
583 if (String::equal(this->_url, event._url) &&
584 (this->_instance.identical(event._instance)))
585 {
586 return true;
587 }
588 return false;
589 }
590
591 PEGASUS_NAMESPACE_END
592
|