(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.107 and 1.112

version 1.107, 2005/05/27 02:34:02 version 1.112, 2005/06/14 17:10:37
Line 35 
Line 35 
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
   // #include <iostream.h>
 #include "MessageQueueService.h" #include "MessageQueueService.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/MessageLoader.h> //l10n #include <Pegasus/Common/MessageLoader.h> //l10n
Line 58 
Line 59 
 { {
    return _thread_pool;    return _thread_pool;
 } }
   //
   // MAX_THREADS_PER_SVC_QUEUE_LIMIT
   //
   // 5000 is seriously too high a number for the limit but since
   // previously there was no limit at all this is intended to approximate
   // that behavior. In my testing on a unit processor system the system
   // behaved best with a low number 2 to 5 for the MAX_THREADS_PER_SVC_QUEUE.
   // When set to 1000 the system deadlocked with indications that were
   // not delivered and apparently left sitting within the server in a queue.
   //
   // JR Wunderlich Jun 6, 2005
   //
   
   #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
   
   Uint32 max_threads_per_svc_queue;
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
 MessageQueueService::kill_idle_threads(void *parm) MessageQueueService::kill_idle_threads(void *parm)
Line 103 
Line 120 
       }       }
  
       list->lock();       list->lock();
         int list_index = 0;
       MessageQueueService *service = list->next(0);       MessageQueueService *service = list->next(0);
       while(service != NULL)       while(service != NULL)
       {       {
          if (service->_incoming.count() > 0 && service->_die.value() == 0)            int rtn;
             rtn = true;
             if (service->_incoming.count() > 0
                 && service->_die.value() == 0
                 && service->_threads <= max_threads_per_svc_queue)
               rtn = _thread_pool->allocate_and_awaken(service, _req_proc,
                                                           &_polling_sem);
   
             // if no more threads available, break from processing loop
             if (rtn == false)
          {          {
             _thread_pool->allocate_and_awaken(service, _req_proc);                service = NULL;
          }          }
             else
               {
          service = list->next(service);          service = list->next(service);
       }       }
           }
       list->unlock();       list->unlock();
   
       if (_check_idle_flag.value() != 0)       if (_check_idle_flag.value() != 0)
       {       {
          _check_idle_flag = 0;          _check_idle_flag = 0;
            // try to do idle thread clean up processing when system is not busy
            // if system is busy there may not be a thread available to allocate
            // so nothing will be done and that is OK.
   
            _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem);
  
          // If there are insufficent resources to run  
          // kill_idle_threads, then just return.  
          _thread_pool->allocate_and_awaken(service, kill_idle_threads);  
       }       }
    }    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
Line 144 
Line 177 
      _incoming(true, 0),      _incoming(true, 0),
      _incoming_queue_shutdown(0)      _incoming_queue_shutdown(0)
 { {
   
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
  
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;    _default_op_timeout.tv_usec = 100;
  
      max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
   
      // if requested threads gt MAX_THREADS_PER_SVC_QUEUE_LIMIT
      // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
   
      if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
        {
          max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
        }
   
      // if requested threads eq 0 (unlimited)
      // then set to MAX_THREADS_PER_SVC_QUEUE_LIMIT
   
      if (max_threads_per_svc_queue == 0)
        {
          max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
        }
   
      // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
      // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;
   
   
    AutoMutex autoMut(_meta_dispatcher_mutex);    AutoMutex autoMut(_meta_dispatcher_mutex);
  
    if (_meta_dispatcher == 0)    if (_meta_dispatcher == 0)
Line 160 
Line 216 
       {       {
          throw NullPointer();          throw NullPointer();
       }       }
         //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
         //   minimum_cnt, maximum_cnt, deallocateWait);
         //
       _thread_pool =       _thread_pool =
           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }    }
Line 219 
Line 278 
    // Clean up in case there are extra stuff on the queue.    // Clean up in case there are extra stuff on the queue.
   while (_incoming.count())   while (_incoming.count())
   {   {
       try {
     delete _incoming.remove_first();     delete _incoming.remove_first();
       } catch (const ListClosed &e)
       {
         // If the list is closed, there is nothing we can do.
         break;
       }
   }   }
 } }
  
Line 246 
Line 311 
  
    msg->op->_op_dest = this;    msg->op->_op_dest = this;
    msg->op->_request.insert_first(msg);    msg->op->_request.insert_first(msg);
      try {
    _incoming.insert_last_wait(msg->op);    _incoming.insert_last_wait(msg->op);
    _polling_sem.signal();    _polling_sem.signal();
      } catch (const ListClosed &)
      {
           // This means the queue has already been shut-down (happens  when there
       // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
       // processed.
        delete msg;
      }
      catch (const Permission &)
      {
        delete msg;
      }
 } }
  
  


Legend:
Removed from v.1.107  
changed lines
  Added in v.1.112

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2