(file) Return to IndicationRouter.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

File: [Pegasus] / pegasus / src / Pegasus / Common / IndicationRouter.cpp (download)
Revision: 1.4, Mon Feb 18 15:00:21 2013 UTC (11 years, 4 months ago) by marek
Branch: MAIN
CVS Tags: preBug9676, postBug9676, TASK-TASK_PEP362_RestfulService_branch-root, TASK-TASK_PEP362_RestfulService_branch-merged_out_from_trunk, TASK-TASK_PEP362_RestfulService_branch-merged_in_to_trunk, TASK-TASK_PEP362_RestfulService_branch-merged_in_from_branch, TASK-TASK_PEP362_RestfulService_branch-branch, TASK-PEP362_RestfulService-root, TASK-PEP362_RestfulService-merged_out_to_branch, TASK-PEP362_RestfulService-merged_out_from_trunk, TASK-PEP362_RestfulService-merged_in_to_trunk, TASK-PEP362_RestfulService-merged_in_from_branch, TASK-PEP362_RestfulService-branch, TASK-PEP317_pullop-merged_out_from_trunk, TASK-PEP317_pullop-merged_in_to_trunk, RELEASE_2_14_1, RELEASE_2_14_0-RC2, RELEASE_2_14_0-RC1, RELEASE_2_14_0, RELEASE_2_14-root, RELEASE_2_14-branch, RELEASE_2_13_0-RC2, RELEASE_2_13_0-RC1, RELEASE_2_13_0-FC, RELEASE_2_13_0, RELEASE_2_13-root, RELEASE_2_13-branch, HEAD, CIMRS_WORK_20130824
Changes since 1.3: +3 -2 lines
BUG#:9536
TITLE: tweak PEGASUS_ASSERT and other macro  to avoid unused warnings and build break

DESCRIPTION:

//%LICENSE////////////////////////////////////////////////////////////////
//
// Licensed to The Open Group (TOG) under one or more contributor license
// agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
// this work for additional information regarding copyright ownership.
// Each contributor licenses this file to you under the OpenPegasus Open
// Source License; you may not use this file except in compliance with the
// License.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//////////////////////////////////////////////////////////////////////////
//
//%/////////////////////////////////////////////////////////////////////////////

#include <Pegasus/Common/IndicationRouter.h>
#include <Pegasus/Common/ModuleController.h>
#include <Pegasus/Common/Exception.h>
#include <Pegasus/Common/MessageQueueService.h>

PEGASUS_NAMESPACE_BEGIN

// Guards _statusTable; every access to the table in this file is made while
// holding this mutex.
Mutex IndicationRouter::_statusMutex;
// Table shared by all routers, keyed by message id. Entries are inserted and
// removed by deliverAndWaitForStatus() and looked up by notify().
IndicationRouter::DeliveryStatusTable IndicationRouter::_statusTable;

// Creates a router for one process-indication request.
//
// request          - the request that deliverAndWaitForStatus() will pass to
//                    the delivery routine; only the pointer is stored here.
// deliveryRoutine  - callback invoked with the stored request by
//                    deliverAndWaitForStatus().
//
// The delivery status entry starts out null; it is allocated only by
// deliverAndWaitForStatus() when a wait is actually required.
IndicationRouter::IndicationRouter(
    CIMProcessIndicationRequestMessage *request,
    void (*deliveryRoutine)(CIMProcessIndicationRequestMessage*))
    :
    _request(request),
    _deliveryRoutine(deliveryRoutine),
    _entry(0)
{
}

// Passes the request to the delivery routine. When indication ordering is
// compiled in (PEGASUS_ENABLE_INDICATION_ORDERING) and the request does not
// carry an OOP agent name, this call additionally registers a delivery status
// entry under a unique message id and blocks until notify() signals that
// entry or the timeout expires.
//
// Timeout selection (milliseconds):
//   - request timeout of 0       -> 3 * 20 * 1000
//   - request timeout above max  -> clamped to 10 * 3 * 20 * 1000
//   - otherwise                  -> the request's own timeoutMilliSec
//
// If the delivery routine throws, the status entry registered by this call
// is removed from the table before the exception is propagated, so the table
// never keeps a pointer to an entry that the destructor is about to free.
void IndicationRouter::deliverAndWaitForStatus()
{
#ifdef PEGASUS_ENABLE_INDICATION_ORDERING
    String uniqueMessageId;

    // Capture the timeout before delivery; the request is not read again
    // after it has been handed to the delivery routine.
    // NOTE(review): presumably the delivery routine takes over the request
    // and it may no longer be valid afterwards -- confirm against callers.
    Uint32 timeoutMilliSec = _request->timeoutMilliSec;

    // Wait only if this indication is not coming from OOP provider.
    if (!_request->oopAgentName.size())
    {
        // The address of this router is used as the message id that
        // notify() later looks up. "%p" requires a void* argument, hence
        // the explicit cast; the buffer leaves headroom for the textual
        // form of a 64-bit pointer including any prefix.
        char messagePtrString[32];
        sprintf(messagePtrString, "%p", static_cast<void*>(this));
        uniqueMessageId = messagePtrString;
        _request->messageId = uniqueMessageId;

        // Register the entry before delivering so that a response arriving
        // immediately can already find it.
        AutoMutex mtx(_statusMutex);
        _entry = new DeliveryStatusEntry;
        PEGASUS_FCT_EXECUTE_AND_ASSERT(
            true,
            _statusTable.insert(uniqueMessageId, _entry));
    }

    try
    {
        _deliveryRoutine(_request);
    }
    catch (...)
    {
        // Do not leave a table entry behind that points at _entry, which
        // the destructor deletes.
        if (_entry)
        {
            AutoMutex mtx(_statusMutex);
            _statusTable.remove(uniqueMessageId);
        }
        throw;
    }

    if (_entry)
    {
        // Maximum wait time equals SequenceIdentifierLifeTime
        // = 10 * DeliveryRetryAttempts * DeliveryRetryInterval
        const Uint32 maxWaitTimeMilliSec = 10 * 3 * 20 * 1000;
        if (timeoutMilliSec == 0)
        {
            // Minimum wait time equals the default
            // DeliveryRetryAttempts * DeliveryRetryInterval
            timeoutMilliSec = 3 * 20 * 1000;
        }
        else if (timeoutMilliSec > maxWaitTimeMilliSec)
        {
            timeoutMilliSec = maxWaitTimeMilliSec;
        }

        // Block until notify() signals the entry or the timeout elapses;
        // the outcome of the wait is intentionally not inspected.
        _entry->semaphore.time_wait(timeoutMilliSec);

        // Unregister under the lock so notify() can never look up an entry
        // that is about to be destroyed together with this router.
        AutoMutex mtx(_statusMutex);
        _statusTable.remove(uniqueMessageId);
    }
#else
    // Without indication ordering there is nothing to wait for.
    _deliveryRoutine(_request);
#endif
}

// Wakes the router that is waiting for the delivery status identified by
// response->messageId, if such a router is still registered, and then
// deletes the response.
//
// The lookup and the signal both happen while _statusMutex is held, which is
// the same mutex deliverAndWaitForStatus() holds when it unregisters its
// entry.
void IndicationRouter::notify(CIMProcessIndicationResponseMessage *response)
{
#ifdef PEGASUS_ENABLE_INDICATION_ORDERING
    AutoMutex lock(_statusMutex);

    DeliveryStatusEntry *waiter;
    const Boolean stillRegistered =
        _statusTable.lookup(response->messageId, waiter);
    if (stillRegistered)
    {
        waiter->semaphore.signal();
    }

    // The response is consumed here whether or not a waiter was found.
    delete response;
#else
    // We should not reach here if indication ordering is not enabled.
    // NOTE(review): the response is not freed on this path.
    PEGASUS_ASSERT(false);
#endif
}

// Releases the delivery status entry, if deliverAndWaitForStatus() allocated
// one. _entry is null otherwise, in which case the delete is a no-op.
IndicationRouter::~IndicationRouter()
{
    delete _entry;
}

// Creates an aggregator that collects per-destination completions for one
// indication and sends a single response once all of them have arrived.
//
// origMessageId_       - message id placed in the response that is sent.
// responseQid_         - queue id the response is addressed to.
// oopAgentName_        - agent name copied into the response.
// waitUntilDelivered_  - stored as-is; not consulted anywhere in this file.
//
// Both response counters start at zero and the expected count is marked as
// not yet final.
DeliveryStatusAggregator::DeliveryStatusAggregator(
    const String &origMessageId_,
    Uint32 responseQid_,
    const String &oopAgentName_,
    Boolean waitUntilDelivered_)
    :
    origMessageId(origMessageId_),
    responseQid(responseQid_),
    oopAgentName(oopAgentName_),
    waitUntilDelivered(waitUntilDelivered_),
    _expectedResponseCount(0),
    _currentResponseCount(0),
    _expectedResponseCountSetDone(false)
{
}

void  DeliveryStatusAggregator::complete()
{
    {
        AutoMutex mtx(_responseCountMutex);
        _currentResponseCount++;
        if (!_expectedResponseCountSetDone ||
            _expectedResponseCount != _currentResponseCount)
        {
            return;
        }
    }
    // Don't acquire _responseCountMutex while calling the
    // _sendDeliveryStausResponse. This method deletes this
    // object.
    _sendDeliveryStausResponse();
}

// Announces one more response that complete() has to wait for. The counter
// is updated while holding _responseCountMutex.
void DeliveryStatusAggregator::incExpectedResponseCount()
{
    AutoMutex lock(_responseCountMutex);
    ++_expectedResponseCount;
}

void DeliveryStatusAggregator::expectedResponseCountSetDone()
{
    {
        AutoMutex mtx(_responseCountMutex);
        _expectedResponseCountSetDone = true;
        if (_expectedResponseCount != _currentResponseCount)
        {
            return;
        }
    }
    // Don't acquire _responseCountMutex while calling the
    // _sendDeliveryStausResponse. This method deletes this
    // object.
    _sendDeliveryStausResponse();
}

void DeliveryStatusAggregator::_sendDeliveryStausResponse()
{
    CIMProcessIndicationResponseMessage *response =
         new CIMProcessIndicationResponseMessage(
             origMessageId,
             CIMException(),
             QueueIdStack(responseQid),
             oopAgentName);
    response->dest = responseQid;
    MessageQueueService::SendForget(response);
    delete this;
}

PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2