(file) Return to Monitor.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/Monitor.cpp between version 1.86 and 1.94.8.1

version 1.86, 2005/01/27 20:34:46 version 1.94.8.1, 2005/10/17 21:05:32
Line 1 
Line 1 
 //%2004////////////////////////////////////////////////////////////////////////  //%2005////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems. // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
Line 6 
Line 6 
 // IBM Corp.; EMC Corporation, The Open Group. // IBM Corp.; EMC Corporation, The Open Group.
 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.; // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group. // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 73 
Line 75 
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
 // Define a platform-neutral socket length type // Define a platform-neutral socket length type
 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)  #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
 typedef size_t PEGASUS_SOCKLEN_T; typedef size_t PEGASUS_SOCKLEN_T;
 #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6)) #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
 typedef socklen_t PEGASUS_SOCKLEN_T; typedef socklen_t PEGASUS_SOCKLEN_T;
Line 81 
Line 83 
 typedef int PEGASUS_SOCKLEN_T; typedef int PEGASUS_SOCKLEN_T;
 #endif #endif
  
 static AtomicInt _connections = 0;  static AtomicInt _connections(0);
   
 static struct timeval create_time = {0, 1};  
 static struct timeval destroy_time = {300, 0};  
 static struct timeval deadlock_time = {0, 0};  
   
 ////////////////////////////////////////////////////////////////////////////////  
 //  
 // MonitorRep  
 //  
 ////////////////////////////////////////////////////////////////////////////////  
   
 struct MonitorRep  
 {  
     fd_set rd_fd_set;  
     fd_set wr_fd_set;  
     fd_set ex_fd_set;  
     fd_set active_rd_fd_set;  
     fd_set active_wr_fd_set;  
     fd_set active_ex_fd_set;  
 };  
  
 //////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
 // //
Line 111 
Line 93 
  
 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32 #define MAX_NUMBER_OF_MONITOR_ENTRIES  32
 Monitor::Monitor() Monitor::Monitor()
    : _module_handle(0),     : _stopConnections(0),
      _controller(0),  
      _async(false),  
      _stopConnections(0),  
      _stopConnectionsSem(0),  
      _solicitSocketCount(0),  
      _tickle_client_socket(-1),  
      _tickle_server_socket(-1),  
      _tickle_peer_socket(-1)  
 {  
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;  
     Socket::initializeInterface();  
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);  
   
     // setup the tickler  
     initializeTickler();  
   
     // Start the count at 1 because initilizeTickler()  
     // has added an entry in the first position of the  
     // _entries array  
     for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )  
     {  
        _MonitorEntry entry(0, 0, 0);  
        _entries.append(entry);  
     }  
 }  
   
 Monitor::Monitor(Boolean async)  
    : _module_handle(0),  
      _controller(0),  
      _async(async),  
      _stopConnections(0),  
      _stopConnectionsSem(0),      _stopConnectionsSem(0),
      _solicitSocketCount(0),      _solicitSocketCount(0),
      _tickle_client_socket(-1),      _tickle_client_socket(-1),
Line 152 
Line 102 
 { {
     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;     int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
     Socket::initializeInterface();     Socket::initializeInterface();
     _rep = 0;  
     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);     _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
  
     // setup the tickler     // setup the tickler
Line 170 
Line 119 
  
 Monitor::~Monitor() Monitor::~Monitor()
 { {
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4,  
                   "deregistering with module controller");  
   
     if(_module_handle.get() != NULL)  
     {  
        _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);  
        _controller.reset();  
        _module_handle.reset();  
     }  
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");  
   
     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");     Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
  
     try{     try{
Line 250 
Line 188 
  
     // bind server side to socket     // bind server side to socket
     if((::bind(_tickle_server_socket,     if((::bind(_tickle_server_socket,
                (struct sockaddr *)&_tickle_server_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                sizeof(_tickle_server_addr))) < 0){                sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
 #ifdef PEGASUS_OS_ZOS #ifdef PEGASUS_OS_ZOS
Line 283 
Line 221 
  
     // make sure we have the correct socket for our server     // make sure we have the correct socket for our server
     int sock = ::getsockname(_tickle_server_socket,     int sock = ::getsockname(_tickle_server_socket,
                              (struct sockaddr*)&_tickle_server_addr,                     reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                              &_addr_size);                              &_addr_size);
     if(sock < 0){     if(sock < 0){
         // handle error         // handle error
Line 330 
Line 268 
  
     // bind socket to client side     // bind socket to client side
     if((::bind(_tickle_client_socket,     if((::bind(_tickle_client_socket,
                (struct sockaddr*)&_tickle_client_addr,                 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
                sizeof(_tickle_client_addr))) < 0){                sizeof(_tickle_client_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
Line 345 
Line 283 
  
     // connect to server side     // connect to server side
     if((::connect(_tickle_client_socket,     if((::connect(_tickle_client_socket,
                   (struct sockaddr*)&_tickle_server_addr,                    reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
                   sizeof(_tickle_server_addr))) < 0){                   sizeof(_tickle_server_addr))) < 0){
         // handle error         // handle error
         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",         MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
Line 365 
Line 303 
  
     // this call may fail, we will try a max of 20 times to establish this peer connection     // this call may fail, we will try a max of 20 times to establish this peer connection
     if((_tickle_peer_socket = ::accept(_tickle_server_socket,     if((_tickle_peer_socket = ::accept(_tickle_server_socket,
                                        (struct sockaddr*)&_tickle_peer_addr,              reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                        &peer_size)) < 0){                                        &peer_size)) < 0){
 #if !defined(PEGASUS_OS_TYPE_WINDOWS) #if !defined(PEGASUS_OS_TYPE_WINDOWS)
         // Only retry on non-windows platforms.         // Only retry on non-windows platforms.
Line 376 
Line 314 
           {           {
             pegasus_sleep(1);             pegasus_sleep(1);
             _tickle_peer_socket = ::accept(_tickle_server_socket,             _tickle_peer_socket = ::accept(_tickle_server_socket,
                                            (struct sockaddr*)&_tickle_peer_addr,                  reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
                                            &peer_size);                                            &peer_size);
             retries++;             retries++;
           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);           } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
Line 431 
Line 369 
     fd_set fdread;     fd_set fdread;
     FD_ZERO(&fdread);     FD_ZERO(&fdread);
  
     _entry_mut.lock(pegasus_thread_self());      AutoMutex autoEntryMutex(_entry_mut);
  
     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries     // Check the stopConnections flag.  If set, clear the Acceptor monitor entries
     if (_stopConnections == 1)      if (_stopConnections.value() == 1)
     {     {
         for ( int indx = 0; indx < (int)_entries.size(); indx++)         for ( int indx = 0; indx < (int)_entries.size(); indx++)
         {         {
Line 505 
Line 443 
           // unlocked will not result in an ArrayIndexOutOfBounds           // unlocked will not result in an ArrayIndexOutOfBounds
           // exception.           // exception.
  
           _entry_mut.unlock();            autoEntryMutex.unlock();
           o.enqueue(message);           o.enqueue(message);
           _entry_mut.lock(pegasus_thread_self());            autoEntryMutex.lock();
        }        }
     }     }
  
Line 538 
Line 476 
     */     */
     maxSocketCurrentPass++;     maxSocketCurrentPass++;
  
     _entry_mut.unlock();      autoEntryMutex.unlock();
     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);     int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
    _entry_mut.lock(pegasus_thread_self());      autoEntryMutex.lock();
  
 #ifdef PEGASUS_OS_TYPE_WINDOWS #ifdef PEGASUS_OS_TYPE_WINDOWS
     if(events == SOCKET_ERROR)     if(events == SOCKET_ERROR)
Line 595 
Line 533 
                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,                       Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                           "Monitor::run: Insufficient resources to process request.");                           "Monitor::run: Insufficient resources to process request.");
                       _entries[indx]._status = _MonitorEntry::IDLE;                       _entries[indx]._status = _MonitorEntry::IDLE;
                       _entry_mut.unlock();  
                       return true;                       return true;
                    }                    }
 */ */
Line 641 
Line 578 
                         // read the data                         // read the data
                         // and set ourself back to IDLE                         // and set ourself back to IDLE
  
                         _entries[indx]._status == _MonitorEntry::BUSY;                          _entries[indx]._status.value() == _MonitorEntry::BUSY;
                         static char buffer[2];                         static char buffer[2];
                         Socket::disableBlocking(_entries[indx].socket);                         Socket::disableBlocking(_entries[indx].socket);
                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);                         Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
                         Socket::enableBlocking(_entries[indx].socket);                         Socket::enableBlocking(_entries[indx].socket);
                         _entries[indx]._status == _MonitorEntry::IDLE;                          _entries[indx]._status.value() == _MonitorEntry::IDLE;
                 }                 }
                 else                 else
                 {                 {
Line 656 
Line 593 
                    events |= SocketMessage::READ;                    events |= SocketMessage::READ;
                    Message *msg = new SocketMessage(_entries[indx].socket, events);                    Message *msg = new SocketMessage(_entries[indx].socket, events);
                    _entries[indx]._status = _MonitorEntry::BUSY;                    _entries[indx]._status = _MonitorEntry::BUSY;
                    _entry_mut.unlock();                     autoEntryMutex.unlock();
   
                    q->enqueue(msg);                    q->enqueue(msg);
                      autoEntryMutex.lock();
                    _entries[indx]._status = _MonitorEntry::IDLE;                    _entries[indx]._status = _MonitorEntry::IDLE;
   
                    return true;                    return true;
                 }                 }
              }              }
Line 670 
Line 608 
           }           }
        }        }
     }     }
     _entry_mut.unlock();  
     return(handled_events);     return(handled_events);
 } }
  
Line 714 
Line 652 
    // current connections requested    // current connections requested
    _solicitSocketCount++;  // bump the count    _solicitSocketCount++;  // bump the count
    int size = (int)_entries.size();    int size = (int)_entries.size();
    if(_solicitSocketCount >= (size-1)){     if((int)_solicitSocketCount >= (size-1)){
         for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){          for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
                 _MonitorEntry entry(0, 0, 0);                 _MonitorEntry entry(0, 0, 0);
                 _entries.append(entry);                 _entries.append(entry);
         }         }
Line 756 
Line 694 
         Start at index = 1 because _entries[0] is the tickle entry which never needs         Start at index = 1 because _entries[0] is the tickle entry which never needs
         to be EMPTY;         to be EMPTY;
     */     */
     int index;      unsigned int index;
     for(index = 1; index < _entries.size(); index++)     for(index = 1; index < _entries.size(); index++)
     {     {
        if(_entries[index].socket == socket)        if(_entries[index].socket == socket)
Line 775 
Line 713 
         This prevents the positions, of the NON EMPTY entries, from being changed.         This prevents the positions, of the NON EMPTY entries, from being changed.
     */     */
     index = _entries.size() - 1;     index = _entries.size() - 1;
     while(_entries[index]._status == _MonitorEntry::EMPTY){      while(_entries[index]._status.value() == _MonitorEntry::EMPTY){
         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)         if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
                 _entries.remove(index);                 _entries.remove(index);
         index--;         index--;
     }     }
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  


Legend:
Removed from v.1.86  
changed lines
  Added in v.1.94.8.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2