(file) Return to DynamicConsumer.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

  1 karl  1.11 //%2006////////////////////////////////////////////////////////////////////////
  2 h.sterling 1.1  //
  3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6                 // IBM Corp.; EMC Corporation, The Open Group.
  7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl       1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12                 // EMC Corporation; Symantec Corporation; The Open Group.
 13 h.sterling 1.1  //
 14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
 15                 // of this software and associated documentation files (the "Software"), to
 16                 // deal in the Software without restriction, including without limitation the
 17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18                 // sell copies of the Software, and to permit persons to whom the Software is
 19                 // furnished to do so, subject to the following conditions:
 20                 // 
 21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29                 //
 30                 //==============================================================================
 31                 //
 32                 //%/////////////////////////////////////////////////////////////////////////////
 33                 
 34 h.sterling 1.1  #include "DynamicConsumer.h"
 35                 #include "DynamicConsumerFacade.h"
 36                 
 37                 #include <Pegasus/Common/Config.h>
 38 mike       1.14 #include <Pegasus/Common/Time.h>
 39 h.sterling 1.1  #include <Pegasus/Common/System.h>
 40                 #include <Pegasus/Common/Tracer.h>
 41                 #include <Pegasus/Common/XmlWriter.h>
 42                 #include <Pegasus/Common/XmlReader.h>
 43                 #include <Pegasus/Common/XmlParser.h>
 44                 #include <Pegasus/Common/FileSystem.h>
 45                 
 46                 PEGASUS_NAMESPACE_BEGIN
 47                 PEGASUS_USING_STD;
 48                 
 49                 
 50                 DynamicConsumer::DynamicConsumer(): Base(0)
 51                 {
 52                 }
 53                 
 54                 DynamicConsumer::DynamicConsumer(const String& name): 
 55                 Base(0),
 56                 _module(0),
 57 mike       1.13 _eventqueue(),
 58 h.sterling 1.1  _name(name),
 59                 _initialized(false),
 60                 _dieNow(0),
 61                 _no_unload(0)
 62                 {
 63                     _check_queue = new Semaphore(0);
 64                     _shutdownSemaphore = new Semaphore(0);
 65 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 66 h.sterling 1.1  }
 67                 
 68                 //ATTN: For migration from old listener -- do we want to support it?
 69                 DynamicConsumer::DynamicConsumer(const String & name,
 70                                                  ConsumerModule* consumerModule,
 71                                                  CIMIndicationConsumerProvider* consumerRef) :
 72                 Base(consumerRef),
 73                 _module(consumerModule),
 74 mike       1.13 _eventqueue(),
 75 h.sterling 1.1  _name(name),
 76                 _initialized(false),
 77                 _dieNow(0),
 78                 _no_unload(0)
 79                 {
 80                     _check_queue = new Semaphore(0);
 81                     _shutdownSemaphore = new Semaphore(0);
 82 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 83 h.sterling 1.1  }
 84                 
 85                 DynamicConsumer::~DynamicConsumer(void)
 86                 {
 87                     //delete any outstanding events
 88                     IndicationDispatchEvent* event;
 89                     while (_eventqueue.size())
 90                     {
 91 mike       1.13         event = _eventqueue.remove_front();
 92 h.sterling 1.1          delete event;
 93                     }
 94                 
 95                     //delete semaphores
 96 kumpf      1.12     delete _check_queue;
 97 h.sterling 1.1  
 98 kumpf      1.12     delete _shutdownSemaphore;
 99 h.sterling 1.6  
100 kumpf      1.12     delete _listeningSemaphore;
101 h.sterling 1.1  }
102                 
103                 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
104                 {
105                     return(_consumer);
106                 }
107                 
108                 ConsumerModule* DynamicConsumer::getModule(void) const
109                 {
110                     return(_module);
111                 }
112                 
113                 String DynamicConsumer::getName(void) const
114                 {
115                     return(_name);
116                 }
117                 
118                 Boolean DynamicConsumer::isLoaded(void) const
119                 {
120                     return(_module == 0 ? false : true);
121                 }
122 h.sterling 1.1  
123                 Boolean DynamicConsumer::isInitialized(void) const
124                 {
125                     return(_initialized);
126                 }
127                 
128                 /** Initializes the consumer.
129                  *  Caller assumes responsibility for catching exceptions thrown by this method.
130                  */ 
131                 void DynamicConsumer::initialize()
132                 {
133                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
134                 
135                     if (!_initialized)
136                     {
137                         try
138                         {
139                             //there is no cimom handle in the listener, so pass null
140                             CIMOMHandle* handle = 0;
141                             DynamicConsumerFacade::initialize(*(handle));
142                 
143 h.sterling 1.1              updateIdleTimer();
144                             _initialized = true;
145                 
146 marek      1.16             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Successfully initialized consumer.");
147 h.sterling 1.1  
148                         } catch (...)
149                         {
150                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
151                                              "Exception caught in DynamicConsumerFacade::initialize for " +
152                                              _name);
153                             throw;
154                         }
155                     }
156                 
157                     PEG_METHOD_EXIT();
158                 }
159                 
160                 void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
161                 {
162                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
163                 
164                     _shutdownSemaphore = shutdownSemaphore;
165                 
166                     PEG_METHOD_EXIT();
167                 }
168 h.sterling 1.1  
169                 Semaphore* DynamicConsumer::getShutdownSemaphore()
170                 {
171                     return _shutdownSemaphore;
172                 }
173                 
174                 void DynamicConsumer::sendShutdownSignal()
175                 {
176                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
177                 
178                     _dieNow = true;
179                     _check_queue->signal();
180                 
181                     PEG_METHOD_EXIT();
182                 }
183                 
184                 void DynamicConsumer::terminate(void)
185                 {
186                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
187                 
188                     if (_initialized)
189 h.sterling 1.1      {
190                         //terminate consumer
191                         try
192                         {
193                             DynamicConsumerFacade::terminate();
194                 
195                         } catch (...)
196                         {
197                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
198                                              "Exception caught in DynamicConsumerFacade::Terminate for " +
199                                              _name);
200                             throw;
201                         }
202                 
203                         //update status
204                         _initialized = false;
205                         _dieNow = false;
206                     }
207                 
208                     PEG_METHOD_EXIT();
209                 }
210 h.sterling 1.1  
211                 /** This method should be called after the physical consumer is loaded and before initialization.  
212                  */ 
213                 void DynamicConsumer::set(ConsumerModule* consumerModule,
214                                           CIMIndicationConsumerProvider* consumerRef)
215                 {
216                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
217                 
218                     if (_initialized)
219                     {
220 h.sterling 1.4          throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
221                                                            "Error: The consumer is not in the correct state to perform the operation."));
222 h.sterling 1.1      }
223                 
224                     _module = consumerModule;
225                     _consumer = consumerRef;
226                 
227                     PEG_METHOD_EXIT();
228                 }
229                 
230                 /** This method should be called after the consumer is terminated and the module is unloaded.  Note that we cannot test
231                  * for a loaded condition, since the _module reference here may still exist (if more than one consumer is using the module).
232                  * Simply test whether the consumer is initialized.  If it was terminated properly, initialized will be false and the _module
233                  * ref count will be decremented.
234                  */
235                 void DynamicConsumer::reset()
236                 {
237                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
238                 
239                     if (_initialized)
240                     {
241 h.sterling 1.4          throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
242                                                            "Error: The consumer is not in the correct state to perform the operation."));
243 h.sterling 1.1      }
244                 
245                     _module = 0;    // do not delete it, that is taken care of in ConsumerModule itself 
246                     _consumer = 0;  // ATTN: attempting to delete this causes an exception -- why??
247                 
248 marek      1.16     PEG_TRACE((__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
249 h.sterling 1.1                    "Deleting %d outstanding requests for %s",
250                                   _eventqueue.size(),
251 marek      1.16                   (const char*)_name.getCString()));
252 h.sterling 1.1  
253                     //delete outstanding requests
254                     IndicationDispatchEvent* event = 0;
255                     for (Uint32 i = 0; i < _eventqueue.size(); i++)
256                     {
257 mike       1.13         event = _eventqueue.remove_front();
258 h.sterling 1.1          delete event;
259                     }
260                 
261                     PEG_METHOD_EXIT();
262                 }
263                 
264                 void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
265                 {
266                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
267                 
268                     if (!isLoaded())
269                     {
270 marek      1.16         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3, "Error: The consumer is not loaded and therefore cannot handle events.");
271 h.sterling 1.1          return;
272                     }
273                 
274                     try
275                     {
276                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent before " + _name);
277 h.sterling 1.8          // Our event queue is first in first out.
278 mike       1.13         _eventqueue.insert_back(event);
279 h.sterling 1.1          _check_queue->signal();
280                 
281                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent after " + _name);
282                 
283                     } catch (Exception& ex)
284                     {
285                         //ATTN: Log missed indication
286                         PEGASUS_STD(cout) << "Error enqueueing event" << ex.getMessage() << "\n";
287                 
288                     } catch (...)
289                     {
290                         //ATTN: Log missed indication
291                         PEGASUS_STD(cout) << "Unknown exception";
292                     }
293                 
294                     PEG_METHOD_EXIT();
295                 }
296                 
297                 void DynamicConsumer::getIdleTimer(struct timeval *tv)
298                 {
299                     if (tv == 0)
300 h.sterling 1.1      {
301                         return;
302                     }
303                 
304                     try
305                     {
306                         AutoMutex lock(_idleTimeMutex);
307                         memcpy(tv, &_idleTime, sizeof(struct timeval));
308                     } catch (...)
309                     {
310 mike       1.14         Time::gettimeofday(tv);
311 h.sterling 1.1      }
312                 }
313                 
314                 void DynamicConsumer::updateIdleTimer()
315                 {
316                     try
317                     {
318                         AutoMutex lock(_idleTimeMutex);
319 mike       1.14         Time::gettimeofday(&_idleTime);
320 h.sterling 1.1  
321                     } catch (...)
322                     {
323                     }
324                 }
325                 
326                 Uint32 DynamicConsumer::getPendingIndications()
327                 {
328                     return _eventqueue.size();
329                 }
330                 
331                 String DynamicConsumer::toString()
332                 {
333                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
334                 
335 kumpf      1.17     String buffer;
336 h.sterling 1.1      if (_initialized)
337                     {
338                         buffer.append("Consumer " + _name + " is initialized.\n");
339                         buffer.append("Module name " + _module->getFileName() + "\n");
340                     }
341                 
342                     PEG_METHOD_EXIT();
343                     return buffer;
344                 }
345                 
346                 /** Returns true if the consumer has been inactive for longer than the idle period.
347                  */ 
348                 Boolean DynamicConsumer::isIdle() 
349                 {
350                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
351                 
352                     if (!isLoaded())
353                     {
354 marek      1.16         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer is not loaded.");
355 h.sterling 1.1          return false;
356                     }
357                 
358                     struct timeval now;
359 mike       1.14     Time::gettimeofday(&now);
360 h.sterling 1.1  
361                     struct timeval timeout = {0,0};
362                     getIdleTimer(&timeout);
363                 
364 h.sterling 1.9  	//if no consumer is currently being served and there's no consumer that has pending indications, we are idle
365 h.sterling 1.10     if (!_current_operations.get() && !getPendingIndications())
366 h.sterling 1.1      {
367                         PEG_METHOD_EXIT();
368                         return true;
369                     }
370                 
371                     PEG_METHOD_EXIT();
372                     return false;
373                 }
374                 
375 h.sterling 1.6  /** This method waits until the event thread is ready to accept incoming indications.  Otherwise, there is a miniscule chance that 
376                  * the first event will be enqueued before the consumer is waiting for it and the first indication after loading the consumer will be lost.
377                  */ 
378                 void DynamicConsumer::waitForEventThread()
379                 {
380                     _listeningSemaphore->wait();
381                 }
382                 
383 h.sterling 1.1  /** This method is called when the consumer is initialized for the first time.
384                  *  It reads the outstanding requests from the dat file and enqueues them.
385                  * 
386                  * ATTN: This method will only get called when a consumer is initialized.  Therefore,
387                  * when the listener starts, the outstanding indications for this consumer will not get sent
388                  * UNTIL a new indication comes in.  This is not really an acceptable scenario.  Maybe the consumer
389                  * manager needs to check the .dat files upon startup and load if they are not empty.
390                  * 
391                  */ 
392 h.sterling 1.9  void DynamicConsumer::_loadOutstandingIndications(Array<IndicationDispatchEvent> indications)
393 h.sterling 1.1  {
394                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_loadOutstandingIndications");
395                 
396                     //create dispatch events from the instances
397 h.sterling 1.9      IndicationDispatchEvent* event = 0;
398 h.sterling 1.1      for (Uint32 i=0; i < indications.size(); i++)
399                     {
400 h.sterling 1.9  		
401 h.sterling 1.1          event = new IndicationDispatchEvent(OperationContext(),  //ATTN: Do we need to store this?
402 h.sterling 1.9                                              indications[i].getURL(),
403                                                             indications[i].getIndicationInstance());
404                 		
405 mike       1.13         _eventqueue.insert_back(event);
406 h.sterling 1.1      }
407                 
408                     //signal the worker thread so it falls into the queue processing code
409                     if (_eventqueue.size())
410                     {
411                         _check_queue->signal();
412                     }
413                 
414                     PEG_METHOD_EXIT();
415                 }
416                 
417                 /** This method serializes the remaining indications in the queue. It should be called when the
418                  * consumer is shutting down.  Each time the consumer is loaded, these indications will be
419                  * reloaded into the queue.  Therefore, the file should be overwritten each time to eliminate
420                  * duplicating outstanding indications.
421                  * 
422                  * ATTN: Should we let another method delete the instances?
423                  */ 
424 h.sterling 1.9  Array<IndicationDispatchEvent> DynamicConsumer::_retrieveOutstandingIndications()
425 h.sterling 1.1  {
426                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_retrieveOutstandingIndications");
427                 
428 h.sterling 1.9      Array<IndicationDispatchEvent> indications;
429 h.sterling 1.1      IndicationDispatchEvent* temp = 0;
430                 
431                     try
432                     {
433                         _eventqueue.try_lock();
434 mike       1.13         temp = _eventqueue.front();
435 h.sterling 1.1          while (temp)
436                         {
437 marek      1.16             PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
438 h.sterling 1.9  			indications.append(*temp);
439 mike       1.13             temp = _eventqueue.next_of(temp);
440 h.sterling 1.1          }
441                         _eventqueue.unlock();
442                 
443                     } catch (...)
444                     {
445 marek      1.16         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
446 h.sterling 1.1      }
447                 
448                     PEG_METHOD_EXIT();
449                     return indications;
450                 }
451                 
452                 
453                 ////////////////////////////////
454                 // IndicationDispatchEvent
455                 ////////////////////////////////
456                 
457 h.sterling 1.9  IndicationDispatchEvent::IndicationDispatchEvent()
458                 {
459                 }
460                 
461 h.sterling 1.1  IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
462                                                                  String url,
463                                                                  CIMInstance instance) :
464                 _context(context),
465                 _url(url),
466                 _instance(instance),
467 h.sterling 1.4  _retries(0),
468                 _lastAttemptTime(CIMDateTime())
469 h.sterling 1.1  {
470                 }
471                 
472 mike       1.13 IndicationDispatchEvent::IndicationDispatchEvent(
473                     const IndicationDispatchEvent &event) : Linkable(event)
474 h.sterling 1.10 {
475                     _context = event._context;
476                     _url = event._url;
477                     _instance = event._instance;
478                     _retries = event._retries.get();
479                     _lastAttemptTime = event._lastAttemptTime;
480                 }
481                 
482 h.sterling 1.1  IndicationDispatchEvent::~IndicationDispatchEvent()
483                 {
484                 }
485                 
486                 OperationContext IndicationDispatchEvent::getContext() const
487                 {
488                     return _context;
489                 }
490                 
491                 String IndicationDispatchEvent::getURL() const
492                 {
493                     return _url;
494                 }
495                 
496                 CIMInstance IndicationDispatchEvent::getIndicationInstance() const
497                 {
498                     return _instance;
499                 }
500                 
501 mike       1.7  Uint32 IndicationDispatchEvent::getRetries()
502 h.sterling 1.1  {
503 mike       1.7      return _retries.get();
504 h.sterling 1.1  }
505                 
506                 void IndicationDispatchEvent::increaseRetries()
507                 {
508 marek      1.16     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
509 h.sterling 1.1      _retries++;
510 h.sterling 1.4      _lastAttemptTime = CIMDateTime::getCurrentDateTime();
511                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time " + _lastAttemptTime.toString());
512                 }
513                 
514                 CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
515                 {
516                     return _lastAttemptTime;
517 h.sterling 1.1  }
518                 
519 h.sterling 1.9  
520                 IndicationDispatchEvent& IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
521                 {
522                 	_context = event._context;
523                 	_url = event._url;
524                 	_instance = event._instance;
525 h.sterling 1.10 	_retries = event._retries.get();
526 h.sterling 1.9  	_lastAttemptTime = event._lastAttemptTime;
527                 
528                     return *this;
529                 }
530                 
531 h.sterling 1.1  Boolean IndicationDispatchEvent::operator==(const IndicationDispatchEvent& event) const
532                 {
533                     if (String::equal(this->_url, event._url) && 
534                         (this->_instance.identical(event._instance)))
535                     {
536                         return true;
537                     }
538                     return false;
539                 }
540                 
541                 PEGASUS_NAMESPACE_END
542                 

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2