version 1.151, 2008/11/05 05:24:34
|
version 1.159, 2008/12/16 18:56:00
|
|
|
//%2006//////////////////////////////////////////////////////////////////////// |
//%LICENSE//////////////////////////////////////////////////////////////// |
// | // |
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development |
// Licensed to The Open Group (TOG) under one or more contributor license |
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. |
// agreements. Refer to the OpenPegasusNOTICE.txt file distributed with |
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.; |
// this work for additional information regarding copyright ownership. |
// IBM Corp.; EMC Corporation, The Open Group. |
// Each contributor licenses this file to you under the OpenPegasus Open |
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; |
// Source License; you may not use this file except in compliance with the |
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. |
// License. |
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; VERITAS Software Corporation; The Open Group. |
|
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.; |
|
// EMC Corporation; Symantec Corporation; The Open Group. |
|
// | // |
// Permission is hereby granted, free of charge, to any person obtaining a copy |
// Permission is hereby granted, free of charge, to any person obtaining a |
// of this software and associated documentation files (the "Software"), to |
// copy of this software and associated documentation files (the "Software"), |
// deal in the Software without restriction, including without limitation the |
// to deal in the Software without restriction, including without limitation |
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or |
// the rights to use, copy, modify, merge, publish, distribute, sublicense, |
// sell copies of the Software, and to permit persons to whom the Software is |
// and/or sell copies of the Software, and to permit persons to whom the |
// furnished to do so, subject to the following conditions: |
// Software is furnished to do so, subject to the following conditions: |
// | // |
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN |
// The above copyright notice and this permission notice shall be included |
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED |
// in all copies or substantial portions of the Software. |
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT |
|
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR |
|
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT |
|
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN |
|
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION |
|
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
// | // |
//============================================================================== |
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS |
|
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF |
|
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. |
|
// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY |
|
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, |
|
// TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE |
|
// SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. |
|
// |
|
////////////////////////////////////////////////////////////////////////// |
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
|
#include <Pegasus/Common/Tracer.h> | #include <Pegasus/Common/Tracer.h> |
#include <Pegasus/Common/MessageLoader.h> | #include <Pegasus/Common/MessageLoader.h> |
| |
|
PEGASUS_USING_STD; |
|
|
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
cimom *MessageQueueService::_meta_dispatcher = 0; | cimom *MessageQueueService::_meta_dispatcher = 0; |
|
|
void* parm) | void* parm) |
{ | { |
Thread *myself = reinterpret_cast<Thread *>(parm); | Thread *myself = reinterpret_cast<Thread *>(parm); |
List<MessageQueueService, Mutex> *list = |
MessageQueueService::PollingList *list = |
reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm()); |
reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm()); |
| |
|
try |
|
{ |
while (_stop_polling.get() == 0) | while (_stop_polling.get() == 0) |
{ | { |
_polling_sem.wait(); | _polling_sem.wait(); |
|
|
// processing the _polling_list | // processing the _polling_list |
// (e.g., MessageQueueServer::~MessageQueueService). | // (e.g., MessageQueueServer::~MessageQueueService). |
| |
list->lock(); |
_polling_list_mutex.lock(); |
MessageQueueService *service = list->front(); | MessageQueueService *service = list->front(); |
ThreadStatus rtn = PEGASUS_THREAD_OK; | ThreadStatus rtn = PEGASUS_THREAD_OK; |
while (service != NULL) | while (service != NULL) |
|
|
// lock and has ownership of the service object. | // lock and has ownership of the service object. |
| |
service->_threads++; | service->_threads++; |
try |
|
{ |
|
rtn = _thread_pool->allocate_and_awaken( | rtn = _thread_pool->allocate_and_awaken( |
service, _req_proc, &_polling_sem); | service, _req_proc, &_polling_sem); |
} |
|
catch (...) |
|
{ |
|
service->_threads--; |
|
|
|
// allocate_and_awaken should never generate an exception. |
|
PEGASUS_ASSERT(0); |
|
} |
|
// if no more threads available, break from processing loop | // if no more threads available, break from processing loop |
if (rtn != PEGASUS_THREAD_OK ) | if (rtn != PEGASUS_THREAD_OK ) |
{ | { |
|
|
service->_threads.get())); | service->_threads.get())); |
| |
Threads::yield(); | Threads::yield(); |
service = NULL; |
break; |
} | } |
} | } |
if (service != NULL) |
|
{ |
|
service = list->next_of(service); | service = list->next_of(service); |
} | } |
|
_polling_list_mutex.unlock(); |
|
} |
|
} |
|
catch(const Exception &e) |
|
{ |
|
PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Exception caught in MessageQueueService::polling_routine : %s", |
|
(const char*)e.getMessage().getCString())); |
|
} |
|
catch(const exception &e) |
|
{ |
|
PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Exception caught in MessageQueueService::polling_routine : %s", |
|
e.what())); |
} | } |
list->unlock(); |
catch(...) |
|
{ |
|
PEG_TRACE_CSTRING(TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1, |
|
"Unknown Exception caught in MessageQueueService::polling_routine"); |
} | } |
|
|
|
PEGASUS_ASSERT(_stop_polling.get()); |
|
|
return ThreadReturnType(0); | return ThreadReturnType(0); |
} | } |
| |
|
|
} | } |
_service_count++; | _service_count++; |
| |
_get_polling_list()->insert_back(this); |
// Add to the polling list |
|
if (!_polling_list) |
|
{ |
|
_polling_list = new PollingList; |
|
} |
|
_polling_list->insert_back(this); |
|
_meta_dispatcher->registerCIMService(this); |
} | } |
| |
| |
MessageQueueService::~MessageQueueService() | MessageQueueService::~MessageQueueService() |
{ | { |
_die = 1; |
|
|
|
// The polling_routine locks the _polling_list while |
|
// processing the incoming messages for services on the |
|
// list. Deleting the service from the _polling_list |
|
// prior to processing, avoids synchronization issues |
|
// with the _polling_routine. |
|
| |
// ATTN: added to prevent assertion in List in which the list does not |
// Close incoming queue. |
// contain this element. |
|
|
|
if (_get_polling_list()->contains(this)) |
|
_get_polling_list()->remove(this); |
|
|
|
// ATTN: The code for closing the _incoming queue |
|
// is not working correctly. In OpenPegasus 2.5, |
|
// execution of the following code is very timing |
|
// dependent. This needs to be fix. |
|
// See Bug 4079 for details. |
|
if (_incoming_queue_shutdown.get() == 0) | if (_incoming_queue_shutdown.get() == 0) |
{ | { |
_shutdown_incoming_queue(); |
AsyncIoClose *msg = new AsyncIoClose( |
|
0, |
|
_queueId, |
|
_queueId, |
|
true); |
|
SendForget(msg); |
|
// Wait until our queue has been shutdown. |
|
while (_incoming_queue_shutdown.get() == 0) |
|
{ |
|
Threads::yield(); |
|
} |
} | } |
| |
|
// die now. |
|
_die = 1; |
|
|
|
_meta_dispatcher->deregisterCIMService(this); |
|
|
// Wait until all threads processing the messages | // Wait until all threads processing the messages |
// for this service have completed. | // for this service have completed. |
|
|
while (_threads.get() > 0) | while (_threads.get() > 0) |
{ | { |
Threads::yield(); | Threads::yield(); |
} | } |
| |
|
|
|
// The polling_routine locks the _polling_list while |
|
// processing the incoming messages for services on the |
|
// list. Deleting the service from the _polling_list |
|
// prior to processing, avoids synchronization issues |
|
// with the _polling_routine. |
|
_removeFromPollingList(this); |
|
|
{ | { |
AutoMutex autoMut(_meta_dispatcher_mutex); | AutoMutex autoMut(_meta_dispatcher_mutex); |
|
|
_service_count--; | _service_count--; |
|
// If we are last service to die, delete metadispatcher. |
if (_service_count.get() == 0) | if (_service_count.get() == 0) |
{ | { |
|
|
_stop_polling++; | _stop_polling++; |
_polling_sem.signal(); | _polling_sem.signal(); |
if (_polling_thread) | if (_polling_thread) |
|
|
delete _polling_thread; | delete _polling_thread; |
_polling_thread = 0; | _polling_thread = 0; |
} | } |
_meta_dispatcher->_shutdown_routed_queue(); |
|
delete _meta_dispatcher; | delete _meta_dispatcher; |
_meta_dispatcher = 0; | _meta_dispatcher = 0; |
| |
delete _thread_pool; | delete _thread_pool; |
_thread_pool = 0; | _thread_pool = 0; |
} | } |
} // mutex unlocks here |
} |
| |
// Clean up any extra stuff on the queue. | // Clean up any extra stuff on the queue. |
AsyncOpNode* op = 0; | AsyncOpNode* op = 0; |
|
|
} | } |
} | } |
| |
void MessageQueueService::_shutdown_incoming_queue() |
|
{ |
|
if (_incoming_queue_shutdown.get() > 0) |
|
return; |
|
|
|
AsyncIoctl *msg = new AsyncIoctl( |
|
0, |
|
_queueId, |
|
_queueId, |
|
true, |
|
AsyncIoctl::IO_CLOSE, |
|
0, |
|
0); |
|
|
|
msg->op = get_op(); |
|
msg->op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET; |
|
|
|
msg->op->_op_dest = this; |
|
msg->op->_request.reset(msg); |
|
if (_incoming.enqueue(msg->op)) |
|
{ |
|
_polling_sem.signal(); |
|
} |
|
else |
|
{ |
|
// This means the queue has already been shut-down (happens when there |
|
// are two AsyncIoctrl::IO_CLOSE messages generated and one got first |
|
// processed. |
|
delete msg; |
|
} |
|
} |
|
|
|
|
|
void MessageQueueService::enqueue(Message* msg) | void MessageQueueService::enqueue(Message* msg) |
{ | { |
PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()"); | PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()"); |
|
|
void MessageQueueService::_handle_async_request(AsyncRequest *req) | void MessageQueueService::_handle_async_request(AsyncRequest *req) |
{ | { |
MessageType type = req->getType(); | MessageType type = req->getType(); |
if (type == ASYNC_IOCTL) |
if (type == ASYNC_IOCLOSE) |
{ | { |
handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); |
handle_AsyncIoClose(static_cast<AsyncIoClose*>(req)); |
} | } |
else if (type == ASYNC_CIMSERVICE_START) | else if (type == ASYNC_CIMSERVICE_START) |
{ | { |
|
|
cimom::_make_response(req, code); | cimom::_make_response(req, code); |
} | } |
| |
|
|
void MessageQueueService::_completeAsyncResponse( | void MessageQueueService::_completeAsyncResponse( |
AsyncRequest* request, | AsyncRequest* request, |
AsyncReply* reply) | AsyncReply* reply) |
|
|
| |
Boolean MessageQueueService::accept_async(AsyncOpNode* op) | Boolean MessageQueueService::accept_async(AsyncOpNode* op) |
{ | { |
|
if (!_isRunning) |
|
{ |
|
// Don't accept any messages other than start. |
|
if (op->_request.get()->getType() != ASYNC_CIMSERVICE_START) |
|
{ |
|
return false; |
|
} |
|
} |
|
|
if (_incoming_queue_shutdown.get() > 0) | if (_incoming_queue_shutdown.get() > 0) |
return false; | return false; |
|
|
if (_polling_thread == NULL) | if (_polling_thread == NULL) |
{ | { |
|
PEGASUS_ASSERT(_polling_list); |
_polling_thread = new Thread( | _polling_thread = new Thread( |
polling_routine, | polling_routine, |
reinterpret_cast<void *>(_get_polling_list()), |
reinterpret_cast<void *>(_polling_list), |
false); | false); |
ThreadStatus tr = PEGASUS_THREAD_OK; | ThreadStatus tr = PEGASUS_THREAD_OK; |
while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK) | while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK) |
|
|
return false; | return false; |
} | } |
| |
void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req) |
void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req) |
{ |
|
switch (req->ctl) |
|
{ |
|
case AsyncIoctl::IO_CLOSE: |
|
{ | { |
MessageQueueService *service = | MessageQueueService *service = |
static_cast<MessageQueueService *>(req->op->_service_ptr); |
static_cast<MessageQueueService*>(req->op->_op_dest); |
| |
#ifdef MESSAGEQUEUESERVICE_DEBUG | #ifdef MESSAGEQUEUESERVICE_DEBUG |
PEGASUS_STD(cout) << service->getQueueName() << | PEGASUS_STD(cout) << service->getQueueName() << |
" Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl); |
" Received AsyncIoClose " << PEGASUS_STD(endl); |
#endif | #endif |
|
// set the closing flag, don't accept any more messages |
|
service->_incoming_queue_shutdown = 1; |
| |
// respond to this message. this is fire and forget, so we | // respond to this message. this is fire and forget, so we |
// don't need to delete anything. | // don't need to delete anything. |
// this takes care of two problems that were being found | // this takes care of two problems that were being found |
// << Thu Oct 9 10:52:48 2003 mdd >> | // << Thu Oct 9 10:52:48 2003 mdd >> |
_make_response(req, async_results::OK); | _make_response(req, async_results::OK); |
// ensure we do not accept any further messages |
|
|
|
// ensure we don't recurse on IO_CLOSE |
|
if (_incoming_queue_shutdown.get() > 0) |
|
break; |
|
|
|
// set the closing flag |
|
service->_incoming_queue_shutdown = 1; |
|
// empty out the queue |
|
while (1) |
|
{ |
|
AsyncOpNode* operation = 0; |
|
try |
|
{ |
|
operation = service->_incoming.dequeue(); |
|
} |
|
catch (...) |
|
{ |
|
break; |
|
} |
|
if (operation) |
|
{ |
|
operation->_service_ptr = service; |
|
service->_handle_incoming_operation(operation); |
|
} |
|
else |
|
break; |
|
} // message processing loop |
|
|
|
// shutdown the AsyncQueue |
|
service->_incoming.close(); |
|
return; |
|
} |
|
|
|
default: |
|
_make_response(req, async_results::CIM_NAK); |
|
} |
|
} | } |
| |
void MessageQueueService::handle_CimServiceStart(CimServiceStart* req) | void MessageQueueService::handle_CimServiceStart(CimServiceStart* req) |
|
|
return queue->getQueueId(); | return queue->getQueueId(); |
} | } |
| |
MessageQueueService::PollingList* MessageQueueService::_get_polling_list() |
void MessageQueueService::_removeFromPollingList(MessageQueueService *service) |
{ | { |
_polling_list_mutex.lock(); | _polling_list_mutex.lock(); |
|
_polling_list->remove(service); |
if (!_polling_list) |
|
_polling_list = new PollingList; |
|
|
|
_polling_list_mutex.unlock(); | _polling_list_mutex.unlock(); |
|
|
return _polling_list; |
|
} | } |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |