1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
|
13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Brasher (mbrasher@bmc.com)
25 //
|
26 mday 1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
27 mike 1.2 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include <Pegasus/Common/Config.h>
|
31 mday 1.40
|
32 mike 1.2 #include <cstring>
33 #include "Monitor.h"
34 #include "MessageQueue.h"
35 #include "Socket.h"
|
36 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
37 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
38 mike 1.2
39 #ifdef PEGASUS_OS_TYPE_WINDOWS
40 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
41 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
42 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
43 mday 1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
|
44 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
45 otherwise, less than 64 clients (the default) will be able to connect to the \
46 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
47 mday 1.5
|
48 mike 1.2 # endif
49 # define FD_SETSIZE 1024
|
50 mday 1.5 # include <windows.h>
|
51 mike 1.2 #else
52 # include <sys/types.h>
53 # include <sys/socket.h>
54 # include <sys/time.h>
55 # include <netinet/in.h>
56 # include <netdb.h>
57 # include <arpa/inet.h>
58 #endif
59
60 PEGASUS_USING_STD;
61
62 PEGASUS_NAMESPACE_BEGIN
63
|
64 mday 1.18
|
65 mday 1.25 static AtomicInt _connections = 0;
66
67
68 static struct timeval create_time = {0, 1};
|
69 mday 1.38 static struct timeval destroy_time = {300, 0};
|
70 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
71 mday 1.18
|
72 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
73 //
74 // MonitorRep
75 //
76 ////////////////////////////////////////////////////////////////////////////////
77
78 struct MonitorRep
79 {
80 fd_set rd_fd_set;
81 fd_set wr_fd_set;
82 fd_set ex_fd_set;
83 fd_set active_rd_fd_set;
84 fd_set active_wr_fd_set;
85 fd_set active_ex_fd_set;
86 };
87
88 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Monitor
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93 mike 1.2
|
94 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
95 mike 1.2 Monitor::Monitor()
|
96 kumpf 1.48 : _module_handle(0), _controller(0), _async(false), _stopConnections(0)
|
97 mike 1.2 {
|
98 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
99 mike 1.2 Socket::initializeInterface();
|
100 mday 1.25 _rep = 0;
|
101 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
102 for( int i = 0; i < numberOfMonitorEntriesToAllocate; i++ )
|
103 mday 1.37 {
104 _MonitorEntry entry(0, 0, 0);
105 _entries.append(entry);
106 }
|
107 mike 1.2 }
108
|
109 mday 1.18 Monitor::Monitor(Boolean async)
|
110 kumpf 1.48 : _module_handle(0), _controller(0), _async(async), _stopConnections(0)
|
111 mday 1.18 {
|
112 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
113 mday 1.18 Socket::initializeInterface();
|
114 mday 1.25 _rep = 0;
|
115 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
116 for( int i = 0; i < numberOfMonitorEntriesToAllocate; i++ )
|
117 mday 1.37 {
118 _MonitorEntry entry(0, 0, 0);
119 _entries.append(entry);
120 }
|
121 mday 1.19 if( _async == true )
122 {
123 _thread_pool = new ThreadPool(0,
124 "Monitor",
|
125 mday 1.38 0,
|
126 mday 1.25 0,
|
127 mday 1.19 create_time,
128 destroy_time,
129 deadlock_time);
130 }
|
131 mday 1.20 else
132 _thread_pool = 0;
|
133 mday 1.18 }
|
134 mday 1.20
|
135 mike 1.2 Monitor::~Monitor()
136 {
|
137 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
138 "deregistering with module controller");
|
139 kumpf 1.10
|
140 kumpf 1.11 if(_module_handle != NULL)
|
141 mday 1.8 {
142 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
143 _controller = 0;
|
144 kumpf 1.10 delete _module_handle;
|
145 mday 1.8 }
|
146 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
147 kumpf 1.48
|
148 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
149 mike 1.2 Socket::uninitializeInterface();
|
150 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
151 "returning from monitor destructor");
|
152 mday 1.21 if(_async == true)
|
153 mday 1.20 delete _thread_pool;
|
154 mike 1.2 }
155
|
156 mday 1.7
|
157 mday 1.18 int Monitor::kill_idle_threads()
158 {
159 static struct timeval now, last;
160 gettimeofday(&now, NULL);
|
161 mday 1.20 int dead_threads = 0;
|
162 mday 1.18
|
163 mday 1.38 if( now.tv_sec - last.tv_sec > 120 )
|
164 mday 1.18 {
165 gettimeofday(&last, NULL);
|
166 mday 1.20 try
167 {
168 dead_threads = _thread_pool->kill_dead_threads();
169 }
170 catch(IPCException& )
171 {
172 }
173
|
174 mday 1.18 }
|
175 mday 1.20 return dead_threads;
|
176 mday 1.18 }
177
|
178 mday 1.7
|
179 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
180 {
|
181 mday 1.18
|
182 mday 1.25 Boolean handled_events = false;
|
183 mday 1.40 int i = 0;
|
184 mday 1.37 #if defined(PEGASUS_OS_OS400) || defined(PEGASUS_OS_HPUX)
|
185 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
186 #else
|
187 kumpf 1.35 struct timeval tv = {0, 1};
188 #endif
|
189 mday 1.25 fd_set fdread;
190 FD_ZERO(&fdread);
|
191 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
192 mday 1.13
|
193 kumpf 1.48 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
194 if (_stopConnections == 1)
195 {
196 for ( int indx = 0; indx < (int)_entries.size(); indx++)
197 {
198 if (_entries[indx]._type == Monitor::ACCEPTOR)
199 {
200 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
201 {
202 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
203 _entries[indx]._status.value() == _MonitorEntry::DYING )
204 {
205 // remove the entry
206 _entries[indx]._status = _MonitorEntry::EMPTY;
207 }
208 else
209 {
210 // set status to DYING
|
211 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
212 kumpf 1.48 }
213 }
214 }
215 }
216 _stopConnections = 0;
217 }
|
218 kumpf 1.51
219 Uint32 _idleEntries = 0;
|
220 kumpf 1.48
|
221 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
222 mike 1.2 {
|
223 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
224 mday 1.25 {
|
225 kumpf 1.51 _idleEntries++;
|
226 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
227 }
|
228 mday 1.13 }
|
229 kumpf 1.51
230 _entry_mut.unlock();
|
231 mday 1.25 int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
|
232 kumpf 1.51 _entry_mut.lock(pegasus_thread_self());
|
233 mday 1.25
|
234 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
235 kumpf 1.50 if(events == SOCKET_ERROR)
|
236 mike 1.2 #else
|
237 kumpf 1.50 if(events == -1)
|
238 mike 1.2 #endif
|
239 mday 1.13 {
|
240 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
241 "Monitor::run - errorno = %d has occurred on select.", errno);
242 // The EBADF error indicates that one or more or the file
243 // descriptions was not valid. This could indicate that
244 // the _entries structure has been corrupted or that
245 // we have a synchronization error.
246
247 PEGASUS_ASSERT(errno != EBADF);
248 }
249 else if (events)
250 {
|
251 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
252 "Monitor::run select event received events = %d, monitoring %d idle entries",
253 events, _idleEntries);
|
254 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
255 {
|
256 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
257 // owned by the Monitor).
258 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
259 (FD_ISSET(_entries[indx].socket, &fdread)))
|
260 mday 1.25 {
261 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
262 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
263 "Monitor::run indx = %d, queueId = %d, q = %p",
264 indx, _entries[indx].queueId, q);
265 PEGASUS_ASSERT(q !=0);
|
266 mday 1.37
267 try
|
268 mday 1.25 {
|
269 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
270 {
|
271 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
272 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
273 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
274 if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
275 {
|
276 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
277 "Monitor::run processing dying value > 0 for indx = %d, connection being closed.",
278 indx);
|
279 mday 1.37 _entries[indx]._status = _MonitorEntry::DYING;
280 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
281 Message* message= new CloseConnectionMessage(_entries[indx].socket);
282 message->dest = o.getQueueId();
283 _entry_mut.unlock();
284 o.enqueue(message);
285 return true;
286 }
287 _entries[indx]._status = _MonitorEntry::BUSY;
288 _thread_pool->allocate_and_awaken((void *)q, _dispatch);
289 }
290 else
|
291 mday 1.25 {
|
292 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
293 "Non-connection entry, indx = %d, has been received.", indx);
|
294 mday 1.37 int events = 0;
295 events |= SocketMessage::READ;
296 Message *msg = new SocketMessage(_entries[indx].socket, events);
297 _entries[indx]._status = _MonitorEntry::BUSY;
298 _entry_mut.unlock();
|
299 mday 1.27
|
300 mday 1.37 q->enqueue(msg);
301 _entries[indx]._status = _MonitorEntry::IDLE;
|
302 mday 1.25 return true;
303 }
304 }
|
305 mday 1.37 catch(...)
|
306 mday 1.25 {
307 }
308 handled_events = true;
309 }
310 }
|
311 mday 1.24 }
|
312 mday 1.37 _entry_mut.unlock();
|
313 mday 1.13 return(handled_events);
|
314 mike 1.2 }
315
|
316 kumpf 1.48 void Monitor::stopListeningForConnections()
317 {
318 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
319
320 _stopConnections = 1;
321
322 PEG_METHOD_EXIT();
323 }
|
324 mday 1.25
|
325 mday 1.37
|
326 mday 1.25 int Monitor::solicitSocketMessages(
|
327 mike 1.2 Sint32 socket,
328 Uint32 events,
|
329 mday 1.8 Uint32 queueId,
330 int type)
|
331 mike 1.2 {
|
332 kumpf 1.4
|
333 kumpf 1.31 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
334 mike 1.2
|
335 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
336 mday 1.25
|
337 kumpf 1.50 for(int index = 0; index < (int)_entries.size(); index++)
|
338 mday 1.25 {
|
339 mday 1.37 try
340 {
341 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
342 {
343 _entries[index].socket = socket;
344 _entries[index].queueId = queueId;
345 _entries[index]._type = type;
346 _entries[index]._status = _MonitorEntry::IDLE;
347 _entry_mut.unlock();
348
349 return index;
350 }
351 }
352 catch(...)
|
353 mday 1.25 {
354 }
|
355 mday 1.37
|
356 mday 1.25 }
|
357 kumpf 1.50 _entry_mut.unlock();
|
358 mday 1.25 PEG_METHOD_EXIT();
|
359 kumpf 1.50 return -1;
|
360 mike 1.2 }
361
|
362 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
363 mike 1.2 {
|
364 kumpf 1.50
|
365 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
366 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
367 mday 1.27
|
368 mday 1.25 for(int index = 0; index < (int)_entries.size(); index++)
|
369 mike 1.2 {
|
370 mday 1.25 if(_entries[index].socket == socket)
371 {
372 _entries[index]._status = _MonitorEntry::EMPTY;
|
373 kumpf 1.53 _entries[index].socket = -1;
|
374 mday 1.37 break;
|
375 mday 1.25 }
|
376 mike 1.2 }
|
377 mday 1.37 _entry_mut.unlock();
|
378 kumpf 1.4 PEG_METHOD_EXIT();
|
379 mike 1.2 }
380
|
381 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
382 {
|
383 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
384 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
385 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
386 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
387 kumpf 1.51 try
388 {
389 dst->run(1);
390 }
391 catch (...)
392 {
393 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
394 "Monitor::_dispatch: exception received");
395 }
396 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
397 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
398
|
399 kumpf 1.50 dst->_monitor->_entry_mut.lock(pegasus_thread_self());
400 // It shouldn't be necessary to set status = _MonitorEntry::IDLE
401 // if the connection is being closed. However, the current logic
402 // in Monitor::run requires this value to be set for the close
403 // to be processed.
|
404 kumpf 1.53
405 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
406 kumpf 1.50 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
407 if (dst->_connectionClosePending)
408 {
409 dst->_dying = 1;
410 }
411 dst->_monitor->_entry_mut.unlock();
|
412 mday 1.8 return 0;
|
413 mday 1.40 }
414
415
416
417 ////************************* monitor 2 *****************************////
|
418 mday 1.43 ////************************* monitor 2 *****************************////
419 ////************************* monitor 2 *****************************////
420 ////************************* monitor 2 *****************************////
421 ////************************* monitor 2 *****************************////
422 ////************************* monitor 2 *****************************////
423 ////************************* monitor 2 *****************************////
|
424 mday 1.40
425
|
426 mday 1.42 m2e_rep::m2e_rep(void)
|
427 mday 1.43 :Base(), state(IDLE)
428
|
429 mday 1.42 {
430 }
431
432 m2e_rep::m2e_rep(monitor_2_entry_type _type,
433 pegasus_socket _sock,
434 void* _accept,
435 void* _dispatch)
|
436 mday 1.43 : Base(), type(_type), state(IDLE), psock(_sock),
|
437 mday 1.42 accept_parm(_accept), dispatch_parm(_dispatch)
438 {
439
440 }
441
442 m2e_rep::~m2e_rep(void)
443 {
444 }
445
446 m2e_rep::m2e_rep(const m2e_rep& r)
447 : Base()
448 {
449 if(this != &r){
450 type = r.type;
451 psock = r.psock;
452 accept_parm = r.accept_parm;
453 dispatch_parm = r.dispatch_parm;
|
454 mday 1.43 state = IDLE;
455
|
456 mday 1.42 }
457 }
458
459
460 m2e_rep& m2e_rep::operator =(const m2e_rep& r)
461 {
462 if(this != &r) {
463 type = r.type;
464 psock = r.psock;
465 accept_parm = r.accept_parm;
466 dispatch_parm = r.dispatch_parm;
|
467 mday 1.43 state = IDLE;
|
468 mday 1.42 }
469 return *this;
470 }
471
472 Boolean m2e_rep::operator ==(const m2e_rep& r)
473 {
474 if(this == &r)
475 return true;
476 return false;
477 }
478
479 Boolean m2e_rep::operator ==(void* r)
480 {
481 if((void*)this == r)
482 return true;
483 return false;
484 }
485
486 m2e_rep::operator pegasus_socket() const
487 {
488 return psock;
489 mday 1.42 }
490
491
|
492 mday 1.40 monitor_2_entry::monitor_2_entry(void)
493 {
|
494 mday 1.42 _rep = new m2e_rep();
|
495 mday 1.40 }
496
|
497 mday 1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,
498 monitor_2_entry_type _type,
499 void* _accept_parm, void* _dispatch_parm)
|
500 mday 1.40 {
|
501 mday 1.42 _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
|
502 mday 1.40 }
503
504 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
505 {
506 if(this != &e){
|
507 mday 1.42 Inc(this->_rep = e._rep);
|
508 mday 1.40 }
509 }
510
511 monitor_2_entry::~monitor_2_entry(void)
512 {
|
513 mday 1.42 Dec(_rep);
|
514 mday 1.40 }
515
516 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
517 {
518 if(this != &e){
|
519 mday 1.42 Dec(_rep);
520 Inc(this->_rep = e._rep);
|
521 mday 1.40 }
522 return *this;
523 }
524
|
525 mday 1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
|
526 mday 1.40 {
527 if(this == &me)
528 return true;
529 return false;
530 }
531
|
532 mday 1.42 Boolean monitor_2_entry::operator ==(void* k) const
|
533 mday 1.40 {
534 if((void *)this == k)
535 return true;
536 return false;
537 }
538
539
|
540 mday 1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
|
541 mday 1.40 {
|
542 mday 1.42 return _rep->type;
543 }
544
545 void monitor_2_entry::set_type(monitor_2_entry_type t)
546 {
547 _rep->type = t;
548 }
549
550
|
551 mday 1.43 monitor_2_entry_state monitor_2_entry::get_state(void) const
552 {
553 return (monitor_2_entry_state) _rep->state.value();
554 }
555
556 void monitor_2_entry::set_state(monitor_2_entry_state t)
557 {
558 _rep->state = t;
559 }
560
|
561 mday 1.42 void* monitor_2_entry::get_accept(void) const
562 {
563 return _rep->accept_parm;
564 }
565
566 void monitor_2_entry::set_accept(void* a)
567 {
568 _rep->accept_parm = a;
569 }
570
571
572 void* monitor_2_entry::get_dispatch(void) const
573 {
574 return _rep->dispatch_parm;
575 }
576
577 void monitor_2_entry::set_dispatch(void* a)
578 {
579 _rep->dispatch_parm = a;
580 }
581
582 mday 1.42 pegasus_socket monitor_2_entry::get_sock(void) const
583 {
584 return _rep->psock;
585 }
586
587
588 void monitor_2_entry::set_sock(pegasus_socket& s)
589 {
590 _rep->psock = s;
591
|
592 mday 1.40 }
593
594
|
595 mday 1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
596
597
|
598 mday 1.40 monitor_2::monitor_2(void)
|
599 mday 1.42 : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),
|
600 mday 1.49 _ready(true, 0), _die(0), _requestCount(0)
|
601 mday 1.40 {
602 try {
603
604 bsd_socket_factory _factory;
605
606 // set up the listener/acceptor
607 pegasus_socket temp = pegasus_socket(&_factory);
608
609 temp.socket(PF_INET, SOCK_STREAM, 0);
610 // initialize the address
611 memset(&_tickle_addr, 0, sizeof(_tickle_addr));
|
612 marek 1.47 #ifdef PEGASUS_OS_ZOS
613 _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
614 #else
|
615 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
616 #pragma convert(37)
617 #endif
|
618 mday 1.40 _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
619 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
620 #pragma convert(0)
621 #endif
|
622 marek 1.47 #endif
|
623 mday 1.40 _tickle_addr.sin_family = PF_INET;
624 _tickle_addr.sin_port = 0;
625
626 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
627
628 temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
629 temp.listen(3);
630 temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
631
632 // set up the connector
633
634 pegasus_socket tickler = pegasus_socket(&_factory);
635 tickler.socket(PF_INET, SOCK_STREAM, 0);
636 struct sockaddr_in _addr;
637 memset(&_addr, 0, sizeof(_addr));
|
638 kumpf 1.48 #ifdef PEGASUS_OS_ZOS
|
639 marek 1.47 _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
640 #else
|
641 mday 1.40 _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
642 marek 1.47 #endif
|
643 mday 1.40 _addr.sin_family = PF_INET;
644 _addr.sin_port = 0;
645 tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
646 tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
647
|
648 mday 1.42 _tickler.set_sock(tickler);
649 _tickler.set_type(INTERNAL);
|
650 mday 1.43 _tickler.set_state(BUSY);
651
|
652 mday 1.40 struct sockaddr_in peer;
653 memset(&peer, 0, sizeof(peer));
654 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
655
656 pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
|
657 mday 1.42 monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
|
658 mday 1.43 _tickle->set_state(BUSY);
659
|
660 mday 1.40 _listeners.insert_first(_tickle);
661
662 }
663 catch(...){ }
664 }
665
666 monitor_2::~monitor_2(void)
667 {
|
668 mday 1.41 try {
669 monitor_2_entry* temp = _listeners.remove_first();
670 while(temp){
671 delete temp;
672 temp = _listeners.remove_first();
673 }
674 }
675 catch(...){ }
|
676 mday 1.40 }
677
678
679 void monitor_2::run(void)
680 {
681 monitor_2_entry* temp;
682 while(_die.value() == 0) {
|
683 mday 1.49
|
684 mday 1.45 struct timeval tv = {0, 0};
|
685 mday 1.40
686 // place all sockets in the select set
687 FD_ZERO(&rd_fd_set);
688 try {
689 _listeners.lock(pegasus_thread_self());
690 temp = _listeners.next(0);
691 while(temp != 0 ){
|
692 mday 1.43 if(temp->get_state() == CLOSED ){
693 monitor_2_entry* closed = temp;
694 temp = _listeners.next(closed);
695 _listeners.remove_no_lock(closed);
|
696 mday 1.49 HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
697 delete cn;
|
698 mday 1.43 delete closed;
699 }
|
700 mday 1.45 if(temp == 0)
701 break;
|
702 mday 1.46 Sint32 fd = (Sint32) temp->get_sock();
703 if(fd >= 0 )
704 FD_SET(fd , &rd_fd_set);
|
705 mday 1.40 temp = _listeners.next(temp);
706 }
707 _listeners.unlock();
708 }
709 catch(...){
710 return;
711 }
|
712 mday 1.42 // important - the dispatch routine has pointers to all the
713 // entries that are readable. These entries can be changed but
714 // the pointer must not be tampered with.
|
715 mday 1.40
716 int events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
717 try {
718 _listeners.lock(pegasus_thread_self());
719 temp = _listeners.next(0);
720 while(temp != 0 ){
|
721 mday 1.42 Sint32 fd = (Sint32) temp->get_sock();
|
722 mday 1.46 if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
|
723 mday 1.43 temp->set_state(BUSY);
|
724 mday 1.42 FD_CLR(fd, &rd_fd_set);
725 monitor_2_entry* ready = new monitor_2_entry(*temp);
|
726 mday 1.49 try
727 {
728 _ready.insert_first(ready);
729 }
730 catch(...)
731 {
732 }
733
|
734 mday 1.42 _requestCount++;
|
735 mday 1.40 }
736 temp = _listeners.next(temp);
737 }
738 _listeners.unlock();
739 }
740 catch(...){
741 return;
742 }
743 // now handle the sockets that are ready to read
744 _dispatch();
745 } // while alive
746 }
747
|
748 mday 1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
|
749 mday 1.40 {
|
750 mday 1.42 void* old = (void *)_session_dispatch;
|
751 mday 1.40 _session_dispatch = dp;
752 return old;
753 }
754
|
755 mday 1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
756 {
757 void* old = (void*)_accept_dispatch;
758 _accept_dispatch = dp;
759 return old;
760
761 }
762
|
763 mday 1.40
|
764 mday 1.42 // important - the dispatch routine has pointers to all the
765 // entries that are readable. These entries can be changed but
766 // the pointer must not be tampered with.
|
767 mday 1.40 void monitor_2::_dispatch(void)
768 {
|
769 mday 1.49 monitor_2_entry* entry;
770
771 if(_ready.count() == 0 )
772 return;
773
774
775 try
776 {
777
778 entry = _ready.remove_first();
779 }
780 catch(...)
781 {
782 }
783
784 while(entry != 0 ) {
|
785 mday 1.42 switch(entry->get_type()) {
|
786 mday 1.40 case INTERNAL:
787 static char buffer[2];
|
788 mday 1.49 entry->get_sock().disableBlocking();
|
789 mday 1.42 entry->get_sock().read(&buffer, 2);
|
790 mday 1.49 entry->get_sock().enableBlocking();
|
791 mday 1.40 break;
792 case LISTEN:
793 {
794 static struct sockaddr peer;
795 static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
|
796 mday 1.49 entry->get_sock().disableBlocking();
|
797 mday 1.42 pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
|
798 mday 1.49 entry->get_sock().enableBlocking();
|
799 mday 1.42 monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
800 if(temp && _accept_dispatch != 0)
|
801 mday 1.49 _accept_dispatch(temp);
|
802 mday 1.40 }
803 break;
804 case SESSION:
805 if(_session_dispatch != 0 )
|
806 mday 1.42 _session_dispatch(entry);
|
807 mday 1.40 else {
808 static char buffer[4096];
|
809 mday 1.42 int bytes = entry->get_sock().read(&buffer, 4096);
|
810 mday 1.40 }
811
812 break;
813 case UNTYPED:
814 default:
815 break;
816 }
|
817 mday 1.42 _requestCount--;
|
818 mday 1.40 delete entry;
|
819 mday 1.49
820 if(_ready.count() == 0 )
821 break;
822
823 try
824 {
825 entry = _ready.remove_first();
826 }
827 catch(...)
828 {
829 }
830
|
831 mday 1.40 }
832 }
833
834 void monitor_2::stop(void)
835 {
836 _die = 1;
837 tickle();
838
839 // shut down the listener list, free the list nodes
|
840 mday 1.42 _tickler.get_sock().close();
|
841 mday 1.40 _listeners.shutdown_queue();
842 }
843
844 void monitor_2::tickle(void)
845 {
846 static char _buffer[] =
847 {
848 '0','0'
849 };
850
|
851 mday 1.42 _tickler.get_sock().write(&_buffer, 2);
|
852 mday 1.40 }
853
854
|
855 mday 1.42 monitor_2_entry* monitor_2::add_entry(pegasus_socket& ps,
856 monitor_2_entry_type type,
857 void* accept_parm,
858 void* dispatch_parm)
|
859 mday 1.40 {
|
860 mday 1.42 monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
|
861 mday 1.40
862 try{
863 _listeners.insert_first(m2e);
864 }
865 catch(...){
866 delete m2e;
|
867 mday 1.42 return 0;
|
868 mday 1.40 }
869 tickle();
|
870 mday 1.42 return m2e;
|
871 mday 1.40 }
872
873 Boolean monitor_2::remove_entry(Sint32 s)
874 {
875 monitor_2_entry* temp;
876 try {
877 _listeners.try_lock(pegasus_thread_self());
878 temp = _listeners.next(0);
879 while(temp != 0){
|
880 mday 1.42 if(s == (Sint32)temp->_rep->psock ){
|
881 mday 1.40 temp = _listeners.remove_no_lock(temp);
882 delete temp;
883 _listeners.unlock();
884 return true;
885 }
886 temp = _listeners.next(temp);
887 }
888 _listeners.unlock();
889 }
890 catch(...){
891 }
892 return false;
|
893 mday 1.7 }
|
894 mday 1.37
|
895 mday 1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
896 {
897 return _requestCount.value();
898
|
899 mday 1.49 }
900
901
902 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
903 {
904
905 HTTPConnection2* temp;
906 try
907 {
908 monitor_2::_connections.lock(pegasus_thread_self());
909 temp = monitor_2::_connections.next(0);
910 while(temp != 0 )
911 {
912 if(sock == temp->getSocket())
913 {
914 temp = monitor_2::_connections.remove_no_lock(temp);
915 monitor_2::_connections.unlock();
916 return temp;
917 }
918 temp = monitor_2::_connections.next(temp);
919 }
920 mday 1.49 monitor_2::_connections.unlock();
921 }
922 catch(...)
923 {
924 }
925 return 0;
926 }
927
928 Boolean monitor_2::insert_connection(HTTPConnection2* connection)
929 {
930 try
931 {
932 monitor_2::_connections.insert_first(connection);
933 }
934 catch(...)
935 {
936 return false;
937 }
938 return true;
|
939 mday 1.42 }
|
940 mday 1.7
|
941 mike 1.2
942 PEGASUS_NAMESPACE_END
|