version 1.17, 2002/02/12 23:28:58
|
version 1.18, 2002/02/18 12:31:29
|
|
|
{ | { |
operation->lock(); | operation->lock(); |
Message *rq = operation->_request.next(0); | Message *rq = operation->_request.next(0); |
|
PEGASUS_ASSERT(rq != 0 ); |
|
|
|
// divert legacy messages to handleEnqueue |
|
if ( ! (rq->getMask() & message_mask::ha_async) ) |
|
{ |
|
rq = operation->_request.remove_first() ; |
operation->unlock(); | operation->unlock(); |
|
// delete the op node |
|
delete operation; |
|
handleEnqueue(rq); |
|
return; |
|
} |
| |
PEGASUS_ASSERT(rq != 0 ); |
operation->unlock(); |
PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async ); |
|
PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request); |
|
static_cast<AsyncMessage *>(rq)->_myself = thread; | static_cast<AsyncMessage *>(rq)->_myself = thread; |
static_cast<AsyncMessage *>(rq)->_service = queue; | static_cast<AsyncMessage *>(rq)->_service = queue; |
_handle_async_request(static_cast<AsyncRequest *>(rq)); | _handle_async_request(static_cast<AsyncRequest *>(rq)); |
|
|
Boolean MessageQueueService::_enqueueResponse( | Boolean MessageQueueService::_enqueueResponse( |
Message* request, | Message* request, |
Message* response) | Message* response) |
|
|
{ | { |
if(request->_async != 0 ) | if(request->_async != 0 ) |
{ | { |
Uint32 mask = request->_async->getMask(); | Uint32 mask = request->_async->getMask(); |
if ( mask & message_mask::ha_async) |
PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request )); |
{ |
|
if ( mask & message_mask::ha_request) |
AsyncRequest *async = static_cast<AsyncRequest *>(request->_async); |
{ |
AsyncOpNode *op = async->op; |
AsyncOpNode *op = (static_cast<AsyncRequest *>(request->_async)->op); |
request->_async = 0; |
| |
AsyncLegacyOperationResult *async_result = | AsyncLegacyOperationResult *async_result = |
new AsyncLegacyOperationResult( | new AsyncLegacyOperationResult( |
(static_cast<AsyncRequest *>(request->_async))->getKey(), |
async->getKey(), |
(static_cast<AsyncRequest *>(request->_async))->getRouting(), |
async->getRouting(), |
op, | op, |
response); | response); |
_completeAsyncResponse(static_cast<AsyncRequest *>(request->_async), |
_completeAsyncResponse(async, |
async_result, | async_result, |
ASYNC_OPSTATE_COMPLETE, | ASYNC_OPSTATE_COMPLETE, |
0); | 0); |
return true; | return true; |
} | } |
} |
|
} |
// ensure that the destination queue is in response->dest |
return false; |
return SendForget(response); |
|
|
} | } |
| |
void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code) |
void MessageQueueService::_make_response(Message *req, Uint32 code) |
{ | { |
AsyncReply *reply = |
return cimom::_make_response(req, code); |
new AsyncReply(async_messages::REPLY, |
|
req->getKey(), |
|
req->getRouting(), |
|
0, |
|
req->op, |
|
code, |
|
req->resp, |
|
false); |
|
_completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 ); |
|
} | } |
| |
| |
|
|
Uint32 state, | Uint32 state, |
Uint32 flag) | Uint32 flag) |
{ | { |
PEGASUS_ASSERT(request != 0 && reply != 0 ); |
return cimom::_completeAsyncResponse(request, reply, state, flag); |
|
|
AsyncOpNode *op = request->op; |
|
op->lock(); |
|
op->_state |= state ; |
|
op->_flags |= flag; |
|
gettimeofday(&(op->_updated), NULL); |
|
if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) ) |
|
op->_response.insert_last(reply); |
|
op->unlock(); |
|
|
|
op->_client_sem.signal(); |
|
|
|
|
|
} | } |
| |
| |
|
|
{ | { |
if (_incoming_queue_shutdown.value() > 0 ) | if (_incoming_queue_shutdown.value() > 0 ) |
return false; | return false; |
|
|
if ( msg != 0 ) |
|
{ |
|
Uint32 mask = msg->getMask(); |
|
if ( mask & message_mask::ha_async) |
|
if ( mask & message_mask::ha_request) |
|
return true; | return true; |
} |
|
return false; |
|
} | } |
| |
| |
void MessageQueueService::handleEnqueue(void) |
void MessageQueueService::handleEnqueue(Message *msg) |
{ | { |
Message *msg = dequeue(); |
|
if( msg ) | if( msg ) |
{ |
|
delete msg; | delete msg; |
|
|
} | } |
|
|
|
|
|
void MessageQueueService::handleEnqueue(void) |
|
{ |
|
Message *msg = dequeue(); |
|
handleEnqueue(msg); |
} | } |
| |
void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) | void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) |
|
|
} | } |
| |
| |
|
Boolean MessageQueueService::SendAsync(AsyncRequest *request, |
|
void (*callback)(AsyncOpNode *, |
|
MessageQueueService *, |
|
void *)) |
|
{ |
|
|
|
return true; |
|
} |
|
|
|
|
|
Boolean MessageQueueService::SendForget(Message *msg) |
|
{ |
|
|
|
AsyncOpNode *op = 0; |
|
|
|
if (msg->getMask() & message_mask::ha_async) |
|
{ |
|
op = (static_cast<AsyncMessage *>(msg))->op ; |
|
} |
|
if( op == 0 ) |
|
op = get_op(); |
|
|
|
op->_request.insert_first(msg); |
|
op->_state &= ~ASYNC_OPSTATE_COMPLETE; |
|
op->_flags &= ASYNC_OPFLAGS_FIRE_AND_FORGET; |
|
op->put_response(0); |
|
|
|
// now see if the meta dispatcher will take it |
|
return _meta_dispatcher->route_async(op); |
|
} |
|
|
| |
AsyncReply *MessageQueueService::SendWait(AsyncRequest *request) | AsyncReply *MessageQueueService::SendWait(AsyncRequest *request) |
{ | { |