version 1.70.2.3, 2003/08/08 20:31:17
|
version 1.71, 2003/07/31 14:33:14
|
|
|
| |
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
#include <Pegasus/Common/Tracer.h> | #include <Pegasus/Common/Tracer.h> |
|
#include <Pegasus/Common/MessageLoader.h> //l10n |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
| |
void MessageQueueService::force_shutdown(void) | void MessageQueueService::force_shutdown(void) |
{ | { |
PEGASUS_STD(cout) << "Forcing shutdown of CIMOM Message Router" << PEGASUS_STD(endl); |
//l10n |
//MessageQueueService::_stop_polling = 1; |
//PEGASUS_STD(cout) << "Forcing shutdown of CIMOM Message Router" << PEGASUS_STD(endl); |
|
MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN", |
|
"Forcing shutdown of $0", |
|
"CIMOM Message Router"); |
|
PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl); |
|
MessageQueueService::_stop_polling = 1; |
MessageQueueService *svc; | MessageQueueService *svc; |
int counter = 0; |
|
_polling_list.lock(); | _polling_list.lock(); |
svc = _polling_list.next(0); | svc = _polling_list.next(0); |
|
|
while(svc != 0) | while(svc != 0) |
{ | { |
PEGASUS_STD(cout) << "Stopping " << svc->getQueueName() << PEGASUS_STD(endl); |
//l10n - reuse same MessageLoaderParms to avoid multiple creates |
|
//PEGASUS_STD(cout) << "Stopping " << svc->getQueueName() << PEGASUS_STD(endl); |
|
parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE"; |
|
parms.default_msg = "Stopping $0"; |
|
parms.arg0 = svc->getQueueName(); |
|
PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl); |
|
|
_polling_sem.signal(); | _polling_sem.signal(); |
svc->_shutdown_incoming_queue(); | svc->_shutdown_incoming_queue(); |
counter++; |
|
_polling_sem.signal(); | _polling_sem.signal(); |
svc = _polling_list.next(svc); | svc = _polling_list.next(svc); |
} | } |
_polling_list.unlock(); | _polling_list.unlock(); |
_polling_sem.signal(); |
|
|
|
while ( counter != 0) { |
|
Thread::sleep(100); |
|
_polling_list.lock(); |
|
svc = _polling_list.next(0); |
|
while (svc != 0 ) { |
|
if (svc ->_incoming_queue_shutdown.value() == 1 ) { |
|
counter--; |
|
} |
|
svc = _polling_list.next(svc); |
|
} |
|
_polling_list.unlock(); |
|
} |
|
MessageQueueService::_stop_polling = 1; |
|
} | } |
| |
| |
|
|
if(_stop_polling.value() != 0 ) | if(_stop_polling.value() != 0 ) |
{ | { |
break; | break; |
|
|
} | } |
| |
list->lock(); | list->lock(); |
|
|
if( false == register_service(name, _capabilities, _mask) ) | if( false == register_service(name, _capabilities, _mask) ) |
{ | { |
_meta_dispatcher_mutex.unlock(); | _meta_dispatcher_mutex.unlock(); |
throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher"); |
//l10n |
|
//throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher"); |
|
MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER", |
|
"$0 Unable to register with $1", |
|
"MessageQueueService Base", |
|
"Meta Dispatcher"); |
|
|
|
throw BindFailedException(parms); |
} | } |
| |
_polling_list.insert_last(this); | _polling_list.insert_last(this); |
|
|
MessageQueueService::~MessageQueueService(void) | MessageQueueService::~MessageQueueService(void) |
{ | { |
_die = 1; | _die = 1; |
// IBM-KR: This causes a new message (IO_CLOSE) to be spawned, which |
if (_incoming_queue_shutdown.value() == 0 ) |
// doesn't get picked up anyone. The idea was that the message would be |
{ |
// picked up handle_AsyncIoctl which closes the queue and does cleaning. |
_shutdown_incoming_queue(); |
// That described behavior has never surfaced itself. If it does appear, |
} |
// uncomment the if ( ..) { } block below. |
|
|
|
// Note: The handle_AsyncIcotl does get called when force_shutdown(void) gets |
|
// called during Pegasus shutdown procedure (in case you ever wondered). |
|
|
|
//if (_incoming_queue_shutdown.value() == 0 ) |
|
//{ |
|
// _shutdown_incoming_queue(); |
|
//} |
|
_callback_ready.signal(); | _callback_ready.signal(); |
// _callback_thread.join(); | // _callback_thread.join(); |
| |
|
|
} | } |
_meta_dispatcher_mutex.unlock(); | _meta_dispatcher_mutex.unlock(); |
_polling_list.remove(this); | _polling_list.remove(this); |
// Clean up in case there are extra stuff on the queue. |
|
while (_incoming.count()) |
|
{ |
|
delete _incoming.remove_first(); |
|
} |
|
} | } |
| |
void MessageQueueService::_shutdown_incoming_queue(void) | void MessageQueueService::_shutdown_incoming_queue(void) |
|
|
MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr); | MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr); |
| |
// respond to this message. | // respond to this message. |
// _make_response(req, async_results::OK); |
_make_response(req, async_results::OK); |
// ensure we do not accept any further messages | // ensure we do not accept any further messages |
| |
// ensure we don't recurse on IO_CLOSE | // ensure we don't recurse on IO_CLOSE |