1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Brasher (mbrasher@bmc.com)
33 //
|
34 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
35 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
36 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
37 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
38 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
39 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
40 mike 1.2 //
41 //%/////////////////////////////////////////////////////////////////////////////
42
43 #include <Pegasus/Common/Config.h>
|
44 mday 1.40
|
45 mike 1.2 #include <cstring>
46 #include "Monitor.h"
47 #include "MessageQueue.h"
48 #include "Socket.h"
|
49 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
50 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
51 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
52 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
53 mike 1.100 #include "ArrayIterator.h"
|
54 mike 1.2
55 #ifdef PEGASUS_OS_TYPE_WINDOWS
56 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
57 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
58 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
59 david.dillard 1.95 <windows.h>). Find inclusion of that header which is visible to this \
|
60 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
61 otherwise, less than 64 clients (the default) will be able to connect to the \
62 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
63 mday 1.5
|
64 mike 1.2 # endif
65 # define FD_SETSIZE 1024
|
66 mday 1.5 # include <windows.h>
|
67 mike 1.2 #else
68 # include <sys/types.h>
69 # include <sys/socket.h>
70 # include <sys/time.h>
71 # include <netinet/in.h>
72 # include <netdb.h>
73 # include <arpa/inet.h>
74 #endif
75
76 PEGASUS_USING_STD;
77
78 PEGASUS_NAMESPACE_BEGIN
79
|
80 mike 1.96 static AtomicInt _connections(0);
|
81 mday 1.25
|
82 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
83 //
84 // Monitor
85 //
86 ////////////////////////////////////////////////////////////////////////////////
87
|
88 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
89 mike 1.2 Monitor::Monitor()
|
90 kumpf 1.87 : _stopConnections(0),
|
91 a.arora 1.73 _stopConnectionsSem(0),
|
92 a.dunfey 1.76 _solicitSocketCount(0),
|
93 a.dunfey 1.77 _tickle_client_socket(-1),
94 _tickle_server_socket(-1),
95 _tickle_peer_socket(-1)
|
96 mike 1.2 {
|
97 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
98 mike 1.2 Socket::initializeInterface();
|
99 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
100 a.arora 1.73
101 // setup the tickler
102 initializeTickler();
103
|
104 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
105 a.arora 1.73 // has added an entry in the first position of the
106 // _entries array
107 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
108 mday 1.37 {
109 _MonitorEntry entry(0, 0, 0);
110 _entries.append(entry);
111 }
|
112 mday 1.18 }
|
113 mday 1.20
|
114 mike 1.2 Monitor::~Monitor()
115 {
|
116 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
117 a.dunfey 1.76
118 try{
|
119 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
120 a.dunfey 1.76 {
121 Socket::close(_tickle_peer_socket);
122 }
|
123 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
124 a.dunfey 1.76 {
125 Socket::close(_tickle_client_socket);
126 }
|
127 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
128 a.dunfey 1.76 {
129 Socket::close(_tickle_server_socket);
130 }
131 }
132 catch(...)
133 {
134 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
135 "Failed to close tickle sockets");
136 }
137
|
138 mike 1.2 Socket::uninitializeInterface();
|
139 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
140 "returning from monitor destructor");
|
141 mday 1.18 }
142
|
143 a.arora 1.73 void Monitor::initializeTickler(){
|
144 r.kieninger 1.83 /*
145 NOTE: On any errors trying to
146 setup out tickle connection,
|
147 a.arora 1.73 throw an exception/end the server
148 */
149
150 /* setup the tickle server/listener */
151
152 // get a socket for the server side
|
153 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
154 a.arora 1.73 //handle error
155 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
156 "Received error number $0 while creating the internal socket.",
157 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
158 errno);
159 #else
160 WSAGetLastError());
161 #endif
162 throw Exception(parms);
163 }
164
165 // initialize the address
166 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
167 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
168 #pragma convert(37)
169 #endif
170 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
171 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
172 #pragma convert(0)
173 #endif
174 _tickle_server_addr.sin_family = PF_INET;
175 a.arora 1.73 _tickle_server_addr.sin_port = 0;
176
|
177 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
178 a.arora 1.73
179 // bind server side to socket
180 if((::bind(_tickle_server_socket,
|
181 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
182 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
183 // handle error
|
184 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
185 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
186 "Received error:$0 while binding the internal socket.",strerror(errno));
187 #else
|
188 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
189 "Received error number $0 while binding the internal socket.",
190 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
191 errno);
192 #else
193 WSAGetLastError());
194 #endif
|
195 r.kieninger 1.83 #endif
|
196 a.arora 1.73 throw Exception(parms);
197 }
198
199 // tell the kernel we are a server
200 if((::listen(_tickle_server_socket,3)) < 0){
201 // handle error
202 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
203 "Received error number $0 while listening to the internal socket.",
204 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
205 errno);
206 #else
207 WSAGetLastError());
208 #endif
209 throw Exception(parms);
210 }
|
211 r.kieninger 1.83
|
212 a.arora 1.73 // make sure we have the correct socket for our server
213 int sock = ::getsockname(_tickle_server_socket,
|
214 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
215 &_addr_size);
|
216 a.arora 1.73 if(sock < 0){
217 // handle error
218 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
219 "Received error number $0 while getting the internal socket name.",
220 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
221 errno);
222 #else
223 WSAGetLastError());
224 #endif
225 throw Exception(parms);
226 }
227
228 /* set up the tickle client/connector */
|
229 r.kieninger 1.83
|
230 a.arora 1.73 // get a socket for our tickle client
|
231 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
232 a.arora 1.73 // handle error
233 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
234 "Received error number $0 while creating the internal client socket.",
235 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
236 errno);
237 #else
238 WSAGetLastError());
239 #endif
240 throw Exception(parms);
241 }
242
243 // setup the address of the client
244 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
245 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
246 #pragma convert(37)
247 #endif
248 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
249 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
250 #pragma convert(0)
251 #endif
252 _tickle_client_addr.sin_family = PF_INET;
253 a.arora 1.73 _tickle_client_addr.sin_port = 0;
254
255 // bind socket to client side
256 if((::bind(_tickle_client_socket,
|
257 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
258 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
259 // handle error
260 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
261 "Received error number $0 while binding the internal client socket.",
262 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
263 errno);
264 #else
265 WSAGetLastError());
266 #endif
267 throw Exception(parms);
268 }
269
270 // connect to server side
271 if((::connect(_tickle_client_socket,
|
272 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
273 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
274 // handle error
275 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
276 "Received error number $0 while connecting the internal client socket.",
277 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
278 errno);
279 #else
280 WSAGetLastError());
281 #endif
282 throw Exception(parms);
283 }
284
285 /* set up the slave connection */
286 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
287 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
288 r.kieninger 1.83 pegasus_sleep(1);
|
289 a.arora 1.73
290 // this call may fail, we will try a max of 20 times to establish this peer connection
291 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
292 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
293 &peer_size)) < 0){
|
294 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
295 // Only retry on non-windows platforms.
296 if(_tickle_peer_socket == -1 && errno == EAGAIN)
297 {
|
298 r.kieninger 1.83 int retries = 0;
|
299 a.arora 1.73 do
300 {
301 pegasus_sleep(1);
302 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
303 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
304 &peer_size);
|
305 a.arora 1.73 retries++;
306 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
307 }
308 #endif
309 }
310 if(_tickle_peer_socket == -1){
311 // handle error
312 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
313 "Received error number $0 while accepting the internal socket connection.",
314 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
315 errno);
316 #else
317 WSAGetLastError());
318 #endif
319 throw Exception(parms);
320 }
321 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
322 // checks entries with IDLE state for events
323 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
324 entry._status = _MonitorEntry::IDLE;
325 _entries.append(entry);
326 a.arora 1.73 }
327
328 void Monitor::tickle(void)
329 {
|
330 sushma.fernandes 1.78 static char _buffer[] =
|
331 a.arora 1.73 {
332 '0','0'
333 };
|
334 r.kieninger 1.83
|
335 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
336 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
337 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
338 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
339 sushma.fernandes 1.78 }
340
341 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
342 {
343 // Set the state to requested state
344 _entries[index]._status = status;
|
345 a.arora 1.73 }
346
|
347 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
348 {
|
349 mday 1.18
|
350 mday 1.25 Boolean handled_events = false;
|
351 a.arora 1.73 int i = 0;
|
352 r.kieninger 1.83
|
353 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
354 a.arora 1.73
|
355 mday 1.25 fd_set fdread;
356 FD_ZERO(&fdread);
|
357 a.arora 1.73
|
358 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
359 r.kieninger 1.83
|
360 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
361
|
362 r.kieninger 1.83 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
363 mike 1.96 if (_stopConnections.get() == 1)
|
364 kumpf 1.48 {
|
365 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
366 kumpf 1.48 {
|
367 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
368 kumpf 1.48 {
|
369 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
370 kumpf 1.48 {
|
371 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
372 entries[indx]._status.get() == _MonitorEntry::DYING )
|
373 kumpf 1.48 {
374 // remove the entry
|
375 mike 1.100 entries[indx]._status = _MonitorEntry::EMPTY;
|
376 kumpf 1.48 }
377 else
378 {
379 // set status to DYING
|
380 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
381 kumpf 1.48 }
382 }
383 }
384 }
385 _stopConnections = 0;
|
386 a.arora 1.73 _stopConnectionsSem.signal();
|
387 kumpf 1.48 }
|
388 kumpf 1.51
|
389 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
390 kumpf 1.68 {
|
391 mike 1.100 const _MonitorEntry &entry = entries[indx];
|
392 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
393 brian.campbell 1.80 (entry._type == Monitor::CONNECTION))
|
394 kumpf 1.68 {
|
395 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
396 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
397 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
398 r.kieninger 1.83
|
399 brian.campbell 1.80 if (h._connectionClosePending == false)
400 continue;
401
402 // NOTE: do not attempt to delete while there are pending responses
|
403 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
404 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
405 // and then cause the monitor to rerun this code and clean up.
406 // (see HTTPConnection.cpp)
407
408 if (h._responsePending == true)
409 {
410 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
411 "Ignoring connection delete request because "
412 "responses are still pending. "
|
413 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
414 brian.campbell 1.81 (void *)&h, h.getSocket());
|
415 brian.campbell 1.80 continue;
416 }
417 h._connectionClosePending = false;
418 MessageQueue &o = h.get_owner();
419 Message* message= new CloseConnectionMessage(entry.socket);
|
420 kumpf 1.68 message->dest = o.getQueueId();
421
|
422 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
423 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
424 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
425 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
426 // connection, the lock is re-requested and processing of
427 // the for loop continues. This is safe with the current
|
428 mike 1.100 // implementation of the entries object. Note that the
429 // loop condition accesses the entries.size() on each
|
430 kumpf 1.68 // iteration, so that a change in size while the mutex is
431 // unlocked will not result in an ArrayIndexOutOfBounds
432 // exception.
433
|
434 kumpf 1.94 autoEntryMutex.unlock();
|
435 kumpf 1.68 o.enqueue(message);
|
436 kumpf 1.94 autoEntryMutex.lock();
|
437 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
438 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
439 entries.reset(_entries);
|
440 kumpf 1.68 }
441 }
442
|
443 kumpf 1.51 Uint32 _idleEntries = 0;
|
444 r.kieninger 1.83
|
445 a.arora 1.73 /*
|
446 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
447 to the kernel as a parameter to SELECT. This loop seems like a good
448 place to calculate the max file descriptor (maximum socket number)
449 because we have to traverse the entire array.
|
450 r.kieninger 1.83 */
|
451 david.dillard 1.95 PEGASUS_SOCKET maxSocketCurrentPass = 0;
|
452 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
453 mike 1.2 {
|
454 mike 1.100 if(maxSocketCurrentPass < entries[indx].socket)
455 maxSocketCurrentPass = entries[indx].socket;
|
456 a.arora 1.73
|
457 mike 1.100 if(entries[indx]._status.get() == _MonitorEntry::IDLE)
|
458 mday 1.25 {
|
459 david.dillard 1.95 _idleEntries++;
|
460 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
461 mday 1.25 }
|
462 mday 1.13 }
|
463 s.hills 1.62
|
464 a.arora 1.73 /*
|
465 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
466 descriptors starting at 0.
|
467 a.arora 1.73 */
468 maxSocketCurrentPass++;
469
|
470 kumpf 1.94 autoEntryMutex.unlock();
|
471 david.dillard 1.95
472 //
473 // The first argument to select() is ignored on Windows and it is not
474 // a socket value. The original code assumed that the number of sockets
475 // and a socket value have the same type. On Windows they do not.
476 //
477 #ifdef PEGASUS_OS_TYPE_WINDOWS
478 int events = select(0, &fdread, NULL, NULL, &tv);
479 #else
|
480 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
481 david.dillard 1.95 #endif
|
482 kumpf 1.94 autoEntryMutex.lock();
|
483 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
484 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
485 entries.reset(_entries);
|
486 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
487 kumpf 1.50 if(events == SOCKET_ERROR)
|
488 mike 1.2 #else
|
489 kumpf 1.50 if(events == -1)
|
490 mike 1.2 #endif
|
491 mday 1.13 {
|
492 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
493 "Monitor::run - errorno = %d has occurred on select.", errno);
494 // The EBADF error indicates that one or more or the file
495 // descriptions was not valid. This could indicate that
|
496 mike 1.100 // the entries structure has been corrupted or that
|
497 kumpf 1.50 // we have a synchronization error.
498
499 PEGASUS_ASSERT(errno != EBADF);
500 }
501 else if (events)
502 {
|
503 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
504 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
505 david.dillard 1.95 events, _idleEntries);
|
506 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
507 mday 1.25 {
|
508 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
509 // owned by the Monitor).
|
510 mike 1.100 if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
511 (FD_ISSET(entries[indx].socket, &fdread)))
|
512 david.dillard 1.95 {
|
513 mike 1.100 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
514 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
515 "Monitor::run indx = %d, queueId = %d, q = %p",
|
516 mike 1.100 indx, entries[indx].queueId, q);
|
517 kumpf 1.53 PEGASUS_ASSERT(q !=0);
|
518 mday 1.37
|
519 david.dillard 1.95 try
520 {
|
521 mike 1.100 if(entries[indx]._type == Monitor::CONNECTION)
|
522 david.dillard 1.95 {
|
523 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
524 mike 1.100 "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
525 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
526 sushma.fernandes 1.78
527 // Do not update the entry just yet. The entry gets updated once
|
528 r.kieninger 1.83 // the request has been read.
|
529 mike 1.100 //entries[indx]._status = _MonitorEntry::BUSY;
|
530 sushma.fernandes 1.78
|
531 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
532 a.arora 1.73 /* Removed for PEP 183.
|
533 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
534 (void *)q, _dispatch))
|
535 kumpf 1.67 {
536 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
537 "Monitor::run: Insufficient resources to process request.");
|
538 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
539 kumpf 1.67 return true;
540 }
|
541 a.arora 1.73 */
542 // Added for PEP 183
|
543 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
544 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
545 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
546 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
547 try
548 {
549 dst->run(1);
550 }
|
551 david.dillard 1.95 catch (...)
552 {
553 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554 "Monitor::_dispatch: exception received");
555 }
556 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
557 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
558
|
559 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
560 r.kieninger 1.83 // The following will fail in that case.
|
561 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
562 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
563 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
564 // to the Monitor. It is no longer permissible to access the connection
565 // or the entry in the _entries table.
|
566 sushma.fernandes 1.78
567 // The following is not relevant as the worker thread or the
568 // reader thread will update the status of the entry.
569 //if (dst->_connectionClosePending)
|
570 r.kieninger 1.83 //{
|
571 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
572 //}
573 //else
574 //{
575 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
576 r.kieninger 1.83 //}
577 // end Added for PEP 183
|
578 a.arora 1.73 }
|
579 mike 1.100 else if( entries[indx]._type == Monitor::INTERNAL){
|
580 r.kieninger 1.83 // set ourself to BUSY,
581 // read the data
|
582 a.arora 1.73 // and set ourself back to IDLE
|
583 r.kieninger 1.83
|
584 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
585 a.arora 1.73 static char buffer[2];
|
586 mike 1.100 Socket::disableBlocking(entries[indx].socket);
587 Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
588 Socket::enableBlocking(entries[indx].socket);
589 entries[indx]._status = _MonitorEntry::IDLE;
|
590 mday 1.37 }
591 else
|
592 mday 1.25 {
|
593 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
594 "Non-connection entry, indx = %d, has been received.", indx);
|
595 mday 1.37 int events = 0;
596 events |= SocketMessage::READ;
|
597 mike 1.100 Message *msg = new SocketMessage(entries[indx].socket, events);
598 entries[indx]._status = _MonitorEntry::BUSY;
|
599 kumpf 1.94 autoEntryMutex.unlock();
|
600 mday 1.37 q->enqueue(msg);
|
601 kumpf 1.94 autoEntryMutex.lock();
|
602 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
603 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
604 entries.reset(_entries);
|
605 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
606 kumpf 1.94
|
607 mday 1.25 return true;
608 }
609 }
|
610 mday 1.37 catch(...)
|
611 mday 1.25 {
612 }
613 handled_events = true;
614 }
615 }
|
616 mday 1.24 }
|
617 kumpf 1.94
|
618 mday 1.13 return(handled_events);
|
619 mike 1.2 }
620
|
621 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
622 kumpf 1.48 {
623 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
624 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
625 kumpf 1.48 _stopConnections = 1;
|
626 a.arora 1.73 tickle();
|
627 kumpf 1.48
|
628 chuck 1.74 if (wait)
|
629 a.arora 1.73 {
|
630 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
631 // caller of this function may unbind the ports while the monitor
632 // is still accepting connections on them.
|
633 kumpf 1.101 _stopConnectionsSem.wait();
|
634 a.arora 1.73 }
|
635 r.kieninger 1.83
|
636 kumpf 1.48 PEG_METHOD_EXIT();
637 }
|
638 mday 1.25
|
639 mday 1.37
|
640 mday 1.25 int Monitor::solicitSocketMessages(
|
641 david.dillard 1.95 PEGASUS_SOCKET socket,
|
642 mike 1.2 Uint32 events,
|
643 r.kieninger 1.83 Uint32 queueId,
|
644 mday 1.8 int type)
|
645 mike 1.2 {
|
646 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
647 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
648 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
649 // We always want the _entries array to 2 bigger than the
650 // current connections requested
651 _solicitSocketCount++; // bump the count
652 int size = (int)_entries.size();
|
653 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
654 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
655 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
656 _entries.append(entry);
657 }
658 }
|
659 kumpf 1.4
|
660 a.arora 1.73 int index;
661 for(index = 1; index < (int)_entries.size(); index++)
|
662 mday 1.25 {
|
663 a.arora 1.73 try
|
664 mday 1.37 {
|
665 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
666 a.arora 1.73 {
667 _entries[index].socket = socket;
668 _entries[index].queueId = queueId;
669 _entries[index]._type = type;
670 _entries[index]._status = _MonitorEntry::IDLE;
|
671 r.kieninger 1.83
|
672 a.arora 1.73 return index;
673 }
|
674 mday 1.37 }
675 catch(...)
|
676 mday 1.25 {
677 }
678 }
|
679 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
680 mday 1.25 PEG_METHOD_EXIT();
|
681 kumpf 1.50 return -1;
|
682 a.arora 1.73
|
683 mike 1.2 }
684
|
685 david.dillard 1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
|
686 mike 1.2 {
|
687 kumpf 1.50
|
688 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
689 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
690 a.arora 1.73
691 /*
692 Start at index = 1 because _entries[0] is the tickle entry which never needs
693 to be EMPTY;
694 */
|
695 w.otsuka 1.89 unsigned int index;
|
696 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
697 mike 1.2 {
|
698 mday 1.25 if(_entries[index].socket == socket)
699 {
|
700 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
701 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
702 a.arora 1.73 _solicitSocketCount--;
703 break;
|
704 mday 1.25 }
|
705 mike 1.2 }
|
706 a.arora 1.73
707 /*
708 Dynamic Contraction:
709 To remove excess entries we will start from the end of the _entries array
710 and remove all entries with EMPTY status until we find the first NON EMPTY.
711 This prevents the positions, of the NON EMPTY entries, from being changed.
|
712 r.kieninger 1.83 */
|
713 a.arora 1.73 index = _entries.size() - 1;
|
714 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
715 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
716 _entries.remove(index);
717 index--;
718 }
|
719 kumpf 1.4 PEG_METHOD_EXIT();
|
720 mike 1.2 }
721
|
722 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
723 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
724 {
|
725 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
726 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
727 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
728 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
729 kumpf 1.51 try
730 {
731 dst->run(1);
732 }
733 catch (...)
734 {
735 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
736 "Monitor::_dispatch: exception received");
737 }
738 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
739 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
740 r.kieninger 1.83
|
741 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
742 kumpf 1.68
743 // Once the HTTPConnection thread has set the status value to either
744 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
745 // to the Monitor. It is no longer permissible to access the connection
746 // or the entry in the _entries table.
|
747 kumpf 1.50 if (dst->_connectionClosePending)
748 {
|
749 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
750 }
751 else
752 {
753 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
754 kumpf 1.50 }
|
755 mday 1.8 return 0;
|
756 mday 1.40 }
757
|
758 mike 1.2 PEGASUS_NAMESPACE_END
|