(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.21 and 1.22

version 1.21, 2002/02/19 18:21:14 version 1.22, 2002/02/20 22:00:51
Line 27 
Line 27 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
   #include <Pegasus/Common/Tracer.h>
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
Line 42 
Line 43 
                                          Uint32 capabilities,                                          Uint32 capabilities,
                                          Uint32 mask)                                          Uint32 mask)
    : Base(name, true,  queueID),    : Base(name, true,  queueID),
      _capabilities(capabilities),  
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _pending(true),      _pending(true),
Line 50 
Line 51 
      _incoming_queue_shutdown(0),      _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)      _req_thread(_req_proc, this, false)
 { {
      _capabilities = (capabilities | module_capabilities::async);
   
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;    _default_op_timeout.tv_usec = 100;
  
Line 70 
Line 73 
    _service_count++;    _service_count++;
  
  
   
    if( false == register_service(name, _capabilities, _mask) )    if( false == register_service(name, _capabilities, _mask) )
    {    {
       _meta_dispatcher_mutex.unlock();       _meta_dispatcher_mutex.unlock();
Line 143 
Line 145 
  
 } }
  
   
   
   void MessageQueueService::enqueue(Message *msg) throw(IPCException)
   {
      Base::enqueue(msg);
   
   //    PEGASUS_ASSERT(msg != 0 );
   
   //    cout << "inside overriden enqueue" << endl;
   //        if (!msg)
   //     {
   //        Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
   //         "MessageQueue::enqueue failure");
   //        throw NullPointer();
   //     }
   
   //     if (getenv("PEGASUS_TRACE"))
   //     {
   //        cout << "===== " << getQueueName() << ": ";
   //        msg->print(cout);
   //     }
   
   //    msg->dest = _queueId;
   //    SendForget(msg);
   
   }
   
   
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
Line 173 
Line 204 
    return(0);    return(0);
 } }
  
   void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
   {
      return_op(op);
   }
   
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
                                                      Thread *thread,                                                      Thread *thread,
Line 180 
Line 216 
 { {
    if ( operation != 0 )    if ( operation != 0 )
    {    {
   
   // ATTN: optimization
   // << Tue Feb 19 14:10:38 2002 mdd >>
       operation->lock();       operation->lock();
         if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {
            operation->unlock();
            _handle_async_callback(operation);
         }
   
       Message *rq = operation->_request.next(0);       Message *rq = operation->_request.next(0);
       PEGASUS_ASSERT(rq != 0 );       PEGASUS_ASSERT(rq != 0 );
  
Line 205 
Line 251 
  
 } }
  
   
   
   
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
    if ( req != 0 )    if ( req != 0 )
Line 295 
Line 344 
    Message *rp = op->_response.next(0);    Message *rp = op->_response.next(0);
    op->unlock();    op->unlock();
  
    if(  ((true == messageOK(rq)) || ( true == messageOK(rp) )) &&  _die.value() == 0  )     if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
           _die.value() == 0  )
    {    {
       _incoming.insert_last_wait(op);       _incoming.insert_last_wait(op);
       return true;       return true;
Line 314 
Line 364 
  
 void MessageQueueService::handleEnqueue(Message *msg) void MessageQueueService::handleEnqueue(Message *msg)
 { {
   
    if ( msg )    if ( msg )
       delete msg;       delete msg;
  
Line 322 
Line 373 
  
 void MessageQueueService::handleEnqueue(void) void MessageQueueService::handleEnqueue(void)
 { {
   
    Message *msg = dequeue();    Message *msg = dequeue();
    handleEnqueue(msg);    handleEnqueue(msg);
 } }
Line 500 
Line 552 
    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
       return false;       return false;
  
    op->_flags &= ASYNC_OPFLAGS_CALLBACK;     op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPSTATE_COMPLETE);     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
  
  
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
Line 512 
Line 565 
 { {
  
    AsyncOpNode *op = 0;    AsyncOpNode *op = 0;
      Uint32 mask = msg->getMask();
  
    if (msg->getMask() & message_mask::ha_async)     if (mask & message_mask::ha_async)
    {    {
       op = (static_cast<AsyncMessage *>(msg))->op ;       op = (static_cast<AsyncMessage *>(msg))->op ;
    }    }
   
    if( op == 0 )    if( op == 0 )
    {    {
       op = get_op();       op = get_op();
       op->_request.insert_first(msg);       op->_request.insert_first(msg);
         if (mask & message_mask::ha_async)
            (static_cast<AsyncMessage *>(msg))->op = op;
    }    }
  
    op->_flags &= ASYNC_OPFLAGS_FIRE_AND_FORGET;     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS | ASYNC_OPSTATE_COMPLETE);     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->put_response(0);     op->_state &= ~ASYNC_OPSTATE_COMPLETE;
  
    // get the queue handle for the destination    // get the queue handle for the destination
    if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest)))    if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest)))
Line 551 
Line 608 
    }    }
  
    request->block = true;    request->block = true;
    request->op->_state &= ~(ASYNC_OPSTATE_COMPLETE | ASYNC_OPFLAGS_CALLBACK);     request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    request->op->put_response(0);     request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK;
  
    // get the queue handle for the destination    // get the queue handle for the destination
    if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest)))    if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest)))


Legend:
Removed from v.1.21  
changed lines
  Added in v.1.22

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2