(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.117 and 1.118

version 1.117, 2005/10/14 17:40:18 version 1.118, 2005/10/24 19:35:25
Line 44 
Line 44 
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
 AtomicInt MessageQueueService::_service_count = 0;  AtomicInt MessageQueueService::_service_count(0);
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
 Mutex MessageQueueService::_meta_dispatcher_mutex; Mutex MessageQueueService::_meta_dispatcher_mutex;
  
Line 110 
Line 110 
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
    while (_stop_polling.value()  == 0)     while (_stop_polling.get()  == 0)
    {    {
       _polling_sem.wait();       _polling_sem.wait();
  
       if (_stop_polling.value() != 0)        if (_stop_polling.get() != 0)
       {       {
          break;          break;
       }       }
Line 134 
Line 134 
       while (service != NULL)       while (service != NULL)
       {       {
           if ((service->_incoming.count() > 0) &&           if ((service->_incoming.count() > 0) &&
               (service->_die.value() == 0) &&                (service->_die.get() == 0) &&
               (service->_threads < max_threads_per_svc_queue))                (service->_threads.get() < max_threads_per_svc_queue))
           {           {
              // The _threads count is used to track the              // The _threads count is used to track the
              // number of active threads that have been allocated              // number of active threads that have been allocated
Line 171 
Line 171 
                     "Skipping the service for right now. ",                     "Skipping the service for right now. ",
                     service->getQueueName(),                     service->getQueueName(),
                     service->_incoming.count(),                     service->_incoming.count(),
                     service->_threads.value());                      service->_threads.get());
  
                  pegasus_yield();                  pegasus_yield();
                  service = NULL;                  service = NULL;
Line 184 
Line 184 
       }       }
       list->unlock();       list->unlock();
  
       if (_check_idle_flag.value() != 0)        if (_check_idle_flag.get() != 0)
       {       {
          _check_idle_flag = 0;          _check_idle_flag = 0;
          // try to do idle thread clean up processing when system is not busy          // try to do idle thread clean up processing when system is not busy
Line 251 
Line 251 
    if (_meta_dispatcher == 0)    if (_meta_dispatcher == 0)
    {    {
       _stop_polling = 0;       _stop_polling = 0;
       PEGASUS_ASSERT(_service_count.value() == 0);        PEGASUS_ASSERT(_service_count.get() == 0);
       _meta_dispatcher = new cimom();       _meta_dispatcher = new cimom();
       if (_meta_dispatcher == NULL)       if (_meta_dispatcher == NULL)
       {       {
Line 297 
Line 297 
    // execution of the following code is very timing    // execution of the following code is very timing
    // dependent. This needs to be fix.    // dependent. This needs to be fix.
    // See Bug 4079 for details.    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.value() == 0)     if (_incoming_queue_shutdown.get() == 0)
    {    {
        _shutdown_incoming_queue();        _shutdown_incoming_queue();
    }    }
Line 305 
Line 305 
    // Wait until all threads processing the messages    // Wait until all threads processing the messages
    // for this service have completed.    // for this service have completed.
  
    while (_threads.value() > 0)     while (_threads.get() > 0)
    {    {
       pegasus_yield();       pegasus_yield();
    }    }
Line 313 
Line 313 
    {    {
      AutoMutex autoMut(_meta_dispatcher_mutex);      AutoMutex autoMut(_meta_dispatcher_mutex);
      _service_count--;      _service_count--;
      if (_service_count.value() == 0)       if (_service_count.get() == 0)
      {      {
  
       _stop_polling++;       _stop_polling++;
Line 346 
Line 346 
  
 void MessageQueueService::_shutdown_incoming_queue() void MessageQueueService::_shutdown_incoming_queue()
 { {
    if (_incoming_queue_shutdown.value() > 0)     if (_incoming_queue_shutdown.get() > 0)
       return;       return;
  
    AsyncIoctl *msg = new AsyncIoctl(    AsyncIoctl *msg = new AsyncIoctl(
Line 404 
Line 404 
     try     try
     {     {
  
         if (service->_die.value() != 0)          if (service->_die.get() != 0)
         {         {
             service->_threads--;             service->_threads--;
             return (0);             return (0);
Line 693 
Line 693 
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 { {
    if (_incoming_queue_shutdown.value() > 0)     if (_incoming_queue_shutdown.get() > 0)
       return false;       return false;
    if (_polling_thread == NULL)    if (_polling_thread == NULL)
    {    {
Line 719 
Line 719 
    op->unlock();    op->unlock();
  
    if ((rq != 0 && (true == messageOK(rq))) ||    if ((rq != 0 && (true == messageOK(rq))) ||
        (rp != 0 && (true == messageOK(rp))) && _die.value() == 0)         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
    {    {
       _incoming.insert_last_wait(op);       _incoming.insert_last_wait(op);
       _polling_sem.signal();       _polling_sem.signal();
Line 730 
Line 730 
  
 Boolean MessageQueueService::messageOK(const Message *msg) Boolean MessageQueueService::messageOK(const Message *msg)
 { {
    if (_incoming_queue_shutdown.value() > 0)     if (_incoming_queue_shutdown.get() > 0)
       return false;       return false;
    return true;    return true;
 } }
Line 775 
Line 775 
          // ensure we do not accept any further messages          // ensure we do not accept any further messages
  
          // ensure we don't recurse on IO_CLOSE          // ensure we don't recurse on IO_CLOSE
          if (_incoming_queue_shutdown.value() > 0)           if (_incoming_queue_shutdown.get() > 0)
             break;             break;
  
          // set the closing flag          // set the closing flag
Line 1262 
Line 1262 
    Uint32 value;    Uint32 value;
    AutoMutex autoMut(_monitor);    AutoMutex autoMut(_monitor);
    _xid++;    _xid++;
    value =  _xid.value();     value =  _xid.get();
    return value;    return value;
  
 } }


Legend:
Removed from v.1.117  
changed lines
  Added in v.1.118

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2