(file) Return to WsmProcessor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / WsmServer

File: [Pegasus] / pegasus / src / Pegasus / WsmServer / WsmProcessor.cpp (download)
Revision: 1.11, Fri Mar 20 06:36:05 2009 UTC (15 years, 3 months ago) by sahana.prabhakar
Branch: MAIN
CVS Tags: TASK_PEP317_1JUNE_2013, TASK-PEP348_SCMO-root, TASK-PEP348_SCMO-merged_out_to_branch, TASK-PEP348_SCMO-merged_out_from_trunk, TASK-PEP348_SCMO-merged_in_to_trunk, TASK-PEP348_SCMO-merged_in_from_branch, TASK-PEP348_SCMO-branch, TASK-PEP317_pullop-root, RELEASE_2_11_2-RC1, RELEASE_2_11_2, RELEASE_2_11_1-RC1, RELEASE_2_11_1, RELEASE_2_11_0-RC1, RELEASE_2_11_0-FC, RELEASE_2_11_0, RELEASE_2_11-root, RELEASE_2_11-branch, RELEASE_2_10_1-RC1, RELEASE_2_10_1, RELEASE_2_10_0-RC2, RELEASE_2_10_0-RC1, RELEASE_2_10_0, RELEASE_2_10-root, RELEASE_2_10-branch, BeforeUpdateToHeadOct82011
Branch point for: TASK-PEP317_pullop-branch
Changes since 1.10: +2 -2 lines
BUG#: 8440
TITLE: Certain classes in OpenPegasus code can be MessageQueues instead of MessageQueueService.
DESCRIPTION: Some classes in OP have been derived from MessageQueueService when they are not using any of the asynchronous features provided by MessageQueueService. These classes have been converted to MessageQueues.

//%LICENSE////////////////////////////////////////////////////////////////
//
// Licensed to The Open Group (TOG) under one or more contributor license
// agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
// this work for additional information regarding copyright ownership.
// Each contributor licenses this file to you under the OpenPegasus Open
// Source License; you may not use this file except in compliance with the
// License.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//////////////////////////////////////////////////////////////////////////
//
//%////////////////////////////////////////////////////////////////////////////

#include <cctype>
#include <cstdio>
#include <Pegasus/Common/Config.h>
#include <Pegasus/Common/Tracer.h>
#include <Pegasus/Common/MessageLoader.h>
#include <Pegasus/Common/StringConversion.h>
#include <Pegasus/Common/AutoPtr.h>
#include "WsmConstants.h"
#include "SoapResponse.h"
#include "WsmProcessor.h"

PEGASUS_USING_STD;

PEGASUS_NAMESPACE_BEGIN

// Definition of the static counter from which enumeration context IDs are
// drawn. _handleEnumerateResponse() post-increments it each time it creates a
// new context, so IDs start at 0.
Uint64 WsmProcessor::_currentEnumContext = 0;

// Constructs the processor as a MessageQueue named
// PEGASUS_QUEUENAME_WSMPROCESSOR.
//
// cimOperationProcessorQueue: queue that handleRequest() enqueues mapped CIM
//     requests on. The pointer is stored as given.
// repository: stored as given and also passed to the WSM-to-CIM request
//     mapper.
//
// The request decoder is constructed with a pointer to this processor.
WsmProcessor::WsmProcessor(
    MessageQueue* cimOperationProcessorQueue,
    CIMRepository* repository)
    : MessageQueue(PEGASUS_QUEUENAME_WSMPROCESSOR),
      _wsmResponseEncoder(),
      _wsmRequestDecoder(this),
      _cimOperationProcessorQueue(cimOperationProcessorQueue),
      _repository(repository),
      _wsmToCimRequestMapper(repository)
{
}

// Destroys the processor, freeing the enumeration response held by every
// context that is still registered (i.e. was never fully pulled or released).
WsmProcessor::~WsmProcessor()
{
    EnumerationContextTable::Iterator entry =
        _enumerationContextTable.start();

    while (entry)
    {
        delete entry.value().response;
        entry++;
    }
}

// Handles a message delivered to this queue.
//
// message: the delivered message; a null pointer is ignored. Only
//     CIMResponseMessage objects are expected here. A CIM response is handed
//     to handleResponse(), which takes ownership of it.
void WsmProcessor::handleEnqueue(Message* message)
{
    if (!message)
    {
        return;
    }

    // Perform the checked downcast once and reuse the result.
    CIMResponseMessage* cimResponse =
        dynamic_cast<CIMResponseMessage*>(message);
    PEGASUS_ASSERT(cimResponse != 0);

    if (!cimResponse)
    {
        // Reached only in builds where PEGASUS_ASSERT does not abort.
        // Passing a null pointer on would be dereferenced by
        // handleResponse(), so discard the unexpected message instead.
        delete message;
        return;
    }

    handleResponse(cimResponse);
}

// Takes the next message off this queue and processes it with the
// handleEnqueue(Message*) overload.
void WsmProcessor::handleEnqueue()
{
    handleEnqueue(dequeue());
}

// Processes a decoded WS-Management request.
//
// wsmRequest: the request to process. This method takes ownership: the
//     request is deleted on return unless it has been parked in the request
//     table to wait for the matching CIM response.
//
// Requests that map to a CIM operation are forwarded to the CIM operation
// processor queue; the response comes back through handleResponse().
// Requests with no CIM representation (Pull and Release) are handled here.
// Any exception raised while processing is reported to the client as a WSM
// fault response.
void WsmProcessor::handleRequest(WsmRequest* wsmRequest)
{
    PEG_METHOD_ENTER(TRC_WSMSERVER, "WsmProcessor::handleRequest()");

    AutoPtr<WsmRequest> wsmRequestDestroyer(wsmRequest);

    try
    {
        CIMOperationRequestMessage* cimRequest =
            _wsmToCimRequestMapper.mapToCimRequest(wsmRequest);

        // Requests that do not have a CIM representation are mapped to NULL
        // and are meant to be handled by the WSM processor itself.
        if (cimRequest)
        {
            // Save the request until the response comes back.
            // Note that the CIM request has its own unique message ID.
            // The ID is copied because the CIM request must not be touched
            // once it has been handed to the other queue.
            const String cimMessageId(cimRequest->messageId);
            _requestTable.insert(cimMessageId, wsmRequest);

            try
            {
                cimRequest->queueIds.push(getQueueId());
                _cimOperationProcessorQueue->enqueue(cimRequest);
            }
            catch (...)
            {
                // The request is about to be destroyed by
                // wsmRequestDestroyer; do not leave a dangling pointer to it
                // in the request table.
                _requestTable.remove(cimMessageId);
                throw;
            }

            // The request table now holds the request until the response
            // arrives.
            wsmRequestDestroyer.release();
        }
        else
        {
            switch (wsmRequest->getType())
            {
                case WS_ENUMERATION_PULL:
                    _handlePullRequest(
                        static_cast<WsenPullRequest*>(wsmRequest));
                    break;

                case WS_ENUMERATION_RELEASE:
                    _handleReleaseRequest(
                        static_cast<WsenReleaseRequest*>(wsmRequest));
                    break;

                default:
                    // No other request type is handled locally.
                    break;
            }
        }
    }
    catch (WsmFault& fault)
    {
        sendResponse(new WsmFaultResponse(wsmRequest, fault));
    }
    catch (CIMException& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            _cimToWsmResponseMapper.mapCimExceptionToWsmFault(e)));
    }
    catch (Exception& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(
                WsmFault::wsman_InternalError,
                e.getMessage(),
                e.getContentLanguages())));
    }
    catch (PEGASUS_STD(exception)& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(WsmFault::wsman_InternalError, e.what())));
    }
    catch (...)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(WsmFault::wsman_InternalError)));
    }

    // Note this requirement for Enumerate/Pull operations:
    // DSP0226 R6.3-5: For operations that span multiple message sequences,
    // the wsman:Locale element is processed in the initial message only.
    // It should be ignored in subsequent messages because the first
    // message establishes the required locale. The service may issue a
    // fault if the wsman:Locale is present in subsequent messages and the
    // value is different from that used in the initiating request.

    PEG_METHOD_EXIT();
}

// Processes a CIM response to a request previously forwarded by
// handleRequest().
//
// cimResponse: the CIM response. This method takes ownership and deletes it
//     on return. A null pointer is ignored.
//
// The originating WSM request is looked up by the response's message ID,
// removed from the request table and deleted on return. Enumerate responses
// get dedicated handling; everything else goes through
// _handleDefaultResponse(). Any exception raised while processing is
// reported to the client as a WSM fault response.
void WsmProcessor::handleResponse(CIMResponseMessage* cimResponse)
{
    PEG_METHOD_ENTER(TRC_WSMSERVER, "WsmProcessor::handleResponse()");

    AutoPtr<CIMResponseMessage> cimResponseDestroyer(cimResponse);

    if (!cimResponse)
    {
        PEG_METHOD_EXIT();
        return;
    }

    // Lookup the request this response corresponds to
    WsmRequest* wsmRequest = 0;
    Boolean gotRequest =
        _requestTable.lookup(cimResponse->messageId, wsmRequest);
    PEGASUS_ASSERT(gotRequest);

    if (!gotRequest)
    {
        // Reached only in builds where PEGASUS_ASSERT does not abort.
        // Without a matching request there is nothing to answer, and
        // wsmRequest must not be handed to an AutoPtr or dereferenced.
        PEG_METHOD_EXIT();
        return;
    }

    AutoPtr<WsmRequest> wsmRequestDestroyer(wsmRequest);
    _requestTable.remove(cimResponse->messageId);

    try
    {
        switch (wsmRequest->getType())
        {
            case WS_ENUMERATION_ENUMERATE:
                _handleEnumerateResponse(
                    cimResponse,
                    static_cast<WsenEnumerateRequest*>(wsmRequest));
                break;

            default:
                _handleDefaultResponse(cimResponse, wsmRequest);
                break;
        }
    }
    catch (WsmFault& fault)
    {
        sendResponse(new WsmFaultResponse(wsmRequest, fault));
    }
    catch (CIMException& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            _cimToWsmResponseMapper.mapCimExceptionToWsmFault(e)));
    }
    catch (Exception& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(
                WsmFault::wsman_InternalError,
                e.getMessage(),
                e.getContentLanguages())));
    }
    catch (PEGASUS_STD(exception)& e)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(WsmFault::wsman_InternalError, e.what())));
    }
    catch (...)
    {
        sendResponse(new WsmFaultResponse(
            wsmRequest,
            WsmFault(WsmFault::wsman_InternalError)));
    }

    PEG_METHOD_EXIT();
}

// Passes a WSM response to the response encoder and then destroys it.
//
// wsmResponse: the response to send. This method takes ownership; the
//     response is deleted even if the encoder throws.
void WsmProcessor::sendResponse(WsmResponse* wsmResponse)
{
    PEG_METHOD_ENTER(TRC_WSMSERVER, "WsmProcessor::sendResponse()");

    {
        // Own the response for the duration of the call so that it is not
        // leaked when enqueue() throws.
        AutoPtr<WsmResponse> wsmResponseDestroyer(wsmResponse);
        _wsmResponseEncoder.enqueue(wsmResponse);
    }

    PEG_METHOD_EXIT();
}

// Returns the queue ID reported by this processor's WSM request decoder.
Uint32 WsmProcessor::getWsmRequestDecoderQueueId()
{
    Uint32 decoderQueueId = _wsmRequestDecoder.getQueueId();
    return decoderQueueId;
}

// Handles the CIM response to an Enumerate request.
//
// cimResponse: the CIM response for the enumeration.
// wsmRequest: the Enumerate request that produced it.
//
// A failed CIM response is delegated to _handleDefaultResponse(). Otherwise
// the full result is mapped to a WSM response, the first chunk is encoded
// and sent, and any items that were not sent are kept in a new enumeration
// context for later Pull requests.
void WsmProcessor::_handleEnumerateResponse(
    CIMResponseMessage* cimResponse,
    WsenEnumerateRequest* wsmRequest)
{
    if (cimResponse->cimException.getCode() != CIM_ERR_SUCCESS)
    {
        _handleDefaultResponse(cimResponse, wsmRequest);
        return;
    }

    AutoPtr<SoapResponse> soapResponse;

    {
        AutoMutex lock(_enumerationContextTableLock);

        AutoPtr<WsenEnumerateResponse> wsmResponse(
            static_cast<WsenEnumerateResponse*>(
                _cimToWsmResponseMapper.mapToWsmResponse(
                    wsmRequest, cimResponse)));

        // Get the enumeration expiration time
        CIMDateTime expiration;
        _getExpirationDatetime(wsmRequest->expiration, expiration);

        // Allocate the ID of the new context. The context itself is only
        // registered once the first chunk has been encoded (see below), so
        // that an exception thrown in between cannot leave a table entry
        // pointing at a response that wsmResponse has already deleted.
        Uint64 contextId = _currentEnumContext++;
        wsmResponse->setEnumerationContext(contextId);

        // Get the requested chunk of results
        AutoPtr<WsenEnumerateResponse> splitResponse(
            _splitEnumerateResponse(wsmRequest, wsmResponse.get(),
                wsmRequest->optimized ? wsmRequest->maxElements : 0));
        splitResponse->setEnumerationContext(contextId);

        // If no items are left in the original response, mark split
        // response as complete
        if (wsmResponse->getSize() == 0)
        {
            splitResponse->setComplete();
        }

        Uint32 numDataItemsEncoded = 0;
        soapResponse.reset(_wsmResponseEncoder.encodeWsenEnumerateResponse(
            splitResponse.get(), numDataItemsEncoded));

        if (splitResponse->getSize() > numDataItemsEncoded)
        {
            // Add unprocessed items back to the pending response
            splitResponse->remove(0, numDataItemsEncoded);
            wsmResponse->merge(splitResponse.get());
        }

        // Keep a context only if there are instances left to pull
        if (wsmResponse->getSize() != 0)
        {
            _enumerationContextTable.insert(
                contextId,
                EnumerationContext(
                    contextId,
                    wsmRequest->userName,
                    wsmRequest->enumerationMode,
                    expiration,
                    wsmRequest->epr,
                    wsmResponse.get()));

            // The pointer to the response is now owned by the context
            wsmResponse.release();
        }
    }

    _wsmResponseEncoder.sendResponse(soapResponse.get());
}

// Handles a Pull request against an existing enumeration context.
//
// wsmRequest: the Pull request.
//
// Throws WsmFault when the enumeration context is unknown, when the
// request's EPR differs from the context's, or when the request's user name
// differs from the context's. Otherwise the next chunk of items is encoded
// and sent, and the context is dropped once it has no items left.
void WsmProcessor::_handlePullRequest(WsenPullRequest* wsmRequest)
{
    AutoPtr<SoapResponse> encodedResponse;

    {
        AutoMutex lock(_enumerationContextTableLock);

        EnumerationContext* context;
        if (!_enumerationContextTable.lookupReference(
                wsmRequest->enumerationContext, context))
        {
            throw WsmFault(
                WsmFault::wsen_InvalidEnumerationContext,
                MessageLoaderParms(
                    "WsmServer.WsmProcessor.INVALID_ENUMERATION_CONTEXT",
                    "Enumeration context \"$0\" is not valid.",
                    wsmRequest->enumerationContext));
        }

        // The request must address the same EPR as the enumeration context.
        if (wsmRequest->epr != context->epr)
        {
            throw WsmFault(
                WsmFault::wsa_MessageInformationHeaderRequired,
                MessageLoaderParms(
                    "WsmServer.WsmProcessor.INVALID_PULL_EPR",
                    "EPR of a Pull request does not match that of "
                    "the enumeration context."));
        }

        // DSP0226 R8.1-6:  The wsen:Pull and wsen:Release operations are a
        // continuation of the original wsen:Enumerate operation.  The
        // service should enforce the same authentication and authorization
        // throughout the entire sequence of operations and should fault any
        // attempt to change credentials during the sequence.
        if (wsmRequest->userName != context->userName)
        {
            throw WsmFault(WsmFault::wsman_AccessDenied);
        }

        // Carve the requested number of items off the pending response.
        AutoPtr<WsenPullResponse> pullResponse(_splitPullResponse(
            wsmRequest, context->response, wsmRequest->maxElements));
        pullResponse->setEnumerationContext(context->contextId);

        if (context->response->getSize() == 0)
        {
            pullResponse->setComplete();
        }

        Uint32 encodedCount = 0;
        encodedResponse.reset(_wsmResponseEncoder.encodeWsenPullResponse(
            pullResponse.get(), encodedCount));

        // Items that were not encoded go back into the context.
        if (pullResponse->getSize() > encodedCount)
        {
            pullResponse->remove(0, encodedCount);
            context->response->merge(pullResponse.get());
        }

        // A context without any remaining instances is discarded.
        if (context->response->getSize() == 0)
        {
            delete context->response;
            _enumerationContextTable.remove(wsmRequest->enumerationContext);
        }
    }

    _wsmResponseEncoder.sendResponse(encodedResponse.get());
}

// Handles a Release request for an existing enumeration context.
//
// wsmRequest: the Release request.
//
// Throws WsmFault when the enumeration context is unknown, when the
// request's EPR differs from the context's, or when the request's user name
// differs from the context's. Otherwise the context and its pending response
// are discarded and a Release response is passed to the response encoder.
void WsmProcessor::_handleReleaseRequest(WsenReleaseRequest* wsmRequest)
{
    AutoPtr<WsenReleaseResponse> releaseResponse;

    {
        AutoMutex lock(_enumerationContextTableLock);

        EnumerationContext context;
        if (!_enumerationContextTable.lookup(
                wsmRequest->enumerationContext, context))
        {
            throw WsmFault(
                WsmFault::wsen_InvalidEnumerationContext,
                MessageLoaderParms(
                    "WsmServer.WsmProcessor.INVALID_ENUMERATION_CONTEXT",
                    "Enumeration context \"$0\" is not valid.",
                    wsmRequest->enumerationContext));
        }

        // The request must address the same EPR as the enumeration context.
        if (wsmRequest->epr != context.epr)
        {
            throw WsmFault(
                WsmFault::wsa_MessageInformationHeaderRequired,
                MessageLoaderParms(
                    "WsmServer.WsmProcessor.INVALID_RELEASE_EPR",
                    "EPR of a Release request does not match that of "
                    "the enumeration context."));
        }

        // DSP0226 R8.1-6:  The wsen:Pull and wsen:Release operations are a
        // continuation of the original wsen:Enumerate operation.  The
        // service should enforce the same authentication and authorization
        // throughout the entire sequence of operations and should fault any
        // attempt to change credentials during the sequence.
        if (wsmRequest->userName != context.userName)
        {
            throw WsmFault(WsmFault::wsman_AccessDenied);
        }

        // Build the reply before the pending response is destroyed, since
        // it supplies the content languages.
        releaseResponse.reset(new WsenReleaseResponse(
            wsmRequest, context.response->getContentLanguages()));

        delete context.response;
        _enumerationContextTable.remove(wsmRequest->enumerationContext);
    }

    _wsmResponseEncoder.enqueue(releaseResponse.get());
}

// Default handling of a CIM response: maps it to a WSM response and passes
// that to the response encoder.
//
// cimResponse: the CIM response to convert.
// wsmRequest: the WSM request the response belongs to.
void WsmProcessor::_handleDefaultResponse(
    CIMResponseMessage* cimResponse, WsmRequest* wsmRequest)
{
    WsmResponse* mapped =
        _cimToWsmResponseMapper.mapToWsmResponse(wsmRequest, cimResponse);

    // The mapped response is destroyed when this function returns.
    AutoPtr<WsmResponse> mappedDestroyer(mapped);

    cimResponse->updateThreadLanguages();
    cimResponse->queueIds.pop();

    _wsmResponseEncoder.enqueue(mappedDestroyer.get());
}

// Splits items off an Enumerate response and wraps them in a new response.
//
// request: the Enumerate request the new response is created for.
// response: the response whose enumeration data is split.
// num: item count passed to the enumeration data's split().
//
// Returns a newly allocated WsenEnumerateResponse holding the split-off
// data; the caller is responsible for deleting it.
WsenEnumerateResponse* WsmProcessor::_splitEnumerateResponse(
    WsenEnumerateRequest* request, WsenEnumerateResponse* response, Uint32 num)
{
    WsenEnumerationData chunk;
    response->getEnumerationData().split(chunk, num);

    WsenEnumerateResponse* chunkResponse = new WsenEnumerateResponse(
        chunk,
        response->getItemCount(),
        request,
        response->getContentLanguages());

    return chunkResponse;
}

// Splits items off a pending Enumerate response and wraps them in a Pull
// response.
//
// request: the Pull request the new response is created for.
// response: the response whose enumeration data is split.
// num: item count passed to the enumeration data's split().
//
// Returns a newly allocated WsenPullResponse holding the split-off data; the
// caller is responsible for deleting it.
WsenPullResponse* WsmProcessor::_splitPullResponse(
    WsenPullRequest* request, WsenEnumerateResponse* response, Uint32 num)
{
    WsenEnumerationData chunk;
    response->getEnumerationData().split(chunk, num);

    WsenPullResponse* chunkResponse = new WsenPullResponse(
        chunk,
        request,
        response->getContentLanguages());

    return chunkResponse;
}

void WsmProcessor::_getExpirationDatetime(
    const String& wsmDT, CIMDateTime& cimDT)
{
    CIMDateTime dt, currentDT;

    // Default expiration interval = 10 mins
    // ATTN WSMAN: what should the value be?
    CIMDateTime maxInterval(0, 0, 10, 0, 0, 6);

    // If expiration is not set, use the dafault.
    if (wsmDT == String::EMPTY)
    {
        dt = maxInterval;
    }
    else
    {
        try
        {
            WsmToCimRequestMapper::convertWsmToCimDatetime(wsmDT, dt);
        }
        catch (...)
        {
            throw WsmFault(
                WsmFault::wsen_InvalidExpirationTime,
            MessageLoaderParms(
                "WsmServer.WsmToCimRequestMapper.INVALID_EXPIRATION_TIME",
                "The expiration time \"$0\" is not valid", wsmDT));
        }
    }

    currentDT = CIMDateTime::getCurrentDateTime();
    if (dt.isInterval())
    {
        if (dt > maxInterval)
        {
            dt = maxInterval;
        }
        cimDT = currentDT + dt;
    }
    else
    {
        if ((dt <= currentDT))
        {
            throw WsmFault(
                WsmFault::wsen_InvalidExpirationTime,
            MessageLoaderParms(
                "WsmServer.WsmToCimRequestMapper.INVALID_EXPIRATION_TIME",
                "The expiration time \"$0\" is not valid", wsmDT));
        }

        if (dt - currentDT > maxInterval)
        {
            cimDT = currentDT + maxInterval;
        }
        else
        {
            cimDT = dt;
        }
    }
}

void WsmProcessor::cleanupExpiredContexts()
{
    CIMDateTime currentDT = CIMDateTime::getCurrentDateTime();
    Array<Uint64> expiredContextIds;
    Array<WsenEnumerateResponse*> expiredResponses;

    AutoMutex lock(_enumerationContextTableLock);
    for (EnumerationContextTable::Iterator i =
             _enumerationContextTable.start (); i; i++)
    {
        EnumerationContext context = i.value();
        if (context.expiration < currentDT)
        {
            expiredContextIds.append(context.contextId);
            expiredResponses.append(context.response);
        }
    }

    for (Uint32 i = 0; i < expiredContextIds.size(); i++)
    {
        delete expiredResponses[i];
        _enumerationContextTable.remove(expiredContextIds[i]);
    }
}

PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2