(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.121 and 1.121.4.1

version 1.121, 2007/07/20 18:43:40 version 1.121.4.1, 2007/11/05 17:03:29
Line 42 
Line 42 
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
   #include "HostAddress.h"
 #include <errno.h> #include <errno.h>
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Tickler::Tickler()
 Monitor::Monitor()      : _listenSocket(PEGASUS_INVALID_SOCKET),
    : _stopConnections(0),        _clientSocket(PEGASUS_INVALID_SOCKET),
      _stopConnectionsSem(0),        _serverSocket(PEGASUS_INVALID_SOCKET)
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;      try
     Socket::initializeInterface();  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)  
     {     {
        _MonitorEntry entry(0, 0, 0);          _initialize();
        _entries.append(entry);      }
       catch (...)
       {
           _uninitialize();
           throw;
     }     }
 } }
  
 Monitor::~Monitor()  Tickler::~Tickler()
 { {
     uninitializeTickler();      _uninitialize();
     Socket::uninitializeInterface();  
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  
 } }
 void Monitor::uninitializeTickler()  
 {  
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
  
     try  #if defined(PEGASUS_OS_TYPE_UNIX) || defined(PEGASUS_OS_VMS)
   
   // Use an anonymous pipe for the tickle connection.
   
   void Tickler::_initialize()
     {     {
         if (_tickle_peer_socket >= 0)      int fds[2];
   
       if (pipe(fds) == -1)
         {         {
             Socket::close(_tickle_peer_socket);          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_CREATE",
               "Received error number $0 while creating the internal socket.",
               getSocketError());
           throw Exception(parms);
         }         }
         if (_tickle_client_socket >= 0)  
         {      _serverSocket = fds[0];
             Socket::close(_tickle_client_socket);      _clientSocket = fds[1];
         }         }
         if (_tickle_server_socket >= 0)  
   #else
   
   // Use an external loopback socket connection to allow the tickle socket to
   // be included in the select() array on Windows.
   
   void Tickler::_initialize()
         {         {
             Socket::close(_tickle_server_socket);      //
         }      // Set up the addresses for the listen, client, and server sockets
     }      // based on whether IPv6 is enabled.
     catch (...)      //
   
       Socket::initializeInterface();
   
   # ifdef PEGASUS_ENABLE_IPV6
       struct sockaddr_storage listenAddress;
       struct sockaddr_storage clientAddress;
       struct sockaddr_storage serverAddress;
   # else
       struct sockaddr_in listenAddress;
       struct sockaddr_in clientAddress;
       struct sockaddr_in serverAddress;
   # endif
   
       int addressFamily;
       SocketLength addressLength;
   
       memset(&listenAddress, 0, sizeof (listenAddress));
   
   # ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
     {     {
         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,          // Use the IPv6 loopback address for the listen sockets
                   "Failed to close tickle sockets");          HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
   
           addressFamily = AF_INET6;
           addressLength = sizeof(struct sockaddr_in6);
     }     }
       else
   # endif
       {
           // Use the IPv4 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
  
           addressFamily = AF_INET;
           addressLength = sizeof(struct sockaddr_in);
 } }
  
 void Monitor::initializeTickler()      // Use the same address for the client socket as the listen socket
 {      clientAddress = listenAddress;
     /*  
        NOTE: On any errors trying to      //
              setup out tickle connection,      // Set up a listen socket to allow the tickle client and server to connect
              throw an exception/end the server      //
     */  
  
     /* setup the tickle server/listener */      // Create the listen socket
     // try until the tcpip is restarted      if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
     do  
     {  
         // get a socket for the server side  
         if ((_tickle_server_socket =  
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
              PEGASUS_INVALID_SOCKET)              PEGASUS_INVALID_SOCKET)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
Line 140 
Line 175 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // initialize the address      // Bind the listen socket to the loopback address
         memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      if (::bind(
               _listenSocket,
         _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");              reinterpret_cast<struct sockaddr*>(&listenAddress),
         _tickle_server_addr.sin_family = PF_INET;              addressLength) < 0)
         _tickle_server_addr.sin_port = 0;  
   
         SocketLength _addr_size = sizeof(_tickle_server_addr);  
   
         // bind server side to socket  
         if ((::bind(_tickle_server_socket,  
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),  
                  sizeof(_tickle_server_addr))) < 0)  
         {         {
 #ifdef PEGASUS_OS_ZOS  
             MessageLoaderParms parms(  
                 "Common.Monitor.TICKLE_BIND_LONG",  
                 "Received error:$0 while binding the internal socket.",  
                 strerror(errno));  
 #else  
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_BIND",                 "Common.Monitor.TICKLE_BIND",
                 "Received error number $0 while binding the internal socket.",                 "Received error number $0 while binding the internal socket.",
                 getSocketError());                 getSocketError());
 #endif  
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // tell the kernel we are a server      // Listen for a connection from the tickle client
         if ((::listen(_tickle_server_socket, 3)) < 0)      if ((::listen(_listenSocket, 3)) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_LISTEN",                 "Common.Monitor.TICKLE_LISTEN",
                 "Received error number $0 while listening to the internal "              "Received error number $0 while listening to the internal socket.",
                     "socket.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // make sure we have the correct socket for our server      // Verify we have the correct listen socket
       SocketLength tmpAddressLength = addressLength;
         int sock = ::getsockname(         int sock = ::getsockname(
             _tickle_server_socket,          _listenSocket,
             reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),          reinterpret_cast<struct sockaddr*>(&listenAddress),
             &_addr_size);          &tmpAddressLength);
         if (sock < 0)         if (sock < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_SOCKNAME",                 "Common.Monitor.TICKLE_SOCKNAME",
                 "Received error number $0 while getting the internal socket "              "Received error number $0 while getting the internal socket name.",
                     "name.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
         // get a socket for our tickle client      // Create the client socket
         if ((_tickle_client_socket =      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
             PEGASUS_INVALID_SOCKET)             PEGASUS_INVALID_SOCKET)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
Line 209 
Line 229 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // setup the address of the client      // Bind the client socket to the loopback address
         memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));      if (::bind(
               _clientSocket,
         _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");              reinterpret_cast<struct sockaddr*>(&clientAddress),
         _tickle_client_addr.sin_family = PF_INET;              addressLength) < 0)
         _tickle_client_addr.sin_port = 0;  
   
         // bind socket to client side  
         if ((::bind(_tickle_client_socket,  
                  reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                  sizeof(_tickle_client_addr))) < 0)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_BIND",                 "Common.Monitor.TICKLE_CLIENT_BIND",
Line 229 
Line 243 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // connect to server side      // Connect the client socket to the listen socket address
         if ((::connect(_tickle_client_socket,      if (::connect(
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                  sizeof(_tickle_server_addr))) < 0)              reinterpret_cast<struct sockaddr*>(&listenAddress),
               addressLength) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_CONNECT",                 "Common.Monitor.TICKLE_CLIENT_CONNECT",
                 "Received error number $0 while connecting the internal "              "Received error number $0 while connecting the internal client "
                     "client socket.",                  "socket.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the slave connection */      //
         memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
         SocketLength peer_size = sizeof(_tickle_peer_addr);      //
         Threads::sleep(1);  
       tmpAddressLength = addressLength;
         // this call may fail, we will try a max of 20 times to establish  
         // this peer connection      // Accept the client socket connection.
         if ((_tickle_peer_socket = ::accept(_tickle_server_socket,      _serverSocket = ::accept(
                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),          _listenSocket,
                  &peer_size)) < 0)          reinterpret_cast<struct sockaddr*>(&serverAddress),
         {          &tmpAddressLength);
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TRYAGAIN)      if (_serverSocket == PEGASUS_SOCKET_ERROR)
             {  
                 int retries = 0;  
                 do  
                 {  
                     Threads::sleep(1);  
                     _tickle_peer_socket = ::accept(  
                         _tickle_server_socket,  
                         reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),  
                         &peer_size);  
                     retries++;  
                 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                          getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&  
                          retries < 20);  
             }  
             // TCP/IP is down, destroy sockets and retry again.  
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
             {  
                 // destroy everything  
                 uninitializeTickler();  
                 // retry again.  
                 continue;  
             }  
         }  
         if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_ACCEPT",                 "Common.Monitor.TICKLE_ACCEPT",
                 "Received error number $0 while accepting the internal "              "Received error number $0 while accepting the internal socket "
                     "socket connection.",                  "connection.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
         else  
       //
       // Close the listen socket and make the other sockets non-blocking
       //
   
       Socket::close(_listenSocket);
       _listenSocket = PEGASUS_INVALID_SOCKET;
   
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
   }
   
   #endif
   
   void Tickler::_uninitialize()
         {         {
             // socket is ok      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
             break;  
       try
       {
           if (_serverSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_serverSocket);
               _serverSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_clientSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_clientSocket);
               _clientSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_listenSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_listenSocket);
               _listenSocket = PEGASUS_INVALID_SOCKET;
           }
       }
       catch (...)
       {
           PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
               "Failed to close tickle sockets");
         }         }
     } while (1); // try until TCP/IP is restarted      Socket::uninitializeInterface();
   }
   
   
   ////////////////////////////////////////////////////////////////////////////////
   //
   // Monitor
   //
   ////////////////////////////////////////////////////////////////////////////////
  
     Socket::disableBlocking(_tickle_peer_socket);  #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
     Socket::disableBlocking(_tickle_client_socket);  Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
   {
       int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
       _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // add the tickler to the list of entries to be monitored and set to      // Create a MonitorEntry for the Tickler and set its state to IDLE so the
     // IDLE because Monitor only      // Monitor will watch for its events.
     // checks entries with IDLE state for events      _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);  
     entry._status = _MonitorEntry::IDLE;     entry._status = _MonitorEntry::IDLE;
       _entries.append(entry);
  
     // is the tickler initalized as first socket on startup ?      // Start the count at 1 because _entries[0] is the Tickler
     if (_entries.size()==0)      for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
        // if yes, append a new entry         _MonitorEntry entry(0, 0, 0);
        _entries.append(entry);        _entries.append(entry);
     }     }
     else  
     {  
        // if not, overwrite the tickler entry with new socket  
        _entries[0]=entry;  
     }     }
   
   Monitor::~Monitor()
   {
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
 } }
  
 void Monitor::tickle() void Monitor::tickle()
 { {
     static char _buffer[] =      AutoMutex autoMutex(_tickleMutex);
     {      Socket::write(_tickler.getClientSocket(), "\0\0", 2);
       '0','0'  
     };  
   
     AutoMutex autoMutex(_tickle_mutex);  
     Socket::write(_tickle_client_socket,&_buffer, 2);  
 } }
  
 void Monitor::setState( void Monitor::setState(
Line 339 
Line 375 
  
 void Monitor::run(Uint32 milliseconds) void Monitor::run(Uint32 milliseconds)
 { {
   
   
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
Line 599 
Line 633 
                         Sint32 amt =                         Sint32 amt =
                             Socket::read(entries[indx].socket,&buffer, 2);                             Socket::read(entries[indx].socket,&buffer, 2);
  
                         if (amt == PEGASUS_SOCKET_ERROR &&  
                             getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
                         {  
                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
                                 "Monitor::run: Tickler socket got an IO error. "  
                                     "Going to re-create Socket and wait for "  
                                     "TCP/IP restart.");  
                             uninitializeTickler();  
                             initializeTickler();  
                         }  
                         else  
                         {  
                             entries[indx]._status = _MonitorEntry::IDLE;                             entries[indx]._status = _MonitorEntry::IDLE;
                         }                         }
                     }  
                     else                     else
                     {                     {
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,


Legend:
Removed from v.1.121  
changed lines
  Added in v.1.121.4.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2