version 1.4, 2010/11/02 05:27:58
|
version 1.4.2.2, 2012/02/15 17:47:17
|
|
|
#include <Pegasus/Common/AutoPtr.h> | #include <Pegasus/Common/AutoPtr.h> |
#include <Pegasus/Common/ArrayInternal.h> | #include <Pegasus/Common/ArrayInternal.h> |
#include <Pegasus/Common/CIMMessage.h> | #include <Pegasus/Common/CIMMessage.h> |
#include <Pegasus/Common/CIMMessageSerializer.h> |
|
#include <Pegasus/Common/CIMMessageDeserializer.h> |
|
#include <Pegasus/Common/OperationContextInternal.h> | #include <Pegasus/Common/OperationContextInternal.h> |
#include <Pegasus/Common/System.h> | #include <Pegasus/Common/System.h> |
#include <Pegasus/Common/AnonymousPipe.h> | #include <Pegasus/Common/AnonymousPipe.h> |
|
|
intervals along with unloadIdleProviders | intervals along with unloadIdleProviders |
*/ | */ |
void cleanDisconnectedClientRequests(); | void cleanDisconnectedClientRequests(); |
|
static void setAllProvidersStopped(); |
|
void sendResponse(CIMResponseMessage *response); |
private: | private: |
// | // |
// Private methods | // Private methods |
|
|
*/ | */ |
ThreadPool* _threadPool; | ThreadPool* _threadPool; |
| |
|
static Boolean _allProvidersStopped; |
}; | }; |
| |
Uint32 ProviderAgentContainer::_numProviderProcesses = 0; | Uint32 ProviderAgentContainer::_numProviderProcesses = 0; |
Mutex ProviderAgentContainer::_numProviderProcessesMutex; | Mutex ProviderAgentContainer::_numProviderProcessesMutex; |
|
Boolean ProviderAgentContainer::_allProvidersStopped = false; |
| |
// Set this to a value that no valid CIMResponseMessage* will have. | // Set this to a value that no valid CIMResponseMessage* will have. |
CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED = | CIMResponseMessage* ProviderAgentContainer::_REQUEST_NOT_PROCESSED = |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
void ProviderAgentContainer::setAllProvidersStopped() |
|
{ |
|
_allProvidersStopped = true; |
|
} |
|
|
void ProviderAgentContainer::_startAgentProcess() | void ProviderAgentContainer::_startAgentProcess() |
{ | { |
PEG_METHOD_ENTER( | PEG_METHOD_ENTER( |
|
|
| |
try | try |
{ | { |
|
CIMException cimException; |
|
if (!cleanShutdown) |
|
{ |
|
cimException = PEGASUS_CIM_EXCEPTION( |
|
CIM_ERR_FAILED, |
|
MessageLoaderParms( |
|
"ProviderManager.OOPProviderManagerRouter." |
|
"CIMPROVAGT_CONNECTION_LOST", |
|
"Lost connection with cimprovagt \"$0\".", |
|
_moduleOrGroupName)); |
|
} |
|
|
AutoMutex lock(_agentMutex); | AutoMutex lock(_agentMutex); |
| |
PEGASUS_ASSERT(_isInitialized); | PEGASUS_ASSERT(_isInitialized); |
|
|
Boolean sendResponseNow = false; | Boolean sendResponseNow = false; |
CIMResponseMessage *response; | CIMResponseMessage *response; |
| |
if(cleanShutdown) |
|
{ |
|
MessageType msgType = i.value()->requestMessage->getType(); | MessageType msgType = i.value()->requestMessage->getType(); |
| |
|
// Note: Whether this agent was shutdown cleanly or not, |
|
// for the below requests wait until all responses are |
|
// received. |
if(msgType == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE || | if(msgType == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE || |
msgType == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE || | msgType == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE || |
msgType == | msgType == |
CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE || | CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE || |
msgType == | msgType == |
CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE || | CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE || |
msgType == CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE) |
msgType == CIM_ENABLE_MODULE_REQUEST_MESSAGE || |
|
msgType == CIM_DISABLE_MODULE_REQUEST_MESSAGE) |
{ | { |
CIMException ce; |
PEGASUS_ASSERT(i.value()->respAggregator); |
if(i.value()->respAggregator == NULL || |
if(i.value()->respAggregator->isComplete(cimException)) |
i.value()->respAggregator->isComplete(ce)) |
{ |
|
CIMException cimException; |
|
sendResponseNow = true; |
|
|
|
if (msgType == CIM_ENABLE_MODULE_REQUEST_MESSAGE || |
|
msgType == CIM_DISABLE_MODULE_REQUEST_MESSAGE) |
|
{ |
|
CIMException e = |
|
i.value()->respAggregator->getException(); |
|
if (e.getCode() == CIM_ERR_SUCCESS) |
|
{ |
|
retryReqArray.append(i.value()->requestMessage); |
|
sendResponseNow = false; |
|
} |
|
else |
|
{ |
|
cimException = e; |
|
} |
|
} |
|
|
|
if (sendResponseNow) |
{ | { |
response = | response = |
i.value()->requestMessage->buildResponse(); | i.value()->requestMessage->buildResponse(); |
response->messageId = i.value()->originalMessageId; | response->messageId = i.value()->originalMessageId; |
response->cimException = ce; |
response->cimException = cimException; |
sendResponseNow = true; | sendResponseNow = true; |
} | } |
|
delete i.value()->respAggregator; |
} | } |
else |
} |
|
else if (msgType == CIM_DELETE_SUBSCRIPTION_REQUEST_MESSAGE) |
|
{ |
|
response = i.value()->requestMessage->buildResponse(); |
|
response->messageId = i.value()->originalMessageId; |
|
sendResponseNow = true; |
|
} |
|
else if (cleanShutdown) |
{ | { |
// retry the request | // retry the request |
retryReqArray.append(i.value()->requestMessage); | retryReqArray.append(i.value()->requestMessage); |
} | } |
} |
|
else | else |
{ | { |
|
// Requests with respAggregator set were already handled |
|
// before. |
|
PEGASUS_ASSERT(!i.value()->respAggregator); |
response = i.value()->requestMessage->buildResponse(); | response = i.value()->requestMessage->buildResponse(); |
response->cimException = PEGASUS_CIM_EXCEPTION( |
response->cimException = cimException; |
CIM_ERR_FAILED, |
|
MessageLoaderParms( |
|
"ProviderManager.OOPProviderManagerRouter." |
|
"CIMPROVAGT_CONNECTION_LOST", |
|
"Lost connection with cimprovagt \"$0\".", |
|
_moduleOrGroupName)); |
|
sendResponseNow = true; | sendResponseNow = true; |
} | } |
| |
|
|
i.value()->requestMessage, | i.value()->requestMessage, |
response); | response); |
} | } |
// delete the response aggregator |
|
delete i.value()->respAggregator; |
|
} | } |
_outstandingRequestTable.clear(); | _outstandingRequestTable.clear(); |
} | } |
|
|
return _groupNameWithType; | return _groupNameWithType; |
} | } |
| |
|
void ProviderAgentContainer::sendResponse(CIMResponseMessage *response) |
|
{ |
|
AutoMutex lock(_agentMutex); |
|
|
|
AnonymousPipe::Status writeStatus = |
|
_pipeToAgent->writeMessage(response); |
|
if (writeStatus != AnonymousPipe::STATUS_SUCCESS) |
|
{ |
|
PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1, |
|
"Failed to write message to pipe. writeStatus = %d.", |
|
writeStatus)); |
|
} |
|
delete response; |
|
} |
|
|
CIMResponseMessage* ProviderAgentContainer::processMessage( | CIMResponseMessage* ProviderAgentContainer::processMessage( |
CIMRequestMessage* request,RespAggCounter* respAggregator) | CIMRequestMessage* request,RespAggCounter* respAggregator) |
{ | { |
|
|
} | } |
} while (response == _REQUEST_NOT_PROCESSED); | } while (response == _REQUEST_NOT_PROCESSED); |
| |
if (msgType == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE) |
|
{ |
|
_subscriptionInitComplete = true; |
|
} |
|
else if (msgType == |
|
CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE) |
|
{ |
|
_subscriptionInitComplete = false; |
|
} |
|
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return response; | return response; |
} | } |
|
|
{ | { |
AutoMutex lock(_agentMutex); | AutoMutex lock(_agentMutex); |
| |
|
// Don't process any other messages if _allProvidersStopped flag |
|
// is set. CIMServer hangs during the shutdown if the agent is |
|
// started to process a request after StopAllProviders request |
|
// has been processed. This scenario may happen if provider |
|
// generates indication during the shutdwon whose destination is |
|
// indication consumer provider running within cimserver. |
|
if (_allProvidersStopped && |
|
request->getType() != CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) |
|
{ |
|
//Note: l11n is not necessary, not propagated to client. |
|
CIMException e = CIMException( |
|
CIM_ERR_FAILED, |
|
"Request not processed, CIMServer shutting down"); |
|
if (!respAggregator || respAggregator->isComplete(e)) |
|
{ |
|
|
|
PEG_TRACE((TRC_PROVIDERMANAGER, Tracer::LEVEL1, |
|
"Exception: %s", |
|
(const char*)e.getMessage().getCString())); |
|
response = request->buildResponse(); |
|
delete respAggregator; |
|
PEG_METHOD_EXIT(); |
|
return response; |
|
} |
|
} |
|
|
// | // |
// Initialize the Provider Agent, if necessary | // Initialize the Provider Agent, if necessary |
// | // |
|
|
origProviderId.reset(new ProviderIdContainer( | origProviderId.reset(new ProviderIdContainer( |
pidc.getModule(), pidc.getProvider(), | pidc.getModule(), pidc.getProvider(), |
pidc.isRemoteNameSpace(), pidc.getRemoteInfo())); | pidc.isRemoteNameSpace(), pidc.getRemoteInfo())); |
|
origProviderId->setProvMgrPath(pidc.getProvMgrPath()); |
if (_providerModuleCache.isUninitialized() || | if (_providerModuleCache.isUninitialized() || |
(!pidc.getModule().identical(_providerModuleCache))) | (!pidc.getModule().identical(_providerModuleCache))) |
{ | { |
|
|
| |
AutoPtr<ProvAgtGetScmoClassResponseMessage> response( | AutoPtr<ProvAgtGetScmoClassResponseMessage> response( |
new ProvAgtGetScmoClassResponseMessage( | new ProvAgtGetScmoClassResponseMessage( |
XmlWriter::getNextMessageId(), |
request->messageId, |
CIMException(), | CIMException(), |
QueueIdStack(), | QueueIdStack(), |
SCMOClass("",""))); | SCMOClass("",""))); |
|
|
if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE) | if (message->getType() == CIM_PROCESS_INDICATION_REQUEST_MESSAGE) |
{ | { |
// Process an indication message | // Process an indication message |
|
CIMProcessIndicationRequestMessage* request = |
_indicationCallback( |
|
reinterpret_cast<CIMProcessIndicationRequestMessage*>( | reinterpret_cast<CIMProcessIndicationRequestMessage*>( |
message)); |
message); |
|
request->oopAgentName = getGroupNameWithType(); |
|
_indicationCallback(request); |
} | } |
else if (message->getType()==PROVAGT_GET_SCMOCLASS_REQUEST_MESSAGE) | else if (message->getType()==PROVAGT_GET_SCMOCLASS_REQUEST_MESSAGE) |
{ | { |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
void OOPProviderManagerRouter::_handleIndicationDeliveryResponse( |
|
CIMResponseMessage *response) |
|
{ |
|
if (response->getType() == CIM_PROCESS_INDICATION_RESPONSE_MESSAGE) |
|
{ |
|
CIMProcessIndicationResponseMessage *rsp = |
|
(CIMProcessIndicationResponseMessage*)response; |
|
|
|
// Look up the Provider Agents for this module |
|
Array<ProviderAgentContainer*> paArray = |
|
_lookupProviderAgents(rsp->oopAgentName); |
|
|
|
for (Uint32 i = 0; i < paArray.size(); i++) |
|
{ |
|
if (paArray[i]->isInitialized()) |
|
{ |
|
paArray[i]->sendResponse(response); |
|
} |
|
} |
|
return; |
|
} |
|
|
|
PEGASUS_ASSERT(false); |
|
} |
|
|
Message* OOPProviderManagerRouter::processMessage(Message* message) | Message* OOPProviderManagerRouter::processMessage(Message* message) |
{ | { |
PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, | PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, |
"OOPProviderManagerRouter::processMessage"); | "OOPProviderManagerRouter::processMessage"); |
| |
|
if (message->getType() == CIM_PROCESS_INDICATION_RESPONSE_MESSAGE) |
|
{ |
|
_handleIndicationDeliveryResponse((CIMResponseMessage*)message); |
|
return 0; |
|
} |
|
|
CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message); | CIMRequestMessage* request = dynamic_cast<CIMRequestMessage *>(message); |
PEGASUS_ASSERT(request != 0); | PEGASUS_ASSERT(request != 0); |
| |
|
|
// | // |
if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) | if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) |
{ | { |
|
ProviderAgentContainer::setAllProvidersStopped(); |
// Forward the CIMStopAllProvidersRequest to all providers | // Forward the CIMStopAllProvidersRequest to all providers |
response.reset(_forwardRequestToAllAgents(request)); | response.reset(_forwardRequestToAllAgents(request)); |
| |