(file) Return to DestinationQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / HandlerService

  1 venkat.puvvada 1.1 //%LICENSE////////////////////////////////////////////////////////////////
  2                    //
  3                    // Licensed to The Open Group (TOG) under one or more contributor license
  4                    // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
  5                    // this work for additional information regarding copyright ownership.
  6                    // Each contributor licenses this file to you under the OpenPegasus Open
  7                    // Source License; you may not use this file except in compliance with the
  8                    // License.
  9                    //
 10                    // Permission is hereby granted, free of charge, to any person obtaining a
 11                    // copy of this software and associated documentation files (the "Software"),
 12                    // to deal in the Software without restriction, including without limitation
 13                    // the rights to use, copy, modify, merge, publish, distribute, sublicense,
 14                    // and/or sell copies of the Software, and to permit persons to whom the
 15                    // Software is furnished to do so, subject to the following conditions:
 16                    //
 17                    // The above copyright notice and this permission notice shall be included
 18                    // in all copies or substantial portions of the Software.
 19                    //
 20                    // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
 21                    // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 22 venkat.puvvada 1.1 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
 23                    // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
 24                    // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
 25                    // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
 26                    // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 27                    //
 28                    //////////////////////////////////////////////////////////////////////////
 29                    //
 30                    //%/////////////////////////////////////////////////////////////////////////////
 31                    
 32                    #include <Pegasus/Common/Time.h>
 33                    #include <Pegasus/Common/Tracer.h>
 34                    #include <Pegasus/Common/Constants.h>
 35                    #include <Pegasus/Common/StringConversion.h>
 36                    #include <Pegasus/Config/ConfigManager.h>
 37                    #include <Pegasus/Provider/CIMOMHandle.h>
 38                    #include "DestinationQueue.h"
 39                    
 40                    PEGASUS_NAMESPACE_BEGIN
 41                    
 42                    // Initialize with default values.
 43 venkat.puvvada 1.1 Uint16 DestinationQueue::_maxDeliveryRetryAttempts = 3;
 44                    Uint64 DestinationQueue::_minDeliveryRetryIntervalUsec = 20 * 1000000;
 45                    Uint64 DestinationQueue::_sequenceIdentifierLifetimeUsec = 600 * 1000000;
 46                    String DestinationQueue::_indicationServiceName = "PG:IndicationService";
 47                    String DestinationQueue::_objectManagerName = "Pegasus";
 48                    
 49 karl           1.1.2.2 DestinationQueue::IndDiscardedReasonMsgs
 50                            DestinationQueue::indDiscardedReasonMsgs[] = {
 51                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_LD_DELETED",
 52                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 53                                 " not delivered due to the corresponding listener destination"
 54                                 " instance was removed."},
 55                        
 56                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 57                                 "SUBSCRIPTION_DELETED",
 58                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\" was"
 59                                 " not delivered due to the corresponding subscription was disabled"
 60                                 " or deleted."},
 61                        
 62                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_"
 63                                 "DESTINATIONQUEUE_FULL",
 64                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 65                                 " was discarded due to the destination queue was full."},
 66                        
 67                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_SIL_EXPIRED",
 68                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 69                                 " was not delivered due to the sequence identifier lifetime expired."},
 70 karl           1.1.2.2 
 71                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_DRA_EXCEEDED",
 72                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 73                                 " was not delivered due to the maximum delivery retry"
 74                                 " attempts exceeded. Exception : $2"},
 75                        
 76                            {"HandlerService.DestinationQueue.INDICATION_DISCARDED_CIMSERVER_SHUTDOWN",
 77                             "The indication with SequenceContext \"$0\" and SequenceNumber \"$1\""
 78                                  " was not delivered due to the cimserver shutdown."}
 79 venkat.puvvada 1.1     };
 80                        
 81                        Uint64 DestinationQueue::_serverStartupTimeUsec
 82                            = System::getCurrentTimeUsec();
 83                        
 84                        Boolean DestinationQueue::_initialized = false;
 85                        static Mutex _intializeMutex;
 86                        
 87                        CIMInstance DestinationQueue::_getInstance(const CIMName &className)
 88                        {
 89                            CIMOMHandle cimomHandle;
 90                            Array<CIMInstance> instances =
 91                                cimomHandle.enumerateInstances(
 92                                    OperationContext(),
 93                                    PEGASUS_NAMESPACENAME_INTEROP,
 94                                    className,
 95                                    true,
 96                                    false,
 97                                    false,
 98                                    false,
 99                                    CIMPropertyList());
100 venkat.puvvada 1.1     
101                            return instances[0];
102                        }
103                        
104                        void DestinationQueue::_initIndicationServiceProperties()
105                        {
106                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
107                                "DestinationQueue::_initIndicationServiceProperties");
108                        
109                            CIMInstance instance =
110                                _getInstance(PEGASUS_CLASSNAME_CIM_INDICATIONSERVICE);
111                        
112                            instance.getProperty(
113                                instance.findProperty(
114                                    _PROPERTY_DELIVERY_RETRYATTEMPTS)).getValue().get(
115                                        _maxDeliveryRetryAttempts);
116                        
117                            instance.getProperty(
118                                instance.findProperty(
119                                    PEGASUS_PROPERTYNAME_NAME)).getValue().get(
120                                       _indicationServiceName);
121 venkat.puvvada 1.1     
122                            CIMValue value = instance.getProperty(
123                                instance.findProperty(
124                                    _PROPERTY_DELIVERY_RETRYINTERVAL)).getValue();
125                        
126                            if (value.getType() == CIMTYPE_UINT32)
127                            {
128                                Uint32 tval;
129                                value.get(tval);
130                                _minDeliveryRetryIntervalUsec = Uint64(tval) * 1000000;
131                            }
132                            else
133                            {
134                                value.get(_minDeliveryRetryIntervalUsec);
135                                _minDeliveryRetryIntervalUsec*= 1000000;
136                            }
137                               // See DSP 1054 ver 1.1.0 Sec 7.10
138                            _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
139                                _minDeliveryRetryIntervalUsec * 10;
140                            PEG_METHOD_EXIT();
141                        }
142 venkat.puvvada 1.1     
143                        void DestinationQueue::_initObjectManagerProperties()
144                        {
145                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
146                                "DestinationQueue::_initObjectManagerProperties");
147                        
148                            CIMInstance instance =
149                               _getInstance(PEGASUS_CLASSNAME_PG_OBJECTMANAGER);
150                        
151                            instance.getProperty(
152                                instance.findProperty(
153                                    PEGASUS_PROPERTYNAME_NAME)).getValue().get(
154                                        _objectManagerName);
155                            PEG_METHOD_EXIT();
156                        }
157                        
158                        DestinationQueue::DestinationQueue(
159                            const CIMInstance &handler):_handler(handler)
160                        {
161                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
162                                "DestinationQueue::DestinationQueue");
163 venkat.puvvada 1.1     
164                            if (!_initialized)
165                            {
166                                AutoMutex mtx(_intializeMutex);
167                                if (!_initialized)
168                                {
169                                    try
170                                    {
171                                        PEG_TRACE_CSTRING(TRC_IND_HANDLER, Tracer::LEVEL4,
172                                            "Initializaing the Destination Queue");
173                                        _initIndicationServiceProperties();
174                                        _initObjectManagerProperties();
175                                    }
176                                    catch(const Exception &e)
177                                    {
178                                        PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
179                                            "Exception %s caught while initializing the "
180                                                "DestinationQueue, using default values.",
181                                            (const char*)e.getMessage().getCString()));
182                                    }
183                                    catch(...)
184 venkat.puvvada 1.1                 {
185                                        PEG_TRACE_CSTRING(TRC_IND_HANDLER,Tracer::LEVEL1,
186                                            "Unknown exception caught while initializing the "
187                                                "DestinationQueue, using default values.");
188                                    }
189                                    _initialized = true;
190                                }
191                            }
192                        
193                            // Build the sequence context
194                            _sequenceContext = _indicationServiceName;
195                            _sequenceContext.append("-");
196                            _sequenceContext.append(_objectManagerName);
197                            _sequenceContext.append("-");
198                        
199 karl           1.1.2.2     _connection = 0;
200                        
201 venkat.puvvada 1.1         Uint32 len = 0;
202                            char buffer[22];
203                            const char* ptr = Uint64ToString(buffer, _serverStartupTimeUsec,len);
204                            _sequenceContext.append(String(ptr, len));
205                            _sequenceContext.append("-");
206                        
207                            Uint32 idx = handler.findProperty(
208                                PEGASUS_PROPERTYNAME_LSTNRDST_CREATIONTIME);
209                        
210                            if (idx != PEG_NOT_FOUND)
211                            {
212                                Uint64 tvalue;
213                                handler.getProperty(idx).getValue().get(tvalue);
214                                Uint32 llen = 0;
215                                char lbuffer[22];
216                                const char* lptr = Uint64ToString(lbuffer, tvalue, llen);
217                                _sequenceContext.append(String(lptr, llen));
218                            }
219                            else
220                            {
221                                _sequenceContext.append(String(ptr, len));
222 venkat.puvvada 1.1         }
223                            _lastDeliveryRetryStatus = FAIL;
224                        
225                            _sequenceNumber = 0;
226                            _queueFullDroppedIndications = 0;
227                            _lifetimeExpiredIndications = 0;
228                            _retryAttemptsExceededIndications = 0;
229                            _subscriptionDeleteDroppedIndications = 0;
230                            _calcMaxQueueSize = true;
231                            _lastSuccessfulDeliveryTimeUsec = 0;
232                            _maxIndicationDeliveryQueueSize = 2400;
233                        
234                            _queueCreationTimeUsec = System::getCurrentTimeUsec();
235                            PEG_METHOD_EXIT();
236                        }
237                        
238                        DestinationQueue::~DestinationQueue()
239                        {
240                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
241                                "DestinationQueue::~DestinationQueue");
242                        
243 karl           1.1.2.1     if (_queue.size() || _lastDeliveryRetryStatus == PENDING)
244 venkat.puvvada 1.1         {
245                                _cleanup(LISTENER_NOT_ACTIVE);
246                            }
247 karl           1.1.2.2     delete _connection;
248 venkat.puvvada 1.1     
249                            PEG_METHOD_EXIT();
250                        }
251                        
252                        Sint64 DestinationQueue::getSequenceNumber()
253                        {
254                            AutoMutex mtx(_queueMutex);
255                        
256                            // Determine max queue size, See PEP 324 for the algorithm
257                            if (_calcMaxQueueSize)
258                            {
259                                if ((System::getCurrentTimeUsec() - _queueCreationTimeUsec)
260                                    >= _sequenceIdentifierLifetimeUsec)
261                                {
262                                    // (10 * DeliveryRetryInterval * DeliveryRetryAttempts) /
263                                    // (Number of indications arrived over
264                                    //   sequence-identifier-lifetime.)
265                        
266                                    _maxIndicationDeliveryQueueSize = _sequenceNumber;
267                        
268                                    if (_maxIndicationDeliveryQueueSize < 200)
269 venkat.puvvada 1.1                 {
270                                        _maxIndicationDeliveryQueueSize = 200;
271                                    }
272                                    else if (_maxIndicationDeliveryQueueSize > 2400)
273                                    {
274                                        _maxIndicationDeliveryQueueSize = 2400;
275                                    }
276                                    _calcMaxQueueSize = false;
277                                }
278                            }
279                        
280                            Sint64 nextSequenceNumber = _sequenceNumber++;
281                        
282                            if (_sequenceNumber < 0)
283                            {
284                                _sequenceNumber = 0;
285                            }
286                        
287                            return nextSequenceNumber;
288                        }
289                        
290 venkat.puvvada 1.1     String DestinationQueue::_getSequenceContext(
291                            const CIMInstance &indication)
292                        {
293                            String sequenceContext;
294                        
295                            indication.getProperty(
296                                indication.findProperty(
297                                    _PROPERTY_SEQUENCECONTEXT)).getValue().get(sequenceContext);
298                        
299                            return sequenceContext;
300                        }
301                        
302                        Sint64 DestinationQueue::_getSequenceNumber(
303                            const CIMInstance &indication)
304                        {
305                            Sint64 sequenceNumber;
306                        
307                            indication.getProperty(
308                                indication.findProperty(
309                                    _PROPERTY_SEQUENCENUMBER)).getValue().get(sequenceNumber);
310                        
311 venkat.puvvada 1.1         return sequenceNumber;
312                        }
313                        
314 karl           1.1.2.1 void DestinationQueue::_logDiscardedIndication(
315                            Uint32 reasonCode,
316                            const CIMInstance &indication,
317                            const String &message)
318 venkat.puvvada 1.1     {
319                            PEGASUS_ASSERT(reasonCode <
320 karl           1.1.2.2         sizeof(indDiscardedReasonMsgs)/sizeof(IndDiscardedReasonMsgs));
321 venkat.puvvada 1.1     
322 karl           1.1.2.2     if (reasonCode == DRA_EXCEEDED)
323                            {
324                                Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
325                                    MessageLoaderParms(
326                                        indDiscardedReasonMsgs[reasonCode].key,
327                                        indDiscardedReasonMsgs[reasonCode].msg,
328                                        (const char*)_getSequenceContext(indication).getCString(),
329                                        _getSequenceNumber(indication),
330                                        (const char*)message.getCString()));
331                            }
332                            else
333                            {
334                                Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING,
335                                    MessageLoaderParms(
336                                        indDiscardedReasonMsgs[reasonCode].key,
337                                        indDiscardedReasonMsgs[reasonCode].msg,
338                                        (const char*)_getSequenceContext(indication).getCString(),
339                                        _getSequenceNumber(indication)));
340                                    
341                            }
342 venkat.puvvada 1.1     }
343                        
344 karl           1.1.2.1 void DestinationQueue::enqueue(CIMHandleIndicationRequestMessage *message)
345 venkat.puvvada 1.1     {
346                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
347                                "DestinationQueue::enqueue");
348                        
349 karl           1.1.2.1     Uint32 idx;
350                            CIMProperty prop;
351                            CIMInstance &indication = message->indicationInstance;
352                        
353                            if ((idx = indication.findProperty(_PROPERTY_SEQUENCECONTEXT))
354                                != PEG_NOT_FOUND)
355                            {
356                                prop = indication.getProperty(idx);
357                                prop.setValue(getSequenceContext());
358                                indication.removeProperty(idx);
359                            }
360                            else
361                            {
362                                prop = CIMProperty(
363                                    _PROPERTY_SEQUENCECONTEXT,
364                                    getSequenceContext());
365                            }
366                            indication.addProperty(prop);
367                        
368 venkat.puvvada 1.1         AutoMutex mtx(_queueMutex);
369 karl           1.1.2.1     Sint64 sequenceNumber = getSequenceNumber();
370                            if ((idx = indication.findProperty(_PROPERTY_SEQUENCENUMBER))
371                                != PEG_NOT_FOUND)
372                            {
373                                prop = indication.getProperty(idx);
374                                prop.setValue(sequenceNumber);
375                                indication.removeProperty(idx);
376                            }
377                            else
378                            {
379                                prop = CIMProperty(
380                                    _PROPERTY_SEQUENCENUMBER,
381                                    sequenceNumber);
382                            }
383                            indication.addProperty(prop);
384                        
385 karl           1.1.2.2     DeliveryStatusAggregator *aggregator = 0;
386                            if (message->deliveryStatusAggregator &&
387                                message->deliveryStatusAggregator->waitUntilDelivered)
388                            {
389                                aggregator = message->deliveryStatusAggregator;
390                            }
391                        
392 karl           1.1.2.1     IndicationInfo *info = new IndicationInfo(
393                                message->indicationInstance,
394                                message->subscriptionInstance,
395                                message->operationContext,
396                                message->nameSpace.getString(),
397 karl           1.1.2.2         this,
398                                aggregator);
399                        
400 venkat.puvvada 1.1         _queue.insert_back(info);
401                        
402 karl           1.1.2.1     info->lastDeliveryRetryTimeUsec = 0;
403                            info->arrivalTimeUsec = System::getCurrentTimeUsec();
404 venkat.puvvada 1.1     
405                            if (_queue.size() > _maxIndicationDeliveryQueueSize)
406                            {
407                                _queueFullDroppedIndications++;
408                                IndicationInfo *temp = _queue.remove_front();
409 karl           1.1.2.1         _logDiscardedIndication(
410 venkat.puvvada 1.1                 DESTINATIONQUEUE_FULL,
411                                    temp->indication);
412                                delete temp;
413                            }
414 karl           1.1.2.1 
415 venkat.puvvada 1.1         PEG_METHOD_EXIT();
416                        }
417                        
418                        void DestinationQueue::updateDeliveryRetrySuccess(IndicationInfo *info)
419                        {
420                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
421                                "DestinationQueue::updateDeliveryRetrySuccess");
422                        
423                            AutoMutex mtx(_queueMutex);
424 karl           1.1.2.2 
425 venkat.puvvada 1.1         PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
426                            _lastSuccessfulDeliveryTimeUsec = System::getCurrentTimeUsec();
427                            _lastDeliveryRetryStatus = SUCCESS;
428                        
429                            PEG_TRACE((TRC_IND_HANDLER, Tracer::LEVEL4,
430                                "Indication with SequenceContext %s and SequenceNumber %"
431                                    PEGASUS_64BIT_CONVERSION_WIDTH "d is successfully delivered",
432                                (const char*)_getSequenceContext(info->indication).getCString(),
433                                _getSequenceNumber(info->indication)));
434                        
435                            delete info;
436                        
437                            PEG_METHOD_EXIT();
438                        }
439                        
440                        void DestinationQueue::updateDeliveryRetryFailure(
441                            IndicationInfo *info,
442                            const CIMException &e)
443                        {
444                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
445                                "DestinationQueue::updateDeliveryRetryFailure");
446 venkat.puvvada 1.1     
447                            AutoMutex mtx(_queueMutex);
448                        
449 karl           1.1.2.2     // We should not have any connection object here because indication
450                            // delivery has failed.
451                            PEGASUS_ASSERT(!_connection);
452                        
453 venkat.puvvada 1.1         PEGASUS_ASSERT(_lastDeliveryRetryStatus == PENDING);
454                            _lastDeliveryRetryStatus = FAIL;
455                            info->deliveryRetryAttemptsMade++;
456                        
457 karl           1.1.2.1     // Check for DeliveryRetryAttempts by adding the original delivery attempt.
458                            if (info->deliveryRetryAttemptsMade >= _maxDeliveryRetryAttempts + 1)
459 venkat.puvvada 1.1         {
460                                _retryAttemptsExceededIndications++;
461 karl           1.1.2.1         _logDiscardedIndication(
462 venkat.puvvada 1.1                 DRA_EXCEEDED,
463 karl           1.1.2.1             info->indication,
464                                    e.getMessage());
465 venkat.puvvada 1.1             delete info;
466                            }
467 karl           1.1.2.1     else if (_queue.size() >= _maxIndicationDeliveryQueueSize)
468 venkat.puvvada 1.1         {
469                                _queueFullDroppedIndications++;
470 karl           1.1.2.1         _logDiscardedIndication(
471 venkat.puvvada 1.1                 DESTINATIONQUEUE_FULL,
472                                    info->indication);
473                                delete info;
474                            }
475                            else
476                            {
477 karl           1.1.2.1         // To deliver the indications in the correct order, insert the
478                                // delivery retry failed indications at the front of the queue.
479                                _queue.insert_front(info);
480                                PEG_TRACE((TRC_IND_HANDLER,Tracer::LEVEL1,
481                                    "Delivery failure for indication with SequenceContext %s and "
482                                        "SequenceNumber %" PEGASUS_64BIT_CONVERSION_WIDTH "d."
483                                            " DeliveryRetryAttempts made %u. Exception : %s",
484                                    (const char*)_getSequenceContext(info->indication).getCString(),
485                                    _getSequenceNumber(info->indication),
486                                    info->deliveryRetryAttemptsMade,
487                                    (const char*)e.getMessage().getCString()));
488 venkat.puvvada 1.1             info->lastDeliveryRetryTimeUsec = System::getCurrentTimeUsec();
489                            }
490                        
491                            PEG_METHOD_EXIT();
492                        }
493                        
494                        void DestinationQueue::_waitForNonPendingDeliveryStatus()
495                        {
496                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
497                                "DestinationQueue::_waitForNonPendingDeliveryStatus");
498                        
499                            while (true)
500                            {
501                                {
502                                    AutoMutex mtx(_queueMutex);
503                                    if (_lastDeliveryRetryStatus != PENDING)
504                                    {
505                                        break;
506                                    }
507                                }
508                                Threads::yield();
509 venkat.puvvada 1.1             Threads::sleep(50);
510                            }
511                            PEG_METHOD_EXIT();
512                        }
513                        
514                        void DestinationQueue::deleteMatchedIndications(
515                            const CIMObjectPath &subscriptionPath)
516                        {
517                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
518                                "DestinationQueue::deleteMatchedIndications");
519                        
520                            _waitForNonPendingDeliveryStatus();
521                        
522                            IndicationInfo *info;
523                            AutoMutex mtx(_queueMutex);
524                        
525                            for(Uint32 i = 0, n = _queue.size(); i < n; ++i)
526                            {
527                                info = _queue.remove_front();
528                                if (info->subscription.getPath().identical(subscriptionPath))
529                                {
530 venkat.puvvada 1.1                 _subscriptionDeleteDroppedIndications++;
531 karl           1.1.2.1             _logDiscardedIndication(
532 venkat.puvvada 1.1                     SUBSCRIPTION_NOT_ACTIVE,
533                                        info->indication);
534                                    delete info;
535                                }
536                                else
537                                {
538                                    _queue.insert_back(info);
539                                }
540                            }
541                            PEG_METHOD_EXIT();
542                        }
543                        
544                        void DestinationQueue::cleanup()
545                        {
546                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
547                                "DestinationQueue::cleanup");
548                        
549                            _cleanup(LISTENER_NOT_ACTIVE);
550                        
551                            PEG_METHOD_EXIT();
552                        }
553 venkat.puvvada 1.1     
554                        void DestinationQueue::shutdown()
555                        {
556                            PEG_METHOD_ENTER(TRC_IND_HANDLER,
557                                "DestinationQueue::shutdown");
558                        
559                            _cleanup(CIMSERVER_SHUTDOWN);
560                        
561                            PEG_METHOD_EXIT();
562                        }
563                        
564                        void DestinationQueue::_cleanup(int reasonCode)
565                        {
566                            _waitForNonPendingDeliveryStatus();
567                        
568                            IndicationInfo *info;
569                            while ((info = _queue.remove_front()))
570                            {
571 karl           1.1.2.1         _logDiscardedIndication(
572 venkat.puvvada 1.1                 reasonCode,
573                                    info->indication);
574                                delete info;
575                            }
576                        }
577                        
578                        IndicationInfo* DestinationQueue::getNextIndicationForDelivery(
579                            Uint64 &timeNowUsec, Uint64 &nextIndDRIExpTimeUsec)
580                        {
581                            AutoMutex mtx(_queueMutex);
582                        
583                            if (!_queue.size() || _lastDeliveryRetryStatus == PENDING)
584                            {
585 karl           1.1.2.1         // Maximum expiration time is equals to DeliveryRetryInterval.
586                                nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec;
587 karl           1.1.2.2 
588                                // If there are no indications in the queue, delete connection.
589                                if (!_queue.size() && _lastDeliveryRetryStatus != PENDING)
590                                {
591                                    delete _connection;
592                                    _connection = 0;
593                                }
594 venkat.puvvada 1.1             return 0;
595                            }
596                        
597 karl           1.1.2.1     nextIndDRIExpTimeUsec = 0;
598                        
599 venkat.puvvada 1.1         IndicationInfo *info;
600                        
601                            while (_queue.size())
602                            {
603                                info = _queue.front();
604                        
605                                if (timeNowUsec < info->arrivalTimeUsec ||
606                                    timeNowUsec < info->lastDeliveryRetryTimeUsec)
607                                {
608                                    timeNowUsec = System::getCurrentTimeUsec();
609                                }
610                        
611                                if ((timeNowUsec - info->arrivalTimeUsec) >=
612                                    _sequenceIdentifierLifetimeUsec)
613                                {
614                                    _lifetimeExpiredIndications++;
615                                    IndicationInfo *temp = _queue.remove_front();
616 karl           1.1.2.1             _logDiscardedIndication(
617 venkat.puvvada 1.1                     SIL_EXPIRED,
618                                        temp->indication);
619                                    delete temp;
620                                }
621                                else if ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
622                                    >= _minDeliveryRetryIntervalUsec)
623                                {
624                                    _lastDeliveryRetryStatus = PENDING;
625                                    _queue.remove_front();
626                                    IndicationInfo *temp = _queue.front();
627                        
628 karl           1.1.2.1             // The following algorithm is used to determine the elapsed
629                                    // DeliveryRetryAttempts. To deliver the indication in order,
630                                    // Server delays the newer indications until older indications
631                                    // in the queue are attempted for delivery and their
632                                    // DeliveryRetyAttempts are exceeded. The following algorithm
633                                    // ensures that indications won't stay in the queue more than
634                                    // (DeliveryRetryInterval * (DeliveryRetyAttempts + 1) time.
635                                    Uint32 elapsedDeliveryRetryAttempts;
636                                    if (info->lastDeliveryRetryTimeUsec)
637                                    {
638                                        elapsedDeliveryRetryAttempts =
639 karl           1.1.2.2                     ((timeNowUsec - info->lastDeliveryRetryTimeUsec)
640 karl           1.1.2.1                         / _minDeliveryRetryIntervalUsec);
641                                    }
642                                    else
643                                    {
644 karl           1.1.2.2                 elapsedDeliveryRetryAttempts =
645                                            ((timeNowUsec - info->arrivalTimeUsec)
646 karl           1.1.2.1                         / _minDeliveryRetryIntervalUsec);
647                                    }
648                        
649                                    if (elapsedDeliveryRetryAttempts)
650                                    {
651                                        info->deliveryRetryAttemptsMade +=
652                                            elapsedDeliveryRetryAttempts - 1;
653                                    }
654                        
655 venkat.puvvada 1.1                 if (temp)
656                                    {
657                                        if (timeNowUsec - temp->lastDeliveryRetryTimeUsec
658                                                < _minDeliveryRetryIntervalUsec)
659                                        {
660                                            nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
661                                                (timeNowUsec - temp->lastDeliveryRetryTimeUsec);
662                                        }
663                        
664                                        PEGASUS_ASSERT(nextIndDRIExpTimeUsec
665                                            <= _minDeliveryRetryIntervalUsec);
666                                    }
667                        
668                                    return info;
669                                }
670                                else
671                                {
672                                    if (timeNowUsec - info->lastDeliveryRetryTimeUsec
673                                            < _minDeliveryRetryIntervalUsec)
674                                    {
675                                        nextIndDRIExpTimeUsec = _minDeliveryRetryIntervalUsec -
676 venkat.puvvada 1.1                         (timeNowUsec - info->lastDeliveryRetryTimeUsec);
677                                    }
678                        
679                                    PEGASUS_ASSERT(nextIndDRIExpTimeUsec
680                                        <= _minDeliveryRetryIntervalUsec);
681                        
682                                    break;
683                                }
684                            }
685                        
686                            return 0;
687                        }
688                        
689 karl           1.1.2.3 void DestinationQueue::setDeliveryRetryAttempts( Uint16 DeliveryRetryAttempts )
690                        {
691                            AutoMutex mtx(_intializeMutex);
692                            _maxDeliveryRetryAttempts = DeliveryRetryAttempts ;
693                            _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
694                                _minDeliveryRetryIntervalUsec * 10;
695                        }
696                        
697                        void DestinationQueue::setminDeliveryRetryInterval(
698                            Uint32 minDeliveryRetryInterval)
699                        {
700                            AutoMutex mtx(_intializeMutex);
701                            _minDeliveryRetryIntervalUsec =  Uint64(minDeliveryRetryInterval)*1000000 ;
702                            _sequenceIdentifierLifetimeUsec = _maxDeliveryRetryAttempts *
703                                _minDeliveryRetryIntervalUsec * 10;
704                        }
705                        
706 venkat.puvvada 1.1     void DestinationQueue::getInfo(QueueInfo &qinfo)
707                        {
708                            AutoMutex mtx(_queueMutex);
709                        
710                            qinfo.handlerName = _handler.getPath();
711                            qinfo.queueCreationTimeUsec = _queueCreationTimeUsec;
712                            qinfo.sequenceContext = _sequenceContext;
713                            qinfo.nextSequenceNumber = _sequenceNumber;
714                            qinfo.maxQueueLength = _maxIndicationDeliveryQueueSize;
715                            qinfo.sequenceIdentifierLifetimeSeconds =
716                                _sequenceIdentifierLifetimeUsec / 1000000;
717                            qinfo.size = _queue.size();
718                            qinfo.queueFullDroppedIndications = _queueFullDroppedIndications;
719                            qinfo.lifetimeExpiredIndications = _lifetimeExpiredIndications;
720                            qinfo.retryAttemptsExceededIndications = _retryAttemptsExceededIndications;
721                            qinfo.subscriptionDisableDroppedIndications =
722                                _subscriptionDeleteDroppedIndications;
723                            qinfo.lastSuccessfulDeliveryTimeUsec = _lastSuccessfulDeliveryTimeUsec;
724                        }
725                        
726                        PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2