1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 kumpf 1.122 #include "HostAddress.h"
|
46 mike 1.113 #include <errno.h>
|
47 mike 1.2
48 PEGASUS_USING_STD;
49
50 PEGASUS_NAMESPACE_BEGIN
51
52 ////////////////////////////////////////////////////////////////////////////////
53 //
|
54 kumpf 1.122 // Tickler
55 //
56 ////////////////////////////////////////////////////////////////////////////////
57
58 Tickler::Tickler()
59 : _listenSocket(PEGASUS_INVALID_SOCKET),
60 _clientSocket(PEGASUS_INVALID_SOCKET),
61 _serverSocket(PEGASUS_INVALID_SOCKET)
62 {
|
63 kumpf 1.123 try
64 {
65 _initialize();
66 }
67 catch (...)
68 {
69 _uninitialize();
70 throw;
71 }
|
72 kumpf 1.122 }
73
74 Tickler::~Tickler()
75 {
|
76 kumpf 1.123 _uninitialize();
|
77 kumpf 1.122 }
78
|
79 kumpf 1.127 #if defined(PEGASUS_OS_TYPE_UNIX)
|
80 kumpf 1.123
81 // Use an anonymous pipe for the tickle connection.
82
83 void Tickler::_initialize()
84 {
85 int fds[2];
86
87 if (pipe(fds) == -1)
88 {
89 MessageLoaderParms parms(
90 "Common.Monitor.TICKLE_CREATE",
91 "Received error number $0 while creating the internal socket.",
92 getSocketError());
93 throw Exception(parms);
94 }
95
96 _serverSocket = fds[0];
97 _clientSocket = fds[1];
98 }
99
100 #else
101 kumpf 1.123
102 // Use an external loopback socket connection to allow the tickle socket to
|
103 kumpf 1.127 // be included in the select() array on non-Unix platforms.
|
104 kumpf 1.123
105 void Tickler::_initialize()
|
106 kumpf 1.122 {
107 //
108 // Set up the addresses for the listen, client, and server sockets
109 // based on whether IPv6 is enabled.
110 //
111
|
112 venkat.puvvada 1.124 Socket::initializeInterface();
113
|
114 kumpf 1.123 # ifdef PEGASUS_ENABLE_IPV6
|
115 kumpf 1.122 struct sockaddr_storage listenAddress;
116 struct sockaddr_storage clientAddress;
117 struct sockaddr_storage serverAddress;
|
118 kumpf 1.123 # else
|
119 kumpf 1.122 struct sockaddr_in listenAddress;
120 struct sockaddr_in clientAddress;
121 struct sockaddr_in serverAddress;
|
122 kumpf 1.123 # endif
|
123 kumpf 1.122
124 int addressFamily;
125 SocketLength addressLength;
126
|
127 dave.sudlik 1.125 memset(&listenAddress, 0, sizeof (listenAddress));
128
|
129 kumpf 1.123 # ifdef PEGASUS_ENABLE_IPV6
|
130 kumpf 1.122 if (System::isIPv6StackActive())
131 {
132 // Use the IPv6 loopback address for the listen sockets
133 HostAddress::convertTextToBinary(
134 HostAddress::AT_IPV6,
135 "::1",
136 &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
137 listenAddress.ss_family = AF_INET6;
138 reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
139
140 addressFamily = AF_INET6;
141 addressLength = sizeof(struct sockaddr_in6);
142 }
143 else
|
144 kumpf 1.123 # endif
|
145 kumpf 1.122 {
146 // Use the IPv4 loopback address for the listen sockets
147 HostAddress::convertTextToBinary(
148 HostAddress::AT_IPV4,
149 "127.0.0.1",
150 &reinterpret_cast<struct sockaddr_in*>(
151 &listenAddress)->sin_addr.s_addr);
152 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
153 AF_INET;
154 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
155
156 addressFamily = AF_INET;
157 addressLength = sizeof(struct sockaddr_in);
158 }
159
160 // Use the same address for the client socket as the listen socket
161 clientAddress = listenAddress;
162
163 //
164 // Set up a listen socket to allow the tickle client and server to connect
165 //
166 kumpf 1.122
167 // Create the listen socket
168 if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
169 PEGASUS_INVALID_SOCKET)
170 {
171 MessageLoaderParms parms(
172 "Common.Monitor.TICKLE_CREATE",
173 "Received error number $0 while creating the internal socket.",
174 getSocketError());
175 throw Exception(parms);
176 }
177
178 // Bind the listen socket to the loopback address
179 if (::bind(
180 _listenSocket,
181 reinterpret_cast<struct sockaddr*>(&listenAddress),
182 addressLength) < 0)
183 {
184 MessageLoaderParms parms(
185 "Common.Monitor.TICKLE_BIND",
186 "Received error number $0 while binding the internal socket.",
187 kumpf 1.122 getSocketError());
188 throw Exception(parms);
189 }
190
191 // Listen for a connection from the tickle client
192 if ((::listen(_listenSocket, 3)) < 0)
193 {
194 MessageLoaderParms parms(
195 "Common.Monitor.TICKLE_LISTEN",
196 "Received error number $0 while listening to the internal socket.",
197 getSocketError());
198 throw Exception(parms);
199 }
200
201 // Verify we have the correct listen socket
202 SocketLength tmpAddressLength = addressLength;
203 int sock = ::getsockname(
204 _listenSocket,
205 reinterpret_cast<struct sockaddr*>(&listenAddress),
206 &tmpAddressLength);
207 if (sock < 0)
208 kumpf 1.122 {
209 MessageLoaderParms parms(
210 "Common.Monitor.TICKLE_SOCKNAME",
211 "Received error number $0 while getting the internal socket name.",
212 getSocketError());
213 throw Exception(parms);
214 }
215
216 //
217 // Set up the client side of the tickle connection.
218 //
219
220 // Create the client socket
221 if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
222 PEGASUS_INVALID_SOCKET)
223 {
224 MessageLoaderParms parms(
225 "Common.Monitor.TICKLE_CLIENT_CREATE",
226 "Received error number $0 while creating the internal client "
227 "socket.",
228 getSocketError());
229 kumpf 1.122 throw Exception(parms);
230 }
231
232 // Bind the client socket to the loopback address
233 if (::bind(
234 _clientSocket,
235 reinterpret_cast<struct sockaddr*>(&clientAddress),
236 addressLength) < 0)
237 {
238 MessageLoaderParms parms(
239 "Common.Monitor.TICKLE_CLIENT_BIND",
240 "Received error number $0 while binding the internal client "
241 "socket.",
242 getSocketError());
243 throw Exception(parms);
244 }
245
246 // Connect the client socket to the listen socket address
247 if (::connect(
248 _clientSocket,
249 reinterpret_cast<struct sockaddr*>(&listenAddress),
250 kumpf 1.122 addressLength) < 0)
251 {
252 MessageLoaderParms parms(
253 "Common.Monitor.TICKLE_CLIENT_CONNECT",
254 "Received error number $0 while connecting the internal client "
255 "socket.",
256 getSocketError());
257 throw Exception(parms);
258 }
259
260 //
261 // Set up the server side of the tickle connection.
262 //
263
264 tmpAddressLength = addressLength;
265
|
266 kumpf 1.123 // Accept the client socket connection.
267 _serverSocket = ::accept(
268 _listenSocket,
269 reinterpret_cast<struct sockaddr*>(&serverAddress),
270 &tmpAddressLength);
|
271 kumpf 1.122
272 if (_serverSocket == PEGASUS_SOCKET_ERROR)
273 {
274 MessageLoaderParms parms(
275 "Common.Monitor.TICKLE_ACCEPT",
276 "Received error number $0 while accepting the internal socket "
277 "connection.",
278 getSocketError());
279 throw Exception(parms);
280 }
281
282 //
283 // Close the listen socket and make the other sockets non-blocking
284 //
285
286 Socket::close(_listenSocket);
287 _listenSocket = PEGASUS_INVALID_SOCKET;
288
289 Socket::disableBlocking(_serverSocket);
290 Socket::disableBlocking(_clientSocket);
291 }
292 kumpf 1.122
|
293 kumpf 1.123 #endif
294
295 void Tickler::_uninitialize()
|
296 kumpf 1.122 {
297 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
298
299 try
300 {
301 if (_serverSocket != PEGASUS_INVALID_SOCKET)
302 {
303 Socket::close(_serverSocket);
304 _serverSocket = PEGASUS_INVALID_SOCKET;
305 }
306 if (_clientSocket != PEGASUS_INVALID_SOCKET)
307 {
308 Socket::close(_clientSocket);
309 _clientSocket = PEGASUS_INVALID_SOCKET;
310 }
311 if (_listenSocket != PEGASUS_INVALID_SOCKET)
312 {
313 Socket::close(_listenSocket);
314 _listenSocket = PEGASUS_INVALID_SOCKET;
315 }
316 }
317 kumpf 1.122 catch (...)
318 {
319 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
320 "Failed to close tickle sockets");
321 }
|
322 venkat.puvvada 1.124 Socket::uninitializeInterface();
|
323 kumpf 1.122 }
324
325
326 ////////////////////////////////////////////////////////////////////////////////
327 //
|
328 mike 1.2 // Monitor
329 //
330 ////////////////////////////////////////////////////////////////////////////////
331
|
332 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
333 mike 1.2 Monitor::Monitor()
|
334 kumpf 1.87 : _stopConnections(0),
|
335 a.arora 1.73 _stopConnectionsSem(0),
|
336 kumpf 1.122 _solicitSocketCount(0)
|
337 mike 1.2 {
|
338 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
339 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
340 a.arora 1.73
|
341 kumpf 1.123 // Create a MonitorEntry for the Tickler and set its state to IDLE so the
342 // Monitor will watch for its events.
|
343 kumpf 1.126 _entries.append(MonitorEntry(
344 _tickler.getServerSocket(),
345 1,
346 MonitorEntry::STATUS_IDLE,
347 MonitorEntry::TYPE_INTERNAL));
|
348 a.arora 1.73
|
349 kumpf 1.123 // Start the count at 1 because _entries[0] is the Tickler
|
350 kumpf 1.116 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
351 mday 1.37 {
|
352 kumpf 1.126 _entries.append(MonitorEntry());
|
353 mday 1.37 }
|
354 mday 1.18 }
|
355 mday 1.20
|
356 mike 1.2 Monitor::~Monitor()
357 {
|
358 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
359 thilo.boehm 1.115 "returning from monitor destructor");
360 }
|
361 mday 1.18
|
362 kumpf 1.116 void Monitor::tickle()
|
363 a.arora 1.73 {
|
364 kumpf 1.126 Socket::write(_tickler.getClientSocket(), "\0", 1);
|
365 sushma.fernandes 1.78 }
366
|
367 kumpf 1.121 void Monitor::setState(
368 Uint32 index,
|
369 kumpf 1.126 MonitorEntry::Status status)
|
370 sushma.fernandes 1.78 {
|
371 kumpf 1.126 AutoMutex autoEntryMutex(_entriesMutex);
|
372 sushma.fernandes 1.78 // Set the state to requested state
|
373 kumpf 1.126 _entries[index].status = status;
|
374 a.arora 1.73 }
375
|
376 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
377 mike 1.2 {
|
378 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
379 a.arora 1.73
|
380 mday 1.25 fd_set fdread;
381 FD_ZERO(&fdread);
|
382 a.arora 1.73
|
383 kumpf 1.126 AutoMutex autoEntryMutex(_entriesMutex);
|
384 r.kieninger 1.83
|
385 kumpf 1.126 ArrayIterator<MonitorEntry> entries(_entries);
|
386 mike 1.100
|
387 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
388 // entries
|
389 mike 1.96 if (_stopConnections.get() == 1)
|
390 kumpf 1.48 {
|
391 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
392 kumpf 1.48 {
|
393 kumpf 1.126 if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
|
394 kumpf 1.48 {
|
395 kumpf 1.126 if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
|
396 kumpf 1.48 {
|
397 kumpf 1.126 if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
398 entries[indx].status == MonitorEntry::STATUS_DYING)
399 {
400 // remove the entry
401 entries[indx].status = MonitorEntry::STATUS_EMPTY;
402 }
403 else
404 {
405 // set status to DYING
406 entries[indx].status = MonitorEntry::STATUS_DYING;
407 }
408 }
409 }
|
410 kumpf 1.48 }
411 _stopConnections = 0;
|
412 kumpf 1.116 _stopConnectionsSem.signal();
|
413 kumpf 1.48 }
|
414 kumpf 1.51
|
415 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
416 kumpf 1.68 {
|
417 kumpf 1.126 const MonitorEntry& entry = entries[indx];
418
419 if ((entry.status == MonitorEntry::STATUS_DYING) &&
420 (entry.type == MonitorEntry::TYPE_CONNECTION))
|
421 kumpf 1.116 {
422 MessageQueue *q = MessageQueue::lookup(entry.queueId);
423 PEGASUS_ASSERT(q != 0);
424 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
425
426 if (h._connectionClosePending == false)
427 continue;
428
429 // NOTE: do not attempt to delete while there are pending responses
430 // coming thru. The last response to come thru after a
431 // _connectionClosePending will reset _responsePending to false
432 // and then cause the monitor to rerun this code and clean up.
433 // (see HTTPConnection.cpp)
434
435 if (h._responsePending == true)
436 {
|
437 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
438 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
439 "because responses are still pending. "
440 "connection=0x%p, socket=%d\n",
|
441 marek 1.118 (void *)&h, h.getSocket()));
|
442 kumpf 1.116 continue;
443 }
444 h._connectionClosePending = false;
445 MessageQueue &o = h.get_owner();
446 Message* message= new CloseConnectionMessage(entry.socket);
447 message->dest = o.getQueueId();
448
449 // HTTPAcceptor is responsible for closing the connection.
450 // The lock is released to allow HTTPAcceptor to call
451 // unsolicitSocketMessages to free the entry.
452 // Once HTTPAcceptor completes processing of the close
453 // connection, the lock is re-requested and processing of
454 // the for loop continues. This is safe with the current
455 // implementation of the entries object. Note that the
456 // loop condition accesses the entries.size() on each
457 // iteration, so that a change in size while the mutex is
458 // unlocked will not result in an ArrayIndexOutOfBounds
459 // exception.
460
|
461 kumpf 1.126 _entriesMutex.unlock();
|
462 kumpf 1.116 o.enqueue(message);
|
463 kumpf 1.126 _entriesMutex.lock();
|
464 kumpf 1.116
465 // After enqueue a message and the autoEntryMutex has been
466 // released and locked again, the array of _entries can be
467 // changed. The ArrayIterator has be reset with the original
468 // _entries.
469 entries.reset(_entries);
470 }
|
471 kumpf 1.68 }
472
|
473 kumpf 1.51 Uint32 _idleEntries = 0;
|
474 r.kieninger 1.83
|
475 a.arora 1.73 /*
|
476 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
477 to the kernel as a parameter to SELECT. This loop seems like a good
478 place to calculate the max file descriptor (maximum socket number)
479 because we have to traverse the entire array.
|
480 r.kieninger 1.83 */
|
481 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
482 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
483 mike 1.2 {
|
484 kumpf 1.126 if (maxSocketCurrentPass < entries[indx].socket)
485 maxSocketCurrentPass = entries[indx].socket;
|
486 a.arora 1.73
|
487 kumpf 1.126 if (entries[indx].status == MonitorEntry::STATUS_IDLE)
488 {
489 _idleEntries++;
490 FD_SET(entries[indx].socket, &fdread);
491 }
|
492 mday 1.13 }
|
493 s.hills 1.62
|
494 a.arora 1.73 /*
|
495 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
496 descriptors starting at 0.
|
497 a.arora 1.73 */
498 maxSocketCurrentPass++;
499
|
500 kumpf 1.126 _entriesMutex.unlock();
|
501 david.dillard 1.95
502 //
503 // The first argument to select() is ignored on Windows and it is not
504 // a socket value. The original code assumed that the number of sockets
505 // and a socket value have the same type. On Windows they do not.
506 //
507 #ifdef PEGASUS_OS_TYPE_WINDOWS
508 int events = select(0, &fdread, NULL, NULL, &tv);
509 #else
|
510 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
511 david.dillard 1.95 #endif
|
512 kumpf 1.126 _entriesMutex.lock();
|
513 kumpf 1.116
|
514 dave.sudlik 1.128 struct timeval timeNow;
515 Time::gettimeofday(&timeNow);
516
|
517 kumpf 1.116 // After enqueue a message and the autoEntryMutex has been released and
518 // locked again, the array of _entries can be changed. The ArrayIterator
519 // has be reset with the original _entries
|
520 r.kieninger 1.102 entries.reset(_entries);
|
521 mike 1.106
522 if (events == PEGASUS_SOCKET_ERROR)
|
523 mday 1.13 {
|
524 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
525 "Monitor::run - errorno = %d has occurred on select.", errno));
|
526 kumpf 1.116 // The EBADF error indicates that one or more or the file
527 // descriptions was not valid. This could indicate that
528 // the entries structure has been corrupted or that
529 // we have a synchronization error.
|
530 kumpf 1.50
|
531 kumpf 1.116 PEGASUS_ASSERT(errno != EBADF);
|
532 kumpf 1.50 }
533 else if (events)
534 {
|
535 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
536 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
537 "idle entries",
|
538 marek 1.118 events, _idleEntries));
|
539 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
540 kumpf 1.116 {
541 // The Monitor should only look at entries in the table that are
542 // IDLE (i.e., owned by the Monitor).
|
543 kumpf 1.126 if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
|
544 kumpf 1.116 (FD_ISSET(entries[indx].socket, &fdread)))
545 {
|
546 kumpf 1.126 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
547 PEGASUS_ASSERT(q != 0);
|
548 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
549 kumpf 1.126 "Monitor::run indx = %d, queueId = %d, q = %p",
|
550 marek 1.118 indx, entries[indx].queueId, q));
|
551 kumpf 1.116
552 try
|
553 david.dillard 1.95 {
|
554 kumpf 1.126 if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
|
555 kumpf 1.116 {
|
556 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
557 kumpf 1.126 "entries[%d].type is TYPE_CONNECTION",
|
558 marek 1.118 indx));
|
559 kumpf 1.116
|
560 dave.sudlik 1.128 HTTPConnection *dst =
|
561 kumpf 1.116 reinterpret_cast<HTTPConnection *>(q);
|
562 dave.sudlik 1.128 dst->_entry_index = indx;
|
563 kumpf 1.116
|
564 dave.sudlik 1.128 // Update idle start time because we have received some
565 // data. Any data is good data at this point, and we'll
566 // keep the connection alive, even if we've exceeded
567 // the idleConnectionTimeout, which will be checked
568 // when we call closeConnectionOnTimeout() next.
569 Time::gettimeofday(&dst->_idleStartTime);
570
571 // Check for accept pending (ie. SSL handshake pending)
572 // or idle connection timeouts for sockets from which
573 // we received data (avoiding extra queue lookup below).
574 if (!dst->closeConnectionOnTimeout(&timeNow))
|
575 kumpf 1.116 {
|
576 dave.sudlik 1.128 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
577 "Entering HTTPConnection::run() for "
578 "indx = %d, queueId = %d, q = %p",
579 indx, entries[indx].queueId, q));
580
581 try
582 {
583 dst->run(1);
584 }
585 catch (...)
586 {
587 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
588 "Caught exception from "
589 "HTTPConnection::run()");
590 }
591 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
592 "Exited HTTPConnection::run()");
593 }
594
595
|
596 kumpf 1.116 }
|
597 kumpf 1.126 else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
|
598 kumpf 1.116 {
|
599 kumpf 1.126 char buffer;
600 Sint32 ignored =
601 Socket::read(entries[indx].socket, &buffer, 1);
|
602 kumpf 1.116 }
603 else
604 {
|
605 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
606 kumpf 1.116 "Non-connection entry, indx = %d, has been "
607 "received.",
|
608 marek 1.118 indx));
|
609 kumpf 1.116 int events = 0;
610 events |= SocketMessage::READ;
611 Message* msg = new SocketMessage(
612 entries[indx].socket, events);
|
613 kumpf 1.126 entries[indx].status = MonitorEntry::STATUS_BUSY;
614 _entriesMutex.unlock();
|
615 kumpf 1.116 q->enqueue(msg);
|
616 kumpf 1.126 _entriesMutex.lock();
|
617 kumpf 1.116
618 // After enqueue a message and the autoEntryMutex has
619 // been released and locked again, the array of
|
620 kumpf 1.126 // entries can be changed. The ArrayIterator has to be
621 // reset with the latest _entries.
|
622 kumpf 1.116 entries.reset(_entries);
|
623 kumpf 1.126 entries[indx].status = MonitorEntry::STATUS_IDLE;
|
624 kumpf 1.116 }
625 }
626 catch (...)
627 {
628 }
|
629 thilo.boehm 1.115 }
|
630 dave.sudlik 1.128 // else check for accept pending (ie. SSL handshake pending) or
631 // idle connection timeouts for sockets from which we did not
632 // receive data.
633 else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
634 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
635
636 {
637 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
638 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
639 dst->_entry_index = indx;
640 dst->closeConnectionOnTimeout(&timeNow);
641 }
642 }
643 }
644 // else if "events" is zero (ie. select timed out) then we still need
645 // to check if there are any pending SSL handshakes that have timed out.
646 else
647 {
648 for (Uint32 indx = 0; indx < entries.size(); indx++)
649 {
650 if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
651 dave.sudlik 1.128 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
652 {
653 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
654 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
655 dst->_entry_index = indx;
656 dst->closeConnectionOnTimeout(&timeNow);
657 }
|
658 kumpf 1.116 }
|
659 mday 1.24 }
|
660 mike 1.2 }
661
|
662 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
663 kumpf 1.48 {
664 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
665 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
666 kumpf 1.48 _stopConnections = 1;
|
667 a.arora 1.73 tickle();
|
668 kumpf 1.48
|
669 chuck 1.74 if (wait)
|
670 a.arora 1.73 {
|
671 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
672 // caller of this function may unbind the ports while the monitor
673 // is still accepting connections on them.
|
674 kumpf 1.101 _stopConnectionsSem.wait();
|
675 a.arora 1.73 }
|
676 r.kieninger 1.83
|
677 kumpf 1.48 PEG_METHOD_EXIT();
678 }
|
679 mday 1.25
|
680 mday 1.37
|
681 kumpf 1.116 int Monitor::solicitSocketMessages(
|
682 mike 1.106 SocketHandle socket,
|
683 mike 1.2 Uint32 events,
|
684 r.kieninger 1.83 Uint32 queueId,
|
685 kumpf 1.126 Uint32 type)
|
686 mike 1.2 {
|
687 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
688 kumpf 1.126 AutoMutex autoMut(_entriesMutex);
689
|
690 kumpf 1.116 // Check to see if we need to dynamically grow the _entries array
|
691 kumpf 1.126 // We always want the _entries array to be 2 bigger than the
|
692 kumpf 1.116 // current connections requested
693 _solicitSocketCount++; // bump the count
|
694 kumpf 1.126
695 for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
|
696 kumpf 1.116 {
|
697 kumpf 1.126 _entries.append(MonitorEntry());
|
698 kumpf 1.116 }
|
699 a.arora 1.73
|
700 kumpf 1.126 for (Uint32 index = 1; index < _entries.size(); index++)
|
701 kumpf 1.116 {
702 try
703 {
|
704 kumpf 1.126 if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
|
705 kumpf 1.116 {
706 _entries[index].socket = socket;
707 _entries[index].queueId = queueId;
|
708 kumpf 1.126 _entries[index].type = type;
709 _entries[index].status = MonitorEntry::STATUS_IDLE;
|
710 kumpf 1.116
|
711 kumpf 1.126 return (int)index;
|
712 kumpf 1.116 }
713 }
714 catch (...)
715 {
716 }
717 }
718 // decrease the count, if we are here we didn't do anything meaningful
719 _solicitSocketCount--;
720 PEG_METHOD_EXIT();
721 return -1;
|
722 mike 1.2 }
723
|
724 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
725 mike 1.2 {
|
726 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
727 kumpf 1.126 AutoMutex autoMut(_entriesMutex);
|
728 a.arora 1.73
729 /*
|
730 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
|
731 kumpf 1.126 never needs to be reset to EMPTY;
|
732 a.arora 1.73 */
|
733 kumpf 1.126 for (Uint32 index = 1; index < _entries.size(); index++)
|
734 mike 1.2 {
|
735 kumpf 1.116 if (_entries[index].socket == socket)
736 {
|
737 kumpf 1.126 _entries[index].reset();
|
738 kumpf 1.116 _solicitSocketCount--;
739 break;
740 }
|
741 mike 1.2 }
|
742 a.arora 1.73
743 /*
|
744 kumpf 1.116 Dynamic Contraction:
745 To remove excess entries we will start from the end of the _entries
746 array and remove all entries with EMPTY status until we find the
747 first NON EMPTY. This prevents the positions, of the NON EMPTY
748 entries, from being changed.
|
749 r.kieninger 1.83 */
|
750 kumpf 1.126 for (Uint32 index = _entries.size() - 1;
751 (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
752 (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
753 index--)
|
754 kumpf 1.116 {
|
755 kumpf 1.126 _entries.remove(index);
|
756 a.arora 1.73 }
|
757 kumpf 1.126
|
758 kumpf 1.4 PEG_METHOD_EXIT();
|
759 mike 1.2 }
760
761 PEGASUS_NAMESPACE_END
|