(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.51 and 1.53

version 1.51, 2002/06/01 00:56:36 version 1.53, 2002/06/07 00:03:32
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%////-*-c++-*-////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,  // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
 // The Open Group, Tivoli Systems  
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 38 
Line 37 
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
 Mutex MessageQueueService::_meta_dispatcher_mutex; Mutex MessageQueueService::_meta_dispatcher_mutex;
  
   static struct timeval create_time = {0, 100};
   static struct timeval destroy_time = {1, 0};
   static struct timeval deadlock_time = {10, 0};
   
   ThreadPool MessageQueueService::_thread_pool(2, "MessageQueueService", 2, 20,
                                                create_time, destroy_time, deadlock_time);
   
   DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
   
   int MessageQueueService::kill_idle_threads(void)
   {
      static struct timeval now, last;
      gettimeofday(&now, NULL);
   
      if( now.tv_sec - last.tv_sec > 0 )
      {
         gettimeofday(&last, NULL);
         return _thread_pool.kill_dead_threads();
      }
      return 0;
   }
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
   {
      Thread *myself = reinterpret_cast<Thread *>(parm);
   
      DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
   
      while ( _stop_polling.value()  == 0 )
      {
         _polling_sem.wait();
         list->lock();
         MessageQueueService *service = list->next(0);
         while(service != NULL)
         {
            if(service->_incoming.count() > 0 )
            {
               _thread_pool.allocate_and_awaken(service, _req_proc);
   
   //          service->_req_proc(service);
            }
   
            service = list->next(service);
         }
         list->unlock();
      }
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
   }
   
   Thread MessageQueueService::_polling_thread(polling_routine,
                                               reinterpret_cast<void *>(&_polling_list),
                                               false);
   
   
   Semaphore MessageQueueService::_polling_sem(0);
   AtomicInt MessageQueueService::_stop_polling(0);
   
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
                                          Uint32 queueID,                                          Uint32 queueID,
Line 53 
Line 110 
      _callback_ready(0),      _callback_ready(0),
      _req_thread(_req_proc, this, false),      _req_thread(_req_proc, this, false),
      _callback_thread(_callback_proc, this, false)      _callback_thread(_callback_proc, this, false)
   
 { {
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
  
Line 68 
Line 126 
       if (_meta_dispatcher == NULL )       if (_meta_dispatcher == NULL )
       {       {
          _meta_dispatcher_mutex.unlock();          _meta_dispatcher_mutex.unlock();
   
          throw NullPointer();          throw NullPointer();
       }       }
         _polling_thread.run();
    }    }
    _service_count++;    _service_count++;
  
Line 81 
Line 138 
       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
    }    }
  
      _polling_list.insert_last(this);
   
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
 //   _callback_thread.run(); //   _callback_thread.run();
  
    _req_thread.run();  //   _req_thread.run();
 } }
  
  
Line 96 
Line 155 
       _shutdown_incoming_queue();       _shutdown_incoming_queue();
    }    }
    _callback_ready.signal();    _callback_ready.signal();
    _callback_thread.join();  //   _callback_thread.join();
  
    _meta_dispatcher_mutex.lock(pegasus_thread_self());    _meta_dispatcher_mutex.lock(pegasus_thread_self());
    _service_count--;    _service_count--;
    if (_service_count.value() == 0 )    if (_service_count.value() == 0 )
    {    {
         _stop_polling++;
         _polling_sem.signal();
         _polling_thread.join();
       _meta_dispatcher->_shutdown_routed_queue();       _meta_dispatcher->_shutdown_routed_queue();
       delete _meta_dispatcher;       delete _meta_dispatcher;
       _meta_dispatcher = 0;       _meta_dispatcher = 0;
   
    }    }
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
      _polling_list.remove(this);
 } }
  
   
   
 void MessageQueueService::_shutdown_incoming_queue(void) void MessageQueueService::_shutdown_incoming_queue(void)
 { {
  
Line 137 
Line 198 
  
    _incoming.insert_last_wait(msg->op);    _incoming.insert_last_wait(msg->op);
  
    _req_thread.join();  //   _req_thread.join();
  
 } }
  
Line 187 
Line 248 
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);  //   Thread *myself = reinterpret_cast<Thread *>(parm);
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());  //   MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;    AsyncOpNode *operation = 0;
  
    while ( service->_die.value() == 0 )  //    while ( service->_die.value() == 0 )
    {  //    {
          try          try
          {          {
             operation = service->_incoming.remove_first_wait();              operation = service->_incoming.remove_first();
          }          }
          catch(ListClosed & )          catch(ListClosed & )
          {          {
             break;  //          break;
          }          }
          if( operation )          if( operation )
          {          {
             operation->_thread_ptr = myself;  //          operation->_thread_ptr = pegasus_thread_self();
             operation->_service_ptr = service;             operation->_service_ptr = service;
             service->_handle_incoming_operation(operation);             service->_handle_incoming_operation(operation);
          }          }
    }  //    }
  
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  //    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
    return(0);    return(0);
 } }
  
Line 481 
Line 542 
         _die.value() == 0  )         _die.value() == 0  )
    {    {
       _incoming.insert_last_wait(op);       _incoming.insert_last_wait(op);
         _polling_sem.signal();
       return true;       return true;
    }    }
 //    else //    else
Line 551 
Line 613 
       case AsyncIoctl::IO_CLOSE:       case AsyncIoctl::IO_CLOSE:
       {       {
          // save my bearings          // save my bearings
          Thread *myself = req->op->_thread_ptr;  //       Thread *myself = req->op->_thread_ptr;
          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
  
          // respond to this message.          // respond to this message.
Line 578 
Line 640 
             }             }
             if( operation )             if( operation )
             {             {
                operation->_thread_ptr = myself;  //             operation->_thread_ptr = myself;
                operation->_service_ptr = service;                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);                service->_handle_incoming_operation(operation);
             }             }
Line 589 
Line 651 
          // shutdown the AsyncDQueue          // shutdown the AsyncDQueue
          service->_incoming.shutdown_queue();          service->_incoming.shutdown_queue();
          // exit the thread !          // exit the thread !
          myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  //       myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
          return;          return;
       }       }
  


Legend:
Removed from v.1.51  
changed lines
  Added in v.1.53

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2