(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.85 and 1.94.8.2

version 1.85, 2005/01/27 18:57:28 version 1.94.8.2, 2005/10/21 17:34:38
Line 1 
Line 1 
 //%2004////////////////////////////////////////////////////////////////////////  //%2005////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 6 
Line 6 
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 47 
Line 49 
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
  
   
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
Line 73 
Line 74 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   // Define a platform-neutral socket length type
   #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
   typedef size_t PEGASUS_SOCKLEN_T;
   #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
   typedef socklen_t PEGASUS_SOCKLEN_T;
   #else
   typedef int PEGASUS_SOCKLEN_T;
   #endif
  
 static AtomicInt _connections = 0;  static AtomicInt _connections(0);
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
   
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
   
 struct MonitorRep  
 {  
     fd_set rd_fd_set;  
     fd_set wr_fd_set;  
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 104 
Line 93 
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0),     : _stopConnections(0),
      _controller(0),  
      _async(false),  
      _stopConnections(0),  
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
Line 116 
Line 102 
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {  
        _MonitorEntry entry(0, 0, 0);  
        _entries.append(entry);  
     }  
 }  
   
 Monitor::Monitor(Boolean async)  
    : _module_handle(0),  
      _controller(0),  
      _async(async),  
      _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 163 
Line 119 
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "deregistering with module controller");  
   
     if(_module_handle.get() != NULL)  
     {  
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);  
        _controller.reset();  
        _module_handle.reset();  
     }  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  
   
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 239 
Line 184 
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
                (struct sockaddr *)&_tickle_server_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                sizeof(_tickle_server_addr))) < 0){                sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
 #ifdef PEGASUS_OS_ZOS #ifdef PEGASUS_OS_ZOS
Line 276 
Line 221 
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,     int sock = ::getsockname(_tickle_server_socket,
                              (struct sockaddr*)&_tickle_server_addr,                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                              &_addr_size);                              &_addr_size);
     if(sock < 0){     if(sock < 0){
         // handle error         // handle error
Line 323 
Line 268 
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,     if((::bind(_tickle_client_socket,
                (struct sockaddr*)&_tickle_client_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                sizeof(_tickle_client_addr))) < 0){                sizeof(_tickle_client_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
Line 338 
Line 283 
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,     if((::connect(_tickle_client_socket,
                   (struct sockaddr*)&_tickle_server_addr,                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                   sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
Line 353 
Line 298 
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);     pegasus_sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,     if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                        (struct sockaddr*)&_tickle_peer_addr,              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                        &peer_size)) < 0){                                        &peer_size)) < 0){
 #if !defined(PEGASUS_OS_TYPE_WINDOWS) #if !defined(PEGASUS_OS_TYPE_WINDOWS)
         // Only retry on non-windows platforms.         // Only retry on non-windows platforms.
Line 369 
Line 314 
           {           {
             pegasus_sleep(1);             pegasus_sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,             _tickle_peer_socket = ::accept(_tickle_server_socket,
                                            (struct sockaddr*)&_tickle_peer_addr,                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size);                                            &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
Line 424 
Line 369 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)         for ( int indx = 0; indx < (int)_entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)             if (_entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( _entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                     if ( _entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          _entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                        _entries[indx]._status = _MonitorEntry::EMPTY;
Line 456 
Line 401 
     for( int indx = 0; indx < (int)_entries.size(); indx++)     for( int indx = 0; indx < (int)_entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];                          const _MonitorEntry &entry = _entries[indx];
        if ((entry._status.value() == _MonitorEntry::DYING) &&         if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
Line 498 
Line 443 
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();            autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());            autoEntryMutex.lock();
        }        }
     }     }
  
Line 518 
Line 463 
        if(maxSocketCurrentPass < _entries[indx].socket)        if(maxSocketCurrentPass < _entries[indx].socket)
           maxSocketCurrentPass = _entries[indx].socket;           maxSocketCurrentPass = _entries[indx].socket;
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)         if(_entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);           FD_SET(_entries[indx].socket, &fdread);
Line 531 
Line 476 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      autoEntryMutex.unlock();
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
    _entry_mut.lock(pegasus_thread_self());      autoEntryMutex.lock();
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
Line 559 
Line 504 
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&            if((_entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(_entries[indx].socket, &fdread)))              (FD_ISSET(_entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
Line 588 
Line 533 
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                       _entries[indx]._status = _MonitorEntry::IDLE;
                       _entry_mut.unlock();  
                       return true;                       return true;
                    }                    }
 */ */
Line 611 
Line 555 
  
                    // It is possible the entry status may not be set to busy.                    // It is possible the entry status may not be set to busy.
                    // The following will fail in that case.                    // The following will fail in that case.
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                     // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
                    // Once the HTTPConnection thread has set the status value to either                    // Once the HTTPConnection thread has set the status value to either
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
                    // to the Monitor.  It is no longer permissible to access the connection                    // to the Monitor.  It is no longer permissible to access the connection
Line 634 
Line 578 
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status == _MonitorEntry::BUSY;                          _entries[indx]._status.get() == _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                         Socket::disableBlocking(_entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                         Socket::enableBlocking(_entries[indx].socket);
                         _entries[indx]._status == _MonitorEntry::IDLE;                          _entries[indx]._status.get() == _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
Line 649 
Line 593 
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                    Message *msg = new SocketMessage(_entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                    _entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                     autoEntryMutex.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                      autoEntryMutex.lock();
                    _entries[indx]._status = _MonitorEntry::IDLE;                    _entries[indx]._status = _MonitorEntry::IDLE;
   
                    return true;                    return true;
                 }                 }
              }              }
Line 663 
Line 608 
           }           }
        }        }
     }     }
     _entry_mut.unlock();  
     return(handled_events);     return(handled_events);
 } }
  
Line 707 
Line 652 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if(_solicitSocketCount >= (size-1)){     if((int)_solicitSocketCount >= (size-1)){
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){          for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 719 
Line 664 
    {    {
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
Line 749 
Line 694 
         Start at index = 1 because _entries[0] is the tickle entry which never needs         Start at index = 1 because _entries[0] is the tickle entry which never needs
         to be EMPTY;         to be EMPTY;
     */     */
     int index;      unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
Line 768 
Line 713 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
Line 796 
Line 740 
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection


Legend:
Removed from v.1.85  
changed lines
  Added in v.1.94.8.2

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2