1 karl 1.11 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Heather Sterling (hsterl@us.ibm.com)
33 //
34 h.sterling 1.1 // Modified By:
35 //
36 //%/////////////////////////////////////////////////////////////////////////////
37
38 #include "DynamicConsumer.h"
39 #include "DynamicConsumerFacade.h"
40
41 #include <Pegasus/Common/Config.h>
|
42 mike 1.14 #include <Pegasus/Common/Time.h>
|
43 h.sterling 1.1 #include <Pegasus/Common/System.h>
44 #include <Pegasus/Common/Tracer.h>
45 #include <Pegasus/Common/XmlWriter.h>
46 #include <Pegasus/Common/XmlReader.h>
47 #include <Pegasus/Common/XmlParser.h>
48 #include <Pegasus/Common/FileSystem.h>
49
50 PEGASUS_NAMESPACE_BEGIN
51 PEGASUS_USING_STD;
52
53
54 DynamicConsumer::DynamicConsumer(): Base(0)
55 {
56 }
57
58 DynamicConsumer::DynamicConsumer(const String& name):
59 Base(0),
60 _module(0),
|
61 mike 1.13 _eventqueue(),
|
62 h.sterling 1.1 _name(name),
63 _initialized(false),
64 _dieNow(0),
65 _no_unload(0)
66 {
67 _check_queue = new Semaphore(0);
68 _shutdownSemaphore = new Semaphore(0);
|
69 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
70 h.sterling 1.1 }
71
72 //ATTN: For migration from old listener -- do we want to support it?
73 DynamicConsumer::DynamicConsumer(const String & name,
74 ConsumerModule* consumerModule,
75 CIMIndicationConsumerProvider* consumerRef) :
76 Base(consumerRef),
77 _module(consumerModule),
|
78 mike 1.13 _eventqueue(),
|
79 h.sterling 1.1 _name(name),
80 _initialized(false),
81 _dieNow(0),
82 _no_unload(0)
83 {
84 _check_queue = new Semaphore(0);
85 _shutdownSemaphore = new Semaphore(0);
|
86 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
87 h.sterling 1.1 }
88
89 DynamicConsumer::~DynamicConsumer(void)
90 {
91 //delete any outstanding events
92 IndicationDispatchEvent* event;
93 while (_eventqueue.size())
94 {
|
95 mike 1.13 event = _eventqueue.remove_front();
|
96 h.sterling 1.1 delete event;
97 }
98
99 //delete semaphores
|
100 kumpf 1.12 delete _check_queue;
|
101 h.sterling 1.1
|
102 kumpf 1.12 delete _shutdownSemaphore;
|
103 h.sterling 1.6
|
104 kumpf 1.12 delete _listeningSemaphore;
|
105 h.sterling 1.1 }
106
107 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
108 {
109 return(_consumer);
110 }
111
112 ConsumerModule* DynamicConsumer::getModule(void) const
113 {
114 return(_module);
115 }
116
117 String DynamicConsumer::getName(void) const
118 {
119 return(_name);
120 }
121
122 Boolean DynamicConsumer::isLoaded(void) const
123 {
124 return(_module == 0 ? false : true);
125 }
126 h.sterling 1.1
127 Boolean DynamicConsumer::isInitialized(void) const
128 {
129 return(_initialized);
130 }
131
132 /** Initializes the consumer.
133 * Caller assumes responsibility for catching exceptions thrown by this method.
134 */
135 void DynamicConsumer::initialize()
136 {
137 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
138
139 if (!_initialized)
140 {
141 // yield before a potentially lengthy operation.
|
142 mike 1.14 Threads::yield();
|
143 h.sterling 1.1
144 try
145 {
146 //there is no cimom handle in the listener, so pass null
147 CIMOMHandle* handle = 0;
148 DynamicConsumerFacade::initialize(*(handle));
149
150 updateIdleTimer();
151 _initialized = true;
152
153 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Successfully initialized consumer.");
154
155 } catch (...)
156 {
157 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
158 "Exception caught in DynamicConsumerFacade::initialize for " +
159 _name);
160 throw;
161 }
162 }
163
164 h.sterling 1.1 PEG_METHOD_EXIT();
165 }
166
167 void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
168 {
169 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
170
171 _shutdownSemaphore = shutdownSemaphore;
172
173 PEG_METHOD_EXIT();
174 }
175
176 Semaphore* DynamicConsumer::getShutdownSemaphore()
177 {
178 return _shutdownSemaphore;
179 }
180
181 void DynamicConsumer::sendShutdownSignal()
182 {
183 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
184
185 h.sterling 1.1 _dieNow = true;
186 _check_queue->signal();
187
188 PEG_METHOD_EXIT();
189 }
190
191 void DynamicConsumer::terminate(void)
192 {
193 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
194
195 if (_initialized)
196 {
197 // yield before a potentially lengthy operation.
|
198 mike 1.14 Threads::yield();
|
199 h.sterling 1.1
200 //terminate consumer
201 try
202 {
203 DynamicConsumerFacade::terminate();
204
205 } catch (...)
206 {
207 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
208 "Exception caught in DynamicConsumerFacade::Terminate for " +
209 _name);
210 throw;
211 }
212
213 //update status
214 _initialized = false;
215 _dieNow = false;
216 }
217
218 PEG_METHOD_EXIT();
219 }
220 h.sterling 1.1
221 /** This method should be called after the physical consumer is loaded and before initialization.
222 */
223 void DynamicConsumer::set(ConsumerModule* consumerModule,
224 CIMIndicationConsumerProvider* consumerRef)
225 {
226 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
227
228 if (_initialized)
229 {
|
230 h.sterling 1.4 throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
231 "Error: The consumer is not in the correct state to perform the operation."));
|
232 h.sterling 1.1 }
233
234 _module = consumerModule;
235 _consumer = consumerRef;
236
237 PEG_METHOD_EXIT();
238 }
239
240 /** This method should be called after the consumer is terminated and the module is unloaded. Note that we cannot test
241 * for a loaded condition, since the _module reference here may still exist (if more than one consumer is using the module).
242 * Simply test whether the consumer is initialized. If it was terminated properly, initialized will be false and the _module
243 * ref count will be decremented.
244 */
245 void DynamicConsumer::reset()
246 {
247 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
248
249 if (_initialized)
250 {
|
251 h.sterling 1.4 throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
252 "Error: The consumer is not in the correct state to perform the operation."));
|
253 h.sterling 1.1 }
254
255 _module = 0; // do not delete it, that is taken care of in ConsumerModule itself
256 _consumer = 0; // ATTN: attempting to delete this causes an exception -- why??
257
258 Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
259 "Deleting %d outstanding requests for %s",
260 _eventqueue.size(),
261 (const char*)_name.getCString());
262
263 //delete outstanding requests
264 IndicationDispatchEvent* event = 0;
265 for (Uint32 i = 0; i < _eventqueue.size(); i++)
266 {
|
267 mike 1.13 event = _eventqueue.remove_front();
|
268 h.sterling 1.1 delete event;
269 }
270
271 PEG_METHOD_EXIT();
272 }
273
274 void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
275 {
276 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
277
278 if (!isLoaded())
279 {
280 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: The consumer is not loaded and therefore cannot handle events.");
281 return;
282 }
283
284 try
285 {
286 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent before " + _name);
|
287 h.sterling 1.8 // Our event queue is first in first out.
|
288 mike 1.13 _eventqueue.insert_back(event);
|
289 h.sterling 1.1 _check_queue->signal();
290
291 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent after " + _name);
292
293 } catch (Exception& ex)
294 {
295 //ATTN: Log missed indication
296 PEGASUS_STD(cout) << "Error enqueueing event" << ex.getMessage() << "\n";
297
298 } catch (...)
299 {
300 //ATTN: Log missed indication
301 PEGASUS_STD(cout) << "Unknown exception";
302 }
303
304 PEG_METHOD_EXIT();
305 }
306
307 void DynamicConsumer::getIdleTimer(struct timeval *tv)
308 {
309 if (tv == 0)
310 h.sterling 1.1 {
311 return;
312 }
313
314 try
315 {
316 AutoMutex lock(_idleTimeMutex);
317 memcpy(tv, &_idleTime, sizeof(struct timeval));
318 } catch (...)
319 {
|
320 mike 1.14 Time::gettimeofday(tv);
|
321 h.sterling 1.1 }
322 }
323
324 void DynamicConsumer::updateIdleTimer()
325 {
326 try
327 {
328 AutoMutex lock(_idleTimeMutex);
|
329 mike 1.14 Time::gettimeofday(&_idleTime);
|
330 h.sterling 1.1
331 } catch (...)
332 {
333 }
334 }
335
336 Uint32 DynamicConsumer::getPendingIndications()
337 {
338 return _eventqueue.size();
339 }
340
341 String DynamicConsumer::toString()
342 {
343 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
344
345 String buffer = String::EMPTY;
346 if (_initialized)
347 {
348 buffer.append("Consumer " + _name + " is initialized.\n");
349 buffer.append("Module name " + _module->getFileName() + "\n");
350 }
351 h.sterling 1.1
352 PEG_METHOD_EXIT();
353 return buffer;
354 }
355
356 /** Returns true if the consumer has been inactive for longer than the idle period.
357 */
358 Boolean DynamicConsumer::isIdle()
359 {
360 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
361
362 if (!isLoaded())
363 {
364 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer is not loaded.");
365 return false;
366 }
367
368 struct timeval now;
|
369 mike 1.14 Time::gettimeofday(&now);
|
370 h.sterling 1.1
371 struct timeval timeout = {0,0};
372 getIdleTimer(&timeout);
373
|
374 h.sterling 1.9 //if no consumer is currently being served and there's no consumer that has pending indications, we are idle
|
375 h.sterling 1.10 if (!_current_operations.get() && !getPendingIndications())
|
376 h.sterling 1.1 {
377 PEG_METHOD_EXIT();
378 return true;
379 }
380
381 PEG_METHOD_EXIT();
382 return false;
383 }
384
|
385 h.sterling 1.6 /** This method waits until the event thread is ready to accept incoming indications. Otherwise, there is a miniscule chance that
386 * the first event will be enqueued before the consumer is waiting for it and the first indication after loading the consumer will be lost.
387 */
388 void DynamicConsumer::waitForEventThread()
389 {
390 _listeningSemaphore->wait();
391 }
392
|
393 h.sterling 1.1 /** This method is called when the consumer is initialized for the first time.
394 * It reads the outstanding requests from the dat file and enqueues them.
395 *
396 * ATTN: This method will only get called when a consumer is initialized. Therefore,
397 * when the listener starts, the outstanding indications for this consumer will not get sent
398 * UNTIL a new indication comes in. This is not really an acceptable scenario. Maybe the consumer
399 * manager needs to check the .dat files upon startup and load if they are not empty.
400 *
401 */
|
402 h.sterling 1.9 void DynamicConsumer::_loadOutstandingIndications(Array<IndicationDispatchEvent> indications)
|
403 h.sterling 1.1 {
404 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_loadOutstandingIndications");
405
406 //create dispatch events from the instances
|
407 h.sterling 1.9 IndicationDispatchEvent* event = 0;
|
408 h.sterling 1.1 for (Uint32 i=0; i < indications.size(); i++)
409 {
|
410 h.sterling 1.9
|
411 h.sterling 1.1 event = new IndicationDispatchEvent(OperationContext(), //ATTN: Do we need to store this?
|
412 h.sterling 1.9 indications[i].getURL(),
413 indications[i].getIndicationInstance());
414
|
415 mike 1.13 _eventqueue.insert_back(event);
|
416 h.sterling 1.1 }
417
418 //signal the worker thread so it falls into the queue processing code
419 if (_eventqueue.size())
420 {
421 _check_queue->signal();
422 }
423
424 PEG_METHOD_EXIT();
425 }
426
427 /** This method serializes the remaining indications in the queue. It should be called when the
428 * consumer is shutting down. Each time the consumer is loaded, these indications will be
429 * reloaded into the queue. Therefore, the file should be overwritten each time to eliminate
430 * duplicating outstanding indications.
431 *
432 * ATTN: Should we let another method delete the instances?
433 */
|
434 h.sterling 1.9 Array<IndicationDispatchEvent> DynamicConsumer::_retrieveOutstandingIndications()
|
435 h.sterling 1.1 {
436 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_retrieveOutstandingIndications");
437
|
438 h.sterling 1.9 Array<IndicationDispatchEvent> indications;
|
439 h.sterling 1.1 IndicationDispatchEvent* temp = 0;
440
441 try
442 {
443 _eventqueue.try_lock();
|
444 mike 1.13 temp = _eventqueue.front();
|
445 h.sterling 1.1 while (temp)
446 {
447 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
|
448 h.sterling 1.9 indications.append(*temp);
|
449 mike 1.13 temp = _eventqueue.next_of(temp);
|
450 h.sterling 1.1 }
451 _eventqueue.unlock();
452
453 } catch (...)
454 {
455 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
456 }
457
458 PEG_METHOD_EXIT();
459 return indications;
460 }
461
462
463 ////////////////////////////////
464 // IndicationDispatchEvent
465 ////////////////////////////////
466
|
467 h.sterling 1.9 IndicationDispatchEvent::IndicationDispatchEvent()
468 {
469 }
470
|
471 h.sterling 1.1 IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
472 String url,
473 CIMInstance instance) :
474 _context(context),
475 _url(url),
476 _instance(instance),
|
477 h.sterling 1.4 _retries(0),
478 _lastAttemptTime(CIMDateTime())
|
479 h.sterling 1.1 {
480 }
481
|
482 mike 1.13 IndicationDispatchEvent::IndicationDispatchEvent(
483 const IndicationDispatchEvent &event) : Linkable(event)
|
484 h.sterling 1.10 {
485 _context = event._context;
486 _url = event._url;
487 _instance = event._instance;
488 _retries = event._retries.get();
489 _lastAttemptTime = event._lastAttemptTime;
490 }
491
|
492 h.sterling 1.1 IndicationDispatchEvent::~IndicationDispatchEvent()
493 {
494 }
495
496 OperationContext IndicationDispatchEvent::getContext() const
497 {
498 return _context;
499 }
500
501 String IndicationDispatchEvent::getURL() const
502 {
503 return _url;
504 }
505
506 CIMInstance IndicationDispatchEvent::getIndicationInstance() const
507 {
508 return _instance;
509 }
510
|
511 mike 1.7 Uint32 IndicationDispatchEvent::getRetries()
|
512 h.sterling 1.1 {
|
513 mike 1.7 return _retries.get();
|
514 h.sterling 1.1 }
515
516 void IndicationDispatchEvent::increaseRetries()
517 {
|
518 h.sterling 1.4 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
|
519 h.sterling 1.1 _retries++;
|
520 h.sterling 1.4 _lastAttemptTime = CIMDateTime::getCurrentDateTime();
521 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time " + _lastAttemptTime.toString());
522 }
523
524 CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
525 {
526 return _lastAttemptTime;
|
527 h.sterling 1.1 }
528
|
529 h.sterling 1.9
530 IndicationDispatchEvent& IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
531 {
532 _context = event._context;
533 _url = event._url;
534 _instance = event._instance;
|
535 h.sterling 1.10 _retries = event._retries.get();
|
536 h.sterling 1.9 _lastAttemptTime = event._lastAttemptTime;
537
538 return *this;
539 }
540
|
541 h.sterling 1.1 Boolean IndicationDispatchEvent::operator==(const IndicationDispatchEvent& event) const
542 {
543 if (String::equal(this->_url, event._url) &&
544 (this->_instance.identical(event._instance)))
545 {
546 return true;
547 }
548 return false;
549 }
550
551 PEGASUS_NAMESPACE_END
552
|