version 1.72.2.2, 2004/08/07 21:01:29
|
version 1.72.2.3, 2004/08/11 18:50:44
|
|
|
// get a socket for the server side | // get a socket for the server side |
if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){ | if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){ |
//handle error | //handle error |
throw Exception("Monitor::initializeTickler(), create socket failed on tickle server."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE", |
|
"Received error number $0 while creating the internal socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
// initialize the address | // initialize the address |
|
|
PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr); | PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr); |
| |
// bind server side to socket | // bind server side to socket |
if((::bind(_tickle_server_socket,(struct sockaddr *)&_tickle_server_addr, sizeof(_tickle_server_addr))) < 0){ |
if((::bind(_tickle_server_socket, |
|
(struct sockaddr *)&_tickle_server_addr, |
|
sizeof(_tickle_server_addr))) < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), bind failed on tickle server socket."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_BIND", |
|
"Received error number $0 while binding the internal socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
// tell the kernel we are a server | // tell the kernel we are a server |
if((::listen(_tickle_server_socket,3)) < 0){ | if((::listen(_tickle_server_socket,3)) < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), listen failed on tickle server socket"); |
MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN", |
|
"Received error number $0 while listening to the internal socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
// make sure we have the correct socket for our server | // make sure we have the correct socket for our server |
int sock = ::getsockname(_tickle_server_socket,(struct sockaddr*)&_tickle_server_addr, &_addr_size); |
int sock = ::getsockname(_tickle_server_socket, |
|
(struct sockaddr*)&_tickle_server_addr, |
|
&_addr_size); |
if(sock < 0){ | if(sock < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), getsockname failed on tickle server socket"); |
MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME", |
|
"Received error number $0 while getting the internal socket name.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
/* set up the tickle client/connector */ | /* set up the tickle client/connector */ |
|
|
// get a socket for our tickle client | // get a socket for our tickle client |
if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){ | if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), create socket failed on tickle client."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE", |
|
"Received error number $0 while creating the internal client socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
// setup the address of the client | // setup the address of the client |
|
|
_tickle_client_addr.sin_port = 0; | _tickle_client_addr.sin_port = 0; |
| |
// bind socket to client side | // bind socket to client side |
if((::bind(_tickle_client_socket,(struct sockaddr*)&_tickle_client_addr, sizeof(_tickle_client_addr))) < 0){ |
if((::bind(_tickle_client_socket, |
|
(struct sockaddr*)&_tickle_client_addr, |
|
sizeof(_tickle_client_addr))) < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), bind failed on tickle client socket."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND", |
|
"Received error number $0 while binding the internal client socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
// connect to server side | // connect to server side |
if((::connect(_tickle_client_socket,(struct sockaddr*)&_tickle_server_addr, sizeof(_tickle_server_addr))) < 0){ |
if((::connect(_tickle_client_socket, |
|
(struct sockaddr*)&_tickle_server_addr, |
|
sizeof(_tickle_server_addr))) < 0){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), connect failed between tickle client and tickle server."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT", |
|
"Received error number $0 while connecting the internal client socket.", |
|
errno); |
|
throw Exception(parms); |
} | } |
| |
/* set up the slave connection */ | /* set up the slave connection */ |
|
|
pegasus_sleep(1); | pegasus_sleep(1); |
| |
// this call may fail, we will try a max of 20 times to establish this peer connection | // this call may fail, we will try a max of 20 times to establish this peer connection |
if((_tickle_peer_socket = ::accept(_tickle_server_socket,(struct sockaddr*)&_tickle_peer_addr, &peer_size)) < 0){ |
if((_tickle_peer_socket = ::accept(_tickle_server_socket, |
|
(struct sockaddr*)&_tickle_peer_addr, |
|
&peer_size)) < 0){ |
if(_tickle_peer_socket == -1 && errno == EAGAIN) | if(_tickle_peer_socket == -1 && errno == EAGAIN) |
{ | { |
int retries = 0; | int retries = 0; |
do | do |
{ | { |
pegasus_sleep(1); | pegasus_sleep(1); |
_tickle_peer_socket = ::accept(_tickle_server_socket,(struct sockaddr*)&_tickle_peer_addr, &peer_size); |
_tickle_peer_socket = ::accept(_tickle_server_socket, |
|
(struct sockaddr*)&_tickle_peer_addr, |
|
&peer_size); |
retries++; | retries++; |
} while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20); | } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20); |
} | } |
} | } |
if(_tickle_peer_socket == -1){ | if(_tickle_peer_socket == -1){ |
// handle error | // handle error |
throw Exception("Monitor::initializeTickler(), accept failed, peer socket connection not established."); |
MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT", |
|
"Received error number $0 while accepting the internal socket connection.", |
|
errno); |
|
throw Exception(parms); |
} | } |
// add the tickler to the list of entries to be monitored and set to IDLE because Monitor only | // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only |
// checks entries with IDLE state for events | // checks entries with IDLE state for events |
|
|
| |
Boolean handled_events = false; | Boolean handled_events = false; |
int i = 0; | int i = 0; |
// #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX) |
|
struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; | struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; |
//#else |
|
// struct timeval tv = {0, 1}; |
|
//#endif |
|
fd_set fdread; | fd_set fdread; |
FD_ZERO(&fdread); | FD_ZERO(&fdread); |
|
|
_entry_mut.lock(pegasus_thread_self()); | _entry_mut.lock(pegasus_thread_self()); |
| |
// Check the stopConnections flag. If set, clear the Acceptor monitor entries | // Check the stopConnections flag. If set, clear the Acceptor monitor entries |
|
|
*/ | */ |
maxSocketCurrentPass++; | maxSocketCurrentPass++; |
| |
// Fixed in monitor_2 but added because Monitor is still the default monitor. |
|
// When _idleEntries is 0 don't immediately return, otherwise this loops out of control |
|
// kicking off kill idle thread threads. E.g. There is nothing to select on when the cimserver |
|
// is shutting down. |
|
/*if( _idleEntries == 0 ) |
|
{ |
|
Thread::sleep( milliseconds ); |
|
_entry_mut.unlock(); |
|
return false; |
|
}*/ |
|
|
|
_entry_mut.unlock(); | _entry_mut.unlock(); |
//int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv); |
|
int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv); | int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv); |
_entry_mut.lock(pegasus_thread_self()); | _entry_mut.lock(pegasus_thread_self()); |
| |
|
|
static_cast<HTTPConnection *>(q)->_entry_index = indx; | static_cast<HTTPConnection *>(q)->_entry_index = indx; |
_entries[indx]._status = _MonitorEntry::BUSY; | _entries[indx]._status = _MonitorEntry::BUSY; |
// If allocate_and_awaken failure, retry on next iteration | // If allocate_and_awaken failure, retry on next iteration |
/* |
/* Removed for PEP 183. |
if (!MessageQueueService::get_thread_pool()->allocate_and_awaken( | if (!MessageQueueService::get_thread_pool()->allocate_and_awaken( |
(void *)q, _dispatch)) | (void *)q, _dispatch)) |
{ | { |
|
|
return true; | return true; |
} | } |
*/ | */ |
// begin hack |
// Added for PEP 183 |
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); | HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p", | "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p", |
|
|
"Monitor::_dispatch: exited run() for index %d", dst->_entry_index); | "Monitor::_dispatch: exited run() for index %d", dst->_entry_index); |
| |
PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY); | PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY); |
|
|
// Once the HTTPConnection thread has set the status value to either | // Once the HTTPConnection thread has set the status value to either |
// Monitor::DYING or Monitor::IDLE, it has returned control of the connection | // Monitor::DYING or Monitor::IDLE, it has returned control of the connection |
// to the Monitor. It is no longer permissible to access the connection | // to the Monitor. It is no longer permissible to access the connection |
|
|
{ | { |
dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE; | dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE; |
} | } |
|
// end Added for PEP 183 |
// end hack |
|
} | } |
else if( _entries[indx]._type == Monitor::INTERNAL){ | else if( _entries[indx]._type == Monitor::INTERNAL){ |
// set ourself to BUSY, | // set ourself to BUSY, |
|
|
int type) | int type) |
{ | { |
PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages"); | PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages"); |
|
|
_entry_mut.lock(pegasus_thread_self()); | _entry_mut.lock(pegasus_thread_self()); |
// Check to see if we need to dynamically grow the _entries array | // Check to see if we need to dynamically grow the _entries array |
// We always want the _entries array to 2 bigger than the | // We always want the _entries array to 2 bigger than the |
|
|
catch(...) | catch(...) |
{ | { |
} | } |
|
|
} | } |
_solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful | _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful |
_entry_mut.unlock(); | _entry_mut.unlock(); |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
} | } |
| |
|
// Note: this is no longer called with PEP 183. |
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) |
{ | { |
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm); | HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm); |