1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 mike 1.113 #include <errno.h>
|
46 mike 1.2
47 PEGASUS_USING_STD;
48
49 PEGASUS_NAMESPACE_BEGIN
50
|
51 mike 1.96 static AtomicInt _connections(0);
|
52 mday 1.25
|
53 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
54 //
55 // Monitor
56 //
57 ////////////////////////////////////////////////////////////////////////////////
58
|
59 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
60 mike 1.2 Monitor::Monitor()
|
61 kumpf 1.87 : _stopConnections(0),
|
62 a.arora 1.73 _stopConnectionsSem(0),
|
63 a.dunfey 1.76 _solicitSocketCount(0),
|
64 a.dunfey 1.77 _tickle_client_socket(-1),
65 _tickle_server_socket(-1),
66 _tickle_peer_socket(-1)
|
67 mike 1.2 {
|
68 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
69 mike 1.2 Socket::initializeInterface();
|
70 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
71 a.arora 1.73
72 // setup the tickler
73 initializeTickler();
74
|
75 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
76 a.arora 1.73 // has added an entry in the first position of the
77 // _entries array
|
78 kumpf 1.116 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
79 mday 1.37 {
80 _MonitorEntry entry(0, 0, 0);
81 _entries.append(entry);
82 }
|
83 mday 1.18 }
|
84 mday 1.20
|
85 mike 1.2 Monitor::~Monitor()
86 {
|
87 thilo.boehm 1.115 uninitializeTickler();
88 Socket::uninitializeInterface();
|
89 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
90 thilo.boehm 1.115 "returning from monitor destructor");
91 }
|
92 kumpf 1.116 void Monitor::uninitializeTickler()
93 {
|
94 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
95 a.dunfey 1.76
|
96 kumpf 1.116 try
97 {
98 if (_tickle_peer_socket >= 0)
|
99 a.dunfey 1.76 {
100 Socket::close(_tickle_peer_socket);
101 }
|
102 kumpf 1.116 if (_tickle_client_socket >= 0)
|
103 a.dunfey 1.76 {
104 Socket::close(_tickle_client_socket);
105 }
|
106 kumpf 1.116 if (_tickle_server_socket >= 0)
|
107 a.dunfey 1.76 {
108 Socket::close(_tickle_server_socket);
109 }
110 }
|
111 kumpf 1.116 catch (...)
|
112 a.dunfey 1.76 {
|
113 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
114 a.dunfey 1.76 "Failed to close tickle sockets");
115 }
116
|
117 mday 1.18 }
118
|
119 kumpf 1.116 void Monitor::initializeTickler()
120 {
|
121 r.kieninger 1.83 /*
122 NOTE: On any errors trying to
123 setup out tickle connection,
|
124 a.arora 1.73 throw an exception/end the server
125 */
126
127 /* setup the tickle server/listener */
|
128 kumpf 1.116 // try until the tcpip is restarted
|
129 thilo.boehm 1.115 do
130 {
131 // get a socket for the server side
|
132 kumpf 1.116 if ((_tickle_server_socket =
133 Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
134 PEGASUS_INVALID_SOCKET)
135 {
136 MessageLoaderParms parms(
137 "Common.Monitor.TICKLE_CREATE",
138 "Received error number $0 while creating the internal socket.",
139 getSocketError());
|
140 thilo.boehm 1.115 throw Exception(parms);
141 }
|
142 a.arora 1.73
|
143 thilo.boehm 1.115 // initialize the address
144 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
|
145 a.arora 1.73 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
146 #pragma convert(37)
147 #endif
|
148 thilo.boehm 1.115 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
149 a.arora 1.73 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
150 #pragma convert(0)
151 #endif
|
152 thilo.boehm 1.115 _tickle_server_addr.sin_family = PF_INET;
153 _tickle_server_addr.sin_port = 0;
|
154 a.arora 1.73
|
155 thilo.boehm 1.115 SocketLength _addr_size = sizeof(_tickle_server_addr);
|
156 a.arora 1.73
|
157 thilo.boehm 1.115 // bind server side to socket
|
158 kumpf 1.116 if ((::bind(_tickle_server_socket,
159 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
160 sizeof(_tickle_server_addr))) < 0)
|
161 thilo.boehm 1.115 {
|
162 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
|
163 kumpf 1.116 MessageLoaderParms parms(
164 "Common.Monitor.TICKLE_BIND_LONG",
165 "Received error:$0 while binding the internal socket.",
166 strerror(errno));
|
167 r.kieninger 1.83 #else
|
168 kumpf 1.116 MessageLoaderParms parms(
169 "Common.Monitor.TICKLE_BIND",
170 "Received error number $0 while binding the internal socket.",
171 getSocketError());
|
172 r.kieninger 1.83 #endif
|
173 thilo.boehm 1.115 throw Exception(parms);
174 }
175
176 // tell the kernel we are a server
|
177 kumpf 1.116 if ((::listen(_tickle_server_socket, 3)) < 0)
|
178 thilo.boehm 1.115 {
|
179 kumpf 1.116 MessageLoaderParms parms(
180 "Common.Monitor.TICKLE_LISTEN",
181 "Received error number $0 while listening to the internal "
182 "socket.",
183 getSocketError());
|
184 thilo.boehm 1.115 throw Exception(parms);
185 }
186
187 // make sure we have the correct socket for our server
|
188 kumpf 1.116 int sock = ::getsockname(
189 _tickle_server_socket,
190 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
191 &_addr_size);
192 if (sock < 0)
193 {
194 MessageLoaderParms parms(
195 "Common.Monitor.TICKLE_SOCKNAME",
196 "Received error number $0 while getting the internal socket "
197 "name.",
198 getSocketError());
|
199 thilo.boehm 1.115 throw Exception(parms);
200 }
|
201 a.arora 1.73
|
202 thilo.boehm 1.115 /* set up the tickle client/connector */
|
203 r.kieninger 1.83
|
204 thilo.boehm 1.115 // get a socket for our tickle client
|
205 kumpf 1.116 if ((_tickle_client_socket =
206 Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
207 PEGASUS_INVALID_SOCKET)
208 {
209 MessageLoaderParms parms(
210 "Common.Monitor.TICKLE_CLIENT_CREATE",
211 "Received error number $0 while creating the internal client "
212 "socket.",
213 getSocketError());
|
214 thilo.boehm 1.115 throw Exception(parms);
215 }
|
216 a.arora 1.73
|
217 thilo.boehm 1.115 // setup the address of the client
218 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
|
219 a.arora 1.73 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
|
220 kumpf 1.116 # pragma convert(37)
|
221 a.arora 1.73 #endif
|
222 thilo.boehm 1.115 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
223 a.arora 1.73 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
|
224 kumpf 1.116 # pragma convert(0)
|
225 a.arora 1.73 #endif
|
226 thilo.boehm 1.115 _tickle_client_addr.sin_family = PF_INET;
227 _tickle_client_addr.sin_port = 0;
|
228 a.arora 1.73
|
229 thilo.boehm 1.115 // bind socket to client side
|
230 kumpf 1.116 if ((::bind(_tickle_client_socket,
231 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
232 sizeof(_tickle_client_addr))) < 0)
233 {
234 MessageLoaderParms parms(
235 "Common.Monitor.TICKLE_CLIENT_BIND",
236 "Received error number $0 while binding the internal client "
237 "socket.",
238 getSocketError());
|
239 thilo.boehm 1.115 throw Exception(parms);
240 }
241
242 // connect to server side
|
243 kumpf 1.116 if ((::connect(_tickle_client_socket,
244 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
245 sizeof(_tickle_server_addr))) < 0)
246 {
247 MessageLoaderParms parms(
248 "Common.Monitor.TICKLE_CLIENT_CONNECT",
249 "Received error number $0 while connecting the internal "
250 "client socket.",
251 getSocketError());
|
252 thilo.boehm 1.115 throw Exception(parms);
253 }
254
255 /* set up the slave connection */
256 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
257 SocketLength peer_size = sizeof(_tickle_peer_addr);
258 Threads::sleep(1);
259
|
260 kumpf 1.116 // this call may fail, we will try a max of 20 times to establish
261 // this peer connection
262 if ((_tickle_peer_socket = ::accept(_tickle_server_socket,
263 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
264 &peer_size)) < 0)
|
265 thilo.boehm 1.115 {
|
266 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
267 getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
|
268 thilo.boehm 1.115 {
269 int retries = 0;
270 do
271 {
272 Threads::sleep(1);
|
273 kumpf 1.116 _tickle_peer_socket = ::accept(
274 _tickle_server_socket,
275 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
276 &peer_size);
|
277 thilo.boehm 1.115 retries++;
|
278 kumpf 1.116 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
279 getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
280 retries < 20);
|
281 thilo.boehm 1.115 }
282 // TCP/IP is down, destroy sockets and retry again.
|
283 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
284 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
|
285 thilo.boehm 1.115 {
286 // destroy everything
287 uninitializeTickler();
288 // retry again.
289 continue;
290 }
291 }
|
292 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
|
293 thilo.boehm 1.115 {
|
294 kumpf 1.116 MessageLoaderParms parms(
295 "Common.Monitor.TICKLE_ACCEPT",
296 "Received error number $0 while accepting the internal "
297 "socket connection.",
298 getSocketError());
|
299 thilo.boehm 1.115 throw Exception(parms);
|
300 kumpf 1.116 }
301 else
|
302 thilo.boehm 1.115 {
303 // socket is ok
304 break;
|
305 a.arora 1.73 }
|
306 kumpf 1.116 } while (1); // try until TCP/IP is restarted
|
307 marek 1.111
308 Socket::disableBlocking(_tickle_peer_socket);
309 Socket::disableBlocking(_tickle_client_socket);
310
|
311 kumpf 1.116 // add the tickler to the list of entries to be monitored and set to
312 // IDLE because Monitor only
|
313 a.arora 1.73 // checks entries with IDLE state for events
314 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
315 entry._status = _MonitorEntry::IDLE;
|
316 thilo.boehm 1.115
317 // is the tickler initalized as first socket on startup ?
318 if (_entries.size()==0)
319 {
320 // if yes, append a new entry
321 _entries.append(entry);
322 }
323 else
324 {
325 // if not, overwrite the tickler entry with new socket
326 _entries[0]=entry;
327 }
|
328 a.arora 1.73 }
329
|
330 kumpf 1.116 void Monitor::tickle()
|
331 a.arora 1.73 {
|
332 sushma.fernandes 1.78 static char _buffer[] =
|
333 a.arora 1.73 {
334 '0','0'
335 };
|
336 r.kieninger 1.83
|
337 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
338 Socket::write(_tickle_client_socket,&_buffer, 2);
339 }
340
341 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
342 {
343 // Set the state to requested state
344 _entries[index]._status = status;
|
345 a.arora 1.73 }
346
|
347 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
348 mike 1.2 {
|
349 mday 1.18
|
350 r.kieninger 1.83
|
351 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
352 a.arora 1.73
|
353 mday 1.25 fd_set fdread;
354 FD_ZERO(&fdread);
|
355 a.arora 1.73
|
356 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
357 r.kieninger 1.83
|
358 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
359
|
360 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
361 // entries
|
362 mike 1.96 if (_stopConnections.get() == 1)
|
363 kumpf 1.48 {
|
364 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
365 kumpf 1.48 {
|
366 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
367 kumpf 1.48 {
|
368 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
369 kumpf 1.48 {
|
370 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
371 entries[indx]._status.get() == _MonitorEntry::DYING )
|
372 kumpf 1.48 {
373 // remove the entry
|
374 kumpf 1.116 entries[indx]._status = _MonitorEntry::EMPTY;
|
375 kumpf 1.48 }
376 else
377 {
378 // set status to DYING
|
379 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
380 kumpf 1.48 }
381 }
382 }
383 }
384 _stopConnections = 0;
|
385 kumpf 1.116 _stopConnectionsSem.signal();
|
386 kumpf 1.48 }
|
387 kumpf 1.51
|
388 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
389 kumpf 1.68 {
|
390 kumpf 1.116 const _MonitorEntry &entry = entries[indx];
391 if ((entry._status.get() == _MonitorEntry::DYING) &&
392 (entry._type == Monitor::CONNECTION))
393 {
394 MessageQueue *q = MessageQueue::lookup(entry.queueId);
395 PEGASUS_ASSERT(q != 0);
396 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
397
398 if (h._connectionClosePending == false)
399 continue;
400
401 // NOTE: do not attempt to delete while there are pending responses
402 // coming thru. The last response to come thru after a
403 // _connectionClosePending will reset _responsePending to false
404 // and then cause the monitor to rerun this code and clean up.
405 // (see HTTPConnection.cpp)
406
407 if (h._responsePending == true)
408 {
|
409 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
410 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
411 "because responses are still pending. "
412 "connection=0x%p, socket=%d\n",
|
413 marek 1.118 (void *)&h, h.getSocket()));
|
414 kumpf 1.116 continue;
415 }
416 h._connectionClosePending = false;
417 MessageQueue &o = h.get_owner();
418 Message* message= new CloseConnectionMessage(entry.socket);
419 message->dest = o.getQueueId();
420
421 // HTTPAcceptor is responsible for closing the connection.
422 // The lock is released to allow HTTPAcceptor to call
423 // unsolicitSocketMessages to free the entry.
424 // Once HTTPAcceptor completes processing of the close
425 // connection, the lock is re-requested and processing of
426 // the for loop continues. This is safe with the current
427 // implementation of the entries object. Note that the
428 // loop condition accesses the entries.size() on each
429 // iteration, so that a change in size while the mutex is
430 // unlocked will not result in an ArrayIndexOutOfBounds
431 // exception.
432
433 _entry_mut.unlock();
434 o.enqueue(message);
435 kumpf 1.116 _entry_mut.lock();
436
437 // After enqueue a message and the autoEntryMutex has been
438 // released and locked again, the array of _entries can be
439 // changed. The ArrayIterator has be reset with the original
440 // _entries.
441 entries.reset(_entries);
442 }
|
443 kumpf 1.68 }
444
|
445 kumpf 1.51 Uint32 _idleEntries = 0;
|
446 r.kieninger 1.83
|
447 a.arora 1.73 /*
|
448 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
449 to the kernel as a parameter to SELECT. This loop seems like a good
450 place to calculate the max file descriptor (maximum socket number)
451 because we have to traverse the entire array.
|
452 r.kieninger 1.83 */
|
453 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
454 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
455 mike 1.2 {
|
456 kumpf 1.116 if (maxSocketCurrentPass < entries[indx].socket)
457 maxSocketCurrentPass = entries[indx].socket;
|
458 a.arora 1.73
|
459 kumpf 1.116 if (entries[indx]._status.get() == _MonitorEntry::IDLE)
|
460 mday 1.25 {
|
461 david.dillard 1.95 _idleEntries++;
|
462 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
463 mday 1.25 }
|
464 mday 1.13 }
|
465 s.hills 1.62
|
466 a.arora 1.73 /*
|
467 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
468 descriptors starting at 0.
|
469 a.arora 1.73 */
470 maxSocketCurrentPass++;
471
|
472 mike 1.112 _entry_mut.unlock();
|
473 david.dillard 1.95
474 //
475 // The first argument to select() is ignored on Windows and it is not
476 // a socket value. The original code assumed that the number of sockets
477 // and a socket value have the same type. On Windows they do not.
478 //
479 #ifdef PEGASUS_OS_TYPE_WINDOWS
480 int events = select(0, &fdread, NULL, NULL, &tv);
481 #else
|
482 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
483 david.dillard 1.95 #endif
|
484 mike 1.112 _entry_mut.lock();
|
485 kumpf 1.116
486 // After enqueue a message and the autoEntryMutex has been released and
487 // locked again, the array of _entries can be changed. The ArrayIterator
488 // has be reset with the original _entries
|
489 r.kieninger 1.102 entries.reset(_entries);
|
490 mike 1.106
491 if (events == PEGASUS_SOCKET_ERROR)
|
492 mday 1.13 {
|
493 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
494 "Monitor::run - errorno = %d has occurred on select.", errno));
|
495 kumpf 1.116 // The EBADF error indicates that one or more or the file
496 // descriptions was not valid. This could indicate that
497 // the entries structure has been corrupted or that
498 // we have a synchronization error.
|
499 kumpf 1.50
|
500 kumpf 1.116 PEGASUS_ASSERT(errno != EBADF);
|
501 kumpf 1.50 }
502 else if (events)
503 {
|
504 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
505 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
506 "idle entries",
|
507 marek 1.118 events, _idleEntries));
|
508 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
509 {
510 // The Monitor should only look at entries in the table that are
511 // IDLE (i.e., owned by the Monitor).
512 if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
513 (FD_ISSET(entries[indx].socket, &fdread)))
514 {
515 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
516 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
517 kumpf 1.116 "Monitor::run indx = %d, queueId = %d, q = %p",
|
518 marek 1.118 indx, entries[indx].queueId, q));
|
519 kumpf 1.116 PEGASUS_ASSERT(q !=0);
520
521 try
|
522 david.dillard 1.95 {
|
523 kumpf 1.116 if (entries[indx]._type == Monitor::CONNECTION)
524 {
|
525 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
526 kumpf 1.116 "entries[indx].type for indx = %d is "
527 "Monitor::CONNECTION",
|
528 marek 1.118 indx));
|
529 kumpf 1.116 static_cast<HTTPConnection *>(q)->_entry_index = indx;
530
531 // Do not update the entry just yet. The entry gets
532 // updated once the request has been read.
533 //entries[indx]._status = _MonitorEntry::BUSY;
|
534 sushma.fernandes 1.78
|
535 kumpf 1.116 // If allocate_and_awaken failure, retry on next
536 // iteration
|
537 a.arora 1.73 /* Removed for PEP 183.
|
538 kumpf 1.116 if (!MessageQueueService::get_thread_pool()->
539 allocate_and_awaken((void *)q, _dispatch))
540 {
|
541 kumpf 1.119 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
542 Tracer::LEVEL2,
|
543 kumpf 1.116 "Monitor::run: Insufficient resources to "
544 "process request.");
545 entries[indx]._status = _MonitorEntry::IDLE;
546 return true;
547 }
|
548 a.arora 1.73 */
549 // Added for PEP 183
|
550 kumpf 1.116 HTTPConnection *dst =
551 reinterpret_cast<HTTPConnection *>(q);
|
552 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
553 kumpf 1.116 "Monitor::_dispatch: entering run() for "
554 "indx = %d, queueId = %d, q = %p",
555 dst->_entry_index,
556 dst->_monitor->_entries[dst->_entry_index].queueId,
|
557 marek 1.118 dst));
|
558 kumpf 1.116
559 try
560 {
561 dst->run(1);
562 }
563 catch (...)
564 {
|
565 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
566 kumpf 1.116 "Monitor::_dispatch: exception received");
567 }
|
568 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
569 kumpf 1.116 "Monitor::_dispatch: exited run() for index %d",
|
570 marek 1.118 dst->_entry_index));
|
571 kumpf 1.116
572 // It is possible the entry status may not be set to
573 // busy. The following will fail in that case.
574 // PEGASUS_ASSERT(dst->_monitor->_entries[
575 // dst->_entry_index]._status.get() ==
576 // _MonitorEntry::BUSY);
577 // Once the HTTPConnection thread has set the status
578 // value to either Monitor::DYING or Monitor::IDLE,
579 // it has returned control of the connection to the
580 // Monitor. It is no longer permissible to access
581 // the connection or the entry in the _entries table.
582
583 // The following is not relevant as the worker thread
584 // or the reader thread will update the status of the
585 // entry.
586 //if (dst->_connectionClosePending)
587 //{
588 // dst->_monitor->_entries[dst->_entry_index]._status =
589 // _MonitorEntry::DYING;
590 //}
591 //else
592 kumpf 1.116 //{
593 // dst->_monitor->_entries[dst->_entry_index]._status =
594 // _MonitorEntry::IDLE;
595 //}
|
596 r.kieninger 1.83 // end Added for PEP 183
|
597 kumpf 1.116 }
598 else if (entries[indx]._type == Monitor::INTERNAL)
599 {
600 // set ourself to BUSY,
|
601 r.kieninger 1.83 // read the data
|
602 a.arora 1.73 // and set ourself back to IDLE
|
603 r.kieninger 1.83
|
604 kumpf 1.116 entries[indx]._status = _MonitorEntry::BUSY;
605 static char buffer[2];
606 Sint32 amt =
607 Socket::read(entries[indx].socket,&buffer, 2);
608
609 if (amt == PEGASUS_SOCKET_ERROR &&
610 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
611 {
|
612 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
613 kumpf 1.116 "Monitor::run: Tickler socket got an IO error. "
614 "Going to re-create Socket and wait for "
615 "TCP/IP restart.");
616 uninitializeTickler();
617 initializeTickler();
618 }
619 else
620 {
621 entries[indx]._status = _MonitorEntry::IDLE;
622 }
623 }
624 else
625 {
|
626 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
627 kumpf 1.116 "Non-connection entry, indx = %d, has been "
628 "received.",
|
629 marek 1.118 indx));
|
630 kumpf 1.116 int events = 0;
631 events |= SocketMessage::READ;
632 Message* msg = new SocketMessage(
633 entries[indx].socket, events);
634 entries[indx]._status = _MonitorEntry::BUSY;
635 _entry_mut.unlock();
636 q->enqueue(msg);
637 _entry_mut.lock();
638
639 // After enqueue a message and the autoEntryMutex has
640 // been released and locked again, the array of
641 // entries can be changed. The ArrayIterator has be
642 // reset with the original _entries
643 entries.reset(_entries);
644 entries[indx]._status = _MonitorEntry::IDLE;
645 }
646 }
647 catch (...)
648 {
649 }
|
650 thilo.boehm 1.115 }
|
651 kumpf 1.116 }
|
652 mday 1.24 }
|
653 mike 1.2 }
654
|
655 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
656 kumpf 1.48 {
657 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
658 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
659 kumpf 1.48 _stopConnections = 1;
|
660 a.arora 1.73 tickle();
|
661 kumpf 1.48
|
662 chuck 1.74 if (wait)
|
663 a.arora 1.73 {
|
664 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
665 // caller of this function may unbind the ports while the monitor
666 // is still accepting connections on them.
|
667 kumpf 1.101 _stopConnectionsSem.wait();
|
668 a.arora 1.73 }
|
669 r.kieninger 1.83
|
670 kumpf 1.48 PEG_METHOD_EXIT();
671 }
|
672 mday 1.25
|
673 mday 1.37
|
674 kumpf 1.116 int Monitor::solicitSocketMessages(
|
675 mike 1.106 SocketHandle socket,
|
676 mike 1.2 Uint32 events,
|
677 r.kieninger 1.83 Uint32 queueId,
|
678 mday 1.8 int type)
|
679 mike 1.2 {
|
680 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
681 AutoMutex autoMut(_entry_mut);
682 // Check to see if we need to dynamically grow the _entries array
683 // We always want the _entries array to 2 bigger than the
684 // current connections requested
685 _solicitSocketCount++; // bump the count
686 int size = (int)_entries.size();
687 if ((int)_solicitSocketCount >= (size-1))
688 {
689 for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
690 {
691 _MonitorEntry entry(0, 0, 0);
692 _entries.append(entry);
693 }
694 }
|
695 a.arora 1.73
|
696 kumpf 1.116 int index;
697 for (index = 1; index < (int)_entries.size(); index++)
698 {
699 try
700 {
701 if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
702 {
703 _entries[index].socket = socket;
704 _entries[index].queueId = queueId;
705 _entries[index]._type = type;
706 _entries[index]._status = _MonitorEntry::IDLE;
707
708 return index;
709 }
710 }
711 catch (...)
712 {
713 }
714 }
715 // decrease the count, if we are here we didn't do anything meaningful
716 _solicitSocketCount--;
717 kumpf 1.116 PEG_METHOD_EXIT();
718 return -1;
|
719 mike 1.2 }
720
|
721 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
722 mike 1.2 {
|
723 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
724 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
725 a.arora 1.73
726 /*
|
727 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
728 never needs to be EMPTY;
|
729 a.arora 1.73 */
|
730 w.otsuka 1.89 unsigned int index;
|
731 kumpf 1.116 for (index = 1; index < _entries.size(); index++)
|
732 mike 1.2 {
|
733 kumpf 1.116 if (_entries[index].socket == socket)
734 {
735 _entries[index]._status = _MonitorEntry::EMPTY;
736 _entries[index].socket = PEGASUS_INVALID_SOCKET;
737 _solicitSocketCount--;
738 break;
739 }
|
740 mike 1.2 }
|
741 a.arora 1.73
742 /*
|
743 kumpf 1.116 Dynamic Contraction:
744 To remove excess entries we will start from the end of the _entries
745 array and remove all entries with EMPTY status until we find the
746 first NON EMPTY. This prevents the positions, of the NON EMPTY
747 entries, from being changed.
|
748 r.kieninger 1.83 */
|
749 a.arora 1.73 index = _entries.size() - 1;
|
750 kumpf 1.116 while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
751 {
752 if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
|
753 a.arora 1.73 _entries.remove(index);
|
754 kumpf 1.116 index--;
|
755 a.arora 1.73 }
|
756 kumpf 1.4 PEG_METHOD_EXIT();
|
757 mike 1.2 }
758
|
759 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
760 kumpf 1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
|
761 mday 1.7 {
|
762 kumpf 1.116 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
763 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
764 kumpf 1.116 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, "
765 "q = %p",
766 dst->_entry_index,
767 dst->_monitor->_entries[dst->_entry_index].queueId,
|
768 marek 1.118 dst));
|
769 kumpf 1.116
770 try
771 {
772 dst->run(1);
773 }
774 catch (...)
775 {
|
776 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
777 kumpf 1.116 "Monitor::_dispatch: exception received");
778 }
|
779 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
780 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
|
781 kumpf 1.116
782 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
783 _MonitorEntry::BUSY);
784
785 // Once the HTTPConnection thread has set the status value to either
786 // Monitor::DYING or Monitor::IDLE, it has returned control of the
787 // connection to the Monitor. It is no longer permissible to access the
788 // connection or the entry in the _entries table.
789 if (dst->_connectionClosePending)
790 {
791 dst->_monitor->_entries[dst->_entry_index]._status =
792 _MonitorEntry::DYING;
793 }
794 else
795 {
796 dst->_monitor->_entries[dst->_entry_index]._status =
797 _MonitorEntry::IDLE;
798 }
799 return 0;
|
800 mday 1.40 }
801
|
802 mike 1.2 PEGASUS_NAMESPACE_END
|