(file) Return to DestinationQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / HandlerService

  1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
  2                    //
  3                    // Licensed to The Open Group (TOG) under one or more contributor license
  4                    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5                    // this work for additional information regarding copyright ownership.
  6                    // Each contributor licenses this file to you under the OpenPegasus Open
  7                    // Source License; you may not use this file except in compliance with the
  8                    // License.
  9                    //
 10                    // Permission is hereby granted, free of charge, to any person obtaining a
 11                    // copy of this software and associated documentation files (the "Software"),
 12                    // to deal in the Software without restriction, including without limitation
 13                    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14                    // and/or sell copies of the Software, and to permit persons to whom the
 15                    // Software is furnished to do so, subject to the following conditions:
 16                    //
 17                    // The above copyright notice and this permission notice shall be included
 18                    // in all copies or substantial portions of the Software.
 19                    //
 20                    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21                    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23                    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24                    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25                    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26                    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //////////////////////////////////////////////////////////////////////////
 29                    //
 30                    //%/////////////////////////////////////////////////////////////////////////////
 31                    
 32                    #include <Pegasus/Common/Time.h>
 33                    #include <Pegasus/Common/Tracer.h>
 34                    #include <Pegasus/Common/Constants.h>
 35                    #include <Pegasus/Common/StringConversion.h>
 36                    #include <Pegasus/Config/ConfigManager.h>
 37                    #include <Pegasus/Provider/CIMOMHandle.h>
 38                    #include "DestinationQueue.h"
 39                    
 40                    PEGASUS_NAMESPACE_BEGIN
 41                    
 42                    // Initialize with default values.
 43 venkat.puvvada 1.1 Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
 44                    Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
 45                    Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
 46                    String DestinationQueue::_indicationServiceName = "PG:IndicationService";
 47                    String DestinationQueue::_objectManagerName = "Pegasus";
 48                    
 49 venkat.puvvada 1.6 DestinationQueue::IndDiscardedReasonMsgs
 50                        DestinationQueue::indDiscardedReasonMsgs[] = {
 51                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
 52                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 53                             " not delivered due to the corresponding listener destination"
 54                             " instance was removed."},
 55                    
 56                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 57                             "SUBSCRIPTION_DELETED",
 58                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 59                             " not delivered due to the corresponding subscription was disabled"
 60                             " or deleted."},
 61                    
 62                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 63                             "DESTINATIONQUEUE_FULL",
 64                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 65                             " was discarded due to the destination queue was full."},
 66                    
 67                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
 68                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 69                             " was not delivered due to the sequence identifier lifetime expired."},
 70 venkat.puvvada 1.6 
 71                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
 72                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 73                             " was not delivered due to the maximum delivery retry"
 74                             " attempts exceeded. Exception : $2"},
 75                    
 76                        {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
 77                         "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 78                              " was not delivered due to the cimserver shutdown."}
 79 venkat.puvvada 1.1 };
 80                    
 81                    Uint64 DestinationQueue::_serverStartupTimeUsec
 82                        = System::getCurrentTimeUsec();
 83                    
 84                    Boolean DestinationQueue::_initialized = false;
 85                    static Mutex _intializeMutex;
 86                    
 87                    CIMInstance DestinationQueue::_getInstance(const CIMName &className)
 88                    {
 89                        CIMOMHandle cimomHandle;
 90                        Array<CIMInstance> instances =
 91                            cimomHandle.enumerateInstances(
 92                                OperationContext(),
 93                                PEGASUS_NAMESPACENAME_INTEROP,
 94                                className,
 95                                true,
 96                                false,
 97                                false,
 98                                false,
 99                                CIMPropertyList());
100 venkat.puvvada 1.1 
101                        return instances[0];
102                    }
103                    
104                    void DestinationQueue::_initIndicationServiceProperties()
105                    {
106                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
107                            "DestinationQueue::_initIndicationServiceProperties");
108                    
109                        CIMInstance instance =
110                            _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
111                    
112                        instance.getProperty(
113                            instance.findProperty(
114                                _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
115                                    _maxDeliveryRetryAttempts);
116                    
117                        instance.getProperty(
118                            instance.findProperty(
119                                PEGASUS_PROPERTYNAME_NAME)).getValue().get(
120                                   _indicationServiceName);
121 venkat.puvvada 1.1 
122                        CIMValue value = instance.getProperty(
123                            instance.findProperty(
124                                _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
125                    
126                        if (value.getType() == CIMTYPE_UINT32)
127                        {
128                            Uint32 tval;
129                            value.get(tval);
130                            _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
131                        }
132                        else
133                        {
134                            value.get(_minDeliveryRetryIntervalUsec);
135                            _minDeliveryRetryIntervalUsec*= 1000000;
136                        }
137                           // See DSP 1054 ver 1.1.0 Sec 7.10
138                        _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
139                            _minDeliveryRetryIntervalUsec * 10;
140                        PEG_METHOD_EXIT();
141                    }
142 venkat.puvvada 1.1 
143                    void DestinationQueue::_initObjectManagerProperties()
144                    {
145                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
146                            "DestinationQueue::_initObjectManagerProperties");
147                    
148                        CIMInstance instance =
149                           _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
150                    
151                        instance.getProperty(
152                            instance.findProperty(
153                                PEGASUS_PROPERTYNAME_NAME)).getValue().get(
154                                    _objectManagerName);
155                        PEG_METHOD_EXIT();
156                    }
157                    
158                    DestinationQueue::DestinationQueue(
159                        const CIMInstance &handler):_handler(handler)
160                    {
161                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
162                            "DestinationQueue::DestinationQueue");
163 venkat.puvvada 1.1 
164                        if (!_initialized)
165                        {
166                            AutoMutex mtx(_intializeMutex);
167                            if (!_initialized)
168                            {
169                                try
170                                {
171                                    PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
172                                        "Initializaing the Destination Queue");
173                                    _initIndicationServiceProperties();
174                                    _initObjectManagerProperties();
175                                }
176                                catch(const Exception &e)
177                                {
178                                    PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
179                                        "Exception %s caught while initializing the "
180                                            "DestinationQueue, using default values.",
181                                        (const char*)e.getMessage().getCString()));
182                                }
183                                catch(...)
184 venkat.puvvada 1.1             {
185                                    PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
186                                        "Unknown exception caught while initializing the "
187                                            "DestinationQueue, using default values.");
188                                }
189                                _initialized = true;
190                            }
191                        }
192                    
193                        // Build the sequence context
194                        _sequenceContext = _indicationServiceName;
195                        _sequenceContext.append("-");
196                        _sequenceContext.append(_objectManagerName);
197                        _sequenceContext.append("-");
198                    
199                        Uint32 len = 0;
200                        char buffer[22];
201                        const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
202                        _sequenceContext.append(String(ptr, len));
203                        _sequenceContext.append("-");
204                    
205 venkat.puvvada 1.1     Uint32 idx = handler.findProperty(
206                            PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
207                    
208                        if (idx != PEG_NOT_FOUND)
209                        {
210                            Uint64 tvalue;
211                            handler.getProperty(idx).getValue().get(tvalue);
212                            Uint32 llen = 0;
213                            char lbuffer[22];
214                            const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
215                            _sequenceContext.append(String(lptr, llen));
216                        }
217                        else
218                        {
219                            _sequenceContext.append(String(ptr, len));
220                        }
221                        _lastDeliveryRetryStatus = FAIL;
222                    
223                        _sequenceNumber = 0;
224                        _queueFullDroppedIndications = 0;
225                        _lifetimeExpiredIndications = 0;
226 venkat.puvvada 1.1     _retryAttemptsExceededIndications = 0;
227                        _subscriptionDeleteDroppedIndications = 0;
228                        _calcMaxQueueSize = true;
229                        _lastSuccessfulDeliveryTimeUsec = 0;
230                        _maxIndicationDeliveryQueueSize = 2400;
231                    
232                        _queueCreationTimeUsec = System::getCurrentTimeUsec();
233                        PEG_METHOD_EXIT();
234                    }
235                    
236                    DestinationQueue::~DestinationQueue()
237                    {
238                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
239                            "DestinationQueue::~DestinationQueue");
240                    
241 marek          1.2     if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
242 venkat.puvvada 1.1     {
243                            _cleanup(LISTENER_NOT_ACTIVE);
244                        }
245                    
246                        PEG_METHOD_EXIT();
247                    }
248                    
249                    Sint64 DestinationQueue::getSequenceNumber()
250                    {
251                        AutoMutex mtx(_queueMutex);
252                    
253                        // Determine max queue size, See PEP 324 for the algorithm
254                        if (_calcMaxQueueSize)
255                        {
256                            if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
257                                >= _sequenceIdentifierLifetimeUsec)
258                            {
259                                // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
260                                // (Number of indications arrived over
261                                //   sequence-identifier-lifetime.)
262                    
263 venkat.puvvada 1.1             _maxIndicationDeliveryQueueSize = _sequenceNumber;
264                    
265                                if (_maxIndicationDeliveryQueueSize < 200)
266                                {
267                                    _maxIndicationDeliveryQueueSize = 200;
268                                }
269                                else if (_maxIndicationDeliveryQueueSize > 2400)
270                                {
271                                    _maxIndicationDeliveryQueueSize = 2400;
272                                }
273                                _calcMaxQueueSize = false;
274                            }
275                        }
276                    
277                        Sint64 nextSequenceNumber = _sequenceNumber++;
278                    
279                        if (_sequenceNumber < 0)
280                        {
281                            _sequenceNumber = 0;
282                        }
283                    
284 venkat.puvvada 1.1     return nextSequenceNumber;
285                    }
286                    
287                    String DestinationQueue::_getSequenceContext(
288                        const CIMInstance &indication)
289                    {
290                        String sequenceContext;
291                    
292                        indication.getProperty(
293                            indication.findProperty(
294                                _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
295                    
296                        return sequenceContext;
297                    }
298                    
299                    Sint64 DestinationQueue::_getSequenceNumber(
300                        const CIMInstance &indication)
301                    {
302                        Sint64 sequenceNumber;
303                    
304                        indication.getProperty(
305 venkat.puvvada 1.1         indication.findProperty(
306                                _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
307                    
308                        return sequenceNumber;
309                    }
310                    
311 venkat.puvvada 1.4 void DestinationQueue::_logDiscardedIndication(
312                        Uint32 reasonCode,
313                        const CIMInstance &indication,
314                        const String &message)
315 venkat.puvvada 1.1 {
316                        PEGASUS_ASSERT(reasonCode <
317 venkat.puvvada 1.6         sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
318 venkat.puvvada 1.1 
319 venkat.puvvada 1.6     if (reasonCode == DRA_EXCEEDED)
320                        {
321                            Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
322                                MessageLoaderParms(
323                                    indDiscardedReasonMsgs[reasonCode].key,
324                                    indDiscardedReasonMsgs[reasonCode].msg,
325                                    (const char*)_getSequenceContext(indication).getCString(),
326                                    _getSequenceNumber(indication),
327                                    (const char*)message.getCString()));
328                        }
329                        else
330                        {
331                            Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
332                                MessageLoaderParms(
333                                    indDiscardedReasonMsgs[reasonCode].key,
334                                    indDiscardedReasonMsgs[reasonCode].msg,
335                                    (const char*)_getSequenceContext(indication).getCString(),
336                                    _getSequenceNumber(indication)));
337                                
338                        }
339 venkat.puvvada 1.1 }
340                    
341 marek          1.2 void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
342 venkat.puvvada 1.1 {
343                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
344                            "DestinationQueue::enqueue");
345                    
346 marek          1.2     Uint32 idx;
347                        CIMProperty prop;
348                        CIMInstance &indication = message->indicationInstance;
349                    
350                        if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
351                            != PEG_NOT_FOUND)
352                        {
353                            prop = indication.getProperty(idx);
354                            prop.setValue(getSequenceContext());
355                            indication.removeProperty(idx);
356                        }
357                        else
358                        {
359                            prop = CIMProperty(
360                                _PROPERTY_SEQUENCECONTEXT,
361                                getSequenceContext());
362                        }
363                        indication.addProperty(prop);
364                    
365 venkat.puvvada 1.1     AutoMutex mtx(_queueMutex);
366 marek          1.2     Sint64 sequenceNumber = getSequenceNumber();
367                        if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
368                            != PEG_NOT_FOUND)
369                        {
370                            prop = indication.getProperty(idx);
371                            prop.setValue(sequenceNumber);
372                            indication.removeProperty(idx);
373                        }
374                        else
375                        {
376                            prop = CIMProperty(
377                                _PROPERTY_SEQUENCENUMBER,
378                                sequenceNumber);
379                        }
380                        indication.addProperty(prop);
381                    
382 venkat.puvvada 1.7     DeliveryStatusAggregator *aggregator = 0;
383                        if (message->deliveryStatusAggregator &&
384                            message->deliveryStatusAggregator->waitUntilDelivered)
385                        {
386                            aggregator = message->deliveryStatusAggregator;
387                        }
388                    
389                    
390 marek          1.2     IndicationInfo *info = new IndicationInfo(
391                            message->indicationInstance,
392                            message->subscriptionInstance,
393                            message->operationContext,
394                            message->nameSpace.getString(),
395 venkat.puvvada 1.5         this,
396 venkat.puvvada 1.7         aggregator);
397 venkat.puvvada 1.5 
398 venkat.puvvada 1.1     _queue.insert_back(info);
399                    
400 marek          1.2     info->lastDeliveryRetryTimeUsec = 0;
401                        info->arrivalTimeUsec = System::getCurrentTimeUsec();
402 venkat.puvvada 1.1 
403                        if (_queue.size() > _maxIndicationDeliveryQueueSize)
404                        {
405                            _queueFullDroppedIndications++;
406                            IndicationInfo *temp = _queue.remove_front();
407 venkat.puvvada 1.4         _logDiscardedIndication(
408 venkat.puvvada 1.1             DESTINATIONQUEUE_FULL,
409                                temp->indication);
410                            delete temp;
411                        }
412 marek          1.2 
413 venkat.puvvada 1.1     PEG_METHOD_EXIT();
414                    }
415                    
416                    void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
417                    {
418                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
419                            "DestinationQueue::updateDeliveryRetrySuccess");
420                    
421                        AutoMutex mtx(_queueMutex);
422                        PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
423                        _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
424                        _lastDeliveryRetryStatus = SUCCESS;
425                    
426                        PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
427                            "Indication with SequenceContext %s and SequenceNumber %"
428                                PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
429                            (const char*)_getSequenceContext(info->indication).getCString(),
430                            _getSequenceNumber(info->indication)));
431                    
432                        delete info;
433                    
434 venkat.puvvada 1.1     PEG_METHOD_EXIT();
435                    }
436                    
437                    void DestinationQueue::updateDeliveryRetryFailure(
438                        IndicationInfo *info,
439                        const CIMException &e)
440                    {
441                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
442                            "DestinationQueue::updateDeliveryRetryFailure");
443                    
444                        AutoMutex mtx(_queueMutex);
445                    
446                        PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
447                        _lastDeliveryRetryStatus = FAIL;
448                        info->deliveryRetryAttemptsMade++;
449                    
450 marek          1.2     // Check for DeliveryRetryAttempts by adding the original delivery attempt.
451                        if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
452 venkat.puvvada 1.1     {
453                            _retryAttemptsExceededIndications++;
454 venkat.puvvada 1.4         _logDiscardedIndication(
455 venkat.puvvada 1.1             DRA_EXCEEDED,
456 venkat.puvvada 1.4             info->indication,
457                                e.getMessage());
458 venkat.puvvada 1.1         delete info;
459                        }
460 marek          1.2     else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
461 venkat.puvvada 1.1     {
462                            _queueFullDroppedIndications++;
463 venkat.puvvada 1.4         _logDiscardedIndication(
464 venkat.puvvada 1.1             DESTINATIONQUEUE_FULL,
465                                info->indication);
466                            delete info;
467                        }
468                        else
469                        {
470 marek          1.2         // To deliver the indications in the correct order, insert the
471                            // delivery retry failed indications at the front of the queue.
472                            _queue.insert_front(info);
473 venkat.puvvada 1.4         PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
474                                "Delivery failure for indication with SequenceContext %s and "
475                                    "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
476                                        " DeliveryRetryAttempts made %u. Exception : %s",
477                                (const char*)_getSequenceContext(info->indication).getCString(),
478                                _getSequenceNumber(info->indication),
479                                info->deliveryRetryAttemptsMade,
480                                (const char*)e.getMessage().getCString()));
481 venkat.puvvada 1.1         info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
482                        }
483                    
484                        PEG_METHOD_EXIT();
485                    }
486                    
487                    void DestinationQueue::_waitForNonPendingDeliveryStatus()
488                    {
489                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
490                            "DestinationQueue::_waitForNonPendingDeliveryStatus");
491                    
492                        while (true)
493                        {
494                            {
495                                AutoMutex mtx(_queueMutex);
496                                if (_lastDeliveryRetryStatus != PENDING)
497                                {
498                                    break;
499                                }
500                            }
501                            Threads::yield();
502 venkat.puvvada 1.1         Threads::sleep(50);
503                        }
504                        PEG_METHOD_EXIT();
505                    }
506                    
507                    void DestinationQueue::deleteMatchedIndications(
508                        const CIMObjectPath &subscriptionPath)
509                    {
510                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
511                            "DestinationQueue::deleteMatchedIndications");
512                    
513                        _waitForNonPendingDeliveryStatus();
514                    
515                        IndicationInfo *info;
516                        AutoMutex mtx(_queueMutex);
517                    
518                        for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
519                        {
520                            info = _queue.remove_front();
521                            if (info->subscription.getPath().identical(subscriptionPath))
522                            {
523 venkat.puvvada 1.1             _subscriptionDeleteDroppedIndications++;
524 venkat.puvvada 1.4             _logDiscardedIndication(
525 venkat.puvvada 1.1                 SUBSCRIPTION_NOT_ACTIVE,
526                                    info->indication);
527                                delete info;
528                            }
529                            else
530                            {
531                                _queue.insert_back(info);
532                            }
533                        }
534                        PEG_METHOD_EXIT();
535                    }
536                    
537                    void DestinationQueue::cleanup()
538                    {
539                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
540                            "DestinationQueue::cleanup");
541                    
542                        _cleanup(LISTENER_NOT_ACTIVE);
543                    
544                        PEG_METHOD_EXIT();
545                    }
546 venkat.puvvada 1.1 
547                    void DestinationQueue::shutdown()
548                    {
549                        PEG_METHOD_ENTER(TRC_IND_HANDLER,
550                            "DestinationQueue::shutdown");
551                    
552                        _cleanup(CIMSERVER_SHUTDOWN);
553                    
554                        PEG_METHOD_EXIT();
555                    }
556                    
557                    void DestinationQueue::_cleanup(int reasonCode)
558                    {
559                        _waitForNonPendingDeliveryStatus();
560                    
561                        IndicationInfo *info;
562                        while ((info = _queue.remove_front()))
563                        {
564 venkat.puvvada 1.4         _logDiscardedIndication(
565 venkat.puvvada 1.1             reasonCode,
566                                info->indication);
567                            delete info;
568                        }
569                    }
570                    
571                    IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
572                        Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
573                    {
574                        AutoMutex mtx(_queueMutex);
575                    
576                        if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
577                        {
578 venkat.puvvada 1.3         // Maximum expiration time is equals to DeliveryRetryInterval.
579                            nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
580 venkat.puvvada 1.1         return 0;
581                        }
582                    
583 venkat.puvvada 1.3     nextIndDRIExpTimeUsec = 0;
584                    
585 venkat.puvvada 1.1     IndicationInfo *info;
586                    
587                        while (_queue.size())
588                        {
589                            info = _queue.front();
590                    
591                            if (timeNowUsec < info->arrivalTimeUsec ||
592                                timeNowUsec < info->lastDeliveryRetryTimeUsec)
593                            {
594                                timeNowUsec = System::getCurrentTimeUsec();
595                            }
596                    
597                            if ((timeNowUsec - info->arrivalTimeUsec) >=
598                                _sequenceIdentifierLifetimeUsec)
599                            {
600                                _lifetimeExpiredIndications++;
601                                IndicationInfo *temp = _queue.remove_front();
602 venkat.puvvada 1.4             _logDiscardedIndication(
603 venkat.puvvada 1.1                 SIL_EXPIRED,
604                                    temp->indication);
605                                delete temp;
606                            }
607                            else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
608                                >= _minDeliveryRetryIntervalUsec)
609                            {
610                                _lastDeliveryRetryStatus = PENDING;
611                                _queue.remove_front();
612                                IndicationInfo *temp = _queue.front();
613                    
614 marek          1.2             // The following algorithm is used to determine the elapsed
615                                // DeliveryRetryAttempts. To deliver the indication in order,
616                                // Server delays the newer indications until older indications
617                                // in the queue are attempted for delivery and their
618                                // DeliveryRetyAttempts are exceeded. The following algorithm
619                                // ensures that indications won't stay in the queue more than
620                                // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
621                                Uint32 elapsedDeliveryRetryAttempts;
622                                if (info->lastDeliveryRetryTimeUsec)
623                                {
624                                    elapsedDeliveryRetryAttempts =
625                                        ((timeNowUsec - info->lastDeliveryRetryTimeUsec) 
626                                            / _minDeliveryRetryIntervalUsec);
627                                }
628                                else
629                                {
630                                    elapsedDeliveryRetryAttempts = 
631                                        ((timeNowUsec - info->arrivalTimeUsec) 
632                                            / _minDeliveryRetryIntervalUsec);
633                                }
634                    
635 marek          1.2             if (elapsedDeliveryRetryAttempts)
636                                {
637                                    info->deliveryRetryAttemptsMade +=
638                                        elapsedDeliveryRetryAttempts - 1;
639                                }
640                    
641 venkat.puvvada 1.1             if (temp)
642                                {
643                                    if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
644                                            < _minDeliveryRetryIntervalUsec)
645                                    {
646                                        nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
647                                            (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
648                                    }
649                    
650                                    PEGASUS_ASSERT(nextIndDRIExpTimeUsec
651                                        <= _minDeliveryRetryIntervalUsec);
652                                }
653                    
654                                return info;
655                            }
656                            else
657                            {
658                                if (timeNowUsec - info->lastDeliveryRetryTimeUsec
659                                        < _minDeliveryRetryIntervalUsec)
660                                {
661                                    nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
662 venkat.puvvada 1.1                     (timeNowUsec - info->lastDeliveryRetryTimeUsec);
663                                }
664                    
665                                PEGASUS_ASSERT(nextIndDRIExpTimeUsec
666                                    <= _minDeliveryRetryIntervalUsec);
667                    
668                                break;
669                            }
670                        }
671                    
672                        return 0;
673                    }
674                    
675                    void DestinationQueue::getInfo(QueueInfo &qinfo)
676                    {
677                        AutoMutex mtx(_queueMutex);
678                    
679                        qinfo.handlerName = _handler.getPath();
680                        qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
681                        qinfo.sequenceContext = _sequenceContext;
682                        qinfo.nextSequenceNumber = _sequenceNumber;
683 venkat.puvvada 1.1     qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
684                        qinfo.sequenceIdentifierLifetimeSeconds =
685                            _sequenceIdentifierLifetimeUsec / 1000000;
686                        qinfo.size = _queue.size();
687                        qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
688                        qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
689                        qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
690                        qinfo.subscriptionDisableDroppedIndications =
691                            _subscriptionDeleteDroppedIndications;
692                        qinfo.lastSuccessfulDeliveryTimeUsec = _lastSuccessfulDeliveryTimeUsec;
693                    }
694                    
695                    PEGASUS_NAMESPACE_END
696                    

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2