(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.53 and 1.70.2.1

version 1.53, 2002/06/07 00:03:32 version 1.70.2.1, 2003/08/08 17:39:38
Line 1 
Line 1 
 //%////-*-c++-*-//////////////////////////////////////////////////////////////// //%////-*-c++-*-////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   // The Open Group, Tivoli Systems
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 37 
Line 38 
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
 Mutex MessageQueueService::_meta_dispatcher_mutex; Mutex MessageQueueService::_meta_dispatcher_mutex;
  
 static struct timeval create_time = {0, 100};  static struct timeval create_time = {0, 1};
 static struct timeval destroy_time = {1, 0};  static struct timeval destroy_time = {300, 0};
 static struct timeval deadlock_time = {10, 0};  static struct timeval deadlock_time = {0, 0};
  
 ThreadPool MessageQueueService::_thread_pool(2, "MessageQueueService", 2, 20,  ThreadPool *MessageQueueService::_thread_pool = 0;
                                              create_time, destroy_time, deadlock_time);  
  
 DQueue<MessageQueueService> MessageQueueService::_polling_list(true); DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  
 int MessageQueueService::kill_idle_threads(void)  Thread* MessageQueueService::_polling_thread = 0;
   
   ThreadPool *MessageQueueService::get_thread_pool(void)
   {
      return _thread_pool;
   }
   
   void unload_idle_providers(void);
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  MessageQueueService::kill_idle_threads(void *parm)
 { {
    static struct timeval now, last;  
      static struct timeval now, last = {0,0};
    gettimeofday(&now, NULL);    gettimeofday(&now, NULL);
      int dead_threads = 0;
  
    if( now.tv_sec - last.tv_sec > 0 )     if( now.tv_sec - last.tv_sec > 120 )
    {    {
       gettimeofday(&last, NULL);       gettimeofday(&last, NULL);
       return _thread_pool.kill_dead_threads();        try
         {
            dead_threads =  MessageQueueService::_thread_pool->kill_dead_threads();
    }    }
    return 0;        catch(...)
         {
   
         }
      }
      exit_thread((PEGASUS_THREAD_RETURN)dead_threads);
      return (PEGASUS_THREAD_RETURN)dead_threads;
   }
   
   
   void MessageQueueService::force_shutdown(void)
   {
      PEGASUS_STD(cout) << "Forcing shutdown of CIMOM Message Router" << PEGASUS_STD(endl);
      MessageQueueService::_stop_polling = 1;
      MessageQueueService *svc;
   
      _polling_list.lock();
      svc = _polling_list.next(0);
      while(svc != 0)
      {
         PEGASUS_STD(cout) << "Stopping " << svc->getQueueName() << PEGASUS_STD(endl);
         _polling_sem.signal();
         svc->_shutdown_incoming_queue();
         _polling_sem.signal();
         svc = _polling_list.next(svc);
 } }
      _polling_list.unlock();
   }
   
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
   
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
   
    while ( _stop_polling.value()  == 0 )    while ( _stop_polling.value()  == 0 )
    {    {
       _polling_sem.wait();       _polling_sem.wait();
         if(_stop_polling.value() != 0 )
         {
            break;
         }
   
       list->lock();       list->lock();
       MessageQueueService *service = list->next(0);       MessageQueueService *service = list->next(0);
       while(service != NULL)       while(service != NULL)
       {       {
          if(service->_incoming.count() > 0 )          if(service->_incoming.count() > 0 )
          {          {
             _thread_pool.allocate_and_awaken(service, _req_proc);              _thread_pool->allocate_and_awaken(service, _req_proc);
   
 //          service->_req_proc(service);  
          }          }
   
          service = list->next(service);          service = list->next(service);
       }       }
       list->unlock();       list->unlock();
         if(_check_idle_flag.value() != 0 )
         {
            _check_idle_flag = 0;
            Thread th(kill_idle_threads, 0, true);
            th.run();
         }
    }    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);    return(0);
 } }
  
 Thread MessageQueueService::_polling_thread(polling_routine,  
                                             reinterpret_cast<void *>(&_polling_list),  
                                             false);  
   
  
 Semaphore MessageQueueService::_polling_sem(0); Semaphore MessageQueueService::_polling_sem(0);
 AtomicInt MessageQueueService::_stop_polling(0); AtomicInt MessageQueueService::_stop_polling(0);
   AtomicInt MessageQueueService::_check_idle_flag(0);
  
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
Line 112 
Line 155 
      _callback_thread(_callback_proc, this, false)      _callback_thread(_callback_proc, this, false)
  
 { {
   
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
  
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
Line 128 
Line 172 
          _meta_dispatcher_mutex.unlock();          _meta_dispatcher_mutex.unlock();
          throw NullPointer();          throw NullPointer();
       }       }
       _polling_thread.run();        _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
                                       create_time, destroy_time, deadlock_time);
   
         _polling_thread = new Thread(polling_routine,
                                      reinterpret_cast<void *>(&_polling_list),
                                      false);
         _polling_thread->run();
    }    }
    _service_count++;    _service_count++;
  
    if( false == register_service(name, _capabilities, _mask) )    if( false == register_service(name, _capabilities, _mask) )
    {    {
       _meta_dispatcher_mutex.unlock();       _meta_dispatcher_mutex.unlock();
       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");        throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
    }    }
  
    _polling_list.insert_last(this);    _polling_list.insert_last(this);
Line 150 
Line 200 
 MessageQueueService::~MessageQueueService(void) MessageQueueService::~MessageQueueService(void)
 { {
    _die = 1;    _die = 1;
    if (_incoming_queue_shutdown.value() == 0 )     // IBM-KR: This causes a new message (IO_CLOSE) to be spawned, which
    {     // doesn't get picked up anyone. The idea was that the message would be
       _shutdown_incoming_queue();     // picked up handle_AsyncIoctl which closes the queue and does cleaning.
    }     // That described behavior has never surfaced itself. If it does appear,
      // uncomment the if ( ..) { } block below.
   
      // Note: The handle_AsyncIcotl does get called when force_shutdown(void) gets
      // called during Pegasus shutdown procedure (in case you ever wondered).
   
      //if (_incoming_queue_shutdown.value() == 0 )
      //{
      //   _shutdown_incoming_queue();
      //}
    _callback_ready.signal();    _callback_ready.signal();
 //   _callback_thread.join(); //   _callback_thread.join();
  
Line 163 
Line 222 
    {    {
       _stop_polling++;       _stop_polling++;
       _polling_sem.signal();       _polling_sem.signal();
       _polling_thread.join();        _polling_thread->join();
         delete _polling_thread;
         _polling_thread = 0;
       _meta_dispatcher->_shutdown_routed_queue();       _meta_dispatcher->_shutdown_routed_queue();
       delete _meta_dispatcher;       delete _meta_dispatcher;
       _meta_dispatcher = 0;       _meta_dispatcher = 0;
         delete _thread_pool;
         _thread_pool = 0;
    }    }
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
    _polling_list.remove(this);    _polling_list.remove(this);
      // Clean up in case there are extra stuff on the queue.
     while (_incoming.count())
     {
           delete _incoming.remove_first();
     }
 } }
  
 void MessageQueueService::_shutdown_incoming_queue(void) void MessageQueueService::_shutdown_incoming_queue(void)
Line 248 
Line 315 
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
 //   Thread *myself = reinterpret_cast<Thread *>(parm);  
 //   MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());  
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;    AsyncOpNode *operation = 0;
  
 //    while ( service->_die.value() == 0 )     if ( service->_die.value() == 0 )
 //    {      {
          try          try
          {          {
             operation = service->_incoming.remove_first();             operation = service->_incoming.remove_first();
          }          }
          catch(ListClosed & )          catch(ListClosed & )
          {          {
 //          break;              operation = 0;
   
               return(0);
          }          }
          if( operation )          if( operation )
          {          {
 //          operation->_thread_ptr = pegasus_thread_self();  
             operation->_service_ptr = service;             operation->_service_ptr = service;
             service->_handle_incoming_operation(operation);             service->_handle_incoming_operation(operation);
          }          }
 //    }      }
  
 //    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  
    return(0);    return(0);
 } }
  
Line 473 
Line 538 
       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
       AsyncOpNode *op = async->op;       AsyncOpNode *op = async->op;
       request->_async = 0;       request->_async = 0;
       // this request is probably going to be deleted !!        // the legacy request is going to be deleted by its handler
       // remove it from the op node       // remove it from the op node
       op->_request.remove(request);  
         static_cast<AsyncLegacyOperationStart *>(async)->get_action();
   
  
       AsyncLegacyOperationResult *async_result =       AsyncLegacyOperationResult *async_result =
          new AsyncLegacyOperationResult(          new AsyncLegacyOperationResult(
Line 650 
Line 717 
  
          // shutdown the AsyncDQueue          // shutdown the AsyncDQueue
          service->_incoming.shutdown_queue();          service->_incoming.shutdown_queue();
            AsyncOpNode *op = req->op;
            op->_request.remove_first();
            op->release();
            return_op(op);
            delete req;
          // exit the thread !          // exit the thread !
 //       myself->exit_self( (PEGASUS_THREAD_RETURN) 1 ); //       myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
          return;          return;


Legend:
Removed from v.1.53  
changed lines
  Added in v.1.70.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2