1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 kumpf 1.122 #include "HostAddress.h"
|
46 mike 1.2
47 PEGASUS_USING_STD;
48
49 PEGASUS_NAMESPACE_BEGIN
50
51 ////////////////////////////////////////////////////////////////////////////////
52 //
|
53 kumpf 1.122 // Tickler
54 //
55 ////////////////////////////////////////////////////////////////////////////////
56
57 Tickler::Tickler()
58 : _listenSocket(PEGASUS_INVALID_SOCKET),
59 _clientSocket(PEGASUS_INVALID_SOCKET),
60 _serverSocket(PEGASUS_INVALID_SOCKET)
61 {
|
62 kumpf 1.123 try
63 {
64 _initialize();
65 }
66 catch (...)
67 {
68 _uninitialize();
69 throw;
70 }
|
71 kumpf 1.122 }
72
73 Tickler::~Tickler()
74 {
|
75 kumpf 1.123 _uninitialize();
|
76 kumpf 1.122 }
77
|
78 kumpf 1.127 #if defined(PEGASUS_OS_TYPE_UNIX)
|
79 kumpf 1.123
80 // Use an anonymous pipe for the tickle connection.
81
82 void Tickler::_initialize()
83 {
84 int fds[2];
85
86 if (pipe(fds) == -1)
87 {
88 MessageLoaderParms parms(
89 "Common.Monitor.TICKLE_CREATE",
90 "Received error number $0 while creating the internal socket.",
91 getSocketError());
92 throw Exception(parms);
93 }
94
95 _serverSocket = fds[0];
96 _clientSocket = fds[1];
97 }
98
99 #else
100 kumpf 1.123
101 // Use an external loopback socket connection to allow the tickle socket to
|
102 kumpf 1.127 // be included in the select() array on non-Unix platforms.
|
103 kumpf 1.123
104 void Tickler::_initialize()
|
105 kumpf 1.122 {
106 //
107 // Set up the addresses for the listen, client, and server sockets
108 // based on whether IPv6 is enabled.
109 //
110
|
111 venkat.puvvada 1.124 Socket::initializeInterface();
112
|
113 kumpf 1.123 # ifdef PEGASUS_ENABLE_IPV6
|
114 kumpf 1.122 struct sockaddr_storage listenAddress;
115 struct sockaddr_storage clientAddress;
116 struct sockaddr_storage serverAddress;
|
117 kumpf 1.123 # else
|
118 kumpf 1.122 struct sockaddr_in listenAddress;
119 struct sockaddr_in clientAddress;
120 struct sockaddr_in serverAddress;
|
121 kumpf 1.123 # endif
|
122 kumpf 1.122
123 int addressFamily;
124 SocketLength addressLength;
125
|
126 dave.sudlik 1.125 memset(&listenAddress, 0, sizeof (listenAddress));
127
|
128 kumpf 1.123 # ifdef PEGASUS_ENABLE_IPV6
|
129 kumpf 1.122 if (System::isIPv6StackActive())
130 {
131 // Use the IPv6 loopback address for the listen sockets
132 HostAddress::convertTextToBinary(
133 HostAddress::AT_IPV6,
134 "::1",
135 &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
136 listenAddress.ss_family = AF_INET6;
137 reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
138
139 addressFamily = AF_INET6;
140 addressLength = sizeof(struct sockaddr_in6);
141 }
142 else
|
143 kumpf 1.123 # endif
|
144 kumpf 1.122 {
145 // Use the IPv4 loopback address for the listen sockets
146 HostAddress::convertTextToBinary(
147 HostAddress::AT_IPV4,
148 "127.0.0.1",
149 &reinterpret_cast<struct sockaddr_in*>(
150 &listenAddress)->sin_addr.s_addr);
151 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
152 AF_INET;
153 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
154
155 addressFamily = AF_INET;
156 addressLength = sizeof(struct sockaddr_in);
157 }
158
159 // Use the same address for the client socket as the listen socket
160 clientAddress = listenAddress;
161
162 //
163 // Set up a listen socket to allow the tickle client and server to connect
164 //
165 kumpf 1.122
166 // Create the listen socket
167 if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
168 PEGASUS_INVALID_SOCKET)
169 {
170 MessageLoaderParms parms(
171 "Common.Monitor.TICKLE_CREATE",
172 "Received error number $0 while creating the internal socket.",
173 getSocketError());
174 throw Exception(parms);
175 }
176
177 // Bind the listen socket to the loopback address
178 if (::bind(
179 _listenSocket,
180 reinterpret_cast<struct sockaddr*>(&listenAddress),
181 addressLength) < 0)
182 {
183 MessageLoaderParms parms(
184 "Common.Monitor.TICKLE_BIND",
185 "Received error number $0 while binding the internal socket.",
186 kumpf 1.122 getSocketError());
187 throw Exception(parms);
188 }
189
190 // Listen for a connection from the tickle client
191 if ((::listen(_listenSocket, 3)) < 0)
192 {
193 MessageLoaderParms parms(
194 "Common.Monitor.TICKLE_LISTEN",
195 "Received error number $0 while listening to the internal socket.",
196 getSocketError());
197 throw Exception(parms);
198 }
199
200 // Verify we have the correct listen socket
201 SocketLength tmpAddressLength = addressLength;
202 int sock = ::getsockname(
203 _listenSocket,
204 reinterpret_cast<struct sockaddr*>(&listenAddress),
205 &tmpAddressLength);
206 if (sock < 0)
207 kumpf 1.122 {
208 MessageLoaderParms parms(
209 "Common.Monitor.TICKLE_SOCKNAME",
210 "Received error number $0 while getting the internal socket name.",
211 getSocketError());
212 throw Exception(parms);
213 }
214
215 //
216 // Set up the client side of the tickle connection.
217 //
218
219 // Create the client socket
220 if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
221 PEGASUS_INVALID_SOCKET)
222 {
223 MessageLoaderParms parms(
224 "Common.Monitor.TICKLE_CLIENT_CREATE",
225 "Received error number $0 while creating the internal client "
226 "socket.",
227 getSocketError());
228 kumpf 1.122 throw Exception(parms);
229 }
230
231 // Bind the client socket to the loopback address
232 if (::bind(
233 _clientSocket,
234 reinterpret_cast<struct sockaddr*>(&clientAddress),
235 addressLength) < 0)
236 {
237 MessageLoaderParms parms(
238 "Common.Monitor.TICKLE_CLIENT_BIND",
239 "Received error number $0 while binding the internal client "
240 "socket.",
241 getSocketError());
242 throw Exception(parms);
243 }
244
245 // Connect the client socket to the listen socket address
246 if (::connect(
247 _clientSocket,
248 reinterpret_cast<struct sockaddr*>(&listenAddress),
249 kumpf 1.122 addressLength) < 0)
250 {
251 MessageLoaderParms parms(
252 "Common.Monitor.TICKLE_CLIENT_CONNECT",
253 "Received error number $0 while connecting the internal client "
254 "socket.",
255 getSocketError());
256 throw Exception(parms);
257 }
258
259 //
260 // Set up the server side of the tickle connection.
261 //
262
263 tmpAddressLength = addressLength;
264
|
265 kumpf 1.123 // Accept the client socket connection.
266 _serverSocket = ::accept(
267 _listenSocket,
268 reinterpret_cast<struct sockaddr*>(&serverAddress),
269 &tmpAddressLength);
|
270 kumpf 1.122
271 if (_serverSocket == PEGASUS_SOCKET_ERROR)
272 {
273 MessageLoaderParms parms(
274 "Common.Monitor.TICKLE_ACCEPT",
275 "Received error number $0 while accepting the internal socket "
276 "connection.",
277 getSocketError());
278 throw Exception(parms);
279 }
280
281 //
282 // Close the listen socket and make the other sockets non-blocking
283 //
284
285 Socket::close(_listenSocket);
286 Socket::disableBlocking(_serverSocket);
287 Socket::disableBlocking(_clientSocket);
288 }
289
|
290 kumpf 1.123 #endif
291
292 void Tickler::_uninitialize()
|
293 kumpf 1.122 {
294 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
295
296 try
297 {
|
298 kumpf 1.129 Socket::close(_serverSocket);
299 Socket::close(_clientSocket);
300 Socket::close(_listenSocket);
|
301 kumpf 1.122 }
302 catch (...)
303 {
304 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
305 "Failed to close tickle sockets");
306 }
|
307 venkat.puvvada 1.124 Socket::uninitializeInterface();
|
308 kumpf 1.122 }
309
310
311 ////////////////////////////////////////////////////////////////////////////////
312 //
|
313 mike 1.2 // Monitor
314 //
315 ////////////////////////////////////////////////////////////////////////////////
316
|
317 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
318 mike 1.2 Monitor::Monitor()
|
319 kumpf 1.87 : _stopConnections(0),
|
320 a.arora 1.73 _stopConnectionsSem(0),
|
321 kumpf 1.122 _solicitSocketCount(0)
|
322 mike 1.2 {
|
323 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
324 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
325 a.arora 1.73
|
326 kumpf 1.123 // Create a MonitorEntry for the Tickler and set its state to IDLE so the
327 // Monitor will watch for its events.
|
328 kumpf 1.126 _entries.append(MonitorEntry(
329 _tickler.getServerSocket(),
330 1,
331 MonitorEntry::STATUS_IDLE,
332 MonitorEntry::TYPE_INTERNAL));
|
333 a.arora 1.73
|
334 kumpf 1.123 // Start the count at 1 because _entries[0] is the Tickler
|
335 kumpf 1.116 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
336 mday 1.37 {
|
337 kumpf 1.126 _entries.append(MonitorEntry());
|
338 mday 1.37 }
|
339 mday 1.18 }
|
340 mday 1.20
|
341 mike 1.2 Monitor::~Monitor()
342 {
|
343 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
344 thilo.boehm 1.115 "returning from monitor destructor");
345 }
|
346 mday 1.18
|
347 kumpf 1.116 void Monitor::tickle()
|
348 a.arora 1.73 {
|
349 kumpf 1.126 Socket::write(_tickler.getClientSocket(), "\0", 1);
|
350 sushma.fernandes 1.78 }
351
|
352 kumpf 1.121 void Monitor::setState(
353 Uint32 index,
|
354 kumpf 1.126 MonitorEntry::Status status)
|
355 sushma.fernandes 1.78 {
|
356 kumpf 1.126 AutoMutex autoEntryMutex(_entriesMutex);
|
357 sushma.fernandes 1.78 // Set the state to requested state
|
358 kumpf 1.126 _entries[index].status = status;
|
359 a.arora 1.73 }
360
|
361 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
362 mike 1.2 {
|
363 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
364 a.arora 1.73
|
365 mday 1.25 fd_set fdread;
366 FD_ZERO(&fdread);
|
367 a.arora 1.73
|
368 kumpf 1.126 AutoMutex autoEntryMutex(_entriesMutex);
|
369 r.kieninger 1.83
|
370 kumpf 1.126 ArrayIterator<MonitorEntry> entries(_entries);
|
371 mike 1.100
|
372 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
373 // entries
|
374 mike 1.96 if (_stopConnections.get() == 1)
|
375 kumpf 1.48 {
|
376 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
377 kumpf 1.48 {
|
378 kumpf 1.126 if (entries[indx].type == MonitorEntry::TYPE_ACCEPTOR)
|
379 kumpf 1.48 {
|
380 kumpf 1.126 if (entries[indx].status != MonitorEntry::STATUS_EMPTY)
|
381 kumpf 1.48 {
|
382 kumpf 1.126 if (entries[indx].status == MonitorEntry::STATUS_IDLE ||
383 entries[indx].status == MonitorEntry::STATUS_DYING)
384 {
385 // remove the entry
386 entries[indx].status = MonitorEntry::STATUS_EMPTY;
387 }
388 else
389 {
390 // set status to DYING
391 entries[indx].status = MonitorEntry::STATUS_DYING;
392 }
393 }
394 }
|
395 kumpf 1.48 }
396 _stopConnections = 0;
|
397 kumpf 1.116 _stopConnectionsSem.signal();
|
398 kumpf 1.48 }
|
399 kumpf 1.51
|
400 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
401 kumpf 1.68 {
|
402 kumpf 1.126 const MonitorEntry& entry = entries[indx];
403
404 if ((entry.status == MonitorEntry::STATUS_DYING) &&
405 (entry.type == MonitorEntry::TYPE_CONNECTION))
|
406 kumpf 1.116 {
407 MessageQueue *q = MessageQueue::lookup(entry.queueId);
408 PEGASUS_ASSERT(q != 0);
409 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
410
411 if (h._connectionClosePending == false)
412 continue;
413
414 // NOTE: do not attempt to delete while there are pending responses
415 // coming thru. The last response to come thru after a
416 // _connectionClosePending will reset _responsePending to false
417 // and then cause the monitor to rerun this code and clean up.
418 // (see HTTPConnection.cpp)
419
420 if (h._responsePending == true)
421 {
|
422 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
423 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
424 "because responses are still pending. "
425 "connection=0x%p, socket=%d\n",
|
426 marek 1.118 (void *)&h, h.getSocket()));
|
427 kumpf 1.116 continue;
428 }
429 h._connectionClosePending = false;
430 MessageQueue &o = h.get_owner();
431 Message* message= new CloseConnectionMessage(entry.socket);
432 message->dest = o.getQueueId();
433
434 // HTTPAcceptor is responsible for closing the connection.
435 // The lock is released to allow HTTPAcceptor to call
436 // unsolicitSocketMessages to free the entry.
437 // Once HTTPAcceptor completes processing of the close
438 // connection, the lock is re-requested and processing of
439 // the for loop continues. This is safe with the current
440 // implementation of the entries object. Note that the
441 // loop condition accesses the entries.size() on each
442 // iteration, so that a change in size while the mutex is
443 // unlocked will not result in an ArrayIndexOutOfBounds
444 // exception.
445
|
446 kumpf 1.126 _entriesMutex.unlock();
|
447 kumpf 1.116 o.enqueue(message);
|
448 kumpf 1.126 _entriesMutex.lock();
|
449 kumpf 1.116
450 // After enqueue a message and the autoEntryMutex has been
451 // released and locked again, the array of _entries can be
452 // changed. The ArrayIterator has be reset with the original
453 // _entries.
454 entries.reset(_entries);
455 }
|
456 kumpf 1.68 }
457
|
458 kumpf 1.51 Uint32 _idleEntries = 0;
|
459 r.kieninger 1.83
|
460 a.arora 1.73 /*
|
461 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
462 to the kernel as a parameter to SELECT. This loop seems like a good
463 place to calculate the max file descriptor (maximum socket number)
464 because we have to traverse the entire array.
|
465 r.kieninger 1.83 */
|
466 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
467 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
468 mike 1.2 {
|
469 kumpf 1.126 if (maxSocketCurrentPass < entries[indx].socket)
470 maxSocketCurrentPass = entries[indx].socket;
|
471 a.arora 1.73
|
472 kumpf 1.126 if (entries[indx].status == MonitorEntry::STATUS_IDLE)
473 {
474 _idleEntries++;
475 FD_SET(entries[indx].socket, &fdread);
476 }
|
477 mday 1.13 }
|
478 s.hills 1.62
|
479 a.arora 1.73 /*
|
480 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
481 descriptors starting at 0.
|
482 a.arora 1.73 */
483 maxSocketCurrentPass++;
484
|
485 kumpf 1.126 _entriesMutex.unlock();
|
486 david.dillard 1.95
487 //
488 // The first argument to select() is ignored on Windows and it is not
489 // a socket value. The original code assumed that the number of sockets
490 // and a socket value have the same type. On Windows they do not.
491 //
492 #ifdef PEGASUS_OS_TYPE_WINDOWS
493 int events = select(0, &fdread, NULL, NULL, &tv);
494 #else
|
495 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
496 david.dillard 1.95 #endif
|
497 kumpf 1.130 int selectErrno = getSocketError();
498
|
499 kumpf 1.126 _entriesMutex.lock();
|
500 kumpf 1.116
|
501 dave.sudlik 1.128 struct timeval timeNow;
502 Time::gettimeofday(&timeNow);
503
|
504 kumpf 1.116 // After enqueue a message and the autoEntryMutex has been released and
505 // locked again, the array of _entries can be changed. The ArrayIterator
506 // has be reset with the original _entries
|
507 r.kieninger 1.102 entries.reset(_entries);
|
508 mike 1.106
509 if (events == PEGASUS_SOCKET_ERROR)
|
510 mday 1.13 {
|
511 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
512 kumpf 1.130 "Monitor::run - select() returned error %d.", selectErrno));
|
513 kumpf 1.116 // The EBADF error indicates that one or more or the file
514 // descriptions was not valid. This could indicate that
515 // the entries structure has been corrupted or that
516 // we have a synchronization error.
|
517 kumpf 1.50
|
518 kumpf 1.130 PEGASUS_ASSERT(selectErrno != EBADF);
|
519 kumpf 1.50 }
520 else if (events)
521 {
|
522 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
523 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
524 "idle entries",
|
525 marek 1.118 events, _idleEntries));
|
526 kumpf 1.126 for (Uint32 indx = 0; indx < entries.size(); indx++)
|
527 kumpf 1.116 {
528 // The Monitor should only look at entries in the table that are
529 // IDLE (i.e., owned by the Monitor).
|
530 kumpf 1.126 if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
|
531 kumpf 1.116 (FD_ISSET(entries[indx].socket, &fdread)))
532 {
|
533 kumpf 1.126 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
534 PEGASUS_ASSERT(q != 0);
|
535 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
536 kumpf 1.126 "Monitor::run indx = %d, queueId = %d, q = %p",
|
537 marek 1.118 indx, entries[indx].queueId, q));
|
538 kumpf 1.116
539 try
|
540 david.dillard 1.95 {
|
541 kumpf 1.126 if (entries[indx].type == MonitorEntry::TYPE_CONNECTION)
|
542 kumpf 1.116 {
|
543 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
544 kumpf 1.126 "entries[%d].type is TYPE_CONNECTION",
|
545 marek 1.118 indx));
|
546 kumpf 1.116
|
547 dave.sudlik 1.128 HTTPConnection *dst =
|
548 kumpf 1.116 reinterpret_cast<HTTPConnection *>(q);
|
549 dave.sudlik 1.128 dst->_entry_index = indx;
|
550 kumpf 1.116
|
551 dave.sudlik 1.128 // Update idle start time because we have received some
552 // data. Any data is good data at this point, and we'll
553 // keep the connection alive, even if we've exceeded
554 // the idleConnectionTimeout, which will be checked
555 // when we call closeConnectionOnTimeout() next.
556 Time::gettimeofday(&dst->_idleStartTime);
557
558 // Check for accept pending (ie. SSL handshake pending)
559 // or idle connection timeouts for sockets from which
560 // we received data (avoiding extra queue lookup below).
561 if (!dst->closeConnectionOnTimeout(&timeNow))
|
562 kumpf 1.116 {
|
563 dave.sudlik 1.128 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
564 "Entering HTTPConnection::run() for "
565 "indx = %d, queueId = %d, q = %p",
566 indx, entries[indx].queueId, q));
567
568 try
569 {
570 dst->run(1);
571 }
572 catch (...)
573 {
574 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL2,
575 "Caught exception from "
576 "HTTPConnection::run()");
577 }
578 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
579 "Exited HTTPConnection::run()");
580 }
581
582
|
583 kumpf 1.116 }
|
584 kumpf 1.126 else if (entries[indx].type == MonitorEntry::TYPE_INTERNAL)
|
585 kumpf 1.116 {
|
586 kumpf 1.126 char buffer;
587 Sint32 ignored =
588 Socket::read(entries[indx].socket, &buffer, 1);
|
589 kumpf 1.116 }
590 else
591 {
|
592 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
593 kumpf 1.116 "Non-connection entry, indx = %d, has been "
594 "received.",
|
595 marek 1.118 indx));
|
596 kumpf 1.116 int events = 0;
597 events |= SocketMessage::READ;
598 Message* msg = new SocketMessage(
599 entries[indx].socket, events);
|
600 kumpf 1.126 entries[indx].status = MonitorEntry::STATUS_BUSY;
601 _entriesMutex.unlock();
|
602 kumpf 1.116 q->enqueue(msg);
|
603 kumpf 1.126 _entriesMutex.lock();
|
604 kumpf 1.116
605 // After enqueue a message and the autoEntryMutex has
606 // been released and locked again, the array of
|
607 kumpf 1.126 // entries can be changed. The ArrayIterator has to be
608 // reset with the latest _entries.
|
609 kumpf 1.116 entries.reset(_entries);
|
610 kumpf 1.126 entries[indx].status = MonitorEntry::STATUS_IDLE;
|
611 kumpf 1.116 }
612 }
613 catch (...)
614 {
615 }
|
616 thilo.boehm 1.115 }
|
617 dave.sudlik 1.128 // else check for accept pending (ie. SSL handshake pending) or
618 // idle connection timeouts for sockets from which we did not
619 // receive data.
620 else if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
621 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
622
623 {
624 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
625 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
626 dst->_entry_index = indx;
627 dst->closeConnectionOnTimeout(&timeNow);
628 }
629 }
630 }
631 // else if "events" is zero (ie. select timed out) then we still need
632 // to check if there are any pending SSL handshakes that have timed out.
633 else
634 {
635 for (Uint32 indx = 0; indx < entries.size(); indx++)
636 {
637 if ((entries[indx].status == MonitorEntry::STATUS_IDLE) &&
638 dave.sudlik 1.128 entries[indx].type == MonitorEntry::TYPE_CONNECTION)
639 {
640 MessageQueue* q = MessageQueue::lookup(entries[indx].queueId);
641 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
642 dst->_entry_index = indx;
643 dst->closeConnectionOnTimeout(&timeNow);
644 }
|
645 kumpf 1.116 }
|
646 mday 1.24 }
|
647 mike 1.2 }
648
|
649 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
650 kumpf 1.48 {
651 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
652 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
653 kumpf 1.48 _stopConnections = 1;
|
654 a.arora 1.73 tickle();
|
655 kumpf 1.48
|
656 chuck 1.74 if (wait)
|
657 a.arora 1.73 {
|
658 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
659 // caller of this function may unbind the ports while the monitor
660 // is still accepting connections on them.
|
661 kumpf 1.101 _stopConnectionsSem.wait();
|
662 a.arora 1.73 }
|
663 r.kieninger 1.83
|
664 kumpf 1.48 PEG_METHOD_EXIT();
665 }
|
666 mday 1.25
|
667 mday 1.37
|
668 kumpf 1.116 int Monitor::solicitSocketMessages(
|
669 mike 1.106 SocketHandle socket,
|
670 mike 1.2 Uint32 events,
|
671 r.kieninger 1.83 Uint32 queueId,
|
672 kumpf 1.126 Uint32 type)
|
673 mike 1.2 {
|
674 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
675 kumpf 1.126 AutoMutex autoMut(_entriesMutex);
676
|
677 kumpf 1.116 // Check to see if we need to dynamically grow the _entries array
|
678 kumpf 1.126 // We always want the _entries array to be 2 bigger than the
|
679 kumpf 1.116 // current connections requested
680 _solicitSocketCount++; // bump the count
|
681 kumpf 1.126
682 for (Uint32 i = _entries.size(); i < _solicitSocketCount + 1; i++)
|
683 kumpf 1.116 {
|
684 kumpf 1.126 _entries.append(MonitorEntry());
|
685 kumpf 1.116 }
|
686 a.arora 1.73
|
687 kumpf 1.126 for (Uint32 index = 1; index < _entries.size(); index++)
|
688 kumpf 1.116 {
689 try
690 {
|
691 kumpf 1.126 if (_entries[index].status == MonitorEntry::STATUS_EMPTY)
|
692 kumpf 1.116 {
693 _entries[index].socket = socket;
694 _entries[index].queueId = queueId;
|
695 kumpf 1.126 _entries[index].type = type;
696 _entries[index].status = MonitorEntry::STATUS_IDLE;
|
697 kumpf 1.116
|
698 kumpf 1.126 return (int)index;
|
699 kumpf 1.116 }
700 }
701 catch (...)
702 {
703 }
704 }
705 // decrease the count, if we are here we didn't do anything meaningful
706 _solicitSocketCount--;
707 PEG_METHOD_EXIT();
708 return -1;
|
709 mike 1.2 }
710
|
711 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
712 mike 1.2 {
|
713 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
714 kumpf 1.126 AutoMutex autoMut(_entriesMutex);
|
715 a.arora 1.73
716 /*
|
717 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
|
718 kumpf 1.126 never needs to be reset to EMPTY;
|
719 a.arora 1.73 */
|
720 kumpf 1.126 for (Uint32 index = 1; index < _entries.size(); index++)
|
721 mike 1.2 {
|
722 kumpf 1.116 if (_entries[index].socket == socket)
723 {
|
724 kumpf 1.126 _entries[index].reset();
|
725 kumpf 1.116 _solicitSocketCount--;
726 break;
727 }
|
728 mike 1.2 }
|
729 a.arora 1.73
730 /*
|
731 kumpf 1.116 Dynamic Contraction:
732 To remove excess entries we will start from the end of the _entries
733 array and remove all entries with EMPTY status until we find the
734 first NON EMPTY. This prevents the positions, of the NON EMPTY
735 entries, from being changed.
|
736 r.kieninger 1.83 */
|
737 kumpf 1.126 for (Uint32 index = _entries.size() - 1;
738 (_entries[index].status == MonitorEntry::STATUS_EMPTY) &&
739 (index >= MAX_NUMBER_OF_MONITOR_ENTRIES);
740 index--)
|
741 kumpf 1.116 {
|
742 kumpf 1.126 _entries.remove(index);
|
743 a.arora 1.73 }
|
744 kumpf 1.126
|
745 kumpf 1.4 PEG_METHOD_EXIT();
|
746 mike 1.2 }
747
748 PEGASUS_NAMESPACE_END
|