version 1.6, 2002/02/02 01:03:31
|
version 1.7, 2002/02/02 17:58:13
|
|
|
_die(0), | _die(0), |
_pending(true), | _pending(true), |
_incoming(true, 1000), | _incoming(true, 1000), |
|
_incoming_queue_shutdown(0), |
_req_thread(_req_proc, this, false) | _req_thread(_req_proc, this, false) |
{ | { |
_default_op_timeout.tv_sec = 30; | _default_op_timeout.tv_sec = 30; |
|
|
MessageQueueService::~MessageQueueService(void) | MessageQueueService::~MessageQueueService(void) |
{ | { |
_die = 1; | _die = 1; |
|
if (_incoming_queue_shutdown.value() == 0 ) |
_incoming.shutdown_queue(); | _incoming.shutdown_queue(); |
| |
_req_thread.join(); | _req_thread.join(); |
|
|
| |
AtomicInt MessageQueueService::_xid(1); | AtomicInt MessageQueueService::_xid(1); |
| |
|
void MessageQueueService::_shutdown_incoming_queue(void) |
|
{ |
|
_incoming_queue_shutdown = 1; |
|
|
|
_incoming.shutdown_queue(); |
|
_req_thread.cancel(); |
|
} |
|
|
| |
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) |
{ | { |
|
|
| |
// pull messages off the incoming queue and dispatch them. then | // pull messages off the incoming queue and dispatch them. then |
// check pending messages that are non-blocking | // check pending messages that are non-blocking |
|
AsyncOpNode *operation = 0; |
| |
while ( service->_die.value() == 0 ) | while ( service->_die.value() == 0 ) |
{ | { |
AsyncOpNode *operation = service->_incoming.remove_first_wait(); |
try |
if ( operation == 0 ) |
{ |
|
operation = service->_incoming.remove_first(); |
|
} |
|
catch(ListClosed & ) |
|
{ |
break; | break; |
|
} |
|
if ( service->_incoming.is_shutdown() || service->_die.value() ) |
|
break; |
|
if( operation ) |
service->_handle_incoming_operation(operation); | service->_handle_incoming_operation(operation); |
|
else |
|
pegasus_yield(); |
} | } |
| |
myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); | myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); |
|
|
op->_state |= state ; | op->_state |= state ; |
op->_flags |= flag; | op->_flags |= flag; |
gettimeofday(&(op->_updated), NULL); | gettimeofday(&(op->_updated), NULL); |
if ( false == op->_request.exists(reinterpret_cast<void *>(reply)) ) |
if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) ) |
op->_request.insert_last(reply); |
op->_response.insert_last(reply); |
op->unlock(); | op->unlock(); |
| |
op->_client_sem.signal(); | op->_client_sem.signal(); |
|
|
if (request->op == false) | if (request->op == false) |
{ | { |
request->op = get_op(); | request->op = get_op(); |
request->op->put_request(request); |
request->op->_request.insert_first(request); |
|
|
destroy_op = true; | destroy_op = true; |
} | } |
| |
|
|
request->op->unlock(); | request->op->unlock(); |
| |
return_op(request->op); | return_op(request->op); |
|
request->op = 0; |
// delete request->op; |
|
// request->op = 0; |
|
} | } |
| |
return rpl; | return rpl; |
|
|
mask, | mask, |
_queueId); | _queueId); |
Boolean registered = false; | Boolean registered = false; |
AsyncMessage *reply = SendWait( msg ); |
AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg )); |
| |
if ( reply != 0 ) | if ( reply != 0 ) |
{ | { |
|
|
{ | { |
if(reply->getMask() & message_mask::ha_reply) | if(reply->getMask() & message_mask::ha_reply) |
{ | { |
if((static_cast<AsyncReply *>(reply))->result == async_results::OK) |
if(reply->result == async_results::OK) |
registered = true; | registered = true; |
} | } |
} | } |