(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.6 and 1.7

version 1.6, 2002/02/02 01:03:31 version 1.7, 2002/02/02 17:58:13
Line 40 
Line 40 
      _die(0),      _die(0),
      _pending(true),      _pending(true),
      _incoming(true, 1000),      _incoming(true, 1000),
        _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)      _req_thread(_req_proc, this, false)
 { {
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
Line 55 
Line 56 
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
      if (_incoming_queue_shutdown.value() == 0 )
    _incoming.shutdown_queue();    _incoming.shutdown_queue();
  
    _req_thread.join();    _req_thread.join();
Line 63 
Line 65 
  
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
  
   void MessageQueueService::_shutdown_incoming_queue(void)
   {
      _incoming_queue_shutdown = 1;
   
      _incoming.shutdown_queue();
      _req_thread.cancel();
   }
   
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
Line 71 
Line 81 
  
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
      AsyncOpNode *operation = 0;
  
    while ( service->_die.value() == 0 )    while ( service->_die.value() == 0 )
    {    {
       AsyncOpNode *operation = service->_incoming.remove_first_wait();        try
       if ( operation == 0 )        {
            operation = service->_incoming.remove_first();
         }
         catch(ListClosed & )
         {
          break;          break;
         }
         if ( service->_incoming.is_shutdown() || service->_die.value() )
            break;
         if( operation )
       service->_handle_incoming_operation(operation);       service->_handle_incoming_operation(operation);
         else
            pegasus_yield();
    }    }
  
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
Line 160 
Line 180 
    op->_state |= state ;    op->_state |= state ;
    op->_flags |= flag;    op->_flags |= flag;
    gettimeofday(&(op->_updated), NULL);    gettimeofday(&(op->_updated), NULL);
    if ( false == op->_request.exists(reinterpret_cast<void *>(reply)) )     if ( false == op->_response.exists(reinterpret_cast<void *>(reply)) )
       op->_request.insert_last(reply);        op->_response.insert_last(reply);
    op->unlock();    op->unlock();
  
    op->_client_sem.signal();    op->_client_sem.signal();
Line 289 
Line 309 
    if (request->op == false)    if (request->op == false)
    {    {
       request->op = get_op();       request->op = get_op();
       request->op->put_request(request);        request->op->_request.insert_first(request);
   
       destroy_op = true;       destroy_op = true;
    }    }
  
Line 323 
Line 342 
       request->op->unlock();       request->op->unlock();
  
       return_op(request->op);       return_op(request->op);
         request->op = 0;
 //      delete request->op;  
 //      request->op = 0;  
    }    }
  
    return rpl;    return rpl;
Line 345 
Line 362 
                                                     mask,                                                     mask,
                                                     _queueId);                                                     _queueId);
    Boolean registered = false;    Boolean registered = false;
    AsyncMessage *reply = SendWait( msg );     AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
  
    if ( reply != 0 )    if ( reply != 0 )
    {    {
Line 353 
Line 370 
       {       {
          if(reply->getMask() & message_mask::ha_reply)          if(reply->getMask() & message_mask::ha_reply)
          {          {
             if((static_cast<AsyncReply *>(reply))->result == async_results::OK)              if(reply->result == async_results::OK)
                registered = true;                registered = true;
          }          }
       }       }


Legend:
Removed from v.1.6  
changed lines
  Added in v.1.7

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2