version 1.145, 2008/09/17 18:47:22
|
version 1.146, 2008/10/14 17:25:58
|
|
|
{ | { |
_capabilities = (capabilities | module_capabilities::async); | _capabilities = (capabilities | module_capabilities::async); |
| |
_default_op_timeout.tv_sec = 30; |
|
_default_op_timeout.tv_usec = 100; |
|
|
|
max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE; | max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE; |
| |
// if requested thread max is out of range, then set to | // if requested thread max is out of range, then set to |
|
|
} | } |
_service_count++; | _service_count++; |
| |
if (false == register_service(name, _capabilities, _mask)) |
|
{ |
|
MessageLoaderParms parms( |
|
"Common.MessageQueueService.UNABLE_TO_REGISTER", |
|
"CIM base message queue service is unable to register with the " |
|
"CIMOM dispatcher."); |
|
throw Exception(parms); |
|
} |
|
|
|
_get_polling_list()->insert_back(this); | _get_polling_list()->insert_back(this); |
} | } |
| |
|
|
req->op->processing(); | req->op->processing(); |
| |
MessageType type = req->getType(); | MessageType type = req->getType(); |
if (type == ASYNC_HEARTBEAT) |
if (type == ASYNC_IOCTL) |
handle_heartbeat_request(req); |
|
else if (type == ASYNC_IOCTL) |
|
handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); | handle_AsyncIoctl(static_cast<AsyncIoctl *>(req)); |
else if (type == ASYNC_CIMSERVICE_START) | else if (type == ASYNC_CIMSERVICE_START) |
handle_CimServiceStart(static_cast<CimServiceStart *>(req)); | handle_CimServiceStart(static_cast<CimServiceStart *>(req)); |
else if (type == ASYNC_CIMSERVICE_STOP) | else if (type == ASYNC_CIMSERVICE_STOP) |
handle_CimServiceStop(static_cast<CimServiceStop *>(req)); | handle_CimServiceStop(static_cast<CimServiceStop *>(req)); |
else if (type == ASYNC_CIMSERVICE_PAUSE) |
|
handle_CimServicePause(static_cast<CimServicePause *>(req)); |
|
else if (type == ASYNC_CIMSERVICE_RESUME) |
|
handle_CimServiceResume(static_cast<CimServiceResume *>(req)); |
|
else if (type == ASYNC_ASYNC_OP_START) |
|
handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req)); |
|
else | else |
{ | { |
// we don't handle this request message | // we don't handle this request message |
|
|
return true; | return true; |
} | } |
| |
void MessageQueueService::handle_heartbeat_request(AsyncRequest* req) |
|
{ |
|
// default action is to echo a heartbeat response |
|
|
|
AsyncReply *reply = new AsyncReply( |
|
ASYNC_HEARTBEAT, |
|
0, |
|
req->op, |
|
async_results::OK, |
|
req->resp, |
|
false); |
|
_completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0); |
|
} |
|
|
|
|
|
void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep) |
|
{ |
|
} |
|
|
|
void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req) | void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req) |
{ | { |
switch (req->ctl) | switch (req->ctl) |
|
|
// clear the stoped bit and update | // clear the stoped bit and update |
_capabilities &= (~(module_capabilities::stopped)); | _capabilities &= (~(module_capabilities::stopped)); |
_make_response(req, async_results::OK); | _make_response(req, async_results::OK); |
// now tell the meta dispatcher we are stopped |
|
update_service(_capabilities, _mask); |
|
} | } |
| |
void MessageQueueService::handle_CimServiceStop(CimServiceStop* req) | void MessageQueueService::handle_CimServiceStop(CimServiceStop* req) |
|
|
// set the stopeed bit and update | // set the stopeed bit and update |
_capabilities |= module_capabilities::stopped; | _capabilities |= module_capabilities::stopped; |
_make_response(req, async_results::CIM_STOPPED); | _make_response(req, async_results::CIM_STOPPED); |
// now tell the meta dispatcher we are stopped |
|
update_service(_capabilities, _mask); |
|
} |
|
|
|
void MessageQueueService::handle_CimServicePause(CimServicePause* req) |
|
{ |
|
// set the paused bit and update |
|
_capabilities |= module_capabilities::paused; |
|
update_service(_capabilities, _mask); |
|
_make_response(req, async_results::CIM_PAUSED); |
|
// now tell the meta dispatcher we are stopped |
|
} |
|
|
|
void MessageQueueService::handle_CimServiceResume(CimServiceResume* req) |
|
{ |
|
// clear the paused bit and update |
|
_capabilities &= (~(module_capabilities::paused)); |
|
update_service(_capabilities, _mask); |
|
_make_response(req, async_results::OK); |
|
// now tell the meta dispatcher we are stopped |
|
} |
|
|
|
void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req) |
|
{ |
|
_make_response(req, async_results::CIM_NAK); |
|
} |
|
|
|
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req) |
|
{ |
|
} | } |
| |
AsyncOpNode* MessageQueueService::get_op() | AsyncOpNode* MessageQueueService::get_op() |
|
|
return _meta_dispatcher->route_async(op); | return _meta_dispatcher->route_async(op); |
} | } |
| |
|
|
Boolean MessageQueueService::SendAsync( |
|
Message* msg, |
|
Uint32 destination, |
|
void (*callback)(Message* response, void* handle, void* parameter), |
|
void* handle, |
|
void* parameter) |
|
{ |
|
if (msg == NULL) |
|
return false; |
|
if (callback == NULL) |
|
return SendForget(msg); |
|
AsyncOpNode *op = get_op(); |
|
msg->dest = destination; |
|
if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest))) |
|
{ |
|
op->release(); |
|
return_op(op); |
|
return false; |
|
} |
|
op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK; |
|
op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
op->__async_callback = callback; |
|
op->_callback_node = op; |
|
op->_callback_handle = handle; |
|
op->_callback_parameter = parameter; |
|
op->_callback_response_q = this; |
|
|
|
if (!(msg->getMask() & MessageMask::ha_async)) |
|
{ |
|
AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart( |
|
op, |
|
destination, |
|
msg); |
|
} |
|
else |
|
{ |
|
op->_request.reset(msg); |
|
(static_cast<AsyncMessage *>(msg))->op = op; |
|
} |
|
return _meta_dispatcher->route_async(op); |
|
} |
|
|
|
|
|
Boolean MessageQueueService::SendForget(Message* msg) | Boolean MessageQueueService::SendForget(Message* msg) |
{ | { |
AsyncOpNode* op = 0; | AsyncOpNode* op = 0; |
|
|
return rpl; | return rpl; |
} | } |
| |
|
Uint32 MessageQueueService::find_service_qid(const String &name) |
Boolean MessageQueueService::register_service( |
|
String name, |
|
Uint32 capabilities, |
|
Uint32 mask) |
|
{ |
|
RegisterCimService *msg = new RegisterCimService( |
|
0, |
|
true, |
|
name, |
|
capabilities, |
|
mask, |
|
_queueId); |
|
msg->dest = CIMOM_Q_ID; |
|
|
|
Boolean registered = false; |
|
AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg)); |
|
|
|
if (reply != 0) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_async) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_reply) |
|
{ |
|
if (reply->result == async_results::OK || |
|
reply->result == async_results::MODULE_ALREADY_REGISTERED) |
|
{ |
|
registered = true; |
|
} |
|
} |
|
} |
|
|
|
delete reply; |
|
} |
|
delete msg; |
|
return registered; |
|
} |
|
|
|
Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask) |
|
{ |
|
UpdateCimService *msg = new UpdateCimService( |
|
0, |
|
true, |
|
_queueId, |
|
_capabilities, |
|
_mask); |
|
Boolean registered = false; |
|
|
|
AsyncMessage* reply = SendWait(msg); |
|
if (reply) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_async) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_reply) |
|
{ |
|
if (static_cast<AsyncReply *>(reply)->result == |
|
async_results::OK) |
|
{ |
|
registered = true; |
|
} |
|
} |
|
} |
|
delete reply; |
|
} |
|
delete msg; |
|
return registered; |
|
} |
|
|
|
|
|
Boolean MessageQueueService::deregister_service() |
|
{ |
|
_meta_dispatcher->deregister_module(_queueId); |
|
return true; |
|
} |
|
|
|
|
|
void MessageQueueService::find_services( |
|
String name, |
|
Uint32 capabilities, |
|
Uint32 mask, |
|
Array<Uint32>* results) |
|
{ |
|
if (results == 0) |
|
{ |
|
throw NullPointer(); |
|
} |
|
|
|
results->clear(); |
|
|
|
FindServiceQueue *req = new FindServiceQueue( |
|
0, |
|
_queueId, |
|
true, |
|
name, |
|
capabilities, |
|
mask); |
|
|
|
req->dest = CIMOM_Q_ID; |
|
|
|
AsyncMessage *reply = SendWait(req); |
|
if (reply) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_async) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_reply) |
|
{ |
|
if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT) |
|
{ |
|
if ((static_cast<FindServiceQueueResult*>(reply))->result == |
|
async_results::OK) |
|
*results = |
|
(static_cast<FindServiceQueueResult*>(reply))->qids; |
|
} |
|
} |
|
} |
|
delete reply; |
|
} |
|
delete req; |
|
} |
|
|
|
void MessageQueueService::enumerate_service( |
|
Uint32 queue, |
|
message_module* result) |
|
{ |
|
if (result == 0) |
|
{ |
|
throw NullPointer(); |
|
} |
|
|
|
EnumerateService *req = new EnumerateService( |
|
0, |
|
_queueId, |
|
true, |
|
queue); |
|
|
|
AsyncMessage* reply = SendWait(req); |
|
|
|
if (reply) |
|
{ |
|
Boolean found = false; |
|
|
|
if (reply->getMask() & MessageMask::ha_async) |
|
{ |
|
if (reply->getMask() & MessageMask::ha_reply) |
|
{ |
|
if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT) |
|
{ |
|
if ((static_cast<EnumerateServiceResponse*>(reply))-> |
|
result == async_results::OK) |
|
{ | { |
if (found == false) |
MessageQueue *queue = MessageQueue::lookup((const char*)name.getCString()); |
{ |
PEGASUS_ASSERT(queue); |
found = true; |
return queue->getQueueId(); |
|
|
result->put_name((static_cast< |
|
EnumerateServiceResponse*>(reply))->name); |
|
result->put_capabilities((static_cast< |
|
EnumerateServiceResponse*>(reply))-> |
|
capabilities); |
|
result->put_mask((static_cast< |
|
EnumerateServiceResponse*>(reply))->mask); |
|
result->put_queue((static_cast< |
|
EnumerateServiceResponse*>(reply))->qid); |
|
} |
|
} |
|
} |
|
} |
|
} |
|
delete reply; |
|
} |
|
delete req; |
|
} | } |
| |
MessageQueueService::PollingList* MessageQueueService::_get_polling_list() | MessageQueueService::PollingList* MessageQueueService::_get_polling_list() |