![]() ![]() |
![]() |
File: [Pegasus] / pegasus / src / Pegasus / ProviderManagerService / ProviderAgent / ProviderAgent.cpp
(download)
Revision: 1.41, Mon Jan 14 15:27:55 2013 UTC (11 years, 5 months ago) by marek Branch: MAIN CVS Tags: RELEASE_2_13_0-FC Changes since 1.40: +20 -62 lines BUG#:9530 TITLE: Reduce static code size through moving CIMResponseData up in Message class hierarchy DESCRIPTION: |
//%LICENSE//////////////////////////////////////////////////////////////// // // Licensed to The Open Group (TOG) under one or more contributor license // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with // this work for additional information regarding copyright ownership. // Each contributor licenses this file to you under the OpenPegasus Open // Source License; you may not use this file except in compliance with the // License. // // Permission is hereby granted, free of charge, to any person obtaining a // copy of this software and associated documentation files (the "Software"), // to deal in the Software without restriction, including without limitation // the rights to use, copy, modify, merge, publish, distribute, sublicense, // and/or sell copies of the Software, and to permit persons to whom the // Software is furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included // in all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. // ////////////////////////////////////////////////////////////////////////// // //%///////////////////////////////////////////////////////////////////////////// #include <Pegasus/Common/Signal.h> #include <Pegasus/Common/Array.h> #include <Pegasus/Common/AutoPtr.h> #include <Pegasus/Common/AtomicInt.h> #include <Pegasus/Common/Tracer.h> #include <Pegasus/Config/ConfigManager.h> #include <Pegasus/Common/OperationContext.h> #include <Pegasus/Common/StringConversion.h> #include <Pegasus/ProviderManager2/Default/DefaultProviderManager.h> #if defined(PEGASUS_OS_ZOS) && defined(PEGASUS_ZOS_SECURITY) // This include file will not be provided in the OpenGroup CVS for now. // Do NOT try to include it in your compile #include <Pegasus/Common/safCheckzOS_inline.h> #endif #include "ProviderAgent.h" #ifdef PEGASUS_OS_PASE #include <ILEWrapper/ILEUtilities.h> #endif PEGASUS_USING_STD; PEGASUS_NAMESPACE_BEGIN // threadCreationFailureLogged will indicate if the thread limit related // msg has been logged already. This will help avoid flooding the syslog/audit // log with thread limit reached errors. static AtomicInt threadCreationFailureLogged(0); ///////////////////////////////////////////////////////////////////////////// // // ProviderAgentRequest // ///////////////////////////////////////////////////////////////////////////// /** This class encapsulates the data required by a work thread to process a request in a Provider Agent. */ class ProviderAgentRequest { public: ProviderAgentRequest(ProviderAgent* agent_, CIMRequestMessage* request_) { agent = agent_; request = request_; } ProviderAgent* agent; CIMRequestMessage* request; }; ///////////////////////////////////////////////////////////////////////////// // // ProviderAgent // ///////////////////////////////////////////////////////////////////////////// // Time values used in ThreadPool construction static struct timeval deallocateWait = {300, 0}; Semaphore ProviderAgent::_scmoClassDelivered(0); SCMOClass* ProviderAgent::_transferSCMOClass = 0; Mutex ProviderAgent::_transferSCMOClassMutex; String ProviderAgent::_transferSCMOClassRspMsgID; ProviderAgent* ProviderAgent::_providerAgent = 0; ProviderAgent::ProviderAgent( const String& agentId, AnonymousPipe* pipeFromServer, AnonymousPipe* pipeToServer) : _providerManagerRouter(_indicationCallback, _responseChunkCallback, DefaultProviderManager::createDefaultProviderManagerCallback), _threadPool(0, "ProviderAgent", 0, 0, deallocateWait) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::ProviderAgent"); // Add the DefaultProviderManager to the router. _terminating = false; _agentId = agentId; _pipeFromServer = pipeFromServer; _pipeToServer = pipeToServer; _providerAgent = this; _isInitialised = false; _providersStopped = false; // Create a SCMOClass Cache and set call back for the repository. SCMOClassCache::getInstance()->setCallBack(_scmoClassCache_GetClass); PEG_METHOD_EXIT(); } ProviderAgent::~ProviderAgent() { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::~ProviderAgent"); _providerAgent = 0; // Destroy the singleton services SCMOClassCache::destroy(); if (_transferSCMOClass) { delete _transferSCMOClass; } PEG_METHOD_EXIT(); } void ProviderAgent::run() { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::run"); // Enable the signal handler to terminate gracefully on SIGHUP and SIGTERM getSigHandle()->registerHandler(PEGASUS_SIGHUP, _terminateSignalHandler); getSigHandle()->activate(PEGASUS_SIGHUP); getSigHandle()->registerHandler(PEGASUS_SIGTERM, _terminateSignalHandler); getSigHandle()->activate(PEGASUS_SIGTERM); // Restore the SIGCHLD signal behavior to its default action getSigHandle()->defaultAction(PEGASUS_SIGCHLD); #ifdef PEGASUS_OS_ZOS // Establish handling signal send to us on USS shutdown getSigHandle()->registerHandler(PEGASUS_SIGDANGER, _terminateSignalHandler); getSigHandle()->activate(PEGASUS_SIGDANGER); // enable process to receive SIGDANGER on USS shutdown __shutdown_registration(_SDR_NOTIFY, _SDR_REGPROCESS, _SDR_SENDSIGDANGER); #endif #ifdef PEGASUS_OS_PASE // PASE environment need more signal handler getSigHandle()->registerHandler(SIGFPE, _synchronousSignalHandler); getSigHandle()->activate(SIGFPE); getSigHandle()->registerHandler(SIGILL, _synchronousSignalHandler); getSigHandle()->activate(SIGILL); getSigHandle()->registerHandler(SIGSEGV, _synchronousSignalHandler); getSigHandle()->activate(SIGSEGV); getSigHandle()->registerHandler(SIGIO, _asynchronousSignalHandler); getSigHandle()->activate(SIGIO); #endif while (!_terminating) { Boolean active = true; try { // // Read and process the next request // active = _readAndProcessRequest(); } catch (Exception& e) { PEG_TRACE((TRC_PROVIDERAGENT, Tracer::LEVEL1, "Unexpected Exception from _readAndProcessRequest(): %s", (const char*)e.getMessage().getCString())); _terminating = true; } catch (...) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Unexpected exception from _readAndProcessRequest()."); _terminating = true; } if (_terminating) { if (!_providersStopped) { // // Stop all providers // CIMStopAllProvidersRequestMessage *stopRequest = new CIMStopAllProvidersRequestMessage("0", QueueIdStack(0)); Uint64 shutdownTimeout = 0; StringConversion::stringToUnsignedInteger( PEGASUS_DEFAULT_SHUTDOWN_TIMEOUT_SECONDS_STRING, shutdownTimeout); stopRequest->shutdownTimeout= Uint32(shutdownTimeout); _processStopAllProvidersRequest(stopRequest); } } else if (!active) { // // Stop agent process when no more providers are loaded // try { if (!_providerManagerRouter.hasActiveProviders() && (_threadPool.runningCount() == 0)) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "No active providers. Exiting."); _terminating = true; } else { _threadPool.cleanupIdleThreads(); } } catch (...) { // Do not terminate the agent on this exception PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Unexpected exception from hasActiveProviders()"); } } } // Notify the cimserver that the provider agent is exiting cleanly. Uint32 messageLength = 0; _pipeToServer->writeBuffer((const char*)&messageLength, sizeof(Uint32)); PEG_METHOD_EXIT(); } Boolean ProviderAgent::_readAndProcessRequest() { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_readAndProcessRequest"); CIMRequestMessage* request; // // Read the request from CIM Server // CIMMessage* cimMessage; AnonymousPipe::Status readStatus = _pipeFromServer->readMessage(cimMessage); request = dynamic_cast<CIMRequestMessage*>(cimMessage); // Read operation was interrupted if (readStatus == AnonymousPipe::STATUS_INTERRUPT) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Read operation was interrupted."); PEG_METHOD_EXIT(); return false; } if (readStatus == AnonymousPipe::STATUS_CLOSED) { // The CIM Server connection is closed PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "CIMServer connection closed. Exiting."); _terminating = true; PEG_METHOD_EXIT(); return false; } if (readStatus == AnonymousPipe::STATUS_ERROR) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Error reading from pipe. Exiting."); Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING, MessageLoaderParms( "ProviderManager.ProviderAgent.ProviderAgent." "CIMSERVER_COMMUNICATION_FAILED", "cimprovagt \"$0\" communication with CIM Server failed. " "Exiting.", _agentId)); _terminating = true; PEG_METHOD_EXIT(); return false; } // The message was not a request message. if (request == 0) { // The message was not empty. if (0 != cimMessage ) { // The message was not a "wake up" message. if (cimMessage->getType() == PROVAGT_GET_SCMOCLASS_RESPONSE_MESSAGE) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL3, "Processing a SCMOClassResponseMessage."); AutoPtr<ProvAgtGetScmoClassResponseMessage> response( dynamic_cast<ProvAgtGetScmoClassResponseMessage*> (cimMessage)); PEGASUS_DEBUG_ASSERT(response.get()); _processGetSCMOClassResponse(response.get()); // The provider agent is still busy. PEG_METHOD_EXIT(); return true; } else if (cimMessage->getType() == CIM_PROCESS_INDICATION_RESPONSE_MESSAGE) { _handleIndicationDeliveryResponse( (CIMProcessIndicationResponseMessage*)cimMessage); PEG_METHOD_EXIT(); return true; } } // A "wake up" message means we should unload idle providers PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL4, "Got a wake up message."); if (!_providerManagerRouter.hasActiveProviders()) { // No active providers, so do not start an idle unload thread return false; } try { _unloadIdleProviders(); } catch (...) { // Ignore exceptions from idle provider unloading PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "Ignoring exception from _unloadIdleProviders()"); } PEG_METHOD_EXIT(); return false; } PEG_TRACE((TRC_PROVIDERAGENT, Tracer::LEVEL3, "Received request from server with messageId %s", (const char*)request->messageId.getCString())); const AcceptLanguageListContainer acceptLang = request->operationContext.get(AcceptLanguageListContainer::NAME); Thread::setLanguages(acceptLang.getLanguages()); // Get the ProviderIdContainer to complete the provider module instance // optimization. If the provider module instance is blank (optimized // out), fill it in from our cache. If it is not blank, update our // cache. (See the _providerModuleCache member description.) if (request->operationContext.contains(ProviderIdContainer::NAME)) { ProviderIdContainer pidc = request->operationContext.get( ProviderIdContainer::NAME); if (pidc.getModule().isUninitialized()) { // Provider module is optimized out. Fill it in from the cache. ProviderIdContainer newpidc(_providerModuleCache, pidc.getProvider(), pidc.isRemoteNameSpace(), pidc.getRemoteInfo()); newpidc.setProvMgrPath(pidc.getProvMgrPath()); request->operationContext.set(newpidc); } else { // Update the cache with the new provider module instance. _providerModuleCache = pidc.getModule(); } } // // It never should be possible to receive a request other than "initialise" // before the provider agent is in state isInitialised // // Bail out. // if (!_isInitialised && (request->getType() != CIM_INITIALIZE_PROVIDER_AGENT_REQUEST_MESSAGE)) { Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING, MessageLoaderParms( "ProviderManager.ProviderAgent.ProviderAgent." "PROVIDERAGENT_NOT_INITIALIZED", "cimprovagt \"$0\" was not yet initialized " "prior to receiving a request message. Exiting.", _agentId)); _terminating = true; PEG_METHOD_EXIT(); return false; } // // Check for messages to be handled by the Agent itself. // if (request->getType() == CIM_INITIALIZE_PROVIDER_AGENT_REQUEST_MESSAGE) { // Process the request in this thread AutoPtr<CIMInitializeProviderAgentRequestMessage> ipaRequest( dynamic_cast<CIMInitializeProviderAgentRequestMessage*>(request)); PEGASUS_ASSERT(ipaRequest.get() != 0); ConfigManager::setPegasusHome(ipaRequest->pegasusHome); ConfigManager* configManager = ConfigManager::getInstance(); // Initialize the configuration properties for (Uint32 i = 0; i < ipaRequest->configProperties.size(); i++) { configManager->initCurrentValue( ipaRequest->configProperties[i].first, ipaRequest->configProperties[i].second); } // Set the default resource bundle directory for the MessageLoader MessageLoader::setPegasusMsgHome(ConfigManager::getHomedPath( configManager->getCurrentValue("messageDir"))); // Set the log file directory #if !defined(PEGASUS_USE_SYSLOGS) Logger::setHomeDirectory(ConfigManager::getHomedPath( configManager->getCurrentValue("logdir"))); #endif System::bindVerbose = ipaRequest->bindVerbose; // // Set _subscriptionInitComplete from value in // InitializeProviderAgent request // _providerManagerRouter.setSubscriptionInitComplete (ipaRequest->subscriptionInitComplete); PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "Processed the agent initialization message."); // Notify the cimserver that the provider agent is initialized. Uint32 messageLength = 0; _pipeToServer->writeBuffer((const char*)&messageLength, sizeof(Uint32)); #if defined(PEGASUS_OS_ZOS) && defined(PEGASUS_ZOS_SECURITY) // prepare and setup the thread-level security environment on z/OS // if security initialization fails startupCheckBPXServer(false); startupCheckMSC(); if (!isZOSSecuritySetup()) { Logger::put_l(Logger::ERROR_LOG, ZOS_SECURITY_NAME, Logger::FATAL, MessageLoaderParms( "ProviderManager.ProviderAgent.ProviderAgent." "UNINITIALIZED_SECURITY_SETUP.PEGASUS_OS_ZOS", "Security environment could not be initialised. " "Assume security fraud. Stopping Provider Agent.")); exit(1); } #endif // provider agent is initialised and ready to go _isInitialised = true; } else if (request->getType() == CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE) { // Process the request in this thread AutoPtr<CIMNotifyConfigChangeRequestMessage> notifyRequest( dynamic_cast<CIMNotifyConfigChangeRequestMessage*>(request)); PEGASUS_ASSERT(notifyRequest.get() != 0); // // Update the ConfigManager with the new property value // ConfigManager* configManager = ConfigManager::getInstance(); CIMException responseException; try { if (notifyRequest->currentValueModified) { String userName = ((IdentityContainer) request->operationContext.get( IdentityContainer::NAME)).getUserName(); configManager->updateCurrentValue( notifyRequest->propertyName, notifyRequest->newPropertyValue, userName, 0, false); } else { configManager->updatePlannedValue( notifyRequest->propertyName, notifyRequest->newPropertyValue, true); } } catch (Exception& e) { responseException = PEGASUS_CIM_EXCEPTION( CIM_ERR_FAILED, e.getMessage()); } AutoPtr<CIMResponseMessage> response(notifyRequest->buildResponse()); response->cimException = responseException; // Return response to CIM Server _writeResponse(response.get()); } else if (request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE) { // Process the request in this thread AutoPtr<Message> response(_processRequest(request)); _writeResponse(response.get()); CIMResponseMessage * respMsg = dynamic_cast<CIMResponseMessage*>(response.get()); // If DisableModule not successful, leave agent process running. // If there are any active providers after DisableModule request // successful, this agent might be servicing the group of // provider modules, leave agent process running. if (((request->getType() == CIM_DISABLE_MODULE_REQUEST_MESSAGE) && (!_providerManagerRouter.hasActiveProviders()) && (respMsg->cimException.getCode() == CIM_ERR_SUCCESS))) { // Operation is successful. End the agent process. _providersStopped = true; _terminating = true; } delete request; } else if (request->getType() == CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE) { _processStopAllProvidersRequest(request); } else { // Start a new thread to process the request ProviderAgentRequest* agentRequest = new ProviderAgentRequest(this, request); ThreadStatus rtn = PEGASUS_THREAD_OK; while ((rtn = _threadPool.allocate_and_awaken(agentRequest, ProviderAgent::_processRequestAndWriteResponse)) != PEGASUS_THREAD_OK) { // Yield only for the following request. // CIM_INITIALIZE_PROVIDER_AGENT_REQUEST_MESSAGE, // CIM_NOTIFY_CONFIG_CHANGE_REQUEST_MESSAGE, // CIM_DISABLE_MODULE_REQUEST_MESSAGE, // CIM_STOP_ALL_PROVIDERS_REQUEST_MESSAGE, // CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE, // CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE, // CIM_EXPORT_INDICATION_REQUEST_MESSAGE // All the above have already been handled differently // except for CIM_EXPORT_INDICATION_REQUEST_MESSAGE, // CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE and // CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE. if (rtn == PEGASUS_THREAD_INSUFFICIENT_RESOURCES && (request->getType() == CIM_EXPORT_INDICATION_REQUEST_MESSAGE || request->getType() == CIM_SUBSCRIPTION_INIT_COMPLETE_REQUEST_MESSAGE || request->getType() == CIM_INDICATION_SERVICE_DISABLED_REQUEST_MESSAGE)) { Threads::yield(); } else { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Could not allocate thread to process agent request."); MessageLoaderParms msgLoaderPrms( "ProviderManager.ProviderAgent.ProviderAgent." "THREAD_ALLOCATION_FAILED", "Failed to allocate a thread in cimprovagt \"$0\".", _agentId); AutoPtr<CIMResponseMessage> response(request->buildResponse()); response->cimException = PEGASUS_CIM_EXCEPTION_L( CIM_ERR_FAILED,msgLoaderPrms); // Return response to CIM Server _writeResponse(response.get()); // make an entry in syslog for this behaviour. if(threadCreationFailureLogged.get() == 0) { threadCreationFailureLogged.inc(); Logger::put_l(Logger::STANDARD_LOG, System::CIMSERVER,Logger::WARNING,msgLoaderPrms); } delete agentRequest; delete request; PEG_METHOD_EXIT(); return true; } } // Control will reach here only if the thread creation was successful. // Hence this is the right place to reset threadCreationFailureLogged // to that if the thread limit is reached again it is logged. if(threadCreationFailureLogged.get() == 1) { threadCreationFailureLogged.dec(); } } PEG_METHOD_EXIT(); return true; } void ProviderAgent::_processStopAllProvidersRequest(CIMRequestMessage* request) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_processStopAllProvidersRequest"); ProviderAgentRequest* agentRequest = new ProviderAgentRequest(this, request); Uint32 shutdownTimeout = ((CIMStopAllProvidersRequestMessage*)request)->shutdownTimeout; if ( _threadPool.allocate_and_awaken(agentRequest, ProviderAgent::_processRequestAndWriteResponse) != PEGASUS_THREAD_OK) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Could not allocate thread to process " "StopAllProvidersRequest. Exiting."); MessageLoaderParms msgLoaderPrms( "ProviderManager.ProviderAgent.ProviderAgent." "THREAD_ALLOCATION_FAILED", "Failed to allocate a thread in cimprovagt \"$0\".", _agentId); Logger::put_l( Logger::STANDARD_LOG, System::CIMSERVER, Logger::WARNING, msgLoaderPrms); exit(1); } // Wait until shutdownTimeout-1 seconds expires or // CIMStopAllprovidersRequestMessage is processed successfully. if (shutdownTimeout) { for (Uint32 i = 0; !_providersStopped && i < shutdownTimeout - 1 ; ++i) { Threads::yield(); Threads::sleep(1000); } } // If the shutdownTimeout expired, exit from here. Providers not // responding to the cleanup requests will cause this agent left as // orphaned process. // If there are agent threads running exit from here.If provider // is not responding cimprovagt may loop forever in ThreadPool // destructor waiting for running threads to become idle. if (!_providersStopped || _threadPool.runningCount()) { MessageLoaderParms msgLoaderPrms( "ProviderManager.ProviderAgent.ProviderAgent." "PROVIDERS_FAILED_TO_CLEANUP", "Providers in the agent \"$0\" have failed to cleanup within \"$1\"" " seconds during the shutdown. Provider agent terminated" " forcibly.", _agentId, shutdownTimeout); Logger::put_l( Logger::STANDARD_LOG, System::CIMSERVER, Logger::WARNING, msgLoaderPrms); exit(1); } _terminating = true; PEG_METHOD_EXIT(); } inline void _completeHostNameAndNamespace( CIMRequestMessage* request, Message* response) { MessageType msgType = request->getType(); if (msgType == CIM_ASSOCIATORS_REQUEST_MESSAGE || msgType == CIM_ASSOCIATOR_NAMES_REQUEST_MESSAGE || msgType == CIM_REFERENCES_REQUEST_MESSAGE || msgType == CIM_REFERENCE_NAMES_REQUEST_MESSAGE) { // can do this cast here since we know request to be one of the four // association request messages CIMOperationRequestMessage* reqMsg= (CIMOperationRequestMessage*) request; // Can use System::getHostName() reliably here since it was initialized // through the COnfigManager at start of ProviderAgent. CIMResponseDataMessage * rspMsg= (CIMResponseDataMessage*) response; CIMResponseData & rspData = rspMsg->getResponseData(); rspData.completeHostNameAndNamespace( System::getHostName(), reqMsg->nameSpace); } } Message* ProviderAgent::_processRequest(CIMRequestMessage* request) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_processRequest"); Message* response = 0; try { // Forward the request to the ProviderManager response = _providerManagerRouter.processMessage(request); // for association operations we need to complete hostname and // namespace before the response data gets binary encoded in _completeHostNameAndNamespace(request,response); } catch (Exception& e) { PEG_TRACE((TRC_PROVIDERAGENT, Tracer::LEVEL1, "Caught exception while processing request: %s", (const char*)e.getMessage().getCString())); CIMResponseMessage* cimResponse = request->buildResponse(); cimResponse->cimException = PEGASUS_CIM_EXCEPTION( CIM_ERR_FAILED, e.getMessage()); response = cimResponse; } catch (...) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Caught exception while processing request."); CIMResponseMessage* cimResponse = request->buildResponse(); cimResponse->cimException = PEGASUS_CIM_EXCEPTION( CIM_ERR_FAILED, String::EMPTY); response = cimResponse; } PEG_METHOD_EXIT(); return response; } void ProviderAgent::_processGetSCMOClassResponse( ProvAgtGetScmoClassResponseMessage* response) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_processGetSCMOClassResponse"); // // The provider agent requests a SCMOClass from the server by // _scmoClassCache_GetClass() // { AutoMutex mtx(_transferSCMOClassMutex); if (0 != _transferSCMOClass) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "_transferSCMOClass was not cleand up. The previous " "ProvAgtGetScmoClassRequest's might have been timed-out."); delete _transferSCMOClass; _transferSCMOClass = 0; } // Copy class and messageID from response _transferSCMOClass = new SCMOClass(response->scmoClass); _transferSCMOClassRspMsgID = response->messageId; } // signal delivery of SCMOClass to _scmoClassCache_GetClass() _scmoClassDelivered.signal(); PEG_METHOD_EXIT(); } void ProviderAgent::_writeResponse(Message* message) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_writeResponse"); CIMMessage* response = dynamic_cast<CIMMessage*>(message); PEGASUS_ASSERT(response != 0); // // Write the response message to the pipe // try { // Use Mutex to prevent concurrent writes to the same pipe AutoMutex pipeLock(_pipeToServerMutex); AnonymousPipe::Status writeStatus = _pipeToServer->writeMessage(response); if (writeStatus != AnonymousPipe::STATUS_SUCCESS) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Error writing response to pipe."); Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING, MessageLoaderParms( "ProviderManager.ProviderAgent.ProviderAgent." "CIMSERVER_COMMUNICATION_FAILED", "cimprovagt \"$0\" communication with CIM Server failed. " "Exiting.", _agentId)); _terminating = true; } } catch (...) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Caught exception while writing response."); Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER, Logger::WARNING, MessageLoaderParms( "ProviderManager.ProviderAgent.ProviderAgent." "CIMSERVER_COMMUNICATION_FAILED", "cimprovagt \"$0\" communication with CIM Server failed. " "Exiting.", _agentId)); _terminating = true; } PEG_METHOD_EXIT(); } ThreadReturnType PEGASUS_THREAD_CDECL ProviderAgent::_processRequestAndWriteResponse(void* arg) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_processRequestAndWriteResponse"); AutoPtr<ProviderAgentRequest> agentRequest( reinterpret_cast<ProviderAgentRequest*>(arg)); PEGASUS_ASSERT(agentRequest.get() != 0); try { // Get the ProviderAgent and request message from the argument ProviderAgent* agent = agentRequest->agent; AutoPtr<CIMRequestMessage> request(agentRequest->request); const AcceptLanguageListContainer acceptLang = request->operationContext.get(AcceptLanguageListContainer::NAME); Thread::setLanguages(acceptLang.getLanguages()); // Process the request AutoPtr<Message> response(agent->_processRequest(request.get())); // Write the response agent->_writeResponse(response.get()); if (response->getType() == CIM_STOP_ALL_PROVIDERS_RESPONSE_MESSAGE) { agent->_providersStopped = true; } } catch (const Exception& e) { PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, "Exiting _processRequestAndWriteResponse. Caught Exception: %s", (const char*)e.getMessage().getCString())); } catch (...) { PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1, "Caught unrecognized exception. " "Exiting _processRequestAndWriteResponse."); } PEG_METHOD_EXIT(); return(ThreadReturnType(0)); } void ProviderAgent::_handleIndicationDeliveryResponse( CIMProcessIndicationResponseMessage *response) { IndicationRouter::notify(response); } void ProviderAgent::_indicationCallback( CIMProcessIndicationRequestMessage* message) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_indicationCallback"); IndicationRouter router = IndicationRouter(message, _indicationDeliveryRoutine); router.deliverAndWaitForStatus(); PEG_METHOD_EXIT(); } void ProviderAgent::_indicationDeliveryRoutine( CIMProcessIndicationRequestMessage* message) { // Send request back to the server to process _providerAgent->_writeResponse(message); delete message; } void ProviderAgent::_responseChunkCallback( CIMRequestMessage* request, CIMResponseMessage* response) { PEG_METHOD_ENTER( TRC_PROVIDERAGENT, "ProviderAgent::_responseChunkCallback"); // Send request back to the server to process _providerAgent->_writeResponse(response); delete response; PEG_METHOD_EXIT(); } void ProviderAgent::_unloadIdleProviders() { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_unloadIdleProviders"); ThreadStatus rtn = PEGASUS_THREAD_OK; // Ensure that only one _unloadIdleProvidersHandler thread runs at a time _unloadIdleProvidersBusy++; if ((_unloadIdleProvidersBusy.get() == 1) && ((rtn =_threadPool.allocate_and_awaken( (void*)this, ProviderAgent::_unloadIdleProvidersHandler)) == PEGASUS_THREAD_OK)) { // _unloadIdleProvidersBusy is decremented in // _unloadIdleProvidersHandler } else { // If we fail to allocate a thread, don't retry now. _unloadIdleProvidersBusy--; } if (rtn != PEGASUS_THREAD_OK) { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL1, "Could not allocate thread to unload idle providers."); } PEG_METHOD_EXIT(); } ThreadReturnType PEGASUS_THREAD_CDECL ProviderAgent::_unloadIdleProvidersHandler(void* arg) throw() { try { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::unloadIdleProvidersHandler"); ProviderAgent* myself = reinterpret_cast<ProviderAgent*>(arg); try { myself->_providerManagerRouter.idleTimeCleanup(); } catch (...) { // Ignore errors PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "Unexpected exception in _unloadIdleProvidersHandler"); } myself->_unloadIdleProvidersBusy--; } catch (...) { // Ignore errors try { PEG_TRACE_CSTRING(TRC_PROVIDERAGENT, Tracer::LEVEL2, "Unexpected exception in _unloadIdleProvidersHandler"); } catch (...) { } } // PEG_METHOD_EXIT(); // Note: This statement could throw an exception return(ThreadReturnType(0)); } void ProviderAgent::_terminateSignalHandler( int s_n, PEGASUS_SIGINFO_T* s_info, void* sig) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_terminateSignalHandler"); if (_providerAgent != 0) { _providerAgent->_terminating = true; } PEG_METHOD_EXIT(); } SCMOClass ProviderAgent::_scmoClassCache_GetClass( const CIMNamespaceName& nameSpace, const CIMName& className) { PEG_METHOD_ENTER(TRC_PROVIDERAGENT, "ProviderAgent::_scmoClassCache_GetClass"); String requestID = XmlWriter::getNextMessageId(); // create message ProvAgtGetScmoClassRequestMessage* message = new ProvAgtGetScmoClassRequestMessage( requestID, nameSpace, className, QueueIdStack()); // Send the request for the SCMOClass to the server _providerAgent->_writeResponse(message); delete message; SCMOClass scmoClass = SCMOClass("",""); for(;;) { // Wait for semaphore signaled by _readAndProcessRequest() if (!_scmoClassDelivered.time_wait( PEGASUS_DEFAULT_CLIENT_TIMEOUT_MILLISECONDS)) { PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, "Timed-out waiting for SCMOClass for " "Name Space Name '%s' Class Name '%s'", (const char*)nameSpace.getString().getCString(), (const char*)className.getString().getCString())); break; } AutoMutex mtx(_transferSCMOClassMutex); if ( 0 == _transferSCMOClass) { PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1, "No SCMOClass received for Name Space Name '%s' " "Class Name '%s'", (const char*)nameSpace.getString().getCString(), (const char*)className.getString().getCString())); break; } // Verify if we have actually received the response for our // request. This may happen when previous requests have timed out. if (_transferSCMOClassRspMsgID != requestID) { delete _transferSCMOClass; _transferSCMOClass = 0; continue; } // Create a local copy. scmoClass = SCMOClass(*_transferSCMOClass); // Delete the transferred instance. delete _transferSCMOClass; _transferSCMOClass = 0; break; } PEG_METHOD_EXIT(); return scmoClass; } // // PASE environment for synchronousSignal and asynchronousSignal // #ifdef PEGASUS_OS_PASE void ProviderAgent::_synchronousSignalHandler( int s_n, PEGASUS_SIGINFO_T* s_info, void* sig) { static bool mark = false; if (mark) exit(1); mark = true; if (_providerAgent != 0) { _providerAgent->_terminating = true; } char fullJobName[29]; umeGetJobName(fullJobName, true); Logger::put_l(Logger::ERROR_LOG, "provider agent", Logger::SEVERE, MessageLoaderParms( "ProviderManager.ProviderAgent.RECEIVE_SYN_SIGNAL.PEGASUS_OS_PASE", "$0 received synchronous signal: $1", fullJobName, s_n)); } void ProviderAgent::_asynchronousSignalHandler( int s_n, PEGASUS_SIGINFO_T* s_info, void* sig) { static bool amark = false; if (amark) exit(1); amark = true; if (_providerAgent != 0) { _providerAgent->_terminating = true; } char fullJobName[29]; umeGetJobName(fullJobName, true); Logger::put_l(Logger::ERROR_LOG, "provider agent", Logger::SEVERE, MessageLoaderParms( "ProviderManager.ProviderAgent.RECEIVE_ASYN_SIGNAL.PEGASUS_OS_PASE", "$0 received asynchronous signal: $1", fullJobName, s_n)); } #endif PEGASUS_NAMESPACE_END
No CVS admin address has been configured |
Powered by ViewCVS 0.9.2 |