(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.6 and 1.13

version 1.6, 2002/05/08 20:28:44 version 1.13, 2002/05/28 20:05:35
Line 32 
Line 32 
 #include "MessageQueue.h" #include "MessageQueue.h"
 #include "Socket.h" #include "Socket.h"
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
   #include <Pegasus/Common/HTTPConnection.h>
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
Line 82 
Line 83 
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 Monitor::Monitor() Monitor::Monitor()
      : _module_handle(0), _controller(0), _async(false)
 { {
     Socket::initializeInterface();     Socket::initializeInterface();
   
     _rep = new MonitorRep;     _rep = new MonitorRep;
     FD_ZERO(&_rep->rd_fd_set);     FD_ZERO(&_rep->rd_fd_set);
     FD_ZERO(&_rep->wr_fd_set);     FD_ZERO(&_rep->wr_fd_set);
Line 96 
Line 97 
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                     "deregistering with module controller");
   
       if(_module_handle != NULL)
       {
          _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
          _controller = 0;
          delete _module_handle;
       }
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
   
     delete _rep;     delete _rep;
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
     Socket::uninitializeInterface();     Socket::uninitializeInterface();
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
 } }
  
   
   
   //<<< Tue May 14 20:38:26 2002 mdd >>>
   //  register with module controller
   //  when it is time to enqueue the message,
   // use an async_thread_exec call to
   // isolate the entire if(events) { enqueue -> fd_clear } block
   //  let the thread pool grow and shrink according to load.
   
 Boolean Monitor::run(Uint32 milliseconds) Boolean Monitor::run(Uint32 milliseconds)
 { {
      // register the monitor as a module to gain access to the cimserver's thread pool
      // <<< Wed May 15 09:52:16 2002 mdd >>>
      while( _module_handle == NULL)
      {
         try
         {
            _controller = &(ModuleController::register_module(PEGASUS_QUEUENAME_CONTROLSERVICE,
                                                              PEGASUS_MODULENAME_MONITOR,
                                                              (void *)this,
                                                              0,
                                                              0,
                                                              0,
                                                              &_module_handle));
   
         }
         catch(IncompatibleTypes &)
         {
            ModuleController* controlService =
               new ModuleController(PEGASUS_QUEUENAME_CONTROLSERVICE);
         }
         catch( AlreadyExists & )
         {
            break;
         }
      }
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
  
     // Windows select() has a strange little bug. It returns immediately if     // Windows select() has a strange little bug. It returns immediately if
Line 116 
Line 166 
     // Check for events on the selected file descriptors. Only do this if     // Check for events on the selected file descriptors. Only do this if
     // there were no undispatched events from last time.     // there were no undispatched events from last time.
  
     static int count = 0;      int count = 0;
   
  
     if (count == 0)  
     {  
         memcpy(&_rep->active_rd_fd_set, &_rep->rd_fd_set, sizeof(fd_set));         memcpy(&_rep->active_rd_fd_set, &_rep->rd_fd_set, sizeof(fd_set));
         memcpy(&_rep->active_wr_fd_set, &_rep->wr_fd_set, sizeof(fd_set));         memcpy(&_rep->active_wr_fd_set, &_rep->wr_fd_set, sizeof(fd_set));
         memcpy(&_rep->active_ex_fd_set, &_rep->ex_fd_set, sizeof(fd_set));         memcpy(&_rep->active_ex_fd_set, &_rep->ex_fd_set, sizeof(fd_set));
Line 134 
Line 183 
             &_rep->active_wr_fd_set,             &_rep->active_wr_fd_set,
             &_rep->active_ex_fd_set,             &_rep->active_ex_fd_set,
             &tv);             &tv);
   
         if (count == 0)         if (count == 0)
       {
          pegasus_sleep(milliseconds);
   
             return false;             return false;
       }
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
         else if (count == SOCKET_ERROR)         else if (count == SOCKET_ERROR)
 #else #else
         else if (count == -1)         else if (count == -1)
 #endif #endif
         {         {
             count = 0;         pegasus_sleep(milliseconds);
             return false;             return false;
         }         }
     }  
  
     for (Uint32 i = 0, n = _entries.size(); i < n; i++)      Boolean handled_events = false;
       for (Uint32 i = 0, n = _entries.size(); i < _entries.size(); i++)
     {     {
         Sint32 socket = _entries[i].socket;         Sint32 socket = _entries[i].socket;
         Uint32 events = 0;         Uint32 events = 0;
  
           if(_entries[i].dying.value() > 0 )
           {
              if(_entries[i]._type == Monitor::CONNECTION)
              {
   
                 MessageQueue *q = MessageQueue::lookup(_entries[i].queueId);
                 if(q && static_cast<HTTPConnection *>(q)->is_dying() &&
                    (0 == static_cast<HTTPConnection *>(q)->refcount.value()))
                 {
                    static_cast<HTTPConnection *>(q)->lock_connection();
                    static_cast<HTTPConnection *>(q)->unlock_connection();
   
                    MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
                    Message* message= new CloseConnectionMessage(static_cast<HTTPConnection *>(q)->getSocket());
                    message->dest = o.getQueueId();
                    o.enqueue(message);
                    i--;
                    n = _entries.size();
                 }
              }
           }
   
         if (FD_ISSET(socket, &_rep->active_rd_fd_set))         if (FD_ISSET(socket, &_rep->active_rd_fd_set))
             events |= SocketMessage::READ;             events |= SocketMessage::READ;
  
Line 166 
Line 241 
         {         {
             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                "Monitor::run - Socket Event Detected events = %d", events);                "Monitor::run - Socket Event Detected events = %d", events);
   
             MessageQueue* queue = MessageQueue::lookup(_entries[i].queueId);  
   
             if (!queue)  
                 unsolicitSocketMessages(_entries[i].queueId);  
   
   
             Message* message = new SocketMessage(socket, events);  
             queue->enqueue(message);  
   
             if (events & SocketMessage::WRITE)             if (events & SocketMessage::WRITE)
             {             {
                 FD_CLR(socket, &_rep->active_wr_fd_set);                 FD_CLR(socket, &_rep->active_wr_fd_set);
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::run FD_CLR WRITE");                    "Monitor::run FD_CLR WRITE");
             }             }
   
             if (events & SocketMessage::EXCEPTION)             if (events & SocketMessage::EXCEPTION)
             {             {
                 FD_CLR(socket, &_rep->active_ex_fd_set);                 FD_CLR(socket, &_rep->active_ex_fd_set);
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::run FD_CLR EXECEPTION");                    "Monitor::run FD_CLR EXECEPTION");
             }             }
   
             if (events & SocketMessage::READ)             if (events & SocketMessage::READ)
             {             {
                 FD_CLR(socket, &_rep->active_rd_fd_set);                 FD_CLR(socket, &_rep->active_rd_fd_set);
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::run FD_CLR READ");                    "Monitor::run FD_CLR READ");
             }             }
               MessageQueue* queue = MessageQueue::lookup(_entries[i].queueId);
               if( ! queue )
               {
                  unsolicitSocketMessages(socket);
                  break;
               }
  
               if(_entries[i]._type == Monitor::CONNECTION)
               {
                  if( static_cast<HTTPConnection *>(queue)->refcount.value() == 0 )
                  {
                     static_cast<HTTPConnection *>(queue)->refcount++;
                     if( false == static_cast<HTTPConnection *>(queue)->is_dying())
                        _controller->async_thread_exec(*_module_handle, _dispatch, (void *)queue);
                     else
                        static_cast<HTTPConnection *>(queue)->refcount--;
                  }
               }
               else
               {
                  Message* message = new SocketMessage(socket, events);
                  queue->enqueue(message);
               }
             count--;             count--;
             return true;  
         }         }
           handled_events = true;
     }     }
       return(handled_events);
     return false;  
 } }
  
 Boolean Monitor::solicitSocketMessages( Boolean Monitor::solicitSocketMessages(
     Sint32 socket,     Sint32 socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId)      Uint32 queueId,
       int type)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");
  
     // See whether a handler is already registered for this one:     // See whether a handler is already registered for this one:
   
     Uint32 pos = _findEntry(socket);     Uint32 pos = _findEntry(socket);
  
     if (pos != PEGASUS_NOT_FOUND)     if (pos != PEGASUS_NOT_FOUND)
Line 234 
Line 318 
         FD_SET(socket, &_rep->ex_fd_set);         FD_SET(socket, &_rep->ex_fd_set);
  
     // Add the entry to the list:     // Add the entry to the list:
       _MonitorEntry entry(socket, queueId, type);
     _MonitorEntry entry = { socket, queueId };  
     _entries.append(entry);     _entries.append(entry);
  
     // Success!     // Success!
Line 259 
Line 342 
             FD_CLR(socket, &_rep->wr_fd_set);             FD_CLR(socket, &_rep->wr_fd_set);
             FD_CLR(socket, &_rep->ex_fd_set);             FD_CLR(socket, &_rep->ex_fd_set);
             _entries.remove(i);             _entries.remove(i);
               // ATTN-RK-P3-20020521: Need "Socket::close(socket);" here?
             PEG_METHOD_EXIT();             PEG_METHOD_EXIT();
             return true;             return true;
         }         }
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     return false;     return false;
 } }
  
 Uint32 Monitor::_findEntry(Sint32 socket) const  Uint32 Monitor::_findEntry(Sint32 socket)
 { {
     for (Uint32 i = 0, n = _entries.size(); i < n; i++)     for (Uint32 i = 0, n = _entries.size(); i < n; i++)
     {     {
Line 279 
Line 362 
     return PEG_NOT_FOUND;     return PEG_NOT_FOUND;
 } }
  
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
   {
      HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
      if( true == dst->is_dying())
      {
         dst->refcount--;
         return 0;
      }
      if( false == dst->is_dying())
      {
         dst->run(1);
      }
      dst->refcount--;
      return 0;
   }
   
   
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.6  
changed lines
  Added in v.1.13

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2