(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.122 and 1.130

version 1.122, 2007/10/19 18:12:26 version 1.130, 2008/02/27 20:21:17
Line 43 
Line 43 
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
 #include "ArrayIterator.h" #include "ArrayIterator.h"
 #include "HostAddress.h" #include "HostAddress.h"
 #include <errno.h>  
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 static AtomicInt _connections(0);  
   
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
 // Tickler // Tickler
Line 62 
Line 59 
       _clientSocket(PEGASUS_INVALID_SOCKET),       _clientSocket(PEGASUS_INVALID_SOCKET),
       _serverSocket(PEGASUS_INVALID_SOCKET)       _serverSocket(PEGASUS_INVALID_SOCKET)
 { {
       try
       {
           _initialize();
       }
       catch (...)
       {
           _uninitialize();
           throw;
       }
 } }
  
 Tickler::~Tickler() Tickler::~Tickler()
 { {
     uninitialize();      _uninitialize();
   }
   
   #if defined(PEGASUS_OS_TYPE_UNIX)
   
   // Use an anonymous pipe for the tickle connection.
   
   void Tickler::_initialize()
   {
       int fds[2];
   
       if (pipe(fds) == -1)
       {
           MessageLoaderParms parms(
               "Common.Monitor.TICKLE_CREATE",
               "Received error number $0 while creating the internal socket.",
               getSocketError());
           throw Exception(parms);
 } }
  
 Boolean Tickler::initialize()      _serverSocket = fds[0];
       _clientSocket = fds[1];
   }
   
   #else
   
   // Use an external loopback socket connection to allow the tickle socket to
   // be included in the select() array on non-Unix platforms.
   
   void Tickler::_initialize()
 { {
     //     //
     // Set up the addresses for the listen, client, and server sockets     // Set up the addresses for the listen, client, and server sockets
     // based on whether IPv6 is enabled.     // based on whether IPv6 is enabled.
     //     //
  
       Socket::initializeInterface();
   
 #ifdef PEGASUS_ENABLE_IPV6 #ifdef PEGASUS_ENABLE_IPV6
     struct sockaddr_storage listenAddress;     struct sockaddr_storage listenAddress;
     struct sockaddr_storage clientAddress;     struct sockaddr_storage clientAddress;
Line 89 
Line 123 
     int addressFamily;     int addressFamily;
     SocketLength addressLength;     SocketLength addressLength;
  
       memset(&listenAddress, 0, sizeof (listenAddress));
   
 #ifdef PEGASUS_ENABLE_IPV6 #ifdef PEGASUS_ENABLE_IPV6
     if (System::isIPv6StackActive())     if (System::isIPv6StackActive())
     {     {
Line 144 
Line 180 
             reinterpret_cast<struct sockaddr*>(&listenAddress),             reinterpret_cast<struct sockaddr*>(&listenAddress),
             addressLength) < 0)             addressLength) < 0)
     {     {
 #ifdef PEGASUS_OS_ZOS  
         MessageLoaderParms parms(  
             "Common.Monitor.TICKLE_BIND_LONG",  
             "Received error:$0 while binding the internal socket.",  
             strerror(errno));  
 #else  
         MessageLoaderParms parms(         MessageLoaderParms parms(
             "Common.Monitor.TICKLE_BIND",             "Common.Monitor.TICKLE_BIND",
             "Received error number $0 while binding the internal socket.",             "Received error number $0 while binding the internal socket.",
             getSocketError());             getSocketError());
 #endif  
         throw Exception(parms);         throw Exception(parms);
     }     }
  
Line 233 
Line 262 
  
     tmpAddressLength = addressLength;     tmpAddressLength = addressLength;
  
     // Accept the client socket connection.  The accept call may fail with      // Accept the client socket connection.
     // EAGAIN.  Try a max of 20 times to establish this connection.  
     unsigned int retries = 0;  
     do  
     {  
         _serverSocket = ::accept(         _serverSocket = ::accept(
             _listenSocket,             _listenSocket,
             reinterpret_cast<struct sockaddr*>(&serverAddress),             reinterpret_cast<struct sockaddr*>(&serverAddress),
             &tmpAddressLength);             &tmpAddressLength);
  
         if ((_serverSocket != PEGASUS_SOCKET_ERROR) ||  
             (getSocketError() != PEGASUS_NETWORK_TRYAGAIN))  
         {  
             break;  
         }  
   
         Threads::sleep(1);  
         retries++;  
     } while (retries <= 20);  
   
     if (_serverSocket == PEGASUS_SOCKET_ERROR)     if (_serverSocket == PEGASUS_SOCKET_ERROR)
     {     {
         if (getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
         {  
             // TCP/IP is down  
             uninitialize();  
             return false;  
         }  
   
         MessageLoaderParms parms(         MessageLoaderParms parms(
             "Common.Monitor.TICKLE_ACCEPT",             "Common.Monitor.TICKLE_ACCEPT",
             "Received error number $0 while accepting the internal socket "             "Received error number $0 while accepting the internal socket "
Line 275 
Line 283 
     //     //
  
     Socket::close(_listenSocket);     Socket::close(_listenSocket);
     _listenSocket = PEGASUS_INVALID_SOCKET;  
   
     Socket::disableBlocking(_serverSocket);     Socket::disableBlocking(_serverSocket);
     Socket::disableBlocking(_clientSocket);     Socket::disableBlocking(_clientSocket);
     return true;  
 } }
  
 void Tickler::uninitialize()  #endif
   
   void Tickler::_uninitialize()
 { {
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try     try
     {     {
         if (_serverSocket != PEGASUS_INVALID_SOCKET)  
         {  
             Socket::close(_serverSocket);             Socket::close(_serverSocket);
             _serverSocket = PEGASUS_INVALID_SOCKET;  
         }  
         if (_clientSocket != PEGASUS_INVALID_SOCKET)  
         {  
             Socket::close(_clientSocket);             Socket::close(_clientSocket);
             _clientSocket = PEGASUS_INVALID_SOCKET;  
         }  
         if (_listenSocket != PEGASUS_INVALID_SOCKET)  
         {  
             Socket::close(_listenSocket);             Socket::close(_listenSocket);
             _listenSocket = PEGASUS_INVALID_SOCKET;  
         }  
     }     }
     catch (...)     catch (...)
     {     {
         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
             "Failed to close tickle sockets");             "Failed to close tickle sockets");
     }     }
       Socket::uninitializeInterface();
 } }
  
  
Line 325 
Line 321 
      _solicitSocketCount(0)      _solicitSocketCount(0)
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler      // Create a MonitorEntry for the Tickler and set its state to IDLE so the
     initializeTickler();      // Monitor will watch for its events.
       _entries.append(MonitorEntry(
           _tickler.getServerSocket(),
           1,
           MonitorEntry::STATUS_IDLE,
           MonitorEntry::TYPE_INTERNAL));
  
     // Start the count at 1 because initilizeTickler()      // Start the count at 1 because _entries[0] is the Tickler
     // has added an entry in the first position of the  
     // _entries array  
     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)     for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
     {     {
        _MonitorEntry entry(0, 0, 0);          _entries.append(MonitorEntry());
        _entries.append(entry);  
     }     }
 } }
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     _tickler.uninitialize();  
     Socket::uninitializeInterface();  
     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,     PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                   "returning from monitor destructor");                   "returning from monitor destructor");
 } }
  
 void Monitor::initializeTickler()  
 {  
     while (!_tickler.initialize())  
     {  
         // Retry until TCP/IP is started  
     }  
   
     // Create a MonitorEntry for the tickler and set its state to IDLE so the  
     // Monitor will watch for its events.  
     _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);  
     entry._status = _MonitorEntry::IDLE;  
   
     if (_entries.size() == 0)  
     {  
         // The tickler has not been initialized before; add its entry at the  
         // beginning of the list.  
         _entries.append(entry);  
     }  
     else  
     {  
         // Overwrite the existing tickler entry.  
         _entries[0] = entry;  
     }  
 }  
   
 void Monitor::tickle() void Monitor::tickle()
 { {
     AutoMutex autoMutex(_tickleMutex);      Socket::write(_tickler.getClientSocket(), "\0", 1);
     Socket::write(_tickler.getClientSocket(), "\0\0", 2);  
 } }
  
 void Monitor::setState( void Monitor::setState(
     Uint32 index,     Uint32 index,
     _MonitorEntry::entry_status status)      MonitorEntry::Status status)
 { {
     AutoMutex autoEntryMutex(_entry_mut);      AutoMutex autoEntryMutex(_entriesMutex);
     // Set the state to requested state     // Set the state to requested state
     _entries[index]._status = status;      _entries[index].status = status;
 } }
  
 void Monitor::run(Uint32 milliseconds) void Monitor::run(Uint32 milliseconds)
Line 396 
Line 365 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     AutoMutex autoEntryMutex(_entry_mut);      AutoMutex autoEntryMutex(_entriesMutex);
  
     ArrayIterator<_MonitorEntry> entries(_entries);      ArrayIterator<MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor     // Check the stopConnections flag.  If set, clear the Acceptor monitor
     // entries     // entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             if (entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
             {             {
                 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)                  if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
                 {                 {
                    if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||                      if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
                         entries[indx]._status.get() == _MonitorEntry::DYING )                          entries[indx].status == MonitorEntry::STATUS_DYING)
                    {                    {
                        // remove the entry                        // remove the entry
                        entries[indx]._status = _MonitorEntry::EMPTY;                          entries[indx].status = MonitorEntry::STATUS_EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       entries[indx]._status = _MonitorEntry::DYING;                          entries[indx].status = MonitorEntry::STATUS_DYING;
                    }                    }
                }                }
            }            }
Line 428 
Line 397 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for (int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {     {
         const _MonitorEntry &entry = entries[indx];          const MonitorEntry& entry = entries[indx];
         if ((entry._status.get() == _MonitorEntry::DYING) &&  
             (entry._type == Monitor::CONNECTION))          if ((entry.status == MonitorEntry::STATUS_DYING) &&
               (entry.type == MonitorEntry::TYPE_CONNECTION))
         {         {
             MessageQueue *q = MessageQueue::lookup(entry.queueId);             MessageQueue *q = MessageQueue::lookup(entry.queueId);
             PEGASUS_ASSERT(q != 0);             PEGASUS_ASSERT(q != 0);
Line 473 
Line 443 
             // unlocked will not result in an ArrayIndexOutOfBounds             // unlocked will not result in an ArrayIndexOutOfBounds
             // exception.             // exception.
  
             _entry_mut.unlock();              _entriesMutex.unlock();
             o.enqueue(message);             o.enqueue(message);
             _entry_mut.lock();              _entriesMutex.lock();
  
             // After enqueue a message and the autoEntryMutex has been             // After enqueue a message and the autoEntryMutex has been
             // released and locked again, the array of _entries can be             // released and locked again, the array of _entries can be
Line 494 
Line 464 
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     SocketHandle maxSocketCurrentPass = 0;     SocketHandle maxSocketCurrentPass = 0;
     for (int indx = 0; indx < (int)entries.size(); indx++)      for (Uint32 indx = 0; indx < entries.size(); indx++)
     {     {
        if (maxSocketCurrentPass < entries[indx].socket)        if (maxSocketCurrentPass < entries[indx].socket)
            maxSocketCurrentPass = entries[indx].socket;            maxSocketCurrentPass = entries[indx].socket;
  
        if (entries[indx]._status.get() == _MonitorEntry::IDLE)          if (entries[indx].status == MonitorEntry::STATUS_IDLE)
        {        {
            _idleEntries++;            _idleEntries++;
            FD_SET(entries[indx].socket, &fdread);            FD_SET(entries[indx].socket, &fdread);
Line 512 
Line 482 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      _entriesMutex.unlock();
  
     //     //
     // The first argument to select() is ignored on Windows and it is not     // The first argument to select() is ignored on Windows and it is not
Line 524 
Line 494 
 #else #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
 #endif #endif
     _entry_mut.lock();      int selectErrno = getSocketError();
   
       _entriesMutex.lock();
   
       struct timeval timeNow;
       Time::gettimeofday(&timeNow);
  
     // After enqueue a message and the autoEntryMutex has been released and     // After enqueue a message and the autoEntryMutex has been released and
     // locked again, the array of _entries can be changed. The ArrayIterator     // locked again, the array of _entries can be changed. The ArrayIterator
Line 534 
Line 509 
     if (events == PEGASUS_SOCKET_ERROR)     if (events == PEGASUS_SOCKET_ERROR)
     {     {
         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
             "Monitor::run - errorno = %d has occurred on select.", errno));              "Monitor::run - select() returned error %d.", selectErrno));
         // The EBADF error indicates that one or more or the file         // The EBADF error indicates that one or more or the file
         // descriptions was not valid. This could indicate that         // descriptions was not valid. This could indicate that
         // the entries structure has been corrupted or that         // the entries structure has been corrupted or that
         // we have a synchronization error.         // we have a synchronization error.
  
         PEGASUS_ASSERT(errno != EBADF);          PEGASUS_ASSERT(selectErrno != EBADF);
     }     }
     else if (events)     else if (events)
     {     {
Line 548 
Line 523 
             "Monitor::run select event received events = %d, monitoring %d "             "Monitor::run select event received events = %d, monitoring %d "
                 "idle entries",                 "idle entries",
             events, _idleEntries));             events, _idleEntries));
         for (int indx = 0; indx < (int)entries.size(); indx++)          for (Uint32 indx = 0; indx < entries.size(); indx++)
         {         {
             // The Monitor should only look at entries in the table that are             // The Monitor should only look at entries in the table that are
             // IDLE (i.e., owned by the Monitor).             // IDLE (i.e., owned by the Monitor).
             if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&              if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                 (FD_ISSET(entries[indx].socket, &fdread)))                 (FD_ISSET(entries[indx].socket, &fdread)))
             {             {
                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);                 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
                   PEGASUS_ASSERT(q != 0);
                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                     "Monitor::run indx = %d, queueId =  %d, q = %p",                     "Monitor::run indx = %d, queueId =  %d, q = %p",
                     indx, entries[indx].queueId, q));                     indx, entries[indx].queueId, q));
                 PEGASUS_ASSERT(q !=0);  
  
                 try                 try
                 {                 {
                     if (entries[indx]._type == Monitor::CONNECTION)                      if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
                     {                     {
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "entries[indx].type for indx = %d is "                              "entries[%d].type is TYPE_CONNECTION",
                                 "Monitor::CONNECTION",  
                             indx));                             indx));
                         static_cast<HTTPConnection *>(q)->_entry_index = indx;  
  
                         // Do not update the entry just yet. The entry gets  
                         // updated once the request has been read.  
                         //entries[indx]._status = _MonitorEntry::BUSY;  
   
                         // If allocate_and_awaken failure, retry on next  
                         // iteration  
 /* Removed for PEP 183.  
                         if (!MessageQueueService::get_thread_pool()->  
                                 allocate_and_awaken((void *)q, _dispatch))  
                         {  
                             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,  
                                 Tracer::LEVEL2,  
                                 "Monitor::run: Insufficient resources to "  
                                     "process request.");  
                             entries[indx]._status = _MonitorEntry::IDLE;  
                             return true;  
                         }  
 */  
 // Added for PEP 183  
                         HTTPConnection *dst =                         HTTPConnection *dst =
                             reinterpret_cast<HTTPConnection *>(q);                             reinterpret_cast<HTTPConnection *>(q);
                           dst->_entry_index = indx;
   
                           // Update idle start time because we have received some
                           // data. Any data is good data at this point, and we'll
                           // keep the connection alive, even if we've exceeded
                           // the idleConnectionTimeout, which will be checked
                           // when we call closeConnectionOnTimeout() next.
                           Time::gettimeofday(&dst->_idleStartTime);
   
                           // Check for accept pending (ie. SSL handshake pending)
                           // or idle connection timeouts for sockets from which
                           // we received data (avoiding extra queue lookup below).
                           if (!dst->closeConnectionOnTimeout(&timeNow))
                           {
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
                             "Monitor::_dispatch: entering run() for "                                  "Entering HTTPConnection::run() for "
                                 "indx = %d, queueId = %d, q = %p",                                 "indx = %d, queueId = %d, q = %p",
                             dst->_entry_index,                                  indx, entries[indx].queueId, q));
                             dst->_monitor->_entries[dst->_entry_index].queueId,  
                             dst));  
  
                         try                         try
                         {                         {
Line 605 
Line 571 
                         }                         }
                         catch (...)                         catch (...)
                         {                         {
                                   PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
                                       "Caught exception from "
                                       "HTTPConnection::run()");
                               }
                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
                                 "Monitor::_dispatch: exception received");                                  "Exited HTTPConnection::run()");
                         }                         }
                         PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
                             "Monitor::_dispatch: exited run() for index %d",  
                             dst->_entry_index));  
  
                         // It is possible the entry status may not be set to  
                         // busy.  The following will fail in that case.  
                         // PEGASUS_ASSERT(dst->_monitor->_entries[  
                         //     dst->_entry_index]._status.get() ==  
                         //    _MonitorEntry::BUSY);  
                         // Once the HTTPConnection thread has set the status  
                         // value to either Monitor::DYING or Monitor::IDLE,  
                         // it has returned control of the connection to the  
                         // Monitor.  It is no longer permissible to access  
                         // the connection or the entry in the _entries table.  
   
                         // The following is not relevant as the worker thread  
                         // or the reader thread will update the status of the  
                         // entry.  
                         //if (dst->_connectionClosePending)  
                         //{  
                         //  dst->_monitor->_entries[dst->_entry_index]._status =  
                         //    _MonitorEntry::DYING;  
                         //}  
                         //else  
                         //{  
                         //  dst->_monitor->_entries[dst->_entry_index]._status =  
                         //    _MonitorEntry::IDLE;  
                         //}  
 // end Added for PEP 183  
                     }  
                     else if (entries[indx]._type == Monitor::INTERNAL)  
                     {  
                         // set ourself to BUSY,  
                         // read the data  
                         // and set ourself back to IDLE  
   
                         entries[indx]._status = _MonitorEntry::BUSY;  
                         static char buffer[2];  
                         Sint32 amt =  
                             Socket::read(entries[indx].socket,&buffer, 2);  
  
                         if (amt == PEGASUS_SOCKET_ERROR &&  
                             getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)  
                         {  
                             PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
                                 "Monitor::run: Tickler socket got an IO error. "  
                                     "Going to re-create Socket and wait for "  
                                     "TCP/IP restart.");  
                             _tickler.uninitialize();  
                             initializeTickler();  
                         }                         }
                         else                      else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
                         {                         {
                             entries[indx]._status = _MonitorEntry::IDLE;                          char buffer;
                         }                          Sint32 ignored =
                               Socket::read(entries[indx].socket, &buffer, 1);
                     }                     }
                     else                     else
                     {                     {
Line 674 
Line 597 
                         events |= SocketMessage::READ;                         events |= SocketMessage::READ;
                         Message* msg = new SocketMessage(                         Message* msg = new SocketMessage(
                             entries[indx].socket, events);                             entries[indx].socket, events);
                         entries[indx]._status = _MonitorEntry::BUSY;                          entries[indx].status = MonitorEntry::STATUS_BUSY;
                         _entry_mut.unlock();                          _entriesMutex.unlock();
                         q->enqueue(msg);                         q->enqueue(msg);
                         _entry_mut.lock();                          _entriesMutex.lock();
  
                         // After enqueue a message and the autoEntryMutex has                         // After enqueue a message and the autoEntryMutex has
                         // been released and locked again, the array of                         // been released and locked again, the array of
                         // entries can be changed. The ArrayIterator has be                          // entries can be changed. The ArrayIterator has to be
                         // reset with the original _entries                          // reset with the latest _entries.
                         entries.reset(_entries);                         entries.reset(_entries);
                         entries[indx]._status = _MonitorEntry::IDLE;                          entries[indx].status = MonitorEntry::STATUS_IDLE;
                     }                     }
                 }                 }
                 catch (...)                 catch (...)
                 {                 {
                 }                 }
             }             }
               // else check for accept pending (ie. SSL handshake pending) or
               // idle connection timeouts for sockets from which we did not
               // receive data.
               else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                   entries[indx].type == MonitorEntry::TYPE_CONNECTION)
   
               {
                   MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
           }
       }
       // else if "events" is zero (ie. select timed out) then we still need
       // to check if there are any pending SSL handshakes that have timed out.
       else
       {
           for (Uint32 indx = 0; indx < entries.size(); indx++)
           {
               if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
                   entries[indx].type == MonitorEntry::TYPE_CONNECTION)
               {
                   MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
                   HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
                   dst->_entry_index = indx;
                   dst->closeConnectionOnTimeout(&timeNow);
               }
         }         }
     }     }
 } }
Line 718 
Line 669 
     SocketHandle socket,     SocketHandle socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)      Uint32 type)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
   
     // Check to see if we need to dynamically grow the _entries array     // Check to see if we need to dynamically grow the _entries array
     // We always want the _entries array to 2 bigger than the      // We always want the _entries array to be 2 bigger than the
     // current connections requested     // current connections requested
     _solicitSocketCount++;  // bump the count     _solicitSocketCount++;  // bump the count
     int size = (int)_entries.size();  
     if ((int)_solicitSocketCount >= (size-1))      for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
     {  
         for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)  
         {         {
             _MonitorEntry entry(0, 0, 0);          _entries.append(MonitorEntry());
             _entries.append(entry);  
         }  
     }     }
  
     int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for (index = 1; index < (int)_entries.size(); index++)  
     {     {
         try         try
         {         {
             if (_entries[index]._status.get() == _MonitorEntry::EMPTY)              if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
             {             {
                 _entries[index].socket = socket;                 _entries[index].socket = socket;
                 _entries[index].queueId  = queueId;                 _entries[index].queueId  = queueId;
                 _entries[index]._type = type;                  _entries[index].type = type;
                 _entries[index]._status = _MonitorEntry::IDLE;                  _entries[index].status = MonitorEntry::STATUS_IDLE;
  
                 return index;                  return (int)index;
             }             }
         }         }
         catch (...)         catch (...)
Line 764 
Line 711 
 void Monitor::unsolicitSocketMessages(SocketHandle socket) void Monitor::unsolicitSocketMessages(SocketHandle socket)
 { {
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
     AutoMutex autoMut(_entry_mut);      AutoMutex autoMut(_entriesMutex);
  
     /*     /*
         Start at index = 1 because _entries[0] is the tickle entry which         Start at index = 1 because _entries[0] is the tickle entry which
         never needs to be EMPTY;          never needs to be reset to EMPTY;
     */     */
     unsigned int index;      for (Uint32 index = 1; index < _entries.size(); index++)
     for (index = 1; index < _entries.size(); index++)  
     {     {
         if (_entries[index].socket == socket)         if (_entries[index].socket == socket)
         {         {
             _entries[index]._status = _MonitorEntry::EMPTY;              _entries[index].reset();
             _entries[index].socket = PEGASUS_INVALID_SOCKET;  
             _solicitSocketCount--;             _solicitSocketCount--;
             break;             break;
         }         }
Line 789 
Line 734 
         first NON EMPTY.  This prevents the positions, of the NON EMPTY         first NON EMPTY.  This prevents the positions, of the NON EMPTY
         entries, from being changed.         entries, from being changed.
     */     */
     index = _entries.size() - 1;      for (Uint32 index = _entries.size() - 1;
     while (_entries[index]._status.get() == _MonitorEntry::EMPTY)           (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
                (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
            index--)
     {     {
         if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)  
                 _entries.remove(index);                 _entries.remove(index);
         index--;  
     }     }
     PEG_METHOD_EXIT();  
 }  
   
 // Note: this is no longer called with PEP 183.  
 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)  
 {  
     HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);  
     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: entering run() for indx  = %d, queueId = %d, "  
             "q = %p",  
         dst->_entry_index,  
         dst->_monitor->_entries[dst->_entry_index].queueId,  
         dst));  
  
     try      PEG_METHOD_EXIT();
     {  
         dst->run(1);  
     }  
     catch (...)  
     {  
         PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,  
             "Monitor::_dispatch: exception received");  
     }  
     PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,  
         "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));  
   
     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==  
         _MonitorEntry::BUSY);  
   
     // Once the HTTPConnection thread has set the status value to either  
     // Monitor::DYING or Monitor::IDLE, it has returned control of the  
     // connection to the Monitor.  It is no longer permissible to access the  
     // connection or the entry in the _entries table.  
     if (dst->_connectionClosePending)  
     {  
         dst->_monitor->_entries[dst->_entry_index]._status =  
             _MonitorEntry::DYING;  
     }  
     else  
     {  
         dst->_monitor->_entries[dst->_entry_index]._status =  
             _MonitorEntry::IDLE;  
     }  
     return 0;  
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.122  
changed lines
  Added in v.1.130

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2