(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.32 and 1.34

version 1.32, 2002/03/11 02:36:18 version 1.34, 2002/03/12 22:59:35
Line 47 
Line 47 
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _pending(true),      _pending(true),
      _incoming(true, 100 ),       _incoming(true, 1000),
      _incoming_queue_shutdown(0),      _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)      _req_thread(_req_proc, this, false)
 { {
Line 91 
Line 91 
    if (_incoming_queue_shutdown.value() == 0 )    if (_incoming_queue_shutdown.value() == 0 )
    {    {
       _shutdown_incoming_queue();       _shutdown_incoming_queue();
   
  //_incoming.shutdown_queue();  
        _req_thread.join();        _req_thread.join();
    }    }
  
Line 185 
Line 183 
       }       }
       if( operation )       if( operation )
       {       {
            operation->_thread_ptr = myself;
          service->_handle_incoming_operation(operation, myself, service);           operation->_service_ptr = service;
            service->_handle_incoming_operation(operation);
       }       }
    }    }
  
Line 194 
Line 193 
    return(0);    return(0);
 } }
  
   void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
                                                MessageQueue *q,
                                                void *parm)
   {
      op->_client_sem.signal();
   }
   
  
 // callback function is responsible for cleaning up all resources // callback function is responsible for cleaning up all resources
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
Line 206 
Line 212 
 } }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
                                                      Thread *thread,  //                                                   Thread *thread,
                                                      MessageQueue *queue)  //                                                   MessageQueue *queue)
 { {
    if ( operation != 0 )    if ( operation != 0 )
    {    {
Line 247 
Line 253 
          // ATTN: optimization          // ATTN: optimization
          // << Wed Mar  6 15:00:39 2002 mdd >>          // << Wed Mar  6 15:00:39 2002 mdd >>
          // put thread and queue into the asyncopnode structure.          // put thread and queue into the asyncopnode structure.
          (static_cast<AsyncMessage *>(rq))->_myself = thread;           //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
          (static_cast<AsyncMessage *>(rq))->_service = queue;           //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
            // done << Tue Mar 12 14:49:07 2002 mdd >>
          operation->unlock();          operation->unlock();
          _handle_async_request(static_cast<AsyncRequest *>(rq));          _handle_async_request(static_cast<AsyncRequest *>(rq));
       }       }
Line 443 
Line 450 
       case AsyncIoctl::IO_CLOSE:       case AsyncIoctl::IO_CLOSE:
       {       {
          // save my bearings          // save my bearings
          Thread *myself = req->_myself;           Thread *myself = req->op->_thread_ptr;
          MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);           MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
  
          // respond to this message.          // respond to this message.
          _make_response(req, async_results::OK);          _make_response(req, async_results::OK);
Line 470 
Line 477 
             }             }
             if( operation )             if( operation )
             {             {
                service->_handle_incoming_operation(operation, myself, service);                 operation->_thread_ptr = myself;
                  operation->_service_ptr = service;
                  service->_handle_incoming_operation(operation);
             }             }
             else             else
                break;                break;
Line 678 
Line 687 
       destroy_op = true;       destroy_op = true;
    }    }
  
    request->block = true;     request->block = false;
    request->op->lock();  
    request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
    request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK;  
  
    request->op->_op_dest = MessageQueue::lookup(request->dest);     SendAsync(request->op,
    request->op->unlock();               request->dest,
                _sendwait_callback,
                this,
                (void *)0);
  
    if ( request->op->_op_dest == 0 )  
       return 0;  
   
    // now see if the meta dispatcher will take it  
    if (true == _meta_dispatcher->route_async(request->op))  
    {  
       request->op->_client_sem.wait();       request->op->_client_sem.wait();
       PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);  
    }  
   
    request->op->lock();    request->op->lock();
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;    rpl->op = 0;


Legend:
Removed from v.1.32  
changed lines
  Added in v.1.34

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2