1 karl 1.90 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 karl 1.90 //
|
19 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Brasher (mbrasher@bmc.com)
31 //
|
32 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
33 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
34 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
35 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
36 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
37 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
38 mike 1.2 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
41 #include <Pegasus/Common/Config.h>
|
42 mday 1.40
|
43 mike 1.2 #include <cstring>
44 #include "Monitor.h"
45 #include "MessageQueue.h"
46 #include "Socket.h"
|
47 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
48 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
49 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
50 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
51 mike 1.2
// Platform socket headers.
//
// On Windows the fd_set capacity is fixed by the value FD_SETSIZE has when
// the winsock header is first seen, so it must be 1024 before <windows.h> is
// included.  If some earlier header already fixed it to a different value the
// build is stopped with an explanatory message.
#ifdef PEGASUS_OS_TYPE_WINDOWS
# if defined(FD_SETSIZE) && FD_SETSIZE != 1024
#  error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
of <winsock.h>. It may have been indirectly included (e.g., by including \
<windows.h>). Find the inclusion of that header which is visible to this \
compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
otherwise, less than 64 clients (the default) will be able to connect to the \
CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."

# endif
# define FD_SETSIZE 1024
# include <windows.h>
#else
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <netinet/in.h>
# include <netdb.h>
# include <arpa/inet.h>
#endif
72
73 PEGASUS_USING_STD;
74
75 PEGASUS_NAMESPACE_BEGIN
76
|
77 kumpf 1.86 // Define a platform-neutral socket length type
|
78 gs.keenan 1.91 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
|
79 kumpf 1.86 typedef size_t PEGASUS_SOCKLEN_T;
80 #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
81 typedef socklen_t PEGASUS_SOCKLEN_T;
82 #else
83 typedef int PEGASUS_SOCKLEN_T;
84 #endif
|
85 mday 1.18
|
86 mday 1.25 static AtomicInt _connections = 0;
87
|
88 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Monitor
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93
|
94 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
95 mike 1.2 Monitor::Monitor()
|
96 kumpf 1.87 : _stopConnections(0),
|
97 a.arora 1.73 _stopConnectionsSem(0),
|
98 a.dunfey 1.76 _solicitSocketCount(0),
|
99 a.dunfey 1.77 _tickle_client_socket(-1),
100 _tickle_server_socket(-1),
101 _tickle_peer_socket(-1)
|
102 mike 1.2 {
|
103 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
104 mike 1.2 Socket::initializeInterface();
|
105 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
106 a.arora 1.73
107 // setup the tickler
108 initializeTickler();
109
|
110 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
111 a.arora 1.73 // has added an entry in the first position of the
112 // _entries array
113 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
114 mday 1.37 {
115 _MonitorEntry entry(0, 0, 0);
116 _entries.append(entry);
117 }
|
118 mday 1.18 }
|
119 mday 1.20
|
120 mike 1.2 Monitor::~Monitor()
121 {
|
122 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
123 a.dunfey 1.76
124 try{
|
125 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
126 a.dunfey 1.76 {
127 Socket::close(_tickle_peer_socket);
128 }
|
129 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
130 a.dunfey 1.76 {
131 Socket::close(_tickle_client_socket);
132 }
|
133 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
134 a.dunfey 1.76 {
135 Socket::close(_tickle_server_socket);
136 }
137 }
138 catch(...)
139 {
140 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
141 "Failed to close tickle sockets");
142 }
143
|
144 mike 1.2 Socket::uninitializeInterface();
|
145 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146 "returning from monitor destructor");
|
147 mday 1.18 }
148
|
149 a.arora 1.73 void Monitor::initializeTickler(){
|
150 r.kieninger 1.83 /*
151 NOTE: On any errors trying to
152 setup out tickle connection,
|
153 a.arora 1.73 throw an exception/end the server
154 */
155
156 /* setup the tickle server/listener */
157
158 // get a socket for the server side
159 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
160 //handle error
161 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
162 "Received error number $0 while creating the internal socket.",
163 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
164 errno);
165 #else
166 WSAGetLastError());
167 #endif
168 throw Exception(parms);
169 }
170
171 // initialize the address
172 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
173 #ifdef PEGASUS_OS_ZOS
174 a.arora 1.73 _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
175 #else
176 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
177 #pragma convert(37)
178 #endif
179 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
180 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
181 #pragma convert(0)
182 #endif
183 #endif
184 _tickle_server_addr.sin_family = PF_INET;
185 _tickle_server_addr.sin_port = 0;
186
|
187 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
188 a.arora 1.73
189 // bind server side to socket
190 if((::bind(_tickle_server_socket,
|
191 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
192 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
193 // handle error
|
194 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
195 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
196 "Received error:$0 while binding the internal socket.",strerror(errno));
197 #else
|
198 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
199 "Received error number $0 while binding the internal socket.",
200 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
201 errno);
202 #else
203 WSAGetLastError());
204 #endif
|
205 r.kieninger 1.83 #endif
|
206 a.arora 1.73 throw Exception(parms);
207 }
208
209 // tell the kernel we are a server
210 if((::listen(_tickle_server_socket,3)) < 0){
211 // handle error
212 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
213 "Received error number $0 while listening to the internal socket.",
214 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
215 errno);
216 #else
217 WSAGetLastError());
218 #endif
219 throw Exception(parms);
220 }
|
221 r.kieninger 1.83
|
222 a.arora 1.73 // make sure we have the correct socket for our server
223 int sock = ::getsockname(_tickle_server_socket,
|
224 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
225 &_addr_size);
|
226 a.arora 1.73 if(sock < 0){
227 // handle error
228 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
229 "Received error number $0 while getting the internal socket name.",
230 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
231 errno);
232 #else
233 WSAGetLastError());
234 #endif
235 throw Exception(parms);
236 }
237
238 /* set up the tickle client/connector */
|
239 r.kieninger 1.83
|
240 a.arora 1.73 // get a socket for our tickle client
241 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
242 // handle error
243 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
244 "Received error number $0 while creating the internal client socket.",
245 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
246 errno);
247 #else
248 WSAGetLastError());
249 #endif
250 throw Exception(parms);
251 }
252
253 // setup the address of the client
254 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
255 #ifdef PEGASUS_OS_ZOS
256 _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
257 #else
258 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
259 #pragma convert(37)
260 #endif
261 a.arora 1.73 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
262 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
263 #pragma convert(0)
264 #endif
265 #endif
266 _tickle_client_addr.sin_family = PF_INET;
267 _tickle_client_addr.sin_port = 0;
268
269 // bind socket to client side
270 if((::bind(_tickle_client_socket,
|
271 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
272 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
273 // handle error
274 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
275 "Received error number $0 while binding the internal client socket.",
276 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
277 errno);
278 #else
279 WSAGetLastError());
280 #endif
281 throw Exception(parms);
282 }
283
284 // connect to server side
285 if((::connect(_tickle_client_socket,
|
286 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
287 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
288 // handle error
289 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
290 "Received error number $0 while connecting the internal client socket.",
291 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
292 errno);
293 #else
294 WSAGetLastError());
295 #endif
296 throw Exception(parms);
297 }
298
299 /* set up the slave connection */
300 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
301 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
302 r.kieninger 1.83 pegasus_sleep(1);
|
303 a.arora 1.73
304 // this call may fail, we will try a max of 20 times to establish this peer connection
305 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
306 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
307 &peer_size)) < 0){
|
308 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
309 // Only retry on non-windows platforms.
310 if(_tickle_peer_socket == -1 && errno == EAGAIN)
311 {
|
312 r.kieninger 1.83 int retries = 0;
|
313 a.arora 1.73 do
314 {
315 pegasus_sleep(1);
316 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
317 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
318 &peer_size);
|
319 a.arora 1.73 retries++;
320 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
321 }
322 #endif
323 }
324 if(_tickle_peer_socket == -1){
325 // handle error
326 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
327 "Received error number $0 while accepting the internal socket connection.",
328 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
329 errno);
330 #else
331 WSAGetLastError());
332 #endif
333 throw Exception(parms);
334 }
335 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
336 // checks entries with IDLE state for events
337 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
338 entry._status = _MonitorEntry::IDLE;
339 _entries.append(entry);
340 a.arora 1.73 }
341
342 void Monitor::tickle(void)
343 {
|
344 sushma.fernandes 1.78 static char _buffer[] =
|
345 a.arora 1.73 {
346 '0','0'
347 };
|
348 r.kieninger 1.83
|
349 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
350 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
351 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
352 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
353 sushma.fernandes 1.78 }
354
355 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
356 {
357 // Set the state to requested state
358 _entries[index]._status = status;
|
359 a.arora 1.73 }
360
|
361 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
362 {
|
363 mday 1.18
|
364 mday 1.25 Boolean handled_events = false;
|
365 a.arora 1.73 int i = 0;
|
366 r.kieninger 1.83
|
367 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
368 a.arora 1.73
|
369 mday 1.25 fd_set fdread;
370 FD_ZERO(&fdread);
|
371 a.arora 1.73
|
372 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
373 r.kieninger 1.83
374 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
375 if (_stopConnections == 1)
|
376 kumpf 1.48 {
377 for ( int indx = 0; indx < (int)_entries.size(); indx++)
378 {
379 if (_entries[indx]._type == Monitor::ACCEPTOR)
380 {
381 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
382 {
383 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
384 _entries[indx]._status.value() == _MonitorEntry::DYING )
385 {
386 // remove the entry
387 _entries[indx]._status = _MonitorEntry::EMPTY;
388 }
389 else
390 {
391 // set status to DYING
|
392 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
393 kumpf 1.48 }
394 }
395 }
396 }
397 _stopConnections = 0;
|
398 a.arora 1.73 _stopConnectionsSem.signal();
|
399 kumpf 1.48 }
|
400 kumpf 1.51
|
401 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
402 {
|
403 brian.campbell 1.80 const _MonitorEntry &entry = _entries[indx];
404 if ((entry._status.value() == _MonitorEntry::DYING) &&
405 (entry._type == Monitor::CONNECTION))
|
406 kumpf 1.68 {
|
407 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
408 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
409 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
410 r.kieninger 1.83
|
411 brian.campbell 1.80 if (h._connectionClosePending == false)
412 continue;
413
414 // NOTE: do not attempt to delete while there are pending responses
|
415 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
416 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
417 // and then cause the monitor to rerun this code and clean up.
418 // (see HTTPConnection.cpp)
419
420 if (h._responsePending == true)
421 {
422 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
423 "Ignoring connection delete request because "
424 "responses are still pending. "
|
425 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
426 brian.campbell 1.81 (void *)&h, h.getSocket());
|
427 brian.campbell 1.80 continue;
428 }
429 h._connectionClosePending = false;
430 MessageQueue &o = h.get_owner();
431 Message* message= new CloseConnectionMessage(entry.socket);
|
432 kumpf 1.68 message->dest = o.getQueueId();
433
|
434 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
435 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
436 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
437 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
438 // connection, the lock is re-requested and processing of
439 // the for loop continues. This is safe with the current
440 // implementation of the _entries object. Note that the
441 // loop condition accesses the _entries.size() on each
442 // iteration, so that a change in size while the mutex is
443 // unlocked will not result in an ArrayIndexOutOfBounds
444 // exception.
445
|
446 kumpf 1.94 autoEntryMutex.unlock();
|
447 kumpf 1.68 o.enqueue(message);
|
448 kumpf 1.94 autoEntryMutex.lock();
|
449 kumpf 1.68 }
450 }
451
|
452 kumpf 1.51 Uint32 _idleEntries = 0;
|
453 r.kieninger 1.83
|
454 a.arora 1.73 /*
455 We will keep track of the maximum socket number and pass this value
|
456 r.kieninger 1.83 to the kernel as a parameter to SELECT. This loop seems like a good
|
457 a.arora 1.73 place to calculate the max file descriptor (maximum socket number)
458 because we have to traverse the entire array.
|
459 r.kieninger 1.83 */
|
460 a.arora 1.73 int maxSocketCurrentPass = 0;
|
461 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
462 mike 1.2 {
|
463 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
464 maxSocketCurrentPass = _entries[indx].socket;
465
|
466 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
467 mday 1.25 {
|
468 kumpf 1.51 _idleEntries++;
|
469 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
470 }
|
471 mday 1.13 }
|
472 s.hills 1.62
|
473 a.arora 1.73 /*
474 Add 1 then assign maxSocket accordingly. We add 1 to account for
475 descriptors starting at 0.
476 */
477 maxSocketCurrentPass++;
478
|
479 kumpf 1.94 autoEntryMutex.unlock();
|
480 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
481 kumpf 1.94 autoEntryMutex.lock();
|
482 mday 1.25
|
483 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
484 kumpf 1.50 if(events == SOCKET_ERROR)
|
485 mike 1.2 #else
|
486 kumpf 1.50 if(events == -1)
|
487 mike 1.2 #endif
|
488 mday 1.13 {
|
489 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
490 "Monitor::run - errorno = %d has occurred on select.", errno);
491 // The EBADF error indicates that one or more or the file
492 // descriptions was not valid. This could indicate that
493 // the _entries structure has been corrupted or that
494 // we have a synchronization error.
495
496 PEGASUS_ASSERT(errno != EBADF);
497 }
498 else if (events)
499 {
|
500 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
501 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
502 kumpf 1.51 events, _idleEntries);
|
503 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
504 {
|
505 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
506 // owned by the Monitor).
|
507 r.kieninger 1.83 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
|
508 kumpf 1.53 (FD_ISSET(_entries[indx].socket, &fdread)))
|
509 mday 1.25 {
510 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
511 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
512 "Monitor::run indx = %d, queueId = %d, q = %p",
513 indx, _entries[indx].queueId, q);
514 PEGASUS_ASSERT(q !=0);
|
515 mday 1.37
|
516 r.kieninger 1.83 try
|
517 mday 1.25 {
|
518 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
519 {
|
520 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
521 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
522 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
523 sushma.fernandes 1.78
524 // Do not update the entry just yet. The entry gets updated once
|
525 r.kieninger 1.83 // the request has been read.
|
526 sushma.fernandes 1.78 //_entries[indx]._status = _MonitorEntry::BUSY;
527
|
528 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
529 a.arora 1.73 /* Removed for PEP 183.
|
530 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
531 (void *)q, _dispatch))
|
532 kumpf 1.67 {
533 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
534 "Monitor::run: Insufficient resources to process request.");
535 _entries[indx]._status = _MonitorEntry::IDLE;
536 return true;
537 }
|
538 a.arora 1.73 */
539 // Added for PEP 183
540 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
541 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
542 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
543 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
544 try
545 {
546 dst->run(1);
547 }
548 catch (...)
549 {
550 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
551 "Monitor::_dispatch: exception received");
552 }
553 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
555
|
556 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
557 r.kieninger 1.83 // The following will fail in that case.
558 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
559 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
560 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
561 // to the Monitor. It is no longer permissible to access the connection
562 // or the entry in the _entries table.
|
563 sushma.fernandes 1.78
564 // The following is not relevant as the worker thread or the
565 // reader thread will update the status of the entry.
566 //if (dst->_connectionClosePending)
|
567 r.kieninger 1.83 //{
|
568 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
569 //}
570 //else
571 //{
572 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
573 r.kieninger 1.83 //}
574 // end Added for PEP 183
|
575 a.arora 1.73 }
576 else if( _entries[indx]._type == Monitor::INTERNAL){
|
577 r.kieninger 1.83 // set ourself to BUSY,
578 // read the data
|
579 a.arora 1.73 // and set ourself back to IDLE
|
580 r.kieninger 1.83
|
581 a.arora 1.73 _entries[indx]._status == _MonitorEntry::BUSY;
582 static char buffer[2];
583 Socket::disableBlocking(_entries[indx].socket);
584 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
585 Socket::enableBlocking(_entries[indx].socket);
586 _entries[indx]._status == _MonitorEntry::IDLE;
|
587 mday 1.37 }
588 else
|
589 mday 1.25 {
|
590 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
591 "Non-connection entry, indx = %d, has been received.", indx);
|
592 mday 1.37 int events = 0;
593 events |= SocketMessage::READ;
594 Message *msg = new SocketMessage(_entries[indx].socket, events);
595 _entries[indx]._status = _MonitorEntry::BUSY;
|
596 kumpf 1.94 autoEntryMutex.unlock();
|
597 mday 1.37 q->enqueue(msg);
|
598 kumpf 1.94 autoEntryMutex.lock();
|
599 mday 1.37 _entries[indx]._status = _MonitorEntry::IDLE;
|
600 kumpf 1.94
|
601 mday 1.25 return true;
602 }
603 }
|
604 mday 1.37 catch(...)
|
605 mday 1.25 {
606 }
607 handled_events = true;
608 }
609 }
|
610 mday 1.24 }
|
611 kumpf 1.94
|
612 mday 1.13 return(handled_events);
|
613 mike 1.2 }
614
|
615 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
616 kumpf 1.48 {
617 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
618 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
619 kumpf 1.48 _stopConnections = 1;
|
620 a.arora 1.73 tickle();
|
621 kumpf 1.48
|
622 chuck 1.74 if (wait)
|
623 a.arora 1.73 {
|
624 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
625 // caller of this function may unbind the ports while the monitor
626 // is still accepting connections on them.
627 try
628 {
629 _stopConnectionsSem.time_wait(10000);
630 }
631 catch (TimeOut &)
632 {
633 // The monitor is probably busy processng a very long request, and is
634 // not accepting connections. Let the caller unbind the ports.
635 }
|
636 a.arora 1.73 }
|
637 r.kieninger 1.83
|
638 kumpf 1.48 PEG_METHOD_EXIT();
639 }
|
640 mday 1.25
|
641 mday 1.37
|
642 mday 1.25 int Monitor::solicitSocketMessages(
|
643 r.kieninger 1.83 Sint32 socket,
|
644 mike 1.2 Uint32 events,
|
645 r.kieninger 1.83 Uint32 queueId,
|
646 mday 1.8 int type)
|
647 mike 1.2 {
|
648 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
649 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
650 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
651 // We always want the _entries array to 2 bigger than the
652 // current connections requested
653 _solicitSocketCount++; // bump the count
654 int size = (int)_entries.size();
|
655 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
656 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
657 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
658 _entries.append(entry);
659 }
660 }
|
661 kumpf 1.4
|
662 a.arora 1.73 int index;
663 for(index = 1; index < (int)_entries.size(); index++)
|
664 mday 1.25 {
|
665 a.arora 1.73 try
|
666 mday 1.37 {
|
667 a.arora 1.73 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
668 {
669 _entries[index].socket = socket;
670 _entries[index].queueId = queueId;
671 _entries[index]._type = type;
672 _entries[index]._status = _MonitorEntry::IDLE;
|
673 r.kieninger 1.83
|
674 a.arora 1.73 return index;
675 }
|
676 mday 1.37 }
677 catch(...)
|
678 mday 1.25 {
679 }
680 }
|
681 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
682 mday 1.25 PEG_METHOD_EXIT();
|
683 kumpf 1.50 return -1;
|
684 a.arora 1.73
|
685 mike 1.2 }
686
|
687 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
688 mike 1.2 {
|
689 kumpf 1.50
|
690 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
691 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
692 a.arora 1.73
693 /*
694 Start at index = 1 because _entries[0] is the tickle entry which never needs
695 to be EMPTY;
696 */
|
697 w.otsuka 1.89 unsigned int index;
|
698 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
699 mike 1.2 {
|
700 mday 1.25 if(_entries[index].socket == socket)
701 {
|
702 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
703 _entries[index].socket = -1;
704 _solicitSocketCount--;
705 break;
|
706 mday 1.25 }
|
707 mike 1.2 }
|
708 a.arora 1.73
709 /*
710 Dynamic Contraction:
711 To remove excess entries we will start from the end of the _entries array
712 and remove all entries with EMPTY status until we find the first NON EMPTY.
713 This prevents the positions, of the NON EMPTY entries, from being changed.
|
714 r.kieninger 1.83 */
|
715 a.arora 1.73 index = _entries.size() - 1;
716 while(_entries[index]._status == _MonitorEntry::EMPTY){
717 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
718 _entries.remove(index);
719 index--;
720 }
|
721 kumpf 1.4 PEG_METHOD_EXIT();
|
722 mike 1.2 }
723
|
724 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
725 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
726 {
|
727 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
728 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
729 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
730 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
731 kumpf 1.51 try
732 {
733 dst->run(1);
734 }
735 catch (...)
736 {
737 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
738 "Monitor::_dispatch: exception received");
739 }
740 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
741 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
742 r.kieninger 1.83
|
743 kumpf 1.53 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
744 kumpf 1.68
745 // Once the HTTPConnection thread has set the status value to either
746 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
747 // to the Monitor. It is no longer permissible to access the connection
748 // or the entry in the _entries table.
|
749 kumpf 1.50 if (dst->_connectionClosePending)
750 {
|
751 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
752 }
753 else
754 {
755 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
756 kumpf 1.50 }
|
757 mday 1.8 return 0;
|
758 mday 1.40 }
759
|
760 mike 1.2 PEGASUS_NAMESPACE_END
|