(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.4 and 1.5

version 1.4, 2002/01/21 02:01:22 version 1.5, 2002/01/21 21:20:35
Line 30 
Line 30 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)  
 {  
    Thread *myself = reinterpret_cast<Thread *>(parm);  
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());  
   
    // pull messages off the incoming queue and dispatch them. then  
    // check pending messages that are non-blocking  
   
    while ( service->_die.value() < 1 )  
    {  
   
       AsyncOpNode *operation = service->_incoming.remove_first_wait();  
       service->_handle_incoming_operation(operation);  
    }  
   
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  
    return(0);  
 }  
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_rpl_proc(void * parm)  
 {  
    Thread *myself = reinterpret_cast<Thread *>(parm);  
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());  
   
    // pull messages off the _pending queue and complete them.  
    // this loop is not optimized because every time we complete an operation  
    // we start again to traverse the loop at the beginning.  
    Uint32 loop = 1;  
    Boolean unlinked = false;  
   
   
    while ( service->_die.value() < 1 )  
    {  
       service->_pending.wait_for_node();  
       AsyncOpNode *operation = service->_pending.next(0);  
 //      operation = service->_pending.remove_no_lock(operation);  
       while( operation != 0 )  
       {  
          if ( operation->_user_data != loop )  
          {  
             Uint32 state = operation->read_state();  
   
             if ( state  & ASYNC_OPSTATE_COMPLETE )  
             {  
                service->_pending.remove_no_lock(operation) ;  
                operation->_client_sem.signal();  
                operation = service->_pending.next(0);  
                unlinked = true;  
                continue;  
             }  
             operation->_user_data = loop;  
          }  
          operation = service->_pending.next(operation);  
       }  
       service->_pending.unlock();  
       loop++;  
       if ( unlinked == true)  
       {  
          pegasus_yield();  
          unlinked = false;  
       }  
    }  
   
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  
    return(0);  
 }  
   
   
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)  
 {  
    if ( operation != 0 )  
    {  
       Message *rq = operation->get_request();  
       PEGASUS_ASSERT(rq != 0 );  
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );  
       PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);  
       _handle_async_request(static_cast<AsyncRequest *>(rq));  
    }  
   
    return;  
   
 }  
   
   
 Boolean MessageQueueService::messageOK(const Message *msg)  
 {  
    if ( msg != 0 )  
    {  
       Uint32 mask = msg->getMask();  
       if ( mask & message_mask::ha_async)  
          if ( mask & message_mask::ha_request)  
             return true;  
    }  
    return false;  
 }  
   
   
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
                                          Uint32 queueID,                                          Uint32 queueID,
                                          Uint32 capabilities,                                          Uint32 capabilities,
Line 136 
Line 38 
      _capabilities(capabilities),      _capabilities(capabilities),
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _pending(true, 1000),       _pending(true),
      _incoming(true, 1000),       _incoming(true),
      _req_thread(_req_proc, this, false),       _req_thread(_req_proc, this, false)
      _rpl_thread(_rpl_proc, this, false)  
 { {
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;    _default_op_timeout.tv_usec = 100;
Line 147 
Line 48 
    if(_meta_dispatcher == 0 )    if(_meta_dispatcher == 0 )
       throw NullPointer();       throw NullPointer();
    _req_thread.run();    _req_thread.run();
    _rpl_thread.run();  
  
 } }
  
Line 155 
Line 55 
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
    _pending.shutdown_queue();  
    _incoming.shutdown_queue();  
    _req_thread.join();    _req_thread.join();
    _rpl_thread.join();  
 } }
  
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
  
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) throw(IPCException)  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
    // incoming async request message     Thread *myself = reinterpret_cast<Thread *>(parm);
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
  
    PEGASUS_ASSERT(op != 0 );     // pull messages off the incoming queue and dispatch them. then
      // check pending messages that are non-blocking
  
    Message *rq = op->get_request();     while ( service->_die.value() == 0  ||
    if ( true == messageOK(rq) )             service->_incoming.count() > 0  )
    {    {
       try        if ( service->_incoming.count() == 0 )
       {       {
          _incoming.insert_first_wait(op);           pegasus_yield();
       }           continue;
       catch ( IPCException& e)  
       {  
          return false ;  
       }  
       return true;  
    }  
    return false;  
 }  
   
  
 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,        }
                                                 AsyncReply *reply,  
                                                 Uint32 state,  
                                                 Uint32 flag)  
 {  
    PEGASUS_ASSERT(request != 0  && reply != 0 );  
  
    AsyncOpNode *op = request->op;        AsyncOpNode *operation = service->_incoming.remove_first();
    op->lock();        service->_handle_incoming_operation(operation);
 //   if (false == op->_response.exists(reply))     }
 //      op->_response.insert_last(reply);  
    op->_response = reply;  
  
    op->_state |= state ;     myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    op->_flags |= flag;     return(0);
    gettimeofday(&(op->_updated), NULL);  
    op->unlock();  
 } }
  
 void MessageQueueService::handleEnqueue(void)  
   void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 { {
    Message *msg = dequeue();     if ( operation != 0 )
    if( msg )  
    {    {
       delete msg;        Message *rq = operation->get_request();
         PEGASUS_ASSERT(rq != 0 );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_async );
         PEGASUS_ASSERT(rq->getMask() & message_mask::ha_request);
         _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
   
      return;
   
 } }
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
Line 244 
Line 134 
    }    }
 } }
  
 void MessageQueueService::_handle_async_reply(AsyncReply *rep)  
 {  
   
    if (rep->op != 0 )  
       rep->op->processing();  
   
    Uint32 type = rep->getType();  
   
    if ( type == async_messages::ASYNC_OP_RESULT )  
       handle_AsyncOperationResult(static_cast<AsyncOperationResult *>(rep));  
    else  
    {  
       // we don't handle this reply  
       ;  
    }  
   
    if( rep->op != 0 )  
       rep->op->release();  
 }  
   
 void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code) void MessageQueueService::_make_response(AsyncRequest *req, Uint32 code)
 { {
    AsyncReply *reply =    AsyncReply *reply =
Line 279 
Line 149 
 } }
  
  
   void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
                                                   AsyncReply *reply,
                                                   Uint32 state,
                                                   Uint32 flag)
   {
      PEGASUS_ASSERT(request != 0  && reply != 0 );
   
      AsyncOpNode *op = request->op;
      op->lock();
      op->_response = reply;
   
      op->_state |= state ;
      op->_flags |= flag;
      gettimeofday(&(op->_updated), NULL);
      op->unlock();
      op->_client_sem.signal();
   
   }
   
   
   
   Boolean MessageQueueService::accept_async(AsyncOpNode *op)
   {
      Message *rq = op->get_request();
      if( true == messageOK(rq) &&  _die.value() == 0  )
      {
         _incoming.insert_last(op);
         return true;
      }
      return false;
   }
   
   Boolean MessageQueueService::messageOK(const Message *msg)
   {
      if ( msg != 0 )
      {
         Uint32 mask = msg->getMask();
         if ( mask & message_mask::ha_async)
            if ( mask & message_mask::ha_request)
               return true;
      }
      return false;
   }
   
   
   void MessageQueueService::handleEnqueue(void)
   {
      Message *msg = dequeue();
      if( msg )
      {
         delete msg;
      }
   }
   
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 { {
    // default action is to echo a heartbeat response    // default action is to echo a heartbeat response
Line 355 
Line 279 
 { {
    if ( request == 0 )    if ( request == 0 )
       return 0 ;       return 0 ;
    PEGASUS_ASSERT( request->op != 0 );  
      Boolean destroy_op = false;
   
      if (request->op == false)
      {
         request->op = get_op();
         request->op->put_request(request);
   
         destroy_op = true;
      }
  
    request->block = true;    request->block = true;
    request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;    request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    request->op->put_response(0);    request->op->put_response(0);
  
    // first link it on our pending list    // first link it on our pending list
    _pending.insert_last_wait(request->op);     // _pending.insert_last_wait(request->op);
  
    // now see if the meta dispatcher will take it    // now see if the meta dispatcher will take it
  
Line 371 
Line 304 
       request->op->_client_sem.wait();       request->op->_client_sem.wait();
    }    }
  
    return static_cast<AsyncReply *>(request->op->get_response());     AsyncReply * rpl = static_cast<AsyncReply *>(request->op->get_response());
      if( destroy_op == true)
      {
         request->op->release();
         delete request->op;
         request->op = 0;
      }
  
      return rpl;
 } }
  
  
Line 381 
Line 321 
                                               Uint32 mask)                                               Uint32 mask)
  
 { {
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
   
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
   
    RegisterCimService *msg = new RegisterCimService(get_next_xid(),    RegisterCimService *msg = new RegisterCimService(get_next_xid(),
                                                     op,                                                      0,
                                                     true,                                                     true,
                                                     name,                                                     name,
                                                     capabilities,                                                     capabilities,
Line 404 
Line 339 
          {          {
             if((static_cast<AsyncReply *>(reply))->result == async_results::OK)             if((static_cast<AsyncReply *>(reply))->result == async_results::OK)
                registered = true;                registered = true;
   
          }          }
       }       }
  
       delete reply;       delete reply;
    }    }
      delete msg;
    return registered;    return registered;
 } }
  
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask) Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 { {
  
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
  
    UpdateCimService *msg = new UpdateCimService(get_next_xid(),    UpdateCimService *msg = new UpdateCimService(get_next_xid(),
                                                 op,                                                  0,
                                                 true,                                                 true,
                                                 _queueId,                                                 _queueId,
                                                 _capabilities,                                                 _capabilities,
                                                 _mask);                                                 _mask);
    Boolean registered = false;    Boolean registered = false;
  
   
    AsyncMessage *reply = SendWait(msg);    AsyncMessage *reply = SendWait(msg);
    if (reply)    if (reply)
    {    {
Line 442 
Line 373 
       }       }
       delete reply;       delete reply;
    }    }
      delete msg;
    return registered;    return registered;
 } }
  
Line 449 
Line 381 
 Boolean MessageQueueService::deregister_service(void) Boolean MessageQueueService::deregister_service(void)
 { {
  
 //   _meta_dispatcher->deregister_module(_queueId);     _meta_dispatcher->deregister_module(_queueId);
 //   return true;     return true;
   
   
    AsyncOpNode *op = _meta_dispatcher->get_cached_op();  
    op->_state |= ASYNC_OPSTATE_UNKNOWN;  
    op->_flags |= ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;  
   
    DeRegisterCimService *msg = new DeRegisterCimService(get_next_xid(),  
                                                         op,  
                                                         true,  
                                                         _queueId);  
    Boolean deregistered = false;  
   
   
   
    return _meta_dispatcher->accept_async(static_cast<Message *>(msg));  
 } }
  
  
Line 478 
Line 395 
    if( results == 0 )    if( results == 0 )
       throw NullPointer();       throw NullPointer();
  
    AsyncOpNode *op = get_op();  
    results->clear();    results->clear();
  
    FindServiceQueue *req =    FindServiceQueue *req =
       new FindServiceQueue(get_next_xid(),       new FindServiceQueue(get_next_xid(),
                            op,                             0,
                            _queueId,                            _queueId,
                            true,                            true,
                            name,                            name,
Line 506 
Line 422 
       }       }
       delete reply;       delete reply;
    }    }
      delete req;
    return ;    return ;
 } }
  
Line 514 
Line 431 
    if(result == 0)    if(result == 0)
       throw NullPointer();       throw NullPointer();
  
    AsyncOpNode *op = get_op();  
   
    EnumerateService *req    EnumerateService *req
       = new EnumerateService(get_next_xid(),       = new EnumerateService(get_next_xid(),
                              op,                               0,
                              _queueId,                              _queueId,
                              true,                              true,
                              queue);                              queue);
Line 552 
Line 467 
       }       }
       delete reply;       delete reply;
    }    }
      delete req;
   
    return;    return;
 } }
  


Legend:
Removed from v.1.4  
changed lines
  Added in v.1.5

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2