1 karl 1.64 //%2003////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.64 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.2 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
9 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
12 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 //
|
15 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
16 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
18 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
21 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Brasher (mbrasher@bmc.com)
27 //
|
28 mday 1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
29 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
30 mike 1.2 //
31 //%/////////////////////////////////////////////////////////////////////////////
32
33 #include <Pegasus/Common/Config.h>
|
34 mday 1.40
|
35 mike 1.2 #include <cstring>
36 #include "Monitor.h"
37 #include "MessageQueue.h"
38 #include "Socket.h"
|
39 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
40 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
41 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
42 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
43 mike 1.2
// Platform socket headers.
//
// On Windows the capacity of fd_set is fixed by the value FD_SETSIZE has when
// the winsock header is first included, so it must already be 1024 by then.
// If it was defined with a different value, compilation is stopped with an
// explanatory message.
#ifdef PEGASUS_OS_TYPE_WINDOWS
# if defined(FD_SETSIZE) && FD_SETSIZE != 1024
#  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
of <winsock.h>. It may have been indirectly included (e.g., by including \
<windows.h>). Find the inclusion of that header which is visible to this \
compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
otherwise, less than 64 clients (the default) will be able to connect to the \
CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
# endif
# define FD_SETSIZE 1024
# include <windows.h>
#else
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <netinet/in.h>
# include <netdb.h>
# include <arpa/inet.h>
#endif
64
65 PEGASUS_USING_STD;
66
67 PEGASUS_NAMESPACE_BEGIN
68
|
69 mday 1.18
|
70 mday 1.25 static AtomicInt _connections = 0;
71
72 static struct timeval create_time = {0, 1};
|
73 mday 1.38 static struct timeval destroy_time = {300, 0};
|
74 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
75 mday 1.18
|
76 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
77 //
78 // MonitorRep
79 //
80 ////////////////////////////////////////////////////////////////////////////////
81
// Holds two groups of three descriptor sets (read / write / exception):
// one plain group and one "active" group.
// NOTE(review): both Monitor constructors set _rep to 0 and nothing in the
// visible code instantiates this struct; confirm whether it is still used.
struct MonitorRep
{
    // Plain sets.
    fd_set rd_fd_set;
    fd_set wr_fd_set;
    fd_set ex_fd_set;

    // "Active" sets.
    fd_set active_rd_fd_set;
    fd_set active_wr_fd_set;
    fd_set active_ex_fd_set;
};
91
92 ////////////////////////////////////////////////////////////////////////////////
93 //
94 // Monitor
95 //
96 ////////////////////////////////////////////////////////////////////////////////
97 mike 1.2
|
98 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
99 mike 1.2 Monitor::Monitor()
|
100 a.arora 1.73 : _module_handle(0),
101 _controller(0),
102 _async(false),
103 _stopConnections(0),
104 _stopConnectionsSem(0),
105 _solicitSocketCount(0)
|
106 mike 1.2 {
|
107 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
108 mike 1.2 Socket::initializeInterface();
|
109 mday 1.25 _rep = 0;
|
110 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
111 a.arora 1.73
112 // setup the tickler
113 initializeTickler();
114
115 // Start the count at 1 because initilizeTickler()
116 // has added an entry in the first position of the
117 // _entries array
118 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
119 mday 1.37 {
120 _MonitorEntry entry(0, 0, 0);
121 _entries.append(entry);
122 }
|
123 mike 1.2 }
124
|
125 mday 1.18 Monitor::Monitor(Boolean async)
|
126 a.arora 1.73 : _module_handle(0),
127 _controller(0),
128 _async(async),
129 _stopConnections(0),
130 _stopConnectionsSem(0),
131 _solicitSocketCount(0)
|
132 mday 1.18 {
|
133 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
134 mday 1.18 Socket::initializeInterface();
|
135 mday 1.25 _rep = 0;
|
136 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
137 a.arora 1.73
138 // setup the tickler
139 initializeTickler();
140
141 // Start the count at 1 because initilizeTickler()
142 // has added an entry in the first position of the
143 // _entries array
144 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
145 mday 1.37 {
146 _MonitorEntry entry(0, 0, 0);
147 _entries.append(entry);
148 }
|
149 mday 1.18 }
|
150 mday 1.20
|
151 mike 1.2 Monitor::~Monitor()
152 {
|
153 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
154 "deregistering with module controller");
|
155 kumpf 1.10
|
156 kumpf 1.11 if(_module_handle != NULL)
|
157 mday 1.8 {
158 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
159 _controller = 0;
|
160 kumpf 1.10 delete _module_handle;
|
161 mday 1.8 }
|
162 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
163 kumpf 1.48
|
164 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
165 mike 1.2 Socket::uninitializeInterface();
|
166 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
167 "returning from monitor destructor");
|
168 mday 1.18 }
169
|
170 a.arora 1.73 void Monitor::initializeTickler(){
171 /*
172 NOTE: On any errors trying to
173 setup out tickle connection,
174 throw an exception/end the server
175 */
176
177 /* setup the tickle server/listener */
178
179 // get a socket for the server side
180 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
181 //handle error
182 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
183 "Received error number $0 while creating the internal socket.",
184 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
185 errno);
186 #else
187 WSAGetLastError());
188 #endif
189 throw Exception(parms);
190 }
191 a.arora 1.73
192 // initialize the address
193 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
194 #ifdef PEGASUS_OS_ZOS
195 _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
196 #else
197 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
198 #pragma convert(37)
199 #endif
200 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
201 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
202 #pragma convert(0)
203 #endif
204 #endif
205 _tickle_server_addr.sin_family = PF_INET;
206 _tickle_server_addr.sin_port = 0;
207
208 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
209
210 // bind server side to socket
211 if((::bind(_tickle_server_socket,
212 a.arora 1.73 (struct sockaddr *)&_tickle_server_addr,
213 sizeof(_tickle_server_addr))) < 0){
214 // handle error
215 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
216 "Received error number $0 while binding the internal socket.",
217 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
218 errno);
219 #else
220 WSAGetLastError());
221 #endif
222 throw Exception(parms);
223 }
224
225 // tell the kernel we are a server
226 if((::listen(_tickle_server_socket,3)) < 0){
227 // handle error
228 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
229 "Received error number $0 while listening to the internal socket.",
230 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
231 errno);
232 #else
233 a.arora 1.73 WSAGetLastError());
234 #endif
235 throw Exception(parms);
236 }
237
238 // make sure we have the correct socket for our server
239 int sock = ::getsockname(_tickle_server_socket,
240 (struct sockaddr*)&_tickle_server_addr,
241 &_addr_size);
242 if(sock < 0){
243 // handle error
244 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
245 "Received error number $0 while getting the internal socket name.",
246 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
247 errno);
248 #else
249 WSAGetLastError());
250 #endif
251 throw Exception(parms);
252 }
253
254 a.arora 1.73 /* set up the tickle client/connector */
255
256 // get a socket for our tickle client
257 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
258 // handle error
259 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
260 "Received error number $0 while creating the internal client socket.",
261 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
262 errno);
263 #else
264 WSAGetLastError());
265 #endif
266 throw Exception(parms);
267 }
268
269 // setup the address of the client
270 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
271 #ifdef PEGASUS_OS_ZOS
272 _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
273 #else
274 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
275 a.arora 1.73 #pragma convert(37)
276 #endif
277 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
278 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
279 #pragma convert(0)
280 #endif
281 #endif
282 _tickle_client_addr.sin_family = PF_INET;
283 _tickle_client_addr.sin_port = 0;
284
285 // bind socket to client side
286 if((::bind(_tickle_client_socket,
287 (struct sockaddr*)&_tickle_client_addr,
288 sizeof(_tickle_client_addr))) < 0){
289 // handle error
290 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
291 "Received error number $0 while binding the internal client socket.",
292 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293 errno);
294 #else
295 WSAGetLastError());
296 a.arora 1.73 #endif
297 throw Exception(parms);
298 }
299
300 // connect to server side
301 if((::connect(_tickle_client_socket,
302 (struct sockaddr*)&_tickle_server_addr,
303 sizeof(_tickle_server_addr))) < 0){
304 // handle error
305 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
306 "Received error number $0 while connecting the internal client socket.",
307 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
308 errno);
309 #else
310 WSAGetLastError());
311 #endif
312 throw Exception(parms);
313 }
314
315 /* set up the slave connection */
316 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
317 a.arora 1.73 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
318 pegasus_sleep(1);
319
320 // this call may fail, we will try a max of 20 times to establish this peer connection
321 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
322 (struct sockaddr*)&_tickle_peer_addr,
323 &peer_size)) < 0){
324 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
325 // Only retry on non-windows platforms.
326 if(_tickle_peer_socket == -1 && errno == EAGAIN)
327 {
328 int retries = 0;
329 do
330 {
331 pegasus_sleep(1);
332 _tickle_peer_socket = ::accept(_tickle_server_socket,
333 (struct sockaddr*)&_tickle_peer_addr,
334 &peer_size);
335 retries++;
336 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
337 }
338 a.arora 1.73 #endif
339 }
340 if(_tickle_peer_socket == -1){
341 // handle error
342 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
343 "Received error number $0 while accepting the internal socket connection.",
344 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
345 errno);
346 #else
347 WSAGetLastError());
348 #endif
349 throw Exception(parms);
350 }
351 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
352 // checks entries with IDLE state for events
353 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
354 entry._status = _MonitorEntry::IDLE;
355 _entries.append(entry);
356 }
357
358 void Monitor::tickle(void)
359 a.arora 1.73 {
360 static char _buffer[] =
361 {
362 '0','0'
363 };
364
365 Socket::disableBlocking(_tickle_client_socket);
366 Socket::write(_tickle_client_socket,&_buffer, 2);
367 Socket::enableBlocking(_tickle_client_socket);
368 }
369
|
370 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
371 {
|
372 mday 1.18
|
373 mday 1.25 Boolean handled_events = false;
|
374 a.arora 1.73 int i = 0;
375
|
376 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
377 a.arora 1.73
|
378 mday 1.25 fd_set fdread;
379 FD_ZERO(&fdread);
|
380 a.arora 1.73
|
381 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
382 mday 1.13
|
383 kumpf 1.48 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
384 if (_stopConnections == 1)
385 {
386 for ( int indx = 0; indx < (int)_entries.size(); indx++)
387 {
388 if (_entries[indx]._type == Monitor::ACCEPTOR)
389 {
390 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
391 {
392 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
393 _entries[indx]._status.value() == _MonitorEntry::DYING )
394 {
395 // remove the entry
396 _entries[indx]._status = _MonitorEntry::EMPTY;
397 }
398 else
399 {
400 // set status to DYING
|
401 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
402 kumpf 1.48 }
403 }
404 }
405 }
406 _stopConnections = 0;
|
407 a.arora 1.73 _stopConnectionsSem.signal();
|
408 kumpf 1.48 }
|
409 kumpf 1.51
|
410 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
411 {
412 if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
413 (_entries[indx]._type == Monitor::CONNECTION))
414 {
415 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
416 PEGASUS_ASSERT(q != 0);
417 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
418 Message* message= new CloseConnectionMessage(_entries[indx].socket);
419 message->dest = o.getQueueId();
420
421 // HTTPAcceptor is responsible for closing the connection.
422 // The lock is released to allow HTTPAcceptor to call
423 // unsolicitSocketMessages to free the entry.
424 // Once HTTPAcceptor completes processing of the close
425 // connection, the lock is re-requested and processing of
426 // the for loop continues. This is safe with the current
427 // implementation of the _entries object. Note that the
428 // loop condition accesses the _entries.size() on each
429 // iteration, so that a change in size while the mutex is
430 // unlocked will not result in an ArrayIndexOutOfBounds
431 kumpf 1.68 // exception.
432
433 _entry_mut.unlock();
434 o.enqueue(message);
435 _entry_mut.lock(pegasus_thread_self());
436 }
437 }
438
|
439 kumpf 1.51 Uint32 _idleEntries = 0;
|
440 a.arora 1.73
441 /*
442 We will keep track of the maximum socket number and pass this value
443 to the kernel as a parameter to SELECT. This loop seems like a good
444 place to calculate the max file descriptor (maximum socket number)
445 because we have to traverse the entire array.
446 */
447 int maxSocketCurrentPass = 0;
|
448 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
449 mike 1.2 {
|
450 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
451 maxSocketCurrentPass = _entries[indx].socket;
452
|
453 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
454 mday 1.25 {
|
455 kumpf 1.51 _idleEntries++;
|
456 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
457 }
|
458 mday 1.13 }
|
459 s.hills 1.62
|
460 a.arora 1.73 /*
461 Add 1 then assign maxSocket accordingly. We add 1 to account for
462 descriptors starting at 0.
463 */
464 maxSocketCurrentPass++;
465
|
466 kumpf 1.51 _entry_mut.unlock();
|
467 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
468 kumpf 1.51 _entry_mut.lock(pegasus_thread_self());
|
469 mday 1.25
|
470 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
471 kumpf 1.50 if(events == SOCKET_ERROR)
|
472 mike 1.2 #else
|
473 kumpf 1.50 if(events == -1)
|
474 mike 1.2 #endif
|
475 mday 1.13 {
|
476 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
477 "Monitor::run - errorno = %d has occurred on select.", errno);
478 // The EBADF error indicates that one or more or the file
479 // descriptions was not valid. This could indicate that
480 // the _entries structure has been corrupted or that
481 // we have a synchronization error.
482
483 PEGASUS_ASSERT(errno != EBADF);
484 }
485 else if (events)
486 {
|
487 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
488 "Monitor::run select event received events = %d, monitoring %d idle entries",
489 events, _idleEntries);
|
490 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
491 {
|
492 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
493 // owned by the Monitor).
494 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
495 (FD_ISSET(_entries[indx].socket, &fdread)))
|
496 mday 1.25 {
497 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
498 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
499 "Monitor::run indx = %d, queueId = %d, q = %p",
500 indx, _entries[indx].queueId, q);
501 PEGASUS_ASSERT(q !=0);
|
502 mday 1.37
503 try
|
504 mday 1.25 {
|
505 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
506 {
|
507 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
508 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
509 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
510 _entries[indx]._status = _MonitorEntry::BUSY;
|
511 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
512 a.arora 1.73 /* Removed for PEP 183.
|
513 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
514 (void *)q, _dispatch))
|
515 kumpf 1.67 {
516 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
517 "Monitor::run: Insufficient resources to process request.");
518 _entries[indx]._status = _MonitorEntry::IDLE;
519 _entry_mut.unlock();
520 return true;
521 }
|
522 a.arora 1.73 */
523 // Added for PEP 183
524 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
525 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
526 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
527 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
528 try
529 {
530 dst->run(1);
531 }
532 catch (...)
533 {
534 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
535 "Monitor::_dispatch: exception received");
536 }
537 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
538 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
539
540 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
541 // Once the HTTPConnection thread has set the status value to either
542 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
543 a.arora 1.73 // to the Monitor. It is no longer permissible to access the connection
544 // or the entry in the _entries table.
545 if (dst->_connectionClosePending)
546 {
547 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
548 }
549 else
550 {
551 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
552 }
553 // end Added for PEP 183
554 }
555 else if( _entries[indx]._type == Monitor::INTERNAL){
556 // set ourself to BUSY,
557 // read the data
558 // and set ourself back to IDLE
559
560 _entries[indx]._status == _MonitorEntry::BUSY;
561 static char buffer[2];
562 Socket::disableBlocking(_entries[indx].socket);
563 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
564 a.arora 1.73 Socket::enableBlocking(_entries[indx].socket);
565 _entries[indx]._status == _MonitorEntry::IDLE;
|
566 mday 1.37 }
567 else
|
568 mday 1.25 {
|
569 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
570 "Non-connection entry, indx = %d, has been received.", indx);
|
571 mday 1.37 int events = 0;
572 events |= SocketMessage::READ;
573 Message *msg = new SocketMessage(_entries[indx].socket, events);
574 _entries[indx]._status = _MonitorEntry::BUSY;
575 _entry_mut.unlock();
|
576 mday 1.27
|
577 mday 1.37 q->enqueue(msg);
578 _entries[indx]._status = _MonitorEntry::IDLE;
|
579 mday 1.25 return true;
580 }
581 }
|
582 mday 1.37 catch(...)
|
583 mday 1.25 {
584 }
585 handled_events = true;
586 }
587 }
|
588 mday 1.24 }
|
589 mday 1.37 _entry_mut.unlock();
|
590 mday 1.13 return(handled_events);
|
591 mike 1.2 }
592
|
593 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
594 kumpf 1.48 {
595 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
596 a.arora 1.73 // set boolean then tickle the server to recognize _stopConnections
|
597 kumpf 1.48 _stopConnections = 1;
|
598 a.arora 1.73 tickle();
|
599 kumpf 1.48
|
600 chuck 1.74 if (wait)
|
601 a.arora 1.73 {
|
602 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
603 // caller of this function may unbind the ports while the monitor
604 // is still accepting connections on them.
605 try
606 {
607 _stopConnectionsSem.time_wait(10000);
608 }
609 catch (TimeOut &)
610 {
611 // The monitor is probably busy processng a very long request, and is
612 // not accepting connections. Let the caller unbind the ports.
613 }
|
614 a.arora 1.73 }
615
|
616 kumpf 1.48 PEG_METHOD_EXIT();
617 }
|
618 mday 1.25
|
619 mday 1.37
|
620 mday 1.25 int Monitor::solicitSocketMessages(
|
621 mike 1.2 Sint32 socket,
622 Uint32 events,
|
623 mday 1.8 Uint32 queueId,
624 int type)
|
625 mike 1.2 {
|
626 a.arora 1.73 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
627 _entry_mut.lock(pegasus_thread_self());
628 // Check to see if we need to dynamically grow the _entries array
629 // We always want the _entries array to 2 bigger than the
630 // current connections requested
631 _solicitSocketCount++; // bump the count
632 int size = (int)_entries.size();
633 if(_solicitSocketCount >= (size-1)){
634 for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
635 _MonitorEntry entry(0, 0, 0);
636 _entries.append(entry);
637 }
638 }
|
639 kumpf 1.4
|
640 a.arora 1.73 int index;
641 for(index = 1; index < (int)_entries.size(); index++)
|
642 mday 1.25 {
|
643 a.arora 1.73 try
|
644 mday 1.37 {
|
645 a.arora 1.73 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
646 {
647 _entries[index].socket = socket;
648 _entries[index].queueId = queueId;
649 _entries[index]._type = type;
650 _entries[index]._status = _MonitorEntry::IDLE;
651 _entry_mut.unlock();
652
653 return index;
654 }
|
655 mday 1.37 }
656 catch(...)
|
657 mday 1.25 {
658 }
659 }
|
660 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
661 kumpf 1.50 _entry_mut.unlock();
|
662 mday 1.25 PEG_METHOD_EXIT();
|
663 kumpf 1.50 return -1;
|
664 a.arora 1.73
|
665 mike 1.2 }
666
|
667 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
668 mike 1.2 {
|
669 kumpf 1.50
|
670 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
671 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
672 a.arora 1.73
673 /*
674 Start at index = 1 because _entries[0] is the tickle entry which never needs
675 to be EMPTY;
676 */
677 int index;
678 for(index = 1; index < _entries.size(); index++)
|
679 mike 1.2 {
|
680 mday 1.25 if(_entries[index].socket == socket)
681 {
|
682 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
683 _entries[index].socket = -1;
684 _solicitSocketCount--;
685 break;
|
686 mday 1.25 }
|
687 mike 1.2 }
|
688 a.arora 1.73
689 /*
690 Dynamic Contraction:
691 To remove excess entries we will start from the end of the _entries array
692 and remove all entries with EMPTY status until we find the first NON EMPTY.
693 This prevents the positions, of the NON EMPTY entries, from being changed.
694 */
695 index = _entries.size() - 1;
696 while(_entries[index]._status == _MonitorEntry::EMPTY){
697 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
698 _entries.remove(index);
699 index--;
700 }
701
|
702 mday 1.37 _entry_mut.unlock();
|
703 kumpf 1.4 PEG_METHOD_EXIT();
|
704 mike 1.2 }
705
|
706 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
707 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
708 {
|
709 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
710 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
711 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
712 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
713 kumpf 1.51 try
714 {
715 dst->run(1);
716 }
717 catch (...)
718 {
719 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
720 "Monitor::_dispatch: exception received");
721 }
722 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
723 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
724
|
725 kumpf 1.53 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
726 kumpf 1.68
727 // Once the HTTPConnection thread has set the status value to either
728 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
729 // to the Monitor. It is no longer permissible to access the connection
730 // or the entry in the _entries table.
|
731 kumpf 1.50 if (dst->_connectionClosePending)
732 {
|
733 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
734 }
735 else
736 {
737 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
738 kumpf 1.50 }
|
739 mday 1.8 return 0;
|
740 mday 1.40 }
741
742
743
744 ////************************* monitor 2 *****************************////
|
745 mday 1.43 ////************************* monitor 2 *****************************////
746 ////************************* monitor 2 *****************************////
747 ////************************* monitor 2 *****************************////
748 ////************************* monitor 2 *****************************////
749 ////************************* monitor 2 *****************************////
750 ////************************* monitor 2 *****************************////
|
751 mday 1.40
752
|
753 mday 1.56
754
755
|
756 mday 1.42 m2e_rep::m2e_rep(void)
|
757 mday 1.43 :Base(), state(IDLE)
758
|
759 mday 1.42 {
760 }
761
762 m2e_rep::m2e_rep(monitor_2_entry_type _type,
763 pegasus_socket _sock,
764 void* _accept,
765 void* _dispatch)
|
766 mday 1.43 : Base(), type(_type), state(IDLE), psock(_sock),
|
767 mday 1.42 accept_parm(_accept), dispatch_parm(_dispatch)
768 {
769
770 }
771
772 m2e_rep::~m2e_rep(void)
773 {
774 }
775
776 m2e_rep::m2e_rep(const m2e_rep& r)
777 : Base()
778 {
779 if(this != &r){
780 type = r.type;
781 psock = r.psock;
782 accept_parm = r.accept_parm;
783 dispatch_parm = r.dispatch_parm;
|
784 mday 1.43 state = IDLE;
785
|
786 mday 1.42 }
787 }
788
789
790 m2e_rep& m2e_rep::operator =(const m2e_rep& r)
791 {
792 if(this != &r) {
793 type = r.type;
794 psock = r.psock;
795 accept_parm = r.accept_parm;
796 dispatch_parm = r.dispatch_parm;
|
797 mday 1.43 state = IDLE;
|
798 mday 1.42 }
799 return *this;
800 }
801
802 Boolean m2e_rep::operator ==(const m2e_rep& r)
803 {
804 if(this == &r)
805 return true;
806 return false;
807 }
808
809 Boolean m2e_rep::operator ==(void* r)
810 {
811 if((void*)this == r)
812 return true;
813 return false;
814 }
815
816 m2e_rep::operator pegasus_socket() const
817 {
818 return psock;
819 mday 1.42 }
820
821
|
822 mday 1.40 monitor_2_entry::monitor_2_entry(void)
823 {
|
824 mday 1.42 _rep = new m2e_rep();
|
825 mday 1.40 }
826
|
827 mday 1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,
828 monitor_2_entry_type _type,
829 void* _accept_parm, void* _dispatch_parm)
|
830 mday 1.40 {
|
831 mday 1.42 _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
|
832 mday 1.40 }
833
834 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
835 {
836 if(this != &e){
|
837 mday 1.42 Inc(this->_rep = e._rep);
|
838 mday 1.40 }
839 }
840
841 monitor_2_entry::~monitor_2_entry(void)
842 {
|
843 mday 1.60
|
844 mday 1.42 Dec(_rep);
|
845 mday 1.40 }
846
847 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
848 {
849 if(this != &e){
|
850 mday 1.42 Dec(_rep);
851 Inc(this->_rep = e._rep);
|
852 mday 1.40 }
853 return *this;
854 }
855
|
856 mday 1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
|
857 mday 1.40 {
858 if(this == &me)
859 return true;
860 return false;
861 }
862
|
863 mday 1.42 Boolean monitor_2_entry::operator ==(void* k) const
|
864 mday 1.40 {
865 if((void *)this == k)
866 return true;
867 return false;
868 }
869
870
|
871 mday 1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
|
872 mday 1.40 {
|
873 mday 1.42 return _rep->type;
874 }
875
876 void monitor_2_entry::set_type(monitor_2_entry_type t)
877 {
878 _rep->type = t;
879 }
880
881
|
882 mday 1.43 monitor_2_entry_state monitor_2_entry::get_state(void) const
883 {
884 return (monitor_2_entry_state) _rep->state.value();
885 }
886
887 void monitor_2_entry::set_state(monitor_2_entry_state t)
888 {
889 _rep->state = t;
890 }
891
|
892 mday 1.42 void* monitor_2_entry::get_accept(void) const
893 {
894 return _rep->accept_parm;
895 }
896
897 void monitor_2_entry::set_accept(void* a)
898 {
899 _rep->accept_parm = a;
900 }
901
902
903 void* monitor_2_entry::get_dispatch(void) const
904 {
905 return _rep->dispatch_parm;
906 }
907
908 void monitor_2_entry::set_dispatch(void* a)
909 {
910 _rep->dispatch_parm = a;
911 }
912
913 mday 1.42 pegasus_socket monitor_2_entry::get_sock(void) const
914 {
915 return _rep->psock;
916 }
917
918
919 void monitor_2_entry::set_sock(pegasus_socket& s)
920 {
921 _rep->psock = s;
922
|
923 mday 1.40 }
924
|
925 mday 1.59 //static monitor_2* _m2_instance;
|
926 mday 1.40
|
927 mday 1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
928
|
929 mday 1.40 monitor_2::monitor_2(void)
|
930 mday 1.42 : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),
|
931 mday 1.49 _ready(true, 0), _die(0), _requestCount(0)
|
932 mday 1.40 {
933 try {
934
935 bsd_socket_factory _factory;
936
937 // set up the listener/acceptor
938 pegasus_socket temp = pegasus_socket(&_factory);
939
940 temp.socket(PF_INET, SOCK_STREAM, 0);
941 // initialize the address
942 memset(&_tickle_addr, 0, sizeof(_tickle_addr));
|
943 marek 1.47 #ifdef PEGASUS_OS_ZOS
944 _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
945 #else
|
946 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
947 #pragma convert(37)
948 #endif
|
949 mday 1.40 _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
950 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
951 #pragma convert(0)
952 #endif
|
953 marek 1.47 #endif
|
954 mday 1.40 _tickle_addr.sin_family = PF_INET;
955 _tickle_addr.sin_port = 0;
956
957 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
958
959 temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
960 temp.listen(3);
961 temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
962
963 // set up the connector
964
965 pegasus_socket tickler = pegasus_socket(&_factory);
966 tickler.socket(PF_INET, SOCK_STREAM, 0);
967 struct sockaddr_in _addr;
968 memset(&_addr, 0, sizeof(_addr));
|
969 kumpf 1.48 #ifdef PEGASUS_OS_ZOS
|
970 marek 1.47 _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
971 #else
|
972 mday 1.40 _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
973 marek 1.47 #endif
|
974 mday 1.40 _addr.sin_family = PF_INET;
975 _addr.sin_port = 0;
976 tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
977 tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
978
|
979 mday 1.42 _tickler.set_sock(tickler);
980 _tickler.set_type(INTERNAL);
|
981 mday 1.43 _tickler.set_state(BUSY);
982
|
983 mday 1.40 struct sockaddr_in peer;
984 memset(&peer, 0, sizeof(peer));
985 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
986
987 pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
|
988 mday 1.57
|
989 mday 1.42 monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
|
990 a.arora 1.71
991 // No need to set _tickle's state as BUSY, since monitor_2::run() now
992 // does a select only on sockets which are in IDLE (default) state.
993 // _tickle->set_state(BUSY);
|
994 mday 1.43
|
995 mday 1.40 _listeners.insert_first(_tickle);
996
997 }
998 catch(...){ }
999 }
1000
1001 monitor_2::~monitor_2(void)
1002 {
|
1003 mday 1.60
1004 stop();
1005
|
1006 mday 1.41 try {
1007 monitor_2_entry* temp = _listeners.remove_first();
1008 while(temp){
1009 delete temp;
1010 temp = _listeners.remove_first();
1011 }
1012 }
|
1013 mday 1.60
|
1014 mday 1.41 catch(...){ }
|
1015 mday 1.60
1016
1017 try
1018 {
1019 HTTPConnection2* temp = _connections.remove_first();
1020 while(temp)
1021 {
1022 delete temp;
1023 temp = _connections.remove_first();
1024 }
1025 }
1026 catch(...)
1027 {
1028 }
1029
1030
|
1031 mday 1.40 }
1032
1033
1034 void monitor_2::run(void)
1035 {
1036 monitor_2_entry* temp;
|
1037 a.arora 1.71 int _nonIdle=0, _idleCount=0, events;
1038
|
1039 mday 1.40 while(_die.value() == 0) {
|
1040 a.arora 1.71 _nonIdle=_idleCount=0;
|
1041 mday 1.49
|
1042 mday 1.57 struct timeval tv_idle = { 60, 0 };
|
1043 mday 1.56
|
1044 mday 1.40 // place all sockets in the select set
1045 FD_ZERO(&rd_fd_set);
1046 try {
1047 _listeners.lock(pegasus_thread_self());
1048 temp = _listeners.next(0);
|
1049 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1050 "monitor_2::run:Creating New FD list for SELECT.");
|
1051 mday 1.40 while(temp != 0 ){
|
1052 mday 1.57 if(temp->get_state() == CLOSED ) {
|
1053 mday 1.43 monitor_2_entry* closed = temp;
|
1054 a.arora 1.72 temp = _listeners.next(closed);
|
1055 mday 1.43 _listeners.remove_no_lock(closed);
|
1056 a.arora 1.71
1057 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1058 "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
|
1059 mday 1.60
|
1060 mday 1.49 HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1061 delete cn;
|
1062 mday 1.43 delete closed;
1063 }
|
1064 mday 1.45 if(temp == 0)
1065 break;
|
1066 a.arora 1.71
1067
1068 //Count the number if IDLE sockets
1069 if(temp->get_state() != IDLE ) _nonIdle++;
1070 else _idleCount++;
1071
|
1072 mday 1.46 Sint32 fd = (Sint32) temp->get_sock();
|
1073 a.arora 1.71
1074 //Select should be called ONLY on the FDs which are in IDLE state
1075 if((fd >= 0) && (temp->get_state() == IDLE))
1076 {
1077 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1078 "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1079 FD_SET(fd , &rd_fd_set);
1080 }
1081 temp = _listeners.next(temp);
|
1082 mday 1.40 }
1083 _listeners.unlock();
1084 }
1085 catch(...){
1086 return;
1087 }
|
1088 a.arora 1.71
|
1089 mday 1.42 // important - the dispatch routine has pointers to all the
1090 // entries that are readable. These entries can be changed but
1091 // the pointer must not be tampered with.
|
1092 mday 1.56 if(_connections.count() )
|
1093 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
|
1094 mday 1.56 else
|
1095 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
|
1096 mday 1.57
1097 if(_die.value())
1098 {
1099 break;
1100 }
|
1101 a.arora 1.71
1102 #ifdef PEGASUS_OS_TYPE_WINDOWS
1103 if(events == SOCKET_ERROR)
1104 #else
1105 if(events == -1)
1106 #endif
1107 {
1108 Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1109 "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1110 // The EBADF error indicates that one or more or the file
1111 // descriptions was not valid. This could indicate that
1112 // the _entries structure has been corrupted or that
1113 // we have a synchronization error.
1114
1115 // Keeping the line below commented for time being.
1116 // PEGASUS_ASSERT(errno != EBADF);
1117 }
1118 else if (events)
1119 {
1120 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1121 "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1122 a.arora 1.71
|
1123 mday 1.57
|
1124 mday 1.40 try {
1125 _listeners.lock(pegasus_thread_self());
1126 temp = _listeners.next(0);
1127 while(temp != 0 ){
|
1128 a.arora 1.72 Sint32 fd = (Sint32) temp->get_sock();
1129 if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1130 if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
|
1131 mday 1.42 FD_CLR(fd, &rd_fd_set);
1132 monitor_2_entry* ready = new monitor_2_entry(*temp);
|
1133 mday 1.49 try
1134 {
1135 _ready.insert_first(ready);
1136 }
1137 catch(...)
1138 {
1139 }
1140
|
1141 mday 1.42 _requestCount++;
|
1142 mday 1.40 }
1143 temp = _listeners.next(temp);
1144 }
1145 _listeners.unlock();
1146 }
1147 catch(...){
1148 return;
1149 }
1150 // now handle the sockets that are ready to read
|
1151 mday 1.56 if(_ready.count())
1152 _dispatch();
1153 else
1154 {
1155 if(_connections.count() == 0 )
1156 _idle_dispatch(_idle_parm);
1157 }
|
1158 a.arora 1.71 } // if events
|
1159 mday 1.40 } // while alive
|
1160 a.arora 1.72 _die=0;
|
1161 mday 1.57
|
1162 mday 1.40 }
1163
|
1164 dj.gorey 1.70 int monitor_2::solicitSocketMessages(
1165 Sint32 socket,
1166 Uint32 events,
1167 Uint32 queueId,
1168 int type)
1169 {
1170
|
1171 a.arora 1.71 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
|
1172 dj.gorey 1.70
1173 _entry_mut.lock(pegasus_thread_self());
1174
1175 for(int index = 0; index < (int)_entries.size(); index++)
1176 {
1177 try
1178 {
1179 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1180 {
1181 _entries[index].socket = socket;
1182 //_entries[index].queueId = queueId;
1183 //_entries[index]._type = type;
|
1184 a.arora 1.71 _entries[index]._status = IDLE;
|
1185 dj.gorey 1.70 _entry_mut.unlock();
1186
1187 return index;
1188 }
1189 }
1190 catch(...)
1191 {
1192 }
1193
1194 }
1195 _entry_mut.unlock();
1196 PEG_METHOD_EXIT();
1197 return -1;
1198 }
1199
1200
1201 void monitor_2::unsolicitSocketMessages(Sint32 socket)
1202 {
1203
1204 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
1205 _entry2_mut.lock(pegasus_thread_self());
1206 dj.gorey 1.70
1207 for(int index = 0; index < (int)_entries2.size(); index++)
1208 {
1209 if(_entries2[index].socket == socket)
1210 {
1211 _entries2[index]._status = monitor_2_entry::EMPTY;
1212 _entries2[index].socket = -1;
1213 break;
1214 }
1215 }
1216 _entry2_mut.unlock();
1217 PEG_METHOD_EXIT();
1218 }
1219
|
1220 mday 1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
|
1221 mday 1.40 {
|
1222 mday 1.42 void* old = (void *)_session_dispatch;
|
1223 mday 1.40 _session_dispatch = dp;
1224 return old;
1225 }
1226
|
1227 mday 1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1228 {
1229 void* old = (void*)_accept_dispatch;
1230 _accept_dispatch = dp;
1231 return old;
|
1232 mday 1.56 }
1233
1234 void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1235 {
1236 void* old = (void*)_idle_dispatch;
1237 _idle_dispatch = dp;
1238 return old;
1239 }
1240
1241 void* monitor_2::set_idle_parm(void* parm)
1242 {
1243 void* old = _idle_parm;
1244 _idle_parm = parm;
1245 return old;
|
1246 mday 1.42 }
1247
|
1248 mday 1.40
|
1249 mday 1.60
1250 //-----------------------------------------------------------------
1251 // Note on deleting the monitor_2_entry nodes:
1252 // Each case: in the switch statement needs to handle the deletion
1253 // of the monitor_2_entry * node differently. A SESSION dispatch
1254 // routine MUST DELETE the entry during its dispatch handling.
1255 // All other dispatch routines MUST NOT delete the entry during the
1256 // dispatch handling, but must allow monitor_2::_dispatch to delete
1257 // the entry.
1258 //
1259 // The reason is pretty obscure and it is debatable whether or not
1260 // to even bother, but during cimserver shutdown the single monitor_2_entry*
1261 // will leak unless the _session_dispatch routine takes care of deleting it.
1262 //
// The reason is that a shutdown message completely stops everything and
1264 // the _session_dispatch routine never returns. So monitor_2::_dispatch is
1265 // never able to do its own cleanup.
1266 //
1267 // << Mon Oct 13 09:33:33 2003 mdd >>
1268 //-----------------------------------------------------------------
1269
|
1270 mday 1.40 void monitor_2::_dispatch(void)
1271 {
|
1272 mday 1.49 monitor_2_entry* entry;
1273
1274 try
1275 {
1276
1277 entry = _ready.remove_first();
1278 }
1279 catch(...)
1280 {
1281 }
1282
1283 while(entry != 0 ) {
|
1284 mday 1.42 switch(entry->get_type()) {
|
1285 mday 1.40 case INTERNAL:
1286 static char buffer[2];
|
1287 mday 1.49 entry->get_sock().disableBlocking();
|
1288 mday 1.42 entry->get_sock().read(&buffer, 2);
|
1289 mday 1.49 entry->get_sock().enableBlocking();
|
1290 a.arora 1.71 entry->set_state(IDLE); // Set state of the socket to IDLE so that
1291 // monitor_2::run can add to the list of FDs
1292 // on which select would be called.
1293
1294
1295
|
1296 mday 1.60 delete entry;
1297
|
1298 mday 1.40 break;
1299 case LISTEN:
1300 {
1301 static struct sockaddr peer;
1302 static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
|
1303 mday 1.49 entry->get_sock().disableBlocking();
|
1304 mday 1.42 pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
|
1305 a.arora 1.71 entry->set_state(IDLE); // Set state of the LISTEN socket to IDLE
|
1306 mday 1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1307 if((Sint32)connected == SOCKET_ERROR)
1308 #else
1309 if((Sint32)connected == -1 )
1310 #endif
1311 {
1312 delete entry;
1313 break;
1314 }
1315
|
1316 mday 1.49 entry->get_sock().enableBlocking();
|
1317 mday 1.42 monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1318 if(temp && _accept_dispatch != 0)
|
1319 mday 1.49 _accept_dispatch(temp);
|
1320 mday 1.60 delete entry;
1321
|
1322 mday 1.40 }
1323 break;
1324 case SESSION:
|
1325 a.arora 1.72 case CLIENTSESSION:
|
1326 mday 1.60 if(_session_dispatch != 0 )
1327 {
1328 // NOTE: _session_dispatch will delete entry - do not do it here
|
1329 a.arora 1.72 unsigned client=0;
1330 if(entry->get_type() == CLIENTSESSION) client = 1;
1331 Sint32 sock=(Sint32)(entry->get_sock());
1332
1333 _session_dispatch(entry);
1334
1335 if(client)
1336 {
1337 HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1338 if(cn) delete cn;
1339 // stop();
1340 _die=1;
1341 }
|
1342 mday 1.60 }
1343
|
1344 mday 1.40 else {
1345 static char buffer[4096];
|
1346 mday 1.42 int bytes = entry->get_sock().read(&buffer, 4096);
|
1347 mday 1.60 delete entry;
|
1348 mday 1.40 }
1349
1350 break;
1351 case UNTYPED:
1352 default:
|
1353 mday 1.60 delete entry;
|
1354 mday 1.40 break;
1355 }
|
1356 mday 1.42 _requestCount--;
|
1357 mday 1.49
1358 if(_ready.count() == 0 )
1359 break;
1360
1361 try
1362 {
1363 entry = _ready.remove_first();
1364 }
1365 catch(...)
1366 {
1367 }
1368
|
1369 mday 1.40 }
1370 }
1371
1372 void monitor_2::stop(void)
1373 {
1374 _die = 1;
1375 tickle();
1376 // shut down the listener list, free the list nodes
|
1377 mday 1.42 _tickler.get_sock().close();
|
1378 mday 1.40 _listeners.shutdown_queue();
1379 }
1380
1381 void monitor_2::tickle(void)
1382 {
1383 static char _buffer[] =
1384 {
1385 '0','0'
1386 };
1387
|
1388 mday 1.57 _tickler.get_sock().disableBlocking();
1389
|
1390 mday 1.42 _tickler.get_sock().write(&_buffer, 2);
|
1391 mday 1.57 _tickler.get_sock().enableBlocking();
1392
|
1393 mday 1.40 }
1394
1395
|
1396 mday 1.42 monitor_2_entry* monitor_2::add_entry(pegasus_socket& ps,
1397 monitor_2_entry_type type,
1398 void* accept_parm,
1399 void* dispatch_parm)
|
1400 mday 1.40 {
|
1401 a.arora 1.71 Sint32 fd1,fd2;
1402
1403 fd2=(Sint32) ps;
1404
|
1405 mday 1.42 monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
|
1406 a.arora 1.71
1407 // The purpose of the following piece of code is to avoid duplicate entries in
1408 // the _listeners list. Would it be too much of an overhead ?
1409 try {
1410
1411 monitor_2_entry* temp;
1412
1413 _listeners.lock(pegasus_thread_self());
1414 temp = _listeners.next(0);
1415 while(temp != 0 )
1416 {
1417 fd1=(Sint32) temp->get_sock();
1418
1419 if(fd1 == fd2)
1420 {
1421
1422 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1423 "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1424 if(temp->get_state() == CLOSED)
1425 {
1426 temp->set_state(IDLE);
1427 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1428 "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1429 }
1430 _listeners.unlock();
1431 delete m2e;
1432 return 0;
1433 }
1434 temp = _listeners.next(temp);
1435 }
1436 }
1437 catch(...)
1438 {
1439 delete m2e;
1440 return 0;
1441 }
1442
1443
1444 _listeners.unlock();
1445
1446
|
1447 mday 1.40 try{
1448 _listeners.insert_first(m2e);
1449 }
1450 catch(...){
1451 delete m2e;
|
1452 mday 1.42 return 0;
|
1453 mday 1.40 }
|
1454 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1455 "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
|
1456 mday 1.40 tickle();
|
1457 mday 1.42 return m2e;
|
1458 mday 1.40 }
1459
1460 Boolean monitor_2::remove_entry(Sint32 s)
1461 {
1462 monitor_2_entry* temp;
1463 try {
1464 _listeners.try_lock(pegasus_thread_self());
1465 temp = _listeners.next(0);
1466 while(temp != 0){
|
1467 mday 1.42 if(s == (Sint32)temp->_rep->psock ){
|
1468 mday 1.40 temp = _listeners.remove_no_lock(temp);
1469 delete temp;
1470 _listeners.unlock();
1471 return true;
1472 }
1473 temp = _listeners.next(temp);
1474 }
1475 _listeners.unlock();
1476 }
1477 catch(...){
1478 }
1479 return false;
|
1480 mday 1.7 }
|
1481 mday 1.37
|
1482 mday 1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1483 {
1484 return _requestCount.value();
1485
|
1486 mday 1.49 }
1487
1488
1489 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1490 {
1491
1492 HTTPConnection2* temp;
1493 try
1494 {
1495 monitor_2::_connections.lock(pegasus_thread_self());
1496 temp = monitor_2::_connections.next(0);
1497 while(temp != 0 )
1498 {
1499 if(sock == temp->getSocket())
1500 {
1501 temp = monitor_2::_connections.remove_no_lock(temp);
1502 monitor_2::_connections.unlock();
1503 return temp;
1504 }
1505 temp = monitor_2::_connections.next(temp);
1506 }
1507 mday 1.49 monitor_2::_connections.unlock();
1508 }
1509 catch(...)
1510 {
1511 }
1512 return 0;
1513 }
1514
1515 Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1516 {
1517 try
1518 {
1519 monitor_2::_connections.insert_first(connection);
1520 }
1521 catch(...)
1522 {
1523 return false;
1524 }
1525 return true;
|
1526 mday 1.42 }
|
1527 mday 1.7
|
1528 mike 1.2
1529 PEGASUS_NAMESPACE_END
|