version 1.31.2.1, 2002/10/25 20:49:43
|
version 1.32.2.6, 2002/10/31 19:40:47
|
|
|
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
#include <Pegasus/Common/Config.h> |
|
#include <cstring> |
|
#include "Monitor.h" | #include "Monitor.h" |
#include "MessageQueue.h" |
|
#include "Socket.h" |
|
#include <Pegasus/Common/Tracer.h> |
|
#include <Pegasus/Common/HTTPConnection.h> |
|
| |
#ifdef PEGASUS_OS_TYPE_WINDOWS | #ifdef PEGASUS_OS_TYPE_WINDOWS |
# if defined(FD_SETSIZE) && FD_SETSIZE != 1024 | # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 |
|
|
| |
static struct timeval create_time = {0, 1}; | static struct timeval create_time = {0, 1}; |
static struct timeval destroy_time = {15, 0}; | static struct timeval destroy_time = {15, 0}; |
static struct timeval deadlock_time = {0, 0}; |
static struct timeval deadlock_time = {300, 0}; |
| |
//////////////////////////////////////////////////////////////////////////////// | //////////////////////////////////////////////////////////////////////////////// |
// | // |
|
|
//////////////////////////////////////////////////////////////////////////////// | //////////////////////////////////////////////////////////////////////////////// |
| |
Monitor::Monitor() | Monitor::Monitor() |
: _module_handle(0), _controller(0), _async(false) |
: _async(false) |
{ | { |
Socket::initializeInterface(); | Socket::initializeInterface(); |
_rep = 0; | _rep = 0; |
|
|
} | } |
| |
Monitor::Monitor(Boolean async) | Monitor::Monitor(Boolean async) |
: _module_handle(0), _controller(0), _async(async) |
: _async(async) |
{ | { |
Socket::initializeInterface(); | Socket::initializeInterface(); |
_rep = 0; | _rep = 0; |
|
|
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"deregistering with module controller"); | "deregistering with module controller"); |
| |
if(_module_handle != NULL) |
|
{ |
|
_controller->deregister_module(PEGASUS_MODULENAME_MONITOR); |
|
_controller = 0; |
|
delete _module_handle; |
|
} |
|
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep"); | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep"); |
| |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface"); | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface"); |
|
|
struct timeval tv = {0,1}; | struct timeval tv = {0,1}; |
fd_set fdread; | fd_set fdread; |
FD_ZERO(&fdread); | FD_ZERO(&fdread); |
|
_entry_mut.lock(pegasus_thread_self()); |
| |
for( int indx = 0; indx < (int)_entries.size(); indx++) | for( int indx = 0; indx < (int)_entries.size(); indx++) |
{ | { |
|
|
} | } |
} | } |
| |
|
|
int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv); | int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv); |
| |
#ifdef PEGASUS_OS_TYPE_WINDOWS | #ifdef PEGASUS_OS_TYPE_WINDOWS |
|
|
} | } |
catch(...) | catch(...) |
{ | { |
|
|
} | } |
return true; |
continue; |
} | } |
try | try |
{ | { |
|
|
MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner(); | MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner(); |
Message* message= new CloseConnectionMessage(_entries[indx].socket); | Message* message= new CloseConnectionMessage(_entries[indx].socket); |
message->dest = o.getQueueId(); | message->dest = o.getQueueId(); |
|
_entry_mut.unlock(); |
o.enqueue(message); | o.enqueue(message); |
return true; | return true; |
} | } |
|
|
events |= SocketMessage::READ; | events |= SocketMessage::READ; |
Message *msg = new SocketMessage(_entries[indx].socket, events); | Message *msg = new SocketMessage(_entries[indx].socket, events); |
_entries[indx]._status = _MonitorEntry::BUSY; | _entries[indx]._status = _MonitorEntry::BUSY; |
|
_entry_mut.unlock(); |
q->enqueue(msg); | q->enqueue(msg); |
_entries[indx]._status = _MonitorEntry::IDLE; | _entries[indx]._status = _MonitorEntry::IDLE; |
return true; | return true; |
|
|
} | } |
} | } |
} | } |
|
_entry_mut.unlock(); |
return(handled_events); | return(handled_events); |
} | } |
| |
|
|
PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage"); | PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage"); |
| |
int index = -1; | int index = -1; |
|
_entry_mut.lock(pegasus_thread_self()); |
|
|
for(index = 0; index < (int)_entries.size(); index++) | for(index = 0; index < (int)_entries.size(); index++) |
{ | { |
try | try |
{ | { |
if(_entries[index]._status.value() == _MonitorEntry::EMPTY) | if(_entries[index]._status.value() == _MonitorEntry::EMPTY) |
{ | { |
|
|
_entries[index].socket = socket; | _entries[index].socket = socket; |
_entries[index].queueId = queueId; | _entries[index].queueId = queueId; |
_entries[index]._type = type; | _entries[index]._type = type; |
_entries[index]._status = _MonitorEntry::IDLE; | _entries[index]._status = _MonitorEntry::IDLE; |
|
_entry_mut.unlock(); |
|
|
return index; | return index; |
} | } |
} | } |
catch(...) | catch(...) |
{ | { |
} | } |
|
|
} | } |
|
_entry_mut.unlock(); |
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
return index; | return index; |
} | } |
|
|
void Monitor::unsolicitSocketMessages(Sint32 socket) | void Monitor::unsolicitSocketMessages(Sint32 socket) |
{ | { |
PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages"); | PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages"); |
|
_entry_mut.lock(pegasus_thread_self()); |
| |
for(int index = 0; index < (int)_entries.size(); index++) | for(int index = 0; index < (int)_entries.size(); index++) |
{ | { |
if(_entries[index].socket == socket) | if(_entries[index].socket == socket) |
{ | { |
_entries[index]._status = _MonitorEntry::EMPTY; | _entries[index]._status = _MonitorEntry::EMPTY; |
|
break; |
} | } |
} | } |
| |
|
_entry_mut.unlock(); |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
if( _async == true ) | if( _async == true ) |
PEGASUS_STD(cout) << "Monitor:: running " << _thread_pool->running_count() << | PEGASUS_STD(cout) << "Monitor:: running " << _thread_pool->running_count() << |