(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.90 and 1.100

version 1.90, 2005/02/05 22:59:23 version 1.100, 2005/12/02 18:25:52
Line 48 
Line 48 
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include "ArrayIterator.h"
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 of <winsock.h>. It may have been indirectly included (e.g., by including \ of <winsock.h>. It may have been indirectly included (e.g., by including \
 <windows.h>). Finthe inclusion of that header which is visible to this \  <windows.h>). Find inclusion of that header which is visible to this \
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \ compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 otherwise, less than 64 clients (the default) will be able to connect to the \ otherwise, less than 64 clients (the default) will be able to connect to the \
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM." CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
Line 74 
Line 75 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 // Define a platform-neutral socket length type  static AtomicInt _connections(0);
 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)  
 typedef size_t PEGASUS_SOCKLEN_T;  
 #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))  
 typedef socklen_t PEGASUS_SOCKLEN_T;  
 #else  
 typedef int PEGASUS_SOCKLEN_T;  
 #endif  
   
 static AtomicInt _connections = 0;  
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 160 
Line 148 
     /* setup the tickle server/listener */     /* setup the tickle server/listener */
  
     // get a socket for the server side     // get a socket for the server side
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         //handle error         //handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
Line 174 
Line 162 
  
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 184 
Line 169 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
Line 242 
Line 226 
     /* set up the tickle client/connector */     /* set up the tickle client/connector */
  
     // get a socket for our tickle client     // get a socket for our tickle client
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
                          "Received error number $0 while creating the internal client socket.",                          "Received error number $0 while creating the internal client socket.",
Line 256 
Line 240 
  
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 266 
Line 247 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_client_addr.sin_family = PF_INET;     _tickle_client_addr.sin_family = PF_INET;
     _tickle_client_addr.sin_port = 0;     _tickle_client_addr.sin_port = 0;
  
Line 373 
Line 353 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
   
       ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                     if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
Line 402 
Line 384 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];                           const _MonitorEntry &entry = entries[indx];
        if ((entry._status.value() == _MonitorEntry::DYING) &&         if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
Line 441 
Line 423 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the            // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each            // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();            autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());            autoEntryMutex.lock();
        }        }
     }     }
  
Line 461 
Line 443 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     int maxSocketCurrentPass = 0;      PEGASUS_SOCKET maxSocketCurrentPass = 0;
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)         if(maxSocketCurrentPass < entries[indx].socket)
           maxSocketCurrentPass = _entries[indx].socket;              maxSocketCurrentPass = entries[indx].socket;
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)         if(entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);             FD_SET(entries[indx].socket, &fdread);
        }        }
     }     }
  
Line 480 
Line 462 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      autoEntryMutex.unlock();
   
       //
       // The first argument to select() is ignored on Windows and it is not
       // a socket value.  The original code assumed that the number of sockets
       // and a socket value have the same type.  On Windows they do not.
       //
   #ifdef PEGASUS_OS_TYPE_WINDOWS
       int events = select(0, &fdread, NULL, NULL, &tv);
   #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
    _entry_mut.lock(pegasus_thread_self());  #endif
       autoEntryMutex.lock();
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
Line 494 
Line 486 
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that         // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
Line 504 
Line 496 
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)_entries.size(); indx++)         for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&            if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(_entries[indx].socket, &fdread)))               (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);               MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                    indx, entries[indx].queueId, q);
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)                  if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                       "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                    // Do not update the entry just yet. The entry gets updated once
                    // the request has been read.                    // the request has been read.
                    //_entries[indx]._status = _MonitorEntry::BUSY;                     //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
Line 536 
Line 528 
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                        entries[indx]._status = _MonitorEntry::IDLE;
                       _entry_mut.unlock();  
                       return true;                       return true;
                    }                    }
 */ */
Line 560 
Line 551 
  
                    // It is possible the entry status may not be set to busy.                    // It is possible the entry status may not be set to busy.
                    // The following will fail in that case.                    // The following will fail in that case.
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                     // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
                    // Once the HTTPConnection thread has set the status value to either                    // Once the HTTPConnection thread has set the status value to either
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
                    // to the Monitor.  It is no longer permissible to access the connection                    // to the Monitor.  It is no longer permissible to access the connection
Line 578 
Line 569 
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( _entries[indx]._type == Monitor::INTERNAL){                  else if( entries[indx]._type == Monitor::INTERNAL){
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status == _MonitorEntry::BUSY;                          entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                          Socket::disableBlocking(entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                          Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                          Socket::enableBlocking(entries[indx].socket);
                         _entries[indx]._status == _MonitorEntry::IDLE;                          entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
Line 596 
Line 587 
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                     Message *msg = new SocketMessage(entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                     entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                     autoEntryMutex.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                    _entries[indx]._status = _MonitorEntry::IDLE;                     autoEntryMutex.lock();
                      entries[indx]._status = _MonitorEntry::IDLE;
   
                    return true;                    return true;
                 }                 }
              }              }
Line 612 
Line 604 
           }           }
        }        }
     }     }
     _entry_mut.unlock();  
     return(handled_events);     return(handled_events);
 } }
  
Line 644 
Line 636 
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     Sint32 socket,      PEGASUS_SOCKET socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
Line 668 
Line 660 
    {    {
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
Line 688 
Line 680 
  
 } }
  
 void Monitor::unsolicitSocketMessages(Sint32 socket)  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
 { {
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
Line 704 
Line 696 
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;           _entries[index]._status = _MonitorEntry::EMPTY;
           _entries[index].socket = -1;            _entries[index].socket = PEGASUS_INVALID_SOCKET;
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 717 
Line 709 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
Line 745 
Line 736 
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection


Legend:
Removed from v.1.90  
changed lines
  Added in v.1.100

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2