(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.146 and 1.147

version 1.146, 2008/10/14 17:25:58 version 1.147, 2008/10/21 17:11:13
Line 284 
Line 284 
         0);         0);
  
     msg->op = get_op();     msg->op = get_op();
     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;      msg->op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
     msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
  
     msg->op->_op_dest = this;     msg->op->_op_dest = this;
     msg->op->_request.reset(msg);     msg->op->_request.reset(msg);
Line 373 
Line 370 
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode* op) void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 { {
     if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)      PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
     {  
         Message *msg = op->removeRequest();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)  
             {  
                 AsyncLegacyOperationStart *wrapper =  
                     static_cast<AsyncLegacyOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)  
             {  
                 AsyncModuleOperationStart *wrapper =  
                     static_cast<AsyncModuleOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_OP_START)  
             {  
                 AsyncOperationStart *wrapper =  
                     static_cast<AsyncOperationStart *>(msg);  
                 msg = wrapper->get_action();  
                 delete wrapper;  
             }  
             delete msg;  
         }  
   
         msg = op->removeResponse();  
         if (msg && (msg->getMask() & MessageMask::ha_async))  
         {  
             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)  
             {  
                 AsyncLegacyOperationResult *wrapper =  
                     static_cast<AsyncLegacyOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)  
             {  
                 AsyncModuleOperationResult *wrapper =  
                     static_cast<AsyncModuleOperationResult *>(msg);  
                 msg = wrapper->get_result();  
                 delete wrapper;  
             }  
         }  
         void (*callback)(Message *, void *, void *) = op->__async_callback;  
         void *handle = op->_callback_handle;  
         void *parm = op->_callback_parameter;  
         op->release();  
         return_op(op);  
         callback(msg, handle, parm);  
     }  
     else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)  
     {  
         // note that _callback_node may be different from op         // note that _callback_node may be different from op
         // op->_callback_response_q is a "this" pointer we can use for         // op->_callback_response_q is a "this" pointer we can use for
         // static callback methods         // static callback methods
         op->_async_callback(         op->_async_callback(
             op->_callback_node, op->_callback_response_q, op->_callback_ptr);             op->_callback_node, op->_callback_response_q, op->_callback_ptr);
     }     }
 }  
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation) void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 { {
     if (operation != 0)     if (operation != 0)
     {     {
   
 // ATTN: optimization  
 // << Tue Feb 19 14:10:38 2002 mdd >>  
         operation->lock();  
   
         Message *rq = operation->_request.get();         Message *rq = operation->_request.get();
  
 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>> // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
Line 457 
Line 393 
         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))         if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
         {         {
             operation->_request.release();             operation->_request.release();
             operation->unlock();  
             // delete the op node             // delete the op node
             operation->release();  
             return_op(operation);             return_op(operation);
   
             handleEnqueue(rq);             handleEnqueue(rq);
             return;             return;
         }         }
  
         if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||          if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
              operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&  
             (operation->_state & ASYNC_OPSTATE_COMPLETE))             (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {         {
             operation->unlock();  
             _handle_async_callback(operation);             _handle_async_callback(operation);
         }         }
         else         else
         {         {
             PEGASUS_ASSERT(rq != 0);             PEGASUS_ASSERT(rq != 0);
             operation->unlock();  
             _handle_async_request(static_cast<AsyncRequest *>(rq));             _handle_async_request(static_cast<AsyncRequest *>(rq));
         }         }
     }     }
Line 485 
Line 415 
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
     if (req != 0)  
     {  
         req->op->processing();  
   
         MessageType type = req->getType();         MessageType type = req->getType();
         if (type == ASYNC_IOCTL)         if (type == ASYNC_IOCTL)
       {
             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
       }
         else if (type == ASYNC_CIMSERVICE_START)         else if (type == ASYNC_CIMSERVICE_START)
       {
             handle_CimServiceStart(static_cast<CimServiceStart *>(req));             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
       }
         else if (type == ASYNC_CIMSERVICE_STOP)         else if (type == ASYNC_CIMSERVICE_STOP)
       {
             handle_CimServiceStop(static_cast<CimServiceStop *>(req));             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
       }
         else         else
         {         {
             // we don't handle this request message             // we don't handle this request message
             _make_response(req, async_results::CIM_NAK);             _make_response(req, async_results::CIM_NAK);
         }         }
     }     }
 }  
   
  
 Boolean MessageQueueService::_enqueueResponse( Boolean MessageQueueService::_enqueueResponse(
     Message* request,     Message* request,
Line 612 
Line 542 
     }     }
 // ATTN optimization remove the message checking altogether in the base // ATTN optimization remove the message checking altogether in the base
 // << Mon Feb 18 14:02:20 2002 mdd >> // << Mon Feb 18 14:02:20 2002 mdd >>
     op->lock();  
     Message *rq = op->_request.get();     Message *rq = op->_request.get();
     Message *rp = op->_response.get();     Message *rp = op->_response.get();
     op->unlock();  
  
     if ((rq != 0 && (true == messageOK(rq))) ||     if ((rq != 0 && (true == messageOK(rq))) ||
         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
Line 721 
Line 649 
    AsyncOpNode* op = new AsyncOpNode();    AsyncOpNode* op = new AsyncOpNode();
  
    op->_state = ASYNC_OPSTATE_UNKNOWN;    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;     op->_flags = ASYNC_OPFLAGS_UNKNOWN;
  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode* op) void MessageQueueService::return_op(AsyncOpNode* op)
 { {
     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);  
     delete op;     delete op;
 } }
  
Line 740 
Line 667 
     MessageQueue* callback_response_q,     MessageQueue* callback_response_q,
     void* callback_ptr)     void* callback_ptr)
 { {
     PEGASUS_ASSERT(op != 0 && callback != 0);      return _sendAsync(
           op,
           destination,
           callback,
           callback_response_q,
           callback_ptr,
           ASYNC_OPFLAGS_CALLBACK);
  
     // get the queue handle for the destination  }
   
   Boolean MessageQueueService::_sendAsync(
       AsyncOpNode* op,
       Uint32 destination,
       void (*callback)(AsyncOpNode*, MessageQueue*, void*),
       MessageQueue* callback_response_q,
       void* callback_ptr,
       Uint32 flags)
   {
       PEGASUS_ASSERT(op != 0 && callback != 0);
  
     op->lock();  
     // destination of this message     // destination of this message
     op->_op_dest = MessageQueue::lookup(destination);     op->_op_dest = MessageQueue::lookup(destination);
     op->_flags |= ASYNC_OPFLAGS_CALLBACK;      if (op->_op_dest == 0)
     op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);      {
           return false;
       }
       op->_flags = flags;
     // initialize the callback data     // initialize the callback data
     // callback function to be executed by recpt. of response     // callback function to be executed by recpt. of response
     op->_async_callback = callback;     op->_async_callback = callback;
Line 761 
Line 706 
     // I am the originator of this request     // I am the originator of this request
     op->_callback_request_q = this;     op->_callback_request_q = this;
  
     op->unlock();  
     if (op->_op_dest == 0)  
         return false;  
   
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
  
Line 787 
Line 728 
             (static_cast<AsyncMessage *>(msg))->op = op;             (static_cast<AsyncMessage *>(msg))->op = op;
         }         }
     }     }
   
       PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
     op->_op_dest = MessageQueue::lookup(msg->dest);     op->_op_dest = MessageQueue::lookup(msg->dest);
     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK  
         | ASYNC_OPFLAGS_SIMPLE_STATUS);  
     op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
     if (op->_op_dest == 0)     if (op->_op_dest == 0)
     {     {
         op->release();  
         return_op(op);         return_op(op);
         return false;         return false;
     }     }
  
       op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
   
     // now see if the meta dispatcher will take it     // now see if the meta dispatcher will take it
     return  _meta_dispatcher->route_async(op);     return  _meta_dispatcher->route_async(op);
 } }
Line 818 
Line 759 
         destroy_op = true;         destroy_op = true;
     }     }
  
       PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
       PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
   
     request->block = false;     request->block = false;
     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;      _sendAsync(
     SendAsync(  
         request->op,         request->op,
         request->dest,         request->dest,
         _sendwait_callback,         _sendwait_callback,
         this,         this,
         (void *)0);          (void *)0,
           ASYNC_OPFLAGS_PSEUDO_CALLBACK);
  
     request->op->_client_sem.wait();     request->op->_client_sem.wait();
  
Line 834 
Line 778 
  
     if (destroy_op == true)     if (destroy_op == true)
     {     {
         request->op->lock();  
         request->op->_request.release();         request->op->_request.release();
         request->op->_state |= ASYNC_OPSTATE_RELEASED;  
         request->op->unlock();  
         return_op(request->op);         return_op(request->op);
         request->op = 0;         request->op = 0;
     }     }


Legend:
Removed from v.1.146  
changed lines
  Added in v.1.147

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2