1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
|
13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Brasher (mbrasher@bmc.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include <Pegasus/Common/Config.h>
31 #include <cstring>
32 #include "Monitor.h"
33 #include "MessageQueue.h"
34 #include "Socket.h"
|
35 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
36 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
37 mike 1.2
38 #ifdef PEGASUS_OS_TYPE_WINDOWS
39 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
40 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
41 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
42 mday 1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
|
43 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
44 otherwise, less than 64 clients (the default) will be able to connect to the \
45 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
46 mday 1.5
|
47 mike 1.2 # endif
48 # define FD_SETSIZE 1024
|
49 mday 1.39.6.1 g# include <windows.h>
|
50 mike 1.2 #else
51 # include <sys/types.h>
52 # include <sys/socket.h>
53 # include <sys/time.h>
54 # include <netinet/in.h>
55 # include <netdb.h>
56 # include <arpa/inet.h>
57 #endif
58
59 PEGASUS_USING_STD;
60
61 PEGASUS_NAMESPACE_BEGIN
62
|
63 mday 1.18
|
64 mday 1.25 static AtomicInt _connections = 0;
65
66
67 static struct timeval create_time = {0, 1};
|
68 mday 1.38 static struct timeval destroy_time = {300, 0};
|
69 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
70 mday 1.18
|
71 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
72 //
73 // MonitorRep
74 //
75 ////////////////////////////////////////////////////////////////////////////////
76
77 struct MonitorRep
78 {
79 fd_set rd_fd_set;
80 fd_set wr_fd_set;
81 fd_set ex_fd_set;
82 fd_set active_rd_fd_set;
83 fd_set active_wr_fd_set;
84 fd_set active_ex_fd_set;
85 };
86
87 ////////////////////////////////////////////////////////////////////////////////
88 //
89 // Monitor
90 //
91 ////////////////////////////////////////////////////////////////////////////////
92 mike 1.2
93 Monitor::Monitor()
|
94 mday 1.7 : _module_handle(0), _controller(0), _async(false)
|
95 mike 1.2 {
96 Socket::initializeInterface();
|
97 mday 1.25 _rep = 0;
|
98 mday 1.37 _entries.reserveCapacity(32);
99 for( int i = 0; i < 32; i++ )
100 {
101 _MonitorEntry entry(0, 0, 0);
102 _entries.append(entry);
103 }
|
104 mike 1.2 }
105
|
106 mday 1.18 Monitor::Monitor(Boolean async)
107 : _module_handle(0), _controller(0), _async(async)
108 {
109 Socket::initializeInterface();
|
110 mday 1.25 _rep = 0;
|
111 mday 1.37 _entries.reserveCapacity(32);
112 for( int i = 0; i < 32; i++ )
113 {
114 _MonitorEntry entry(0, 0, 0);
115 _entries.append(entry);
116 }
|
117 mday 1.19 if( _async == true )
118 {
119 _thread_pool = new ThreadPool(0,
120 "Monitor",
|
121 mday 1.38 0,
|
122 mday 1.25 0,
|
123 mday 1.19 create_time,
124 destroy_time,
125 deadlock_time);
126 }
|
127 mday 1.20 else
128 _thread_pool = 0;
|
129 mday 1.18 }
|
130 mday 1.20
|
131 mike 1.2 Monitor::~Monitor()
132 {
|
133 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
134 "deregistering with module controller");
|
135 kumpf 1.10
|
136 kumpf 1.11 if(_module_handle != NULL)
|
137 mday 1.8 {
138 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
139 _controller = 0;
|
140 kumpf 1.10 delete _module_handle;
|
141 mday 1.8 }
|
142 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
143 mday 1.8
|
144 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
145 mike 1.2 Socket::uninitializeInterface();
|
146 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
147 "returning from monitor destructor");
|
148 mday 1.21 if(_async == true)
|
149 mday 1.20 delete _thread_pool;
|
150 mike 1.2 }
151
|
152 mday 1.7
|
153 mday 1.18 int Monitor::kill_idle_threads()
154 {
155 static struct timeval now, last;
156 gettimeofday(&now, NULL);
|
157 mday 1.20 int dead_threads = 0;
|
158 mday 1.18
|
159 mday 1.38 if( now.tv_sec - last.tv_sec > 120 )
|
160 mday 1.18 {
161 gettimeofday(&last, NULL);
|
162 mday 1.20 try
163 {
164 dead_threads = _thread_pool->kill_dead_threads();
165 }
166 catch(IPCException& )
167 {
168 }
169
|
170 mday 1.18 }
|
171 mday 1.20 return dead_threads;
|
172 mday 1.18 }
173
|
174 mday 1.7
|
175 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
176 {
|
177 mday 1.18
|
178 mday 1.25 Boolean handled_events = false;
179 int i = 0;
|
180 mday 1.37 #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
|
181 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
182 #else
|
183 kumpf 1.35 struct timeval tv = {0, 1};
184 #endif
|
185 mday 1.25 fd_set fdread;
186 FD_ZERO(&fdread);
|
187 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
188 mday 1.13
|
189 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
190 mike 1.2 {
|
191 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
192 mday 1.25 {
193 FD_SET(_entries[indx].socket, &fdread);
194 }
|
195 mday 1.13 }
|
196 mday 1.37
|
197 mday 1.25
198 int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
199
|
200 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
201 mday 1.25 if(events && events != SOCKET_ERROR )
|
202 mike 1.2 #else
|
203 mday 1.25 if(events && events != -1 )
|
204 mike 1.2 #endif
|
205 mday 1.13 {
|
206 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
207 {
208 if(FD_ISSET(_entries[indx].socket, &fdread))
209 {
210 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
211 if(q == 0)
212 {
|
213 mday 1.37 try
214 {
215 _entries[indx]._status = _MonitorEntry::EMPTY;
216 }
217 catch(...)
218 {
219
220 }
221 continue;
|
222 mday 1.25 }
|
223 mday 1.37 try
|
224 mday 1.25 {
|
225 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
226 {
227 static_cast<HTTPConnection *>(q)->_entry_index = indx;
228 if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
229 {
230 _entries[indx]._status = _MonitorEntry::DYING;
231 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
232 Message* message= new CloseConnectionMessage(_entries[indx].socket);
233 message->dest = o.getQueueId();
234 _entry_mut.unlock();
235 o.enqueue(message);
236 return true;
237 }
238 _entries[indx]._status = _MonitorEntry::BUSY;
239 _thread_pool->allocate_and_awaken((void *)q, _dispatch);
240 }
241 else
|
242 mday 1.25 {
|
243 mday 1.37 int events = 0;
244 events |= SocketMessage::READ;
245 Message *msg = new SocketMessage(_entries[indx].socket, events);
246 _entries[indx]._status = _MonitorEntry::BUSY;
247 _entry_mut.unlock();
|
248 mday 1.27
|
249 mday 1.37 q->enqueue(msg);
250 _entries[indx]._status = _MonitorEntry::IDLE;
|
251 mday 1.25 return true;
252 }
253 }
|
254 mday 1.37 catch(...)
|
255 mday 1.25 {
256 }
257 handled_events = true;
258 }
259 }
|
260 mday 1.24 }
|
261 mday 1.37 _entry_mut.unlock();
|
262 mday 1.13 return(handled_events);
|
263 mike 1.2 }
264
|
265 mday 1.25
|
266 mday 1.37
|
267 mday 1.25 int Monitor::solicitSocketMessages(
|
268 mike 1.2 Sint32 socket,
269 Uint32 events,
|
270 mday 1.8 Uint32 queueId,
271 int type)
|
272 mike 1.2 {
|
273 kumpf 1.4
|
274 kumpf 1.31 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
275 mike 1.2
|
276 mday 1.37 int index = -1;
277 _entry_mut.lock(pegasus_thread_self());
|
278 mday 1.25
279 for(index = 0; index < (int)_entries.size(); index++)
280 {
|
281 mday 1.37 try
282 {
283 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
284 {
285 _entries[index].socket = socket;
286 _entries[index].queueId = queueId;
287 _entries[index]._type = type;
288 _entries[index]._status = _MonitorEntry::IDLE;
289 _entry_mut.unlock();
290
291 return index;
292 }
293 }
294 catch(...)
|
295 mday 1.25 {
296 }
|
297 mday 1.37
|
298 mday 1.25 }
|
299 mday 1.37 _entry_mut.unlock();
|
300 mday 1.25 PEG_METHOD_EXIT();
301 return index;
|
302 mike 1.2 }
303
|
304 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
305 mike 1.2 {
|
306 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
307 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
308 mday 1.27
|
309 mday 1.25 for(int index = 0; index < (int)_entries.size(); index++)
|
310 mike 1.2 {
|
311 mday 1.25 if(_entries[index].socket == socket)
312 {
313 _entries[index]._status = _MonitorEntry::EMPTY;
|
314 mday 1.37 break;
|
315 mday 1.25 }
|
316 mike 1.2 }
|
317 mday 1.37 _entry_mut.unlock();
|
318 kumpf 1.4 PEG_METHOD_EXIT();
|
319 mike 1.2 }
320
|
321 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
322 {
|
323 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
324 mday 1.37
|
325 mday 1.25 dst->run(1);
|
326 mday 1.37 if( dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
327 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
328
|
329 mday 1.8 return 0;
|
330 mday 1.7 }
|
331 mday 1.37
|
332 mday 1.7
|
333 mike 1.2
334 PEGASUS_NAMESPACE_END
|