version 1.39, 2003/05/06 16:36:44
|
version 1.40, 2003/08/15 20:31:52
|
|
|
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
#include <Pegasus/Common/Config.h> | #include <Pegasus/Common/Config.h> |
|
|
#include <cstring> | #include <cstring> |
#include "Monitor.h" | #include "Monitor.h" |
#include "MessageQueue.h" | #include "MessageQueue.h" |
|
|
| |
| |
| |
|
////************************* monitor 2 *****************************//// |
|
|
|
|
|
monitor_2_entry::monitor_2_entry(void) |
|
: type(UNTYPED) |
|
{ |
|
} |
|
|
|
monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, monitor_2_entry_type _type) |
|
: type(_type), psock(_psock) |
|
{ |
|
|
|
} |
|
|
|
monitor_2_entry::monitor_2_entry(const monitor_2_entry& e) |
|
{ |
|
if(this != &e){ |
|
psock = e.psock; |
|
type = e.type; |
|
} |
|
} |
|
|
|
monitor_2_entry::~monitor_2_entry(void) |
|
{ |
|
} |
|
|
|
monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e) |
|
{ |
|
if(this != &e){ |
|
psock = e.psock; |
|
type = e.type; |
|
} |
|
return *this; |
|
} |
|
|
|
Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) |
|
{ |
|
if(this == &me) |
|
return true; |
|
if((Sint32)this->psock == (Sint32)me.psock) |
|
return true; |
|
return false; |
|
} |
|
|
|
Boolean monitor_2_entry::operator ==(void* k) |
|
{ |
|
if((void *)this == k) |
|
return true; |
|
return false; |
|
} |
|
|
|
|
|
monitor_2_entry::operator pegasus_socket() const |
|
{ |
|
return psock; |
|
} |
|
|
|
|
|
monitor_2::monitor_2(void) |
|
: _session_dispatch(0), _listeners(true, 0),_ready(true), _die(0) |
|
{ |
|
try { |
|
|
|
bsd_socket_factory _factory; |
|
|
|
// set up the listener/acceptor |
|
pegasus_socket temp = pegasus_socket(&_factory); |
|
|
|
temp.socket(PF_INET, SOCK_STREAM, 0); |
|
// initialize the address |
|
memset(&_tickle_addr, 0, sizeof(_tickle_addr)); |
|
_tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); |
|
_tickle_addr.sin_family = PF_INET; |
|
_tickle_addr.sin_port = 0; |
|
|
|
PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr); |
|
|
|
temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr)); |
|
temp.listen(3); |
|
temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size); |
|
|
|
// set up the connector |
|
|
|
pegasus_socket tickler = pegasus_socket(&_factory); |
|
tickler.socket(PF_INET, SOCK_STREAM, 0); |
|
struct sockaddr_in _addr; |
|
memset(&_addr, 0, sizeof(_addr)); |
|
_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); |
|
_addr.sin_family = PF_INET; |
|
_addr.sin_port = 0; |
|
tickler.bind((struct sockaddr*)&_addr, sizeof(_addr)); |
|
tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr)); |
|
|
|
_tickler.psock = tickler; |
|
_tickler.type = INTERNAL; |
|
|
|
struct sockaddr_in peer; |
|
memset(&peer, 0, sizeof(peer)); |
|
PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer); |
|
|
|
pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size); |
|
monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL); |
|
_listeners.insert_first(_tickle); |
|
|
|
} |
|
catch(...){ } |
|
} |
|
|
|
monitor_2::~monitor_2(void) |
|
{ |
|
|
|
|
|
} |
|
|
|
|
|
void monitor_2::run(void) |
|
{ |
|
monitor_2_entry* temp; |
|
while(_die.value() == 0) { |
|
struct timeval tv = {0, 0}; |
|
|
|
// place all sockets in the select set |
|
FD_ZERO(&rd_fd_set); |
|
try { |
|
_listeners.lock(pegasus_thread_self()); |
|
temp = _listeners.next(0); |
|
while(temp != 0 ){ |
|
FD_SET((Sint32)temp->psock, &rd_fd_set); |
|
temp = _listeners.next(temp); |
|
} |
|
_listeners.unlock(); |
|
} |
|
catch(...){ |
|
return; |
|
} |
|
|
|
int events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL); |
|
try { |
|
_listeners.lock(pegasus_thread_self()); |
|
|
|
temp = _listeners.next(0); |
|
while(temp != 0 ){ |
|
if(FD_ISSET((Sint32)temp->psock, &rd_fd_set)) { |
|
FD_CLR((Sint32)temp->psock, &rd_fd_set); |
|
monitor_2_entry* entry = new monitor_2_entry(*temp); |
|
_ready.insert_first((void*)entry); |
|
} |
|
temp = _listeners.next(temp); |
|
} |
|
_listeners.unlock(); |
|
} |
|
catch(...){ |
|
return; |
|
} |
|
|
|
// now handle the sockets that are ready to read |
|
_dispatch(); |
|
} // while alive |
|
} |
|
|
|
void* monitor_2::set_session_dispatch(void (*dp)(pegasus_socket&)) |
|
{ |
|
void* old = (void*)_session_dispatch; |
|
_session_dispatch = dp; |
|
return old; |
|
} |
|
|
|
|
|
void monitor_2::_dispatch(void) |
|
{ |
|
monitor_2_entry* entry = (monitor_2_entry*) _ready.remove_first(); |
|
while(entry != 0 ){ |
|
switch(entry->type) { |
|
case INTERNAL: |
|
static char buffer[2]; |
|
entry->psock.read(&buffer, 2); |
|
break; |
|
case LISTEN: |
|
{ |
|
static struct sockaddr peer; |
|
static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer); |
|
pegasus_socket connected = entry->psock.accept(&peer, &peer_size); |
|
add_entry(connected, SESSION); |
|
} |
|
break; |
|
case SESSION: |
|
if(_session_dispatch != 0 ) |
|
_session_dispatch(entry->psock); |
|
else { |
|
static char buffer[4096]; |
|
int bytes = entry->psock.read(&buffer, 4096); |
|
} |
|
|
|
break; |
|
|
|
case UNTYPED: |
|
default: |
|
break; |
|
|
|
} |
|
delete entry; |
|
entry = (monitor_2_entry*) _ready.remove_first(); |
|
|
|
} |
|
} |
|
|
|
|
|
void monitor_2::stop(void) |
|
{ |
|
_die = 1; |
|
tickle(); |
|
|
|
// shut down the listener list, free the list nodes |
|
_tickler.psock.close(); |
|
_listeners.shutdown_queue(); |
|
} |
|
|
|
void monitor_2::tickle(void) |
|
{ |
|
static char _buffer[] = |
|
{ |
|
'0','0' |
|
}; |
|
|
|
_tickler.psock.write(&_buffer, 2); |
|
} |
|
|
|
|
|
Boolean monitor_2::add_entry(pegasus_socket& ps, monitor_2_entry_type type) |
|
{ |
|
monitor_2_entry* m2e = new monitor_2_entry(ps, type); |
|
|
|
try{ |
|
_listeners.insert_first(m2e); |
|
} |
|
catch(...){ |
|
delete m2e; |
|
return false; |
|
} |
|
tickle(); |
|
return true; |
|
} |
|
|
|
Boolean monitor_2::remove_entry(Sint32 s) |
|
{ |
|
monitor_2_entry* temp; |
|
try { |
|
_listeners.try_lock(pegasus_thread_self()); |
|
temp = _listeners.next(0); |
|
while(temp != 0){ |
|
if(s == (Sint32)temp->psock ){ |
|
temp = _listeners.remove_no_lock(temp); |
|
delete temp; |
|
_listeners.unlock(); |
|
return true; |
|
} |
|
temp = _listeners.next(temp); |
|
} |
|
_listeners.unlock(); |
|
} |
|
catch(...){ |
|
} |
|
return false; |
|
} |
|
|
|
|
|
|
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |