(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.107 and 1.122

version 1.107, 2006/06/26 22:56:40 version 1.122, 2007/10/19 18:12:26
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com  
 //              Amit K Arora (Bug#1153) amita@in.ibm.com  
 //              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090  
 //              Sushma Fernandes (sushma@hp.com) for Bug#2057  
 //              Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101  
 //              Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)  
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include "Network.h" #include "Network.h"
Line 51 
Line 42 
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
   #include "HostAddress.h"
   #include <errno.h>
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
Line 60 
Line 53 
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // _getError()  // Tickler
 // //
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
  
 static inline int _getError()  Tickler::Tickler()
       : _listenSocket(PEGASUS_INVALID_SOCKET),
         _clientSocket(PEGASUS_INVALID_SOCKET),
         _serverSocket(PEGASUS_INVALID_SOCKET)
 { {
 #ifdef PEGASUS_OS_TYPE_WINDOWS  
     return WSAGetLastError()  
 #else  
     return errno;  
 #endif  
 } }
  
 ////////////////////////////////////////////////////////////////////////////////  Tickler::~Tickler()
 //  {
 // Monitor      uninitialize();
 //  }
 ////////////////////////////////////////////////////////////////////////////////  
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32  Boolean Tickler::initialize()
 Monitor::Monitor()  
    : _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;      //
     Socket::initializeInterface();      // Set up the addresses for the listen, client, and server sockets
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);      // based on whether IPv6 is enabled.
       //
  
     // setup the tickler  #ifdef PEGASUS_ENABLE_IPV6
     initializeTickler();      struct sockaddr_storage listenAddress;
       struct sockaddr_storage clientAddress;
       struct sockaddr_storage serverAddress;
   #else
       struct sockaddr_in listenAddress;
       struct sockaddr_in clientAddress;
       struct sockaddr_in serverAddress;
   #endif
  
     // Start the count at 1 because initilizeTickler()      int addressFamily;
     // has added an entry in the first position of the      SocketLength addressLength;
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {  
        _MonitorEntry entry(0, 0, 0);  
        _entries.append(entry);  
     }  
 }  
  
 Monitor::~Monitor()  #ifdef PEGASUS_ENABLE_IPV6
       if (System::isIPv6StackActive())
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");          // Use the IPv6 loopback address for the listen sockets
           HostAddress::convertTextToBinary(
               HostAddress::AT_IPV6,
               "::1",
               &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
           listenAddress.ss_family = AF_INET6;
           reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
  
     try{          addressFamily = AF_INET6;
         if(_tickle_peer_socket >= 0)          addressLength = sizeof(struct sockaddr_in6);
         {  
             Socket::close(_tickle_peer_socket);  
         }  
         if(_tickle_client_socket >= 0)  
         {  
             Socket::close(_tickle_client_socket);  
         }  
         if(_tickle_server_socket >= 0)  
         {  
             Socket::close(_tickle_server_socket);  
         }         }
     }      else
     catch(...)  #endif
     {     {
         Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          // Use the IPv4 loopback address for the listen sockets
                   "Failed to close tickle sockets");          HostAddress::convertTextToBinary(
     }              HostAddress::AT_IPV4,
               "127.0.0.1",
               &reinterpret_cast<struct sockaddr_in*>(
                   &listenAddress)->sin_addr.s_addr);
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
               AF_INET;
           reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
  
     Socket::uninitializeInterface();          addressFamily = AF_INET;
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          addressLength = sizeof(struct sockaddr_in);
                   "returning from monitor destructor");  
 } }
  
 void Monitor::initializeTickler(){      // Use the same address for the client socket as the listen socket
     /*      clientAddress = listenAddress;
        NOTE: On any errors trying to  
              setup out tickle connection,  
              throw an exception/end the server  
     */  
  
     /* setup the tickle server/listener */      //
       // Set up a listen socket to allow the tickle client and server to connect
       //
  
     // get a socket for the server side      // Create the listen socket
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
         //handle error               PEGASUS_INVALID_SOCKET)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",      {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
                                  _getError());              getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // set TCP_NODELAY      // Bind the listen socket to the loopback address
     int opt = 1;      if (::bind(
     setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));              _listenSocket,
               reinterpret_cast<struct sockaddr*>(&listenAddress),
     // initialize the address              addressLength) < 0)
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));      {
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(37)  
 #endif  
     _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM  
 #pragma convert(0)  
 #endif  
     _tickle_server_addr.sin_family = PF_INET;  
     _tickle_server_addr.sin_port = 0;  
   
     socklen_t _addr_size = sizeof(_tickle_server_addr);  
   
     // bind server side to socket  
     if((::bind(_tickle_server_socket,  
                reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),  
                sizeof(_tickle_server_addr))) < 0){  
         // handle error  
 #ifdef PEGASUS_OS_ZOS #ifdef PEGASUS_OS_ZOS
     MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",          MessageLoaderParms parms(
                                  "Received error:$0 while binding the internal socket.",strerror(errno));              "Common.Monitor.TICKLE_BIND_LONG",
               "Received error:$0 while binding the internal socket.",
               strerror(errno));
 #else #else
         MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_BIND",
                                  "Received error number $0 while binding the internal socket.",                                  "Received error number $0 while binding the internal socket.",
                                  _getError());              getSocketError());
 #endif #endif
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // tell the kernel we are a server      // Listen for a connection from the tickle client
     if((::listen(_tickle_server_socket,3)) < 0){      if ((::listen(_listenSocket, 3)) < 0)
         // handle error      {
         MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",          MessageLoaderParms parms(
               "Common.Monitor.TICKLE_LISTEN",
                          "Received error number $0 while listening to the internal socket.",                          "Received error number $0 while listening to the internal socket.",
                                  _getError());              getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // make sure we have the correct socket for our server      // Verify we have the correct listen socket
     int sock = ::getsockname(_tickle_server_socket,      SocketLength tmpAddressLength = addressLength;
                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),      int sock = ::getsockname(
                    &_addr_size);          _listenSocket,
     if(sock < 0){          reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error          &tmpAddressLength);
         MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",      if (sock < 0)
       {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_SOCKNAME",
                          "Received error number $0 while getting the internal socket name.",                          "Received error number $0 while getting the internal socket name.",
                                  _getError());              getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the tickle client/connector */      //
       // Set up the client side of the tickle connection.
       //
  
     // get a socket for our tickle client      // Create the client socket
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){      if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
         // handle error               PEGASUS_INVALID_SOCKET)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",      {
                          "Received error number $0 while creating the internal client socket.",          MessageLoaderParms parms(
                                  _getError());              "Common.Monitor.TICKLE_CLIENT_CREATE",
               "Received error number $0 while creating the internal client "
                   "socket.",
               getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // set TCP_NODELAY      // Bind the client socket to the loopback address
     setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));      if (::bind(
               _clientSocket,
     // setup the address of the client              reinterpret_cast<struct sockaddr*>(&clientAddress),
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));              addressLength) < 0)
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM      {
 #pragma convert(37)          MessageLoaderParms parms(
 #endif              "Common.Monitor.TICKLE_CLIENT_BIND",
     _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");              "Received error number $0 while binding the internal client "
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM                  "socket.",
 #pragma convert(0)              getSocketError());
 #endif  
     _tickle_client_addr.sin_family = PF_INET;  
     _tickle_client_addr.sin_port = 0;  
   
     // bind socket to client side  
     if((::bind(_tickle_client_socket,  
                reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),  
                sizeof(_tickle_client_addr))) < 0){  
         // handle error  
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",  
                          "Received error number $0 while binding the internal client socket.",  
                                  _getError());  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     // connect to server side      // Connect the client socket to the listen socket address
     if((::connect(_tickle_client_socket,      if (::connect(
                   reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),              _clientSocket,
                   sizeof(_tickle_server_addr))) < 0){              reinterpret_cast<struct sockaddr*>(&listenAddress),
         // handle error              addressLength) < 0)
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",      {
                          "Received error number $0 while connecting the internal client socket.",          MessageLoaderParms parms(
                                  _getError());              "Common.Monitor.TICKLE_CLIENT_CONNECT",
               "Received error number $0 while connecting the internal client "
                   "socket.",
               getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
  
     /* set up the slave connection */      //
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));      // Set up the server side of the tickle connection.
     socklen_t peer_size = sizeof(_tickle_peer_addr);      //
     pegasus_sleep(1);  
       tmpAddressLength = addressLength;
     // this call may fail, we will try a max of 20 times to establish this peer connection  
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,      // Accept the client socket connection.  The accept call may fail with
             reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),      // EAGAIN.  Try a max of 20 times to establish this connection.
             &peer_size)) < 0){      unsigned int retries = 0;
 #if !defined(PEGASUS_OS_TYPE_WINDOWS)  
         // Only retry on non-windows platforms.  
         if(_tickle_peer_socket == -1 && errno == EAGAIN)  
         {  
           int retries = 0;  
           do           do
           {           {
             pegasus_sleep(1);          _serverSocket = ::accept(
             _tickle_peer_socket = ::accept(_tickle_server_socket,              _listenSocket,
                 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),              reinterpret_cast<struct sockaddr*>(&serverAddress),
                 &peer_size);              &tmpAddressLength);
             retries++;  
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);          if ((_serverSocket != PEGASUS_SOCKET_ERROR) ||
               (getSocketError() != PEGASUS_NETWORK_TRYAGAIN))
           {
               break;
         }         }
 #endif  
           Threads::sleep(1);
           retries++;
       } while (retries <= 20);
   
       if (_serverSocket == PEGASUS_SOCKET_ERROR)
       {
           if (getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
           {
               // TCP/IP is down
               uninitialize();
               return false;
     }     }
     if(_tickle_peer_socket == -1){  
         // handle error          MessageLoaderParms parms(
         MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",              "Common.Monitor.TICKLE_ACCEPT",
                          "Received error number $0 while accepting the internal socket connection.",              "Received error number $0 while accepting the internal socket "
                                  _getError());                  "connection.",
               getSocketError());
         throw Exception(parms);         throw Exception(parms);
     }     }
     // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only  
     // checks entries with IDLE state for events      //
     _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);      // Close the listen socket and make the other sockets non-blocking
     entry._status = _MonitorEntry::IDLE;      //
   
       Socket::close(_listenSocket);
       _listenSocket = PEGASUS_INVALID_SOCKET;
   
       Socket::disableBlocking(_serverSocket);
       Socket::disableBlocking(_clientSocket);
       return true;
   }
   
   void Tickler::uninitialize()
   {
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
   
       try
       {
           if (_serverSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_serverSocket);
               _serverSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_clientSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_clientSocket);
               _clientSocket = PEGASUS_INVALID_SOCKET;
           }
           if (_listenSocket != PEGASUS_INVALID_SOCKET)
           {
               Socket::close(_listenSocket);
               _listenSocket = PEGASUS_INVALID_SOCKET;
           }
       }
       catch (...)
       {
           PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
               "Failed to close tickle sockets");
       }
   }
   
   
   ////////////////////////////////////////////////////////////////////////////////
   //
   // Monitor
   //
   ////////////////////////////////////////////////////////////////////////////////
   
   #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
   Monitor::Monitor()
      : _stopConnections(0),
        _stopConnectionsSem(0),
        _solicitSocketCount(0)
   {
       int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
       Socket::initializeInterface();
       _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
   
       // setup the tickler
       initializeTickler();
   
       // Start the count at 1 because initilizeTickler()
       // has added an entry in the first position of the
       // _entries array
       for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
       {
          _MonitorEntry entry(0, 0, 0);
     _entries.append(entry);     _entries.append(entry);
 } }
   }
   
   Monitor::~Monitor()
   {
       _tickler.uninitialize();
       Socket::uninitializeInterface();
       PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                     "returning from monitor destructor");
   }
  
 void Monitor::tickle(void)  void Monitor::initializeTickler()
 { {
     static char _buffer[] =      while (!_tickler.initialize())
     {     {
       '0','0'          // Retry until TCP/IP is started
     };      }
  
     AutoMutex autoMutex(_tickle_mutex);      // Create a MonitorEntry for the tickler and set its state to IDLE so the
     Socket::disableBlocking(_tickle_client_socket);      // Monitor will watch for its events.
     Socket::write(_tickle_client_socket,&_buffer, 2);      _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
     Socket::enableBlocking(_tickle_client_socket);      entry._status = _MonitorEntry::IDLE;
   
       if (_entries.size() == 0)
       {
           // The tickler has not been initialized before; add its entry at the
           // beginning of the list.
           _entries.append(entry);
       }
       else
       {
           // Overwrite the existing tickler entry.
           _entries[0] = entry;
       }
   }
   
   void Monitor::tickle()
   {
       AutoMutex autoMutex(_tickleMutex);
       Socket::write(_tickler.getClientSocket(), "\0\0", 2);
 } }
  
 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )  void Monitor::setState(
       Uint32 index,
       _MonitorEntry::entry_status status)
 { {
       AutoMutex autoEntryMutex(_entry_mut);
     // Set the state to requested state     // Set the state to requested state
     _entries[index]._status = status;     _entries[index]._status = status;
 } }
  
 Boolean Monitor::run(Uint32 milliseconds)  void Monitor::run(Uint32 milliseconds)
 { {
   
     Boolean handled_events = false;  
     int i = 0;  
   
     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};     struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
  
     fd_set fdread;     fd_set fdread;
Line 328 
Line 400 
  
     ArrayIterator<_MonitorEntry> entries(_entries);     ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries      // Check the stopConnections flag.  If set, clear the Acceptor monitor
       // entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)         for ( int indx = 0; indx < (int)entries.size(); indx++)
Line 376 
Line 449 
  
                                         if (h._responsePending == true)                                         if (h._responsePending == true)
                                         {                                         {
                                                 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                                                                                                         "Ignoring connection delete request because "                      "Monitor::run - Ignoring connection delete request "
                                                                                                         "responses are still pending. "                          "because responses are still pending. "
                                                                                                         "connection=0x%p, socket=%d\n",                                                                                                         "connection=0x%p, socket=%d\n",
                                                                                                         (void *)&h, h.getSocket());                      (void *)&h, h.getSocket()));
                                                 continue;                                                 continue;
                                         }                                         }
                                         h._connectionClosePending = false;                                         h._connectionClosePending = false;
Line 400 
Line 473 
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           autoEntryMutex.unlock();              _entry_mut.unlock();
           o.enqueue(message);           o.enqueue(message);
           autoEntryMutex.lock();              _entry_mut.lock();
           // After enqueue a message and the autoEntryMutex has been released and locked again,  
           // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.              // After enqueue a message and the autoEntryMutex has been
               // released and locked again, the array of _entries can be
               // changed. The ArrayIterator has be reset with the original
               // _entries.
           entries.reset(_entries);           entries.reset(_entries);
        }        }
     }     }
Line 436 
Line 512 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     autoEntryMutex.unlock();      _entry_mut.unlock();
  
     //     //
     // The first argument to select() is ignored on Windows and it is not     // The first argument to select() is ignored on Windows and it is not
Line 448 
Line 524 
 #else #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     autoEntryMutex.lock();      _entry_mut.lock();
     // After enqueue a message and the autoEntryMutex has been released and locked again,  
     // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries      // After enqueue a message and the autoEntryMutex has been released and
       // locked again, the array of _entries can be changed. The ArrayIterator
       // has be reset with the original _entries
     entries.reset(_entries);     entries.reset(_entries);
  
     if (events == PEGASUS_SOCKET_ERROR)     if (events == PEGASUS_SOCKET_ERROR)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run - errorno = %d has occurred on select.", errno);              "Monitor::run - errorno = %d has occurred on select.", errno));
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the entries structure has been corrupted or that        // the entries structure has been corrupted or that
Line 466 
Line 544 
     }     }
     else if (events)     else if (events)
     {     {
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",              "Monitor::run select event received events = %d, monitoring %d "
            events, _idleEntries);                  "idle entries",
               events, _idleEntries));
        for( int indx = 0; indx < (int)entries.size(); indx++)        for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,              // The Monitor should only look at entries in the table that are
           // owned by the Monitor).              // IDLE (i.e., owned by the Monitor).
           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&           if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(entries[indx].socket, &fdread)))              (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);              MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                  PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, entries[indx].queueId, q);                      indx, entries[indx].queueId, q));
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(entries[indx]._type == Monitor::CONNECTION)                 if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                              "entries[indx].type for indx = %d is "
                                   "Monitor::CONNECTION",
                               indx));
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                          // Do not update the entry just yet. The entry gets
                    // the request has been read.                          // updated once the request has been read.
                    //entries[indx]._status = _MonitorEntry::BUSY;                    //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                          // If allocate_and_awaken failure, retry on next
                           // iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
                    if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(                          if (!MessageQueueService::get_thread_pool()->
                            (void *)q, _dispatch))                                  allocate_and_awaken((void *)q, _dispatch))
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                              PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
                           "Monitor::run: Insufficient resources to process request.");                                  Tracer::LEVEL2,
                                   "Monitor::run: Insufficient resources to "
                                       "process request.");
                       entries[indx]._status = _MonitorEntry::IDLE;                       entries[indx]._status = _MonitorEntry::IDLE;
                       return true;                       return true;
                    }                    }
 */ */
 // Added for PEP 183 // Added for PEP 183
                    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);                          HTTPConnection *dst =
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              reinterpret_cast<HTTPConnection *>(q);
                          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);                              "Monitor::_dispatch: entering run() for "
                                   "indx = %d, queueId = %d, q = %p",
                               dst->_entry_index,
                               dst->_monitor->_entries[dst->_entry_index].queueId,
                               dst));
   
                    try                    try
                    {                    {
                        dst->run(1);                        dst->run(1);
                    }                    }
                    catch (...)                    catch (...)
                    {                    {
                        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                              PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                        "Monitor::_dispatch: exception received");                        "Monitor::_dispatch: exception received");
                    }                    }
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                    "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);                              "Monitor::_dispatch: exited run() for index %d",
                               dst->_entry_index));
                    // It is possible the entry status may not be set to busy.  
                    // The following will fail in that case.                          // It is possible the entry status may not be set to
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);                          // busy.  The following will fail in that case.
                    // Once the HTTPConnection thread has set the status value to either                          // PEGASUS_ASSERT(dst->_monitor->_entries[
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                          //     dst->_entry_index]._status.get() ==
                    // to the Monitor.  It is no longer permissible to access the connection                          //    _MonitorEntry::BUSY);
                    // or the entry in the _entries table.                          // Once the HTTPConnection thread has set the status
                           // value to either Monitor::DYING or Monitor::IDLE,
                    // The following is not relevant as the worker thread or the                          // it has returned control of the connection to the
                    // reader thread will update the status of the entry.                          // Monitor.  It is no longer permissible to access
                           // the connection or the entry in the _entries table.
   
                           // The following is not relevant as the worker thread
                           // or the reader thread will update the status of the
                           // entry.
                    //if (dst->_connectionClosePending)                    //if (dst->_connectionClosePending)
                    //{                    //{
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;                          //  dst->_monitor->_entries[dst->_entry_index]._status =
                           //    _MonitorEntry::DYING;
                    //}                    //}
                    //else                    //else
                    //{                    //{
                    //  dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;                          //  dst->_monitor->_entries[dst->_entry_index]._status =
                           //    _MonitorEntry::IDLE;
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( entries[indx]._type == Monitor::INTERNAL){                      else if (entries[indx]._type == Monitor::INTERNAL)
                       {
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         entries[indx]._status = _MonitorEntry::BUSY;                         entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(entries[indx].socket);                          Sint32 amt =
                         Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);                              Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(entries[indx].socket);  
                           if (amt == PEGASUS_SOCKET_ERROR &&
                               getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
                           {
                               PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                   "Monitor::run: Tickler socket got an IO error. "
                                       "Going to re-create Socket and wait for "
                                       "TCP/IP restart.");
                               _tickler.uninitialize();
                               initializeTickler();
                           }
                           else
                           {
                         entries[indx]._status = _MonitorEntry::IDLE;                         entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                       }
                 else                 else
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                          PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                      "Non-connection entry, indx = %d, has been received.", indx);                              "Non-connection entry, indx = %d, has been "
                                   "received.",
                               indx));
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(entries[indx].socket, events);                          Message* msg = new SocketMessage(
                               entries[indx].socket, events);
                    entries[indx]._status = _MonitorEntry::BUSY;                    entries[indx]._status = _MonitorEntry::BUSY;
                    autoEntryMutex.unlock();                          _entry_mut.unlock();
                    q->enqueue(msg);                    q->enqueue(msg);
                    autoEntryMutex.lock();                          _entry_mut.lock();
            // After enqueue a message and the autoEntryMutex has been released and locked again,  
            // the array of entries can be changed. The ArrayIterator has be reset with the original _entries                          // After enqueue a message and the autoEntryMutex has
                           // been released and locked again, the array of
                           // entries can be changed. The ArrayIterator has be
                           // reset with the original _entries
            entries.reset(_entries);            entries.reset(_entries);
                    entries[indx]._status = _MonitorEntry::IDLE;                    entries[indx]._status = _MonitorEntry::IDLE;
   
                    return true;  
                 }                 }
              }              }
              catch(...)              catch(...)
              {              {
              }              }
              handled_events = true;  
           }           }
        }        }
     }     }
   
     return(handled_events);  
 } }
  
 void Monitor::stopListeningForConnections(Boolean wait) void Monitor::stopListeningForConnections(Boolean wait)
Line 616 
Line 727 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if((int)_solicitSocketCount >= (size-1)){      if ((int)_solicitSocketCount >= (size-1))
         for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){      {
           for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
           {
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 642 
Line 755 
       {       {
       }       }
    }    }
    _solicitSocketCount--;  // decrease the count, if we are here we didnt do anything meaningful      // decrease the count, if we are here we didn't do anything meaningful
       _solicitSocketCount--;
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
    return -1;    return -1;
   
 } }
  
 void Monitor::unsolicitSocketMessages(SocketHandle socket) void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
   
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);     AutoMutex autoMut(_entry_mut);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which never needs          Start at index = 1 because _entries[0] is the tickle entry which
         to be EMPTY;          never needs to be EMPTY;
     */     */
     unsigned int index;     unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
Line 672 
Line 784 
  
     /*     /*
         Dynamic Contraction:         Dynamic Contraction:
         To remove excess entries we will start from the end of the _entries array          To remove excess entries we will start from the end of the _entries
         and remove all entries with EMPTY status until we find the first NON EMPTY.          array and remove all entries with EMPTY status until we find the
         This prevents the positions, of the NON EMPTY entries, from being changed.          first NON EMPTY.  This prevents the positions, of the NON EMPTY
           entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status.get() == _MonitorEntry::EMPTY){      while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
       {
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
Line 686 
Line 800 
 } }
  
 // Note: this is no longer called with PEP 183. // Note: this is no longer called with PEP 183.
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)  ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
 { {
    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);    HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, q = %p",          "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "
         dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);              "q = %p",
           dst->_entry_index,
           dst->_monitor->_entries[dst->_entry_index].queueId,
           dst));
   
    try    try
    {    {
       dst->run(1);       dst->run(1);
    }    }
    catch (...)    catch (...)
    {    {
       Tracer::trace(TRC_HTTP, Tracer::LEVEL4,          PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exception received");           "Monitor::_dispatch: exception received");
    }    }
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,      PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);          "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);      PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
           _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection      // Monitor::DYING or Monitor::IDLE, it has returned control of the
    // to the Monitor.  It is no longer permissible to access the connection      // connection to the Monitor.  It is no longer permissible to access the
    // or the entry in the _entries table.      // connection or the entry in the _entries table.
    if (dst->_connectionClosePending)    if (dst->_connectionClosePending)
    {    {
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::DYING;
    }    }
    else    else
    {    {
       dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;          dst->_monitor->_entries[dst->_entry_index]._status =
               _MonitorEntry::IDLE;
    }    }
    return 0;    return 0;
 } }


Legend:
Removed from v.1.107  
changed lines
  Added in v.1.122

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2