(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.6 and 1.13

version 1.6, 2002/02/02 01:03:31 version 1.13, 2002/02/06 15:15:35
Line 28 
Line 28 
  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
  
   PEGASUS_USING_STD;
   
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
Line 40 
Line 42 
      _die(0),      _die(0),
      _pending(true),      _pending(true),
      _incoming(true, 1000),      _incoming(true, 1000),
        _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)      _req_thread(_req_proc, this, false)
 { {
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
Line 55 
Line 58 
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
      if (_incoming_queue_shutdown.value() == 0 )
    _incoming.shutdown_queue();    _incoming.shutdown_queue();
  
    _req_thread.join();    _req_thread.join();
Line 63 
Line 67 
  
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
  
   void MessageQueueService::_shutdown_incoming_queue(void)
   {
   
      if (_incoming_queue_shutdown.value() > 0 )
         return ;
   
      AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
                                       0,
                                       _queueId,
                                       _queueId,
                                       true,
                                       AsyncIoctl::IO_CLOSE,
                                       0,
                                       0);
   
      msg->op = get_op();
      msg->op->_request.insert_first(msg);
   
   
   
      _incoming.insert_last_wait(msg->op);
      msg->op->_client_sem.wait();
   
      msg->op->lock();
      AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
      reply->op = 0;
      msg->op->unlock();
      delete reply;
   
      msg->op->_request.remove(msg);
      msg->op->_state |= ASYNC_OPSTATE_RELEASED;
      return_op(msg->op);
   
      msg->op = 0;
      delete msg;
   }
   
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
Line 71 
Line 112 
  
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
      AsyncOpNode *operation = 0;
  
    while ( service->_die.value() == 0 )    while ( service->_die.value() == 0 )
    {    {
       AsyncOpNode *operation = service->_incoming.remove_first_wait();        try
       if ( operation == 0 )        {
            operation = service->_incoming.remove_first_wait();
         }
         catch(ListClosed & )
         {
          break;          break;
         }
         if( operation )
         {
  
       service->_handle_incoming_operation(operation);           service->_handle_incoming_operation(operation, myself, service);
         }
    }    }
  
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
Line 86 
Line 136 
 } }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
                                                        Thread *thread,
                                                        MessageQueue *queue)
 { {
    if ( operation != 0 )    if ( operation != 0 )
    {    {
Line 97 
Line 149 
       PEGASUS_ASSERT(rq != 0 );       PEGASUS_ASSERT(rq != 0 );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
         static_cast<AsyncMessage *>(rq)->_myself = thread;
         static_cast<AsyncMessage *>(rq)->_service = queue;
       _handle_async_request(static_cast<AsyncRequest *>(rq));       _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
  
Line 120 
Line 174 
       else if (type == async_messages::CIMSERVICE_STOP)       else if (type == async_messages::CIMSERVICE_STOP)
          handle_CimServiceStop(static_cast<CimServiceStop *>(req));          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
       else if (type == async_messages::CIMSERVICE_PAUSE)       else if (type == async_messages::CIMSERVICE_PAUSE)
         {
          handle_CimServicePause(static_cast<CimServicePause *>(req));          handle_CimServicePause(static_cast<CimServicePause *>(req));
         }
   
       else if (type == async_messages::CIMSERVICE_RESUME)       else if (type == async_messages::CIMSERVICE_RESUME)
          handle_CimServiceResume(static_cast<CimServiceResume *>(req));          handle_CimServiceResume(static_cast<CimServiceResume *>(req));
       else if ( type == async_messages::ASYNC_OP_START)       else if ( type == async_messages::ASYNC_OP_START)
Line 160 
Line 217 
    op->_state |= state ;    op->_state |= state ;
    op->_flags |= flag;    op->_flags |= flag;
    gettimeofday(&(op->_updated), NULL);    gettimeofday(&(op->_updated), NULL);
    if ( false == op->_request.exists(reinterpret_cast<void *>(reply)) )     if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) )
       op->_request.insert_last(reply);        op->_response.insert_last(reply);
    op->unlock();    op->unlock();
  
    op->_client_sem.signal();    op->_client_sem.signal();
Line 173 
Line 230 
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 { {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
    op->lock();    op->lock();
    Message *rq = op->_request.next(0);    Message *rq = op->_request.next(0);
    op->unlock();    op->unlock();
Line 187 
Line 247 
  
 Boolean MessageQueueService::messageOK(const Message *msg) Boolean MessageQueueService::messageOK(const Message *msg)
 { {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
    if ( msg != 0 )    if ( msg != 0 )
    {    {
       Uint32 mask = msg->getMask();       Uint32 mask = msg->getMask();
Line 231 
Line 294 
  
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req) void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 { {
   
      switch( req->ctl )
      {
         case AsyncIoctl::IO_CLOSE:
         {
            // save my bearings
            Thread *myself = req->_myself;
            MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
   
            // respond to this message.
    _make_response(req, async_results::OK);    _make_response(req, async_results::OK);
            // ensure we do not accept any further messages
   
            // ensure we don't recurse on IO_CLOSE
            if( _incoming_queue_shutdown.value() > 0 )
               break;
   
            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while( 1 )
            {
               AsyncOpNode *operation;
               try
               {
                  operation = service->_incoming.remove_first();
 } }
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)              catch(IPCException & )
               {
                  break;
               }
               if( operation )
 { {
                  service->_handle_incoming_operation(operation, myself, service);
               }
               else
                  break;
            } // message processing loop
   
            // shutdown the AsyncDQueue
            service->_incoming.shutdown_queue();
            // exit the thread !
            myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
            return;
         }
   
         default:
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
 } }
   }
   
   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
   {
      // clear the stoped bit and update
      _capabilities &= (~(module_capabilities::stopped));
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
   }
 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the stopeed bit and update
      _capabilities |= module_capabilities::stopped;
      _make_response(req, async_results::CIM_STOPPED);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
 } }
 void MessageQueueService::handle_CimServicePause(CimServicePause *req) void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the paused bit and update
      _capabilities |= module_capabilities::paused;
      update_service(_capabilities, _mask);
      _make_response(req, async_results::CIM_PAUSED);
      // now tell the meta dispatcher we are stopped
 } }
 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req) void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // clear the paused  bit and update
      _capabilities &= (~(module_capabilities::paused));
      update_service(_capabilities, _mask);
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
 } }
  
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req) void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 { {
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
   
 } }
  
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 { {
    ;  
 } }
  
 AsyncOpNode *MessageQueueService::get_op(void) AsyncOpNode *MessageQueueService::get_op(void)
 { {
    AsyncOpNode *op = new AsyncOpNode();    AsyncOpNode *op = new AsyncOpNode();
  
    op->write_state(ASYNC_OPSTATE_UNKNOWN);     op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL );     op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
  
    return op;    return op;
 } }
Line 289 
Line 418 
    if (request->op == false)    if (request->op == false)
    {    {
       request->op = get_op();       request->op = get_op();
       request->op->put_request(request);        request->op->_request.insert_first(request);
   
       destroy_op = true;       destroy_op = true;
    }    }
  
Line 323 
Line 451 
       request->op->unlock();       request->op->unlock();
  
       return_op(request->op);       return_op(request->op);
         request->op = 0;
 //      delete request->op;  
 //      request->op = 0;  
    }    }
  
    return rpl;    return rpl;
Line 345 
Line 471 
                                                     mask,                                                     mask,
                                                     _queueId);                                                     _queueId);
    Boolean registered = false;    Boolean registered = false;
    AsyncMessage *reply = SendWait( msg );     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
  
    if ( reply != 0 )    if ( reply != 0 )
    {    {
Line 353 
Line 479 
       {       {
          if(reply->getMask() & message_mask::ha_reply)          if(reply->getMask() & message_mask::ha_reply)
          {          {
             if((static_cast<AsyncReply *>(reply))->result == async_results::OK)              if(reply->result == async_results::OK)
                registered = true;                registered = true;
          }          }
       }       }


Legend:
Removed from v.1.6  
changed lines
  Added in v.1.13

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2