1 karl 1.79 //%2004////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 mike 1.2 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
11 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
14 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
|
16 r.kieninger 1.83 //
|
17 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
18 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
20 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
23 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Mike Brasher (mbrasher@bmc.com)
29 //
|
30 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
31 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
32 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
33 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
34 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
35 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
36 mike 1.2 //
37 //%/////////////////////////////////////////////////////////////////////////////
38
39 #include <Pegasus/Common/Config.h>
|
40 mday 1.40
|
41 mike 1.2 #include <cstring>
42 #include "Monitor.h"
43 #include "MessageQueue.h"
44 #include "Socket.h"
|
45 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
46 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
47 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
48 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
49 mike 1.2
50 #ifdef PEGASUS_OS_TYPE_WINDOWS
51 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
52 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
53 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
54 mday 1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
|
55 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
56 otherwise, less than 64 clients (the default) will be able to connect to the \
57 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
58 mday 1.5
|
59 mike 1.2 # endif
60 # define FD_SETSIZE 1024
|
61 mday 1.5 # include <windows.h>
|
62 mike 1.2 #else
63 # include <sys/types.h>
64 # include <sys/socket.h>
65 # include <sys/time.h>
66 # include <netinet/in.h>
67 # include <netdb.h>
68 # include <arpa/inet.h>
69 #endif
70
71 PEGASUS_USING_STD;
72
73 PEGASUS_NAMESPACE_BEGIN
74
|
// PEGASUS_SOCKLEN_T: the integer type used in this file for the address-length
// arguments of getsockname() and accept().
//   - z/OS                               : size_t
//   - AIX, Linux, Solaris (not SunOS 5.6): socklen_t
//   - every other platform               : int
#if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM)
typedef size_t PEGASUS_SOCKLEN_T;
#elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || \
      defined(PEGASUS_OS_LINUX) || \
      (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
typedef socklen_t PEGASUS_SOCKLEN_T;
#else
typedef int PEGASUS_SOCKLEN_T;
#endif
|
83 mday 1.18
|
84 mday 1.25 static AtomicInt _connections = 0;
85
86 static struct timeval create_time = {0, 1};
|
87 mday 1.38 static struct timeval destroy_time = {300, 0};
|
88 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
89 mday 1.18
|
90 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
91 //
92 // MonitorRep
93 //
94 ////////////////////////////////////////////////////////////////////////////////
95
// Bundle of select() descriptor sets: one read/write/exception triple plus a
// second "active" triple. The constructors in this file set Monitor::_rep to 0
// and never allocate a MonitorRep; Monitor::run() builds its own local fd_set.
// NOTE(review): presumably the first triple holds the registered descriptors
// and the "active" triple the per-pass copy handed to select() -- confirm.
struct MonitorRep
{
    fd_set rd_fd_set, wr_fd_set, ex_fd_set;
    fd_set active_rd_fd_set, active_wr_fd_set, active_ex_fd_set;
};
105
106 ////////////////////////////////////////////////////////////////////////////////
107 //
108 // Monitor
109 //
110 ////////////////////////////////////////////////////////////////////////////////
111 mike 1.2
|
112 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
113 mike 1.2 Monitor::Monitor()
|
114 r.kieninger 1.83 : _module_handle(0),
|
115 a.arora 1.73 _controller(0),
116 _async(false),
117 _stopConnections(0),
118 _stopConnectionsSem(0),
|
119 a.dunfey 1.76 _solicitSocketCount(0),
|
120 a.dunfey 1.77 _tickle_client_socket(-1),
121 _tickle_server_socket(-1),
122 _tickle_peer_socket(-1)
|
123 mike 1.2 {
|
124 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
125 mike 1.2 Socket::initializeInterface();
|
126 mday 1.25 _rep = 0;
|
127 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
128 a.arora 1.73
129 // setup the tickler
130 initializeTickler();
|
131 r.kieninger 1.83
132 // Start the count at 1 because initilizeTickler()
133 // has added an entry in the first position of the
|
134 a.arora 1.73 // _entries array
135 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
136 mday 1.37 {
137 _MonitorEntry entry(0, 0, 0);
138 _entries.append(entry);
139 }
|
140 mike 1.2 }
141
|
142 mday 1.18 Monitor::Monitor(Boolean async)
|
143 a.arora 1.73 : _module_handle(0),
144 _controller(0),
145 _async(async),
146 _stopConnections(0),
147 _stopConnectionsSem(0),
|
148 a.dunfey 1.76 _solicitSocketCount(0),
|
149 a.dunfey 1.77 _tickle_client_socket(-1),
150 _tickle_server_socket(-1),
151 _tickle_peer_socket(-1)
|
152 mday 1.18 {
|
153 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
154 mday 1.18 Socket::initializeInterface();
|
155 mday 1.25 _rep = 0;
|
156 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
157 a.arora 1.73
158 // setup the tickler
159 initializeTickler();
160
|
161 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
162 a.arora 1.73 // has added an entry in the first position of the
163 // _entries array
164 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
165 mday 1.37 {
166 _MonitorEntry entry(0, 0, 0);
167 _entries.append(entry);
168 }
|
169 mday 1.18 }
|
170 mday 1.20
|
171 mike 1.2 Monitor::~Monitor()
172 {
|
173 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
174 "deregistering with module controller");
|
175 kumpf 1.10
|
176 joyce.j 1.84 if(_module_handle.get() != NULL)
|
177 mday 1.8 {
178 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
|
179 joyce.j 1.84 _controller.reset();
180 _module_handle.reset();
|
181 mday 1.8 }
|
182 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
183 kumpf 1.48
|
184 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
185 a.dunfey 1.76
186 try{
|
187 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
188 a.dunfey 1.76 {
189 Socket::close(_tickle_peer_socket);
190 }
|
191 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
192 a.dunfey 1.76 {
193 Socket::close(_tickle_client_socket);
194 }
|
195 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
196 a.dunfey 1.76 {
197 Socket::close(_tickle_server_socket);
198 }
199 }
200 catch(...)
201 {
202 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
203 "Failed to close tickle sockets");
204 }
205
|
206 mike 1.2 Socket::uninitializeInterface();
|
207 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
208 "returning from monitor destructor");
|
209 mday 1.18 }
210
|
211 a.arora 1.73 void Monitor::initializeTickler(){
|
212 r.kieninger 1.83 /*
213 NOTE: On any errors trying to
214 setup out tickle connection,
|
215 a.arora 1.73 throw an exception/end the server
216 */
217
218 /* setup the tickle server/listener */
219
220 // get a socket for the server side
221 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
222 //handle error
223 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
224 "Received error number $0 while creating the internal socket.",
225 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
226 errno);
227 #else
228 WSAGetLastError());
229 #endif
230 throw Exception(parms);
231 }
232
233 // initialize the address
234 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
235 #ifdef PEGASUS_OS_ZOS
236 a.arora 1.73 _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
237 #else
238 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
239 #pragma convert(37)
240 #endif
241 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
242 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
243 #pragma convert(0)
244 #endif
245 #endif
246 _tickle_server_addr.sin_family = PF_INET;
247 _tickle_server_addr.sin_port = 0;
248
|
249 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
250 a.arora 1.73
251 // bind server side to socket
252 if((::bind(_tickle_server_socket,
|
253 r.kieninger 1.83 (struct sockaddr *)&_tickle_server_addr,
|
254 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
255 // handle error
|
256 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
257 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
258 "Received error:$0 while binding the internal socket.",strerror(errno));
259 #else
|
260 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
261 "Received error number $0 while binding the internal socket.",
262 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
263 errno);
264 #else
265 WSAGetLastError());
266 #endif
|
267 r.kieninger 1.83 #endif
|
268 a.arora 1.73 throw Exception(parms);
269 }
270
271 // tell the kernel we are a server
272 if((::listen(_tickle_server_socket,3)) < 0){
273 // handle error
274 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
275 "Received error number $0 while listening to the internal socket.",
276 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
277 errno);
278 #else
279 WSAGetLastError());
280 #endif
281 throw Exception(parms);
282 }
|
283 r.kieninger 1.83
|
284 a.arora 1.73 // make sure we have the correct socket for our server
285 int sock = ::getsockname(_tickle_server_socket,
286 (struct sockaddr*)&_tickle_server_addr,
|
287 r.kieninger 1.83 &_addr_size);
|
288 a.arora 1.73 if(sock < 0){
289 // handle error
290 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
291 "Received error number $0 while getting the internal socket name.",
292 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293 errno);
294 #else
295 WSAGetLastError());
296 #endif
297 throw Exception(parms);
298 }
299
300 /* set up the tickle client/connector */
|
301 r.kieninger 1.83
|
302 a.arora 1.73 // get a socket for our tickle client
303 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
304 // handle error
305 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
306 "Received error number $0 while creating the internal client socket.",
307 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
308 errno);
309 #else
310 WSAGetLastError());
311 #endif
312 throw Exception(parms);
313 }
314
315 // setup the address of the client
316 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
317 #ifdef PEGASUS_OS_ZOS
318 _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
319 #else
320 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
321 #pragma convert(37)
322 #endif
323 a.arora 1.73 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
324 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
325 #pragma convert(0)
326 #endif
327 #endif
328 _tickle_client_addr.sin_family = PF_INET;
329 _tickle_client_addr.sin_port = 0;
330
331 // bind socket to client side
332 if((::bind(_tickle_client_socket,
333 (struct sockaddr*)&_tickle_client_addr,
334 sizeof(_tickle_client_addr))) < 0){
335 // handle error
336 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
337 "Received error number $0 while binding the internal client socket.",
338 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
339 errno);
340 #else
341 WSAGetLastError());
342 #endif
343 throw Exception(parms);
344 a.arora 1.73 }
345
346 // connect to server side
347 if((::connect(_tickle_client_socket,
348 (struct sockaddr*)&_tickle_server_addr,
349 sizeof(_tickle_server_addr))) < 0){
350 // handle error
351 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
352 "Received error number $0 while connecting the internal client socket.",
353 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
354 errno);
355 #else
356 WSAGetLastError());
357 #endif
358 throw Exception(parms);
359 }
360
361 /* set up the slave connection */
362 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
363 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
364 r.kieninger 1.83 pegasus_sleep(1);
|
365 a.arora 1.73
366 // this call may fail, we will try a max of 20 times to establish this peer connection
367 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
368 (struct sockaddr*)&_tickle_peer_addr,
369 &peer_size)) < 0){
370 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
371 // Only retry on non-windows platforms.
372 if(_tickle_peer_socket == -1 && errno == EAGAIN)
373 {
|
374 r.kieninger 1.83 int retries = 0;
|
375 a.arora 1.73 do
376 {
377 pegasus_sleep(1);
378 _tickle_peer_socket = ::accept(_tickle_server_socket,
379 (struct sockaddr*)&_tickle_peer_addr,
380 &peer_size);
381 retries++;
382 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
383 }
384 #endif
385 }
386 if(_tickle_peer_socket == -1){
387 // handle error
388 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
389 "Received error number $0 while accepting the internal socket connection.",
390 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
391 errno);
392 #else
393 WSAGetLastError());
394 #endif
395 throw Exception(parms);
396 a.arora 1.73 }
397 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
398 // checks entries with IDLE state for events
399 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
400 entry._status = _MonitorEntry::IDLE;
401 _entries.append(entry);
402 }
403
404 void Monitor::tickle(void)
405 {
|
406 sushma.fernandes 1.78 static char _buffer[] =
|
407 a.arora 1.73 {
408 '0','0'
409 };
|
410 r.kieninger 1.83
|
411 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
412 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
413 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
414 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
415 sushma.fernandes 1.78 }
416
417 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
418 {
419 // Set the state to requested state
420 _entries[index]._status = status;
|
421 a.arora 1.73 }
422
|
423 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
424 {
|
425 mday 1.18
|
426 mday 1.25 Boolean handled_events = false;
|
427 a.arora 1.73 int i = 0;
|
428 r.kieninger 1.83
|
429 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
430 a.arora 1.73
|
431 mday 1.25 fd_set fdread;
432 FD_ZERO(&fdread);
|
433 a.arora 1.73
|
434 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
435 r.kieninger 1.83
436 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
437 if (_stopConnections == 1)
|
438 kumpf 1.48 {
439 for ( int indx = 0; indx < (int)_entries.size(); indx++)
440 {
441 if (_entries[indx]._type == Monitor::ACCEPTOR)
442 {
443 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
444 {
445 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
446 _entries[indx]._status.value() == _MonitorEntry::DYING )
447 {
448 // remove the entry
449 _entries[indx]._status = _MonitorEntry::EMPTY;
450 }
451 else
452 {
453 // set status to DYING
|
454 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
455 kumpf 1.48 }
456 }
457 }
458 }
459 _stopConnections = 0;
|
460 a.arora 1.73 _stopConnectionsSem.signal();
|
461 kumpf 1.48 }
|
462 kumpf 1.51
|
463 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
464 {
|
465 brian.campbell 1.80 const _MonitorEntry &entry = _entries[indx];
466 if ((entry._status.value() == _MonitorEntry::DYING) &&
467 (entry._type == Monitor::CONNECTION))
|
468 kumpf 1.68 {
|
469 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
470 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
471 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
472 r.kieninger 1.83
|
473 brian.campbell 1.80 if (h._connectionClosePending == false)
474 continue;
475
476 // NOTE: do not attempt to delete while there are pending responses
|
477 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
478 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
479 // and then cause the monitor to rerun this code and clean up.
480 // (see HTTPConnection.cpp)
481
482 if (h._responsePending == true)
483 {
484 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
485 "Ignoring connection delete request because "
486 "responses are still pending. "
|
487 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
488 brian.campbell 1.81 (void *)&h, h.getSocket());
|
489 brian.campbell 1.80 continue;
490 }
491 h._connectionClosePending = false;
492 MessageQueue &o = h.get_owner();
493 Message* message= new CloseConnectionMessage(entry.socket);
|
494 kumpf 1.68 message->dest = o.getQueueId();
495
|
496 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
497 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
498 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
499 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
500 // connection, the lock is re-requested and processing of
501 // the for loop continues. This is safe with the current
502 // implementation of the _entries object. Note that the
503 // loop condition accesses the _entries.size() on each
504 // iteration, so that a change in size while the mutex is
505 // unlocked will not result in an ArrayIndexOutOfBounds
506 // exception.
507
508 _entry_mut.unlock();
509 o.enqueue(message);
510 _entry_mut.lock(pegasus_thread_self());
511 }
512 }
513
|
514 kumpf 1.51 Uint32 _idleEntries = 0;
|
515 r.kieninger 1.83
|
516 a.arora 1.73 /*
517 We will keep track of the maximum socket number and pass this value
|
518 r.kieninger 1.83 to the kernel as a parameter to SELECT. This loop seems like a good
|
519 a.arora 1.73 place to calculate the max file descriptor (maximum socket number)
520 because we have to traverse the entire array.
|
521 r.kieninger 1.83 */
|
522 a.arora 1.73 int maxSocketCurrentPass = 0;
|
523 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
524 mike 1.2 {
|
525 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
526 maxSocketCurrentPass = _entries[indx].socket;
527
|
528 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
529 mday 1.25 {
|
530 kumpf 1.51 _idleEntries++;
|
531 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
532 }
|
533 mday 1.13 }
|
534 s.hills 1.62
|
535 a.arora 1.73 /*
536 Add 1 then assign maxSocket accordingly. We add 1 to account for
537 descriptors starting at 0.
538 */
539 maxSocketCurrentPass++;
540
|
541 r.kieninger 1.83 _entry_mut.unlock();
|
542 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
543 kumpf 1.51 _entry_mut.lock(pegasus_thread_self());
|
544 mday 1.25
|
545 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
546 kumpf 1.50 if(events == SOCKET_ERROR)
|
547 mike 1.2 #else
|
548 kumpf 1.50 if(events == -1)
|
549 mike 1.2 #endif
|
550 mday 1.13 {
|
551 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
552 "Monitor::run - errorno = %d has occurred on select.", errno);
553 // The EBADF error indicates that one or more or the file
554 // descriptions was not valid. This could indicate that
555 // the _entries structure has been corrupted or that
556 // we have a synchronization error.
557
558 PEGASUS_ASSERT(errno != EBADF);
559 }
560 else if (events)
561 {
|
562 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
563 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
564 kumpf 1.51 events, _idleEntries);
|
565 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
566 {
|
567 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
568 // owned by the Monitor).
|
569 r.kieninger 1.83 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
|
570 kumpf 1.53 (FD_ISSET(_entries[indx].socket, &fdread)))
|
571 mday 1.25 {
572 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
573 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
574 "Monitor::run indx = %d, queueId = %d, q = %p",
575 indx, _entries[indx].queueId, q);
576 PEGASUS_ASSERT(q !=0);
|
577 mday 1.37
|
578 r.kieninger 1.83 try
|
579 mday 1.25 {
|
580 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
581 {
|
582 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
583 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
584 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
585 sushma.fernandes 1.78
586 // Do not update the entry just yet. The entry gets updated once
|
587 r.kieninger 1.83 // the request has been read.
|
588 sushma.fernandes 1.78 //_entries[indx]._status = _MonitorEntry::BUSY;
589
|
590 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
591 a.arora 1.73 /* Removed for PEP 183.
|
592 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
593 (void *)q, _dispatch))
|
594 kumpf 1.67 {
595 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
596 "Monitor::run: Insufficient resources to process request.");
597 _entries[indx]._status = _MonitorEntry::IDLE;
598 _entry_mut.unlock();
599 return true;
600 }
|
601 a.arora 1.73 */
602 // Added for PEP 183
603 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
604 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
605 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
606 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
607 try
608 {
609 dst->run(1);
610 }
611 catch (...)
612 {
613 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
614 "Monitor::_dispatch: exception received");
615 }
616 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
617 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
618
|
619 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
620 r.kieninger 1.83 // The following will fail in that case.
621 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
622 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
623 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
624 // to the Monitor. It is no longer permissible to access the connection
625 // or the entry in the _entries table.
|
626 sushma.fernandes 1.78
627 // The following is not relevant as the worker thread or the
628 // reader thread will update the status of the entry.
629 //if (dst->_connectionClosePending)
|
630 r.kieninger 1.83 //{
|
631 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
632 //}
633 //else
634 //{
635 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
636 r.kieninger 1.83 //}
637 // end Added for PEP 183
|
638 a.arora 1.73 }
639 else if( _entries[indx]._type == Monitor::INTERNAL){
|
640 r.kieninger 1.83 // set ourself to BUSY,
641 // read the data
|
642 a.arora 1.73 // and set ourself back to IDLE
|
643 r.kieninger 1.83
|
644 a.arora 1.73 _entries[indx]._status == _MonitorEntry::BUSY;
645 static char buffer[2];
646 Socket::disableBlocking(_entries[indx].socket);
647 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
648 Socket::enableBlocking(_entries[indx].socket);
649 _entries[indx]._status == _MonitorEntry::IDLE;
|
650 mday 1.37 }
651 else
|
652 mday 1.25 {
|
653 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
654 "Non-connection entry, indx = %d, has been received.", indx);
|
655 mday 1.37 int events = 0;
656 events |= SocketMessage::READ;
657 Message *msg = new SocketMessage(_entries[indx].socket, events);
658 _entries[indx]._status = _MonitorEntry::BUSY;
659 _entry_mut.unlock();
|
660 mday 1.27
|
661 mday 1.37 q->enqueue(msg);
662 _entries[indx]._status = _MonitorEntry::IDLE;
|
663 mday 1.25 return true;
664 }
665 }
|
666 mday 1.37 catch(...)
|
667 mday 1.25 {
668 }
669 handled_events = true;
670 }
671 }
|
672 mday 1.24 }
|
673 mday 1.37 _entry_mut.unlock();
|
674 mday 1.13 return(handled_events);
|
675 mike 1.2 }
676
|
677 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
678 kumpf 1.48 {
679 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
680 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
681 kumpf 1.48 _stopConnections = 1;
|
682 a.arora 1.73 tickle();
|
683 kumpf 1.48
|
684 chuck 1.74 if (wait)
|
685 a.arora 1.73 {
|
686 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
687 // caller of this function may unbind the ports while the monitor
688 // is still accepting connections on them.
689 try
690 {
691 _stopConnectionsSem.time_wait(10000);
692 }
693 catch (TimeOut &)
694 {
695 // The monitor is probably busy processng a very long request, and is
696 // not accepting connections. Let the caller unbind the ports.
697 }
|
698 a.arora 1.73 }
|
699 r.kieninger 1.83
|
700 kumpf 1.48 PEG_METHOD_EXIT();
701 }
|
702 mday 1.25
|
703 mday 1.37
|
704 mday 1.25 int Monitor::solicitSocketMessages(
|
705 r.kieninger 1.83 Sint32 socket,
|
706 mike 1.2 Uint32 events,
|
707 r.kieninger 1.83 Uint32 queueId,
|
708 mday 1.8 int type)
|
709 mike 1.2 {
|
710 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
711 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
712 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
713 // We always want the _entries array to 2 bigger than the
714 // current connections requested
715 _solicitSocketCount++; // bump the count
716 int size = (int)_entries.size();
717 if(_solicitSocketCount >= (size-1)){
718 for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
719 _MonitorEntry entry(0, 0, 0);
720 _entries.append(entry);
721 }
722 }
|
723 kumpf 1.4
|
724 a.arora 1.73 int index;
725 for(index = 1; index < (int)_entries.size(); index++)
|
726 mday 1.25 {
|
727 a.arora 1.73 try
|
728 mday 1.37 {
|
729 a.arora 1.73 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
730 {
731 _entries[index].socket = socket;
732 _entries[index].queueId = queueId;
733 _entries[index]._type = type;
734 _entries[index]._status = _MonitorEntry::IDLE;
|
735 r.kieninger 1.83
|
736 a.arora 1.73 return index;
737 }
|
738 mday 1.37 }
739 catch(...)
|
740 mday 1.25 {
741 }
742 }
|
743 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
744 mday 1.25 PEG_METHOD_EXIT();
|
745 kumpf 1.50 return -1;
|
746 a.arora 1.73
|
747 mike 1.2 }
748
|
749 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
750 mike 1.2 {
|
751 kumpf 1.50
|
752 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
753 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
754 a.arora 1.73
755 /*
756 Start at index = 1 because _entries[0] is the tickle entry which never needs
757 to be EMPTY;
758 */
759 int index;
760 for(index = 1; index < _entries.size(); index++)
|
761 mike 1.2 {
|
762 mday 1.25 if(_entries[index].socket == socket)
763 {
|
764 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
765 _entries[index].socket = -1;
766 _solicitSocketCount--;
767 break;
|
768 mday 1.25 }
|
769 mike 1.2 }
|
770 a.arora 1.73
771 /*
772 Dynamic Contraction:
773 To remove excess entries we will start from the end of the _entries array
774 and remove all entries with EMPTY status until we find the first NON EMPTY.
775 This prevents the positions, of the NON EMPTY entries, from being changed.
|
776 r.kieninger 1.83 */
|
777 a.arora 1.73 index = _entries.size() - 1;
778 while(_entries[index]._status == _MonitorEntry::EMPTY){
779 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
780 _entries.remove(index);
781 index--;
782 }
783
|
784 kumpf 1.4 PEG_METHOD_EXIT();
|
785 mike 1.2 }
786
|
787 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
788 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
789 {
|
790 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
791 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
792 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
793 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
794 kumpf 1.51 try
795 {
796 dst->run(1);
797 }
798 catch (...)
799 {
800 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
801 "Monitor::_dispatch: exception received");
802 }
803 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
804 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
805 r.kieninger 1.83
|
806 kumpf 1.53 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
807 kumpf 1.68
808 // Once the HTTPConnection thread has set the status value to either
809 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
810 // to the Monitor. It is no longer permissible to access the connection
811 // or the entry in the _entries table.
|
812 kumpf 1.50 if (dst->_connectionClosePending)
813 {
|
814 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
815 }
816 else
817 {
818 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
819 kumpf 1.50 }
|
820 mday 1.8 return 0;
|
821 mday 1.40 }
822
|
823 mike 1.2 PEGASUS_NAMESPACE_END
|