(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.62 and 1.64.2.1

version 1.62, 2002/07/12 23:10:04 version 1.64.2.1, 2002/10/25 20:49:43
Line 39 
Line 39 
 Mutex MessageQueueService::_meta_dispatcher_mutex; Mutex MessageQueueService::_meta_dispatcher_mutex;
  
 static struct timeval create_time = {0, 1}; static struct timeval create_time = {0, 1};
 static struct timeval destroy_time = {15, 0};  static struct timeval destroy_time = {10, 0};
 static struct timeval deadlock_time = {0, 0}; static struct timeval deadlock_time = {0, 0};
  
 ThreadPool *MessageQueueService::_thread_pool = 0; ThreadPool *MessageQueueService::_thread_pool = 0;
Line 48 
Line 48 
  
 Thread* MessageQueueService::_polling_thread = 0; Thread* MessageQueueService::_polling_thread = 0;
  
   ThreadPool *MessageQueueService::get_thread_pool(void)
   {
      return _thread_pool;
   }
  
 int MessageQueueService::kill_idle_threads(void)  void unload_idle_providers(void);
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
 { {
    static struct timeval now, last;     Thread *myself = reinterpret_cast<Thread *>(parm);
   
      static struct timeval now, last = {0,0};
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
    int dead_threads = 0;    int dead_threads = 0;
  
    if( now.tv_sec - last.tv_sec > 0 )     if( now.tv_sec - last.tv_sec > 300 )
    {    {
   
         PEGASUS_STD(cout) << "Work Thread Pool currently has " <<
            MessageQueueService::_thread_pool->running_count() +
            MessageQueueService::_thread_pool->pool_count() << " Threads." << PEGASUS_STD(endl);
       gettimeofday(&last, NULL);       gettimeofday(&last, NULL);
       try       try
       {       {
          dead_threads =  _thread_pool->kill_dead_threads();           dead_threads =  MessageQueueService::_thread_pool->kill_dead_threads();
       }       }
       catch(IPCException& )        catch(...)
       {       {
  
       }       }
   
      }
      exit_thread((PEGASUS_THREAD_RETURN)dead_threads);
      return (PEGASUS_THREAD_RETURN)dead_threads;
   }
   
   void MessageQueueService::force_shutdown(void)
   {
      PEGASUS_STD(cout) << "Forcing shutdown of CIMOM Message Router" << PEGASUS_STD(endl);
      MessageQueueService::_stop_polling = 1;
      MessageQueueService *svc;
   
      _polling_list.lock();
      svc = _polling_list.next(0);
      while(svc != 0)
      {
         PEGASUS_STD(cout) << "Stopping " << svc->getQueueName() << PEGASUS_STD(endl);
         _polling_sem.signal();
         svc->_shutdown_incoming_queue();
         _polling_sem.signal();
         svc = _polling_list.next(svc);
    }    }
    return dead_threads;     _polling_list.unlock();
 } }
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
Line 89 
Line 122 
          if(service->_incoming.count() > 0 )          if(service->_incoming.count() > 0 )
          {          {
             _thread_pool->allocate_and_awaken(service, _req_proc);             _thread_pool->allocate_and_awaken(service, _req_proc);
 //          service->_req_proc(service);  
          }          }
          service = list->next(service);          service = list->next(service);
       }       }
       list->unlock();       list->unlock();
         if(_check_idle_flag.value() != 0 )
         {
            _check_idle_flag = 0;
            Thread th(kill_idle_threads, 0, true);
            th.run();
         }
    }    }
   
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);    return(0);
 } }
Line 102 
Line 141 
  
 Semaphore MessageQueueService::_polling_sem(0); Semaphore MessageQueueService::_polling_sem(0);
 AtomicInt MessageQueueService::_stop_polling(0); AtomicInt MessageQueueService::_stop_polling(0);
   AtomicInt MessageQueueService::_check_idle_flag(0);
  
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
Line 118 
Line 158 
      _callback_ready(0),      _callback_ready(0),
      _req_thread(_req_proc, this, false),      _req_thread(_req_proc, this, false),
      _callback_thread(_callback_proc, this, false)      _callback_thread(_callback_proc, this, false)
   
 { {
  
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
Line 150 
Line 189 
    if( false == register_service(name, _capabilities, _mask) )    if( false == register_service(name, _capabilities, _mask) )
    {    {
       _meta_dispatcher_mutex.unlock();       _meta_dispatcher_mutex.unlock();
       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");        throw UninitializedObjectException();
    }    }
  
    _polling_list.insert_last(this);    _polling_list.insert_last(this);
Line 183 
Line 222 
       _meta_dispatcher->_shutdown_routed_queue();       _meta_dispatcher->_shutdown_routed_queue();
       delete _meta_dispatcher;       delete _meta_dispatcher;
       _meta_dispatcher = 0;       _meta_dispatcher = 0;
   
    }    }
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
    _polling_list.remove(this);    _polling_list.remove(this);
 } }
  
   
   
   
   
 void MessageQueueService::_shutdown_incoming_queue(void) void MessageQueueService::_shutdown_incoming_queue(void)
 { {
  
Line 487 
Line 529 
       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
       AsyncOpNode *op = async->op;       AsyncOpNode *op = async->op;
       request->_async = 0;       request->_async = 0;
       // this request is probably going to be deleted !!        // the legacy request is going to be deleted by its handler
       // remove it from the op node       // remove it from the op node
       op->_request.remove(request);  
         static_cast<AsyncLegacyOperationStart *>(async)->get_action();
  
  
       AsyncLegacyOperationResult *async_result =       AsyncLegacyOperationResult *async_result =
Line 548 
Line 591 
  
 // ATTN optimization remove the message checking altogether in the base // ATTN optimization remove the message checking altogether in the base
 // << Mon Feb 18 14:02:20 2002 mdd >> // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();  //   op->lock();
    Message *rq = op->_request.next(0);    Message *rq = op->_request.next(0);
    Message *rp = op->_response.next(0);    Message *rp = op->_response.next(0);
    op->unlock();  //   op->unlock();
  
    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&    if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
         _die.value() == 0  )         _die.value() == 0  )


Legend:
Removed from v.1.62  
changed lines
  Added in v.1.64.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2