(file) Return to DynamicConsumer.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / DynListener

  1 karl  1.11 //%2006////////////////////////////////////////////////////////////////////////
  2 h.sterling 1.1  //
  3                 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4                 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5                 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6                 // IBM Corp.; EMC Corporation, The Open Group.
  7                 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8                 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9                 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10                 // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl       1.11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12                 // EMC Corporation; Symantec Corporation; The Open Group.
 13 h.sterling 1.1  //
 14                 // Permission is hereby granted, free of charge, to any person obtaining a copy
 15                 // of this software and associated documentation files (the "Software"), to
 16                 // deal in the Software without restriction, including without limitation the
 17                 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18                 // sell copies of the Software, and to permit persons to whom the Software is
 19                 // furnished to do so, subject to the following conditions:
 20                 // 
 21                 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22                 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23                 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24                 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25                 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26                 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27                 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28                 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29                 //
 30                 //==============================================================================
 31                 //
 32                 //%/////////////////////////////////////////////////////////////////////////////
 33                 
 34 h.sterling 1.1  #include "DynamicConsumer.h"
 35                 #include "DynamicConsumerFacade.h"
 36                 
 37                 #include <Pegasus/Common/Config.h>
 38 mike       1.14 #include <Pegasus/Common/Time.h>
 39 h.sterling 1.1  #include <Pegasus/Common/System.h>
 40                 #include <Pegasus/Common/Tracer.h>
 41                 #include <Pegasus/Common/XmlWriter.h>
 42                 #include <Pegasus/Common/XmlReader.h>
 43                 #include <Pegasus/Common/XmlParser.h>
 44                 #include <Pegasus/Common/FileSystem.h>
 45                 
 46                 PEGASUS_NAMESPACE_BEGIN
 47                 PEGASUS_USING_STD;
 48                 
 49                 
 50                 DynamicConsumer::DynamicConsumer(): Base(0)
 51                 {
 52                 }
 53                 
 54                 DynamicConsumer::DynamicConsumer(const String& name): 
 55                 Base(0),
 56                 _module(0),
 57 mike       1.13 _eventqueue(),
 58 h.sterling 1.1  _name(name),
 59                 _initialized(false),
 60                 _dieNow(0),
 61                 _no_unload(0)
 62                 {
 63                     _check_queue = new Semaphore(0);
 64                     _shutdownSemaphore = new Semaphore(0);
 65 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 66 h.sterling 1.1  }
 67                 
 68                 //ATTN: For migration from old listener -- do we want to support it?
 69                 DynamicConsumer::DynamicConsumer(const String & name,
 70                                                  ConsumerModule* consumerModule,
 71                                                  CIMIndicationConsumerProvider* consumerRef) :
 72                 Base(consumerRef),
 73                 _module(consumerModule),
 74 mike       1.13 _eventqueue(),
 75 h.sterling 1.1  _name(name),
 76                 _initialized(false),
 77                 _dieNow(0),
 78                 _no_unload(0)
 79                 {
 80                     _check_queue = new Semaphore(0);
 81                     _shutdownSemaphore = new Semaphore(0);
 82 h.sterling 1.6      _listeningSemaphore = new Semaphore(0);
 83 h.sterling 1.1  }
 84                 
 85                 DynamicConsumer::~DynamicConsumer(void)
 86                 {
 87                     //delete any outstanding events
 88                     IndicationDispatchEvent* event;
 89                     while (_eventqueue.size())
 90                     {
 91 mike       1.13         event = _eventqueue.remove_front();
 92 h.sterling 1.1          delete event;
 93                     }
 94                 
 95                     //delete semaphores
 96 kumpf      1.12     delete _check_queue;
 97 h.sterling 1.1  
 98 kumpf      1.12     delete _shutdownSemaphore;
 99 h.sterling 1.6  
100 kumpf      1.12     delete _listeningSemaphore;
101 h.sterling 1.1  }
102                 
103                 CIMIndicationConsumerProvider* DynamicConsumer::getConsumer()
104                 {
105                     return(_consumer);
106                 }
107                 
108                 ConsumerModule* DynamicConsumer::getModule(void) const
109                 {
110                     return(_module);
111                 }
112                 
113                 String DynamicConsumer::getName(void) const
114                 {
115                     return(_name);
116                 }
117                 
118                 Boolean DynamicConsumer::isLoaded(void) const
119                 {
120                     return(_module == 0 ? false : true);
121                 }
122 h.sterling 1.1  
123                 Boolean DynamicConsumer::isInitialized(void) const
124                 {
125                     return(_initialized);
126                 }
127                 
128                 /** Initializes the consumer.
129                  *  Caller assumes responsibility for catching exceptions thrown by this method.
130                  */ 
131                 void DynamicConsumer::initialize()
132                 {
133                     PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::initialize");
134                 
135                     if (!_initialized)
136                     {
137                         try
138                         {
139                             //there is no cimom handle in the listener, so pass null
140                             CIMOMHandle* handle = 0;
141                             DynamicConsumerFacade::initialize(*(handle));
142                 
143 h.sterling 1.1              updateIdleTimer();
144                             _initialized = true;
145                 
146 marek      1.19             PEG_TRACE_CSTRING(
147                                 TRC_LISTENER,
148                                 Tracer::LEVEL3,
149                                 "Successfully initialized consumer.");
150 h.sterling 1.1  
151                         } catch (...)
152                         {
153 thilo.boehm 1.22             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
154                                  "Exception caught in DynamicConsumerFacade::initialize for %s",
155                                  (const char*)_name.getCString()));
156 h.sterling  1.1              throw;
157                          }
158                      }
159                  
160                      PEG_METHOD_EXIT();
161                  }
162                  
163                  void DynamicConsumer::setShutdownSemaphore(Semaphore* shutdownSemaphore)
164                  {
165                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::setShutdownSemaphore");
166                  
167                      _shutdownSemaphore = shutdownSemaphore;
168                  
169                      PEG_METHOD_EXIT();
170                  }
171                  
172                  Semaphore* DynamicConsumer::getShutdownSemaphore()
173                  {
174                      return _shutdownSemaphore;
175                  }
176                  
177 h.sterling  1.1  void DynamicConsumer::sendShutdownSignal()
178                  {
179                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::sendShutdownSignal");
180                  
181                      _dieNow = true;
182                      _check_queue->signal();
183                  
184                      PEG_METHOD_EXIT();
185                  }
186                  
187                  void DynamicConsumer::terminate(void)
188                  {
189                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::terminate");
190                  
191                      if (_initialized)
192                      {
193                          //terminate consumer
194                          try
195                          {
196                              DynamicConsumerFacade::terminate();
197                  
198 h.sterling  1.1          } catch (...)
199                          {
200 thilo.boehm 1.22             PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
201                                  "Exception caught in DynamicConsumerFacade::Terminate for %s",
202                                  (const char*)_name.getCString()));
203 h.sterling  1.1              throw;
204                          }
205                  
206                          //update status
207                          _initialized = false;
208                          _dieNow = false;
209                      }
210                  
211                      PEG_METHOD_EXIT();
212                  }
213                  
214 marek       1.19 /** This method should be called after the physical consumer 
215                   *  is loaded and before initialization.  
216 h.sterling  1.1   */ 
217                  void DynamicConsumer::set(ConsumerModule* consumerModule,
218                                            CIMIndicationConsumerProvider* consumerRef)
219                  {
220                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::set");
221                  
222                      if (_initialized)
223                      {
224 marek       1.19         throw Exception(
225                                    MessageLoaderParms(
226                                        "DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
227                                        "Error: The consumer is not in the correct state"
228                                            " to perform the operation."));
229 h.sterling  1.1      }
230                  
231                      _module = consumerModule;
232                      _consumer = consumerRef;
233                  
234                      PEG_METHOD_EXIT();
235                  }
236                  
237 marek       1.19 /** This method should be called after the consumer is terminated and the 
238                   * module is unloaded.  Note that we cannot test for a loaded condition, 
239                   * since the _module reference here may still exist (if more than one 
240                   * consumer is using the module).
241                   * Simply test whether the consumer is initialized.  
242                   * If it was terminated properly, initialized will be false and the _module
243 h.sterling  1.1   * ref count will be decremented.
244                   */
245                  void DynamicConsumer::reset()
246                  {
247                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::reset");
248                  
249                      if (_initialized)
250                      {
251 marek       1.19         throw Exception(
252                                   MessageLoaderParms(
253                                       "DynListener.DynamicConsumer.CONSUMER_INVALID_STATE",
254                                       "Error: The consumer is not in the correct state to "
255                                           "perform the operation."));
256 h.sterling  1.1      }
257                  
258 marek       1.19     // do not delete it, that is taken care of in ConsumerModule itself 
259                      _module = 0;
260                      // ATTN: attempting to delete this causes an exception -- why??
261                      _consumer = 0;
262 h.sterling  1.1  
263 mike        1.18     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
264 h.sterling  1.1                    "Deleting %d outstanding requests for %s",
265                                    _eventqueue.size(),
266 marek       1.16                   (const char*)_name.getCString()));
267 h.sterling  1.1  
268                      //delete outstanding requests
269                      IndicationDispatchEvent* event = 0;
270                      for (Uint32 i = 0; i < _eventqueue.size(); i++)
271                      {
272 mike        1.13         event = _eventqueue.remove_front();
273 h.sterling  1.1          delete event;
274                      }
275                  
276                      PEG_METHOD_EXIT();
277                  }
278                  
279                  void DynamicConsumer::enqueueEvent(IndicationDispatchEvent* event)
280                  {
281                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::enqueueEvent");
282                  
283                      if (!isLoaded())
284                      {
285 marek       1.19         PEG_TRACE_CSTRING(
286                              TRC_LISTENER,
287 marek       1.20             Tracer::LEVEL1,
288 marek       1.19             "Error: The consumer is not loaded and "
289                                  "therefore cannot handle events.");
290 h.sterling  1.1          return;
291                      }
292                  
293                      try
294                      {
295 thilo.boehm 1.22         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
296                              "enqueueEvent before %s",(const char*)_name.getCString()));
297                  
298 h.sterling  1.8          // Our event queue is first in first out.
299 mike        1.13         _eventqueue.insert_back(event);
300 h.sterling  1.1          _check_queue->signal();
301                  
302 thilo.boehm 1.22         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,
303                              "enqueueEvent after %s",(const char*)_name.getCString()));
304 h.sterling  1.1  
305                      } catch (Exception& ex)
306                      {
307                          //ATTN: Log missed indication
308 thilo.boehm 1.22         PEG_TRACE((TRC_LISTENER,Tracer::LEVEL1,
309                              "Exception at enqueueingEvent: %s", 
310                              (const char*)ex.getMessage().getCString()));
311 h.sterling  1.1  
312                      } catch (...)
313                      {
314                          //ATTN: Log missed indication
315 thilo.boehm 1.22         PEG_TRACE_CSTRING(TRC_LISTENER,Tracer::LEVEL1,
316                              "Unknow exception at enqueueingEvent!");
317 h.sterling  1.1      }
318                  
319                      PEG_METHOD_EXIT();
320                  }
321                  
322                  void DynamicConsumer::getIdleTimer(struct timeval *tv)
323                  {
324                      if (tv == 0)
325                      {
326                          return;
327                      }
328                  
329                      try
330                      {
331                          AutoMutex lock(_idleTimeMutex);
332                          memcpy(tv, &_idleTime, sizeof(struct timeval));
333                      } catch (...)
334                      {
335 mike        1.14         Time::gettimeofday(tv);
336 h.sterling  1.1      }
337                  }
338                  
339                  void DynamicConsumer::updateIdleTimer()
340                  {
341                      try
342                      {
343                          AutoMutex lock(_idleTimeMutex);
344 mike        1.14         Time::gettimeofday(&_idleTime);
345 h.sterling  1.1  
346                      } catch (...)
347                      {
348                      }
349                  }
350                  
351                  Uint32 DynamicConsumer::getPendingIndications()
352                  {
353                      return _eventqueue.size();
354                  }
355                  
356                  String DynamicConsumer::toString()
357                  {
358                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::toString");
359                  
360 kumpf       1.17     String buffer;
361 h.sterling  1.1      if (_initialized)
362                      {
363                          buffer.append("Consumer " + _name + " is initialized.\n");
364                          buffer.append("Module name " + _module->getFileName() + "\n");
365                      }
366                  
367                      PEG_METHOD_EXIT();
368                      return buffer;
369                  }
370                  
371 marek       1.19 /** Returns true if the consumer has been inactive for 
372                   *  longer than the idle period.
373 h.sterling  1.1   */ 
374                  Boolean DynamicConsumer::isIdle() 
375                  {
376                      PEG_METHOD_ENTER(TRC_LISTENER, "DynamicConsumer::isIdle");
377                  
378                      if (!isLoaded())
379                      {
380 marek       1.19         PEG_TRACE_CSTRING(
381                              TRC_LISTENER,
382 marek       1.20             Tracer::LEVEL2,
383 marek       1.19             "Consumer is not loaded.");
384 h.sterling  1.1          return false;
385                      }
386                  
387                      struct timeval now;
388 mike        1.14     Time::gettimeofday(&now);
389 h.sterling  1.1  
390                      struct timeval timeout = {0,0};
391                      getIdleTimer(&timeout);
392                  
393 marek       1.19     // if no consumer is currently being served and there's no consumer that
394                      // has pending indications, we are idle
395 h.sterling  1.10     if (!_current_operations.get() && !getPendingIndications())
396 h.sterling  1.1      {
397                          PEG_METHOD_EXIT();
398                          return true;
399                      }
400                  
401                      PEG_METHOD_EXIT();
402                      return false;
403                  }
404                  
405 marek       1.19 /** This method waits until the event thread is ready to accept incoming
406                   *  indications.  Otherwise, there is a miniscule chance that 
407                   *  the first event will be enqueued before the consumer is waiting for it
408                   *  and the first indication after loading the consumer will be lost.
409 h.sterling  1.6   */ 
410                  void DynamicConsumer::waitForEventThread()
411                  {
412                      _listeningSemaphore->wait();
413                  }
414                  
415 h.sterling  1.1  /** This method is called when the consumer is initialized for the first time.
416                   *  It reads the outstanding requests from the dat file and enqueues them.
417                   * 
418 marek       1.19  * ATTN: This method will only get called when a consumer is initialized.
419                   * Therefore, when the listener starts, the outstanding indications for this
420                   * consumer will not get sent UNTIL a new indication comes in.  This is not
421                   * really an acceptable scenario.  Maybe the consumer manager needs to check
422                   * the .dat files upon startup and load if they are not empty.
423 h.sterling  1.1   */ 
424 marek       1.19 void DynamicConsumer::_loadOutstandingIndications(
425                           Array<IndicationDispatchEvent> indications)
426 h.sterling  1.1  {
427 marek       1.19     PEG_METHOD_ENTER(
428                          TRC_LISTENER,
429                          "DynamicConsumer::_loadOutstandingIndications");
430 h.sterling  1.1  
431                      //create dispatch events from the instances
432 h.sterling  1.9      IndicationDispatchEvent* event = 0;
433 h.sterling  1.1      for (Uint32 i=0; i < indications.size(); i++)
434                      {
435 marek       1.19         
436                          event = new IndicationDispatchEvent(
437                                          OperationContext(),  //ATTN: Do we need to store this?
438                                          indications[i].getURL(),
439                                          indications[i].getIndicationInstance());
440                          
441 mike        1.13         _eventqueue.insert_back(event);
442 h.sterling  1.1      }
443                  
444                      //signal the worker thread so it falls into the queue processing code
445                      if (_eventqueue.size())
446                      {
447                          _check_queue->signal();
448                      }
449                  
450                      PEG_METHOD_EXIT();
451                  }
452                  
453 marek       1.19 /** This method serializes the remaining indications in the queue.
454                   *  It should be called when the consumer is shutting down.  Each time the 
455                   *  consumer is loaded, these indications will be reloaded into the queue.
456                   *  Therefore, the file should be overwritten each time to eliminate 
457                   *  duplicating outstanding indications.
458 h.sterling  1.1   * 
459                   * ATTN: Should we let another method delete the instances?
460                   */ 
461 marek       1.19 Array<IndicationDispatchEvent> 
462                      DynamicConsumer::_retrieveOutstandingIndications()
463 h.sterling  1.1  {
464 marek       1.19     PEG_METHOD_ENTER(
465                          TRC_LISTENER,
466                          "DynamicConsumer::_retrieveOutstandingIndications");
467 h.sterling  1.1  
468 h.sterling  1.9      Array<IndicationDispatchEvent> indications;
469 h.sterling  1.1      IndicationDispatchEvent* temp = 0;
470                  
471                      try
472                      {
473 kumpf       1.21         if (_eventqueue.try_lock())
474 h.sterling  1.1          {
475 kumpf       1.21             temp = _eventqueue.front();
476                              while (temp)
477                              {
478                                  PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "retrieving");
479                                  indications.append(*temp);
480                                  temp = _eventqueue.next_of(temp);
481                              }
482                              _eventqueue.unlock();
483 h.sterling  1.1          }
484 kumpf       1.21         else
485                          {
486                              PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL3,
487                                  "Failed to lock _eventqueue");
488                          }
489                      }
490                      catch (...)
491 h.sterling  1.1      {
492 marek       1.16         PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Unknown Exception");
493 h.sterling  1.1      }
494                  
495                      PEG_METHOD_EXIT();
496                      return indications;
497                  }
498                  
499                  
500                  ////////////////////////////////
501                  // IndicationDispatchEvent
502                  ////////////////////////////////
503                  
504 h.sterling  1.9  IndicationDispatchEvent::IndicationDispatchEvent()
505                  {
506                  }
507                  
508 h.sterling  1.1  IndicationDispatchEvent::IndicationDispatchEvent(OperationContext context,
509                                                                   String url,
510                                                                   CIMInstance instance) :
511                  _context(context),
512                  _url(url),
513                  _instance(instance),
514 h.sterling  1.4  _retries(0),
515                  _lastAttemptTime(CIMDateTime())
516 h.sterling  1.1  {
517                  }
518                  
519 mike        1.13 IndicationDispatchEvent::IndicationDispatchEvent(
520                      const IndicationDispatchEvent &event) : Linkable(event)
521 h.sterling  1.10 {
522                      _context = event._context;
523                      _url = event._url;
524                      _instance = event._instance;
525                      _retries = event._retries.get();
526                      _lastAttemptTime = event._lastAttemptTime;
527                  }
528                  
529 h.sterling  1.1  IndicationDispatchEvent::~IndicationDispatchEvent()
530                  {
531                  }
532                  
533                  OperationContext IndicationDispatchEvent::getContext() const
534                  {
535                      return _context;
536                  }
537                  
538                  String IndicationDispatchEvent::getURL() const
539                  {
540                      return _url;
541                  }
542                  
543                  CIMInstance IndicationDispatchEvent::getIndicationInstance() const
544                  {
545                      return _instance;
546                  }
547                  
548 mike        1.7  Uint32 IndicationDispatchEvent::getRetries()
549 h.sterling  1.1  {
550 mike        1.7      return _retries.get();
551 h.sterling  1.1  }
552                  
553                  void IndicationDispatchEvent::increaseRetries()
554                  {
555 marek       1.16     PEG_TRACE_CSTRING(TRC_LISTENER, Tracer::LEVEL4, "Increasing retries\n");
556 h.sterling  1.1      _retries++;
557 h.sterling  1.4      _lastAttemptTime = CIMDateTime::getCurrentDateTime();
558 thilo.boehm 1.22     PEG_TRACE((TRC_LISTENER,Tracer::LEVEL4,"Last attempt time %s",
559                          (const char*)_lastAttemptTime.toString().getCString()));
560 h.sterling  1.4  }
561                  
562                  CIMDateTime IndicationDispatchEvent::getLastAttemptTime()
563                  {
564                      return _lastAttemptTime;
565 h.sterling  1.1  }
566                  
567 h.sterling  1.9  
568 marek       1.19 IndicationDispatchEvent& 
569                      IndicationDispatchEvent::operator=(const IndicationDispatchEvent &event)
570 h.sterling  1.9  {
571 marek       1.19     _context = event._context;
572                      _url = event._url;
573                      _instance = event._instance;
574                      _retries = event._retries.get();
575                      _lastAttemptTime = event._lastAttemptTime;
576 h.sterling  1.9  
577                      return *this;
578                  }
579                  
580 marek       1.19 Boolean IndicationDispatchEvent::operator==
581                              (const IndicationDispatchEvent& event) const
582 h.sterling  1.1  {
583                      if (String::equal(this->_url, event._url) && 
584                          (this->_instance.identical(event._instance)))
585                      {
586                          return true;
587                      }
588                      return false;
589                  }
590                  
591                  PEGASUS_NAMESPACE_END
592                  

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2