(file) Return to DynamicConsumer.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

  1 karl  1.11 //%2006////////////////////////////////////////////////////////////////////////
  2 h.sterling 1.1  //
  3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6                 // IBM Corp.; EMC Corporation, The Open Group.
  7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl       1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12                 // EMC Corporation; Symantec Corporation; The Open Group.
 13 h.sterling 1.1  //
 14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
 15                 // of this software and associated documentation files (the "Software"), to
 16                 // deal in the Software without restriction, including without limitation the
 17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18                 // sell copies of the Software, and to permit persons to whom the Software is
 19                 // furnished to do so, subject to the following conditions:
 20                 // 
 21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29                 //
 30                 //==============================================================================
 31                 //
 32                 // Author: Heather Sterling (hsterl@us.ibm.com)
 33                 //
 34 h.sterling 1.1  // Modified By: 
 35                 //
 36                 //%/////////////////////////////////////////////////////////////////////////////
 37                 
 38                 #include "DynamicConsumer.h"
 39                 #include "DynamicConsumerFacade.h"
 40                 
 41                 #include <Pegasus/Common/Config.h>
 42 mike       1.14 #include <Pegasus/Common/Time.h>
 43 h.sterling 1.1  #include <Pegasus/Common/System.h>
 44                 #include <Pegasus/Common/Tracer.h>
 45                 #include <Pegasus/Common/XmlWriter.h>
 46                 #include <Pegasus/Common/XmlReader.h>
 47                 #include <Pegasus/Common/XmlParser.h>
 48                 #include <Pegasus/Common/FileSystem.h>
 49                 
 50                 PEGASUS_NAMESPACE_BEGIN
 51                 PEGASUS_USING_STD;
 52                 
 53                 
 54                 DynamicConsumer::DynamicConsumer(): Base(0)
 55                 {
 56                 }
 57                 
 58                 DynamicConsumer::DynamicConsumer(const String& name): 
 59                 Base(0),
 60                 _module(0),
 61 mike       1.13 _eventqueue(),
 62 h.sterling 1.1  _name(name),
 63                 _initialized(false),
 64                 _dieNow(0),
 65                 _no_unload(0)
 66                 {
 67                     _check_queue = new Semaphore(0);
 68                     _shutdownSemaphore = new Semaphore(0);
 69 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 70 h.sterling 1.1  }
 71                 
 72                 //ATTN: For migration from old listener -- do we want to support it?
 73                 DynamicConsumer::DynamicConsumer(const String & name,
 74                                                  ConsumerModule* consumerModule,
 75                                                  CIMIndicationConsumerProvider* consumerRef) :
 76                 Base(consumerRef),
 77                 _module(consumerModule),
 78 mike       1.13 _eventqueue(),
 79 h.sterling 1.1  _name(name),
 80                 _initialized(false),
 81                 _dieNow(0),
 82                 _no_unload(0)
 83                 {
 84                     _check_queue = new Semaphore(0);
 85                     _shutdownSemaphore = new Semaphore(0);
 86 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 87 h.sterling 1.1  }
 88                 
 89                 DynamicConsumer::~DynamicConsumer(void)
 90                 {
 91                     //delete any outstanding events
 92                     IndicationDispatchEvent* event;
 93                     while (_eventqueue.size())
 94                     {
 95 mike       1.13         event = _eventqueue.remove_front();
 96 h.sterling 1.1          delete event;
 97                     }
 98                 
 99                     //delete semaphores
100 kumpf      1.12     delete _check_queue;
101 h.sterling 1.1  
102 kumpf      1.12     delete _shutdownSemaphore;
103 h.sterling 1.6  
104 kumpf      1.12     delete _listeningSemaphore;
105 h.sterling 1.1  }
106                 
107                 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
108                 {
109                     return(_consumer);
110                 }
111                 
112                 ConsumerModule* DynamicConsumer::getModule(void) const
113                 {
114                     return(_module);
115                 }
116                 
117                 String DynamicConsumer::getName(void) const
118                 {
119                     return(_name);
120                 }
121                 
122                 Boolean DynamicConsumer::isLoaded(void) const
123                 {
124                     return(_module == 0 ? false : true);
125                 }
126 h.sterling 1.1  
127                 Boolean DynamicConsumer::isInitialized(void) const
128                 {
129                     return(_initialized);
130                 }
131                 
132                 /** Initializes the consumer.
133                  *  Caller assumes responsibility for catching exceptions thrown by this method.
134                  */ 
135                 void DynamicConsumer::initialize()
136                 {
137                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
138                 
139                     if (!_initialized)
140                     {
141                         // yield before a potentially lengthy operation.
142 mike       1.14         Threads::yield();
143 h.sterling 1.1  
144                         try
145                         {
146                             //there is no cimom handle in the listener, so pass null
147                             CIMOMHandle* handle = 0;
148                             DynamicConsumerFacade::initialize(*(handle));
149                 
150                             updateIdleTimer();
151                             _initialized = true;
152                 
153                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Successfully initialized consumer.");
154                 
155                         } catch (...)
156                         {
157                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
158                                              "Exception caught in DynamicConsumerFacade::initialize for " +
159                                              _name);
160                             throw;
161                         }
162                     }
163                 
164 h.sterling 1.1      PEG_METHOD_EXIT();
165                 }
166                 
167                 void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
168                 {
169                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
170                 
171                     _shutdownSemaphore = shutdownSemaphore;
172                 
173                     PEG_METHOD_EXIT();
174                 }
175                 
176                 Semaphore* DynamicConsumer::getShutdownSemaphore()
177                 {
178                     return _shutdownSemaphore;
179                 }
180                 
181                 void DynamicConsumer::sendShutdownSignal()
182                 {
183                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
184                 
185 h.sterling 1.1      _dieNow = true;
186                     _check_queue->signal();
187                 
188                     PEG_METHOD_EXIT();
189                 }
190                 
191                 void DynamicConsumer::terminate(void)
192                 {
193                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
194                 
195                     if (_initialized)
196                     {
197                         // yield before a potentially lengthy operation.
198 mike       1.14         Threads::yield();
199 h.sterling 1.1  
200                         //terminate consumer
201                         try
202                         {
203                             DynamicConsumerFacade::terminate();
204                 
205                         } catch (...)
206                         {
207                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4,
208                                              "Exception caught in DynamicConsumerFacade::Terminate for " +
209                                              _name);
210                             throw;
211                         }
212                 
213                         //update status
214                         _initialized = false;
215                         _dieNow = false;
216                     }
217                 
218                     PEG_METHOD_EXIT();
219                 }
220 h.sterling 1.1  
221                 /** This method should be called after the physical consumer is loaded and before initialization.  
222                  */ 
223                 void DynamicConsumer::set(ConsumerModule* consumerModule,
224                                           CIMIndicationConsumerProvider* consumerRef)
225                 {
226                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
227                 
228                     if (_initialized)
229                     {
230 h.sterling 1.4          throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
231                                                            "Error: The consumer is not in the correct state to perform the operation."));
232 h.sterling 1.1      }
233                 
234                     _module = consumerModule;
235                     _consumer = consumerRef;
236                 
237                     PEG_METHOD_EXIT();
238                 }
239                 
240                 /** This method should be called after the consumer is terminated and the module is unloaded.  Note that we cannot test
241                  * for a loaded condition, since the _module reference here may still exist (if more than one consumer is using the module).
242                  * Simply test whether the consumer is initialized.  If it was terminated properly, initialized will be false and the _module
243                  * ref count will be decremented.
244                  */
245                 void DynamicConsumer::reset()
246                 {
247                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
248                 
249                     if (_initialized)
250                     {
251 h.sterling 1.4          throw Exception(MessageLoaderParms("DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
252                                                            "Error: The consumer is not in the correct state to perform the operation."));
253 h.sterling 1.1      }
254                 
255                     _module = 0;    // do not delete it, that is taken care of in ConsumerModule itself 
256                     _consumer = 0;  // ATTN: attempting to delete this causes an exception -- why??
257                 
258                     Tracer::trace(__FILE__,__LINE__,TRC_LISTENER,Tracer::LEVEL4,
259                                   "Deleting %d outstanding requests for %s",
260                                   _eventqueue.size(),
261                                   (const char*)_name.getCString());
262                 
263                     //delete outstanding requests
264                     IndicationDispatchEvent* event = 0;
265                     for (Uint32 i = 0; i < _eventqueue.size(); i++)
266                     {
267 mike       1.13         event = _eventqueue.remove_front();
268 h.sterling 1.1          delete event;
269                     }
270                 
271                     PEG_METHOD_EXIT();
272                 }
273                 
274                 void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
275                 {
276                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
277                 
278                     if (!isLoaded())
279                     {
280                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL3, "Error: The consumer is not loaded and therefore cannot handle events.");
281                         return;
282                     }
283                 
284                     try
285                     {
286                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent before " + _name);
287 h.sterling 1.8          // Our event queue is first in first out.
288 mike       1.13         _eventqueue.insert_back(event);
289 h.sterling 1.1          _check_queue->signal();
290                 
291                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "enqueueEvent after " + _name);
292                 
293                     } catch (Exception& ex)
294                     {
295                         //ATTN: Log missed indication
296                         PEGASUS_STD(cout) << "Error enqueueing event" << ex.getMessage() << "\n";
297                 
298                     } catch (...)
299                     {
300                         //ATTN: Log missed indication
301                         PEGASUS_STD(cout) << "Unknown exception";
302                     }
303                 
304                     PEG_METHOD_EXIT();
305                 }
306                 
307                 void DynamicConsumer::getIdleTimer(struct timeval *tv)
308                 {
309                     if (tv == 0)
310 h.sterling 1.1      {
311                         return;
312                     }
313                 
314                     try
315                     {
316                         AutoMutex lock(_idleTimeMutex);
317                         memcpy(tv, &_idleTime, sizeof(struct timeval));
318                     } catch (...)
319                     {
320 mike       1.14         Time::gettimeofday(tv);
321 h.sterling 1.1      }
322                 }
323                 
324                 void DynamicConsumer::updateIdleTimer()
325                 {
326                     try
327                     {
328                         AutoMutex lock(_idleTimeMutex);
329 mike       1.14         Time::gettimeofday(&_idleTime);
330 h.sterling 1.1  
331                     } catch (...)
332                     {
333                     }
334                 }
335                 
336                 Uint32 DynamicConsumer::getPendingIndications()
337                 {
338                     return _eventqueue.size();
339                 }
340                 
341                 String DynamicConsumer::toString()
342                 {
343                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
344                 
345                     String buffer = String::EMPTY;
346                     if (_initialized)
347                     {
348                         buffer.append("Consumer " + _name + " is initialized.\n");
349                         buffer.append("Module name " + _module->getFileName() + "\n");
350                     }
351 h.sterling 1.1  
352                     PEG_METHOD_EXIT();
353                     return buffer;
354                 }
355                 
356                 /** Returns true if the consumer has been inactive for longer than the idle period.
357                  */ 
358                 Boolean DynamicConsumer::isIdle() 
359                 {
360                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
361                 
362                     if (!isLoaded())
363                     {
364                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Consumer is not loaded.");
365                         return false;
366                     }
367                 
368                     struct timeval now;
369 mike       1.14     Time::gettimeofday(&now);
370 h.sterling 1.1  
371                     struct timeval timeout = {0,0};
372                     getIdleTimer(&timeout);
373                 
374 h.sterling 1.9  	//if no consumer is currently being served and there's no consumer that has pending indications, we are idle
375 h.sterling 1.10     if (!_current_operations.get() && !getPendingIndications())
376 h.sterling 1.1      {
377                         PEG_METHOD_EXIT();
378                         return true;
379                     }
380                 
381                     PEG_METHOD_EXIT();
382                     return false;
383                 }
384                 
385 h.sterling 1.6  /** This method waits until the event thread is ready to accept incoming indications.  Otherwise, there is a miniscule chance that 
386                  * the first event will be enqueued before the consumer is waiting for it and the first indication after loading the consumer will be lost.
387                  */ 
388                 void DynamicConsumer::waitForEventThread()
389                 {
390                     _listeningSemaphore->wait();
391                 }
392                 
393 h.sterling 1.1  /** This method is called when the consumer is initialized for the first time.
394                  *  It reads the outstanding requests from the dat file and enqueues them.
395                  * 
396                  * ATTN: This method will only get called when a consumer is initialized.  Therefore,
397                  * when the listener starts, the outstanding indications for this consumer will not get sent
398                  * UNTIL a new indication comes in.  This is not really an acceptable scenario.  Maybe the consumer
399                  * manager needs to check the .dat files upon startup and load if they are not empty.
400                  * 
401                  */ 
402 h.sterling 1.9  void DynamicConsumer::_loadOutstandingIndications(Array<IndicationDispatchEvent> indications)
403 h.sterling 1.1  {
404                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_loadOutstandingIndications");
405                 
406                     //create dispatch events from the instances
407 h.sterling 1.9      IndicationDispatchEvent* event = 0;
408 h.sterling 1.1      for (Uint32 i=0; i < indications.size(); i++)
409                     {
410 h.sterling 1.9  		
411 h.sterling 1.1          event = new IndicationDispatchEvent(OperationContext(),  //ATTN: Do we need to store this?
412 h.sterling 1.9                                              indications[i].getURL(),
413                                                             indications[i].getIndicationInstance());
414                 		
415 mike       1.13         _eventqueue.insert_back(event);
416 h.sterling 1.1      }
417                 
418                     //signal the worker thread so it falls into the queue processing code
419                     if (_eventqueue.size())
420                     {
421                         _check_queue->signal();
422                     }
423                 
424                     PEG_METHOD_EXIT();
425                 }
426                 
427                 /** This method serializes the remaining indications in the queue. It should be called when the
428                  * consumer is shutting down.  Each time the consumer is loaded, these indications will be
429                  * reloaded into the queue.  Therefore, the file should be overwritten each time to eliminate
430                  * duplicating outstanding indications.
431                  * 
432                  * ATTN: Should we let another method delete the instances?
433                  */ 
434 h.sterling 1.9  Array<IndicationDispatchEvent> DynamicConsumer::_retrieveOutstandingIndications()
435 h.sterling 1.1  {
436                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::_retrieveOutstandingIndications");
437                 
438 h.sterling 1.9      Array<IndicationDispatchEvent> indications;
439 h.sterling 1.1      IndicationDispatchEvent* temp = 0;
440                 
441                     try
442                     {
443                         _eventqueue.try_lock();
444 mike       1.13         temp = _eventqueue.front();
445 h.sterling 1.1          while (temp)
446                         {
447                             PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
448 h.sterling 1.9  			indications.append(*temp);
449 mike       1.13             temp = _eventqueue.next_of(temp);
450 h.sterling 1.1          }
451                         _eventqueue.unlock();
452                 
453                     } catch (...)
454                     {
455                         PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
456                     }
457                 
458                     PEG_METHOD_EXIT();
459                     return indications;
460                 }
461                 
462                 
463                 ////////////////////////////////
464                 // IndicationDispatchEvent
465                 ////////////////////////////////
466                 
467 h.sterling 1.9  IndicationDispatchEvent::IndicationDispatchEvent()
468                 {
469                 }
470                 
471 h.sterling 1.1  IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
472                                                                  String url,
473                                                                  CIMInstance instance) :
474                 _context(context),
475                 _url(url),
476                 _instance(instance),
477 h.sterling 1.4  _retries(0),
478                 _lastAttemptTime(CIMDateTime())
479 h.sterling 1.1  {
480                 }
481                 
482 mike       1.13 IndicationDispatchEvent::IndicationDispatchEvent(
483                     const IndicationDispatchEvent &event) : Linkable(event)
484 h.sterling 1.10 {
485                     _context = event._context;
486                     _url = event._url;
487                     _instance = event._instance;
488                     _retries = event._retries.get();
489                     _lastAttemptTime = event._lastAttemptTime;
490                 }
491                 
492 h.sterling 1.1  IndicationDispatchEvent::~IndicationDispatchEvent()
493                 {
494                 }
495                 
496                 OperationContext IndicationDispatchEvent::getContext() const
497                 {
498                     return _context;
499                 }
500                 
501                 String IndicationDispatchEvent::getURL() const
502                 {
503                     return _url;
504                 }
505                 
506                 CIMInstance IndicationDispatchEvent::getIndicationInstance() const
507                 {
508                     return _instance;
509                 }
510                 
511 mike       1.7  Uint32 IndicationDispatchEvent::getRetries()
512 h.sterling 1.1  {
513 mike       1.7      return _retries.get();
514 h.sterling 1.1  }
515                 
516                 void IndicationDispatchEvent::increaseRetries()
517                 {
518 h.sterling 1.4      PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
519 h.sterling 1.1      _retries++;
520 h.sterling 1.4      _lastAttemptTime = CIMDateTime::getCurrentDateTime();
521                     PEG_TRACE_STRING(TRC_LISTENER, Tracer::LEVEL4, "Last attempt time " + _lastAttemptTime.toString());
522                 }
523                 
524                 CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
525                 {
526                     return _lastAttemptTime;
527 h.sterling 1.1  }
528                 
529 h.sterling 1.9  
530                 IndicationDispatchEvent& IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
531                 {
532                 	_context = event._context;
533                 	_url = event._url;
534                 	_instance = event._instance;
535 h.sterling 1.10 	_retries = event._retries.get();
536 h.sterling 1.9  	_lastAttemptTime = event._lastAttemptTime;
537                 
538                     return *this;
539                 }
540                 
541 h.sterling 1.1  Boolean IndicationDispatchEvent::operator==(const IndicationDispatchEvent& event) const
542                 {
543                     if (String::equal(this->_url, event._url) && 
544                         (this->_instance.identical(event._instance)))
545                     {
546                         return true;
547                     }
548                     return false;
549                 }
550                 
551                 PEGASUS_NAMESPACE_END
552                 

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2