1 karl 1.90 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 david.dillard 1.95 //
|
19 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Brasher (mbrasher@bmc.com)
31 //
|
32 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
33 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
34 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
35 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
36 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
37 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
38 mike 1.2 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
41 #include <Pegasus/Common/Config.h>
|
42 mday 1.40
|
43 mike 1.2 #include <cstring>
44 #include "Monitor.h"
45 #include "MessageQueue.h"
46 #include "Socket.h"
|
47 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
48 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
49 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
50 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
51 mike 1.2
52 #ifdef PEGASUS_OS_TYPE_WINDOWS
53 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
54 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
55 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
56 david.dillard 1.95 <windows.h>). Find inclusion of that header which is visible to this \
|
57 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
58 otherwise, less than 64 clients (the default) will be able to connect to the \
59 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
60 mday 1.5
|
61 mike 1.2 # endif
62 # define FD_SETSIZE 1024
|
63 mday 1.5 # include <windows.h>
|
64 mike 1.2 #else
65 # include <sys/types.h>
66 # include <sys/socket.h>
67 # include <sys/time.h>
68 # include <netinet/in.h>
69 # include <netdb.h>
70 # include <arpa/inet.h>
71 #endif
72
73 PEGASUS_USING_STD;
74
75 PEGASUS_NAMESPACE_BEGIN
76
|
77 kumpf 1.86 // Define a platform-neutral socket length type
|
78 gs.keenan 1.91 #if defined(PEGASUS_PLATFORM_ZOS_ZSERIES_IBM) || defined(PEGASUS_OS_VMS)
|
79 kumpf 1.86 typedef size_t PEGASUS_SOCKLEN_T;
80 #elif defined(PEGASUS_PLATFORM_AIX_RS_IBMCXX) || defined(PEGASUS_OS_LINUX) || (defined(PEGASUS_OS_SOLARIS) && !defined(SUNOS_5_6))
81 typedef socklen_t PEGASUS_SOCKLEN_T;
82 #else
83 typedef int PEGASUS_SOCKLEN_T;
84 #endif
|
85 mday 1.18
|
86 mike 1.96 static AtomicInt _connections(0);
|
87 mday 1.25
|
88 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
89 //
90 // Monitor
91 //
92 ////////////////////////////////////////////////////////////////////////////////
93
|
94 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
95 mike 1.2 Monitor::Monitor()
|
96 kumpf 1.87 : _stopConnections(0),
|
97 a.arora 1.73 _stopConnectionsSem(0),
|
98 a.dunfey 1.76 _solicitSocketCount(0),
|
99 a.dunfey 1.77 _tickle_client_socket(-1),
100 _tickle_server_socket(-1),
101 _tickle_peer_socket(-1)
|
102 mike 1.2 {
|
103 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
104 mike 1.2 Socket::initializeInterface();
|
105 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
106 a.arora 1.73
107 // setup the tickler
108 initializeTickler();
109
|
110 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
111 a.arora 1.73 // has added an entry in the first position of the
112 // _entries array
113 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
114 mday 1.37 {
115 _MonitorEntry entry(0, 0, 0);
116 _entries.append(entry);
117 }
|
118 mday 1.18 }
|
119 mday 1.20
|
120 mike 1.2 Monitor::~Monitor()
121 {
|
122 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
123 a.dunfey 1.76
124 try{
|
125 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
126 a.dunfey 1.76 {
127 Socket::close(_tickle_peer_socket);
128 }
|
129 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
130 a.dunfey 1.76 {
131 Socket::close(_tickle_client_socket);
132 }
|
133 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
134 a.dunfey 1.76 {
135 Socket::close(_tickle_server_socket);
136 }
137 }
138 catch(...)
139 {
140 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
141 "Failed to close tickle sockets");
142 }
143
|
144 mike 1.2 Socket::uninitializeInterface();
|
145 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
146 "returning from monitor destructor");
|
147 mday 1.18 }
148
|
149 a.arora 1.73 void Monitor::initializeTickler(){
|
150 r.kieninger 1.83 /*
151 NOTE: On any errors trying to
152 setup out tickle connection,
|
153 a.arora 1.73 throw an exception/end the server
154 */
155
156 /* setup the tickle server/listener */
157
158 // get a socket for the server side
|
159 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
160 a.arora 1.73 //handle error
161 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
162 "Received error number $0 while creating the internal socket.",
163 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
164 errno);
165 #else
166 WSAGetLastError());
167 #endif
168 throw Exception(parms);
169 }
170
171 // initialize the address
172 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
173 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
174 #pragma convert(37)
175 #endif
176 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
177 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
178 #pragma convert(0)
179 #endif
180 _tickle_server_addr.sin_family = PF_INET;
181 a.arora 1.73 _tickle_server_addr.sin_port = 0;
182
|
183 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
184 a.arora 1.73
185 // bind server side to socket
186 if((::bind(_tickle_server_socket,
|
187 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
188 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
189 // handle error
|
190 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
191 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
192 "Received error:$0 while binding the internal socket.",strerror(errno));
193 #else
|
194 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
195 "Received error number $0 while binding the internal socket.",
196 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
197 errno);
198 #else
199 WSAGetLastError());
200 #endif
|
201 r.kieninger 1.83 #endif
|
202 a.arora 1.73 throw Exception(parms);
203 }
204
205 // tell the kernel we are a server
206 if((::listen(_tickle_server_socket,3)) < 0){
207 // handle error
208 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
209 "Received error number $0 while listening to the internal socket.",
210 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
211 errno);
212 #else
213 WSAGetLastError());
214 #endif
215 throw Exception(parms);
216 }
|
217 r.kieninger 1.83
|
218 a.arora 1.73 // make sure we have the correct socket for our server
219 int sock = ::getsockname(_tickle_server_socket,
|
220 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
221 &_addr_size);
|
222 a.arora 1.73 if(sock < 0){
223 // handle error
224 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
225 "Received error number $0 while getting the internal socket name.",
226 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
227 errno);
228 #else
229 WSAGetLastError());
230 #endif
231 throw Exception(parms);
232 }
233
234 /* set up the tickle client/connector */
|
235 r.kieninger 1.83
|
236 a.arora 1.73 // get a socket for our tickle client
|
237 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
238 a.arora 1.73 // handle error
239 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
240 "Received error number $0 while creating the internal client socket.",
241 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
242 errno);
243 #else
244 WSAGetLastError());
245 #endif
246 throw Exception(parms);
247 }
248
249 // setup the address of the client
250 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
251 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
252 #pragma convert(37)
253 #endif
254 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
255 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
256 #pragma convert(0)
257 #endif
258 _tickle_client_addr.sin_family = PF_INET;
259 a.arora 1.73 _tickle_client_addr.sin_port = 0;
260
261 // bind socket to client side
262 if((::bind(_tickle_client_socket,
|
263 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
264 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
265 // handle error
266 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
267 "Received error number $0 while binding the internal client socket.",
268 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
269 errno);
270 #else
271 WSAGetLastError());
272 #endif
273 throw Exception(parms);
274 }
275
276 // connect to server side
277 if((::connect(_tickle_client_socket,
|
278 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
279 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
280 // handle error
281 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
282 "Received error number $0 while connecting the internal client socket.",
283 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
284 errno);
285 #else
286 WSAGetLastError());
287 #endif
288 throw Exception(parms);
289 }
290
291 /* set up the slave connection */
292 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
293 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
294 r.kieninger 1.83 pegasus_sleep(1);
|
295 a.arora 1.73
296 // this call may fail, we will try a max of 20 times to establish this peer connection
297 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
298 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
299 &peer_size)) < 0){
|
300 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
301 // Only retry on non-windows platforms.
302 if(_tickle_peer_socket == -1 && errno == EAGAIN)
303 {
|
304 r.kieninger 1.83 int retries = 0;
|
305 a.arora 1.73 do
306 {
307 pegasus_sleep(1);
308 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
309 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
310 &peer_size);
|
311 a.arora 1.73 retries++;
312 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
313 }
314 #endif
315 }
316 if(_tickle_peer_socket == -1){
317 // handle error
318 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
319 "Received error number $0 while accepting the internal socket connection.",
320 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
321 errno);
322 #else
323 WSAGetLastError());
324 #endif
325 throw Exception(parms);
326 }
327 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
328 // checks entries with IDLE state for events
329 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
330 entry._status = _MonitorEntry::IDLE;
331 _entries.append(entry);
332 a.arora 1.73 }
333
334 void Monitor::tickle(void)
335 {
|
336 sushma.fernandes 1.78 static char _buffer[] =
|
337 a.arora 1.73 {
338 '0','0'
339 };
|
340 r.kieninger 1.83
|
341 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
342 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
343 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
344 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
345 sushma.fernandes 1.78 }
346
347 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
348 {
349 // Set the state to requested state
350 _entries[index]._status = status;
|
351 a.arora 1.73 }
352
|
353 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
354 {
|
355 mday 1.18
|
356 mday 1.25 Boolean handled_events = false;
|
357 a.arora 1.73 int i = 0;
|
358 r.kieninger 1.83
|
359 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
360 a.arora 1.73
|
361 mday 1.25 fd_set fdread;
362 FD_ZERO(&fdread);
|
363 a.arora 1.73
|
364 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
365 r.kieninger 1.83
366 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
367 mike 1.96 if (_stopConnections.get() == 1)
|
368 kumpf 1.48 {
369 for ( int indx = 0; indx < (int)_entries.size(); indx++)
370 {
371 if (_entries[indx]._type == Monitor::ACCEPTOR)
372 {
|
373 mike 1.96 if ( _entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
374 kumpf 1.48 {
|
375 mike 1.96 if ( _entries[indx]._status.get() == _MonitorEntry::IDLE ||
376 _entries[indx]._status.get() == _MonitorEntry::DYING )
|
377 kumpf 1.48 {
378 // remove the entry
379 _entries[indx]._status = _MonitorEntry::EMPTY;
380 }
381 else
382 {
383 // set status to DYING
|
384 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
385 kumpf 1.48 }
386 }
387 }
388 }
389 _stopConnections = 0;
|
390 a.arora 1.73 _stopConnectionsSem.signal();
|
391 kumpf 1.48 }
|
392 kumpf 1.51
|
393 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
394 {
|
395 brian.campbell 1.80 const _MonitorEntry &entry = _entries[indx];
|
396 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
397 brian.campbell 1.80 (entry._type == Monitor::CONNECTION))
|
398 kumpf 1.68 {
|
399 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
400 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
401 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
402 r.kieninger 1.83
|
403 brian.campbell 1.80 if (h._connectionClosePending == false)
404 continue;
405
406 // NOTE: do not attempt to delete while there are pending responses
|
407 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
408 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
409 // and then cause the monitor to rerun this code and clean up.
410 // (see HTTPConnection.cpp)
411
412 if (h._responsePending == true)
413 {
414 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
415 "Ignoring connection delete request because "
416 "responses are still pending. "
|
417 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
418 brian.campbell 1.81 (void *)&h, h.getSocket());
|
419 brian.campbell 1.80 continue;
420 }
421 h._connectionClosePending = false;
422 MessageQueue &o = h.get_owner();
423 Message* message= new CloseConnectionMessage(entry.socket);
|
424 kumpf 1.68 message->dest = o.getQueueId();
425
|
426 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
427 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
428 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
429 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
430 // connection, the lock is re-requested and processing of
431 // the for loop continues. This is safe with the current
432 // implementation of the _entries object. Note that the
433 // loop condition accesses the _entries.size() on each
434 // iteration, so that a change in size while the mutex is
435 // unlocked will not result in an ArrayIndexOutOfBounds
436 // exception.
437
|
438 kumpf 1.94 autoEntryMutex.unlock();
|
439 kumpf 1.68 o.enqueue(message);
|
440 kumpf 1.94 autoEntryMutex.lock();
|
441 kumpf 1.68 }
442 }
443
|
444 kumpf 1.51 Uint32 _idleEntries = 0;
|
445 r.kieninger 1.83
|
446 a.arora 1.73 /*
|
447 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
448 to the kernel as a parameter to SELECT. This loop seems like a good
449 place to calculate the max file descriptor (maximum socket number)
450 because we have to traverse the entire array.
|
451 r.kieninger 1.83 */
|
452 david.dillard 1.95 PEGASUS_SOCKET maxSocketCurrentPass = 0;
|
453 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
454 mike 1.2 {
|
455 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
|
456 david.dillard 1.95 maxSocketCurrentPass = _entries[indx].socket;
|
457 a.arora 1.73
|
458 mike 1.96 if(_entries[indx]._status.get() == _MonitorEntry::IDLE)
|
459 mday 1.25 {
|
460 david.dillard 1.95 _idleEntries++;
461 FD_SET(_entries[indx].socket, &fdread);
|
462 mday 1.25 }
|
463 mday 1.13 }
|
464 s.hills 1.62
|
465 a.arora 1.73 /*
|
466 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
467 descriptors starting at 0.
|
468 a.arora 1.73 */
469 maxSocketCurrentPass++;
470
|
471 kumpf 1.94 autoEntryMutex.unlock();
|
472 david.dillard 1.95
473 //
474 // The first argument to select() is ignored on Windows and it is not
475 // a socket value. The original code assumed that the number of sockets
476 // and a socket value have the same type. On Windows they do not.
477 //
478 #ifdef PEGASUS_OS_TYPE_WINDOWS
479 int events = select(0, &fdread, NULL, NULL, &tv);
480 #else
|
481 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
482 david.dillard 1.95 #endif
|
483 kumpf 1.94 autoEntryMutex.lock();
|
484 mday 1.25
|
485 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
486 kumpf 1.50 if(events == SOCKET_ERROR)
|
487 mike 1.2 #else
|
488 kumpf 1.50 if(events == -1)
|
489 mike 1.2 #endif
|
490 mday 1.13 {
|
491 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
492 "Monitor::run - errorno = %d has occurred on select.", errno);
493 // The EBADF error indicates that one or more or the file
494 // descriptions was not valid. This could indicate that
495 // the _entries structure has been corrupted or that
496 // we have a synchronization error.
497
498 PEGASUS_ASSERT(errno != EBADF);
499 }
500 else if (events)
501 {
|
502 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
503 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
504 david.dillard 1.95 events, _idleEntries);
|
505 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
506 {
|
507 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
508 // owned by the Monitor).
|
509 mike 1.96 if((_entries[indx]._status.get() == _MonitorEntry::IDLE) &&
|
510 david.dillard 1.95 (FD_ISSET(_entries[indx].socket, &fdread)))
511 {
512 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
513 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
514 "Monitor::run indx = %d, queueId = %d, q = %p",
515 indx, _entries[indx].queueId, q);
516 PEGASUS_ASSERT(q !=0);
|
517 mday 1.37
|
518 david.dillard 1.95 try
519 {
520 if(_entries[indx]._type == Monitor::CONNECTION)
521 {
|
522 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
523 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
524 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
525 sushma.fernandes 1.78
526 // Do not update the entry just yet. The entry gets updated once
|
527 r.kieninger 1.83 // the request has been read.
|
528 david.dillard 1.95 //_entries[indx]._status = _MonitorEntry::BUSY;
|
529 sushma.fernandes 1.78
|
530 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
531 a.arora 1.73 /* Removed for PEP 183.
|
532 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
533 (void *)q, _dispatch))
|
534 kumpf 1.67 {
535 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
536 "Monitor::run: Insufficient resources to process request.");
537 _entries[indx]._status = _MonitorEntry::IDLE;
538 return true;
539 }
|
540 a.arora 1.73 */
541 // Added for PEP 183
|
542 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
543 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
544 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
545 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
546 try
547 {
548 dst->run(1);
549 }
|
550 david.dillard 1.95 catch (...)
551 {
552 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
553 "Monitor::_dispatch: exception received");
554 }
555 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
556 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
557
|
558 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
559 r.kieninger 1.83 // The following will fail in that case.
|
560 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
561 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
562 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
563 // to the Monitor. It is no longer permissible to access the connection
564 // or the entry in the _entries table.
|
565 sushma.fernandes 1.78
566 // The following is not relevant as the worker thread or the
567 // reader thread will update the status of the entry.
568 //if (dst->_connectionClosePending)
|
569 r.kieninger 1.83 //{
|
570 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
571 //}
572 //else
573 //{
574 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
575 r.kieninger 1.83 //}
576 // end Added for PEP 183
|
577 a.arora 1.73 }
578 else if( _entries[indx]._type == Monitor::INTERNAL){
|
579 r.kieninger 1.83 // set ourself to BUSY,
580 // read the data
|
581 a.arora 1.73 // and set ourself back to IDLE
|
582 r.kieninger 1.83
|
583 mike 1.96 _entries[indx]._status.get() == _MonitorEntry::BUSY;
|
584 a.arora 1.73 static char buffer[2];
585 Socket::disableBlocking(_entries[indx].socket);
586 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
587 Socket::enableBlocking(_entries[indx].socket);
|
588 mike 1.96 _entries[indx]._status.get() == _MonitorEntry::IDLE;
|
589 mday 1.37 }
590 else
|
591 mday 1.25 {
|
592 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
593 "Non-connection entry, indx = %d, has been received.", indx);
|
594 mday 1.37 int events = 0;
595 events |= SocketMessage::READ;
596 Message *msg = new SocketMessage(_entries[indx].socket, events);
597 _entries[indx]._status = _MonitorEntry::BUSY;
|
598 kumpf 1.94 autoEntryMutex.unlock();
|
599 mday 1.37 q->enqueue(msg);
|
600 kumpf 1.94 autoEntryMutex.lock();
|
601 mday 1.37 _entries[indx]._status = _MonitorEntry::IDLE;
|
602 kumpf 1.94
|
603 mday 1.25 return true;
604 }
605 }
|
606 mday 1.37 catch(...)
|
607 mday 1.25 {
608 }
609 handled_events = true;
610 }
611 }
|
612 mday 1.24 }
|
613 kumpf 1.94
|
614 mday 1.13 return(handled_events);
|
615 mike 1.2 }
616
|
617 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
618 kumpf 1.48 {
619 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
620 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
621 kumpf 1.48 _stopConnections = 1;
|
622 a.arora 1.73 tickle();
|
623 kumpf 1.48
|
624 chuck 1.74 if (wait)
|
625 a.arora 1.73 {
|
626 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
627 // caller of this function may unbind the ports while the monitor
628 // is still accepting connections on them.
629 try
630 {
631 _stopConnectionsSem.time_wait(10000);
632 }
633 catch (TimeOut &)
634 {
635 // The monitor is probably busy processng a very long request, and is
636 // not accepting connections. Let the caller unbind the ports.
637 }
|
638 a.arora 1.73 }
|
639 r.kieninger 1.83
|
640 kumpf 1.48 PEG_METHOD_EXIT();
641 }
|
642 mday 1.25
|
643 mday 1.37
|
644 mday 1.25 int Monitor::solicitSocketMessages(
|
645 david.dillard 1.95 PEGASUS_SOCKET socket,
|
646 mike 1.2 Uint32 events,
|
647 r.kieninger 1.83 Uint32 queueId,
|
648 mday 1.8 int type)
|
649 mike 1.2 {
|
650 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
651 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
652 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
653 // We always want the _entries array to 2 bigger than the
654 // current connections requested
655 _solicitSocketCount++; // bump the count
656 int size = (int)_entries.size();
|
657 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
658 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
659 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
660 _entries.append(entry);
661 }
662 }
|
663 kumpf 1.4
|
664 a.arora 1.73 int index;
665 for(index = 1; index < (int)_entries.size(); index++)
|
666 mday 1.25 {
|
667 a.arora 1.73 try
|
668 mday 1.37 {
|
669 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
670 a.arora 1.73 {
671 _entries[index].socket = socket;
672 _entries[index].queueId = queueId;
673 _entries[index]._type = type;
674 _entries[index]._status = _MonitorEntry::IDLE;
|
675 r.kieninger 1.83
|
676 a.arora 1.73 return index;
677 }
|
678 mday 1.37 }
679 catch(...)
|
680 mday 1.25 {
681 }
682 }
|
683 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
684 mday 1.25 PEG_METHOD_EXIT();
|
685 kumpf 1.50 return -1;
|
686 a.arora 1.73
|
687 mike 1.2 }
688
|
689 david.dillard 1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
|
690 mike 1.2 {
|
691 kumpf 1.50
|
692 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
693 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
694 a.arora 1.73
695 /*
696 Start at index = 1 because _entries[0] is the tickle entry which never needs
697 to be EMPTY;
698 */
|
699 w.otsuka 1.89 unsigned int index;
|
700 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
701 mike 1.2 {
|
702 mday 1.25 if(_entries[index].socket == socket)
703 {
|
704 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
705 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
706 a.arora 1.73 _solicitSocketCount--;
707 break;
|
708 mday 1.25 }
|
709 mike 1.2 }
|
710 a.arora 1.73
711 /*
712 Dynamic Contraction:
713 To remove excess entries we will start from the end of the _entries array
714 and remove all entries with EMPTY status until we find the first NON EMPTY.
715 This prevents the positions, of the NON EMPTY entries, from being changed.
|
716 r.kieninger 1.83 */
|
717 a.arora 1.73 index = _entries.size() - 1;
|
718 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
719 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
720 _entries.remove(index);
721 index--;
722 }
|
723 kumpf 1.4 PEG_METHOD_EXIT();
|
724 mike 1.2 }
725
|
726 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
727 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
728 {
|
729 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
730 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
731 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
732 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
733 kumpf 1.51 try
734 {
735 dst->run(1);
736 }
737 catch (...)
738 {
739 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
740 "Monitor::_dispatch: exception received");
741 }
742 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
743 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
744 r.kieninger 1.83
|
745 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
746 kumpf 1.68
747 // Once the HTTPConnection thread has set the status value to either
748 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
749 // to the Monitor. It is no longer permissible to access the connection
750 // or the entry in the _entries table.
|
751 kumpf 1.50 if (dst->_connectionClosePending)
752 {
|
753 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
754 }
755 else
756 {
757 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
758 kumpf 1.50 }
|
759 mday 1.8 return 0;
|
760 mday 1.40 }
761
|
762 mike 1.2 PEGASUS_NAMESPACE_END
|