
Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.88.2.7 and 1.102

version 1.88.2.7, 2005/09/02 17:44:00
version 1.102, 2005/04/14 15:37:46
@@ v1.88.2.7 line 30 / v1.102 line 30 @@
 // Author: Mike Day (mdday@us.ibm.com)
 //
 // Modified By:
-//              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
+//              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
+//              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
 //
 //%/////////////////////////////////////////////////////////////////////////////
  
@@ v1.88.2.7 line 46 / v1.102 line 47 @@
 AtomicInt MessageQueueService::_xid(1);
 Mutex MessageQueueService::_meta_dispatcher_mutex;

-static struct timeval create_time = {0, 1};
-static struct timeval destroy_time = {300, 0};
-static struct timeval deadlock_time = {0, 0};
+static struct timeval deallocateWait = {300, 0};

 ThreadPool *MessageQueueService::_thread_pool = 0;

-DQueue<MessageQueueService>* MessageQueueService::_polling_list = 0;
+DQueue<MessageQueueService> MessageQueueService::_polling_list(true);

 Thread* MessageQueueService::_polling_thread = 0;
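Both versions keep these queue-wide objects as file-scope statics, but they manage lifetime differently: v1.102 declares _polling_list (and, further down, _polling_sem, _stop_polling, and _check_idle_flag) as static objects constructed at load time, while the 1.88.2.7 branch holds pointers and creates/deletes the objects with the first and last service instance (see the constructor and destructor hunks below). A minimal sketch of the two styles, with a hypothetical PollingList standing in for DQueue<MessageQueueService>:

    #include <cstdio>

    struct PollingList
    {
        PollingList(bool) { std::puts("list constructed"); }
        ~PollingList() { std::puts("list destroyed"); }
    };

    // v1.102 style: a static object, constructed during static
    // initialization and destroyed automatically at exit.
    static PollingList listObject(true);

    // 1.88.2.7 style: a pointer filled in by the first constructor and
    // deleted by the last destructor, making creation order explicit at
    // the cost of manual new/delete.
    static PollingList* listPointer = 0;

    int main()
    {
        if (listPointer == 0)           // first MessageQueueService ctor
            listPointer = new PollingList(true);
        delete listPointer;             // last ~MessageQueueService
        listPointer = 0;
    }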
  
@@ v1.88.2.7 line 61 / v1.102 line 60 @@
    return _thread_pool;
 }

-//
-// MAX_THREADS_PER_SVC_QUEUE_LIMIT
-//
-// JR Wunderlich Jun 6, 2005
-//
-
-#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
-#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
-
-Uint32 max_threads_per_svc_queue;
-
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::kill_idle_threads(void *parm)
 {

@@ v1.88.2.7 line 84 / v1.102 line 72 @@
       gettimeofday(&last, NULL);
       try
       {
-         dead_threads = MessageQueueService::_thread_pool->kill_dead_threads();
+         dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
       }
       catch(...)
       {

@@ v1.88.2.7 line 101 / v1.102 line 89 @@
 #endif
 }
  
   
-void MessageQueueService::force_shutdown(Boolean destroy_flag)
-{
-   return;
-
-#ifdef MESSAGEQUEUESERVICE_DEBUG
-   //l10n
-   MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
-                            "Forcing shutdown of CIMOM Message Router");
-   PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
-#endif
-
-   MessageQueueService *svc;
-   int counter = 0;
-   _polling_list->lock();
-   svc = _polling_list->next(0);
-
-   while(svc != 0)
-   {
-#ifdef MESSAGEQUEUESERVICE_DEBUG
-      //l10n - reuse the same MessageLoaderParms to avoid multiple creates
-      parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
-      parms.default_msg = "Stopping $0";
-      parms.arg0 = svc->getQueueName();
-      PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
-#endif
-
-      _polling_sem->signal();
-      svc->_shutdown_incoming_queue();
-      counter++;
-      _polling_sem->signal();
-      svc = _polling_list->next(svc);
-   }
-   _polling_list->unlock();
-
-   _polling_sem->signal();
-
-   *MessageQueueService::_stop_polling = 1;
-
-   if(destroy_flag == true)
-   {
-      svc = _polling_list->remove_last();
-      while(svc)
-      {
-         delete svc;
-         svc = _polling_list->remove_last();
-      }
-   }
-}
   
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 {
    Thread *myself = reinterpret_cast<Thread *>(parm);
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
-   while ( _stop_polling->value()  == 0 )
+   while ( _stop_polling.value()  == 0 )
    {
-      _polling_sem->wait();
-      if(_stop_polling->value() != 0 )
+      _polling_sem.wait();
+
+      if(_stop_polling.value() != 0 )
       {
          break;
       }

-      // The polling_routine thread must hold the lock on the
-      // _polling_list while processing incoming messages. The lock
-      // gives this thread ownership of the services on the list.
-
-      // This is necessary to avoid conflicts with other threads
-      // working on the _polling_list
-      // (e.g., MessageQueueService::~MessageQueueService).
-
       list->lock();
       MessageQueueService *service = list->next(0);
-      ThreadStatus rtn = PEGASUS_THREAD_OK;
       while (service != NULL)
       {
-         if ((service->_incoming.count() > 0) &&
-             (service->_die.value() == 0) &&
-             (service->_threads < max_threads_per_svc_queue))
+         if(service->_incoming.count() > 0 )
          {
-            // The _threads count tracks the number of active threads
-            // currently allocated to process messages for this service.
-
-            // The _threads count MUST be incremented while the
-            // polling_routine owns the _polling_list lock and therefore
-            // has ownership of the service object.
-
-            service->_threads++;
-            try
-            {
-               rtn = _thread_pool->allocate_and_awaken(
-                        service, _req_proc, _polling_sem);
-            }
-            catch (...)
-            {
-               service->_threads--;
-
-               // allocate_and_awaken should never generate an exception.
-               PEGASUS_ASSERT(0);
-            }
-            // if no more threads are available, break out of the loop
-            if (rtn != PEGASUS_THREAD_OK )
-            {
-               service->_threads--;
-               Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
-                  "Not enough threads to process this request. Skipping.");
-
-               Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
-                  "Could not allocate thread for %s. "
-                  "Queue has %d messages waiting and %d threads servicing. "
-                  "Skipping the service for right now.",
-                  service->getQueueName(),
-                  service->_incoming.count(),
-                  service->_threads.value());
-
-               pegasus_yield();
-               service = NULL;
-            }
+            _thread_pool->allocate_and_awaken(service, _req_proc);
          }
-         if (service != NULL)
-         {
-            service = list->next(service);
-         }
+         service = list->next(service);
       }
       list->unlock();

-      if(_check_idle_flag->value() != 0 )
+      if(_check_idle_flag.value() != 0 )
       {
-         *_check_idle_flag = 0;
-
-         // Try to do idle-thread cleanup when the system is not busy.
-         // If the system is busy there may not be a thread available to
-         // allocate, so nothing will be done, and that is OK.
-
-         if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads,
-              _polling_sem) != PEGASUS_THREAD_OK)
-         {
-            Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
-               "Not enough threads to kill idle threads. What an irony.");
-
-            Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
-               "Could not allocate thread to kill idle threads. Skipping.");
-         }
+         _check_idle_flag = 0;
+
+         // If there are insufficient resources to run
+         // kill_idle_threads, then just return.
+         _thread_pool->allocate_and_awaken(service, kill_idle_threads);
       }
    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );

@@ v1.88.2.7 line 257 / v1.102 line 127 @@
 }
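The 1.88.2.7 loop above throttles dispatch with the per-service _threads counter: a slot is claimed (service->_threads++) while the poller still holds the list lock, and released again if allocate_and_awaken throws or reports failure. A self-contained sketch of that claim-and-rollback pattern, using standard C++ stand-ins rather than the Pegasus classes:

    #include <atomic>
    #include <cstdio>

    struct Service
    {
        std::atomic<int> threads{0};  // counterpart of service->_threads
        int pending = 2;              // counterpart of _incoming.count()
    };

    // Stand-in for ThreadPool::allocate_and_awaken(); returns false when
    // the pool has no thread to give (a non-PEGASUS_THREAD_OK status).
    static bool allocate_and_awaken(Service&, bool poolExhausted)
    {
        return !poolExhausted;
    }

    static void poll_once(Service& svc, int maxThreadsPerQueue,
                          bool poolExhausted)
    {
        if (svc.pending > 0 && svc.threads.load() < maxThreadsPerQueue)
        {
            ++svc.threads;                 // claim the slot first
            if (!allocate_and_awaken(svc, poolExhausted))
            {
                --svc.threads;             // roll back: nothing was started
                std::puts("no thread available; skipping service for now");
            }
        }
    }

    int main()
    {
        Service svc;
        poll_once(svc, 5, false);  // dispatch succeeds, count becomes 1
        poll_once(svc, 5, true);   // pool exhausted, claim is rolled back
        std::printf("threads servicing queue: %d\n", svc.threads.load());
    }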
  
  
-Semaphore* MessageQueueService::_polling_sem = 0;
-AtomicInt* MessageQueueService::_stop_polling = 0;
-AtomicInt* MessageQueueService::_check_idle_flag = 0;
+Semaphore MessageQueueService::_polling_sem(0);
+AtomicInt MessageQueueService::_stop_polling(0);
+AtomicInt MessageQueueService::_check_idle_flag(0);
  
  
 MessageQueueService::MessageQueueService(const char *name,

@@ v1.88.2.7 line 270 / v1.102 line 140 @@
      _mask(mask),
      _die(0),
-     _threads(0),
      _incoming(true, 0),
      _callback(true),
      _incoming_queue_shutdown(0),
@@ v1.88.2.7 line 285 / v1.102 line 154 @@
    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

-   max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
-
-   // if the requested thread count exceeds
-   // MAX_THREADS_PER_SVC_QUEUE_LIMIT, clamp it to the limit
-   if (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
-   {
-      max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
-   }
-
-   // if the requested thread count is 0 (unlimited),
-   // fall back to MAX_THREADS_PER_SVC_QUEUE_DEFAULT
-   if (max_threads_per_svc_queue == 0)
-   {
-      max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_DEFAULT;
-   }
-
-   // cout << "MAX_THREADS_PER_SVC_QUEUE = " << MAX_THREADS_PER_SVC_QUEUE << endl;
-   // cout << "max_threads_per_svc_queue set to = " << max_threads_per_svc_queue << endl;
-
    AutoMutex autoMut(_meta_dispatcher_mutex);

    if( _meta_dispatcher == 0 )
    {
-      // Instantiate the common objects
-      _polling_list = new DQueue<MessageQueueService>(true);
-      _stop_polling = new AtomicInt(0);
-      _polling_sem = new Semaphore(0);
-      _check_idle_flag = new AtomicInt(0);
-
-      *_stop_polling = 0;
+      _stop_polling = 0;
       PEGASUS_ASSERT( _service_count.value() == 0 );
       _meta_dispatcher = new cimom();
       if (_meta_dispatcher == NULL )
       {
          throw NullPointer();
       }
-      _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
-                                    create_time, destroy_time, deadlock_time);
-
-      _polling_thread = new Thread(polling_routine,
-                                   reinterpret_cast<void *>(_polling_list),
-                                   false);
-      while (!_polling_thread->run())
-      {
-         pegasus_yield();
-      }
+      _thread_pool =
+          new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }
    _service_count++;

@@ v1.88.2.7 line 346 / v1.102 line 180 @@
       throw BindFailedException(parms);
    }

-   _polling_list->insert_last(this);
+   _polling_list.insert_last(this);

 //   _meta_dispatcher_mutex.unlock();  //Bug#1090
 //   _callback_thread.run();
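The clamping that 1.88.2.7 applies to max_threads_per_svc_queue in the constructor above is easy to restate in isolation: a configured value above the hard limit is clamped down, and 0 (meaning unlimited) falls back to a small default. A standalone sketch using the constants from this diff (MAX_THREADS_PER_SVC_QUEUE itself comes from Pegasus configuration elsewhere, so a plain parameter stands in for it here):

    #include <cstdio>

    const unsigned MAX_THREADS_PER_SVC_QUEUE_LIMIT   = 5000;
    const unsigned MAX_THREADS_PER_SVC_QUEUE_DEFAULT = 5;

    static unsigned clamp_threads_per_queue(unsigned requested)
    {
        if (requested > MAX_THREADS_PER_SVC_QUEUE_LIMIT)
            return MAX_THREADS_PER_SVC_QUEUE_LIMIT;    // enforce hard cap
        if (requested == 0)
            return MAX_THREADS_PER_SVC_QUEUE_DEFAULT;  // "unlimited" -> default
        return requested;
    }

    int main()
    {
        std::printf("%u %u %u\n",
                    clamp_threads_per_queue(0),      // -> 5
                    clamp_threads_per_queue(9000),   // -> 5000
                    clamp_threads_per_queue(42));    // -> 42
    }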
@@ v1.88.2.7 line 359 / v1.102 line 193 @@
 {
    _die = 1;

-   // The polling_routine locks the _polling_list while processing
-   // incoming messages for the services on the list. Removing the
-   // service from the _polling_list before that processing avoids
-   // synchronization problems with the polling_routine.
-
-   _polling_list->remove(this);
-
-   _callback_ready.signal();
-
-   // ATTN: The code for closing the _incoming queue is not working
-   // correctly. In OpenPegasus 2.4, execution of the following code
-   // is very timing dependent. This needs to be fixed.
-   // See Bug 4079 for details.
    if (_incoming_queue_shutdown.value() == 0)
    {
        _shutdown_incoming_queue();
    }
-
-   // Wait until all threads processing the messages
-   // for this service have completed.
-
-   while (_threads.value() > 0)
-   {
-      pegasus_yield();
-   }
+   _callback_ready.signal();

    {
      AutoMutex autoMut(_meta_dispatcher_mutex);

@@ v1.88.2.7 line 393 / v1.102 line 205 @@
      if (_service_count.value() == 0 )
      {
-       (*_stop_polling)++;
-       _polling_sem->signal();
-       _polling_thread->join();
-       delete _polling_thread;
-       _polling_thread = 0;
+       _stop_polling++;
+       _polling_sem.signal();
+       if (_polling_thread) {
+          _polling_thread->join();
+          delete _polling_thread;
+          _polling_thread = 0;
+       }
        _meta_dispatcher->_shutdown_routed_queue();
        delete _meta_dispatcher;
        _meta_dispatcher = 0;

        delete _thread_pool;
        _thread_pool = 0;
-
-       // Clean up the common objects
-       delete _check_idle_flag;
-       delete _polling_sem;
-       delete _stop_polling;
-       delete _polling_list;
      }
    } // mutex unlocks here
+   _polling_list.remove(this);

    // Clean up in case there is extra stuff on the queue.
    while (_incoming.count())
    {
-      try
-      {
          delete _incoming.remove_first();
-      }
-      catch (const ListClosed &e)
-      {
-         // If the list is closed, there is nothing we can do.
-         break;
-      }
    }
 }
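The 1.88.2.7 destructor sequences teardown deliberately: remove the service from the polling list (so polling_routine can no longer dispatch to it), shut down the incoming queue, then spin until every worker still servicing the queue has decremented _threads. A simplified, self-contained illustration of that drain-before-delete step, with std::atomic and std::thread standing in for the Pegasus primitives:

    #include <atomic>
    #include <chrono>
    #include <cstdio>
    #include <functional>
    #include <thread>

    struct Service
    {
        std::atomic<int> threads{0};   // counterpart of _threads
    };

    static void worker(Service& svc)
    {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        --svc.threads;                 // mirrors service->_threads--
    }

    int main()
    {
        Service svc;
        ++svc.threads;
        std::thread t(worker, std::ref(svc));

        // destructor-style wait: yield until in-flight workers drain
        while (svc.threads.load() > 0)
            std::this_thread::yield(); // mirrors pegasus_yield()

        t.join();
        std::puts("safe to tear down the service");
    }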
  
 void MessageQueueService::_shutdown_incoming_queue(void)

@@ v1.88.2.7 line 451 / v1.102 line 252 @@
    msg->op->_op_dest = this;
    msg->op->_request.insert_first(msg);

-   try
-   {
       _incoming.insert_last_wait(msg->op);
-      _polling_sem->signal();
-   }
-   catch (const ListClosed &)
-   {
-      // This means the queue has already been shut down (this happens
-      // when two AsyncIoctrl::IO_CLOSE messages are generated and one
-      // gets processed first).
-      delete msg;
-   }
-   catch (const Permission &)
-   {
-      delete msg;
-   }
 }
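The try/catch dropped on the right guards a producer racing against queue shutdown: insert_last_wait throws ListClosed once the queue is closed (for instance when two AsyncIoctrl::IO_CLOSE messages race and one is processed first), and the producer must then delete the message itself or it leaks. A small sketch of that guard with illustrative types, not the Pegasus DQueue:

    #include <cstdio>
    #include <exception>

    struct ListClosed : std::exception {};

    struct IncomingQueue
    {
        bool closed = true;            // the queue has already shut down
        void insert_last_wait(int*)
        {
            if (closed) throw ListClosed();
        }
    };

    int main()
    {
        IncomingQueue incoming;
        int* msg = new int(42);
        try
        {
            incoming.insert_last_wait(msg);
        }
        catch (const ListClosed&)
        {
            delete msg;                // queue is gone: reclaim the message
            msg = 0;
            std::puts("queue already shut down; message freed");
        }
    }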
  
  
@@ v1.88.2.7 line 516 / v1.102 line 303 @@
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 {
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
+
+   if ( service->_die.value() != 0)
+      return (0);
+
    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;

-   if ( service->_die.value() == 0 )
-   {
+   // many operations may have been queued.
+   do
+   {
       try
       {

@@ v1.88.2.7 line 528 / v1.102 line 320 @@
       }
       catch(ListClosed & )
       {
-         operation = 0;
-         service->_threads--;
-         return(0);
+         break;
       }

       if( operation )
       {
          operation->_service_ptr = service;
          service->_handle_incoming_operation(operation);
       }
-   }
-   service->_threads--;
+   } while (operation);
+
    return(0);
 }
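v1.102 restructures _req_proc from handling a single message per wakeup into a drain loop: because many operations may have been queued, the worker keeps removing and dispatching until the queue comes back empty. A compact sketch of the drain pattern, with a std::queue standing in for _incoming:

    #include <cstdio>
    #include <queue>

    // Stand-in for _incoming.remove_first(); returns 0 when nothing is
    // queued, the way the loop's terminating condition expects.
    static int try_remove_first(std::queue<int>& q)
    {
        if (q.empty()) return 0;
        int op = q.front();
        q.pop();
        return op;
    }

    int main()
    {
        std::queue<int> incoming;
        for (int i = 1; i <= 3; ++i) incoming.push(i);

        // many operations may have been queued: drain until empty
        int operation;
        do
        {
            operation = try_remove_first(incoming);
            if (operation)                      // _handle_incoming_operation
                std::printf("handled op %d\n", operation);
        } while (operation);
    }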
  
@@ v1.88.2.7 line 709 / v1.102 line 500 @@
    Message* response)

 {
+
+    STAT_COPYDISPATCHER
+
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                     "MessageQueueService::_enqueueResponse");
  
@@ v1.88.2.7 line 790 / v1.102 line 584 @@
 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 {
    if (_incoming_queue_shutdown.value() > 0 )
-      return true;
+      return false;
+
+   if (_polling_thread == NULL)  {
+      _polling_thread = new Thread(polling_routine,
+                                   reinterpret_cast<void *>(&_polling_list),
+                                   false);
+      while (!_polling_thread->run())
+      {
+         pegasus_yield();
+      }
+   }
+
 // ATTN optimization remove the message checking altogether in the base
 // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();

@@ v1.88.2.7 line 803 / v1.102 line 605 @@
         _die.value() == 0  )
    {
       _incoming.insert_last_wait(op);
-      _polling_sem->signal();
+      _polling_sem.signal();
       return true;
    }
    return false;
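v1.102 also stops starting the polling thread in the constructor and instead creates it lazily, on the first accept_async call. A simplified sketch of that create-on-first-use step (std::thread in place of the Pegasus Thread; note that a bare null check like this is only safe under whatever locking the caller already holds):

    #include <cstdio>
    #include <memory>
    #include <thread>

    static std::unique_ptr<std::thread> pollingThread;

    static void polling_routine() { std::puts("polling..."); }

    static void accept_async()
    {
        if (!pollingThread)   // first caller starts the poller
            pollingThread.reset(new std::thread(polling_routine));
    }

    int main()
    {
        accept_async();
        accept_async();       // no second thread is created
        pollingThread->join();
    }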
@@ v1.88.2.7 line 1030 / v1.102 line 832 @@
    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
-   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    // initialize the callback data
    op->_async_callback = callback;   // callback function to be executed by recipient of response
    op->_callback_node = op;          // the op node

@@ v1.88.2.7 line 1155 / v1.102 line 956 @@
              (void *)0);

    request->op->_client_sem.wait();
+
    request->op->lock();
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;


Legend:
-  removed from v.1.88.2.7
   unchanged (context) lines
+  added in v.1.102
