(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.3 and 1.13

version 1.3, 2002/01/19 01:53:22 version 1.13, 2002/02/06 15:15:35
Line 28 
Line 28 
  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
  
   PEGASUS_USING_STD;
   
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
                                          Uint32 queueID,                                          Uint32 queueID,
                                          Uint32 capabilities,                                          Uint32 capabilities,
                                          Uint32 mask)                                          Uint32 mask)
    : Base(name, true,  queueID),     : Base(name, false,  queueID),
      _capabilities(capabilities),      _capabilities(capabilities),
      _mask(mask),      _mask(mask),
      _die(0)       _die(0),
        _pending(true),
        _incoming(true, 1000),
        _incoming_queue_shutdown(0),
        _req_thread(_req_proc, this, false)
 { {
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;    _default_op_timeout.tv_usec = 100;
    _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));    _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));
    if(_meta_dispatcher == 0 )    if(_meta_dispatcher == 0 )
       throw NullPointer();       throw NullPointer();
      _req_thread.run();
   
 } }
  
   
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
      if (_incoming_queue_shutdown.value() == 0 )
          _incoming.shutdown_queue();
  
 }     _req_thread.join();
  
   }
  
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
  
 // mutex is UNLOCKED  void MessageQueueService::_shutdown_incoming_queue(void)
 void MessageQueueService::handleEnqueue(void)  
 { {
    Message *msg = dequeue();  
    if( msg )     if (_incoming_queue_shutdown.value() > 0 )
    {        return ;
       if(msg->getMask() & message_mask::ha_async)  
       {     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
          _handle_async_msg(static_cast<AsyncMessage *>(msg));                                      0,
       }                                      _queueId,
       else                                      _queueId,
                                       true,
                                       AsyncIoctl::IO_CLOSE,
                                       0,
                                       0);
   
      msg->op = get_op();
      msg->op->_request.insert_first(msg);
   
   
   
      _incoming.insert_last_wait(msg->op);
      msg->op->_client_sem.wait();
   
      msg->op->lock();
      AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
      reply->op = 0;
      msg->op->unlock();
      delete reply;
   
      msg->op->_request.remove(msg);
      msg->op->_state |= ASYNC_OPSTATE_RELEASED;
      return_op(msg->op);
   
      msg->op = 0;
          delete msg;          delete msg;
    }    }
 }  
  
  
 void MessageQueueService::_enqueueAsyncResponse(AsyncRequest *request,  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
                                                 AsyncReply *reply,  
                                                 Uint32 state,  
                                                 Uint32 flag)  
 { {
    PEGASUS_ASSERT(request != 0  && reply != 0 );     Thread *myself = reinterpret_cast<Thread *>(parm);
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
  
    AsyncOpNode *op = request->op;     // pull messages off the incoming queue and dispatch them. then
    op->lock();     // check pending messages that are non-blocking
 //   if (false == op->_response.exists(reply))     AsyncOpNode *operation = 0;
 //      op->_response.insert_last(reply);  
    op->_response = reply;  
  
    op->_state |= state ;     while ( service->_die.value() == 0 )
    op->_flags |= flag;     {
    gettimeofday(&(op->_updated), NULL);        try
    op->unlock();        {
    op->_client_sem.signal();           operation = service->_incoming.remove_first_wait();
         }
         catch(ListClosed & )
         {
            break;
         }
         if( operation )
         {
  
            service->_handle_incoming_operation(operation, myself, service);
         }
 } }
  
 // may be overriden by derived classes     myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 void MessageQueueService::_handle_async_msg(AsyncMessage *msg)     return(0);
 {  }
    if( msg == 0 )  
       return;  
  
    Uint32 mask = msg->getMask();  
    Uint32 type = msg->getType();  
  
    if (mask & message_mask::ha_async)  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,
                                                        Thread *thread,
                                                        MessageQueue *queue)
    {    {
       if (mask & message_mask::ha_request)     if ( operation != 0 )
          _handle_async_request(static_cast<AsyncRequest *>(msg));     {
       else        operation->lock();
          _handle_async_reply(static_cast<AsyncReply *>(msg));        Message *rq = operation->_request.next(0);
         operation->unlock();
   
         PEGASUS_ASSERT(rq != 0 );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
         static_cast<AsyncMessage *>(rq)->_myself = thread;
         static_cast<AsyncMessage *>(rq)->_service = queue;
         _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
    else  
       delete msg;     return;
   
 } }
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
      if ( req != 0 )
      {
    req->op->processing();    req->op->processing();
  
    Uint32 type = req->getType();    Uint32 type = req->getType();
Line 126 
Line 174 
    else if (type == async_messages::CIMSERVICE_STOP)    else if (type == async_messages::CIMSERVICE_STOP)
       handle_CimServiceStop(static_cast<CimServiceStop *>(req));       handle_CimServiceStop(static_cast<CimServiceStop *>(req));
    else if (type == async_messages::CIMSERVICE_PAUSE)    else if (type == async_messages::CIMSERVICE_PAUSE)
         {
       handle_CimServicePause(static_cast<CimServicePause *>(req));       handle_CimServicePause(static_cast<CimServicePause *>(req));
         }
   
    else if (type == async_messages::CIMSERVICE_RESUME)    else if (type == async_messages::CIMSERVICE_RESUME)
       handle_CimServiceResume(static_cast<CimServiceResume *>(req));       handle_CimServiceResume(static_cast<CimServiceResume *>(req));
    else if ( type == async_messages::ASYNC_OP_START)    else if ( type == async_messages::ASYNC_OP_START)
Line 136 
Line 187 
       // we don't handle this request message       // we don't handle this request message
       _make_response(req, async_results::CIM_NAK );       _make_response(req, async_results::CIM_NAK );
    }    }
   
 } }
   
 void MessageQueueService::_handle_async_reply(AsyncReply *rep)  
 {  
   
    if (rep->op != 0 )  
       rep->op->processing();  
   
    Uint32 type = rep->getType();  
   
    if ( type == async_messages::ASYNC_OP_RESULT )  
       handle_AsyncOperationResult(static_cast<AsyncOperationResult *>(rep));  
    else  
    {  
       // we don't handle this reply  
       ;  
    }  
   
    if( rep->op != 0 )  
       rep->op->release();  
 } }
  
 void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code) void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)
Line 170 
Line 201 
                      code,                      code,
                      req->resp,                      req->resp,
                      false);                      false);
    _enqueueAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
   }
   
   
   void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
                                                   AsyncReply *reply,
                                                   Uint32 state,
                                                   Uint32 flag)
   {
      PEGASUS_ASSERT(request != 0  && reply != 0 );
   
      AsyncOpNode *op = request->op;
      op->lock();
      op->_state |= state ;
      op->_flags |= flag;
      gettimeofday(&(op->_updated), NULL);
      if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) )
         op->_response.insert_last(reply);
      op->unlock();
   
      op->_client_sem.signal();
   
   
   }
   
   
   
   Boolean MessageQueueService::accept_async(AsyncOpNode *op)
   {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
      op->lock();
      Message *rq = op->_request.next(0);
      op->unlock();
   
      if( true == messageOK(rq) &&  _die.value() == 0  )
      {
         _incoming.insert_last_wait(op);
         return true;
      }
      return false;
 } }
  
   Boolean MessageQueueService::messageOK(const Message *msg)
   {
      if (_incoming_queue_shutdown.value() > 0 )
         return false;
   
      if ( msg != 0 )
      {
         Uint32 mask = msg->getMask();
         if ( mask & message_mask::ha_async)
            if ( mask & message_mask::ha_request)
               return true;
      }
      return false;
   }
   
   
   void MessageQueueService::handleEnqueue(void)
   {
      Message *msg = dequeue();
      if( msg )
      {
         delete msg;
      }
   }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 { {
Line 187 
Line 283 
                      async_results::OK,                      async_results::OK,
                      req->resp,                      req->resp,
                      false);                      false);
    _enqueueAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
   
 } }
  
  
Line 199 
Line 294 
  
 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req) void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 { {
   
      switch( req->ctl )
      {
         case AsyncIoctl::IO_CLOSE:
         {
            // save my bearings
            Thread *myself = req->_myself;
            MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
   
            // respond to this message.
    _make_response(req, async_results::OK);    _make_response(req, async_results::OK);
            // ensure we do not accept any further messages
   
            // ensure we don't recurse on IO_CLOSE
            if( _incoming_queue_shutdown.value() > 0 )
               break;
   
            // set the closing flag
            service->_incoming_queue_shutdown = 1;
            // empty out the queue
            while( 1 )
            {
               AsyncOpNode *operation;
               try
               {
                  operation = service->_incoming.remove_first();
 } }
 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)              catch(IPCException & )
 { {
                  break;
               }
               if( operation )
               {
                  service->_handle_incoming_operation(operation, myself, service);
               }
               else
                  break;
            } // message processing loop
   
            // shutdown the AsyncDQueue
            service->_incoming.shutdown_queue();
            // exit the thread !
            myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
            return;
         }
   
         default:
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
 } }
   }
   
   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
   {
      // clear the stoped bit and update
      _capabilities &= (~(module_capabilities::stopped));
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
   }
 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req) void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the stopeed bit and update
      _capabilities |= module_capabilities::stopped;
      _make_response(req, async_results::CIM_STOPPED);
      // now tell the meta dispatcher we are stopped
      update_service(_capabilities, _mask);
   
 } }
 void MessageQueueService::handle_CimServicePause(CimServicePause *req) void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // set the paused bit and update
      _capabilities |= module_capabilities::paused;
      update_service(_capabilities, _mask);
      _make_response(req, async_results::CIM_PAUSED);
      // now tell the meta dispatcher we are stopped
 } }
 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req) void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 { {
    _make_response(req, async_results::CIM_NAK);     // clear the paused  bit and update
      _capabilities &= (~(module_capabilities::paused));
      update_service(_capabilities, _mask);
      _make_response(req, async_results::OK);
      // now tell the meta dispatcher we are stopped
 } }
  
 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req) void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 { {
    _make_response(req, async_results::CIM_NAK);    _make_response(req, async_results::CIM_NAK);
   
 } }
  
 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req) void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 { {
    ;  
 } }
  
 AsyncOpNode *MessageQueueService::get_op(void) AsyncOpNode *MessageQueueService::get_op(void)
 { {
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();     AsyncOpNode *op = new AsyncOpNode();
    if(op == 0 )  
       throw NullPointer();     op->_state = ASYNC_OPSTATE_UNKNOWN;
      op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
  
    op->write_state(ASYNC_OPSTATE_UNKNOWN);  
    op->write_flags(ASYNC_OPFLAGS_SINGLE |  
                    ASYNC_OPFLAGS_NORMAL |  
                    ASYNC_OPFLAGS_META_DISPATCHER);  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode *op) void MessageQueueService::return_op(AsyncOpNode *op)
 { {
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
   
    if(op->read_state() & ASYNC_OPFLAGS_META_DISPATCHER )  
    {  
       _meta_dispatcher->cache_op(op);  
    }  
    else  
       delete op;       delete op;
 } }
  
 AsyncMessage *MessageQueueService::SendWait(AsyncRequest *request)  
 {  
    if (request == 0 )  
       throw NullPointer();  
    AsyncMessage *ret_msg = 0;  
  
    AsyncOpNode *op = request->op;  
  
    if(op == 0 )  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
       return 0;  
    //     ATTN: debugging  
    op->put_response(0);  
    if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))  
    {    {
       op->_client_sem.wait();     if ( request == 0 )
       ret_msg = static_cast<AsyncMessage *>(op->_response);        return 0 ;
       op->_response = 0;  
   
  
 //      op->lock();     Boolean destroy_op = false;
 //       while( op->_response.count() )  
 //       {  
 //       AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());  
 //       if (rply != 0 )  
 //       {  
 //          rply->op = 0;  
 //      reply_list->insert_first( static_cast<AsyncMessage *>(op->_response) );  
 //       }  
 //       }  
       // release the opnode, the meta-dispatcher will recycle it for us  
 //      op->_state |= ASYNC_OPSTATE_RELEASED ;  
 //      op->unlock();  
    }  
    return ret_msg;  
  
      if (request->op == false)
      {
         request->op = get_op();
         request->op->_request.insert_first(request);
         destroy_op = true;
 } }
  
      request->block = true;
      request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      request->op->put_response(0);
  
 void MessageQueueService::SendWait(AsyncRequest *request, unlocked_dq<AsyncMessage> *reply_list)     // first link it on our pending list
 {     // _pending.insert_last_wait(request->op);
    if (request == 0 || reply_list == 0 )  
       throw NullPointer();  
  
    AsyncOpNode *op = request->op;     // now see if the meta dispatcher will take it
  
    if(op == 0 )     if (true == _meta_dispatcher->route_async(request->op))
       return;  
    //     ATTN: debugging  
    op->put_response(0);  
    if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))  
    {    {
         request->op->_client_sem.wait();
         PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);
  
      }
  
       op->_client_sem.wait();     request->op->lock();
       op->lock();     AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
      rpl->op = 0;
       AsyncMessage *response = static_cast<AsyncMessage *>(op->_response);     request->op->unlock();
       reply_list->insert_first(response);  
       op->unlock();  
   
       op->release();  
  
 //      op->lock();     if( destroy_op == true)
 //       while( op->_response.count() )  
 //       {  
 //       AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());  
 //       if (rply != 0 )  
 //       {  
 //          rply->op = 0;  
 //      reply_list->insert_first( static_cast<AsyncMessage *>(op->_response) );  
 //       }  
 //       }  
       // release the opnode, the meta-dispatcher will recycle it for us  
 //      op->_state |= ASYNC_OPSTATE_RELEASED ;  
 //      op->unlock();  
    }  
    else  
    {    {
       // manually free the opnode and message        request->op->lock();
       op->release();        request->op->_request.remove(request);
       return_op(op);        request->op->_state |= ASYNC_OPSTATE_RELEASED;
    }        request->op->unlock();
   
         return_op(request->op);
         request->op = 0;
 } }
  
 // Boolean MessageQueueService::SendAsync(AsyncMessage *msg)     return rpl;
 // {  }
 //    return _meta_dispatcher->accept_async(static_cast<Message *>(msg));  
 // }  
  
  
 Boolean MessageQueueService::register_service(String name, Boolean MessageQueueService::register_service(String name,
Line 349 
Line 463 
                                               Uint32 mask)                                               Uint32 mask)
  
 { {
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
   
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
   
    RegisterCimService *msg = new RegisterCimService(get_next_xid(),    RegisterCimService *msg = new RegisterCimService(get_next_xid(),
                                                     op,                                                      0,
                                                     true,                                                     true,
                                                     name,                                                     name,
                                                     capabilities,                                                     capabilities,
                                                     mask,                                                     mask,
                                                     _queueId);                                                     _queueId);
    Boolean registered = false;    Boolean registered = false;
    AsyncMessage *reply = SendWait( msg );     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
  
    if ( reply != 0 )    if ( reply != 0 )
    {    {
Line 370 
Line 479 
       {       {
          if(reply->getMask() & message_mask::ha_reply)          if(reply->getMask() & message_mask::ha_reply)
          {          {
             if((static_cast<AsyncReply *>(reply))->result == async_results::OK)              if(reply->result == async_results::OK)
                registered = true;                registered = true;
   
          }          }
       }       }
  
       delete reply;       delete reply;
    }    }
      delete msg;
    return registered;    return registered;
 } }
  
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask) Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 { {
  
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
  
    UpdateCimService *msg = new UpdateCimService(get_next_xid(),    UpdateCimService *msg = new UpdateCimService(get_next_xid(),
                                                 op,                                                  0,
                                                 true,                                                 true,
                                                 _queueId,                                                 _queueId,
                                                 _capabilities,                                                 _capabilities,
                                                 _mask);                                                 _mask);
    Boolean registered = false;    Boolean registered = false;
  
   
    AsyncMessage *reply = SendWait(msg);    AsyncMessage *reply = SendWait(msg);
    if (reply)    if (reply)
    {    {
Line 410 
Line 515 
       }       }
       delete reply;       delete reply;
    }    }
      delete msg;
    return registered;    return registered;
 } }
  
Line 417 
Line 523 
 Boolean MessageQueueService::deregister_service(void) Boolean MessageQueueService::deregister_service(void)
 { {
  
 //   _meta_dispatcher->deregister_module(_queueId);     _meta_dispatcher->deregister_module(_queueId);
 //   return true;     return true;
   
   
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
   
    DeRegisterCimService *msg = new DeRegisterCimService(get_next_xid(),  
                                                         op,  
                                                         true,  
                                                         _queueId);  
    Boolean deregistered = false;  
   
   
   
    return _meta_dispatcher->accept_async(static_cast<Message *>(msg));  
 } }
  
  
Line 446 
Line 537 
    if( results == 0 )    if( results == 0 )
       throw NullPointer();       throw NullPointer();
  
    AsyncOpNode *op = get_op();  
    results->clear();    results->clear();
  
    FindServiceQueue *req =    FindServiceQueue *req =
       new FindServiceQueue(get_next_xid(),       new FindServiceQueue(get_next_xid(),
                            op,                             0,
                            _queueId,                            _queueId,
                            true,                            true,
                            name,                            name,
Line 474 
Line 564 
       }       }
       delete reply;       delete reply;
    }    }
      delete req;
    return ;    return ;
 } }
  
Line 482 
Line 573 
    if(result == 0)    if(result == 0)
       throw NullPointer();       throw NullPointer();
  
    AsyncOpNode *op = get_op();  
   
    EnumerateService *req    EnumerateService *req
       = new EnumerateService(get_next_xid(),       = new EnumerateService(get_next_xid(),
                              op,                               0,
                              _queueId,                              _queueId,
                              true,                              true,
                              queue);                              queue);
Line 520 
Line 609 
       }       }
       delete reply;       delete reply;
    }    }
      delete req;
   
    return;    return;
 } }
  


Legend:
Removed from v.1.3  
changed lines
  Added in v.1.13

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2