(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.3 and 1.4

version 1.3, 2002/01/19 01:53:22 version 1.4, 2002/01/21 02:01:22
Line 30 
Line 30 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
   {
      Thread *myself = reinterpret_cast<Thread *>(parm);
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
   
      // pull messages off the incoming queue and dispatch them. then
      // check pending messages that are non-blocking
   
      while ( service->_die.value() < 1 )
      {
   
         AsyncOpNode *operation = service->_incoming.remove_first_wait();
         service->_handle_incoming_operation(operation);
      }
   
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
   }
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_rpl_proc(void * parm)
   {
      Thread *myself = reinterpret_cast<Thread *>(parm);
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
   
      // pull messages off the _pending queue and complete them.
      // this loop is not optimized because every time we complete an operation
      // we start again to traverse the loop at the beginning.
      Uint32 loop = 1;
      Boolean unlinked = false;
   
   
      while ( service->_die.value() < 1 )
      {
         service->_pending.wait_for_node();
         AsyncOpNode *operation = service->_pending.next(0);
   //      operation = service->_pending.remove_no_lock(operation);
         while( operation != 0 )
         {
            if ( operation->_user_data != loop )
            {
               Uint32 state = operation->read_state();
   
               if ( state  & ASYNC_OPSTATE_COMPLETE )
               {
                  service->_pending.remove_no_lock(operation) ;
                  operation->_client_sem.signal();
                  operation = service->_pending.next(0);
                  unlinked = true;
                  continue;
               }
               operation->_user_data = loop;
            }
            operation = service->_pending.next(operation);
         }
         service->_pending.unlock();
         loop++;
         if ( unlinked == true)
         {
            pegasus_yield();
            unlinked = false;
         }
      }
   
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
   }
   
   
   void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
   {
      if ( operation != 0 )
      {
         Message *rq = operation->get_request();
         PEGASUS_ASSERT(rq != 0 );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
         _handle_async_request(static_cast<AsyncRequest *>(rq));
      }
   
      return;
   
   }
   
   
   Boolean MessageQueueService::messageOK(const Message *msg)
   {
      if ( msg != 0 )
      {
         Uint32 mask = msg->getMask();
         if ( mask & message_mask::ha_async)
            if ( mask & message_mask::ha_request)
               return true;
      }
      return false;
   }
   
   
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
                                          Uint32 queueID,                                          Uint32 queueID,
                                          Uint32 capabilities,                                          Uint32 capabilities,
                                          Uint32 mask)                                          Uint32 mask)
    : Base(name, true,  queueID),     : Base(name, false,  queueID),
      _capabilities(capabilities),      _capabilities(capabilities),
      _mask(mask),      _mask(mask),
      _die(0)       _die(0),
        _pending(true, 1000),
        _incoming(true, 1000),
        _req_thread(_req_proc, this, false),
        _rpl_thread(_rpl_proc, this, false)
 { {
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;    _default_op_timeout.tv_usec = 100;
    _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));    _meta_dispatcher = static_cast<cimom *>(Base::lookup(CIMOM_Q_ID));
    if(_meta_dispatcher == 0 )    if(_meta_dispatcher == 0 )
       throw NullPointer();       throw NullPointer();
      _req_thread.run();
      _rpl_thread.run();
   
 } }
  
   
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
      _pending.shutdown_queue();
      _incoming.shutdown_queue();
      _req_thread.join();
      _rpl_thread.join();
 } }
  
   
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
  
 // mutex is UNLOCKED  
 void MessageQueueService::handleEnqueue(void)  Boolean MessageQueueService::accept_async(AsyncOpNode *op) throw(IPCException)
 { {
    Message *msg = dequeue();     // incoming async request message
    if( msg )  
      PEGASUS_ASSERT(op != 0 );
   
      Message *rq = op->get_request();
      if ( true == messageOK(rq) )
    {    {
       if(msg->getMask() & message_mask::ha_async)        try
       {       {
          _handle_async_msg(static_cast<AsyncMessage *>(msg));           _incoming.insert_first_wait(op);
       }       }
       else        catch ( IPCException& e)
          delete msg;        {
            return false ;
         }
         return true;
    }    }
      return false;
 } }
  
  
 void MessageQueueService::_enqueueAsyncResponse(AsyncRequest *request,  void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
                                                 AsyncReply *reply,                                                 AsyncReply *reply,
                                                 Uint32 state,                                                 Uint32 state,
                                                 Uint32 flag)                                                 Uint32 flag)
Line 88 
Line 204 
    op->_flags |= flag;    op->_flags |= flag;
    gettimeofday(&(op->_updated), NULL);    gettimeofday(&(op->_updated), NULL);
    op->unlock();    op->unlock();
    op->_client_sem.signal();  
   
 } }
  
 // may be overriden by derived classes  void MessageQueueService::handleEnqueue(void)
 void MessageQueueService::_handle_async_msg(AsyncMessage *msg)  
 { {
    if( msg == 0 )     Message *msg = dequeue();
       return;     if( msg )
   
    Uint32 mask = msg->getMask();  
    Uint32 type = msg->getType();  
   
    if (mask & message_mask::ha_async)  
    {    {
       if (mask & message_mask::ha_request)  
          _handle_async_request(static_cast<AsyncRequest *>(msg));  
       else  
          _handle_async_reply(static_cast<AsyncReply *>(msg));  
    }  
    else  
       delete msg;       delete msg;
 } }
   }
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
      if ( req != 0 )
      {
    req->op->processing();    req->op->processing();
  
    Uint32 type = req->getType();    Uint32 type = req->getType();
Line 136 
Line 241 
       // we don't handle this request message       // we don't handle this request message
       _make_response(req, async_results::CIM_NAK );       _make_response(req, async_results::CIM_NAK );
    }    }
      }
 } }
  
 void MessageQueueService::_handle_async_reply(AsyncReply *rep) void MessageQueueService::_handle_async_reply(AsyncReply *rep)
Line 170 
Line 275 
                      code,                      code,
                      req->resp,                      req->resp,
                      false);                      false);
    _enqueueAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
 } }
  
  
Line 187 
Line 292 
                      async_results::OK,                      async_results::OK,
                      req->resp,                      req->resp,
                      false);                      false);
    _enqueueAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );     _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
   
 } }
  
  
Line 231 
Line 335 
  
 AsyncOpNode *MessageQueueService::get_op(void) AsyncOpNode *MessageQueueService::get_op(void)
 { {
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();     AsyncOpNode *op = new AsyncOpNode();
    if(op == 0 )  
       throw NullPointer();  
  
    op->write_state(ASYNC_OPSTATE_UNKNOWN);    op->write_state(ASYNC_OPSTATE_UNKNOWN);
    op->write_flags(ASYNC_OPFLAGS_SINGLE |     op->write_flags(ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL );
                    ASYNC_OPFLAGS_NORMAL |  
                    ASYNC_OPFLAGS_META_DISPATCHER);  
    return op;    return op;
 } }
  
 void MessageQueueService::return_op(AsyncOpNode *op) void MessageQueueService::return_op(AsyncOpNode *op)
 { {
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
   
    if(op->read_state() & ASYNC_OPFLAGS_META_DISPATCHER )  
    {  
       _meta_dispatcher->cache_op(op);  
    }  
    else  
       delete op;       delete op;
 } }
  
 AsyncMessage *MessageQueueService::SendWait(AsyncRequest *request)  
 {  
    if (request == 0 )  
       throw NullPointer();  
    AsyncMessage *ret_msg = 0;  
  
    AsyncOpNode *op = request->op;  
  
    if(op == 0 )  AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
       return 0;  
    //     ATTN: debugging  
    op->put_response(0);  
    if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))  
    {    {
       op->_client_sem.wait();     if ( request == 0 )
       ret_msg = static_cast<AsyncMessage *>(op->_response);        return 0 ;
       op->_response = 0;     PEGASUS_ASSERT( request->op != 0 );
   
   
 //      op->lock();  
 //       while( op->_response.count() )  
 //       {  
 //       AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());  
 //       if (rply != 0 )  
 //       {  
 //          rply->op = 0;  
 //      reply_list->insert_first( static_cast<AsyncMessage *>(op->_response) );  
 //       }  
 //       }  
       // release the opnode, the meta-dispatcher will recycle it for us  
 //      op->_state |= ASYNC_OPSTATE_RELEASED ;  
 //      op->unlock();  
    }  
    return ret_msg;  
   
 }  
  
      request->block = true;
      request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      request->op->put_response(0);
  
 void MessageQueueService::SendWait(AsyncRequest *request, unlocked_dq<AsyncMessage> *reply_list)     // first link it on our pending list
 {     _pending.insert_last_wait(request->op);
    if (request == 0 || reply_list == 0 )  
       throw NullPointer();  
  
    AsyncOpNode *op = request->op;     // now see if the meta dispatcher will take it
  
    if(op == 0 )     if (true == _meta_dispatcher->route_async(request->op))
       return;  
    //     ATTN: debugging  
    op->put_response(0);  
    if(true == _meta_dispatcher->accept_async(static_cast<Message *>(request)))  
    {    {
         request->op->_client_sem.wait();
      }
  
      return static_cast<AsyncReply *>(request->op->get_response());
  
       op->_client_sem.wait();  
       op->lock();  
   
       AsyncMessage *response = static_cast<AsyncMessage *>(op->_response);  
       reply_list->insert_first(response);  
       op->unlock();  
   
       op->release();  
   
 //      op->lock();  
 //       while( op->_response.count() )  
 //       {  
 //       AsyncMessage *rply = static_cast<AsyncMessage *>(op->_response.remove_last());  
 //       if (rply != 0 )  
 //       {  
 //          rply->op = 0;  
 //      reply_list->insert_first( static_cast<AsyncMessage *>(op->_response) );  
 //       }  
 //       }  
       // release the opnode, the meta-dispatcher will recycle it for us  
 //      op->_state |= ASYNC_OPSTATE_RELEASED ;  
 //      op->unlock();  
    }    }
    else  
    {  
       // manually free the opnode and message  
       op->release();  
       return_op(op);  
    }  
 }  
   
 // Boolean MessageQueueService::SendAsync(AsyncMessage *msg)  
 // {  
 //    return _meta_dispatcher->accept_async(static_cast<Message *>(msg));  
 // }  
  
  
 Boolean MessageQueueService::register_service(String name, Boolean MessageQueueService::register_service(String name,


Legend:
Removed from v.1.3  
changed lines
  Added in v.1.4

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2