version 1.103, 2006/01/30 16:17:05
|
version 1.103.2.3, 2007/12/14 20:56:54
|
|
|
// Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) | // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com) |
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
|
//NOCHKSRC |
| |
#include <Pegasus/Common/Config.h> | #include <Pegasus/Common/Config.h> |
| |
|
|
#endif | #endif |
throw Exception(parms); | throw Exception(parms); |
} | } |
|
|
|
Socket::disableBlocking(_tickle_peer_socket); |
|
Socket::disableBlocking(_tickle_client_socket); |
|
|
// add the tickler to the list of entries to be monitored and set to IDLE because Monitor only | // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only |
// checks entries with IDLE state for events | // checks entries with IDLE state for events |
_MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL); | _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL); |
|
|
}; | }; |
| |
AutoMutex autoMutex(_tickle_mutex); | AutoMutex autoMutex(_tickle_mutex); |
Socket::disableBlocking(_tickle_client_socket); |
|
Socket::write(_tickle_client_socket,&_buffer, 2); | Socket::write(_tickle_client_socket,&_buffer, 2); |
Socket::enableBlocking(_tickle_client_socket); |
|
} | } |
| |
void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) | void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status ) |
|
|
_entries[index]._status = status; | _entries[index]._status = status; |
} | } |
| |
Boolean Monitor::run(Uint32 milliseconds) |
void Monitor::run(Uint32 milliseconds) |
{ | { |
| |
Boolean handled_events = false; |
|
int i = 0; | int i = 0; |
| |
struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; | struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; |
|
|
int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv); | int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv); |
#endif | #endif |
autoEntryMutex.lock(); | autoEntryMutex.lock(); |
|
|
|
TimeValue timeNow; |
|
timeNow = TimeValue::getCurrentTime(); |
|
|
// After enqueue a message and the autoEntryMutex has been released and locked again, | // After enqueue a message and the autoEntryMutex has been released and locked again, |
// the array of _entries can be changed. The ArrayIterator has be reset with the original _entries | // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries |
entries.reset(_entries); | entries.reset(_entries); |
|
|
{ | { |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"entries[indx].type for indx = %d is Monitor::CONNECTION", indx); | "entries[indx].type for indx = %d is Monitor::CONNECTION", indx); |
static_cast<HTTPConnection *>(q)->_entry_index = indx; |
HTTPConnection *dst = |
|
reinterpret_cast<HTTPConnection *>(q); |
// Do not update the entry just yet. The entry gets updated once |
dst->_entry_index = indx; |
// the request has been read. |
|
//entries[indx]._status = _MonitorEntry::BUSY; |
// Update idle start time because we have received some |
|
// data. Any data is good data at this point, and we'll |
// If allocate_and_awaken failure, retry on next iteration |
// keep the connection alive, even if we've exceeded |
/* Removed for PEP 183. |
// the idleConnectionTimeout, which will be checked |
if (!MessageQueueService::get_thread_pool()->allocate_and_awaken( |
// when we call closeConnectionOnTimeout() next. |
(void *)q, _dispatch)) |
dst->_idleStartTime = TimeValue::getCurrentTime(); |
|
|
|
// Check for accept pending (ie. SSL handshake pending) |
|
// or idle connection timeouts for sockets from which |
|
// we received data (avoiding extra queue lookup below). |
|
if (!dst->closeConnectionOnTimeout(timeNow)) |
{ | { |
Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2, |
|
"Monitor::run: Insufficient resources to process request."); |
|
entries[indx]._status = _MonitorEntry::IDLE; |
|
return true; |
|
} |
|
*/ |
|
// Added for PEP 183 |
|
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); |
|
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p", | "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p", |
dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst); | dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst); |
|
|
} | } |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"Monitor::_dispatch: exited run() for index %d", dst->_entry_index); | "Monitor::_dispatch: exited run() for index %d", dst->_entry_index); |
|
} |
// It is possible the entry status may not be set to busy. |
|
// The following will fail in that case. |
|
// PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY); |
|
// Once the HTTPConnection thread has set the status value to either |
|
// Monitor::DYING or Monitor::IDLE, it has returned control of the connection |
|
// to the Monitor. It is no longer permissible to access the connection |
|
// or the entry in the _entries table. |
|
|
|
// The following is not relevant as the worker thread or the |
|
// reader thread will update the status of the entry. |
|
//if (dst->_connectionClosePending) |
|
//{ |
|
// dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING; |
|
//} |
|
//else |
|
//{ |
|
// dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE; |
|
//} |
|
// end Added for PEP 183 |
|
} | } |
else if( entries[indx]._type == Monitor::INTERNAL){ | else if( entries[indx]._type == Monitor::INTERNAL){ |
// set ourself to BUSY, | // set ourself to BUSY, |
|
|
| |
entries[indx]._status = _MonitorEntry::BUSY; | entries[indx]._status = _MonitorEntry::BUSY; |
static char buffer[2]; | static char buffer[2]; |
Socket::disableBlocking(entries[indx].socket); |
|
Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2); | Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2); |
Socket::enableBlocking(entries[indx].socket); |
|
entries[indx]._status = _MonitorEntry::IDLE; | entries[indx]._status = _MonitorEntry::IDLE; |
} | } |
else | else |
|
|
// the array of entries can be changed. The ArrayIterator has be reset with the original _entries | // the array of entries can be changed. The ArrayIterator has be reset with the original _entries |
entries.reset(_entries); | entries.reset(_entries); |
entries[indx]._status = _MonitorEntry::IDLE; | entries[indx]._status = _MonitorEntry::IDLE; |
|
|
return true; |
|
} | } |
} | } |
catch(...) | catch(...) |
{ | { |
} | } |
handled_events = true; |
} |
|
// else check for accept pending (ie. SSL handshake pending) or |
|
// idle connection timeouts for sockets from which we did not |
|
// receive data. |
|
else if ((entries[indx]._status.get() == _MonitorEntry::IDLE) && |
|
entries[indx]._type == Monitor::CONNECTION) |
|
{ |
|
MessageQueue *q = MessageQueue::lookup(entries[indx].queueId); |
|
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); |
|
dst->_entry_index = indx; |
|
dst->closeConnectionOnTimeout(timeNow); |
|
} |
|
} |
|
} |
|
// else if "events" is zero (ie. select timed out) then we still need |
|
// to check if there are any pending SSL handshakes that have timed out. |
|
else |
|
{ |
|
for (int indx = 0; indx < (int)entries.size(); indx++) |
|
{ |
|
if ((entries[indx]._status.get() == _MonitorEntry::IDLE) && |
|
entries[indx]._type == Monitor::CONNECTION) |
|
{ |
|
MessageQueue *q = MessageQueue::lookup(entries[indx].queueId); |
|
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q); |
|
dst->_entry_index = indx; |
|
dst->closeConnectionOnTimeout(timeNow); |
} | } |
} | } |
} | } |
|
|
return(handled_events); |
|
} | } |
| |
void Monitor::stopListeningForConnections(Boolean wait) | void Monitor::stopListeningForConnections(Boolean wait) |