(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.85 and 1.100

version 1.85, 2005/01/27 18:57:28 version 1.100, 2005/12/02 18:25:52
Line 1 
Line 1 
 //%2004////////////////////////////////////////////////////////////////////////  //%2005////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 6 
Line 6 
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 46 
Line 48 
 #include <Pegasus/Common/HTTPConnection.h> #include <Pegasus/Common/HTTPConnection.h>
 #include <Pegasus/Common/MessageQueueService.h> #include <Pegasus/Common/MessageQueueService.h>
 #include <Pegasus/Common/Exception.h> #include <Pegasus/Common/Exception.h>
   #include "ArrayIterator.h"
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
 #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \ #  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
 of <winsock.h>. It may have been indirectly included (e.g., by including \ of <winsock.h>. It may have been indirectly included (e.g., by including \
 <windows.h>). Finthe inclusion of that header which is visible to this \  <windows.h>). Find inclusion of that header which is visible to this \
 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \ compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
 otherwise, less than 64 clients (the default) will be able to connect to the \ otherwise, less than 64 clients (the default) will be able to connect to the \
 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM." CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
Line 73 
Line 75 
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
   static AtomicInt _connections(0);
 static AtomicInt _connections = 0;  
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
   
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
   
 struct MonitorRep  
 {  
     fd_set rd_fd_set;  
     fd_set wr_fd_set;  
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 104 
Line 85 
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0),     : _stopConnections(0),
      _controller(0),  
      _async(false),  
      _stopConnections(0),  
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
Line 116 
Line 94 
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {  
        _MonitorEntry entry(0, 0, 0);  
        _entries.append(entry);  
     }  
 }  
   
 Monitor::Monitor(Boolean async)  
    : _module_handle(0),  
      _controller(0),  
      _async(async),  
      _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 163 
Line 111 
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "deregistering with module controller");  
   
     if(_module_handle.get() != NULL)  
     {  
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);  
        _controller.reset();  
        _module_handle.reset();  
     }  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  
   
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 211 
Line 148 
     /* setup the tickle server/listener */     /* setup the tickle server/listener */
  
     // get a socket for the server side     // get a socket for the server side
     if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         //handle error         //handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
                                  "Received error number $0 while creating the internal socket.",                                  "Received error number $0 while creating the internal socket.",
Line 225 
Line 162 
  
     // initialize the address     // initialize the address
     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));     memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 235 
Line 169 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_server_addr.sin_family = PF_INET;     _tickle_server_addr.sin_family = PF_INET;
     _tickle_server_addr.sin_port = 0;     _tickle_server_addr.sin_port = 0;
  
     PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);      PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
                (struct sockaddr *)&_tickle_server_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                sizeof(_tickle_server_addr))) < 0){                sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
 #ifdef PEGASUS_OS_ZOS #ifdef PEGASUS_OS_ZOS
Line 276 
Line 209 
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,     int sock = ::getsockname(_tickle_server_socket,
                              (struct sockaddr*)&_tickle_server_addr,                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                              &_addr_size);                              &_addr_size);
     if(sock < 0){     if(sock < 0){
         // handle error         // handle error
Line 293 
Line 226 
     /* set up the tickle client/connector */     /* set up the tickle client/connector */
  
     // get a socket for our tickle client     // get a socket for our tickle client
     if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){      if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
                          "Received error number $0 while creating the internal client socket.",                          "Received error number $0 while creating the internal client socket.",
Line 307 
Line 240 
  
     // setup the address of the client     // setup the address of the client
     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));     memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
 #ifdef PEGASUS_OS_ZOS  
     _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");  
 #else  
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(37) #pragma convert(37)
 #endif #endif
Line 317 
Line 247 
 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
 #pragma convert(0) #pragma convert(0)
 #endif #endif
 #endif  
     _tickle_client_addr.sin_family = PF_INET;     _tickle_client_addr.sin_family = PF_INET;
     _tickle_client_addr.sin_port = 0;     _tickle_client_addr.sin_port = 0;
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,     if((::bind(_tickle_client_socket,
                (struct sockaddr*)&_tickle_client_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                sizeof(_tickle_client_addr))) < 0){                sizeof(_tickle_client_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
Line 338 
Line 267 
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,     if((::connect(_tickle_client_socket,
                   (struct sockaddr*)&_tickle_server_addr,                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                   sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
Line 353 
Line 282 
  
     /* set up the slave connection */     /* set up the slave connection */
     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));     memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
     PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);      PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
     pegasus_sleep(1);     pegasus_sleep(1);
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,     if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                        (struct sockaddr*)&_tickle_peer_addr,              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                        &peer_size)) < 0){                                        &peer_size)) < 0){
 #if !defined(PEGASUS_OS_TYPE_WINDOWS) #if !defined(PEGASUS_OS_TYPE_WINDOWS)
         // Only retry on non-windows platforms.         // Only retry on non-windows platforms.
Line 369 
Line 298 
           {           {
             pegasus_sleep(1);             pegasus_sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,             _tickle_peer_socket = ::accept(_tickle_server_socket,
                                            (struct sockaddr*)&_tickle_peer_addr,                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size);                                            &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
Line 424 
Line 353 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
   
       ArrayIterator<_MonitorEntry> entries(_entries);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.get() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)          for ( int indx = 0; indx < (int)entries.size(); indx++)
         {         {
             if (_entries[indx]._type == Monitor::ACCEPTOR)              if (entries[indx]._type == Monitor::ACCEPTOR)
             {             {
                 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)                  if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
                 {                 {
                    if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||                     if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
                         _entries[indx]._status.value() == _MonitorEntry::DYING )                          entries[indx]._status.get() == _MonitorEntry::DYING )
                    {                    {
                        // remove the entry                        // remove the entry
                        _entries[indx]._status = _MonitorEntry::EMPTY;                         entries[indx]._status = _MonitorEntry::EMPTY;
                    }                    }
                    else                    else
                    {                    {
                        // set status to DYING                        // set status to DYING
                       _entries[indx]._status = _MonitorEntry::DYING;                        entries[indx]._status = _MonitorEntry::DYING;
                    }                    }
                }                }
            }            }
Line 453 
Line 384 
         _stopConnectionsSem.signal();         _stopConnectionsSem.signal();
     }     }
  
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
                          const _MonitorEntry &entry = _entries[indx];                           const _MonitorEntry &entry = entries[indx];
        if ((entry._status.value() == _MonitorEntry::DYING) &&         if ((entry._status.get() == _MonitorEntry::DYING) &&
                                          (entry._type == Monitor::CONNECTION))                                          (entry._type == Monitor::CONNECTION))
        {        {
           MessageQueue *q = MessageQueue::lookup(entry.queueId);           MessageQueue *q = MessageQueue::lookup(entry.queueId);
Line 492 
Line 423 
           // Once HTTPAcceptor completes processing of the close           // Once HTTPAcceptor completes processing of the close
           // connection, the lock is re-requested and processing of           // connection, the lock is re-requested and processing of
           // the for loop continues.  This is safe with the current           // the for loop continues.  This is safe with the current
           // implementation of the _entries object.  Note that the            // implementation of the entries object.  Note that the
           // loop condition accesses the _entries.size() on each            // loop condition accesses the entries.size() on each
           // iteration, so that a change in size while the mutex is           // iteration, so that a change in size while the mutex is
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();            autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());            autoEntryMutex.lock();
        }        }
     }     }
  
Line 512 
Line 443 
         place to calculate the max file descriptor (maximum socket number)         place to calculate the max file descriptor (maximum socket number)
         because we have to traverse the entire array.         because we have to traverse the entire array.
     */     */
     int maxSocketCurrentPass = 0;      PEGASUS_SOCKET maxSocketCurrentPass = 0;
     for( int indx = 0; indx < (int)_entries.size(); indx++)      for( int indx = 0; indx < (int)entries.size(); indx++)
     {     {
        if(maxSocketCurrentPass < _entries[indx].socket)         if(maxSocketCurrentPass < entries[indx].socket)
           maxSocketCurrentPass = _entries[indx].socket;              maxSocketCurrentPass = entries[indx].socket;
  
        if(_entries[indx]._status.value() == _MonitorEntry::IDLE)         if(entries[indx]._status.get() == _MonitorEntry::IDLE)
        {        {
           _idleEntries++;           _idleEntries++;
           FD_SET(_entries[indx].socket, &fdread);             FD_SET(entries[indx].socket, &fdread);
        }        }
     }     }
  
Line 531 
Line 462 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      autoEntryMutex.unlock();
   
       //
       // The first argument to select() is ignored on Windows and it is not
       // a socket value.  The original code assumed that the number of sockets
       // and a socket value have the same type.  On Windows they do not.
       //
   #ifdef PEGASUS_OS_TYPE_WINDOWS
       int events = select(0, &fdread, NULL, NULL, &tv);
   #else
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
    _entry_mut.lock(pegasus_thread_self());  #endif
       autoEntryMutex.lock();
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
Line 545 
Line 486 
           "Monitor::run - errorno = %d has occurred on select.", errno);           "Monitor::run - errorno = %d has occurred on select.", errno);
        // The EBADF error indicates that one or more or the file        // The EBADF error indicates that one or more or the file
        // descriptions was not valid. This could indicate that        // descriptions was not valid. This could indicate that
        // the _entries structure has been corrupted or that         // the entries structure has been corrupted or that
        // we have a synchronization error.        // we have a synchronization error.
  
        PEGASUS_ASSERT(errno != EBADF);        PEGASUS_ASSERT(errno != EBADF);
Line 555 
Line 496 
        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,        Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::run select event received events = %d, monitoring %d idle entries",           "Monitor::run select event received events = %d, monitoring %d idle entries",
            events, _idleEntries);            events, _idleEntries);
        for( int indx = 0; indx < (int)_entries.size(); indx++)         for( int indx = 0; indx < (int)entries.size(); indx++)
        {        {
           // The Monitor should only look at entries in the table that are IDLE (i.e.,           // The Monitor should only look at entries in the table that are IDLE (i.e.,
           // owned by the Monitor).           // owned by the Monitor).
           if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&            if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
              (FD_ISSET(_entries[indx].socket, &fdread)))               (FD_ISSET(entries[indx].socket, &fdread)))
           {           {
              MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);               MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,              Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                   "Monitor::run indx = %d, queueId =  %d, q = %p",                   "Monitor::run indx = %d, queueId =  %d, q = %p",
                   indx, _entries[indx].queueId, q);                    indx, entries[indx].queueId, q);
              PEGASUS_ASSERT(q !=0);              PEGASUS_ASSERT(q !=0);
  
              try              try
              {              {
                 if(_entries[indx]._type == Monitor::CONNECTION)                  if(entries[indx]._type == Monitor::CONNECTION)
                 {                 {
                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,                    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
                      "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);                       "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
                    static_cast<HTTPConnection *>(q)->_entry_index = indx;                    static_cast<HTTPConnection *>(q)->_entry_index = indx;
  
                    // Do not update the entry just yet. The entry gets updated once                    // Do not update the entry just yet. The entry gets updated once
                    // the request has been read.                    // the request has been read.
                    //_entries[indx]._status = _MonitorEntry::BUSY;                     //entries[indx]._status = _MonitorEntry::BUSY;
  
                    // If allocate_and_awaken failure, retry on next iteration                    // If allocate_and_awaken failure, retry on next iteration
 /* Removed for PEP 183. /* Removed for PEP 183.
Line 587 
Line 528 
                    {                    {
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                        entries[indx]._status = _MonitorEntry::IDLE;
                       _entry_mut.unlock();  
                       return true;                       return true;
                    }                    }
 */ */
Line 611 
Line 551 
  
                    // It is possible the entry status may not be set to busy.                    // It is possible the entry status may not be set to busy.
                    // The following will fail in that case.                    // The following will fail in that case.
                    // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);                     // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
                    // Once the HTTPConnection thread has set the status value to either                    // Once the HTTPConnection thread has set the status value to either
                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection                    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
                    // to the Monitor.  It is no longer permissible to access the connection                    // to the Monitor.  It is no longer permissible to access the connection
Line 629 
Line 569 
                    //}                    //}
 // end Added for PEP 183 // end Added for PEP 183
                 }                 }
                 else if( _entries[indx]._type == Monitor::INTERNAL){                  else if( entries[indx]._type == Monitor::INTERNAL){
                         // set ourself to BUSY,                         // set ourself to BUSY,
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status == _MonitorEntry::BUSY;                          entries[indx]._status = _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                          Socket::disableBlocking(entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                          Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                          Socket::enableBlocking(entries[indx].socket);
                         _entries[indx]._status == _MonitorEntry::IDLE;                          entries[indx]._status = _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
Line 647 
Line 587 
                      "Non-connection entry, indx = %d, has been received.", indx);                      "Non-connection entry, indx = %d, has been received.", indx);
                    int events = 0;                    int events = 0;
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                     Message *msg = new SocketMessage(entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                     entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                     autoEntryMutex.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                    _entries[indx]._status = _MonitorEntry::IDLE;                     autoEntryMutex.lock();
                      entries[indx]._status = _MonitorEntry::IDLE;
   
                    return true;                    return true;
                 }                 }
              }              }
Line 663 
Line 604 
           }           }
        }        }
     }     }
     _entry_mut.unlock();  
     return(handled_events);     return(handled_events);
 } }
  
Line 695 
Line 636 
  
  
 int  Monitor::solicitSocketMessages( int  Monitor::solicitSocketMessages(
     Sint32 socket,      PEGASUS_SOCKET socket,
     Uint32 events,     Uint32 events,
     Uint32 queueId,     Uint32 queueId,
     int type)     int type)
Line 707 
Line 648 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if(_solicitSocketCount >= (size-1)){     if((int)_solicitSocketCount >= (size-1)){
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){          for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 719 
Line 660 
    {    {
       try       try
       {       {
          if(_entries[index]._status.value() == _MonitorEntry::EMPTY)           if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
          {          {
             _entries[index].socket = socket;             _entries[index].socket = socket;
             _entries[index].queueId  = queueId;             _entries[index].queueId  = queueId;
Line 739 
Line 680 
  
 } }
  
 void Monitor::unsolicitSocketMessages(Sint32 socket)  void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
 { {
  
     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");     PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
Line 749 
Line 690 
         Start at index = 1 because _entries[0] is the tickle entry which never needs         Start at index = 1 because _entries[0] is the tickle entry which never needs
         to be EMPTY;         to be EMPTY;
     */     */
     int index;      unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
        {        {
           _entries[index]._status = _MonitorEntry::EMPTY;           _entries[index]._status = _MonitorEntry::EMPTY;
           _entries[index].socket = -1;            _entries[index].socket = PEGASUS_INVALID_SOCKET;
           _solicitSocketCount--;           _solicitSocketCount--;
           break;           break;
        }        }
Line 768 
Line 709 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
Line 796 
Line 736 
    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,    Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);           "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
  
    PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);     PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
  
    // Once the HTTPConnection thread has set the status value to either    // Once the HTTPConnection thread has set the status value to either
    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection    // Monitor::DYING or Monitor::IDLE, it has returned control of the connection


Legend:
Removed from v.1.85  
changed lines
  Added in v.1.100

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2