(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.88.2.2 and 1.97

version 1.88.2.2, 2005/04/14 21:12:01 version 1.97, 2005/03/20 18:17:49
Line 30 
Line 30 
 // Author: Mike Day (mdday@us.ibm.com) // Author: Mike Day (mdday@us.ibm.com)
 // //
 // Modified By: // Modified By:
 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090  //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
Line 48 
Line 48 
  
 static struct timeval create_time = {0, 1}; static struct timeval create_time = {0, 1};
 static struct timeval destroy_time = {300, 0}; static struct timeval destroy_time = {300, 0};
 static struct timeval deadlock_time = {0, 0};  
  
 ThreadPool *MessageQueueService::_thread_pool = 0; ThreadPool *MessageQueueService::_thread_pool = 0;
  
Line 95 
Line 94 
 { {
    return;    return;
  
   #if !defined(PEGASUS_OS_VMS) // Bugzilla 3090
   
 #ifdef MESSAGEQUEUESERVICE_DEBUG #ifdef MESSAGEQUEUESERVICE_DEBUG
         //l10n         //l10n
    MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",    MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
Line 141 
Line 142 
       }       }
  
    }    }
   #endif // PEGASUS_OS_VMS
 } }
  
  
Line 150 
Line 152 
    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());    DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
    while ( _stop_polling.value()  == 0 )    while ( _stop_polling.value()  == 0 )
    {    {
         try
         {
       _polling_sem.wait();       _polling_sem.wait();
         }
         catch (WaitInterrupted)
         {
            PEG_TRACE_STRING(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "polling_routine(): got WaitInterrupted exception." );
         }
   
       if(_stop_polling.value() != 0 )       if(_stop_polling.value() != 0 )
       {       {
          break;          break;
Line 212 
Line 223 
  
    if( _meta_dispatcher == 0 )    if( _meta_dispatcher == 0 )
    {    {
       _stop_polling = 0;  
       PEGASUS_ASSERT( _service_count.value() == 0 );       PEGASUS_ASSERT( _service_count.value() == 0 );
       _meta_dispatcher = new cimom();       _meta_dispatcher = new cimom();
       if (_meta_dispatcher == NULL )       if (_meta_dispatcher == NULL )
Line 220 
Line 230 
          throw NullPointer();          throw NullPointer();
       }       }
       _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,       _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
                                     create_time, destroy_time, deadlock_time);                                      create_time, destroy_time);
  
       _polling_thread = new Thread(polling_routine,  
                                    reinterpret_cast<void *>(&_polling_list),  
                                    false);  
       while (!_polling_thread->run())  
       {  
          pegasus_yield();  
       }  
    }    }
    _service_count++;    _service_count++;
  
Line 269 
Line 272 
  
       _stop_polling++;       _stop_polling++;
       _polling_sem.signal();       _polling_sem.signal();
             if (_polling_thread) {
       _polling_thread->join();       _polling_thread->join();
       delete _polling_thread;       delete _polling_thread;
       _polling_thread = 0;       _polling_thread = 0;
             }
       _meta_dispatcher->_shutdown_routed_queue();       _meta_dispatcher->_shutdown_routed_queue();
       delete _meta_dispatcher;       delete _meta_dispatcher;
       _meta_dispatcher = 0;       _meta_dispatcher = 0;
Line 336 
Line 341 
  
    while ( service->_die.value() == 0 )    while ( service->_die.value() == 0 )
    {    {
         try
         {
       service->_callback_ready.wait();       service->_callback_ready.wait();
         }
         catch (WaitInterrupted)
         {
            PEG_TRACE_STRING(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "_callback_proc(): got WaitInterrupted exception." );
         }
  
       service->_callback.lock();       service->_callback.lock();
       operation = service->_callback.next(0);       operation = service->_callback.next(0);
Line 363 
Line 376 
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
   
      if ( service->_die.value() != 0)
                    return (0);
   
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;    AsyncOpNode *operation = 0;
  
    if ( service->_die.value() == 0 )           // many operations may have been queued.
            do
     {     {
          try          try
          {          {
Line 375 
Line 393 
          }          }
          catch(ListClosed & )          catch(ListClosed & )
          {          {
             operation = 0;                           break;
   
             return(0);  
          }          }
   
          if( operation )          if( operation )
          {          {
             operation->_service_ptr = service;             operation->_service_ptr = service;
             service->_handle_incoming_operation(operation);             service->_handle_incoming_operation(operation);
          }          }
     }           } while (operation);
  
    return(0);    return(0);
 } }
Line 638 
Line 655 
 { {
    if (_incoming_queue_shutdown.value() > 0 )    if (_incoming_queue_shutdown.value() > 0 )
       return false;       return false;
      if (_polling_thread == NULL)  {
         _polling_thread = new Thread(polling_routine,
                                      reinterpret_cast<void *>(&_polling_list),
                                      false);
         while (!_polling_thread->run())
         {
            pegasus_yield();
         }
           }
 // ATTN optimization remove the message checking altogether in the base // ATTN optimization remove the message checking altogether in the base
 // << Mon Feb 18 14:02:20 2002 mdd >> // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();    op->lock();
Line 877 
Line 902 
    op->_op_dest = MessageQueue::lookup(destination); // destination of this message    op->_op_dest = MessageQueue::lookup(destination); // destination of this message
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
    // initialize the callback data    // initialize the callback data
    op->_async_callback = callback;   // callback function to be executed by recpt. of response    op->_async_callback = callback;   // callback function to be executed by recpt. of response
    op->_callback_node = op;          // the op node    op->_callback_node = op;          // the op node
Line 1001 
Line 1025 
              this,              this,
              (void *)0);              (void *)0);
  
      try
      {
    request->op->_client_sem.wait();    request->op->_client_sem.wait();
      }
      catch (WaitInterrupted)
      {
         PEG_TRACE_STRING(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
            "SendWait(): got WaitInterrupted exception" );
      }
    request->op->lock();    request->op->lock();
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;    rpl->op = 0;


Legend:
Removed from v.1.88.2.2  
changed lines
  Added in v.1.97

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2