1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
|
13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Brasher (mbrasher@bmc.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include <Pegasus/Common/Config.h>
31 #include <cstring>
32 #include "Monitor.h"
33 #include "MessageQueue.h"
34 #include "Socket.h"
|
35 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
36 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
37 mike 1.2
38 #ifdef PEGASUS_OS_TYPE_WINDOWS
39 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
40 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
41 of <winsock.h>. It may have been indirectly included (e.g., by including \
42 <windows.h>). Find the inclusion of that header which is visible to this \
43 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
44 otherwise, less than 64 clients (the default) will be able to connect to the \
45 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
46 mday 1.5
|
47 mike 1.2 # endif
48 # define FD_SETSIZE 1024
|
49 mday 1.5 # include <windows.h>
|
50 mike 1.2 #else
51 # include <sys/types.h>
52 # include <sys/socket.h>
53 # include <sys/time.h>
54 # include <netinet/in.h>
55 # include <netdb.h>
56 # include <arpa/inet.h>
57 # include <unistd.h>
58 #endif
59
60 PEGASUS_USING_STD;
61
62 PEGASUS_NAMESPACE_BEGIN
63
|
64 mday 1.18
|
65 mday 1.19 static struct timeval create_time = {0, 10};
66 static struct timeval destroy_time = {5, 0};
|
67 mday 1.20 static struct timeval deadlock_time = {1000, 0};
|
68 mday 1.18
|
69 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
70 //
71 // MonitorRep
72 //
73 ////////////////////////////////////////////////////////////////////////////////
74
75 struct MonitorRep
76 {
77 fd_set rd_fd_set;
78 fd_set wr_fd_set;
79 fd_set ex_fd_set;
80 fd_set active_rd_fd_set;
81 fd_set active_wr_fd_set;
82 fd_set active_ex_fd_set;
83 };
84
85 ////////////////////////////////////////////////////////////////////////////////
86 //
87 // Monitor
88 //
89 ////////////////////////////////////////////////////////////////////////////////
90 mike 1.2
91 Monitor::Monitor()
|
92 mday 1.7 : _module_handle(0), _controller(0), _async(false)
|
93 mike 1.2 {
94 Socket::initializeInterface();
95 _rep = new MonitorRep;
96 FD_ZERO(&_rep->rd_fd_set);
97 FD_ZERO(&_rep->wr_fd_set);
98 FD_ZERO(&_rep->ex_fd_set);
99 FD_ZERO(&_rep->active_rd_fd_set);
100 FD_ZERO(&_rep->active_wr_fd_set);
101 FD_ZERO(&_rep->active_ex_fd_set);
102 }
103
|
104 mday 1.18 Monitor::Monitor(Boolean async)
105 : _module_handle(0), _controller(0), _async(async)
106 {
107 Socket::initializeInterface();
108 _rep = new MonitorRep;
109 FD_ZERO(&_rep->rd_fd_set);
110 FD_ZERO(&_rep->wr_fd_set);
111 FD_ZERO(&_rep->ex_fd_set);
112 FD_ZERO(&_rep->active_rd_fd_set);
113 FD_ZERO(&_rep->active_wr_fd_set);
114 FD_ZERO(&_rep->active_ex_fd_set);
|
115 mday 1.19 if( _async == true )
116 {
117 _thread_pool = new ThreadPool(0,
118 "Monitor",
119 0,
120 20,
121 create_time,
122 destroy_time,
123 deadlock_time);
124 }
|
125 mday 1.20 else
126 _thread_pool = 0;
|
127 mday 1.18 }
|
128 mday 1.20
|
129 mike 1.2 Monitor::~Monitor()
130 {
|
131 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
132 "deregistering with module controller");
|
133 kumpf 1.10
|
134 kumpf 1.11 if(_module_handle != NULL)
|
135 mday 1.8 {
136 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
137 _controller = 0;
|
138 kumpf 1.10 delete _module_handle;
|
139 mday 1.8 }
|
140 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
141 mday 1.8
|
142 kumpf 1.6 delete _rep;
|
143 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
144 mike 1.2 Socket::uninitializeInterface();
|
145 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146 "returning from monitor destructor");
|
147 mday 1.21 if(_async == true)
|
148 mday 1.20 delete _thread_pool;
|
149 mike 1.2 }
150
|
151 mday 1.7
|
152 mday 1.18 int Monitor::kill_idle_threads()
153 {
154 static struct timeval now, last;
155 gettimeofday(&now, NULL);
|
156 mday 1.20 int dead_threads = 0;
|
157 mday 1.18
158 if( now.tv_sec - last.tv_sec > 0 )
159 {
160 gettimeofday(&last, NULL);
|
161 mday 1.20 try
162 {
163
164 dead_threads = _thread_pool->kill_dead_threads();
165 }
166 catch(IPCException& )
167 {
168 }
169
|
170 mday 1.18 }
|
171 mday 1.20 return dead_threads;
|
172 mday 1.18 }
173
|
174 mday 1.7
175 //<<< Tue May 14 20:38:26 2002 mdd >>>
176 // register with module controller
177 // when it is time to enqueue the message,
178 // use an async_thread_exec call to
179 // isolate the entire if(events) { enqueue -> fd_clear } block
180 // let the thread pool grow and shrink according to load.
181
|
182 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
183 {
|
184 mday 1.18
|
185 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
186
187 // Windows select() has a strange little bug. It returns immediately if
188 // there are no descriptors in the set even if the timeout is non-zero.
189 // To work around this, we call Sleep() for now:
190
191 if (_entries.size() == 0)
192 Sleep(milliseconds);
|
193 mday 1.18
|
194 mike 1.2 #endif
|
195 mday 1.18
|
196 mike 1.2 // Check for events on the selected file descriptors. Only do this if
197 // there were no undispatched events from last time.
198
|
199 mday 1.8 int count = 0;
|
200 mday 1.13
201 memcpy(&_rep->active_rd_fd_set, &_rep->rd_fd_set, sizeof(fd_set));
|
202 mday 1.16 // memcpy(&_rep->active_wr_fd_set, &_rep->wr_fd_set, sizeof(fd_set));
|
203 mday 1.13 memcpy(&_rep->active_ex_fd_set, &_rep->ex_fd_set, sizeof(fd_set));
204
205 const Uint32 SECONDS = milliseconds / 1000;
206 const Uint32 MICROSECONDS = (milliseconds % 1000) * 1000;
207 struct timeval tv = { SECONDS, MICROSECONDS };
208
209 count = select(
210 FD_SETSIZE,
211 &_rep->active_rd_fd_set,
|
212 mday 1.16 // &_rep->active_wr_fd_set,
213 NULL,
|
214 mday 1.13 &_rep->active_ex_fd_set,
215 &tv);
|
216 mday 1.18 if(count == 0)
|
217 mike 1.2 {
|
218 mday 1.13 return false;
219 }
|
220 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
221 mday 1.13 else if (count == SOCKET_ERROR)
|
222 mike 1.2 #else
|
223 mday 1.13 else if (count == -1)
|
224 mike 1.2 #endif
|
225 mday 1.13 {
226 return false;
|
227 mike 1.2 }
|
228 mday 1.13
|
229 mday 1.24
|
230 mday 1.13 Boolean handled_events = false;
|
231 mday 1.24 try { _connection_mutex.try_lock(pegasus_thread_self()); }
232 catch(AlreadyLocked){
233 pegasus_sleep(1);
234 return false;
235 }
236
|
237 mday 1.13 for (Uint32 i = 0, n = _entries.size(); i < _entries.size(); i++)
|
238 mike 1.2 {
239 Sint32 socket = _entries[i].socket;
240 Uint32 events = 0;
241
|
242 mday 1.8 if(_entries[i].dying.value() > 0 )
243 {
244 if(_entries[i]._type == Monitor::CONNECTION)
245 {
246
247 MessageQueue *q = MessageQueue::lookup(_entries[i].queueId);
|
248 mday 1.12 if(q && static_cast<HTTPConnection *>(q)->is_dying() &&
249 (0 == static_cast<HTTPConnection *>(q)->refcount.value()))
|
250 mday 1.8 {
251 static_cast<HTTPConnection *>(q)->lock_connection();
252 static_cast<HTTPConnection *>(q)->unlock_connection();
253
254 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
255 Message* message= new CloseConnectionMessage(static_cast<HTTPConnection *>(q)->getSocket());
256 message->dest = o.getQueueId();
|
257 mday 1.24 _connection_mutex.unlock();
258
|
259 mday 1.8 o.enqueue(message);
|
260 mday 1.24 return true;
|
261 kumpf 1.11 i--;
|
262 mday 1.8 n = _entries.size();
263 }
264 }
265 }
266
|
267 mike 1.2 if (FD_ISSET(socket, &_rep->active_rd_fd_set))
268 events |= SocketMessage::READ;
269
270 if (FD_ISSET(socket, &_rep->active_ex_fd_set))
271 events |= SocketMessage::EXCEPTION;
|
272 mday 1.7
|
273 mike 1.2 if (events)
274 {
|
275 kumpf 1.4 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
276 mday 1.16 "Monitor::run - Socket Event Detected events = %d", events);
277 if (events & SocketMessage::READ)
|
278 mday 1.8 {
|
279 mday 1.16 FD_CLR(socket, &_rep->active_rd_fd_set);
|
280 mday 1.8 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
281 mday 1.16 "Monitor::run FD_CLR READ");
|
282 mday 1.8 }
|
283 mday 1.16 else if (events & SocketMessage::EXCEPTION)
|
284 mday 1.8 {
285 FD_CLR(socket, &_rep->active_ex_fd_set);
286 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
287 "Monitor::run FD_CLR EXECEPTION");
288 }
|
289 mike 1.2 MessageQueue* queue = MessageQueue::lookup(_entries[i].queueId);
|
290 mday 1.8 if( ! queue )
291 {
|
292 mday 1.22 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
293 "Monitor::run lookup for connection entry failed, unsoliciting");
|
294 mday 1.24 _connection_mutex.unlock();
|
295 mday 1.8 unsolicitSocketMessages(socket);
|
296 mday 1.24 return true;
|
297 mday 1.8 }
|
298 mday 1.7
|
299 mday 1.18 if(_async == true && _entries[i]._type == Monitor::CONNECTION)
|
300 mday 1.7 {
|
301 mday 1.23
|
302 mday 1.13 if( static_cast<HTTPConnection *>(queue)->refcount.value() == 0 )
|
303 mday 1.12 {
|
304 mday 1.22 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
305 "Monitor::run dispatching thread to idle connection");
|
306 mday 1.12 static_cast<HTTPConnection *>(queue)->refcount++;
307 if( false == static_cast<HTTPConnection *>(queue)->is_dying())
|
308 mday 1.19 _thread_pool->allocate_and_awaken((void *)queue, _dispatch);
|
309 mday 1.12 else
310 static_cast<HTTPConnection *>(queue)->refcount--;
311 }
|
312 mday 1.22 else
|
313 mday 1.23 pegasus_sleep(1);
|
314 mike 1.2 }
|
315 mday 1.7 else
|
316 mike 1.2 {
|
317 mday 1.24 _connection_mutex.unlock();
318
|
319 mday 1.22 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
320 "Monitor::run enqueueing to non-connection HTTP class");
|
321 mday 1.8 Message* message = new SocketMessage(socket, events);
322 queue->enqueue(message);
|
323 mday 1.24 return true;
324
|
325 mike 1.2 }
|
326 mday 1.12 count--;
|
327 mday 1.14 pegasus_yield();
|
328 mike 1.2 }
|
329 mday 1.13 handled_events = true;
|
330 mike 1.2 }
|
331 mday 1.24 _connection_mutex.unlock();
|
332 mday 1.13 return(handled_events);
|
333 mike 1.2 }
334
335 Boolean Monitor::solicitSocketMessages(
336 Sint32 socket,
337 Uint32 events,
|
338 mday 1.8 Uint32 queueId,
339 int type)
|
340 mike 1.2 {
|
341 kumpf 1.4 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");
342
|
343 mike 1.2 // See whether a handler is already registered for this one:
344 Uint32 pos = _findEntry(socket);
345
346 if (pos != PEGASUS_NOT_FOUND)
|
347 kumpf 1.4 {
348 PEG_METHOD_EXIT();
|
349 mike 1.2 return false;
|
350 kumpf 1.4 }
|
351 mike 1.2
352 // Set the events:
353
354 if (events & SocketMessage::READ)
355 FD_SET(socket, &_rep->rd_fd_set);
356
357 if (events & SocketMessage::WRITE)
358 FD_SET(socket, &_rep->wr_fd_set);
359
360 if (events & SocketMessage::EXCEPTION)
361 FD_SET(socket, &_rep->ex_fd_set);
362
363 // Add the entry to the list:
|
364 mday 1.8 _MonitorEntry entry(socket, queueId, type);
|
365 mike 1.2 _entries.append(entry);
|
366 mday 1.8
|
367 mike 1.2 // Success!
|
368 mday 1.12
|
369 kumpf 1.4 PEG_METHOD_EXIT();
|
370 mike 1.2 return true;
371 }
372
373 Boolean Monitor::unsolicitSocketMessages(Sint32 socket)
374 {
|
375 kumpf 1.4 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessage");
376
|
377 mike 1.2 // Look for the given entry and remove it:
378
|
379 mday 1.24 _connection_mutex.lock(pegasus_thread_self());
380
|
381 mike 1.2 for (Uint32 i = 0, n = _entries.size(); i < n; i++)
382 {
383 if (_entries[i].socket == socket)
384 {
385 Sint32 socket = _entries[i].socket;
386 FD_CLR(socket, &_rep->rd_fd_set);
387 FD_CLR(socket, &_rep->wr_fd_set);
388 FD_CLR(socket, &_rep->ex_fd_set);
389 _entries.remove(i);
|
390 kumpf 1.11 // ATTN-RK-P3-20020521: Need "Socket::close(socket);" here?
|
391 mday 1.18 Socket::close(socket);
|
392 kumpf 1.4 PEG_METHOD_EXIT();
|
393 mday 1.24 _connection_mutex.unlock();
|
394 mike 1.2 return true;
395 }
396 }
|
397 kumpf 1.4 PEG_METHOD_EXIT();
|
398 mday 1.24 _connection_mutex.unlock();
399
|
400 mike 1.2 return false;
401 }
402
|
403 mday 1.8 Uint32 Monitor::_findEntry(Sint32 socket)
|
404 mike 1.2 {
|
405 mday 1.24 _connection_mutex.lock(pegasus_thread_self());
406
|
407 mday 1.7 for (Uint32 i = 0, n = _entries.size(); i < n; i++)
|
408 mike 1.2 {
409 if (_entries[i].socket == socket)
|
410 mday 1.24 {
411 _connection_mutex.unlock();
|
412 mike 1.2 return i;
|
413 mday 1.24 }
|
414 mike 1.2 }
|
415 mday 1.24 _connection_mutex.unlock();
|
416 mike 1.2 return PEG_NOT_FOUND;
|
417 mday 1.7 }
418
419
420 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
421 {
|
422 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
423 if( true == dst->is_dying())
|
424 kumpf 1.11 {
425 dst->refcount--;
|
426 mday 1.8 return 0;
|
427 kumpf 1.11 }
|
428 mday 1.8 if( false == dst->is_dying())
|
429 mday 1.12 {
|
430 mday 1.14 if(false == dst->run(1))
431 pegasus_sleep(1);
432
|
433 mday 1.12 }
|
434 kumpf 1.11 dst->refcount--;
|
435 mday 1.8 return 0;
|
436 mday 1.7 }
437
|
438 mike 1.2
439 PEGASUS_NAMESPACE_END
|