version 1.32.2.4, 2002/10/30 21:20:05
|
version 1.39.6.1, 2003/08/14 18:31:23
|
|
|
// | // |
//%///////////////////////////////////////////////////////////////////////////// | //%///////////////////////////////////////////////////////////////////////////// |
| |
|
#include <Pegasus/Common/Config.h> |
|
#include <cstring> |
#include "Monitor.h" | #include "Monitor.h" |
|
#include "MessageQueue.h" |
|
#include "Socket.h" |
|
#include <Pegasus/Common/Tracer.h> |
|
#include <Pegasus/Common/HTTPConnection.h> |
| |
#ifdef PEGASUS_OS_TYPE_WINDOWS | #ifdef PEGASUS_OS_TYPE_WINDOWS |
# if defined(FD_SETSIZE) && FD_SETSIZE != 1024 | # if defined(FD_SETSIZE) && FD_SETSIZE != 1024 |
|
|
| |
# endif | # endif |
# define FD_SETSIZE 1024 | # define FD_SETSIZE 1024 |
# include <windows.h> |
g# include <windows.h> |
#else | #else |
# include <sys/types.h> | # include <sys/types.h> |
# include <sys/socket.h> | # include <sys/socket.h> |
|
|
# include <netinet/in.h> | # include <netinet/in.h> |
# include <netdb.h> | # include <netdb.h> |
# include <arpa/inet.h> | # include <arpa/inet.h> |
# include <unistd.h> |
|
#endif | #endif |
| |
PEGASUS_USING_STD; | PEGASUS_USING_STD; |
|
|
| |
| |
static struct timeval create_time = {0, 1}; | static struct timeval create_time = {0, 1}; |
static struct timeval destroy_time = {15, 0}; |
static struct timeval destroy_time = {300, 0}; |
static struct timeval deadlock_time = {0, 0}; | static struct timeval deadlock_time = {0, 0}; |
| |
//////////////////////////////////////////////////////////////////////////////// | //////////////////////////////////////////////////////////////////////////////// |
|
|
//////////////////////////////////////////////////////////////////////////////// | //////////////////////////////////////////////////////////////////////////////// |
| |
Monitor::Monitor() | Monitor::Monitor() |
: _async(false) |
: _module_handle(0), _controller(0), _async(false) |
{ | { |
Socket::initializeInterface(); | Socket::initializeInterface(); |
_rep = 0; | _rep = 0; |
_entries.reserveCapacity(32); | _entries.reserveCapacity(32); |
int i = 0; |
for( int i = 0; i < 32; i++ ) |
for( ; i < 32; i++ ) |
|
{ | { |
_MonitorEntry entry(0, 0, 0); | _MonitorEntry entry(0, 0, 0); |
_entries.append(entry); | _entries.append(entry); |
|
|
} | } |
| |
Monitor::Monitor(Boolean async) | Monitor::Monitor(Boolean async) |
: _async(async) |
: _module_handle(0), _controller(0), _async(async) |
{ | { |
Socket::initializeInterface(); | Socket::initializeInterface(); |
_rep = 0; | _rep = 0; |
_entries.reserveCapacity(32); | _entries.reserveCapacity(32); |
int i = 0; |
for( int i = 0; i < 32; i++ ) |
for( ; i < 32; i++ ) |
|
{ | { |
_MonitorEntry entry(0, 0, 0); | _MonitorEntry entry(0, 0, 0); |
_entries.append(entry); | _entries.append(entry); |
} | } |
|
|
if( _async == true ) | if( _async == true ) |
|
|
{ | { |
_thread_pool = new ThreadPool(0, | _thread_pool = new ThreadPool(0, |
"Monitor", | "Monitor", |
1, |
0, |
0, | 0, |
create_time, | create_time, |
destroy_time, | destroy_time, |
|
|
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, |
"deregistering with module controller"); | "deregistering with module controller"); |
| |
|
if(_module_handle != NULL) |
|
{ |
|
_controller->deregister_module(PEGASUS_MODULENAME_MONITOR); |
|
_controller = 0; |
|
delete _module_handle; |
|
} |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep"); | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep"); |
| |
Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface"); | Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface"); |
|
|
gettimeofday(&now, NULL); | gettimeofday(&now, NULL); |
int dead_threads = 0; | int dead_threads = 0; |
| |
if( now.tv_sec - last.tv_sec > 300 ) |
if( now.tv_sec - last.tv_sec > 120 ) |
{ | { |
PEGASUS_STD(cout) << "Monitor Thread Pool currently has " << |
|
_thread_pool->running_count() + |
|
_thread_pool->pool_count() << " Threads." << PEGASUS_STD(endl); |
|
gettimeofday(&last, NULL); | gettimeofday(&last, NULL); |
try | try |
{ | { |
|
|
| |
Boolean handled_events = false; | Boolean handled_events = false; |
int i = 0; | int i = 0; |
|
#if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX) |
|
struct timeval tv = {milliseconds/1000, milliseconds%1000*1000}; |
|
#else |
struct timeval tv = {0,1}; | struct timeval tv = {0,1}; |
|
#endif |
fd_set fdread; | fd_set fdread; |
FD_ZERO(&fdread); | FD_ZERO(&fdread); |
_entry_mut.lock(pegasus_thread_self()); | _entry_mut.lock(pegasus_thread_self()); |
|
|
MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId); | MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId); |
if(q == 0) | if(q == 0) |
{ | { |
PEGASUS_STD(cout) << "Monitor:: found an empty connection slot" << PEGASUS_STD(endl); |
|
try | try |
{ | { |
_entries[indx]._status = _MonitorEntry::EMPTY; | _entries[indx]._status = _MonitorEntry::EMPTY; |
} | } |
catch(...) | catch(...) |
{ | { |
|
|
} | } |
_entry_mut.unlock(); |
continue; |
return true; |
|
} | } |
try | try |
{ | { |
|
|
Message *msg = new SocketMessage(_entries[indx].socket, events); | Message *msg = new SocketMessage(_entries[indx].socket, events); |
_entries[indx]._status = _MonitorEntry::BUSY; | _entries[indx]._status = _MonitorEntry::BUSY; |
_entry_mut.unlock(); | _entry_mut.unlock(); |
|
|
q->enqueue(msg); | q->enqueue(msg); |
_entries[indx]._status = _MonitorEntry::IDLE; | _entries[indx]._status = _MonitorEntry::IDLE; |
return true; | return true; |
|
|
} | } |
| |
| |
|
|
int Monitor::solicitSocketMessages( | int Monitor::solicitSocketMessages( |
Sint32 socket, | Sint32 socket, |
Uint32 events, | Uint32 events, |
|
|
int type) | int type) |
{ | { |
| |
PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage"); |
PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages"); |
| |
int index = -1; | int index = -1; |
_entry_mut.lock(pegasus_thread_self()); | _entry_mut.lock(pegasus_thread_self()); |
|
|
break; | break; |
} | } |
} | } |
|
|
_entry_mut.unlock(); | _entry_mut.unlock(); |
|
|
PEG_METHOD_EXIT(); | PEG_METHOD_EXIT(); |
if( _async == true ) |
|
PEGASUS_STD(cout) << "Monitor:: running " << _thread_pool->running_count() << |
|
" idle " << _thread_pool->pool_count() << PEGASUS_STD(endl); |
|
|
|
} | } |
| |
|
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) | PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm) |
{ | { |
HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm); | HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm); |
|
|
} | } |
| |
| |
|
|
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |