(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.129 and 1.130

version 1.129, 2006/09/13 21:05:37 version 1.130, 2006/10/03 18:16:03
Line 31 
Line 31 
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 // #include <iostream.h>  
 #include "MessageQueueService.h" #include "MessageQueueService.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/MessageLoader.h> #include <Pegasus/Common/MessageLoader.h>
 #include <Pegasus/Common/StatisticalData.h>  
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
Line 57 
Line 55 
    return _thread_pool;    return _thread_pool;
 } }
  
 void MessageQueueService::cleanupThreadPool()  
 {  
     _check_idle_flag = 1;  
     _polling_sem.signal();  
 }  
   
 // //
 // MAX_THREADS_PER_SVC_QUEUE // MAX_THREADS_PER_SVC_QUEUE
 // //
Line 78 
Line 70 
  
 Uint32 max_threads_per_svc_queue; Uint32 max_threads_per_svc_queue;
  
 ThreadReturnType PEGASUS_THREAD_CDECL  
 MessageQueueService::kill_idle_threads(void *parm)  
 {  
   
    static struct timeval now, last = {0,0};  
    Time::gettimeofday(&now);  
    int dead_threads = 0;  
   
    if (now.tv_sec - last.tv_sec > 120)  
    {  
       Time::gettimeofday(&last);  
       try  
       {  
          dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();  
       }  
       catch(...)  
       {  
   
       }  
    }  
   
 #ifdef PEGASUS_POINTER_64BIT  
    return (ThreadReturnType)(Uint64)dead_threads;  
 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX  
    return (ThreadReturnType)(unsigned long)dead_threads;  
 #else  
    return (ThreadReturnType)(Uint32)dead_threads;  
 #endif  
 }  
   
 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm) ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
Line 124 
Line 86 
       }       }
  
       // The polling_routine thread must hold the lock on the       // The polling_routine thread must hold the lock on the
       // _polling_thread list while processing incoming messages.        // _polling_list while processing incoming messages.
       // This lock is used to give this thread ownership of       // This lock is used to give this thread ownership of
       // services on the _polling_routine list.       // services on the _polling_routine list.
  
Line 187 
Line 149 
           }           }
       }       }
       list->unlock();       list->unlock();
   
       if (_check_idle_flag.get() != 0)  
       {  
          _check_idle_flag = 0;  
          // try to do idle thread clean up processing when system is not busy  
          // if system is busy there may not be a thread available to allocate  
          // so nothing will be done and that is OK.  
   
          if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)  
          {  
                 Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,  
                         "Not enough threads to kill idle threads. What an irony.");  
   
                 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,  
                         "Could not allocate thread to kill idle threads." \  
                         "Skipping. ");  
          }  
   
   
       }  
    }    }
    myself->exit_self( (ThreadReturnType) 1 );    myself->exit_self( (ThreadReturnType) 1 );
    return(0);    return(0);
Line 215 
Line 157 
  
 Semaphore MessageQueueService::_polling_sem(0); Semaphore MessageQueueService::_polling_sem(0);
 AtomicInt MessageQueueService::_stop_polling(0); AtomicInt MessageQueueService::_stop_polling(0);
 AtomicInt MessageQueueService::_check_idle_flag(0);  
  
  
 MessageQueueService::MessageQueueService( MessageQueueService::MessageQueueService(
Line 257 
Line 198 
       _stop_polling = 0;       _stop_polling = 0;
       PEGASUS_ASSERT(_service_count.get() == 0);       PEGASUS_ASSERT(_service_count.get() == 0);
       _meta_dispatcher = new cimom();       _meta_dispatcher = new cimom();
       if (_meta_dispatcher == NULL)  
       {  
          throw NullPointer();  
       }  
       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
       //   minimum_cnt, maximum_cnt, deallocateWait);       //   minimum_cnt, maximum_cnt, deallocateWait);
       //       //
Line 271 
Line 209 
  
    if (false == register_service(name, _capabilities, _mask))    if (false == register_service(name, _capabilities, _mask))
    {    {
       //l10n  
       //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");  
       MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",       MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
          "MessageQueueService Base Unable to register with  Meta Dispatcher");           "CIM base message queue service is unable to register with the CIMOM "
                "dispatcher.");
       throw BindFailedException(parms);       throw BindFailedException(parms);
    }    }
  
    _get_polling_list()->insert_back(this);    _get_polling_list()->insert_back(this);
   
 } }
  
  
Line 915 
Line 850 
 } }
  
  
 Boolean MessageQueueService::ForwardOp(  
     AsyncOpNode *op,  
     Uint32 destination)  
 {  
    PEGASUS_ASSERT(op != 0);  
    op->lock();  
    op->_op_dest = MessageQueue::lookup(destination);  
    op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);  
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);  
    op->unlock();  
    if (op->_op_dest == 0)  
       return false;  
   
    return  _meta_dispatcher->route_async(op);  
 }  
   
   
 Boolean MessageQueueService::SendAsync( Boolean MessageQueueService::SendAsync(
     AsyncOpNode *op,     AsyncOpNode *op,
     Uint32 destination,     Uint32 destination,


Legend:
Removed from v.1.129  
changed lines
  Added in v.1.130

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2