(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.115 and 1.116

version 1.115, 2005/08/10 04:08:02 version 1.116, 2005/08/16 22:46:53
Line 114 
Line 114 
          break;          break;
       }       }
  
         // The polling_routine thread must hold the lock on the
         // _polling_thread list while processing incoming messages.
         // This lock is used to give this thread ownership of
         // services on the _polling_routine list.
   
         // This is necessary to avoid confict with other threads
         // processing the _polling_list
         // (e.g., MessageQueueServer::~MessageQueueService).
   
       list->lock();       list->lock();
       int list_index = 0;  
       MessageQueueService *service = list->next(0);       MessageQueueService *service = list->next(0);
         ThreadStatus rtn = PEGASUS_THREAD_OK;
       while(service != NULL)       while(service != NULL)
         {         {
           ThreadStatus rtn;            if ((service->_incoming.count() > 0) &&
           rtn = PEGASUS_THREAD_OK;                (service->_die.value() == 0) &&
           if (service->_incoming.count() > 0                (service->_threads < max_threads_per_svc_queue))
               && service->_die.value() == 0            {
               && service->_threads <= max_threads_per_svc_queue)               // The _threads count is used to track the
             rtn = _thread_pool->allocate_and_awaken(service, _req_proc,               // number of active threads that have been allocated
                                                         &_polling_sem);               // to process messages for this service.
   
                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_thread
                // lock and has ownership of the service object.
  
                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                         service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;
   
                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
           // if no more threads available, break from processing loop           // if no more threads available, break from processing loop
           if (rtn != PEGASUS_THREAD_OK )           if (rtn != PEGASUS_THREAD_OK )
             {             {
                    service->_threads--;
                 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,                 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                         "Not enough threads to process this request. Skipping.");                         "Not enough threads to process this request. Skipping.");
  
Line 144 
Line 171 
               pegasus_yield();               pegasus_yield();
               service = NULL;               service = NULL;
             }             }
           else            }
             if (service != NULL)
             {             {
               service = list->next(service);               service = list->next(service);
             }             }
Line 259 
Line 287 
 { {
    _die = 1;    _die = 1;
  
      // The polling_routine locks the _polling_list while
      // processing the incoming messages for services on the
      // list.  Deleting the service from the _polling_list
      // prior to processing, avoids synchronization issues
      // with the _polling_routine.
   
      _polling_list.remove(this);
   
      // ATTN: The code for closing the _incoming queue
      // is not working correctly. In OpenPegasus 2.5,
      // execution of the following code is very timing
      // dependent. This needs to be fix.
      // See Bug 4079 for details.
    if (_incoming_queue_shutdown.value() == 0)    if (_incoming_queue_shutdown.value() == 0)
    {    {
       _shutdown_incoming_queue();       _shutdown_incoming_queue();
   
    }    }
  
      // Wait until all threads processing the messages
      // for this service have completed.
   
  while (_threads.value() > 0)  while (_threads.value() > 0)
      {      {
           pegasus_yield();           pegasus_yield();
      }      }
    _polling_list.remove(this);  
    {    {
      AutoMutex autoMut(_meta_dispatcher_mutex);      AutoMutex autoMut(_meta_dispatcher_mutex);
      _service_count--;      _service_count--;
Line 366 
Line 409 
  
         if (service->_die.value() != 0)         if (service->_die.value() != 0)
         {         {
               service->_threads--;
             return (0);             return (0);
         }         }
             service->_threads++;  
         // pull messages off the incoming queue and dispatch them. then         // pull messages off the incoming queue and dispatch them. then
         // check pending messages that are non-blocking         // check pending messages that are non-blocking
         AsyncOpNode *operation = 0;         AsyncOpNode *operation = 0;


Legend:
Removed from v.1.115  
changed lines
  Added in v.1.116

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2