(file) Return to DestinationQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / HandlerService

  1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
  2                    //
  3                    // Licensed to The Open Group (TOG) under one or more contributor license
  4                    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5                    // this work for additional information regarding copyright ownership.
  6                    // Each contributor licenses this file to you under the OpenPegasus Open
  7                    // Source License; you may not use this file except in compliance with the
  8                    // License.
  9                    //
 10                    // Permission is hereby granted, free of charge, to any person obtaining a
 11                    // copy of this software and associated documentation files (the "Software"),
 12                    // to deal in the Software without restriction, including without limitation
 13                    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14                    // and/or sell copies of the Software, and to permit persons to whom the
 15                    // Software is furnished to do so, subject to the following conditions:
 16                    //
 17                    // The above copyright notice and this permission notice shall be included
 18                    // in all copies or substantial portions of the Software.
 19                    //
 20                    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21                    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23                    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24                    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25                    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26                    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //////////////////////////////////////////////////////////////////////////
 29                    //
 30                    //%/////////////////////////////////////////////////////////////////////////////
 31                    
 32                    #include <Pegasus/Common/Time.h>
 33                    #include <Pegasus/Common/Tracer.h>
 34                    #include <Pegasus/Common/Constants.h>
 35                    #include <Pegasus/Common/StringConversion.h>
 36 karl           1.10 #include <Pegasus/Common/MessageQueueService.h>
 37 venkat.puvvada 1.1  #include <Pegasus/Config/ConfigManager.h>
 38                     #include <Pegasus/Provider/CIMOMHandle.h>
 39                     #include "DestinationQueue.h"
 40                     
 41                     PEGASUS_NAMESPACE_BEGIN
 42                     
 43                     // Initialize with default values.
 44                     Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
 45                     Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
 46 karl           1.10 Uint64 DestinationQueue::_minSubscriptionRemovalTimeIntervalUsec
 47                         = 2592000 * 1000000ULL; // Default 30 days, Refer DSP1054.
 48 venkat.puvvada 1.1  Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
 49 karl           1.10 
 50 venkat.puvvada 1.1  String DestinationQueue::_indicationServiceName = "PG:IndicationService";
 51                     String DestinationQueue::_objectManagerName = "Pegasus";
 52 karl           1.10 Uint32 DestinationQueue::_indicationServiceQid;
 53 venkat.puvvada 1.1  
 54 venkat.puvvada 1.6  DestinationQueue::IndDiscardedReasonMsgs
 55                         DestinationQueue::indDiscardedReasonMsgs[] = {
 56                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
 57                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 58                              " not delivered due to the corresponding listener destination"
 59                              " instance was removed."},
 60                     
 61                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 62                              "SUBSCRIPTION_DELETED",
 63                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 64                              " not delivered due to the corresponding subscription was disabled"
 65                              " or deleted."},
 66                     
 67                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 68                              "DESTINATIONQUEUE_FULL",
 69                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 70                              " was discarded due to the destination queue was full."},
 71                     
 72                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
 73                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 74                              " was not delivered due to the sequence identifier lifetime expired."},
 75 venkat.puvvada 1.6  
 76                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
 77                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 78                              " was not delivered due to the maximum delivery retry"
 79                              " attempts exceeded. Exception : $2"},
 80                     
 81                         {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
 82                          "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 83                               " was not delivered due to the cimserver shutdown."}
 84 venkat.puvvada 1.1  };
 85                     
 86                     Uint64 DestinationQueue::_serverStartupTimeUsec
 87                         = System::getCurrentTimeUsec();
 88                     
 89                     Boolean DestinationQueue::_initialized = false;
 90                     static Mutex _intializeMutex;
 91                     
 92                     CIMInstance DestinationQueue::_getInstance(const CIMName &className)
 93                     {
 94                         CIMOMHandle cimomHandle;
 95                         Array<CIMInstance> instances =
 96                             cimomHandle.enumerateInstances(
 97                                 OperationContext(),
 98                                 PEGASUS_NAMESPACENAME_INTEROP,
 99                                 className,
100                                 true,
101                                 false,
102                                 false,
103                                 false,
104                                 CIMPropertyList());
105 venkat.puvvada 1.1  
106                         return instances[0];
107                     }
108                     
109                     void DestinationQueue::_initIndicationServiceProperties()
110                     {
111                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
112                             "DestinationQueue::_initIndicationServiceProperties");
113                     
114                         CIMInstance instance =
115                             _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
116                     
117                         instance.getProperty(
118                             instance.findProperty(
119                                 _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
120                                     _maxDeliveryRetryAttempts);
121                     
122                         instance.getProperty(
123                             instance.findProperty(
124                                 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
125                                    _indicationServiceName);
126 venkat.puvvada 1.1  
127                         CIMValue value = instance.getProperty(
128                             instance.findProperty(
129                                 _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
130                     
131                         if (value.getType() == CIMTYPE_UINT32)
132                         {
133                             Uint32 tval;
134                             value.get(tval);
135                             _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
136                         }
137                         else
138                         {
139                             value.get(_minDeliveryRetryIntervalUsec);
140                             _minDeliveryRetryIntervalUsec*= 1000000;
141                         }
142                            // See DSP 1054 ver 1.1.0 Sec 7.10
143                         _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
144                             _minDeliveryRetryIntervalUsec * 10;
145 karl           1.10 
146                         Uint32 subRemoveIntervalValue;
147                         instance.getProperty(
148                             instance.findProperty(
149                                 _PROPERTY_SUBSCRIPTIONREMOVALTIMEINTERVAL)).getValue().
150                                     get(subRemoveIntervalValue);
151                     
152                         _minSubscriptionRemovalTimeIntervalUsec =
153                             subRemoveIntervalValue * 1000000ULL;
154                     
155 venkat.puvvada 1.1      PEG_METHOD_EXIT();
156                     }
157                     
158                     void DestinationQueue::_initObjectManagerProperties()
159                     {
160                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
161                             "DestinationQueue::_initObjectManagerProperties");
162                     
163                         CIMInstance instance =
164                            _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
165                     
166                         instance.getProperty(
167                             instance.findProperty(
168                                 PEGASUS_PROPERTYNAME_NAME)).getValue().get(
169                                     _objectManagerName);
170                         PEG_METHOD_EXIT();
171                     }
172                     
173                     DestinationQueue::DestinationQueue(
174                         const CIMInstance &handler):_handler(handler)
175                     {
176 venkat.puvvada 1.1      PEG_METHOD_ENTER(TRC_IND_HANDLER,
177                             "DestinationQueue::DestinationQueue");
178                     
179                         if (!_initialized)
180                         {
181                             AutoMutex mtx(_intializeMutex);
182                             if (!_initialized)
183                             {
184                                 try
185                                 {
186                                     PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
187                                         "Initializaing the Destination Queue");
188                                     _initIndicationServiceProperties();
189                                     _initObjectManagerProperties();
190 karl           1.10                 _indicationServiceQid = MessageQueueService::find_service_qid(
191                                       PEGASUS_QUEUENAME_INDICATIONSERVICE);
192 venkat.puvvada 1.1              }
193                                 catch(const Exception &e)
194                                 {
195                                     PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
196                                         "Exception %s caught while initializing the "
197                                             "DestinationQueue, using default values.",
198                                         (const char*)e.getMessage().getCString()));
199                                 }
200                                 catch(...)
201                                 {
202                                     PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
203                                         "Unknown exception caught while initializing the "
204                                             "DestinationQueue, using default values.");
205                                 }
206                                 _initialized = true;
207                             }
208                         }
209                     
210                         // Build the sequence context
211                         _sequenceContext = _indicationServiceName;
212                         _sequenceContext.append("-");
213 venkat.puvvada 1.1      _sequenceContext.append(_objectManagerName);
214                         _sequenceContext.append("-");
215                     
216 venkat.puvvada 1.8      _connection = 0;
217                     
218 venkat.puvvada 1.1      Uint32 len = 0;
219                         char buffer[22];
220                         const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
221                         _sequenceContext.append(String(ptr, len));
222                         _sequenceContext.append("-");
223                     
224                         Uint32 idx = handler.findProperty(
225                             PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
226                     
227                         if (idx != PEG_NOT_FOUND)
228                         {
229                             Uint64 tvalue;
230                             handler.getProperty(idx).getValue().get(tvalue);
231                             Uint32 llen = 0;
232                             char lbuffer[22];
233                             const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
234                             _sequenceContext.append(String(lptr, llen));
235                         }
236                         else
237                         {
238                             _sequenceContext.append(String(ptr, len));
239 venkat.puvvada 1.1      }
240                         _lastDeliveryRetryStatus = FAIL;
241                     
242                         _sequenceNumber = 0;
243                         _queueFullDroppedIndications = 0;
244                         _lifetimeExpiredIndications = 0;
245                         _retryAttemptsExceededIndications = 0;
246                         _subscriptionDeleteDroppedIndications = 0;
247                         _calcMaxQueueSize = true;
248                         _maxIndicationDeliveryQueueSize = 2400;
249                     
250 karl           1.10     _lastSuccessfulDeliveryTimeUsec =
251                             _queueCreationTimeUsec = System::getCurrentTimeUsec();
252                     
253 venkat.puvvada 1.1      PEG_METHOD_EXIT();
254                     }
255                     
256                     DestinationQueue::~DestinationQueue()
257                     {
258                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
259                             "DestinationQueue::~DestinationQueue");
260                     
261 marek          1.2      if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
262 venkat.puvvada 1.1      {
263                             _cleanup(LISTENER_NOT_ACTIVE);
264                         }
265 venkat.puvvada 1.8      delete _connection;
266 venkat.puvvada 1.1  
267                         PEG_METHOD_EXIT();
268                     }
269                     
270                     Sint64 DestinationQueue::getSequenceNumber()
271                     {
272                         AutoMutex mtx(_queueMutex);
273                     
274                         // Determine max queue size, See PEP 324 for the algorithm
275                         if (_calcMaxQueueSize)
276                         {
277                             if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
278                                 >= _sequenceIdentifierLifetimeUsec)
279                             {
280                                 // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
281                                 // (Number of indications arrived over
282                                 //   sequence-identifier-lifetime.)
283                     
284                                 _maxIndicationDeliveryQueueSize = _sequenceNumber;
285                     
286                                 if (_maxIndicationDeliveryQueueSize < 200)
287 venkat.puvvada 1.1              {
288                                     _maxIndicationDeliveryQueueSize = 200;
289                                 }
290                                 else if (_maxIndicationDeliveryQueueSize > 2400)
291                                 {
292                                     _maxIndicationDeliveryQueueSize = 2400;
293                                 }
294                                 _calcMaxQueueSize = false;
295                             }
296                         }
297                     
298                         Sint64 nextSequenceNumber = _sequenceNumber++;
299                     
300                         if (_sequenceNumber < 0)
301                         {
302                             _sequenceNumber = 0;
303                         }
304                     
305                         return nextSequenceNumber;
306                     }
307                     
308 venkat.puvvada 1.1  String DestinationQueue::_getSequenceContext(
309                         const CIMInstance &indication)
310                     {
311                         String sequenceContext;
312                     
313                         indication.getProperty(
314                             indication.findProperty(
315                                 _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
316                     
317                         return sequenceContext;
318                     }
319                     
320                     Sint64 DestinationQueue::_getSequenceNumber(
321                         const CIMInstance &indication)
322                     {
323                         Sint64 sequenceNumber;
324                     
325                         indication.getProperty(
326                             indication.findProperty(
327                                 _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
328                     
329 venkat.puvvada 1.1      return sequenceNumber;
330                     }
331                     
332 venkat.puvvada 1.4  void DestinationQueue::_logDiscardedIndication(
333                         Uint32 reasonCode,
334                         const CIMInstance &indication,
335                         const String &message)
336 venkat.puvvada 1.1  {
337                         PEGASUS_ASSERT(reasonCode <
338 venkat.puvvada 1.6          sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
339 venkat.puvvada 1.1  
340 venkat.puvvada 1.6      if (reasonCode == DRA_EXCEEDED)
341                         {
342                             Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
343                                 MessageLoaderParms(
344                                     indDiscardedReasonMsgs[reasonCode].key,
345                                     indDiscardedReasonMsgs[reasonCode].msg,
346                                     (const char*)_getSequenceContext(indication).getCString(),
347                                     _getSequenceNumber(indication),
348                                     (const char*)message.getCString()));
349                         }
350                         else
351                         {
352                             Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
353                                 MessageLoaderParms(
354                                     indDiscardedReasonMsgs[reasonCode].key,
355                                     indDiscardedReasonMsgs[reasonCode].msg,
356                                     (const char*)_getSequenceContext(indication).getCString(),
357                                     _getSequenceNumber(indication)));
358 karl           1.10 
359 venkat.puvvada 1.6      }
360 venkat.puvvada 1.1  }
361                     
362 marek          1.2  void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
363 venkat.puvvada 1.1  {
364                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
365                             "DestinationQueue::enqueue");
366                     
367 marek          1.2      Uint32 idx;
368                         CIMProperty prop;
369                         CIMInstance &indication = message->indicationInstance;
370                     
371                         if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
372                             != PEG_NOT_FOUND)
373                         {
374                             prop = indication.getProperty(idx);
375                             prop.setValue(getSequenceContext());
376                             indication.removeProperty(idx);
377                         }
378                         else
379                         {
380                             prop = CIMProperty(
381                                 _PROPERTY_SEQUENCECONTEXT,
382                                 getSequenceContext());
383                         }
384                         indication.addProperty(prop);
385                     
386 venkat.puvvada 1.1      AutoMutex mtx(_queueMutex);
387 marek          1.2      Sint64 sequenceNumber = getSequenceNumber();
388                         if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
389                             != PEG_NOT_FOUND)
390                         {
391                             prop = indication.getProperty(idx);
392                             prop.setValue(sequenceNumber);
393                             indication.removeProperty(idx);
394                         }
395                         else
396                         {
397                             prop = CIMProperty(
398                                 _PROPERTY_SEQUENCENUMBER,
399                                 sequenceNumber);
400                         }
401                         indication.addProperty(prop);
402                     
403 venkat.puvvada 1.7      DeliveryStatusAggregator *aggregator = 0;
404                         if (message->deliveryStatusAggregator &&
405                             message->deliveryStatusAggregator->waitUntilDelivered)
406                         {
407                             aggregator = message->deliveryStatusAggregator;
408                         }
409                     
410 marek          1.2      IndicationInfo *info = new IndicationInfo(
411                             message->indicationInstance,
412                             message->subscriptionInstance,
413                             message->operationContext,
414                             message->nameSpace.getString(),
415 venkat.puvvada 1.5          this,
416 venkat.puvvada 1.7          aggregator);
417 venkat.puvvada 1.5  
418 venkat.puvvada 1.1      _queue.insert_back(info);
419                     
420 marek          1.2      info->lastDeliveryRetryTimeUsec = 0;
421                         info->arrivalTimeUsec = System::getCurrentTimeUsec();
422 venkat.puvvada 1.1  
423                         if (_queue.size() > _maxIndicationDeliveryQueueSize)
424                         {
425                             _queueFullDroppedIndications++;
426                             IndicationInfo *temp = _queue.remove_front();
427 venkat.puvvada 1.4          _logDiscardedIndication(
428 venkat.puvvada 1.1              DESTINATIONQUEUE_FULL,
429                                 temp->indication);
430                             delete temp;
431                         }
432 marek          1.2  
433 venkat.puvvada 1.1      PEG_METHOD_EXIT();
434                     }
435                     
436                     void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
437                     {
438                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
439                             "DestinationQueue::updateDeliveryRetrySuccess");
440                     
441                         AutoMutex mtx(_queueMutex);
442 venkat.puvvada 1.8  
443 venkat.puvvada 1.1      PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
444                         _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
445                         _lastDeliveryRetryStatus = SUCCESS;
446                     
447                         PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
448                             "Indication with SequenceContext %s and SequenceNumber %"
449                                 PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
450                             (const char*)_getSequenceContext(info->indication).getCString(),
451                             _getSequenceNumber(info->indication)));
452                     
453                         delete info;
454                     
455                         PEG_METHOD_EXIT();
456                     }
457                     
458                     void DestinationQueue::updateDeliveryRetryFailure(
459                         IndicationInfo *info,
460                         const CIMException &e)
461                     {
462                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
463                             "DestinationQueue::updateDeliveryRetryFailure");
464 venkat.puvvada 1.1  
465                         AutoMutex mtx(_queueMutex);
466                     
467 venkat.puvvada 1.8      // We should not have any connection object here because indication
468                         // delivery has failed.
469                         PEGASUS_ASSERT(!_connection);
470                     
471 venkat.puvvada 1.1      PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
472                         _lastDeliveryRetryStatus = FAIL;
473                         info->deliveryRetryAttemptsMade++;
474                     
475 karl           1.10     // If the last successful delivery time is greater than or equal to
476                         // SubscriptionRemovalTimeInterval, send message to indication service
477                         // to reconcile OnFatalErrorPolicy
478                         if (System::getCurrentTimeUsec() - _lastSuccessfulDeliveryTimeUsec >=
479                             _minSubscriptionRemovalTimeIntervalUsec)
480                         {
481                             CIMProcessIndicationResponseMessage *response =
482                                 new CIMProcessIndicationResponseMessage(
483                                     XmlWriter::getNextMessageId(),
484                                     CIMException(CIM_ERR_FAILED),
485                                     QueueIdStack(_indicationServiceQid),
486                                     String(),
487                                     info->subscription);
488                             response->dest = _indicationServiceQid;
489                             MessageQueueService::SendForget(response);
490                         }
491                     
492 marek          1.2      // Check for DeliveryRetryAttempts by adding the original delivery attempt.
493                         if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
494 venkat.puvvada 1.1      {
495                             _retryAttemptsExceededIndications++;
496 venkat.puvvada 1.4          _logDiscardedIndication(
497 venkat.puvvada 1.1              DRA_EXCEEDED,
498 venkat.puvvada 1.4              info->indication,
499                                 e.getMessage());
500 venkat.puvvada 1.1          delete info;
501                         }
502 marek          1.2      else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
503 venkat.puvvada 1.1      {
504                             _queueFullDroppedIndications++;
505 venkat.puvvada 1.4          _logDiscardedIndication(
506 venkat.puvvada 1.1              DESTINATIONQUEUE_FULL,
507                                 info->indication);
508                             delete info;
509                         }
510                         else
511                         {
512 marek          1.2          // To deliver the indications in the correct order, insert the
513                             // delivery retry failed indications at the front of the queue.
514                             _queue.insert_front(info);
515 venkat.puvvada 1.4          PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
516                                 "Delivery failure for indication with SequenceContext %s and "
517                                     "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
518                                         " DeliveryRetryAttempts made %u. Exception : %s",
519                                 (const char*)_getSequenceContext(info->indication).getCString(),
520                                 _getSequenceNumber(info->indication),
521                                 info->deliveryRetryAttemptsMade,
522                                 (const char*)e.getMessage().getCString()));
523 venkat.puvvada 1.1          info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
524                         }
525                     
526                         PEG_METHOD_EXIT();
527                     }
528                     
529                     void DestinationQueue::_waitForNonPendingDeliveryStatus()
530                     {
531                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
532                             "DestinationQueue::_waitForNonPendingDeliveryStatus");
533                     
534                         while (true)
535                         {
536                             {
537                                 AutoMutex mtx(_queueMutex);
538                                 if (_lastDeliveryRetryStatus != PENDING)
539                                 {
540                                     break;
541                                 }
542                             }
543                             Threads::yield();
544 venkat.puvvada 1.1          Threads::sleep(50);
545                         }
546                         PEG_METHOD_EXIT();
547                     }
548                     
549                     void DestinationQueue::deleteMatchedIndications(
550                         const CIMObjectPath &subscriptionPath)
551                     {
552                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
553                             "DestinationQueue::deleteMatchedIndications");
554                     
555                         _waitForNonPendingDeliveryStatus();
556                     
557                         IndicationInfo *info;
558                         AutoMutex mtx(_queueMutex);
559                     
560                         for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
561                         {
562                             info = _queue.remove_front();
563                             if (info->subscription.getPath().identical(subscriptionPath))
564                             {
565 venkat.puvvada 1.1              _subscriptionDeleteDroppedIndications++;
566 venkat.puvvada 1.4              _logDiscardedIndication(
567 venkat.puvvada 1.1                  SUBSCRIPTION_NOT_ACTIVE,
568                                     info->indication);
569                                 delete info;
570                             }
571                             else
572                             {
573                                 _queue.insert_back(info);
574                             }
575                         }
576                         PEG_METHOD_EXIT();
577                     }
578                     
579                     void DestinationQueue::cleanup()
580                     {
581                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
582                             "DestinationQueue::cleanup");
583                     
584                         _cleanup(LISTENER_NOT_ACTIVE);
585                     
586                         PEG_METHOD_EXIT();
587                     }
588 venkat.puvvada 1.1  
589                     void DestinationQueue::shutdown()
590                     {
591                         PEG_METHOD_ENTER(TRC_IND_HANDLER,
592                             "DestinationQueue::shutdown");
593                     
594                         _cleanup(CIMSERVER_SHUTDOWN);
595                     
596                         PEG_METHOD_EXIT();
597                     }
598                     
599                     void DestinationQueue::_cleanup(int reasonCode)
600                     {
601                         _waitForNonPendingDeliveryStatus();
602                     
603                         IndicationInfo *info;
604                         while ((info = _queue.remove_front()))
605                         {
606 venkat.puvvada 1.4          _logDiscardedIndication(
607 venkat.puvvada 1.1              reasonCode,
608                                 info->indication);
609                             delete info;
610                         }
611                     }
612                     
613                     IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
614                         Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
615                     {
616                         AutoMutex mtx(_queueMutex);
617                     
618                         if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
619                         {
620 venkat.puvvada 1.3          // Maximum expiration time is equals to DeliveryRetryInterval.
621                             nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
622 venkat.puvvada 1.8  
623                             // If there are no indications in the queue, delete connection.
624                             if (!_queue.size() && _lastDeliveryRetryStatus != PENDING)
625                             {
626                                 delete _connection;
627                                 _connection = 0;
628                             }
629 venkat.puvvada 1.1          return 0;
630                         }
631                     
632 venkat.puvvada 1.3      nextIndDRIExpTimeUsec = 0;
633                     
634 venkat.puvvada 1.1      IndicationInfo *info;
635                     
636                         while (_queue.size())
637                         {
638                             info = _queue.front();
639                     
640                             if (timeNowUsec < info->arrivalTimeUsec ||
641                                 timeNowUsec < info->lastDeliveryRetryTimeUsec)
642                             {
643                                 timeNowUsec = System::getCurrentTimeUsec();
644                             }
645                     
646                             if ((timeNowUsec - info->arrivalTimeUsec) >=
647                                 _sequenceIdentifierLifetimeUsec)
648                             {
649                                 _lifetimeExpiredIndications++;
650                                 IndicationInfo *temp = _queue.remove_front();
651 venkat.puvvada 1.4              _logDiscardedIndication(
652 venkat.puvvada 1.1                  SIL_EXPIRED,
653                                     temp->indication);
654                                 delete temp;
655                             }
656                             else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
657                                 >= _minDeliveryRetryIntervalUsec)
658                             {
659                                 _lastDeliveryRetryStatus = PENDING;
660                                 _queue.remove_front();
661                                 IndicationInfo *temp = _queue.front();
662                     
663 marek          1.2              // The following algorithm is used to determine the elapsed
664                                 // DeliveryRetryAttempts. To deliver the indication in order,
665                                 // Server delays the newer indications until older indications
666                                 // in the queue are attempted for delivery and their
667                                 // DeliveryRetyAttempts are exceeded. The following algorithm
668                                 // ensures that indications won't stay in the queue more than
669                                 // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
670                                 Uint32 elapsedDeliveryRetryAttempts;
671                                 if (info->lastDeliveryRetryTimeUsec)
672                                 {
673                                     elapsedDeliveryRetryAttempts =
674 venkat.puvvada 1.8                      ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
675 marek          1.2                          / _minDeliveryRetryIntervalUsec);
676                                 }
677                                 else
678                                 {
679 venkat.puvvada 1.8                  elapsedDeliveryRetryAttempts =
680                                         ((timeNowUsec - info->arrivalTimeUsec)
681 marek          1.2                          / _minDeliveryRetryIntervalUsec);
682                                 }
683                     
684                                 if (elapsedDeliveryRetryAttempts)
685                                 {
686                                     info->deliveryRetryAttemptsMade +=
687                                         elapsedDeliveryRetryAttempts - 1;
688                                 }
689                     
690 venkat.puvvada 1.1              if (temp)
691                                 {
692                                     if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
693                                             < _minDeliveryRetryIntervalUsec)
694                                     {
695                                         nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
696                                             (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
697                                     }
698                     
699                                     PEGASUS_ASSERT(nextIndDRIExpTimeUsec
700                                         <= _minDeliveryRetryIntervalUsec);
701                                 }
702                     
703                                 return info;
704                             }
705                             else
706                             {
707                                 if (timeNowUsec - info->lastDeliveryRetryTimeUsec
708                                         < _minDeliveryRetryIntervalUsec)
709                                 {
710                                     nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
711 venkat.puvvada 1.1                      (timeNowUsec - info->lastDeliveryRetryTimeUsec);
712                                 }
713                     
714                                 PEGASUS_ASSERT(nextIndDRIExpTimeUsec
715                                     <= _minDeliveryRetryIntervalUsec);
716                     
717                                 break;
718                             }
719                         }
720                     
721                         return 0;
722                     }
723                     
724 ashok.pathak   1.9  void DestinationQueue::setDeliveryRetryAttempts( Uint16 DeliveryRetryAttempts )
725                     {
726                         AutoMutex mtx(_intializeMutex);
727                         _maxDeliveryRetryAttempts = DeliveryRetryAttempts ;
728                         _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
729                             _minDeliveryRetryIntervalUsec * 10;
730                     }
731                     
732                     void DestinationQueue::setminDeliveryRetryInterval(
733                         Uint32 minDeliveryRetryInterval)
734                     {
735                         AutoMutex mtx(_intializeMutex);
736                         _minDeliveryRetryIntervalUsec =  Uint64(minDeliveryRetryInterval)*1000000 ;
737                         _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
738                             _minDeliveryRetryIntervalUsec * 10;
739                     }
740                     
741 venkat.puvvada 1.1  void DestinationQueue::getInfo(QueueInfo &qinfo)
742                     {
743                         AutoMutex mtx(_queueMutex);
744                     
745                         qinfo.handlerName = _handler.getPath();
746                         qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
747                         qinfo.sequenceContext = _sequenceContext;
748                         qinfo.nextSequenceNumber = _sequenceNumber;
749                         qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
750                         qinfo.sequenceIdentifierLifetimeSeconds =
751                             _sequenceIdentifierLifetimeUsec / 1000000;
752                         qinfo.size = _queue.size();
753                         qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
754                         qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
755                         qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
756                         qinfo.subscriptionDisableDroppedIndications =
757                             _subscriptionDeleteDroppedIndications;
758 karl           1.10     /* If the last successful delivery time is equals to the queue creation
759                          * time, indication delivery for this destination was never successful
760                          */
761                         qinfo.lastSuccessfulDeliveryTimeUsec =
762                           _lastSuccessfulDeliveryTimeUsec == _queueCreationTimeUsec ? 0 :
763                               _lastSuccessfulDeliveryTimeUsec;
764 venkat.puvvada 1.1  }
765                     
766                     PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2