1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Brasher (mbrasher@bmc.com)
33 //
|
34 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
35 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
36 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
37 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
38 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
39 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
40 mike 1.2 //
41 //%/////////////////////////////////////////////////////////////////////////////
42
|
43 mike 1.107 #include "Network.h"
|
44 mike 1.2 #include <Pegasus/Common/Config.h>
45 #include <cstring>
46 #include "Monitor.h"
47 #include "MessageQueue.h"
48 #include "Socket.h"
|
49 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
50 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
51 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
52 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
53 mike 1.100 #include "ArrayIterator.h"
|
54 mike 1.2
55 PEGASUS_USING_STD;
56
57 PEGASUS_NAMESPACE_BEGIN
58
|
59 mike 1.96 static AtomicInt _connections(0);
|
60 mday 1.25
|
61 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
62 //
|
63 mike 1.106 // _getError()
64 //
65 ////////////////////////////////////////////////////////////////////////////////
66
67 static inline int _getError()
68 {
69 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
70 mike 1.108 return WSAGetLastError();
|
71 mike 1.106 #else
72 return errno;
73 #endif
74 }
75
76 ////////////////////////////////////////////////////////////////////////////////
77 //
|
78 mike 1.2 // Monitor
79 //
80 ////////////////////////////////////////////////////////////////////////////////
81
|
82 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
83 mike 1.2 Monitor::Monitor()
|
84 kumpf 1.87 : _stopConnections(0),
|
85 a.arora 1.73 _stopConnectionsSem(0),
|
86 a.dunfey 1.76 _solicitSocketCount(0),
|
87 a.dunfey 1.77 _tickle_client_socket(-1),
88 _tickle_server_socket(-1),
89 _tickle_peer_socket(-1)
|
90 mike 1.2 {
|
91 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
92 mike 1.2 Socket::initializeInterface();
|
93 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
94 a.arora 1.73
95 // setup the tickler
96 initializeTickler();
97
|
98 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
99 a.arora 1.73 // has added an entry in the first position of the
100 // _entries array
101 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
102 mday 1.37 {
103 _MonitorEntry entry(0, 0, 0);
104 _entries.append(entry);
105 }
|
106 mday 1.18 }
|
107 mday 1.20
|
108 mike 1.2 Monitor::~Monitor()
109 {
|
110 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
111 a.dunfey 1.76
112 try{
|
113 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
114 a.dunfey 1.76 {
115 Socket::close(_tickle_peer_socket);
116 }
|
117 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
118 a.dunfey 1.76 {
119 Socket::close(_tickle_client_socket);
120 }
|
121 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
122 a.dunfey 1.76 {
123 Socket::close(_tickle_server_socket);
124 }
125 }
126 catch(...)
127 {
128 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
129 "Failed to close tickle sockets");
130 }
131
|
132 mike 1.2 Socket::uninitializeInterface();
|
133 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
134 "returning from monitor destructor");
|
135 mday 1.18 }
136
|
137 a.arora 1.73 void Monitor::initializeTickler(){
|
138 r.kieninger 1.83 /*
139 NOTE: On any errors trying to
140 setup out tickle connection,
|
141 a.arora 1.73 throw an exception/end the server
142 */
143
144 /* setup the tickle server/listener */
145
146 // get a socket for the server side
|
147 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
148 a.arora 1.73 //handle error
149 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
150 "Received error number $0 while creating the internal socket.",
|
151 mike 1.106 _getError());
|
152 a.arora 1.73 throw Exception(parms);
153 }
|
154 marek 1.104
155 // set TCP_NODELAY
156 int opt = 1;
157 setsockopt(_tickle_server_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
|
158 a.arora 1.73
159 // initialize the address
160 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
161 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
162 #pragma convert(37)
163 #endif
164 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
165 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
166 #pragma convert(0)
167 #endif
168 _tickle_server_addr.sin_family = PF_INET;
169 _tickle_server_addr.sin_port = 0;
170
|
171 mike 1.106 socklen_t _addr_size = sizeof(_tickle_server_addr);
|
172 a.arora 1.73
173 // bind server side to socket
174 if((::bind(_tickle_server_socket,
|
175 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
176 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
177 // handle error
|
178 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
179 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
180 "Received error:$0 while binding the internal socket.",strerror(errno));
181 #else
|
182 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
183 "Received error number $0 while binding the internal socket.",
|
184 mike 1.106 _getError());
|
185 r.kieninger 1.83 #endif
|
186 a.arora 1.73 throw Exception(parms);
187 }
188
189 // tell the kernel we are a server
190 if((::listen(_tickle_server_socket,3)) < 0){
191 // handle error
192 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
193 "Received error number $0 while listening to the internal socket.",
|
194 mike 1.106 _getError());
|
195 a.arora 1.73 throw Exception(parms);
196 }
|
197 r.kieninger 1.83
|
198 a.arora 1.73 // make sure we have the correct socket for our server
199 int sock = ::getsockname(_tickle_server_socket,
|
200 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
201 &_addr_size);
|
202 a.arora 1.73 if(sock < 0){
203 // handle error
204 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
205 "Received error number $0 while getting the internal socket name.",
|
206 mike 1.106 _getError());
|
207 a.arora 1.73 throw Exception(parms);
208 }
209
210 /* set up the tickle client/connector */
|
211 r.kieninger 1.83
|
212 a.arora 1.73 // get a socket for our tickle client
|
213 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
214 a.arora 1.73 // handle error
215 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
216 "Received error number $0 while creating the internal client socket.",
|
217 mike 1.106 _getError());
|
218 a.arora 1.73 throw Exception(parms);
219 }
|
220 marek 1.104
221 // set TCP_NODELAY
222 setsockopt(_tickle_client_socket, IPPROTO_TCP, TCP_NODELAY, (char*)&opt, sizeof(opt));
|
223 a.arora 1.73
224 // setup the address of the client
225 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
226 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
227 #pragma convert(37)
228 #endif
229 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
230 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
231 #pragma convert(0)
232 #endif
233 _tickle_client_addr.sin_family = PF_INET;
234 _tickle_client_addr.sin_port = 0;
235
236 // bind socket to client side
237 if((::bind(_tickle_client_socket,
|
238 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
239 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
240 // handle error
241 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
242 "Received error number $0 while binding the internal client socket.",
|
243 mike 1.106 _getError());
|
244 a.arora 1.73 throw Exception(parms);
245 }
246
247 // connect to server side
248 if((::connect(_tickle_client_socket,
|
249 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
250 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
251 // handle error
252 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
253 "Received error number $0 while connecting the internal client socket.",
|
254 mike 1.106 _getError());
|
255 a.arora 1.73 throw Exception(parms);
256 }
257
258 /* set up the slave connection */
259 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
260 mike 1.106 socklen_t peer_size = sizeof(_tickle_peer_addr);
|
261 r.kieninger 1.83 pegasus_sleep(1);
|
262 a.arora 1.73
263 // this call may fail, we will try a max of 20 times to establish this peer connection
264 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
265 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
266 &peer_size)) < 0){
|
267 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
268 // Only retry on non-windows platforms.
269 if(_tickle_peer_socket == -1 && errno == EAGAIN)
270 {
|
271 r.kieninger 1.83 int retries = 0;
|
272 a.arora 1.73 do
273 {
274 pegasus_sleep(1);
275 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
276 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
277 &peer_size);
|
278 a.arora 1.73 retries++;
279 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
280 }
281 #endif
282 }
283 if(_tickle_peer_socket == -1){
284 // handle error
285 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
286 "Received error number $0 while accepting the internal socket connection.",
|
287 mike 1.106 _getError());
|
288 a.arora 1.73 throw Exception(parms);
289 }
290 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
291 // checks entries with IDLE state for events
292 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
293 entry._status = _MonitorEntry::IDLE;
294 _entries.append(entry);
295 }
296
297 void Monitor::tickle(void)
298 {
|
299 sushma.fernandes 1.78 static char _buffer[] =
|
300 a.arora 1.73 {
301 '0','0'
302 };
|
303 r.kieninger 1.83
|
304 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
305 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
306 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
307 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
308 sushma.fernandes 1.78 }
309
310 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
311 {
312 // Set the state to requested state
313 _entries[index]._status = status;
|
314 a.arora 1.73 }
315
|
316 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
317 {
|
318 mday 1.18
|
319 mday 1.25 Boolean handled_events = false;
|
320 a.arora 1.73 int i = 0;
|
321 r.kieninger 1.83
|
322 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
323 a.arora 1.73
|
324 mday 1.25 fd_set fdread;
325 FD_ZERO(&fdread);
|
326 a.arora 1.73
|
327 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
328 r.kieninger 1.83
|
329 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
330
|
331 r.kieninger 1.83 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
332 mike 1.96 if (_stopConnections.get() == 1)
|
333 kumpf 1.48 {
|
334 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
335 kumpf 1.48 {
|
336 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
337 kumpf 1.48 {
|
338 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
339 kumpf 1.48 {
|
340 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
341 entries[indx]._status.get() == _MonitorEntry::DYING )
|
342 kumpf 1.48 {
343 // remove the entry
|
344 mike 1.100 entries[indx]._status = _MonitorEntry::EMPTY;
|
345 kumpf 1.48 }
346 else
347 {
348 // set status to DYING
|
349 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
350 kumpf 1.48 }
351 }
352 }
353 }
354 _stopConnections = 0;
|
355 a.arora 1.73 _stopConnectionsSem.signal();
|
356 kumpf 1.48 }
|
357 kumpf 1.51
|
358 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
359 kumpf 1.68 {
|
360 mike 1.100 const _MonitorEntry &entry = entries[indx];
|
361 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
362 brian.campbell 1.80 (entry._type == Monitor::CONNECTION))
|
363 kumpf 1.68 {
|
364 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
365 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
366 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
367 r.kieninger 1.83
|
368 brian.campbell 1.80 if (h._connectionClosePending == false)
369 continue;
370
371 // NOTE: do not attempt to delete while there are pending responses
|
372 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
373 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
374 // and then cause the monitor to rerun this code and clean up.
375 // (see HTTPConnection.cpp)
376
377 if (h._responsePending == true)
378 {
379 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
380 "Ignoring connection delete request because "
381 "responses are still pending. "
|
382 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
383 brian.campbell 1.81 (void *)&h, h.getSocket());
|
384 brian.campbell 1.80 continue;
385 }
386 h._connectionClosePending = false;
387 MessageQueue &o = h.get_owner();
388 Message* message= new CloseConnectionMessage(entry.socket);
|
389 kumpf 1.68 message->dest = o.getQueueId();
390
|
391 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
392 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
393 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
394 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
395 // connection, the lock is re-requested and processing of
396 // the for loop continues. This is safe with the current
|
397 mike 1.100 // implementation of the entries object. Note that the
398 // loop condition accesses the entries.size() on each
|
399 kumpf 1.68 // iteration, so that a change in size while the mutex is
400 // unlocked will not result in an ArrayIndexOutOfBounds
401 // exception.
402
|
403 kumpf 1.94 autoEntryMutex.unlock();
|
404 kumpf 1.68 o.enqueue(message);
|
405 kumpf 1.94 autoEntryMutex.lock();
|
406 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
407 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
408 entries.reset(_entries);
|
409 kumpf 1.68 }
410 }
411
|
412 kumpf 1.51 Uint32 _idleEntries = 0;
|
413 r.kieninger 1.83
|
414 a.arora 1.73 /*
|
415 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
416 to the kernel as a parameter to SELECT. This loop seems like a good
417 place to calculate the max file descriptor (maximum socket number)
418 because we have to traverse the entire array.
|
419 r.kieninger 1.83 */
|
420 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
421 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
422 mike 1.2 {
|
423 mike 1.100 if(maxSocketCurrentPass < entries[indx].socket)
424 maxSocketCurrentPass = entries[indx].socket;
|
425 a.arora 1.73
|
426 mike 1.100 if(entries[indx]._status.get() == _MonitorEntry::IDLE)
|
427 mday 1.25 {
|
428 david.dillard 1.95 _idleEntries++;
|
429 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
430 mday 1.25 }
|
431 mday 1.13 }
|
432 s.hills 1.62
|
433 a.arora 1.73 /*
|
434 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
435 descriptors starting at 0.
|
436 a.arora 1.73 */
437 maxSocketCurrentPass++;
438
|
439 kumpf 1.94 autoEntryMutex.unlock();
|
440 david.dillard 1.95
441 //
442 // The first argument to select() is ignored on Windows and it is not
443 // a socket value. The original code assumed that the number of sockets
444 // and a socket value have the same type. On Windows they do not.
445 //
446 #ifdef PEGASUS_OS_TYPE_WINDOWS
447 int events = select(0, &fdread, NULL, NULL, &tv);
448 #else
|
449 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
450 david.dillard 1.95 #endif
|
451 kumpf 1.94 autoEntryMutex.lock();
|
452 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
453 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
454 entries.reset(_entries);
|
455 mike 1.106
456 if (events == PEGASUS_SOCKET_ERROR)
|
457 mday 1.13 {
|
458 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
459 "Monitor::run - errorno = %d has occurred on select.", errno);
460 // The EBADF error indicates that one or more or the file
461 // descriptions was not valid. This could indicate that
|
462 mike 1.100 // the entries structure has been corrupted or that
|
463 kumpf 1.50 // we have a synchronization error.
464
465 PEGASUS_ASSERT(errno != EBADF);
466 }
467 else if (events)
468 {
|
469 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
470 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
471 david.dillard 1.95 events, _idleEntries);
|
472 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
473 mday 1.25 {
|
474 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
475 // owned by the Monitor).
|
476 mike 1.100 if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
477 (FD_ISSET(entries[indx].socket, &fdread)))
|
478 david.dillard 1.95 {
|
479 mike 1.100 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
480 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
481 "Monitor::run indx = %d, queueId = %d, q = %p",
|
482 mike 1.100 indx, entries[indx].queueId, q);
|
483 kumpf 1.53 PEGASUS_ASSERT(q !=0);
|
484 mday 1.37
|
485 david.dillard 1.95 try
486 {
|
487 mike 1.100 if(entries[indx]._type == Monitor::CONNECTION)
|
488 david.dillard 1.95 {
|
489 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
490 mike 1.100 "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
491 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
492 sushma.fernandes 1.78
493 // Do not update the entry just yet. The entry gets updated once
|
494 r.kieninger 1.83 // the request has been read.
|
495 mike 1.100 //entries[indx]._status = _MonitorEntry::BUSY;
|
496 sushma.fernandes 1.78
|
497 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
498 a.arora 1.73 /* Removed for PEP 183.
|
499 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
500 (void *)q, _dispatch))
|
501 kumpf 1.67 {
502 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503 "Monitor::run: Insufficient resources to process request.");
|
504 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
505 kumpf 1.67 return true;
506 }
|
507 a.arora 1.73 */
508 // Added for PEP 183
|
509 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
510 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
511 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
512 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
513 try
514 {
515 dst->run(1);
516 }
|
517 david.dillard 1.95 catch (...)
518 {
519 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
520 "Monitor::_dispatch: exception received");
521 }
522 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
523 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
524
|
525 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
526 r.kieninger 1.83 // The following will fail in that case.
|
527 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
528 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
529 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
530 // to the Monitor. It is no longer permissible to access the connection
531 // or the entry in the _entries table.
|
532 sushma.fernandes 1.78
533 // The following is not relevant as the worker thread or the
534 // reader thread will update the status of the entry.
535 //if (dst->_connectionClosePending)
|
536 r.kieninger 1.83 //{
|
537 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
538 //}
539 //else
540 //{
541 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
542 r.kieninger 1.83 //}
543 // end Added for PEP 183
|
544 a.arora 1.73 }
|
545 mike 1.100 else if( entries[indx]._type == Monitor::INTERNAL){
|
546 r.kieninger 1.83 // set ourself to BUSY,
547 // read the data
|
548 a.arora 1.73 // and set ourself back to IDLE
|
549 r.kieninger 1.83
|
550 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
551 a.arora 1.73 static char buffer[2];
|
552 mike 1.100 Socket::disableBlocking(entries[indx].socket);
553 Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
554 Socket::enableBlocking(entries[indx].socket);
555 entries[indx]._status = _MonitorEntry::IDLE;
|
556 mday 1.37 }
557 else
|
558 mday 1.25 {
|
559 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
560 "Non-connection entry, indx = %d, has been received.", indx);
|
561 mday 1.37 int events = 0;
562 events |= SocketMessage::READ;
|
563 mike 1.100 Message *msg = new SocketMessage(entries[indx].socket, events);
564 entries[indx]._status = _MonitorEntry::BUSY;
|
565 kumpf 1.94 autoEntryMutex.unlock();
|
566 mday 1.37 q->enqueue(msg);
|
567 kumpf 1.94 autoEntryMutex.lock();
|
568 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
569 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
570 entries.reset(_entries);
|
571 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
572 kumpf 1.94
|
573 mday 1.25 return true;
574 }
575 }
|
576 mday 1.37 catch(...)
|
577 mday 1.25 {
578 }
579 handled_events = true;
580 }
581 }
|
582 mday 1.24 }
|
583 kumpf 1.94
|
584 mday 1.13 return(handled_events);
|
585 mike 1.2 }
586
|
587 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
588 kumpf 1.48 {
589 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
590 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
591 kumpf 1.48 _stopConnections = 1;
|
592 a.arora 1.73 tickle();
|
593 kumpf 1.48
|
594 chuck 1.74 if (wait)
|
595 a.arora 1.73 {
|
596 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
597 // caller of this function may unbind the ports while the monitor
598 // is still accepting connections on them.
|
599 kumpf 1.101 _stopConnectionsSem.wait();
|
600 a.arora 1.73 }
|
601 r.kieninger 1.83
|
602 kumpf 1.48 PEG_METHOD_EXIT();
603 }
|
604 mday 1.25
|
605 mday 1.37
|
606 mday 1.25 int Monitor::solicitSocketMessages(
|
607 mike 1.106 SocketHandle socket,
|
608 mike 1.2 Uint32 events,
|
609 r.kieninger 1.83 Uint32 queueId,
|
610 mday 1.8 int type)
|
611 mike 1.2 {
|
612 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
613 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
614 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
615 // We always want the _entries array to 2 bigger than the
616 // current connections requested
617 _solicitSocketCount++; // bump the count
618 int size = (int)_entries.size();
|
619 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
620 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
621 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
622 _entries.append(entry);
623 }
624 }
|
625 kumpf 1.4
|
626 a.arora 1.73 int index;
627 for(index = 1; index < (int)_entries.size(); index++)
|
628 mday 1.25 {
|
629 a.arora 1.73 try
|
630 mday 1.37 {
|
631 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
632 a.arora 1.73 {
633 _entries[index].socket = socket;
634 _entries[index].queueId = queueId;
635 _entries[index]._type = type;
636 _entries[index]._status = _MonitorEntry::IDLE;
|
637 r.kieninger 1.83
|
638 a.arora 1.73 return index;
639 }
|
640 mday 1.37 }
641 catch(...)
|
642 mday 1.25 {
643 }
644 }
|
645 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
646 mday 1.25 PEG_METHOD_EXIT();
|
647 kumpf 1.50 return -1;
|
648 a.arora 1.73
|
649 mike 1.2 }
650
|
651 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
652 mike 1.2 {
|
653 kumpf 1.50
|
654 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
655 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
656 a.arora 1.73
657 /*
658 Start at index = 1 because _entries[0] is the tickle entry which never needs
659 to be EMPTY;
660 */
|
661 w.otsuka 1.89 unsigned int index;
|
662 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
663 mike 1.2 {
|
664 mday 1.25 if(_entries[index].socket == socket)
665 {
|
666 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
667 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
668 a.arora 1.73 _solicitSocketCount--;
669 break;
|
670 mday 1.25 }
|
671 mike 1.2 }
|
672 a.arora 1.73
673 /*
674 Dynamic Contraction:
675 To remove excess entries we will start from the end of the _entries array
676 and remove all entries with EMPTY status until we find the first NON EMPTY.
677 This prevents the positions, of the NON EMPTY entries, from being changed.
|
678 r.kieninger 1.83 */
|
679 a.arora 1.73 index = _entries.size() - 1;
|
680 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
681 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
682 _entries.remove(index);
683 index--;
684 }
|
685 kumpf 1.4 PEG_METHOD_EXIT();
|
686 mike 1.2 }
687
|
688 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
689 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
690 {
|
691 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
692 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
693 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
694 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
695 kumpf 1.51 try
696 {
697 dst->run(1);
698 }
699 catch (...)
700 {
701 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
702 "Monitor::_dispatch: exception received");
703 }
704 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
705 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
706 r.kieninger 1.83
|
707 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
708 kumpf 1.68
709 // Once the HTTPConnection thread has set the status value to either
710 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
711 // to the Monitor. It is no longer permissible to access the connection
712 // or the entry in the _entries table.
|
713 kumpf 1.50 if (dst->_connectionClosePending)
714 {
|
715 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
716 }
717 else
718 {
719 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
720 kumpf 1.50 }
|
721 mday 1.8 return 0;
|
722 mday 1.40 }
723
|
724 mike 1.2 PEGASUS_NAMESPACE_END
|