
Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.88.2.4 and 1.88.2.6

version 1.88.2.4, 2005/08/12 23:47:57
version 1.88.2.6, 2005/08/23 22:34:37
Line 167 (v1.88.2.4) / Line 167 (v1.88.2.6)

            break;
         }

+        // The polling_routine thread must hold the lock on the
+        // _polling_thread list while processing incoming messages.
+        // This lock is used to give this thread ownership of
+        // services on the _polling_routine list.
+
+        // This is necessary to avoid conflict with other threads
+        // processing the _polling_list
+        // (e.g., MessageQueueServer::~MessageQueueService).
+
         list->lock();
         MessageQueueService *service = list->next(0);
+        ThreadStatus rtn = PEGASUS_THREAD_OK;
         while (service != NULL)
         {
-           ThreadStatus rtn = PEGASUS_THREAD_OK;
-           if (service->_incoming.count() > 0 &&
-               service->_die.value() == 0 &&
-               service->_threads <= max_threads_per_svc_queue)
+           if ((service->_incoming.count() > 0) &&
+               (service->_die.value() == 0) &&
+               (service->_threads < max_threads_per_svc_queue))
            {
+              // The _threads count is used to track the
+              // number of active threads that have been allocated
+              // to process messages for this service.
+
+              // The _threads count MUST be incremented while
+              // the polling_routine owns the _polling_thread
+              // lock and has ownership of the service object.
+
+              service->_threads++;
+              try
               {
-                 rtn = _thread_pool->allocate_and_awaken(service,
-                     _req_proc, &_polling_sem);
+                 rtn = _thread_pool->allocate_and_awaken(
+                     service, _req_proc, &_polling_sem);
               }
+              catch (...)
+              {
+                 service->_threads--;
+
+                 // allocate_and_awaken should never generate an exception.
+                 PEGASUS_ASSERT(0);
+              }
               // if no more threads available, break from processing loop
               if (rtn != PEGASUS_THREAD_OK)
               {
+                 service->_threads--;
                  Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                      "Not enough threads to process this request. Skipping.");
  
Line 196 (v1.88.2.4) / Line 224 (v1.88.2.6)

              pegasus_yield();
              service = NULL;
           }
-          else
+       }
+       if (service != NULL)
        {
           service = list->next(service);
        }
     }
     list->unlock();

     if (_check_idle_flag.value() != 0)
     {
        _check_idle_flag = 0;
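
The control-flow change here is easy to miss in diff form: advancing to the next service no longer hangs off an else, because the enlarged if block above it can now fall through with service still non-null. A reduced sketch of the resulting loop shape, with illustrative names rather than the Pegasus list API:

    #include <cstddef>

    struct Node { Node* next; bool ready; };

    // Stub dispatch; returns false when the thread pool is exhausted.
    static bool dispatch(Node&) { return true; }

    static void poll_pass(Node* service)
    {
        while (service != NULL)
        {
            if (service->ready && !dispatch(*service))
            {
                service = NULL;          // pool exhausted: end this pass
            }
            if (service != NULL)         // guarded advance replaces 'else'
            {
                service = service->next;
            }
        }
    }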
Line 323 (v1.88.2.4) / Line 353 (v1.88.2.6)

  {
     _die = 1;

+    // The polling_routine locks the _polling_list while
+    // processing the incoming messages for services on the
+    // list.  Deleting the service from the _polling_list
+    // prior to processing avoids synchronization issues
+    // with the _polling_routine.
+
+    _polling_list.remove(this);
+
+    _callback_ready.signal();
+
+    // ATTN: The code for closing the _incoming queue
+    // is not working correctly. In OpenPegasus 2.4,
+    // execution of the following code is very timing
+    // dependent. This needs to be fixed.
+    // See Bug 4079 for details.
     if (_incoming_queue_shutdown.value() == 0)
     {
        _shutdown_incoming_queue();
     }

+    // Wait until all threads processing the messages
+    // for this service have completed.
+
     while (_threads.value() > 0)
     {
         pegasus_yield();
     }
-    _polling_list.remove(this);
-
-    _callback_ready.signal();
-
     {
       AutoMutex autoMut(_meta_dispatcher_mutex);
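
This hunk reorders the destructor: the service leaves the _polling_list (and signals _callback_ready) before the incoming queue is closed and before the drain loop, rather than after. A hedged sketch of the resulting shutdown sequence, using illustrative names rather than the real Pegasus members:

    #include <atomic>

    struct ServiceShutdown
    {
        std::atomic<int> die{0};
        std::atomic<int> threads{0};    // workers still running for this service

        void remove_from_polling_list() { /* unhook under the list lock */ }
        void signal_callback_ready()    { /* wake any waiting callback thread */ }
        void shutdown_incoming_queue()  { /* stop accepting new messages */ }
        static void yield()             { /* yield the CPU to other threads */ }

        void destroy()
        {
            die = 1;

            // 1. Leave the polling list first, so the polling thread can
            //    no longer claim new worker slots for this service.
            remove_from_polling_list();
            signal_callback_ready();

            // 2. Close the incoming queue (still timing-sensitive per the
            //    ATTN comment and Bug 4079).
            shutdown_incoming_queue();

            // 3. Only then drain: every remaining worker was counted
            //    before it started, so the count can only fall here.
            while (threads.load() > 0)
                yield();
        }
    };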
Line 465 (v1.88.2.4) / Line 510 (v1.88.2.6)

     if (service->_die.value() == 0)
     {
-       service->_threads++;
        try
        {
           operation = service->_incoming.remove_first();
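
With the claim moved into polling_routine, the worker's entry routine no longer increments _threads itself. That removes the window in which the destructor's drain loop could observe a zero count after allocate_and_awaken() had already succeeded. A compact sketch of the worker side under that assumption (hypothetical names):

    #include <atomic>

    struct Svc
    {
        std::atomic<int> die{0};
        std::atomic<int> threads{0};   // already incremented by the poller
    };

    // Stub for pulling and handling one queued message.
    static void process_one_message(Svc&) { /* ... */ }

    static void worker_entry(Svc& svc)
    {
        // No svc.threads++ here: the slot was claimed before this thread
        // existed, so the destructor's drain loop never undercounts it.
        if (svc.die.load() == 0)
        {
            process_one_message(svc);
        }
        --svc.threads;   // release the slot claimed by polling_routine
    }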
Line 734 (v1.88.2.4) / Line 778 (v1.88.2.6)

  Boolean MessageQueueService::accept_async(AsyncOpNode *op)
  {
     if (_incoming_queue_shutdown.value() > 0)
-       return false;
+       return true;

  // ATTN optimization remove the message checking altogether in the base
  // << Mon Feb 18 14:02:20 2002 mdd >>
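
The one-line change in this hunk flips the shutdown result of accept_async() from false to true. One plausible reading (hedged; the diff itself gives no rationale) is that a message arriving after queue shutdown is now reported as handled, so the sender drops it instead of treating a dying service as a delivery failure. An illustrative caller's view, with hypothetical types that are not the Pegasus API:

    struct OpLike {};

    struct ServiceLike
    {
        bool incoming_queue_shutdown = false;

        bool enqueue() { return true; }   // stub for the real queueing

        // Mirrors the new behavior: a shut-down queue absorbs the message.
        bool accept_async(OpLike*)
        {
            if (incoming_queue_shutdown)
                return true;              // was 'return false' before v1.88.2.6
            return enqueue();
        }
    };

    // On this reading, a caller no longer needs a failure path for a
    // service that is merely shutting down.
    bool send_to(ServiceLike& svc, OpLike* op)
    {
        return svc.accept_async(op);
    }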


Legend:
-  removed from v.1.88.2.4
+  added in v.1.88.2.6
   (unmarked lines are unchanged context)
