(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.17 and 1.18

version 1.17, 2002/02/12 23:28:58 version 1.18, 2002/02/18 12:31:29
Line 184 
Line 184 
    {    {
       operation->lock();       operation->lock();
       Message *rq = operation->_request.next(0);       Message *rq = operation->_request.next(0);
         PEGASUS_ASSERT(rq != 0 );
   
         // divert legacy messages to handleEnqueue
         if ( ! (rq->getMask() & message_mask::ha_async) )
         {
            rq = operation->_request.remove_first() ;
       operation->unlock();       operation->unlock();
            // delete the op node
            delete operation;
            handleEnqueue(rq);
            return;
         }
  
       PEGASUS_ASSERT(rq != 0 );        operation->unlock();
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );  
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);  
       static_cast<AsyncMessage *>(rq)->_myself = thread;       static_cast<AsyncMessage *>(rq)->_myself = thread;
       static_cast<AsyncMessage *>(rq)->_service = queue;       static_cast<AsyncMessage *>(rq)->_service = queue;
       _handle_async_request(static_cast<AsyncRequest *>(rq));       _handle_async_request(static_cast<AsyncRequest *>(rq));
Line 231 
Line 240 
 Boolean MessageQueueService::_enqueueResponse( Boolean MessageQueueService::_enqueueResponse(
    Message* request,    Message* request,
    Message* response)    Message* response)
   
 { {
    if(request->_async != 0 )    if(request->_async != 0 )
    {    {
       Uint32 mask = request->_async->getMask();       Uint32 mask = request->_async->getMask();
       if ( mask & message_mask::ha_async)        PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
       {  
          if ( mask & message_mask::ha_request)        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
          {        AsyncOpNode *op = async->op;
             AsyncOpNode *op = (static_cast<AsyncRequest *>(request->_async)->op);        request->_async = 0;
  
             AsyncLegacyOperationResult *async_result =             AsyncLegacyOperationResult *async_result =
                new AsyncLegacyOperationResult(                new AsyncLegacyOperationResult(
                   (static_cast<AsyncRequest *>(request->_async))->getKey(),              async->getKey(),
                   (static_cast<AsyncRequest *>(request->_async))->getRouting(),              async->getRouting(),
                   op,                   op,
                   response);                   response);
             _completeAsyncResponse(static_cast<AsyncRequest *>(request->_async),        _completeAsyncResponse(async,
                                    async_result,                                    async_result,
                                    ASYNC_OPSTATE_COMPLETE,                                    ASYNC_OPSTATE_COMPLETE,
                                    0);                                    0);
             return true;             return true;
          }          }
       }  
    }     // ensure that the destination queue is in response->dest
    return false;     return SendForget(response);
   
 } }
  
 void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)  void MessageQueueService::_make_response(Message *req, Uint32 code)
 { {
    AsyncReply *reply =     return cimom::_make_response(req, code);
       new AsyncReply(async_messages::REPLY,  
                      req->getKey(),  
                      req->getRouting(),  
                      0,  
                      req->op,  
                      code,  
                      req->resp,  
                      false);  
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );  
 } }
  
  
Line 278 
Line 280 
                                                 Uint32 state,                                                 Uint32 state,
                                                 Uint32 flag)                                                 Uint32 flag)
 { {
    PEGASUS_ASSERT(request != 0  && reply != 0 );     return cimom::_completeAsyncResponse(request, reply, state, flag);
   
    AsyncOpNode *op = request->op;  
    op->lock();  
    op->_state |= state ;  
    op->_flags |= flag;  
    gettimeofday(&(op->_updated), NULL);  
    if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) )  
       op->_response.insert_last(reply);  
    op->unlock();  
   
    op->_client_sem.signal();  
   
   
 } }
  
  
Line 317 
Line 306 
 { {
    if (_incoming_queue_shutdown.value() > 0 )    if (_incoming_queue_shutdown.value() > 0 )
       return false;       return false;
   
    if ( msg != 0 )  
    {  
       Uint32 mask = msg->getMask();  
       if ( mask & message_mask::ha_async)  
          if ( mask & message_mask::ha_request)  
             return true;             return true;
    }  
    return false;  
 } }
  
  
 void MessageQueueService::handleEnqueue(void)  void MessageQueueService::handleEnqueue(Message *msg)
 { {
    Message *msg = dequeue();  
    if( msg )    if( msg )
    {  
       delete msg;       delete msg;
   
    }    }
   
   
   void MessageQueueService::handleEnqueue(void)
   {
      Message *msg = dequeue();
      handleEnqueue(msg);
 } }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
Line 500 
Line 487 
 } }
  
  
   Boolean MessageQueueService::SendAsync(AsyncRequest *request,
                                          void (*callback)(AsyncOpNode *,
                                                           MessageQueueService *,
                                                           void *))
   {
   
      return true;
   }
   
   
   Boolean MessageQueueService::SendForget(Message *msg)
   {
   
      AsyncOpNode *op = 0;
   
      if (msg->getMask() & message_mask::ha_async)
      {
         op = (static_cast<AsyncMessage *>(msg))->op ;
      }
      if( op == 0 )
         op = get_op();
   
      op->_request.insert_first(msg);
      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      op->_flags &= ASYNC_OPFLAGS_FIRE_AND_FORGET;
      op->put_response(0);
   
      // now see if the meta dispatcher will take it
      return  _meta_dispatcher->route_async(op);
   }
   
  
 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request) AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 { {


Legend:
Removed from v.1.17  
changed lines
  Added in v.1.18

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2