version 1.8, 2002/02/04 02:52:06
|
version 1.13, 2002/02/06 15:15:35
|
|
|
| |
#include "MessageQueueService.h" | #include "MessageQueueService.h" |
| |
|
PEGASUS_USING_STD; |
|
|
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
MessageQueueService::MessageQueueService(const char *name, | MessageQueueService::MessageQueueService(const char *name, |
|
|
void MessageQueueService::_shutdown_incoming_queue(void) | void MessageQueueService::_shutdown_incoming_queue(void) |
{ | { |
| |
|
if (_incoming_queue_shutdown.value() > 0 ) |
|
return ; |
|
|
AsyncIoctl *msg = new AsyncIoctl(get_next_xid(), | AsyncIoctl *msg = new AsyncIoctl(get_next_xid(), |
0, | 0, |
_queueId, | _queueId, |
|
|
AsyncIoctl::IO_CLOSE, | AsyncIoctl::IO_CLOSE, |
0, | 0, |
0); | 0); |
|
|
msg->op = get_op(); | msg->op = get_op(); |
msg->op->_request.insert_first(msg); | msg->op->_request.insert_first(msg); |
Boolean closed = false; |
|
| |
if (_incoming_queue_shutdown.value() > 0 ) |
|
return ; |
|
| |
_incoming.insert_last_wait(msg->op); | _incoming.insert_last_wait(msg->op); |
msg->op->_client_sem.wait(); | msg->op->_client_sem.wait(); |
|
|
AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first()); | AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first()); |
reply->op = 0; | reply->op = 0; |
msg->op->unlock(); | msg->op->unlock(); |
if ( reply != 0 ) |
|
{ |
|
if(reply->getMask() & message_mask:: ha_async) |
|
{ |
|
if(reply->getMask() & message_mask::ha_reply) |
|
{ |
|
if(reply->result == async_results::OK) |
|
closed = true; |
|
} |
|
} |
|
delete reply; | delete reply; |
} |
|
| |
msg->op->_request.remove(msg); | msg->op->_request.remove(msg); |
msg->op->_state |= ASYNC_OPSTATE_RELEASED; | msg->op->_state |= ASYNC_OPSTATE_RELEASED; |
|
|
return_op(msg->op); | return_op(msg->op); |
|
|
msg->op = 0; | msg->op = 0; |
delete msg; | delete msg; |
} | } |
|
|
| |
service->_handle_incoming_operation(operation, myself, service); | service->_handle_incoming_operation(operation, myself, service); |
} | } |
|
|
} | } |
| |
myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); | myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); |
|
|
else if (type == async_messages::CIMSERVICE_STOP) | else if (type == async_messages::CIMSERVICE_STOP) |
handle_CimServiceStop(static_cast<CimServiceStop *>(req)); | handle_CimServiceStop(static_cast<CimServiceStop *>(req)); |
else if (type == async_messages::CIMSERVICE_PAUSE) | else if (type == async_messages::CIMSERVICE_PAUSE) |
|
{ |
handle_CimServicePause(static_cast<CimServicePause *>(req)); | handle_CimServicePause(static_cast<CimServicePause *>(req)); |
|
} |
|
|
else if (type == async_messages::CIMSERVICE_RESUME) | else if (type == async_messages::CIMSERVICE_RESUME) |
handle_CimServiceResume(static_cast<CimServiceResume *>(req)); | handle_CimServiceResume(static_cast<CimServiceResume *>(req)); |
else if ( type == async_messages::ASYNC_OP_START) | else if ( type == async_messages::ASYNC_OP_START) |
|
|
| |
void MessageQueueService::handle_CimServiceStart(CimServiceStart *req) | void MessageQueueService::handle_CimServiceStart(CimServiceStart *req) |
{ | { |
_make_response(req, async_results::CIM_NAK); |
// clear the stoped bit and update |
|
_capabilities &= (~(module_capabilities::stopped)); |
|
_make_response(req, async_results::OK); |
|
// now tell the meta dispatcher we are stopped |
|
update_service(_capabilities, _mask); |
|
|
} | } |
void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) | void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) |
{ | { |
_make_response(req, async_results::CIM_NAK); |
// set the stopeed bit and update |
|
_capabilities |= module_capabilities::stopped; |
|
_make_response(req, async_results::CIM_STOPPED); |
|
// now tell the meta dispatcher we are stopped |
|
update_service(_capabilities, _mask); |
|
|
} | } |
void MessageQueueService::handle_CimServicePause(CimServicePause *req) | void MessageQueueService::handle_CimServicePause(CimServicePause *req) |
{ | { |
_make_response(req, async_results::CIM_NAK); |
// set the paused bit and update |
|
_capabilities |= module_capabilities::paused; |
|
update_service(_capabilities, _mask); |
|
_make_response(req, async_results::CIM_PAUSED); |
|
// now tell the meta dispatcher we are stopped |
} | } |
void MessageQueueService::handle_CimServiceResume(CimServiceResume *req) | void MessageQueueService::handle_CimServiceResume(CimServiceResume *req) |
{ | { |
_make_response(req, async_results::CIM_NAK); |
// clear the paused bit and update |
|
_capabilities &= (~(module_capabilities::paused)); |
|
update_service(_capabilities, _mask); |
|
_make_response(req, async_results::OK); |
|
// now tell the meta dispatcher we are stopped |
} | } |
| |
void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req) | void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req) |
{ | { |
_make_response(req, async_results::CIM_NAK); | _make_response(req, async_results::CIM_NAK); |
|
|
} | } |
| |
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) | void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) |
{ | { |
; |
|
} | } |
| |
AsyncOpNode *MessageQueueService::get_op(void) | AsyncOpNode *MessageQueueService::get_op(void) |
{ | { |
AsyncOpNode *op = new AsyncOpNode(); | AsyncOpNode *op = new AsyncOpNode(); |
| |
op->write_state(ASYNC_OPSTATE_UNKNOWN); |
op->_state = ASYNC_OPSTATE_UNKNOWN; |
op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL ); |
op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL; |
| |
return op; | return op; |
} | } |