(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.98 and 1.100

version 1.98, 2005/11/22 20:10:30 version 1.100, 2005/12/02 18:25:52
Line 48 
Line 48 
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include "ArrayIterator.h"
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
Line 354 
Line 355 
  
     AutoMutex autoEntryMutex(_entry_mut);     AutoMutex autoEntryMutex(_entry_mut);
  
       ArrayIterator<_MonitorEntry> entries(_entries);
   
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections.get() == 1)     if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.get() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.get() == _MonitorEntry::IDLE ||                     if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.get() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
Line 381 
Line 384 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];                           const _MonitorEntry &entry = entries[indx];
        if ((entry._status.get() == _MonitorEntry::DYING) &&        if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
Line 420 
Line 423 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the            // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each            // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
Line 441 
Line 444 
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     PEGASUS_SOCKET maxSocketCurrentPass = 0;     PEGASUS_SOCKET maxSocketCurrentPass = 0;
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)         if(maxSocketCurrentPass < entries[indx].socket)
             maxSocketCurrentPass = _entries[indx].socket;              maxSocketCurrentPass = entries[indx].socket;
  
        if(_entries[indx]._status.get() == _MonitorEntry::IDLE)         if(entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
            _idleEntries++;            _idleEntries++;
            FD_SET(_entries[indx].socket, &fdread);             FD_SET(entries[indx].socket, &fdread);
        }        }
     }     }
  
Line 483 
Line 486 
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that         // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
Line 493 
Line 496 
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)_entries.size(); indx++)         for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.get() == _MonitorEntry::IDLE) &&            if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(_entries[indx].socket, &fdread)))               (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);               MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                    indx, entries[indx].queueId, q);
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)                  if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                       "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                    // Do not update the entry just yet. The entry gets updated once
                    // the request has been read.                    // the request has been read.
                    //_entries[indx]._status = _MonitorEntry::BUSY;                     //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
Line 525 
Line 528 
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                        entries[indx]._status = _MonitorEntry::IDLE;
                       return true;                       return true;
                    }                    }
 */ */
Line 566 
Line 569 
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( _entries[indx]._type == Monitor::INTERNAL){                  else if( entries[indx]._type == Monitor::INTERNAL){
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status.get() == _MonitorEntry::BUSY;                          entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                          Socket::disableBlocking(entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                          Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                          Socket::enableBlocking(entries[indx].socket);
                         _entries[indx]._status.get() == _MonitorEntry::IDLE;                          entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
Line 584 
Line 587 
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                     Message *msg = new SocketMessage(entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                     entries[indx]._status = _MonitorEntry::BUSY;
                    autoEntryMutex.unlock();                    autoEntryMutex.unlock();
                    q->enqueue(msg);                    q->enqueue(msg);
                    autoEntryMutex.lock();                    autoEntryMutex.lock();
                    _entries[indx]._status = _MonitorEntry::IDLE;                     entries[indx]._status = _MonitorEntry::IDLE;
  
                    return true;                    return true;
                 }                 }


Legend:
Removed from v.1.98  
changed lines
  Added in v.1.100

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2