(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.116.4.2 and 1.131

version 1.116.4.2, 2007/12/14 20:47:55 version 1.131, 2008/02/29 18:12:38
Line 42 
Line 42 
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
   #include "HostAddress.h"
 #include <errno.h> #include <errno.h>
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Monitor  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Tickler::Tickler()
 Monitor::Monitor()      : _listenSocket(PEGASUS_INVALID_SOCKET),
    : _stopConnections(0),        _clientSocket(PEGASUS_INVALID_SOCKET),
      _stopConnectionsSem(0),        _serverSocket(PEGASUS_INVALID_SOCKET)
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;      try
     Socket::initializeInterface();      {
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);          _initialize();
       }
     // setup the tickler      catch (...)
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)  
     {     {
        _MonitorEntry entry(0, 0, 0);          _uninitialize();
        _entries.append(entry);          throw;
     }     }
 } }
  
 Monitor::~Monitor()  Tickler::~Tickler()
 { {
     uninitializeTickler();      _uninitialize();
     Socket::uninitializeInterface();  
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
                   "returning from monitor destructor");  
 } }
 void Monitor::uninitializeTickler()  
 {  
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");  
  
     try  #if defined(PEGASUS_OS_TYPE_UNIX)
   
   // Use an anonymous pipe for the tickle connection.
   
   void Tickler::_initialize()
     {     {
         if (_tickle_peer_socket >= 0)      int fds[2];
   
       if (pipe(fds) == -1)
         {         {
             Socket::close(_tickle_peer_socket);          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_CREATE",
               "Received error number $0 while creating the internal socket.",
               getSocketError());
           throw Exception(parms);
         }         }
         if (_tickle_client_socket >= 0)  
         {      _serverSocket = fds[0];
             Socket::close(_tickle_client_socket);      _clientSocket = fds[1];
         }         }
         if (_tickle_server_socket >= 0)  
   #else
   
   // Use an external loopback socket connection to allow the tickle socket to
   // be included in the select() array on non-Unix platforms.
   
   void Tickler::_initialize()
         {         {
             Socket::close(_tickle_server_socket);      //
         }      // Set up the addresses for the listen, client, and server sockets
     }      // based on whether IPv6 is enabled.
     catch (...)      //
   
       Socket::initializeInterface();
   
   # ifdef PEGASUS_ENABLE_IPV6
       struct sockaddr_storage listenAddress;
       struct sockaddr_storage clientAddress;
       struct sockaddr_storage serverAddress;
   # else
       struct sockaddr_in listenAddress;
       struct sockaddr_in clientAddress;
       struct sockaddr_in serverAddress;
   # endif
   
       int addressFamily;
       SocketLength addressLength;
   
       memset(&listenAddress, 0, sizeof (listenAddress));
   
   # ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
     {     {
         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,          // Use the IPv6 loopback address for the listen sockets
                   "Failed to close tickle sockets");          HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
   
           addressFamily = AF_INET6;
           addressLength = sizeof(struct sockaddr_in6);
     }     }
       else
   # endif
       {
           // Use the IPv4 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
  
           addressFamily = AF_INET;
           addressLength = sizeof(struct sockaddr_in);
 } }
  
 void Monitor::initializeTickler()      // Use the same address for the client socket as the listen socket
 {      clientAddress = listenAddress;
     /*  
        NOTE: On any errors trying to  
              setup out tickle connection,  
              throw an exception/end the server  
     */  
  
     /* setup the tickle server/listener */      //
     // try until the tcpip is restarted      // Set up a listen socket to allow the tickle client and server to connect
     do      //
     {  
         // get a socket for the server side      // Create the listen socket
         if ((_tickle_server_socket =      if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
              PEGASUS_INVALID_SOCKET)              PEGASUS_INVALID_SOCKET)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
Line 140 
Line 175 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // initialize the address      // Bind the listen socket to the loopback address
         memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _listenSocket,
 #pragma convert(37)              reinterpret_cast<struct sockaddr*>(&listenAddress),
 #endif              addressLength) < 0)
         _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
         _tickle_server_addr.sin_family = PF_INET;  
         _tickle_server_addr.sin_port = 0;  
   
         SocketLength _addr_size = sizeof(_tickle_server_addr);  
   
         // bind server side to socket  
         if ((::bind(_tickle_server_socket,  
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),  
                  sizeof(_tickle_server_addr))) < 0)  
         {         {
 #ifdef PEGASUS_OS_ZOS  
             MessageLoaderParms parms(  
                 "Common.Monitor.TICKLE_BIND_LONG",  
                 "Received error:$0 while binding the internal socket.",  
                 strerror(errno));  
 #else  
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_BIND",                 "Common.Monitor.TICKLE_BIND",
                 "Received error number $0 while binding the internal socket.",                 "Received error number $0 while binding the internal socket.",
                 getSocketError());                 getSocketError());
 #endif  
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // tell the kernel we are a server      // Listen for a connection from the tickle client
         if ((::listen(_tickle_server_socket, 3)) < 0)      if ((::listen(_listenSocket, 3)) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_LISTEN",                 "Common.Monitor.TICKLE_LISTEN",
                 "Received error number $0 while listening to the internal "              "Received error number $0 while listening to the internal socket.",
                     "socket.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // make sure we have the correct socket for our server      // Verify we have the correct listen socket
       SocketLength tmpAddressLength = addressLength;
         int sock = ::getsockname(         int sock = ::getsockname(
             _tickle_server_socket,          _listenSocket,
             reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),          reinterpret_cast<struct sockaddr*>(&listenAddress),
             &_addr_size);          &tmpAddressLength);
         if (sock < 0)         if (sock < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_SOCKNAME",                 "Common.Monitor.TICKLE_SOCKNAME",
                 "Received error number $0 while getting the internal socket "              "Received error number $0 while getting the internal socket name.",
                     "name.",  
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
         // get a socket for our tickle client      // Create the client socket
         if ((_tickle_client_socket =      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
                  Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==  
             PEGASUS_INVALID_SOCKET)             PEGASUS_INVALID_SOCKET)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
Line 214 
Line 229 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // setup the address of the client      // Bind the client socket to the loopback address
         memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));      if (::bind(
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM              _clientSocket,
 # pragma convert(37)              reinterpret_cast<struct sockaddr*>(&clientAddress),
 #endif              addressLength) < 0)
         _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 # pragma convert(0)  
 #endif  
         _tickle_client_addr.sin_family = PF_INET;  
         _tickle_client_addr.sin_port = 0;  
   
         // bind socket to client side  
         if ((::bind(_tickle_client_socket,  
                  reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                  sizeof(_tickle_client_addr))) < 0)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_BIND",                 "Common.Monitor.TICKLE_CLIENT_BIND",
Line 239 
Line 243 
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         // connect to server side      // Connect the client socket to the listen socket address
         if ((::connect(_tickle_client_socket,      if (::connect(
                  reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                  sizeof(_tickle_server_addr))) < 0)              reinterpret_cast<struct sockaddr*>(&listenAddress),
               addressLength) < 0)
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_CLIENT_CONNECT",                 "Common.Monitor.TICKLE_CLIENT_CONNECT",
                 "Received error number $0 while connecting the internal "              "Received error number $0 while connecting the internal client "
                     "client socket.",                  "socket.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
  
         /* set up the slave connection */      //
         memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
         SocketLength peer_size = sizeof(_tickle_peer_addr);      //
         Threads::sleep(1);  
       tmpAddressLength = addressLength;
         // this call may fail, we will try a max of 20 times to establish  
         // this peer connection      // Accept the client socket connection.
         if ((_tickle_peer_socket = ::accept(_tickle_server_socket,      _serverSocket = ::accept(
                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),          _listenSocket,
                  &peer_size)) < 0)          reinterpret_cast<struct sockaddr*>(&serverAddress),
         {          &tmpAddressLength);
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TRYAGAIN)      if (_serverSocket == PEGASUS_SOCKET_ERROR)
             {  
                 int retries = 0;  
                 do  
                 {  
                     Threads::sleep(1);  
                     _tickle_peer_socket = ::accept(  
                         _tickle_server_socket,  
                         reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),  
                         &peer_size);  
                     retries++;  
                 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                          getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&  
                          retries < 20);  
             }  
             // TCP/IP is down, destroy sockets and retry again.  
             if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&  
                 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
             {  
                 // destroy everything  
                 uninitializeTickler();  
                 // retry again.  
                 continue;  
             }  
         }  
         if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)  
         {         {
             MessageLoaderParms parms(             MessageLoaderParms parms(
                 "Common.Monitor.TICKLE_ACCEPT",                 "Common.Monitor.TICKLE_ACCEPT",
                 "Received error number $0 while accepting the internal "              "Received error number $0 while accepting the internal socket "
                     "socket connection.",                  "connection.",
                 getSocketError());                 getSocketError());
             throw Exception(parms);             throw Exception(parms);
         }         }
         else  
         {      //
             // socket is ok      // Close the listen socket and make the other sockets non-blocking
             break;      //
   
       Socket::close(_listenSocket);
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
         }         }
     } while (1); // try until TCP/IP is restarted  
  
     Socket::disableBlocking(_tickle_peer_socket);  #endif
     Socket::disableBlocking(_tickle_client_socket);  
  
     // add the tickler to the list of entries to be monitored and set to  void Tickler::_uninitialize()
     // IDLE because Monitor only  {
     // checks entries with IDLE state for events      PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);  
     entry._status = _MonitorEntry::IDLE;  
  
     // is the tickler initalized as first socket on startup ?      try
     if (_entries.size()==0)  
     {     {
        // if yes, append a new entry          Socket::close(_serverSocket);
        _entries.append(entry);          Socket::close(_clientSocket);
           Socket::close(_listenSocket);
     }     }
     else      catch (...)
     {     {
        // if not, overwrite the tickler entry with new socket          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
        _entries[0]=entry;              "Failed to close tickle sockets");
     }     }
       Socket::uninitializeInterface();
 } }
  
 void Monitor::tickle()  
   ////////////////////////////////////////////////////////////////////////////////
   //
   // Monitor
   //
   ////////////////////////////////////////////////////////////////////////////////
   
   #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
 { {
     static char _buffer[] =      int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
       _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
   
       // Create a MonitorEntry for the Tickler and set its state to IDLE so the
       // Monitor will watch for its events.
       _entries.append(MonitorEntry(
           _tickler.getServerSocket(),
           1,
           MonitorEntry::STATUS_IDLE,
           MonitorEntry::TYPE_INTERNAL));
   
       // Start the count at 1 because _entries[0] is the Tickler
       for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
       '0','0'          _entries.append(MonitorEntry());
     };      }
   }
  
     AutoMutex autoMutex(_tickle_mutex);  Monitor::~Monitor()
     Socket::write(_tickle_client_socket,&_buffer, 2);  {
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
   }
   
   void Monitor::tickle()
   {
       Socket::write(_tickler.getClientSocket(), "\0", 1);
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )  void Monitor::setState(
       Uint32 index,
       MonitorEntry::Status status)
 { {
       AutoMutex autoEntryMutex(_entriesMutex);
     // Set the state to requested state     // Set the state to requested state
     _entries[index]._status = status;      _entries[index].status = status;
 } }
  
 void Monitor::run(Uint32 milliseconds) void Monitor::run(Uint32 milliseconds)
 { {
   
     int i = 0;  
   
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     AutoMutex autoEntryMutex(_entry_mut);      AutoMutex autoEntryMutex(_entriesMutex);
  
     ArrayIterator<_MonitorEntry> entries(_entries);      ArrayIterator<MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor     // Check the stopConnections flag.  If set, clear the Acceptor monitor
     // entries     // entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             if (entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
             {             {
                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)                  if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
                 {                 {
                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||                      if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
                         entries[indx]._status.get() == _MonitorEntry::DYING )                          entries[indx].status == MonitorEntry::STATUS_DYING)
                    {                    {
                        // remove the entry                        // remove the entry
                        entries[indx]._status = _MonitorEntry::EMPTY;                          entries[indx].status = MonitorEntry::STATUS_EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       entries[indx]._status = _MonitorEntry::DYING;                          entries[indx].status = MonitorEntry::STATUS_DYING;
                    }                    }
                }                }
            }            }
Line 386 
Line 398 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for (int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {     {
         const _MonitorEntry &entry = entries[indx];          const MonitorEntry& entry = entries[indx];
         if ((entry._status.get() == _MonitorEntry::DYING) &&  
             (entry._type == Monitor::CONNECTION))          if ((entry.status == MonitorEntry::STATUS_DYING) &&
               (entry.type == MonitorEntry::TYPE_CONNECTION))
         {         {
             MessageQueue *q = MessageQueue::lookup(entry.queueId);             MessageQueue *q = MessageQueue::lookup(entry.queueId);
             PEGASUS_ASSERT(q != 0);             PEGASUS_ASSERT(q != 0);
Line 431 
Line 444 
             // unlocked will not result in an ArrayIndexOutOfBounds             // unlocked will not result in an ArrayIndexOutOfBounds
             // exception.             // exception.
  
             _entry_mut.unlock();              _entriesMutex.unlock();
             o.enqueue(message);             o.enqueue(message);
             _entry_mut.lock();              _entriesMutex.lock();
  
             // After enqueue a message and the autoEntryMutex has been             // After enqueue a message and the autoEntryMutex has been
             // released and locked again, the array of _entries can be             // released and locked again, the array of _entries can be
Line 452 
Line 465 
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     SocketHandle maxSocketCurrentPass = 0;     SocketHandle maxSocketCurrentPass = 0;
     for (int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {     {
        if (maxSocketCurrentPass < entries[indx].socket)        if (maxSocketCurrentPass < entries[indx].socket)
            maxSocketCurrentPass = entries[indx].socket;            maxSocketCurrentPass = entries[indx].socket;
  
        if (entries[indx]._status.get() == _MonitorEntry::IDLE)          if (entries[indx].status == MonitorEntry::STATUS_IDLE)
        {        {
            _idleEntries++;            _idleEntries++;
            FD_SET(entries[indx].socket, &fdread);            FD_SET(entries[indx].socket, &fdread);
Line 470 
Line 483 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      _entriesMutex.unlock();
  
     //     //
     // The first argument to select() is ignored on Windows and it is not     // The first argument to select() is ignored on Windows and it is not
Line 482 
Line 495 
 #else #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     _entry_mut.lock();      int selectErrno = getSocketError();
   
       _entriesMutex.lock();
  
     struct timeval timeNow;     struct timeval timeNow;
     Time::gettimeofday(&timeNow);     Time::gettimeofday(&timeNow);
Line 495 
Line 510 
     if (events == PEGASUS_SOCKET_ERROR)     if (events == PEGASUS_SOCKET_ERROR)
     {     {
         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run - errorno = %d has occurred on select.", errno));              "Monitor::run - select() returned error %d.", selectErrno));
         // The EBADF error indicates that one or more or the file         // The EBADF error indicates that one or more or the file
         // descriptions was not valid. This could indicate that         // descriptions was not valid. This could indicate that
         // the entries structure has been corrupted or that         // the entries structure has been corrupted or that
         // we have a synchronization error.         // we have a synchronization error.
  
         PEGASUS_ASSERT(errno != EBADF);          PEGASUS_ASSERT(selectErrno != EBADF);
     }     }
     else if (events)     else if (events)
     {     {
Line 509 
Line 524 
             "Monitor::run select event received events = %d, monitoring %d "             "Monitor::run select event received events = %d, monitoring %d "
                 "idle entries",                 "idle entries",
             events, _idleEntries));             events, _idleEntries));
         for (int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             // The Monitor should only look at entries in the table that are             // The Monitor should only look at entries in the table that are
             // IDLE (i.e., owned by the Monitor).             // IDLE (i.e., owned by the Monitor).
             if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&              if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                 (FD_ISSET(entries[indx].socket, &fdread)))                 (FD_ISSET(entries[indx].socket, &fdread)))
             {             {
                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   PEGASUS_ASSERT(q != 0);
                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                     "Monitor::run indx = %d, queueId =  %d, q = %p",                     "Monitor::run indx = %d, queueId =  %d, q = %p",
                     indx, entries[indx].queueId, q));                     indx, entries[indx].queueId, q));
                 PEGASUS_ASSERT(q !=0);  
  
                 try                 try
                 {                 {
                     if (entries[indx]._type == Monitor::CONNECTION)                      if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
                     {                     {
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "entries[indx].type for indx = %d is "                              "entries[%d].type is TYPE_CONNECTION",
                                 "Monitor::CONNECTION",  
                             indx));                             indx));
   
                         HTTPConnection *dst =                         HTTPConnection *dst =
                             reinterpret_cast<HTTPConnection *>(q);                             reinterpret_cast<HTTPConnection *>(q);
                         dst->_entry_index = indx;                         dst->_entry_index = indx;
Line 547 
Line 562 
                         if (!dst->closeConnectionOnTimeout(&timeNow))                         if (!dst->closeConnectionOnTimeout(&timeNow))
                         {                         {
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: entering run() for "                                  "Entering HTTPConnection::run() for "
                                 "indx = %d, queueId = %d, q = %p",                                 "indx = %d, queueId = %d, q = %p",
                             dst->_entry_index,                                  indx, entries[indx].queueId, q));
                                 dst->_monitor->  
                                     _entries[dst->_entry_index].queueId,  
                             dst));  
  
                         try                         try
                         {                         {
Line 560 
Line 572 
                         }                         }
                         catch (...)                         catch (...)
                         {                         {
                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,                                  PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
                                 "Monitor::_dispatch: exception received");                                      "Caught exception from "
                         }                                      "HTTPConnection::run()");
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
                             "Monitor::_dispatch: exited run() for index %d",  
                             dst->_entry_index));  
                         }                         }
                               PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                   "Exited HTTPConnection::run()");
                     }                     }
                     else if (entries[indx]._type == Monitor::INTERNAL)  
                     {  
                         // set ourself to BUSY,  
                         // read the data  
                         // and set ourself back to IDLE  
  
                         entries[indx]._status = _MonitorEntry::BUSY;  
                         static char buffer[2];  
                         Sint32 amt =  
                             Socket::read(entries[indx].socket,&buffer, 2);  
  
                         if (amt == PEGASUS_SOCKET_ERROR &&  
                             getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
                         {  
                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
                                 "Monitor::run: Tickler socket got an IO error. "  
                                     "Going to re-create Socket and wait for "  
                                     "TCP/IP restart.");  
                             uninitializeTickler();  
                             initializeTickler();  
                         }                         }
                         else                      else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
                         {                         {
                             entries[indx]._status = _MonitorEntry::IDLE;                          char buffer;
                         }                          Sint32 ignored =
                               Socket::read(entries[indx].socket, &buffer, 1);
                     }                     }
                     else                     else
                     {                     {
Line 604 
Line 598 
                         events |= SocketMessage::READ;                         events |= SocketMessage::READ;
                         Message* msg = new SocketMessage(                         Message* msg = new SocketMessage(
                             entries[indx].socket, events);                             entries[indx].socket, events);
                         entries[indx]._status = _MonitorEntry::BUSY;                          entries[indx].status = MonitorEntry::STATUS_BUSY;
                         _entry_mut.unlock();                          _entriesMutex.unlock();
                         q->enqueue(msg);                         q->enqueue(msg);
                         _entry_mut.lock();                          _entriesMutex.lock();
  
                         // After enqueue a message and the autoEntryMutex has                         // After enqueue a message and the autoEntryMutex has
                         // been released and locked again, the array of                         // been released and locked again, the array of
                         // entries can be changed. The ArrayIterator has be                          // entries can be changed. The ArrayIterator has to be
                         // reset with the original _entries                          // reset with the latest _entries.
                         entries.reset(_entries);                         entries.reset(_entries);
                         entries[indx]._status = _MonitorEntry::IDLE;                          entries[indx].status = MonitorEntry::STATUS_IDLE;
                     }                     }
                 }                 }
                 catch (...)                 catch (...)
Line 624 
Line 618 
             // else check for accept pending (ie. SSL handshake pending) or             // else check for accept pending (ie. SSL handshake pending) or
             // idle connection timeouts for sockets from which we did not             // idle connection timeouts for sockets from which we did not
             // receive data.             // receive data.
             else if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&              else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                 entries[indx]._type == Monitor::CONNECTION)                  entries[indx].type == MonitorEntry::TYPE_CONNECTION)
   
             {             {
                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
Line 638 
Line 633 
     // to check if there are any pending SSL handshakes that have timed out.     // to check if there are any pending SSL handshakes that have timed out.
     else     else
     {     {
         for (int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&              if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                 entries[indx]._type == Monitor::CONNECTION)                  entries[indx].type == MonitorEntry::TYPE_CONNECTION)
             {             {
                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
Line 675 
Line 670 
     SocketHandle socket,     SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)      Uint32 type)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
   
     // Check to see if we need to dynamically grow the _entries array     // Check to see if we need to dynamically grow the _entries array
     // We always want the _entries array to 2 bigger than the      // We always want the _entries array to be 2 bigger than the
     // current connections requested     // current connections requested
     _solicitSocketCount++;  // bump the count     _solicitSocketCount++;  // bump the count
     int size = (int)_entries.size();  
     if ((int)_solicitSocketCount >= (size-1))      for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
     {  
         for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)  
         {         {
             _MonitorEntry entry(0, 0, 0);          _entries.append(MonitorEntry());
             _entries.append(entry);  
         }  
     }     }
  
     int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for (index = 1; index < (int)_entries.size(); index++)  
     {     {
         try         try
         {         {
             if (_entries[index]._status.get() == _MonitorEntry::EMPTY)              if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
             {             {
                 _entries[index].socket = socket;                 _entries[index].socket = socket;
                 _entries[index].queueId  = queueId;                 _entries[index].queueId  = queueId;
                 _entries[index]._type = type;                  _entries[index].type = type;
                 _entries[index]._status = _MonitorEntry::IDLE;                  _entries[index].status = MonitorEntry::STATUS_IDLE;
  
                 return index;                  return (int)index;
             }             }
         }         }
         catch (...)         catch (...)
Line 721 
Line 712 
 void Monitor::unsolicitSocketMessages(SocketHandle socket) void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which         Start at index = 1 because _entries[0] is the tickle entry which
         never needs to be EMPTY;          never needs to be reset to EMPTY;
     */     */
     unsigned int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for (index = 1; index < _entries.size(); index++)  
     {     {
         if (_entries[index].socket == socket)         if (_entries[index].socket == socket)
         {         {
             _entries[index]._status = _MonitorEntry::EMPTY;              _entries[index].reset();
             _entries[index].socket = PEGASUS_INVALID_SOCKET;  
             _solicitSocketCount--;             _solicitSocketCount--;
             break;             break;
         }         }
Line 746 
Line 735 
         first NON EMPTY.  This prevents the positions, of the NON EMPTY         first NON EMPTY.  This prevents the positions, of the NON EMPTY
         entries, from being changed.         entries, from being changed.
     */     */
     index = _entries.size() - 1;      for (Uint32 index = _entries.size() - 1;
     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)           (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
                (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
            index--)
     {     {
         if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)  
                 _entries.remove(index);                 _entries.remove(index);
         index--;  
     }  
     PEG_METHOD_EXIT();  
 } }
  
 // Note: this is no longer called with PEP 183.      PEG_METHOD_EXIT();
 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)  
 {  
     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);  
     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "  
             "q = %p",  
         dst->_entry_index,  
         dst->_monitor->_entries[dst->_entry_index].queueId,  
         dst));  
   
     try  
     {  
         dst->run(1);  
     }  
     catch (...)  
     {  
         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
             "Monitor::_dispatch: exception received");  
     }  
     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));  
   
     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==  
         _MonitorEntry::BUSY);  
   
     // Once the HTTPConnection thread has set the status value to either  
     // Monitor::DYING or Monitor::IDLE, it has returned control of the  
     // connection to the Monitor.  It is no longer permissible to access the  
     // connection or the entry in the _entries table.  
     if (dst->_connectionClosePending)  
     {  
         dst->_monitor->_entries[dst->_entry_index]._status =  
             _MonitorEntry::DYING;  
     }  
     else  
     {  
         dst->_monitor->_entries[dst->_entry_index]._status =  
             _MonitorEntry::IDLE;  
     }  
     return 0;  
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.116.4.2  
changed lines
  Added in v.1.131

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2