version 1.67, 2005/05/06 21:44:28
|
version 1.75.2.1, 2006/02/28 19:53:30
|
|
|
//%2005//////////////////////////////////////////////////////////////////////// |
//%2006//////////////////////////////////////////////////////////////////////// |
// | // |
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development | // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development |
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. | // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. |
|
|
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. | // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. |
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; | // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
// EMC Corporation; VERITAS Software Corporation; The Open Group. | // EMC Corporation; VERITAS Software Corporation; The Open Group. |
|
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; Symantec Corporation; The Open Group. |
// | // |
// Permission is hereby granted, free of charge, to any person obtaining a copy | // Permission is hereby granted, free of charge, to any person obtaining a copy |
// of this software and associated documentation files (the "Software"), to | // of this software and associated documentation files (the "Software"), to |
|
|
// Amit K Arora (amita@in.ibm.com) for PEP-101 | // Amit K Arora (amita@in.ibm.com) for PEP-101 |
// Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) | // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) |
// Seema Gupta (gseema@in.ibm.com for PEP135) | // Seema Gupta (gseema@in.ibm.com for PEP135) |
|
// Jim Wunderlich (Jim_Wunderlich@prodigy.net) |
|
// Aruran, IBM (ashanmug@in.ibm.com)for Bug# 3881 |
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
|
#ifdef PEGASUS_DISABLE_PROV_USERCTXT | #ifdef PEGASUS_DISABLE_PROV_USERCTXT |
if (forceProviderProcesses) | if (forceProviderProcesses) |
{ | { |
_oopProviderManagerRouter = |
_oopProviderManagerRouter = new OOPProviderManagerRouter( |
new OOPProviderManagerRouter(indicationCallback); |
indicationCallback, responseChunkCallback); |
} | } |
else | else |
{ | { |
_basicProviderManagerRouter = |
_basicProviderManagerRouter = new BasicProviderManagerRouter( |
new BasicProviderManagerRouter(indicationCallback); |
indicationCallback, responseChunkCallback); |
} | } |
#else | #else |
_oopProviderManagerRouter = |
_oopProviderManagerRouter = new OOPProviderManagerRouter( |
new OOPProviderManagerRouter(indicationCallback); |
indicationCallback, responseChunkCallback); |
| |
if (!forceProviderProcesses) | if (!forceProviderProcesses) |
{ | { |
_basicProviderManagerRouter = |
_basicProviderManagerRouter = new BasicProviderManagerRouter( |
new BasicProviderManagerRouter(indicationCallback); |
indicationCallback, responseChunkCallback); |
} | } |
#endif | #endif |
} | } |
|
|
request->op->processing(); | request->op->processing(); |
| |
_incomingQueue.enqueue(request->op); | _incomingQueue.enqueue(request->op); |
|
ThreadStatus rtn = PEGASUS_THREAD_OK; |
while (!_thread_pool->allocate_and_awaken( |
while (( rtn =_thread_pool->allocate_and_awaken( |
(void *)this, ProviderManagerService::handleCimOperation)) |
(void *)this, ProviderManagerService::handleCimOperation)) != PEGASUS_THREAD_OK) |
{ | { |
|
if (rtn==PEGASUS_THREAD_INSUFFICIENT_RESOURCES) |
pegasus_yield(); | pegasus_yield(); |
|
else |
|
{ |
|
Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, |
|
"Not enough threads to service provider manager." ); |
|
|
|
Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2, |
|
"Could not allocate thread for %s.", |
|
getQueueName()); |
|
break; |
|
} |
} | } |
} | } |
else | else |
|
|
| |
if (msg != 0) | if (msg != 0) |
{ | { |
AcceptLanguages* langs = new AcceptLanguages( |
AcceptLanguageList* langs = new AcceptLanguageList( |
((AcceptLanguageListContainer)msg->operationContext.get( | ((AcceptLanguageListContainer)msg->operationContext.get( |
AcceptLanguageListContainer::NAME)).getLanguages()); | AcceptLanguageListContainer::NAME)).getLanguages()); |
Thread::setLanguages(langs); | Thread::setLanguages(langs); |
|
|
"ProviderManager.ProviderManagerService.PROVIDER_BLOCKED", | "ProviderManager.ProviderManagerService.PROVIDER_BLOCKED", |
"provider blocked.")); | "provider blocked.")); |
response = cimResponse; | response = cimResponse; |
|
|
|
STAT_COPYDISPATCHER |
} | } |
else | else |
{ | { |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
void |
void ProviderManagerService::responseChunkCallback( |
ProviderManagerService::handleCimResponse(CIMRequestMessage &request, |
CIMRequestMessage* request, |
CIMResponseMessage &response) |
CIMResponseMessage* response) |
{ | { |
CIMStatusCode code = CIM_ERR_SUCCESS; | CIMStatusCode code = CIM_ERR_SUCCESS; |
String message; | String message; |
|
|
{ | { |
// only incomplete messages are processed because the caller ends up | // only incomplete messages are processed because the caller ends up |
// sending the complete() stage | // sending the complete() stage |
PEGASUS_ASSERT(response.isComplete() == false); |
PEGASUS_ASSERT(response->isComplete() == false); |
| |
AsyncLegacyOperationStart *requestAsync = | AsyncLegacyOperationStart *requestAsync = |
dynamic_cast<AsyncLegacyOperationStart *>(request._async); |
dynamic_cast<AsyncLegacyOperationStart *>(request->_async); |
PEGASUS_ASSERT(requestAsync); | PEGASUS_ASSERT(requestAsync); |
AsyncOpNode *op = requestAsync->op; | AsyncOpNode *op = requestAsync->op; |
PEGASUS_ASSERT(op); | PEGASUS_ASSERT(op); |
PEGASUS_ASSERT(! response._async); |
PEGASUS_ASSERT(!response->_async); |
response._async = new AsyncLegacyOperationResult |
response->_async = new AsyncLegacyOperationResult( |
(requestAsync->getKey(), requestAsync->getRouting(), op, &response); |
requestAsync->getKey(), requestAsync->getRouting(), op, response); |
| |
// set the destination | // set the destination |
op->_op_dest = op->_callback_response_q; | op->_op_dest = op->_callback_response_q; |
|
|
} | } |
| |
if (code != CIM_ERR_SUCCESS) | if (code != CIM_ERR_SUCCESS) |
response.cimException = PEGASUS_CIM_EXCEPTION(code, message); |
response->cimException = PEGASUS_CIM_EXCEPTION(code, message); |
} | } |
| |
Message* ProviderManagerService::_processMessage(CIMRequestMessage* request) | Message* ProviderManagerService::_processMessage(CIMRequestMessage* request) |
|
|
{ | { |
PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, | PEG_METHOD_ENTER(TRC_PROVIDERMANAGER, |
"ProviderManagerService::unloadIdleProviders"); | "ProviderManagerService::unloadIdleProviders"); |
|
ThreadStatus rtn = PEGASUS_THREAD_OK; |
// Ensure that only one _unloadIdleProvidersHandler thread runs at a time | // Ensure that only one _unloadIdleProvidersHandler thread runs at a time |
_unloadIdleProvidersBusy++; | _unloadIdleProvidersBusy++; |
if ((_unloadIdleProvidersBusy.value() == 1) && |
if ((_unloadIdleProvidersBusy.get() == 1) && |
(_thread_pool->allocate_and_awaken( |
((rtn = _thread_pool->allocate_and_awaken( |
(void*)this, ProviderManagerService::_unloadIdleProvidersHandler))) |
(void*)this, ProviderManagerService::_unloadIdleProvidersHandler))==PEGASUS_THREAD_OK)) |
{ | { |
// _unloadIdleProvidersBusy is decremented in | // _unloadIdleProvidersBusy is decremented in |
// _unloadIdleProvidersHandler | // _unloadIdleProvidersHandler |
|
|
// If we fail to allocate a thread, don't retry now. | // If we fail to allocate a thread, don't retry now. |
_unloadIdleProvidersBusy--; | _unloadIdleProvidersBusy--; |
} | } |
|
if (rtn != PEGASUS_THREAD_OK) |
|
{ |
|
Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE, |
|
"Not enough threads to unload idle providers."); |
| |
|
Tracer::trace(TRC_PROVIDERMANAGER, Tracer::LEVEL2, |
|
"Could not allocate thread for %s to unload idle providers.", |
|
getQueueName()); |
|
} |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
|
AcceptLanguageListContainer cntr = request->operationContext.get(AcceptLanguageListContainer::NAME); | AcceptLanguageListContainer cntr = request->operationContext.get(AcceptLanguageListContainer::NAME); |
}catch(const Exception &) | }catch(const Exception &) |
{ | { |
request->operationContext.insert(AcceptLanguageListContainer(AcceptLanguages::EMPTY)); |
request->operationContext.insert(AcceptLanguageListContainer(AcceptLanguageList())); |
} | } |
| |
if (_indicationServiceQueueId == PEG_NOT_FOUND) | if (_indicationServiceQueueId == PEG_NOT_FOUND) |
|
|
_indicationServiceQueueId); | _indicationServiceQueueId); |
| |
providerManagerService->SendForget(asyncRequest); | providerManagerService->SendForget(asyncRequest); |
|
|
|
|
|
|
|
|
|
#ifdef PEGASUS_INDICATIONS_Q_THRESHOLD |
|
|
|
// See Comments in config.mak asociated with |
|
// PEGASUS_INDICATIONS_Q_THRESHOLD |
|
// |
|
// if INDICATIONS_Q_STALL THRESHOLD is gt 0 |
|
// then if there are over INDICATIONS_Q_STALL_THRESHOLD |
|
// indications in the queue |
|
// then force this provider to sleep until the queue count |
|
// is lower than INDICATIONS_Q_RESUME_THRESHOLD |
|
|
|
static Mutex indicationThresholdReportedLock; |
|
static Boolean indicationThresholdReported = false; |
|
|
|
#define INDICATIONS_Q_STALL_THRESHOLD PEGASUS_INDICATIONS_Q_THRESHOLD |
|
#define INDICATIONS_Q_RESUME_THRESHOLD (int)(PEGASUS_INDICATIONS_Q_THRESHOLD*.90) |
|
#define INDICATIONS_Q_STALL_DURATION 250 // milli-seconds |
|
|
|
MessageQueue * indicationsQueue = MessageQueue::lookup(_indicationServiceQueueId); |
|
|
|
if (((MessageQueueService *)indicationsQueue)->getIncomingCount() > INDICATIONS_Q_STALL_THRESHOLD) |
|
{ |
|
AutoMutex indicationThresholdReportedAutoMutex(indicationThresholdReportedLock); |
|
if (!indicationThresholdReported) |
|
{ |
|
indicationThresholdReported = true; |
|
indicationThresholdReportedAutoMutex.unlock(); |
|
|
|
// make log entry to record que max exceeded |
|
|
|
Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::INFORMATION, |
|
"Indication generation stalled: maximum queue count ($0) exceeded.", |
|
INDICATIONS_Q_STALL_THRESHOLD); |
|
} |
|
else |
|
{ |
|
indicationThresholdReportedAutoMutex.unlock(); |
|
} |
|
|
|
while (((MessageQueueService *)indicationsQueue)->getIncomingCount() > INDICATIONS_Q_RESUME_THRESHOLD) |
|
{ |
|
pegasus_sleep(INDICATIONS_Q_STALL_DURATION); |
|
} |
|
|
|
AutoMutex indicationThresholdReportedAutoMutex1(indicationThresholdReportedLock); |
|
// indicationThresholdReportedLock.lock(pegasus_thread_self()); |
|
if(indicationThresholdReported) |
|
{ |
|
indicationThresholdReported = false; |
|
indicationThresholdReportedAutoMutex1.unlock(); |
|
|
|
Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::INFORMATION, |
|
"Indication generation resumed: current queue count = $0", |
|
((MessageQueueService *)indicationsQueue)->getIncomingCount() ); |
|
|
|
} |
|
else |
|
{ |
|
indicationThresholdReportedAutoMutex1.unlock(); |
|
} |
|
} |
|
#endif /* INDICATIONS_Q_STALL_THRESHOLD */ |
|
|
} | } |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |