1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
|
13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Brasher (mbrasher@bmc.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include <Pegasus/Common/Config.h>
|
31 mday 1.40
|
32 mike 1.2 #include <cstring>
33 #include "Monitor.h"
34 #include "MessageQueue.h"
35 #include "Socket.h"
|
36 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
37 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
38 mike 1.2
39 #ifdef PEGASUS_OS_TYPE_WINDOWS
40 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
41 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
42 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
43 mday 1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
|
44 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
45 otherwise, less than 64 clients (the default) will be able to connect to the \
46 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
47 mday 1.5
|
48 mike 1.2 # endif
49 # define FD_SETSIZE 1024
|
50 mday 1.5 # include <windows.h>
|
51 mike 1.2 #else
52 # include <sys/types.h>
53 # include <sys/socket.h>
54 # include <sys/time.h>
55 # include <netinet/in.h>
56 # include <netdb.h>
57 # include <arpa/inet.h>
58 #endif
59
60 PEGASUS_USING_STD;
61
62 PEGASUS_NAMESPACE_BEGIN
63
|
64 mday 1.18
|
65 mday 1.25 static AtomicInt _connections = 0;
66
67
68 static struct timeval create_time = {0, 1};
|
69 mday 1.38 static struct timeval destroy_time = {300, 0};
|
70 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
71 mday 1.18
|
72 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
73 //
74 // MonitorRep
75 //
76 ////////////////////////////////////////////////////////////////////////////////
77
// Holder for six select()-style descriptor sets. The Monitor code in this
// file never allocates one (_rep is always set to 0).
struct MonitorRep
{
    // Read / write / exception descriptor sets.
    fd_set rd_fd_set, wr_fd_set, ex_fd_set;

    // "Active" counterparts of the sets above.
    // NOTE(review): presumably working copies handed to select() -- confirm.
    fd_set active_rd_fd_set, active_wr_fd_set, active_ex_fd_set;
};
87
88 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Monitor
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93 mike 1.2
94 Monitor::Monitor()
|
95 mday 1.7 : _module_handle(0), _controller(0), _async(false)
|
96 mike 1.2 {
97 Socket::initializeInterface();
|
98 mday 1.25 _rep = 0;
|
99 mday 1.37 _entries.reserveCapacity(32);
100 for( int i = 0; i < 32; i++ )
101 {
102 _MonitorEntry entry(0, 0, 0);
103 _entries.append(entry);
104 }
|
105 mike 1.2 }
106
|
107 mday 1.18 Monitor::Monitor(Boolean async)
108 : _module_handle(0), _controller(0), _async(async)
109 {
110 Socket::initializeInterface();
|
111 mday 1.25 _rep = 0;
|
112 mday 1.37 _entries.reserveCapacity(32);
113 for( int i = 0; i < 32; i++ )
114 {
115 _MonitorEntry entry(0, 0, 0);
116 _entries.append(entry);
117 }
|
118 mday 1.19 if( _async == true )
119 {
120 _thread_pool = new ThreadPool(0,
121 "Monitor",
|
122 mday 1.38 0,
|
123 mday 1.25 0,
|
124 mday 1.19 create_time,
125 destroy_time,
126 deadlock_time);
127 }
|
128 mday 1.20 else
129 _thread_pool = 0;
|
130 mday 1.18 }
|
131 mday 1.20
|
132 mike 1.2 Monitor::~Monitor()
133 {
|
134 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
135 "deregistering with module controller");
|
136 kumpf 1.10
|
137 kumpf 1.11 if(_module_handle != NULL)
|
138 mday 1.8 {
139 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
140 _controller = 0;
|
141 kumpf 1.10 delete _module_handle;
|
142 mday 1.8 }
|
143 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
144 mday 1.8
|
145 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
146 mike 1.2 Socket::uninitializeInterface();
|
147 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
148 "returning from monitor destructor");
|
149 mday 1.21 if(_async == true)
|
150 mday 1.20 delete _thread_pool;
|
151 mike 1.2 }
152
|
153 mday 1.7
|
154 mday 1.18 int Monitor::kill_idle_threads()
155 {
156 static struct timeval now, last;
157 gettimeofday(&now, NULL);
|
158 mday 1.20 int dead_threads = 0;
|
159 mday 1.18
|
160 mday 1.38 if( now.tv_sec - last.tv_sec > 120 )
|
161 mday 1.18 {
162 gettimeofday(&last, NULL);
|
163 mday 1.20 try
164 {
165 dead_threads = _thread_pool->kill_dead_threads();
166 }
167 catch(IPCException& )
168 {
169 }
170
|
171 mday 1.18 }
|
172 mday 1.20 return dead_threads;
|
173 mday 1.18 }
174
|
175 mday 1.7
|
176 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
177 {
|
178 mday 1.18
|
179 mday 1.25 Boolean handled_events = false;
|
180 mday 1.40 int i = 0;
|
181 mday 1.37 #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
|
182 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
183 #else
|
184 kumpf 1.35 struct timeval tv = {0, 1};
185 #endif
|
186 mday 1.25 fd_set fdread;
187 FD_ZERO(&fdread);
|
188 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
189 mday 1.13
|
190 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
191 mike 1.2 {
|
192 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
193 mday 1.25 {
194 FD_SET(_entries[indx].socket, &fdread);
195 }
|
196 mday 1.13 }
|
197 mday 1.37
|
198 mday 1.25
199 int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
200
|
201 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
202 mday 1.25 if(events && events != SOCKET_ERROR )
|
203 mike 1.2 #else
|
204 mday 1.25 if(events && events != -1 )
|
205 mike 1.2 #endif
|
206 mday 1.13 {
|
207 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
208 {
209 if(FD_ISSET(_entries[indx].socket, &fdread))
210 {
211 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
212 if(q == 0)
213 {
|
214 mday 1.37 try
215 {
216 _entries[indx]._status = _MonitorEntry::EMPTY;
217 }
218 catch(...)
219 {
220
221 }
222 continue;
|
223 mday 1.25 }
|
224 mday 1.37 try
|
225 mday 1.25 {
|
226 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
227 {
228 static_cast<HTTPConnection *>(q)->_entry_index = indx;
229 if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
230 {
231 _entries[indx]._status = _MonitorEntry::DYING;
232 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
233 Message* message= new CloseConnectionMessage(_entries[indx].socket);
234 message->dest = o.getQueueId();
235 _entry_mut.unlock();
236 o.enqueue(message);
237 return true;
238 }
239 _entries[indx]._status = _MonitorEntry::BUSY;
240 _thread_pool->allocate_and_awaken((void *)q, _dispatch);
241 }
242 else
|
243 mday 1.25 {
|
244 mday 1.37 int events = 0;
245 events |= SocketMessage::READ;
246 Message *msg = new SocketMessage(_entries[indx].socket, events);
247 _entries[indx]._status = _MonitorEntry::BUSY;
248 _entry_mut.unlock();
|
249 mday 1.27
|
250 mday 1.37 q->enqueue(msg);
251 _entries[indx]._status = _MonitorEntry::IDLE;
|
252 mday 1.25 return true;
253 }
254 }
|
255 mday 1.37 catch(...)
|
256 mday 1.25 {
257 }
258 handled_events = true;
259 }
260 }
|
261 mday 1.24 }
|
262 mday 1.37 _entry_mut.unlock();
|
263 mday 1.13 return(handled_events);
|
264 mike 1.2 }
265
|
266 mday 1.25
|
267 mday 1.37
|
268 mday 1.25 int Monitor::solicitSocketMessages(
|
269 mike 1.2 Sint32 socket,
270 Uint32 events,
|
271 mday 1.8 Uint32 queueId,
272 int type)
|
273 mike 1.2 {
|
274 kumpf 1.4
|
275 kumpf 1.31 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
276 mike 1.2
|
277 mday 1.37 int index = -1;
278 _entry_mut.lock(pegasus_thread_self());
|
279 mday 1.25
280 for(index = 0; index < (int)_entries.size(); index++)
281 {
|
282 mday 1.37 try
283 {
284 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
285 {
286 _entries[index].socket = socket;
287 _entries[index].queueId = queueId;
288 _entries[index]._type = type;
289 _entries[index]._status = _MonitorEntry::IDLE;
290 _entry_mut.unlock();
291
292 return index;
293 }
294 }
295 catch(...)
|
296 mday 1.25 {
297 }
|
298 mday 1.37
|
299 mday 1.25 }
|
300 mday 1.37 _entry_mut.unlock();
|
301 mday 1.25 PEG_METHOD_EXIT();
302 return index;
|
303 mike 1.2 }
304
|
305 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
306 mike 1.2 {
|
307 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
308 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
309 mday 1.27
|
310 mday 1.25 for(int index = 0; index < (int)_entries.size(); index++)
|
311 mike 1.2 {
|
312 mday 1.25 if(_entries[index].socket == socket)
313 {
314 _entries[index]._status = _MonitorEntry::EMPTY;
|
315 mday 1.37 break;
|
316 mday 1.25 }
|
317 mike 1.2 }
|
318 mday 1.37 _entry_mut.unlock();
|
319 kumpf 1.4 PEG_METHOD_EXIT();
|
320 mike 1.2 }
321
|
322 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
323 {
|
324 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
325 mday 1.37
|
326 mday 1.25 dst->run(1);
|
327 mday 1.37 if( dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
328 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
329
|
330 mday 1.8 return 0;
|
331 mday 1.40 }
332
333
334
335 ////************************* monitor 2 *****************************////
336
337
338 monitor_2_entry::monitor_2_entry(void)
339 : type(UNTYPED)
340 {
341 }
342
343 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock, monitor_2_entry_type _type)
344 : type(_type), psock(_psock)
345 {
346
347 }
348
349 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
350 {
351 if(this != &e){
352 mday 1.40 psock = e.psock;
353 type = e.type;
354 }
355 }
356
357 monitor_2_entry::~monitor_2_entry(void)
358 {
359 }
360
361 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
362 {
363 if(this != &e){
364 psock = e.psock;
365 type = e.type;
366 }
367 return *this;
368 }
369
370 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me)
371 {
372 if(this == &me)
373 mday 1.40 return true;
374 if((Sint32)this->psock == (Sint32)me.psock)
375 return true;
376 return false;
377 }
378
379 Boolean monitor_2_entry::operator ==(void* k)
380 {
381 if((void *)this == k)
382 return true;
383 return false;
384 }
385
386
387 monitor_2_entry::operator pegasus_socket() const
388 {
389 return psock;
390 }
391
392
393 monitor_2::monitor_2(void)
394 mday 1.40 : _session_dispatch(0), _listeners(true, 0),_ready(true), _die(0)
395 {
396 try {
397
398 bsd_socket_factory _factory;
399
400 // set up the listener/acceptor
401 pegasus_socket temp = pegasus_socket(&_factory);
402
403 temp.socket(PF_INET, SOCK_STREAM, 0);
404 // initialize the address
405 memset(&_tickle_addr, 0, sizeof(_tickle_addr));
406 _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
407 _tickle_addr.sin_family = PF_INET;
408 _tickle_addr.sin_port = 0;
409
410 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
411
412 temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
413 temp.listen(3);
414 temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
415 mday 1.40
416 // set up the connector
417
418 pegasus_socket tickler = pegasus_socket(&_factory);
419 tickler.socket(PF_INET, SOCK_STREAM, 0);
420 struct sockaddr_in _addr;
421 memset(&_addr, 0, sizeof(_addr));
422 _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
423 _addr.sin_family = PF_INET;
424 _addr.sin_port = 0;
425 tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
426 tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
427
428 _tickler.psock = tickler;
429 _tickler.type = INTERNAL;
430
431 struct sockaddr_in peer;
432 memset(&peer, 0, sizeof(peer));
433 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
434
435 pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
436 mday 1.40 monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL);
437 _listeners.insert_first(_tickle);
438
439 }
440 catch(...){ }
441 }
442
443 monitor_2::~monitor_2(void)
444 {
445
446
447 }
448
449
450 void monitor_2::run(void)
451 {
452 monitor_2_entry* temp;
453 while(_die.value() == 0) {
454 struct timeval tv = {0, 0};
455
456 // place all sockets in the select set
457 mday 1.40 FD_ZERO(&rd_fd_set);
458 try {
459 _listeners.lock(pegasus_thread_self());
460 temp = _listeners.next(0);
461 while(temp != 0 ){
462 FD_SET((Sint32)temp->psock, &rd_fd_set);
463 temp = _listeners.next(temp);
464 }
465 _listeners.unlock();
466 }
467 catch(...){
468 return;
469 }
470
471 int events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
472 try {
473 _listeners.lock(pegasus_thread_self());
474
475 temp = _listeners.next(0);
476 while(temp != 0 ){
477 if(FD_ISSET((Sint32)temp->psock, &rd_fd_set)) {
478 mday 1.40 FD_CLR((Sint32)temp->psock, &rd_fd_set);
479 monitor_2_entry* entry = new monitor_2_entry(*temp);
480 _ready.insert_first((void*)entry);
481 }
482 temp = _listeners.next(temp);
483 }
484 _listeners.unlock();
485 }
486 catch(...){
487 return;
488 }
489
490 // now handle the sockets that are ready to read
491 _dispatch();
492 } // while alive
493 }
494
495 void* monitor_2::set_session_dispatch(void (*dp)(pegasus_socket&))
496 {
497 void* old = (void*)_session_dispatch;
498 _session_dispatch = dp;
499 mday 1.40 return old;
500 }
501
502
503 void monitor_2::_dispatch(void)
504 {
505 monitor_2_entry* entry = (monitor_2_entry*) _ready.remove_first();
506 while(entry != 0 ){
507 switch(entry->type) {
508 case INTERNAL:
509 static char buffer[2];
510 entry->psock.read(&buffer, 2);
511 break;
512 case LISTEN:
513 {
514 static struct sockaddr peer;
515 static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
516 pegasus_socket connected = entry->psock.accept(&peer, &peer_size);
517 add_entry(connected, SESSION);
518 }
519 break;
520 mday 1.40 case SESSION:
521 if(_session_dispatch != 0 )
522 _session_dispatch(entry->psock);
523 else {
524 static char buffer[4096];
525 int bytes = entry->psock.read(&buffer, 4096);
526 }
527
528 break;
529
530 case UNTYPED:
531 default:
532 break;
533
534 }
535 delete entry;
536 entry = (monitor_2_entry*) _ready.remove_first();
537
538 }
539 }
540
541 mday 1.40
542 void monitor_2::stop(void)
543 {
544 _die = 1;
545 tickle();
546
547 // shut down the listener list, free the list nodes
548 _tickler.psock.close();
549 _listeners.shutdown_queue();
550 }
551
552 void monitor_2::tickle(void)
553 {
554 static char _buffer[] =
555 {
556 '0','0'
557 };
558
559 _tickler.psock.write(&_buffer, 2);
560 }
561
562 mday 1.40
563 Boolean monitor_2::add_entry(pegasus_socket& ps, monitor_2_entry_type type)
564 {
565 monitor_2_entry* m2e = new monitor_2_entry(ps, type);
566
567 try{
568 _listeners.insert_first(m2e);
569 }
570 catch(...){
571 delete m2e;
572 return false;
573 }
574 tickle();
575 return true;
576 }
577
578 Boolean monitor_2::remove_entry(Sint32 s)
579 {
580 monitor_2_entry* temp;
581 try {
582 _listeners.try_lock(pegasus_thread_self());
583 mday 1.40 temp = _listeners.next(0);
584 while(temp != 0){
585 if(s == (Sint32)temp->psock ){
586 temp = _listeners.remove_no_lock(temp);
587 delete temp;
588 _listeners.unlock();
589 return true;
590 }
591 temp = _listeners.next(temp);
592 }
593 _listeners.unlock();
594 }
595 catch(...){
596 }
597 return false;
|
598 mday 1.7 }
|
599 mday 1.37
|
600 mday 1.7
|
601 mike 1.2
602 PEGASUS_NAMESPACE_END
|