(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.153 and 1.159

Comparing version 1.153 (committed 2008/12/01 17:49:52) with version 1.159 (committed 2008/12/16 18:56:00)
Line 33 
Line 33 
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include <Pegasus/Common/MessageLoader.h> #include <Pegasus/Common/MessageLoader.h>
  
   PEGASUS_USING_STD;
   
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
Line 72 
Line 74 
     void* parm)     void* parm)
 { {
     Thread *myself = reinterpret_cast<Thread *>(parm);     Thread *myself = reinterpret_cast<Thread *>(parm);
     List<MessageQueueService, Mutex> *list =      MessageQueueService::PollingList *list =
         reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());          reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
  
       try
       {
     while (_stop_polling.get()  == 0)     while (_stop_polling.get()  == 0)
     {     {
         _polling_sem.wait();         _polling_sem.wait();
Line 93 
Line 97 
         // processing the _polling_list         // processing the _polling_list
         // (e.g., MessageQueueServer::~MessageQueueService).         // (e.g., MessageQueueServer::~MessageQueueService).
  
         list->lock();              _polling_list_mutex.lock();
         MessageQueueService *service = list->front();         MessageQueueService *service = list->front();
         ThreadStatus rtn = PEGASUS_THREAD_OK;         ThreadStatus rtn = PEGASUS_THREAD_OK;
         while (service != NULL)         while (service != NULL)
Line 111 
Line 115 
                 // lock and has ownership of the service object.                 // lock and has ownership of the service object.
  
                 service->_threads++;                 service->_threads++;
                 try  
                 {  
                     rtn = _thread_pool->allocate_and_awaken(                     rtn = _thread_pool->allocate_and_awaken(
                         service, _req_proc, &_polling_sem);                         service, _req_proc, &_polling_sem);
                 }  
                 catch (...)  
                 {  
                     service->_threads--;  
   
                     // allocate_and_awaken should never generate an exception.  
                     PEGASUS_ASSERT(0);  
                 }  
                 // if no more threads available, break from processing loop                 // if no more threads available, break from processing loop
                 if (rtn != PEGASUS_THREAD_OK )                 if (rtn != PEGASUS_THREAD_OK )
                 {                 {
Line 136 
Line 130 
                         service->_threads.get()));                         service->_threads.get()));
  
                     Threads::yield();                     Threads::yield();
                     service = NULL;                          break;
                 }                 }
             }             }
             if (service != NULL)  
             {  
                 service = list->next_of(service);                 service = list->next_of(service);
             }             }
               _polling_list_mutex.unlock();
         }         }
         list->unlock();  
     }     }
       catch(const Exception &e)
       {
           PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Exception caught in MessageQueueService::polling_routine : %s",
                   (const char*)e.getMessage().getCString()));
       }
       catch(const exception &e)
       {
           PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Exception caught in MessageQueueService::polling_routine : %s",
                   e.what()));
       }
       catch(...)
       {
           PEG_TRACE_CSTRING(TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
               "Unknown Exception caught in MessageQueueService::polling_routine");
       }
   
       PEGASUS_ASSERT(_stop_polling.get());
   
     return ThreadReturnType(0);     return ThreadReturnType(0);
 } }
  
Line 195 
Line 207 
     }     }
     _service_count++;     _service_count++;
  
     _get_polling_list()->insert_back(this);      // Add to the polling list
       if (!_polling_list)
       {
           _polling_list = new PollingList;
       }
       _polling_list->insert_back(this);
      _meta_dispatcher->registerCIMService(this);
 } }
  
  
 MessageQueueService::~MessageQueueService() MessageQueueService::~MessageQueueService()
 { {
   
     // Close incoming queue.     // Close incoming queue.
     if (_incoming_queue_shutdown.get() == 0)     if (_incoming_queue_shutdown.get() == 0)
     {     {
Line 220 
Line 239 
     // die now.     // die now.
     _die = 1;     _die = 1;
  
       _meta_dispatcher->deregisterCIMService(this);
   
     // Wait until all threads processing the messages     // Wait until all threads processing the messages
     // for this service have completed.     // for this service have completed.
     while (_threads.get() > 0)     while (_threads.get() > 0)
Line 227 
Line 248 
         Threads::yield();         Threads::yield();
     }     }
  
   
     // The polling_routine locks the _polling_list while     // The polling_routine locks the _polling_list while
     // processing the incoming messages for services on the     // processing the incoming messages for services on the
     // list.  Deleting the service from the _polling_list     // list.  Deleting the service from the _polling_list
Line 236 
Line 258 
  
     {     {
         AutoMutex autoMut(_meta_dispatcher_mutex);         AutoMutex autoMut(_meta_dispatcher_mutex);
   
         _service_count--;         _service_count--;
         // If we are last service to die, delete metadispatcher.         // If we are last service to die, delete metadispatcher.
         if (_service_count.get() == 0)         if (_service_count.get() == 0)
Line 254 
Line 277 
             delete _thread_pool;             delete _thread_pool;
             _thread_pool = 0;             _thread_pool = 0;
         }         }
     } // mutex unlocks here      }
  
     // Clean up any extra stuff on the queue.     // Clean up any extra stuff on the queue.
     AsyncOpNode* op = 0;     AsyncOpNode* op = 0;
Line 476 
Line 499 
  
 Boolean MessageQueueService::accept_async(AsyncOpNode* op) Boolean MessageQueueService::accept_async(AsyncOpNode* op)
 { {
       if (!_isRunning)
       {
           // Don't accept any messages other than start.
           if (op->_request.get()->getType() != ASYNC_CIMSERVICE_START)
           {
               return false;
           }
       }
   
     if (_incoming_queue_shutdown.get() > 0)     if (_incoming_queue_shutdown.get() > 0)
         return false;         return false;
   
     if (_polling_thread == NULL)     if (_polling_thread == NULL)
     {     {
           PEGASUS_ASSERT(_polling_list);
         _polling_thread = new Thread(         _polling_thread = new Thread(
             polling_routine,             polling_routine,
             reinterpret_cast<void *>(_get_polling_list()),              reinterpret_cast<void *>(_polling_list),
             false);             false);
         ThreadStatus tr = PEGASUS_THREAD_OK;         ThreadStatus tr = PEGASUS_THREAD_OK;
         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)         while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
Line 694 
Line 728 
     return queue->getQueueId();     return queue->getQueueId();
 } }
  
 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()  
 {  
     _polling_list_mutex.lock();  
   
     if (!_polling_list)  
         _polling_list = new PollingList;  
   
     _polling_list_mutex.unlock();  
   
     return _polling_list;  
 }  
   
 void MessageQueueService::_removeFromPollingList(MessageQueueService *service) void MessageQueueService::_removeFromPollingList(MessageQueueService *service)
 { {
     _polling_list_mutex.lock();     _polling_list_mutex.lock();


Legend:
  Left column only  — lines removed from v1.153
  Both columns      — changed lines
  Right column only — lines added in v1.159

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2