(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.7 and 1.13

version 1.7, 2002/02/02 17:58:13 version 1.13, 2002/02/06 15:15:35
Line 28 
Line 28 
  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
  
   PEGASUS_USING_STD;
   
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
Line 67 
Line 69 
  
 void MessageQueueService::_shutdown_incoming_queue(void) void MessageQueueService::_shutdown_incoming_queue(void)
 { {
    _incoming_queue_shutdown = 1;  
  
    _incoming.shutdown_queue();     if (_incoming_queue_shutdown.value() > 0 )
    _req_thread.cancel();        return ;
   
      AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
                                       0,
                                       _queueId,
                                       _queueId,
                                       true,
                                       AsyncIoctl::IO_CLOSE,
                                       0,
                                       0);
   
      msg->op = get_op();
      msg->op->_request.insert_first(msg);
   
   
   
      _incoming.insert_last_wait(msg->op);
      msg->op->_client_sem.wait();
   
      msg->op->lock();
      AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
      reply->op = 0;
      msg->op->unlock();
      delete reply;
   
      msg->op->_request.remove(msg);
      msg->op->_state |= ASYNC_OPSTATE_RELEASED;
      return_op(msg->op);
   
      msg->op = 0;
      delete msg;
 } }
  
  
Line 87 
Line 118 
    {    {
       try       try
       {       {
          operation = service->_incoming.remove_first();           operation = service->_incoming.remove_first_wait();
       }       }
       catch(ListClosed & )       catch(ListClosed & )
       {       {
          break;          break;
       }       }
       if ( service->_incoming.is_shutdown() || service->_die.value() )  
          break;  
       if( operation )       if( operation )
          service->_handle_incoming_operation(operation);        {
       else  
          pegasus_yield();           service->_handle_incoming_operation(operation, myself, service);
         }
    }    }
  
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
Line 106 
Line 136 
 } }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
                                                        Thread *thread,
                                                        MessageQueue *queue)
 { {
    if ( operation != 0 )    if ( operation != 0 )
    {    {
Line 117 
Line 149 
       PEGASUS_ASSERT(rq != 0 );       PEGASUS_ASSERT(rq != 0 );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
         static_cast<AsyncMessage *>(rq)->_myself = thread;
         static_cast<AsyncMessage *>(rq)->_service = queue;
       _handle_async_request(static_cast<AsyncRequest *>(rq));       _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
  
Line 140 
Line 174 
       else if (type == async_messages::CIMSERVICE_STOP)       else if (type == async_messages::CIMSERVICE_STOP)
          handle_CimServiceStop(static_cast<CimServiceStop *>(req));          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
       else if (type == async_messages::CIMSERVICE_PAUSE)       else if (type == async_messages::CIMSERVICE_PAUSE)
         {
          handle_CimServicePause(static_cast<CimServicePause *>(req));          handle_CimServicePause(static_cast<CimServicePause *>(req));
         }
   
       else if (type == async_messages::CIMSERVICE_RESUME)       else if (type == async_messages::CIMSERVICE_RESUME)
          handle_CimServiceResume(static_cast<CimServiceResume *>(req));          handle_CimServiceResume(static_cast<CimServiceResume *>(req));
       else if ( type == async_messages::ASYNC_OP_START)       else if ( type == async_messages::ASYNC_OP_START)
Line 193 
Line 230 
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 { {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
    op->lock();    op->lock();
    Message *rq = op->_request.next(0);    Message *rq = op->_request.next(0);
    op->unlock();    op->unlock();
Line 207 
Line 247 
  
 Boolean MessageQueueService::messageOK(const Message *msg) Boolean MessageQueueService::messageOK(const Message *msg)
 { {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
    if ( msg != 0 )    if ( msg != 0 )
    {    {
       Uint32 mask = msg->getMask();       Uint32 mask = msg->getMask();
Line 251 
Line 294 
  
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req) void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 { {
   
      switch( req->ctl )
      {
         case AsyncIoctl::IO_CLOSE:
         {
            // save my bearings
            Thread *myself = req->_myself;
            MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
   
            // respond to this message.
    _make_response(req, async_results::OK);    _make_response(req, async_results::OK);
            // ensure we do not accept any further messages
   
            // ensure we don't recurse on IO_CLOSE
            if( _incoming_queue_shutdown.value() > 0 )
               break;
   
            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while( 1 )
            {
               AsyncOpNode *operation;
               try
               {
                  operation = service->_incoming.remove_first();
 } }
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)              catch(IPCException & )
               {
                  break;
               }
               if( operation )
 { {
                  service->_handle_incoming_operation(operation, myself, service);
               }
               else
                  break;
            } // message processing loop
   
            // shutdown the AsyncDQueue
            service->_incoming.shutdown_queue();
            // exit the thread !
            myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
            return;
         }
   
         default:
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
 } }
   }
   
   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
   {
      // clear the stoped bit and update
      _capabilities &= (~(module_capabilities::stopped));
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
   }
 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the stopeed bit and update
      _capabilities |= module_capabilities::stopped;
      _make_response(req, async_results::CIM_STOPPED);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
 } }
 void MessageQueueService::handle_CimServicePause(CimServicePause *req) void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the paused bit and update
      _capabilities |= module_capabilities::paused;
      update_service(_capabilities, _mask);
      _make_response(req, async_results::CIM_PAUSED);
      // now tell the meta dispatcher we are stopped
 } }
 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req) void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // clear the paused  bit and update
      _capabilities &= (~(module_capabilities::paused));
      update_service(_capabilities, _mask);
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
 } }
  
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req) void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 { {
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
   
 } }
  
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 { {
    ;  
 } }
  
 AsyncOpNode *MessageQueueService::get_op(void) AsyncOpNode *MessageQueueService::get_op(void)
 { {
    AsyncOpNode *op = new AsyncOpNode();    AsyncOpNode *op = new AsyncOpNode();
  
    op->write_state(ASYNC_OPSTATE_UNKNOWN);     op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL );     op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
  
    return op;    return op;
 } }


Legend:
Removed from v.1.7  
changed lines
  Added in v.1.13

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2