1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 kumpf 1.121.4.1 #include "HostAddress.h"
|
46 mike 1.113 #include <errno.h>
|
47 mike 1.2
48 PEGASUS_USING_STD;
49
50 PEGASUS_NAMESPACE_BEGIN
51
52 ////////////////////////////////////////////////////////////////////////////////
53 //
|
54 kumpf 1.121.4.1 // Tickler
|
55 mike 1.2 //
56 ////////////////////////////////////////////////////////////////////////////////
57
|
58 kumpf 1.121.4.1 Tickler::Tickler()
59 : _listenSocket(PEGASUS_INVALID_SOCKET),
60 _clientSocket(PEGASUS_INVALID_SOCKET),
61 _serverSocket(PEGASUS_INVALID_SOCKET)
|
62 mike 1.2 {
|
63 kumpf 1.121.4.1 try
|
64 mday 1.37 {
|
65 kumpf 1.121.4.1 _initialize();
66 }
67 catch (...)
68 {
69 _uninitialize();
70 throw;
|
71 mday 1.37 }
|
72 mday 1.18 }
|
73 mday 1.20
|
74 kumpf 1.121.4.1 Tickler::~Tickler()
|
75 mike 1.2 {
|
76 kumpf 1.121.4.1 _uninitialize();
|
77 thilo.boehm 1.115 }
|
78 kumpf 1.121.4.1
|
79 kumpf 1.121.4.2 #if defined(PEGASUS_OS_TYPE_UNIX)
|
80 kumpf 1.121.4.1
81 // Use an anonymous pipe for the tickle connection.
82
83 void Tickler::_initialize()
|
84 kumpf 1.116 {
|
85 kumpf 1.121.4.1 int fds[2];
|
86 a.dunfey 1.76
|
87 kumpf 1.121.4.1 if (pipe(fds) == -1)
|
88 kumpf 1.116 {
|
89 kumpf 1.121.4.1 MessageLoaderParms parms(
90 "Common.Monitor.TICKLE_CREATE",
91 "Received error number $0 while creating the internal socket.",
92 getSocketError());
93 throw Exception(parms);
|
94 a.dunfey 1.76 }
95
|
96 kumpf 1.121.4.1 _serverSocket = fds[0];
97 _clientSocket = fds[1];
|
98 mday 1.18 }
99
|
100 kumpf 1.121.4.1 #else
101
102 // Use an external loopback socket connection to allow the tickle socket to
|
103 kumpf 1.121.4.2 // be included in the select() array on non-Unix platforms.
|
104 kumpf 1.121.4.1
105 void Tickler::_initialize()
|
106 kumpf 1.116 {
|
107 kumpf 1.121.4.1 //
108 // Set up the addresses for the listen, client, and server sockets
109 // based on whether IPv6 is enabled.
110 //
111
112 Socket::initializeInterface();
113
114 # ifdef PEGASUS_ENABLE_IPV6
115 struct sockaddr_storage listenAddress;
116 struct sockaddr_storage clientAddress;
117 struct sockaddr_storage serverAddress;
118 # else
119 struct sockaddr_in listenAddress;
120 struct sockaddr_in clientAddress;
121 struct sockaddr_in serverAddress;
122 # endif
123
124 int addressFamily;
125 SocketLength addressLength;
126
127 memset(&listenAddress, 0, sizeof (listenAddress));
128 kumpf 1.121.4.1
129 # ifdef PEGASUS_ENABLE_IPV6
130 if (System::isIPv6StackActive())
131 {
132 // Use the IPv6 loopback address for the listen sockets
133 HostAddress::convertTextToBinary(
134 HostAddress::AT_IPV6,
135 "::1",
136 &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
137 listenAddress.ss_family = AF_INET6;
138 reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
139
140 addressFamily = AF_INET6;
141 addressLength = sizeof(struct sockaddr_in6);
142 }
143 else
144 # endif
145 {
146 // Use the IPv4 loopback address for the listen sockets
147 HostAddress::convertTextToBinary(
148 HostAddress::AT_IPV4,
149 kumpf 1.121.4.1 "127.0.0.1",
150 &reinterpret_cast<struct sockaddr_in*>(
151 &listenAddress)->sin_addr.s_addr);
152 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
153 AF_INET;
154 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
155
156 addressFamily = AF_INET;
157 addressLength = sizeof(struct sockaddr_in);
158 }
159
160 // Use the same address for the client socket as the listen socket
161 clientAddress = listenAddress;
|
162 a.arora 1.73
|
163 kumpf 1.121.4.1 //
164 // Set up a listen socket to allow the tickle client and server to connect
165 //
166
167 // Create the listen socket
168 if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
|
169 kumpf 1.116 PEGASUS_INVALID_SOCKET)
|
170 kumpf 1.121.4.1 {
171 MessageLoaderParms parms(
172 "Common.Monitor.TICKLE_CREATE",
173 "Received error number $0 while creating the internal socket.",
174 getSocketError());
175 throw Exception(parms);
176 }
177
178 // Bind the listen socket to the loopback address
179 if (::bind(
180 _listenSocket,
181 reinterpret_cast<struct sockaddr*>(&listenAddress),
182 addressLength) < 0)
183 {
184 MessageLoaderParms parms(
185 "Common.Monitor.TICKLE_BIND",
186 "Received error number $0 while binding the internal socket.",
187 getSocketError());
188 throw Exception(parms);
189 }
190
191 kumpf 1.121.4.1 // Listen for a connection from the tickle client
192 if ((::listen(_listenSocket, 3)) < 0)
193 {
194 MessageLoaderParms parms(
195 "Common.Monitor.TICKLE_LISTEN",
196 "Received error number $0 while listening to the internal socket.",
197 getSocketError());
198 throw Exception(parms);
199 }
200
201 // Verify we have the correct listen socket
202 SocketLength tmpAddressLength = addressLength;
203 int sock = ::getsockname(
204 _listenSocket,
205 reinterpret_cast<struct sockaddr*>(&listenAddress),
206 &tmpAddressLength);
207 if (sock < 0)
208 {
209 MessageLoaderParms parms(
210 "Common.Monitor.TICKLE_SOCKNAME",
211 "Received error number $0 while getting the internal socket name.",
212 kumpf 1.121.4.1 getSocketError());
213 throw Exception(parms);
214 }
|
215 a.arora 1.73
|
216 kumpf 1.121.4.1 //
217 // Set up the client side of the tickle connection.
218 //
|
219 ouyang.jian 1.120
|
220 kumpf 1.121.4.1 // Create the client socket
221 if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
222 PEGASUS_INVALID_SOCKET)
223 {
224 MessageLoaderParms parms(
225 "Common.Monitor.TICKLE_CLIENT_CREATE",
226 "Received error number $0 while creating the internal client "
227 "socket.",
228 getSocketError());
229 throw Exception(parms);
230 }
231
232 // Bind the client socket to the loopback address
233 if (::bind(
234 _clientSocket,
235 reinterpret_cast<struct sockaddr*>(&clientAddress),
236 addressLength) < 0)
237 {
238 MessageLoaderParms parms(
239 "Common.Monitor.TICKLE_CLIENT_BIND",
240 "Received error number $0 while binding the internal client "
241 kumpf 1.121.4.1 "socket.",
242 getSocketError());
243 throw Exception(parms);
244 }
245
246 // Connect the client socket to the listen socket address
247 if (::connect(
248 _clientSocket,
249 reinterpret_cast<struct sockaddr*>(&listenAddress),
250 addressLength) < 0)
251 {
252 MessageLoaderParms parms(
253 "Common.Monitor.TICKLE_CLIENT_CONNECT",
254 "Received error number $0 while connecting the internal client "
255 "socket.",
256 getSocketError());
257 throw Exception(parms);
258 }
|
259 thilo.boehm 1.115
|
260 kumpf 1.121.4.1 //
261 // Set up the server side of the tickle connection.
262 //
|
263 thilo.boehm 1.115
|
264 kumpf 1.121.4.1 tmpAddressLength = addressLength;
|
265 a.arora 1.73
|
266 kumpf 1.121.4.1 // Accept the client socket connection.
267 _serverSocket = ::accept(
268 _listenSocket,
269 reinterpret_cast<struct sockaddr*>(&serverAddress),
270 &tmpAddressLength);
271
272 if (_serverSocket == PEGASUS_SOCKET_ERROR)
273 {
274 MessageLoaderParms parms(
275 "Common.Monitor.TICKLE_ACCEPT",
276 "Received error number $0 while accepting the internal socket "
277 "connection.",
278 getSocketError());
279 throw Exception(parms);
280 }
|
281 r.kieninger 1.83
|
282 kumpf 1.121.4.1 //
283 // Close the listen socket and make the other sockets non-blocking
284 //
|
285 a.arora 1.73
|
286 kumpf 1.121.4.1 Socket::close(_listenSocket);
287 _listenSocket = PEGASUS_INVALID_SOCKET;
|
288 ouyang.jian 1.120
|
289 kumpf 1.121.4.1 Socket::disableBlocking(_serverSocket);
290 Socket::disableBlocking(_clientSocket);
291 }
|
292 thilo.boehm 1.115
|
293 kumpf 1.121.4.1 #endif
294
295 void Tickler::_uninitialize()
296 {
297 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
298 thilo.boehm 1.115
|
299 kumpf 1.121.4.1 try
300 {
301 if (_serverSocket != PEGASUS_INVALID_SOCKET)
|
302 thilo.boehm 1.115 {
|
303 kumpf 1.121.4.1 Socket::close(_serverSocket);
304 _serverSocket = PEGASUS_INVALID_SOCKET;
|
305 thilo.boehm 1.115 }
|
306 kumpf 1.121.4.1 if (_clientSocket != PEGASUS_INVALID_SOCKET)
|
307 thilo.boehm 1.115 {
|
308 kumpf 1.121.4.1 Socket::close(_clientSocket);
309 _clientSocket = PEGASUS_INVALID_SOCKET;
|
310 kumpf 1.116 }
|
311 kumpf 1.121.4.1 if (_listenSocket != PEGASUS_INVALID_SOCKET)
|
312 thilo.boehm 1.115 {
|
313 kumpf 1.121.4.1 Socket::close(_listenSocket);
314 _listenSocket = PEGASUS_INVALID_SOCKET;
|
315 a.arora 1.73 }
|
316 kumpf 1.121.4.1 }
317 catch (...)
318 {
319 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
320 "Failed to close tickle sockets");
321 }
322 Socket::uninitializeInterface();
323 }
|
324 marek 1.111
325
|
326 kumpf 1.121.4.1 ////////////////////////////////////////////////////////////////////////////////
327 //
328 // Monitor
329 //
330 ////////////////////////////////////////////////////////////////////////////////
331
332 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
333 Monitor::Monitor()
334 : _stopConnections(0),
335 _stopConnectionsSem(0),
336 _solicitSocketCount(0)
337 {
338 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
339 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
340
341 // Create a MonitorEntry for the Tickler and set its state to IDLE so the
342 // Monitor will watch for its events.
343 _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
|
344 a.arora 1.73 entry._status = _MonitorEntry::IDLE;
|
345 kumpf 1.121.4.1 _entries.append(entry);
|
346 thilo.boehm 1.115
|
347 kumpf 1.121.4.1 // Start the count at 1 because _entries[0] is the Tickler
348 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
349 thilo.boehm 1.115 {
|
350 kumpf 1.121.4.1 _MonitorEntry entry(0, 0, 0);
|
351 thilo.boehm 1.115 _entries.append(entry);
352 }
|
353 a.arora 1.73 }
354
|
355 kumpf 1.121.4.1 Monitor::~Monitor()
|
356 a.arora 1.73 {
|
357 kumpf 1.121.4.1 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
358 "returning from monitor destructor");
359 }
|
360 r.kieninger 1.83
|
361 kumpf 1.121.4.1 void Monitor::tickle()
362 {
363 AutoMutex autoMutex(_tickleMutex);
364 Socket::write(_tickler.getClientSocket(), "\0\0", 2);
|
365 sushma.fernandes 1.78 }
366
|
367 kumpf 1.121 void Monitor::setState(
368 Uint32 index,
369 _MonitorEntry::entry_status status)
|
370 sushma.fernandes 1.78 {
|
371 kumpf 1.121 AutoMutex autoEntryMutex(_entry_mut);
|
372 sushma.fernandes 1.78 // Set the state to requested state
373 _entries[index]._status = status;
|
374 a.arora 1.73 }
375
|
376 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
377 mike 1.2 {
|
378 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
379 a.arora 1.73
|
380 mday 1.25 fd_set fdread;
381 FD_ZERO(&fdread);
|
382 a.arora 1.73
|
383 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
384 r.kieninger 1.83
|
385 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
386
|
387 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
388 // entries
|
389 mike 1.96 if (_stopConnections.get() == 1)
|
390 kumpf 1.48 {
|
391 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
392 kumpf 1.48 {
|
393 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
394 kumpf 1.48 {
|
395 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
396 kumpf 1.48 {
|
397 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
398 entries[indx]._status.get() == _MonitorEntry::DYING )
|
399 kumpf 1.48 {
400 // remove the entry
|
401 kumpf 1.116 entries[indx]._status = _MonitorEntry::EMPTY;
|
402 kumpf 1.48 }
403 else
404 {
405 // set status to DYING
|
406 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
407 kumpf 1.48 }
408 }
409 }
410 }
411 _stopConnections = 0;
|
412 kumpf 1.116 _stopConnectionsSem.signal();
|
413 kumpf 1.48 }
|
414 kumpf 1.51
|
415 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
416 kumpf 1.68 {
|
417 kumpf 1.116 const _MonitorEntry &entry = entries[indx];
418 if ((entry._status.get() == _MonitorEntry::DYING) &&
419 (entry._type == Monitor::CONNECTION))
420 {
421 MessageQueue *q = MessageQueue::lookup(entry.queueId);
422 PEGASUS_ASSERT(q != 0);
423 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
424
425 if (h._connectionClosePending == false)
426 continue;
427
428 // NOTE: do not attempt to delete while there are pending responses
429 // coming thru. The last response to come thru after a
430 // _connectionClosePending will reset _responsePending to false
431 // and then cause the monitor to rerun this code and clean up.
432 // (see HTTPConnection.cpp)
433
434 if (h._responsePending == true)
435 {
|
436 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
437 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
438 "because responses are still pending. "
439 "connection=0x%p, socket=%d\n",
|
440 marek 1.118 (void *)&h, h.getSocket()));
|
441 kumpf 1.116 continue;
442 }
443 h._connectionClosePending = false;
444 MessageQueue &o = h.get_owner();
445 Message* message= new CloseConnectionMessage(entry.socket);
446 message->dest = o.getQueueId();
447
448 // HTTPAcceptor is responsible for closing the connection.
449 // The lock is released to allow HTTPAcceptor to call
450 // unsolicitSocketMessages to free the entry.
451 // Once HTTPAcceptor completes processing of the close
452 // connection, the lock is re-requested and processing of
453 // the for loop continues. This is safe with the current
454 // implementation of the entries object. Note that the
455 // loop condition accesses the entries.size() on each
456 // iteration, so that a change in size while the mutex is
457 // unlocked will not result in an ArrayIndexOutOfBounds
458 // exception.
459
460 _entry_mut.unlock();
461 o.enqueue(message);
462 kumpf 1.116 _entry_mut.lock();
463
464 // After enqueue a message and the autoEntryMutex has been
465 // released and locked again, the array of _entries can be
466 // changed. The ArrayIterator has be reset with the original
467 // _entries.
468 entries.reset(_entries);
469 }
|
470 kumpf 1.68 }
471
|
472 kumpf 1.51 Uint32 _idleEntries = 0;
|
473 r.kieninger 1.83
|
474 a.arora 1.73 /*
|
475 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
476 to the kernel as a parameter to SELECT. This loop seems like a good
477 place to calculate the max file descriptor (maximum socket number)
478 because we have to traverse the entire array.
|
479 r.kieninger 1.83 */
|
480 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
481 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
482 mike 1.2 {
|
483 kumpf 1.116 if (maxSocketCurrentPass < entries[indx].socket)
484 maxSocketCurrentPass = entries[indx].socket;
|
485 a.arora 1.73
|
486 kumpf 1.116 if (entries[indx]._status.get() == _MonitorEntry::IDLE)
|
487 mday 1.25 {
|
488 david.dillard 1.95 _idleEntries++;
|
489 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
490 mday 1.25 }
|
491 mday 1.13 }
|
492 s.hills 1.62
|
493 a.arora 1.73 /*
|
494 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
495 descriptors starting at 0.
|
496 a.arora 1.73 */
497 maxSocketCurrentPass++;
498
|
499 mike 1.112 _entry_mut.unlock();
|
500 david.dillard 1.95
501 //
502 // The first argument to select() is ignored on Windows and it is not
503 // a socket value. The original code assumed that the number of sockets
504 // and a socket value have the same type. On Windows they do not.
505 //
506 #ifdef PEGASUS_OS_TYPE_WINDOWS
507 int events = select(0, &fdread, NULL, NULL, &tv);
508 #else
|
509 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
510 david.dillard 1.95 #endif
|
511 mike 1.112 _entry_mut.lock();
|
512 kumpf 1.116
513 // After enqueue a message and the autoEntryMutex has been released and
514 // locked again, the array of _entries can be changed. The ArrayIterator
515 // has be reset with the original _entries
|
516 r.kieninger 1.102 entries.reset(_entries);
|
517 mike 1.106
518 if (events == PEGASUS_SOCKET_ERROR)
|
519 mday 1.13 {
|
520 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
521 "Monitor::run - errorno = %d has occurred on select.", errno));
|
522 kumpf 1.116 // The EBADF error indicates that one or more or the file
523 // descriptions was not valid. This could indicate that
524 // the entries structure has been corrupted or that
525 // we have a synchronization error.
|
526 kumpf 1.50
|
527 kumpf 1.116 PEGASUS_ASSERT(errno != EBADF);
|
528 kumpf 1.50 }
529 else if (events)
530 {
|
531 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
532 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
533 "idle entries",
|
534 marek 1.118 events, _idleEntries));
|
535 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
536 {
537 // The Monitor should only look at entries in the table that are
538 // IDLE (i.e., owned by the Monitor).
539 if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
540 (FD_ISSET(entries[indx].socket, &fdread)))
541 {
542 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
543 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
544 kumpf 1.116 "Monitor::run indx = %d, queueId = %d, q = %p",
|
545 marek 1.118 indx, entries[indx].queueId, q));
|
546 kumpf 1.116 PEGASUS_ASSERT(q !=0);
547
548 try
|
549 david.dillard 1.95 {
|
550 kumpf 1.116 if (entries[indx]._type == Monitor::CONNECTION)
551 {
|
552 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
553 kumpf 1.116 "entries[indx].type for indx = %d is "
554 "Monitor::CONNECTION",
|
555 marek 1.118 indx));
|
556 kumpf 1.116 static_cast<HTTPConnection *>(q)->_entry_index = indx;
557
558 // Do not update the entry just yet. The entry gets
559 // updated once the request has been read.
560 //entries[indx]._status = _MonitorEntry::BUSY;
|
561 sushma.fernandes 1.78
|
562 kumpf 1.116 // If allocate_and_awaken failure, retry on next
563 // iteration
|
564 a.arora 1.73 /* Removed for PEP 183.
|
565 kumpf 1.116 if (!MessageQueueService::get_thread_pool()->
566 allocate_and_awaken((void *)q, _dispatch))
567 {
|
568 kumpf 1.119 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
569 Tracer::LEVEL2,
|
570 kumpf 1.116 "Monitor::run: Insufficient resources to "
571 "process request.");
572 entries[indx]._status = _MonitorEntry::IDLE;
573 return true;
574 }
|
575 a.arora 1.73 */
576 // Added for PEP 183
|
577 kumpf 1.116 HTTPConnection *dst =
578 reinterpret_cast<HTTPConnection *>(q);
|
579 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
580 kumpf 1.116 "Monitor::_dispatch: entering run() for "
581 "indx = %d, queueId = %d, q = %p",
582 dst->_entry_index,
583 dst->_monitor->_entries[dst->_entry_index].queueId,
|
584 marek 1.118 dst));
|
585 kumpf 1.116
586 try
587 {
588 dst->run(1);
589 }
590 catch (...)
591 {
|
592 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
593 kumpf 1.116 "Monitor::_dispatch: exception received");
594 }
|
595 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
596 kumpf 1.116 "Monitor::_dispatch: exited run() for index %d",
|
597 marek 1.118 dst->_entry_index));
|
598 kumpf 1.116
599 // It is possible the entry status may not be set to
600 // busy. The following will fail in that case.
601 // PEGASUS_ASSERT(dst->_monitor->_entries[
602 // dst->_entry_index]._status.get() ==
603 // _MonitorEntry::BUSY);
604 // Once the HTTPConnection thread has set the status
605 // value to either Monitor::DYING or Monitor::IDLE,
606 // it has returned control of the connection to the
607 // Monitor. It is no longer permissible to access
608 // the connection or the entry in the _entries table.
609
610 // The following is not relevant as the worker thread
611 // or the reader thread will update the status of the
612 // entry.
613 //if (dst->_connectionClosePending)
614 //{
615 // dst->_monitor->_entries[dst->_entry_index]._status =
616 // _MonitorEntry::DYING;
617 //}
618 //else
619 kumpf 1.116 //{
620 // dst->_monitor->_entries[dst->_entry_index]._status =
621 // _MonitorEntry::IDLE;
622 //}
|
623 r.kieninger 1.83 // end Added for PEP 183
|
624 kumpf 1.116 }
625 else if (entries[indx]._type == Monitor::INTERNAL)
626 {
627 // set ourself to BUSY,
|
628 r.kieninger 1.83 // read the data
|
629 a.arora 1.73 // and set ourself back to IDLE
|
630 r.kieninger 1.83
|
631 kumpf 1.116 entries[indx]._status = _MonitorEntry::BUSY;
632 static char buffer[2];
633 Sint32 amt =
634 Socket::read(entries[indx].socket,&buffer, 2);
635
|
636 kumpf 1.121.4.1 entries[indx]._status = _MonitorEntry::IDLE;
|
637 kumpf 1.116 }
638 else
639 {
|
640 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
641 kumpf 1.116 "Non-connection entry, indx = %d, has been "
642 "received.",
|
643 marek 1.118 indx));
|
644 kumpf 1.116 int events = 0;
645 events |= SocketMessage::READ;
646 Message* msg = new SocketMessage(
647 entries[indx].socket, events);
648 entries[indx]._status = _MonitorEntry::BUSY;
649 _entry_mut.unlock();
650 q->enqueue(msg);
651 _entry_mut.lock();
652
653 // After enqueue a message and the autoEntryMutex has
654 // been released and locked again, the array of
655 // entries can be changed. The ArrayIterator has be
656 // reset with the original _entries
657 entries.reset(_entries);
658 entries[indx]._status = _MonitorEntry::IDLE;
659 }
660 }
661 catch (...)
662 {
663 }
|
664 thilo.boehm 1.115 }
|
665 kumpf 1.116 }
|
666 mday 1.24 }
|
667 mike 1.2 }
668
|
669 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
670 kumpf 1.48 {
671 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
672 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
673 kumpf 1.48 _stopConnections = 1;
|
674 a.arora 1.73 tickle();
|
675 kumpf 1.48
|
676 chuck 1.74 if (wait)
|
677 a.arora 1.73 {
|
678 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
679 // caller of this function may unbind the ports while the monitor
680 // is still accepting connections on them.
|
681 kumpf 1.101 _stopConnectionsSem.wait();
|
682 a.arora 1.73 }
|
683 r.kieninger 1.83
|
684 kumpf 1.48 PEG_METHOD_EXIT();
685 }
|
686 mday 1.25
|
687 mday 1.37
|
688 kumpf 1.116 int Monitor::solicitSocketMessages(
|
689 mike 1.106 SocketHandle socket,
|
690 mike 1.2 Uint32 events,
|
691 r.kieninger 1.83 Uint32 queueId,
|
692 mday 1.8 int type)
|
693 mike 1.2 {
|
694 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
695 AutoMutex autoMut(_entry_mut);
696 // Check to see if we need to dynamically grow the _entries array
697 // We always want the _entries array to 2 bigger than the
698 // current connections requested
699 _solicitSocketCount++; // bump the count
700 int size = (int)_entries.size();
701 if ((int)_solicitSocketCount >= (size-1))
702 {
703 for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
704 {
705 _MonitorEntry entry(0, 0, 0);
706 _entries.append(entry);
707 }
708 }
|
709 a.arora 1.73
|
710 kumpf 1.116 int index;
711 for (index = 1; index < (int)_entries.size(); index++)
712 {
713 try
714 {
715 if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
716 {
717 _entries[index].socket = socket;
718 _entries[index].queueId = queueId;
719 _entries[index]._type = type;
720 _entries[index]._status = _MonitorEntry::IDLE;
721
722 return index;
723 }
724 }
725 catch (...)
726 {
727 }
728 }
729 // decrease the count, if we are here we didn't do anything meaningful
730 _solicitSocketCount--;
731 kumpf 1.116 PEG_METHOD_EXIT();
732 return -1;
|
733 mike 1.2 }
734
|
735 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
736 mike 1.2 {
|
737 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
738 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
739 a.arora 1.73
740 /*
|
741 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
742 never needs to be EMPTY;
|
743 a.arora 1.73 */
|
744 w.otsuka 1.89 unsigned int index;
|
745 kumpf 1.116 for (index = 1; index < _entries.size(); index++)
|
746 mike 1.2 {
|
747 kumpf 1.116 if (_entries[index].socket == socket)
748 {
749 _entries[index]._status = _MonitorEntry::EMPTY;
750 _entries[index].socket = PEGASUS_INVALID_SOCKET;
751 _solicitSocketCount--;
752 break;
753 }
|
754 mike 1.2 }
|
755 a.arora 1.73
756 /*
|
757 kumpf 1.116 Dynamic Contraction:
758 To remove excess entries we will start from the end of the _entries
759 array and remove all entries with EMPTY status until we find the
760 first NON EMPTY. This prevents the positions, of the NON EMPTY
761 entries, from being changed.
|
762 r.kieninger 1.83 */
|
763 a.arora 1.73 index = _entries.size() - 1;
|
764 kumpf 1.116 while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
765 {
766 if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
|
767 a.arora 1.73 _entries.remove(index);
|
768 kumpf 1.116 index--;
|
769 a.arora 1.73 }
|
770 kumpf 1.4 PEG_METHOD_EXIT();
|
771 mike 1.2 }
772
|
773 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
774 kumpf 1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
|
775 mday 1.7 {
|
776 kumpf 1.116 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
777 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
778 kumpf 1.116 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, "
779 "q = %p",
780 dst->_entry_index,
781 dst->_monitor->_entries[dst->_entry_index].queueId,
|
782 marek 1.118 dst));
|
783 kumpf 1.116
784 try
785 {
786 dst->run(1);
787 }
788 catch (...)
789 {
|
790 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
791 kumpf 1.116 "Monitor::_dispatch: exception received");
792 }
|
793 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
794 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
|
795 kumpf 1.116
796 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
797 _MonitorEntry::BUSY);
798
799 // Once the HTTPConnection thread has set the status value to either
800 // Monitor::DYING or Monitor::IDLE, it has returned control of the
801 // connection to the Monitor. It is no longer permissible to access the
802 // connection or the entry in the _entries table.
803 if (dst->_connectionClosePending)
804 {
805 dst->_monitor->_entries[dst->_entry_index]._status =
806 _MonitorEntry::DYING;
807 }
808 else
809 {
810 dst->_monitor->_entries[dst->_entry_index]._status =
811 _MonitorEntry::IDLE;
812 }
813 return 0;
|
814 mday 1.40 }
815
|
816 mike 1.2 PEGASUS_NAMESPACE_END
|