1 karl 1.11 //%2006////////////////////////////////////////////////////////////////////////
|
2 h.sterling 1.1 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 h.sterling 1.1 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 h.sterling 1.1 #include "DynamicConsumer.h"
35 #include "DynamicConsumerFacade.h"
36
37 #include <Pegasus/Common/Config.h>
|
38 mike 1.14 #include <Pegasus/Common/Time.h>
|
39 h.sterling 1.1 #include <Pegasus/Common/System.h>
40 #include <Pegasus/Common/Tracer.h>
41 #include <Pegasus/Common/XmlWriter.h>
42 #include <Pegasus/Common/XmlReader.h>
43 #include <Pegasus/Common/XmlParser.h>
44 #include <Pegasus/Common/FileSystem.h>
45
46 PEGASUS_NAMESPACE_BEGIN
47 PEGASUS_USING_STD;
48
49
50 DynamicConsumer::DynamicConsumer(): Base(0)
51 {
52 }
53
54 DynamicConsumer::DynamicConsumer(const String& name):
55 Base(0),
56 _module(0),
|
57 mike 1.13 _eventqueue(),
|
58 h.sterling 1.1 _name(name),
59 _initialized(false),
60 _dieNow(0),
61 _no_unload(0)
62 {
63 _check_queue = new Semaphore(0);
64 _shutdownSemaphore = new Semaphore(0);
|
65 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
66 h.sterling 1.1 }
67
68 //ATTN: For migration from old listener -- do we want to support it?
69 DynamicConsumer::DynamicConsumer(const String & name,
70 ConsumerModule* consumerModule,
71 CIMIndicationConsumerProvider* consumerRef) :
72 Base(consumerRef),
73 _module(consumerModule),
|
74 mike 1.13 _eventqueue(),
|
75 h.sterling 1.1 _name(name),
76 _initialized(false),
77 _dieNow(0),
78 _no_unload(0)
79 {
80 _check_queue = new Semaphore(0);
81 _shutdownSemaphore = new Semaphore(0);
|
82 h.sterling 1.6 _listeningSemaphore = new Semaphore(0);
|
83 h.sterling 1.1 }
84
85 DynamicConsumer::~DynamicConsumer(void)
86 {
87 //delete any outstanding events
88 IndicationDispatchEvent* event;
89 while (_eventqueue.size())
90 {
|
91 mike 1.13 event = _eventqueue.remove_front();
|
92 h.sterling 1.1 delete event;
93 }
94
95 //delete semaphores
|
96 kumpf 1.12 delete _check_queue;
|
97 h.sterling 1.1
|
98 kumpf 1.12 delete _shutdownSemaphore;
|
99 h.sterling 1.6
|
100 kumpf 1.12 delete _listeningSemaphore;
|
101 h.sterling 1.1 }
102
103 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
104 {
105 return(_consumer);
106 }
107
108 ConsumerModule* DynamicConsumer::getModule(void) const
109 {
110 return(_module);
111 }
112
113 String DynamicConsumer::getName(void) const
114 {
115 return(_name);
116 }
117
118 Boolean DynamicConsumer::isLoaded(void) const
119 {
120 return(_module == 0 ? false : true);
121 }
122 h.sterling 1.1
123 Boolean DynamicConsumer::isInitialized(void) const
124 {
125 return(_initialized);
126 }
127
128 /** Initializes the consumer.
129 * Caller assumes responsibility for catching exceptions thrown by this method.
130 */
131 void DynamicConsumer::initialize()
132 {
133 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
134
135 if (!_initialized)
136 {
137 try
138 {
139 //there is no cimom handle in the listener, so pass null
140 CIMOMHandle* handle = 0;
141 DynamicConsumerFacade::initialize(*(handle));
142
143 h.sterling 1.1 updateIdleTimer();
144 _initialized = true;
145
|
146 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Successfully initialized consumer.");
|
147 h.sterling 1.1
148 } catch (...)
149 {
150 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
151 "Exception caught in DynamicConsumerFacade::initialize for " +
152 _name);
153 throw;
154 }
155 }
156
157 PEG_METHOD_EXIT();
158 }
159
160 void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
161 {
162 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
163
164 _shutdownSemaphore = shutdownSemaphore;
165
166 PEG_METHOD_EXIT();
167 }
168 h.sterling 1.1
169 Semaphore* DynamicConsumer::getShutdownSemaphore()
170 {
171 return _shutdownSemaphore;
172 }
173
174 void DynamicConsumer::sendShutdownSignal()
175 {
176 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
177
178 _dieNow = true;
179 _check_queue->signal();
180
181 PEG_METHOD_EXIT();
182 }
183
184 void DynamicConsumer::terminate(void)
185 {
186 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
187
188 if (_initialized)
189 h.sterling 1.1 {
190 //terminate consumer
191 try
192 {
193 DynamicConsumerFacade::terminate();
194
195 } catch (...)
196 {
197 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
198 "Exception caught in DynamicConsumerFacade::Terminate for " +
199 _name);
200 throw;
201 }
202
203 //update status
204 _initialized = false;
205 _dieNow = false;
206 }
207
208 PEG_METHOD_EXIT();
209 }
210 h.sterling 1.1
211 /** This method should be called after the physical consumer is loaded and before initialization.
212 */
213 void DynamicConsumer::set(ConsumerModule* consumerModule,
214 CIMIndicationConsumerProvider* consumerRef)
215 {
216 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
217
218 if (_initialized)
219 {
|
220 h.sterling 1.4 throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
221 "Error: The consumer is not in the correct state to perform the operation."));
|
222 h.sterling 1.1 }
223
224 _module = consumerModule;
225 _consumer = consumerRef;
226
227 PEG_METHOD_EXIT();
228 }
229
230 /** This method should be called after the consumer is terminated and the module is unloaded. Note that we cannot test
231 * for a loaded condition, since the _module reference here may still exist (if more than one consumer is using the module).
232 * Simply test whether the consumer is initialized. If it was terminated properly, initialized will be false and the _module
233 * ref count will be decremented.
234 */
235 void DynamicConsumer::reset()
236 {
237 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
238
239 if (_initialized)
240 {
|
241 h.sterling 1.4 throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
242 "Error: The consumer is not in the correct state to perform the operation."));
|
243 h.sterling 1.1 }
244
245 _module = 0; // do not delete it, that is taken care of in ConsumerModule itself
246 _consumer = 0; // ATTN: attempting to delete this causes an exception -- why??
247
|
248 marek 1.16 PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
|
249 h.sterling 1.1 "Deleting %d outstanding requests for %s",
250 _eventqueue.size(),
|
251 marek 1.16 (const char*)_name.getCString()));
|
252 h.sterling 1.1
253 //delete outstanding requests
254 IndicationDispatchEvent* event = 0;
255 for (Uint32 i = 0; i < _eventqueue.size(); i++)
256 {
|
257 mike 1.13 event = _eventqueue.remove_front();
|
258 h.sterling 1.1 delete event;
259 }
260
261 PEG_METHOD_EXIT();
262 }
263
264 void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
265 {
266 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
267
268 if (!isLoaded())
269 {
|
270 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Error: The consumer is not loaded and therefore cannot handle events.");
|
271 h.sterling 1.1 return;
272 }
273
274 try
275 {
276 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent before " + _name);
|
277 h.sterling 1.8 // Our event queue is first in first out.
|
278 mike 1.13 _eventqueue.insert_back(event);
|
279 h.sterling 1.1 _check_queue->signal();
280
281 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent after " + _name);
282
283 } catch (Exception& ex)
284 {
285 //ATTN: Log missed indication
286 PEGASUS_STD(cout) << "Error enqueueing event" << ex.getMessage() << "\n";
287
288 } catch (...)
289 {
290 //ATTN: Log missed indication
291 PEGASUS_STD(cout) << "Unknown exception";
292 }
293
294 PEG_METHOD_EXIT();
295 }
296
297 void DynamicConsumer::getIdleTimer(struct timeval *tv)
298 {
299 if (tv == 0)
300 h.sterling 1.1 {
301 return;
302 }
303
304 try
305 {
306 AutoMutex lock(_idleTimeMutex);
307 memcpy(tv, &_idleTime, sizeof(struct timeval));
308 } catch (...)
309 {
|
310 mike 1.14 Time::gettimeofday(tv);
|
311 h.sterling 1.1 }
312 }
313
314 void DynamicConsumer::updateIdleTimer()
315 {
316 try
317 {
318 AutoMutex lock(_idleTimeMutex);
|
319 mike 1.14 Time::gettimeofday(&_idleTime);
|
320 h.sterling 1.1
321 } catch (...)
322 {
323 }
324 }
325
326 Uint32 DynamicConsumer::getPendingIndications()
327 {
328 return _eventqueue.size();
329 }
330
331 String DynamicConsumer::toString()
332 {
333 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
334
|
335 kumpf 1.17 String buffer;
|
336 h.sterling 1.1 if (_initialized)
337 {
338 buffer.append("Consumer " + _name + " is initialized.\n");
339 buffer.append("Module name " + _module->getFileName() + "\n");
340 }
341
342 PEG_METHOD_EXIT();
343 return buffer;
344 }
345
346 /** Returns true if the consumer has been inactive for longer than the idle period.
347 */
348 Boolean DynamicConsumer::isIdle()
349 {
350 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
351
352 if (!isLoaded())
353 {
|
354 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer is not loaded.");
|
355 h.sterling 1.1 return false;
356 }
357
358 struct timeval now;
|
359 mike 1.14 Time::gettimeofday(&now);
|
360 h.sterling 1.1
361 struct timeval timeout = {0,0};
362 getIdleTimer(&timeout);
363
|
364 h.sterling 1.9 //if no consumer is currently being served and there's no consumer that has pending indications, we are idle
|
365 h.sterling 1.10 if (!_current_operations.get() && !getPendingIndications())
|
366 h.sterling 1.1 {
367 PEG_METHOD_EXIT();
368 return true;
369 }
370
371 PEG_METHOD_EXIT();
372 return false;
373 }
374
|
375 h.sterling 1.6 /** This method waits until the event thread is ready to accept incoming indications. Otherwise, there is a miniscule chance that
376 * the first event will be enqueued before the consumer is waiting for it and the first indication after loading the consumer will be lost.
377 */
378 void DynamicConsumer::waitForEventThread()
379 {
380 _listeningSemaphore->wait();
381 }
382
|
383 h.sterling 1.1 /** This method is called when the consumer is initialized for the first time.
384 * It reads the outstanding requests from the dat file and enqueues them.
385 *
386 * ATTN: This method will only get called when a consumer is initialized. Therefore,
387 * when the listener starts, the outstanding indications for this consumer will not get sent
388 * UNTIL a new indication comes in. This is not really an acceptable scenario. Maybe the consumer
389 * manager needs to check the .dat files upon startup and load if they are not empty.
390 *
391 */
|
392 h.sterling 1.9 void DynamicConsumer::_loadOutstandingIndications(Array<IndicationDispatchEvent> indications)
|
393 h.sterling 1.1 {
394 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_loadOutstandingIndications");
395
396 //create dispatch events from the instances
|
397 h.sterling 1.9 IndicationDispatchEvent* event = 0;
|
398 h.sterling 1.1 for (Uint32 i=0; i < indications.size(); i++)
399 {
|
400 h.sterling 1.9
|
401 h.sterling 1.1 event = new IndicationDispatchEvent(OperationContext(), //ATTN: Do we need to store this?
|
402 h.sterling 1.9 indications[i].getURL(),
403 indications[i].getIndicationInstance());
404
|
405 mike 1.13 _eventqueue.insert_back(event);
|
406 h.sterling 1.1 }
407
408 //signal the worker thread so it falls into the queue processing code
409 if (_eventqueue.size())
410 {
411 _check_queue->signal();
412 }
413
414 PEG_METHOD_EXIT();
415 }
416
417 /** This method serializes the remaining indications in the queue. It should be called when the
418 * consumer is shutting down. Each time the consumer is loaded, these indications will be
419 * reloaded into the queue. Therefore, the file should be overwritten each time to eliminate
420 * duplicating outstanding indications.
421 *
422 * ATTN: Should we let another method delete the instances?
423 */
|
424 h.sterling 1.9 Array<IndicationDispatchEvent> DynamicConsumer::_retrieveOutstandingIndications()
|
425 h.sterling 1.1 {
426 PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_retrieveOutstandingIndications");
427
|
428 h.sterling 1.9 Array<IndicationDispatchEvent> indications;
|
429 h.sterling 1.1 IndicationDispatchEvent* temp = 0;
430
431 try
432 {
433 _eventqueue.try_lock();
|
434 mike 1.13 temp = _eventqueue.front();
|
435 h.sterling 1.1 while (temp)
436 {
|
437 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
|
438 h.sterling 1.9 indications.append(*temp);
|
439 mike 1.13 temp = _eventqueue.next_of(temp);
|
440 h.sterling 1.1 }
441 _eventqueue.unlock();
442
443 } catch (...)
444 {
|
445 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
|
446 h.sterling 1.1 }
447
448 PEG_METHOD_EXIT();
449 return indications;
450 }
451
452
453 ////////////////////////////////
454 // IndicationDispatchEvent
455 ////////////////////////////////
456
|
457 h.sterling 1.9 IndicationDispatchEvent::IndicationDispatchEvent()
458 {
459 }
460
|
461 h.sterling 1.1 IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
462 String url,
463 CIMInstance instance) :
464 _context(context),
465 _url(url),
466 _instance(instance),
|
467 h.sterling 1.4 _retries(0),
468 _lastAttemptTime(CIMDateTime())
|
469 h.sterling 1.1 {
470 }
471
|
472 mike 1.13 IndicationDispatchEvent::IndicationDispatchEvent(
473 const IndicationDispatchEvent &event) : Linkable(event)
|
474 h.sterling 1.10 {
475 _context = event._context;
476 _url = event._url;
477 _instance = event._instance;
478 _retries = event._retries.get();
479 _lastAttemptTime = event._lastAttemptTime;
480 }
481
|
482 h.sterling 1.1 IndicationDispatchEvent::~IndicationDispatchEvent()
483 {
484 }
485
486 OperationContext IndicationDispatchEvent::getContext() const
487 {
488 return _context;
489 }
490
491 String IndicationDispatchEvent::getURL() const
492 {
493 return _url;
494 }
495
496 CIMInstance IndicationDispatchEvent::getIndicationInstance() const
497 {
498 return _instance;
499 }
500
|
501 mike 1.7 Uint32 IndicationDispatchEvent::getRetries()
|
502 h.sterling 1.1 {
|
503 mike 1.7 return _retries.get();
|
504 h.sterling 1.1 }
505
506 void IndicationDispatchEvent::increaseRetries()
507 {
|
508 marek 1.16 PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
|
509 h.sterling 1.1 _retries++;
|
510 h.sterling 1.4 _lastAttemptTime = CIMDateTime::getCurrentDateTime();
511 PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time " + _lastAttemptTime.toString());
512 }
513
514 CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
515 {
516 return _lastAttemptTime;
|
517 h.sterling 1.1 }
518
|
519 h.sterling 1.9
520 IndicationDispatchEvent& IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
521 {
522 _context = event._context;
523 _url = event._url;
524 _instance = event._instance;
|
525 h.sterling 1.10 _retries = event._retries.get();
|
526 h.sterling 1.9 _lastAttemptTime = event._lastAttemptTime;
527
528 return *this;
529 }
530
|
531 h.sterling 1.1 Boolean IndicationDispatchEvent::operator==(const IndicationDispatchEvent& event) const
532 {
533 if (String::equal(this->_url, event._url) &&
534 (this->_instance.identical(event._instance)))
535 {
536 return true;
537 }
538 return false;
539 }
540
541 PEGASUS_NAMESPACE_END
542
|