(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.117 and 1.121.4.5

version 1.117, 2007/02/14 05:13:59 version 1.121.4.5, 2008/03/26 19:21:44
Line 42 
Line 42 
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
   #include "HostAddress.h"
 #include <errno.h> #include <errno.h>
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Tickler::Tickler()
 Monitor::Monitor()      : _listenSocket(PEGASUS_INVALID_SOCKET),
    : _stopConnections(0),        _clientSocket(PEGASUS_INVALID_SOCKET),
      _stopConnectionsSem(0),        _serverSocket(PEGASUS_INVALID_SOCKET)
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;      try
     Socket::initializeInterface();  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)  
     {     {
        _MonitorEntry entry(0, 0, 0);          _initialize();
        _entries.append(entry);      }
       catch (...)
       {
           _uninitialize();
           throw;
     }     }
 } }
  
 Monitor::~Monitor()  Tickler::~Tickler()
 { {
     uninitializeTickler();      _uninitialize();
     Socket::uninitializeInterface();  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  
 } }
 void Monitor::uninitializeTickler()  
 {  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
  
     try  void Tickler::notify()
     {  
         if (_tickle_peer_socket >= 0)  
         {         {
             Socket::close(_tickle_peer_socket);      Socket::write(_clientSocket, "\0", 1);
         }         }
         if (_tickle_client_socket >= 0)  
   void Tickler::reset()
         {         {
             Socket::close(_tickle_client_socket);      // Clear all bytes from the tickle socket
         }      char buffer[32];
         if (_tickle_server_socket >= 0)      while (Socket::read(_serverSocket, buffer, 32) > 0)
         {         {
             Socket::close(_tickle_server_socket);  
         }         }
     }     }
     catch (...)  
     {  
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "Failed to close tickle sockets");  
     }  
  
 }  #if defined(PEGASUS_OS_TYPE_UNIX)
  
 void Monitor::initializeTickler()  // Use an anonymous pipe for the tickle connection.
   
   void Tickler::_initialize()
 { {
     /*      int fds[2];
        NOTE: On any errors trying to  
              setup out tickle connection,  
              throw an exception/end the server  
     */  
  
     /* setup the tickle server/listener */      if (pipe(fds) == -1)
     // try until the tcpip is restarted  
     do  
     {  
         // get a socket for the server side  
         if ((_tickle_server_socket =  
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
              PEGASUS_INVALID_SOCKET)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CREATE",                 "Common.Monitor.TICKLE_CREATE",
Line 140 
Line 107 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // initialize the address      _serverSocket = fds[0];
         memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      _clientSocket = fds[1];
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)      Socket::disableBlocking(_serverSocket);
   }
   
   #else
   
   // Use an external loopback socket connection to allow the tickle socket to
   // be included in the select() array on non-Unix platforms.
   
   void Tickler::_initialize()
   {
       //
       // Set up the addresses for the listen, client, and server sockets
       // based on whether IPv6 is enabled.
       //
   
       Socket::initializeInterface();
   
   # ifdef PEGASUS_ENABLE_IPV6
       struct sockaddr_storage listenAddress;
       struct sockaddr_storage clientAddress;
       struct sockaddr_storage serverAddress;
   # else
       struct sockaddr_in listenAddress;
       struct sockaddr_in clientAddress;
       struct sockaddr_in serverAddress;
 #endif #endif
         _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM      int addressFamily;
 #pragma convert(0)      SocketLength addressLength;
   
       memset(&listenAddress, 0, sizeof (listenAddress));
   
   # ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
       {
           // Use the IPv6 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
   
           addressFamily = AF_INET6;
           addressLength = sizeof(struct sockaddr_in6);
       }
       else
 #endif #endif
         _tickle_server_addr.sin_family = PF_INET;      {
         _tickle_server_addr.sin_port = 0;          // Use the IPv4 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
   
           addressFamily = AF_INET;
           addressLength = sizeof(struct sockaddr_in);
       }
  
         SocketLength _addr_size = sizeof(_tickle_server_addr);      // Use the same address for the client socket as the listen socket
       clientAddress = listenAddress;
  
         // bind server side to socket      //
         if ((::bind(_tickle_server_socket,      // Set up a listen socket to allow the tickle client and server to connect
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),      //
                  sizeof(_tickle_server_addr))) < 0)  
       // Create the listen socket
       if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                PEGASUS_INVALID_SOCKET)
         {         {
 #ifdef PEGASUS_OS_ZOS  
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_BIND_LONG",              "Common.Monitor.TICKLE_CREATE",
                 "Received error:$0 while binding the internal socket.",              "Received error number $0 while creating the internal socket.",
                 strerror(errno));              getSocketError());
 #else          throw Exception(parms);
       }
   
       // Bind the listen socket to the loopback address
       if (::bind(
               _listenSocket,
               reinterpret_cast<struct sockaddr*>(&listenAddress),
               addressLength) < 0)
       {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_BIND",                 "Common.Monitor.TICKLE_BIND",
                 "Received error number $0 while binding the internal socket.",                 "Received error number $0 while binding the internal socket.",
                 getSocketError());                 getSocketError());
 #endif  
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // tell the kernel we are a server      // Listen for a connection from the tickle client
         if ((::listen(_tickle_server_socket, 3)) < 0)      if ((::listen(_listenSocket, 3)) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_LISTEN",                 "Common.Monitor.TICKLE_LISTEN",
                 "Received error number $0 while listening to the internal "              "Received error number $0 while listening to the internal socket.",
                     "socket.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // make sure we have the correct socket for our server      // Verify we have the correct listen socket
       SocketLength tmpAddressLength = addressLength;
         int sock = ::getsockname(         int sock = ::getsockname(
             _tickle_server_socket,          _listenSocket,
             reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),          reinterpret_cast<struct sockaddr*>(&listenAddress),
             &_addr_size);          &tmpAddressLength);
         if (sock < 0)         if (sock < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_SOCKNAME",                 "Common.Monitor.TICKLE_SOCKNAME",
                 "Received error number $0 while getting the internal socket "              "Received error number $0 while getting the internal socket name.",
                     "name.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
         // get a socket for our tickle client      // Create the client socket
         if ((_tickle_client_socket =      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
             PEGASUS_INVALID_SOCKET)             PEGASUS_INVALID_SOCKET)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
Line 214 
Line 245 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // setup the address of the client      // Bind the client socket to the loopback address
         memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _clientSocket,
 # pragma convert(37)              reinterpret_cast<struct sockaddr*>(&clientAddress),
 #endif              addressLength) < 0)
         _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 # pragma convert(0)  
 #endif  
         _tickle_client_addr.sin_family = PF_INET;  
         _tickle_client_addr.sin_port = 0;  
   
         // bind socket to client side  
         if ((::bind(_tickle_client_socket,  
                  reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                  sizeof(_tickle_client_addr))) < 0)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_BIND",                 "Common.Monitor.TICKLE_CLIENT_BIND",
Line 239 
Line 259 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // connect to server side      // Connect the client socket to the listen socket address
         if ((::connect(_tickle_client_socket,      if (::connect(
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                  sizeof(_tickle_server_addr))) < 0)              reinterpret_cast<struct sockaddr*>(&listenAddress),
               addressLength) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_CONNECT",                 "Common.Monitor.TICKLE_CLIENT_CONNECT",
                 "Received error number $0 while connecting the internal "              "Received error number $0 while connecting the internal client "
                     "client socket.",                  "socket.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the slave connection */      //
         memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
         SocketLength peer_size = sizeof(_tickle_peer_addr);      //
         Threads::sleep(1);  
       tmpAddressLength = addressLength;
         // this call may fail, we will try a max of 20 times to establish  
         // this peer connection      // Accept the client socket connection.
         if ((_tickle_peer_socket = ::accept(_tickle_server_socket,      _serverSocket = ::accept(
                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),          _listenSocket,
                  &peer_size)) < 0)          reinterpret_cast<struct sockaddr*>(&serverAddress),
         {          &tmpAddressLength);
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TRYAGAIN)      if (_serverSocket == PEGASUS_SOCKET_ERROR)
             {  
                 int retries = 0;  
                 do  
                 {  
                     Threads::sleep(1);  
                     _tickle_peer_socket = ::accept(  
                         _tickle_server_socket,  
                         reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),  
                         &peer_size);  
                     retries++;  
                 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                          getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&  
                          retries < 20);  
             }  
             // TCP/IP is down, destroy sockets and retry again.  
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
             {  
                 // destroy everything  
                 uninitializeTickler();  
                 // retry again.  
                 continue;  
             }  
         }  
         if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_ACCEPT",                 "Common.Monitor.TICKLE_ACCEPT",
                 "Received error number $0 while accepting the internal "              "Received error number $0 while accepting the internal socket "
                     "socket connection.",                  "connection.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
         else  
       //
       // Close the listen socket and make the other sockets non-blocking
       //
   
       Socket::close(_listenSocket);
       _listenSocket = PEGASUS_INVALID_SOCKET;
   
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
   }
   
   #endif
   
   void Tickler::_uninitialize()
         {         {
             // socket is ok      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
             break;  
       try
       {
           if (_serverSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_serverSocket);
               _serverSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_clientSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_clientSocket);
               _clientSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_listenSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_listenSocket);
               _listenSocket = PEGASUS_INVALID_SOCKET;
           }
       }
       catch (...)
       {
           PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
               "Failed to close tickle sockets");
       }
       Socket::uninitializeInterface();
         }         }
     } while (1); // try until TCP/IP is restarted  
  
     Socket::disableBlocking(_tickle_peer_socket);  
     Socket::disableBlocking(_tickle_client_socket);  
  
     // add the tickler to the list of entries to be monitored and set to  ////////////////////////////////////////////////////////////////////////////////
     // IDLE because Monitor only  //
     // checks entries with IDLE state for events  // Monitor
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);  //
   ////////////////////////////////////////////////////////////////////////////////
   
   #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
   {
       int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
       _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
   
       // Create a MonitorEntry for the Tickler and set its state to IDLE so the
       // Monitor will watch for its events.
       _MonitorEntry entry(_tickler.getReadHandle(), 1, INTERNAL);
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
       _entries.append(entry);
  
     // is the tickler initalized as first socket on startup ?      // Start the count at 1 because _entries[0] is the Tickler
     if (_entries.size()==0)      for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
        // if yes, append a new entry         _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
     else  
     {  
        // if not, overwrite the tickler entry with new socket  
        _entries[0]=entry;  
     }     }
   
   Monitor::~Monitor()
   {
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
 } }
  
 void Monitor::tickle() void Monitor::tickle()
 { {
     static char _buffer[] =      _tickler.notify();
     {  
       '0','0'  
     };  
   
     AutoMutex autoMutex(_tickle_mutex);  
     Socket::write(_tickle_client_socket,&_buffer, 2);  
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )  void Monitor::setState(
       Uint32 index,
       _MonitorEntry::entry_status status)
 { {
       AutoMutex autoEntryMutex(_entry_mut);
     // Set the state to requested state     // Set the state to requested state
     _entries[index]._status = status;     _entries[index]._status = status;
 } }
  
 void Monitor::run(Uint32 milliseconds) void Monitor::run(Uint32 milliseconds)
 { {
   
   
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
Line 406 
Line 448 
  
             if (h._responsePending == true)             if (h._responsePending == true)
             {             {
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                     "Monitor::run - Ignoring connection delete request "                     "Monitor::run - Ignoring connection delete request "
                         "because responses are still pending. "                         "because responses are still pending. "
                         "connection=0x%p, socket=%d\n",                         "connection=0x%p, socket=%d\n",
                     (void *)&h, h.getSocket());                      (void *)&h, h.getSocket()));
                 continue;                 continue;
             }             }
             h._connectionClosePending = false;             h._connectionClosePending = false;
Line 483 
Line 525 
 #endif #endif
     _entry_mut.lock();     _entry_mut.lock();
  
       struct timeval timeNow;
       Time::gettimeofday(&timeNow);
   
     // After enqueue a message and the autoEntryMutex has been released and     // After enqueue a message and the autoEntryMutex has been released and
     // locked again, the array of _entries can be changed. The ArrayIterator     // locked again, the array of _entries can be changed. The ArrayIterator
     // has be reset with the original _entries     // has be reset with the original _entries
Line 490 
Line 535 
  
     if (events == PEGASUS_SOCKET_ERROR)     if (events == PEGASUS_SOCKET_ERROR)
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run - errorno = %d has occurred on select.", errno);              "Monitor::run - errorno = %d has occurred on select.", errno));
         // The EBADF error indicates that one or more or the file         // The EBADF error indicates that one or more or the file
         // descriptions was not valid. This could indicate that         // descriptions was not valid. This could indicate that
         // the entries structure has been corrupted or that         // the entries structure has been corrupted or that
Line 501 
Line 546 
     }     }
     else if (events)     else if (events)
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run select event received events = %d, monitoring %d "             "Monitor::run select event received events = %d, monitoring %d "
                 "idle entries",                 "idle entries",
             events, _idleEntries);              events, _idleEntries));
         for (int indx = 0; indx < (int)entries.size(); indx++)         for (int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             // The Monitor should only look at entries in the table that are             // The Monitor should only look at entries in the table that are
Line 513 
Line 558 
                 (FD_ISSET(entries[indx].socket, &fdread)))                 (FD_ISSET(entries[indx].socket, &fdread)))
             {             {
                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                     "Monitor::run indx = %d, queueId =  %d, q = %p",                     "Monitor::run indx = %d, queueId =  %d, q = %p",
                     indx, entries[indx].queueId, q);                      indx, entries[indx].queueId, q));
                 PEGASUS_ASSERT(q !=0);                 PEGASUS_ASSERT(q !=0);
  
                 try                 try
                 {                 {
                     if (entries[indx]._type == Monitor::CONNECTION)                     if (entries[indx]._type == Monitor::CONNECTION)
                     {                     {
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "entries[indx].type for indx = %d is "                             "entries[indx].type for indx = %d is "
                                 "Monitor::CONNECTION",                                 "Monitor::CONNECTION",
                             indx);                              indx));
                         static_cast<HTTPConnection *>(q)->_entry_index = indx;  
   
                         // Do not update the entry just yet. The entry gets  
                         // updated once the request has been read.  
                         //entries[indx]._status = _MonitorEntry::BUSY;  
   
                         // If allocate_and_awaken failure, retry on next  
                         // iteration  
 /* Removed for PEP 183.  
                         if (!MessageQueueService::get_thread_pool()->  
                                 allocate_and_awaken((void *)q, _dispatch))  
                         {  
                             Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,  
                                 "Monitor::run: Insufficient resources to "  
                                     "process request.");  
                             entries[indx]._status = _MonitorEntry::IDLE;  
                             return true;  
                         }  
 */  
 // Added for PEP 183  
                         HTTPConnection *dst =                         HTTPConnection *dst =
                             reinterpret_cast<HTTPConnection *>(q);                             reinterpret_cast<HTTPConnection *>(q);
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          dst->_entry_index = indx;
   
                           // Update idle start time because we have received some
                           // data. Any data is good data at this point, and we'll
                           // keep the connection alive, even if we've exceeded
                           // the idleConnectionTimeout, which will be checked
                           // when we call closeConnectionOnTimeout() next.
                           Time::gettimeofday(&dst->_idleStartTime);
   
                           // Check for accept pending (ie. SSL handshake pending)
                           // or idle connection timeouts for sockets from which
                           // we received data (avoiding extra queue lookup below).
                           if (!dst->closeConnectionOnTimeout(&timeNow))
                           {
   
                           PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: entering run() for "                             "Monitor::_dispatch: entering run() for "
                                 "indx = %d, queueId = %d, q = %p",                                 "indx = %d, queueId = %d, q = %p",
                             dst->_entry_index,                             dst->_entry_index,
                             dst->_monitor->_entries[dst->_entry_index].queueId,                                  dst->_monitor->
                             dst);                                      _entries[dst->_entry_index].queueId,
                               dst));
  
                         try                         try
                         {                         {
Line 561 
Line 602 
                         }                         }
                         catch (...)                         catch (...)
                         {                         {
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                 "Monitor::_dispatch: exception received");                                 "Monitor::_dispatch: exception received");
                         }                         }
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: exited run() for index %d",                             "Monitor::_dispatch: exited run() for index %d",
                             dst->_entry_index);                              dst->_entry_index));
                           }
                         // It is possible the entry status may not be set to  
                         // busy.  The following will fail in that case.  
                         // PEGASUS_ASSERT(dst->_monitor->_entries[  
                         //     dst->_entry_index]._status.get() ==  
                         //    _MonitorEntry::BUSY);  
                         // Once the HTTPConnection thread has set the status  
                         // value to either Monitor::DYING or Monitor::IDLE,  
                         // it has returned control of the connection to the  
                         // Monitor.  It is no longer permissible to access  
                         // the connection or the entry in the _entries table.  
   
                         // The following is not relevant as the worker thread  
                         // or the reader thread will update the status of the  
                         // entry.  
                         //if (dst->_connectionClosePending)  
                         //{  
                         //  dst->_monitor->_entries[dst->_entry_index]._status =  
                         //    _MonitorEntry::DYING;  
                         //}  
                         //else  
                         //{  
                         //  dst->_monitor->_entries[dst->_entry_index]._status =  
                         //    _MonitorEntry::IDLE;  
                         //}  
 // end Added for PEP 183  
                     }                     }
                     else if (entries[indx]._type == Monitor::INTERNAL)                     else if (entries[indx]._type == Monitor::INTERNAL)
                     {                     {
                         // set ourself to BUSY,                          _tickler.reset();
                         // read the data  
                         // and set ourself back to IDLE  
   
                         entries[indx]._status = _MonitorEntry::BUSY;  
                         static char buffer[2];  
                         Sint32 amt =  
                             Socket::read(entries[indx].socket,&buffer, 2);  
   
                         if (amt == PEGASUS_SOCKET_ERROR &&  
                             getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
                         {  
                             Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                                 "Monitor::run: Tickler socket got an IO error. "  
                                     "Going to re-create Socket and wait for "  
                                     "TCP/IP restart.");  
                             uninitializeTickler();  
                             initializeTickler();  
                         }                         }
                         else                         else
                         {                         {
                             entries[indx]._status = _MonitorEntry::IDLE;                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                         }  
                     }  
                     else  
                     {  
                         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                             "Non-connection entry, indx = %d, has been "                             "Non-connection entry, indx = %d, has been "
                                 "received.",                                 "received.",
                             indx);                              indx));
                         int events = 0;                         int events = 0;
                         events |= SocketMessage::READ;                         events |= SocketMessage::READ;
                         Message* msg = new SocketMessage(                         Message* msg = new SocketMessage(
Line 647 
Line 641 
                 {                 {
                 }                 }
             }             }
               // else check for accept pending (ie. SSL handshake pending) or
               // idle connection timeouts for sockets from which we did not
               // receive data.
               else if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   entries[indx]._type == Monitor::CONNECTION)
               {
                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
           }
       }
       // else if "events" is zero (ie. select timed out) then we still need
       // to check if there are any pending SSL handshakes that have timed out.
       else
       {
           for (int indx = 0; indx < (int)entries.size(); indx++)
           {
               if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
                   entries[indx]._type == Monitor::CONNECTION)
               {
                   MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
         }         }
     }     }
 } }
Line 759 
Line 780 
 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm) ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
 { {
     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
             "q = %p",             "q = %p",
         dst->_entry_index,         dst->_entry_index,
         dst->_monitor->_entries[dst->_entry_index].queueId,         dst->_monitor->_entries[dst->_entry_index].queueId,
         dst);          dst));
  
     try     try
     {     {
Line 772 
Line 793 
     }     }
     catch (...)     catch (...)
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
             "Monitor::_dispatch: exception received");             "Monitor::_dispatch: exception received");
     }     }
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
  
     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
         _MonitorEntry::BUSY);         _MonitorEntry::BUSY);


Legend:
Removed from v.1.117  
changed lines
  Added in v.1.121.4.5

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2