1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 kumpf 1.122 #include "HostAddress.h"
|
46 mike 1.113 #include <errno.h>
|
47 mike 1.2
48 PEGASUS_USING_STD;
49
50 PEGASUS_NAMESPACE_BEGIN
51
|
52 mike 1.96 static AtomicInt _connections(0);
|
53 mday 1.25
|
54 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
55 //
|
56 kumpf 1.122 // Tickler
57 //
58 ////////////////////////////////////////////////////////////////////////////////
59
60 Tickler::Tickler()
61 : _listenSocket(PEGASUS_INVALID_SOCKET),
62 _clientSocket(PEGASUS_INVALID_SOCKET),
63 _serverSocket(PEGASUS_INVALID_SOCKET)
64 {
65 }
66
67 Tickler::~Tickler()
68 {
69 uninitialize();
70 }
71
72 Boolean Tickler::initialize()
73 {
74 //
75 // Set up the addresses for the listen, client, and server sockets
76 // based on whether IPv6 is enabled.
77 kumpf 1.122 //
78
79 #ifdef PEGASUS_ENABLE_IPV6
80 struct sockaddr_storage listenAddress;
81 struct sockaddr_storage clientAddress;
82 struct sockaddr_storage serverAddress;
83 #else
84 struct sockaddr_in listenAddress;
85 struct sockaddr_in clientAddress;
86 struct sockaddr_in serverAddress;
87 #endif
88
89 int addressFamily;
90 SocketLength addressLength;
91
92 #ifdef PEGASUS_ENABLE_IPV6
93 if (System::isIPv6StackActive())
94 {
95 // Use the IPv6 loopback address for the listen sockets
96 HostAddress::convertTextToBinary(
97 HostAddress::AT_IPV6,
98 kumpf 1.122 "::1",
99 &reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_addr);
100 listenAddress.ss_family = AF_INET6;
101 reinterpret_cast<struct sockaddr_in6*>(&listenAddress)->sin6_port = 0;
102
103 addressFamily = AF_INET6;
104 addressLength = sizeof(struct sockaddr_in6);
105 }
106 else
107 #endif
108 {
109 // Use the IPv4 loopback address for the listen sockets
110 HostAddress::convertTextToBinary(
111 HostAddress::AT_IPV4,
112 "127.0.0.1",
113 &reinterpret_cast<struct sockaddr_in*>(
114 &listenAddress)->sin_addr.s_addr);
115 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_family =
116 AF_INET;
117 reinterpret_cast<struct sockaddr_in*>(&listenAddress)->sin_port = 0;
118
119 kumpf 1.122 addressFamily = AF_INET;
120 addressLength = sizeof(struct sockaddr_in);
121 }
122
123 // Use the same address for the client socket as the listen socket
124 clientAddress = listenAddress;
125
126 //
127 // Set up a listen socket to allow the tickle client and server to connect
128 //
129
130 // Create the listen socket
131 if ((_listenSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
132 PEGASUS_INVALID_SOCKET)
133 {
134 MessageLoaderParms parms(
135 "Common.Monitor.TICKLE_CREATE",
136 "Received error number $0 while creating the internal socket.",
137 getSocketError());
138 throw Exception(parms);
139 }
140 kumpf 1.122
141 // Bind the listen socket to the loopback address
142 if (::bind(
143 _listenSocket,
144 reinterpret_cast<struct sockaddr*>(&listenAddress),
145 addressLength) < 0)
146 {
147 #ifdef PEGASUS_OS_ZOS
148 MessageLoaderParms parms(
149 "Common.Monitor.TICKLE_BIND_LONG",
150 "Received error:$0 while binding the internal socket.",
151 strerror(errno));
152 #else
153 MessageLoaderParms parms(
154 "Common.Monitor.TICKLE_BIND",
155 "Received error number $0 while binding the internal socket.",
156 getSocketError());
157 #endif
158 throw Exception(parms);
159 }
160
161 kumpf 1.122 // Listen for a connection from the tickle client
162 if ((::listen(_listenSocket, 3)) < 0)
163 {
164 MessageLoaderParms parms(
165 "Common.Monitor.TICKLE_LISTEN",
166 "Received error number $0 while listening to the internal socket.",
167 getSocketError());
168 throw Exception(parms);
169 }
170
171 // Verify we have the correct listen socket
172 SocketLength tmpAddressLength = addressLength;
173 int sock = ::getsockname(
174 _listenSocket,
175 reinterpret_cast<struct sockaddr*>(&listenAddress),
176 &tmpAddressLength);
177 if (sock < 0)
178 {
179 MessageLoaderParms parms(
180 "Common.Monitor.TICKLE_SOCKNAME",
181 "Received error number $0 while getting the internal socket name.",
182 kumpf 1.122 getSocketError());
183 throw Exception(parms);
184 }
185
186 //
187 // Set up the client side of the tickle connection.
188 //
189
190 // Create the client socket
191 if ((_clientSocket = Socket::createSocket(addressFamily, SOCK_STREAM, 0)) ==
192 PEGASUS_INVALID_SOCKET)
193 {
194 MessageLoaderParms parms(
195 "Common.Monitor.TICKLE_CLIENT_CREATE",
196 "Received error number $0 while creating the internal client "
197 "socket.",
198 getSocketError());
199 throw Exception(parms);
200 }
201
202 // Bind the client socket to the loopback address
203 kumpf 1.122 if (::bind(
204 _clientSocket,
205 reinterpret_cast<struct sockaddr*>(&clientAddress),
206 addressLength) < 0)
207 {
208 MessageLoaderParms parms(
209 "Common.Monitor.TICKLE_CLIENT_BIND",
210 "Received error number $0 while binding the internal client "
211 "socket.",
212 getSocketError());
213 throw Exception(parms);
214 }
215
216 // Connect the client socket to the listen socket address
217 if (::connect(
218 _clientSocket,
219 reinterpret_cast<struct sockaddr*>(&listenAddress),
220 addressLength) < 0)
221 {
222 MessageLoaderParms parms(
223 "Common.Monitor.TICKLE_CLIENT_CONNECT",
224 kumpf 1.122 "Received error number $0 while connecting the internal client "
225 "socket.",
226 getSocketError());
227 throw Exception(parms);
228 }
229
230 //
231 // Set up the server side of the tickle connection.
232 //
233
234 tmpAddressLength = addressLength;
235
236 // Accept the client socket connection. The accept call may fail with
237 // EAGAIN. Try a max of 20 times to establish this connection.
238 unsigned int retries = 0;
239 do
240 {
241 _serverSocket = ::accept(
242 _listenSocket,
243 reinterpret_cast<struct sockaddr*>(&serverAddress),
244 &tmpAddressLength);
245 kumpf 1.122
246 if ((_serverSocket != PEGASUS_SOCKET_ERROR) ||
247 (getSocketError() != PEGASUS_NETWORK_TRYAGAIN))
248 {
249 break;
250 }
251
252 Threads::sleep(1);
253 retries++;
254 } while (retries <= 20);
255
256 if (_serverSocket == PEGASUS_SOCKET_ERROR)
257 {
258 if (getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
259 {
260 // TCP/IP is down
261 uninitialize();
262 return false;
263 }
264
265 MessageLoaderParms parms(
266 kumpf 1.122 "Common.Monitor.TICKLE_ACCEPT",
267 "Received error number $0 while accepting the internal socket "
268 "connection.",
269 getSocketError());
270 throw Exception(parms);
271 }
272
273 //
274 // Close the listen socket and make the other sockets non-blocking
275 //
276
277 Socket::close(_listenSocket);
278 _listenSocket = PEGASUS_INVALID_SOCKET;
279
280 Socket::disableBlocking(_serverSocket);
281 Socket::disableBlocking(_clientSocket);
282 return true;
283 }
284
285 void Tickler::uninitialize()
286 {
287 kumpf 1.122 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
288
289 try
290 {
291 if (_serverSocket != PEGASUS_INVALID_SOCKET)
292 {
293 Socket::close(_serverSocket);
294 _serverSocket = PEGASUS_INVALID_SOCKET;
295 }
296 if (_clientSocket != PEGASUS_INVALID_SOCKET)
297 {
298 Socket::close(_clientSocket);
299 _clientSocket = PEGASUS_INVALID_SOCKET;
300 }
301 if (_listenSocket != PEGASUS_INVALID_SOCKET)
302 {
303 Socket::close(_listenSocket);
304 _listenSocket = PEGASUS_INVALID_SOCKET;
305 }
306 }
307 catch (...)
308 kumpf 1.122 {
309 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
310 "Failed to close tickle sockets");
311 }
312 }
313
314
315 ////////////////////////////////////////////////////////////////////////////////
316 //
|
317 mike 1.2 // Monitor
318 //
319 ////////////////////////////////////////////////////////////////////////////////
320
|
321 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
322 mike 1.2 Monitor::Monitor()
|
323 kumpf 1.87 : _stopConnections(0),
|
324 a.arora 1.73 _stopConnectionsSem(0),
|
325 kumpf 1.122 _solicitSocketCount(0)
|
326 mike 1.2 {
|
327 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
328 mike 1.2 Socket::initializeInterface();
|
329 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
330 a.arora 1.73
331 // setup the tickler
332 initializeTickler();
333
|
334 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
335 a.arora 1.73 // has added an entry in the first position of the
336 // _entries array
|
337 kumpf 1.116 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
338 mday 1.37 {
339 _MonitorEntry entry(0, 0, 0);
340 _entries.append(entry);
341 }
|
342 mday 1.18 }
|
343 mday 1.20
|
344 mike 1.2 Monitor::~Monitor()
345 {
|
346 kumpf 1.122 _tickler.uninitialize();
|
347 thilo.boehm 1.115 Socket::uninitializeInterface();
|
348 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
349 thilo.boehm 1.115 "returning from monitor destructor");
350 }
|
351 mday 1.18
|
352 kumpf 1.116 void Monitor::initializeTickler()
353 {
|
354 kumpf 1.122 while (!_tickler.initialize())
|
355 thilo.boehm 1.115 {
|
356 kumpf 1.122 // Retry until TCP/IP is started
357 }
|
358 thilo.boehm 1.115
|
359 kumpf 1.122 // Create a MonitorEntry for the tickler and set its state to IDLE so the
360 // Monitor will watch for its events.
361 _MonitorEntry entry(_tickler.getServerSocket(), 1, INTERNAL);
|
362 a.arora 1.73 entry._status = _MonitorEntry::IDLE;
|
363 thilo.boehm 1.115
|
364 kumpf 1.122 if (_entries.size() == 0)
|
365 thilo.boehm 1.115 {
|
366 kumpf 1.122 // The tickler has not been initialized before; add its entry at the
367 // beginning of the list.
368 _entries.append(entry);
|
369 thilo.boehm 1.115 }
370 else
371 {
|
372 kumpf 1.122 // Overwrite the existing tickler entry.
373 _entries[0] = entry;
|
374 thilo.boehm 1.115 }
|
375 a.arora 1.73 }
376
|
377 kumpf 1.116 void Monitor::tickle()
|
378 a.arora 1.73 {
|
379 kumpf 1.122 AutoMutex autoMutex(_tickleMutex);
380 Socket::write(_tickler.getClientSocket(), "\0\0", 2);
|
381 sushma.fernandes 1.78 }
382
|
383 kumpf 1.121 void Monitor::setState(
384 Uint32 index,
385 _MonitorEntry::entry_status status)
|
386 sushma.fernandes 1.78 {
|
387 kumpf 1.121 AutoMutex autoEntryMutex(_entry_mut);
|
388 sushma.fernandes 1.78 // Set the state to requested state
389 _entries[index]._status = status;
|
390 a.arora 1.73 }
391
|
392 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
393 mike 1.2 {
|
394 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
395 a.arora 1.73
|
396 mday 1.25 fd_set fdread;
397 FD_ZERO(&fdread);
|
398 a.arora 1.73
|
399 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
400 r.kieninger 1.83
|
401 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
402
|
403 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
404 // entries
|
405 mike 1.96 if (_stopConnections.get() == 1)
|
406 kumpf 1.48 {
|
407 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
408 kumpf 1.48 {
|
409 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
410 kumpf 1.48 {
|
411 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
412 kumpf 1.48 {
|
413 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
414 entries[indx]._status.get() == _MonitorEntry::DYING )
|
415 kumpf 1.48 {
416 // remove the entry
|
417 kumpf 1.116 entries[indx]._status = _MonitorEntry::EMPTY;
|
418 kumpf 1.48 }
419 else
420 {
421 // set status to DYING
|
422 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
423 kumpf 1.48 }
424 }
425 }
426 }
427 _stopConnections = 0;
|
428 kumpf 1.116 _stopConnectionsSem.signal();
|
429 kumpf 1.48 }
|
430 kumpf 1.51
|
431 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
432 kumpf 1.68 {
|
433 kumpf 1.116 const _MonitorEntry &entry = entries[indx];
434 if ((entry._status.get() == _MonitorEntry::DYING) &&
435 (entry._type == Monitor::CONNECTION))
436 {
437 MessageQueue *q = MessageQueue::lookup(entry.queueId);
438 PEGASUS_ASSERT(q != 0);
439 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
440
441 if (h._connectionClosePending == false)
442 continue;
443
444 // NOTE: do not attempt to delete while there are pending responses
445 // coming thru. The last response to come thru after a
446 // _connectionClosePending will reset _responsePending to false
447 // and then cause the monitor to rerun this code and clean up.
448 // (see HTTPConnection.cpp)
449
450 if (h._responsePending == true)
451 {
|
452 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
453 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
454 "because responses are still pending. "
455 "connection=0x%p, socket=%d\n",
|
456 marek 1.118 (void *)&h, h.getSocket()));
|
457 kumpf 1.116 continue;
458 }
459 h._connectionClosePending = false;
460 MessageQueue &o = h.get_owner();
461 Message* message= new CloseConnectionMessage(entry.socket);
462 message->dest = o.getQueueId();
463
464 // HTTPAcceptor is responsible for closing the connection.
465 // The lock is released to allow HTTPAcceptor to call
466 // unsolicitSocketMessages to free the entry.
467 // Once HTTPAcceptor completes processing of the close
468 // connection, the lock is re-requested and processing of
469 // the for loop continues. This is safe with the current
470 // implementation of the entries object. Note that the
471 // loop condition accesses the entries.size() on each
472 // iteration, so that a change in size while the mutex is
473 // unlocked will not result in an ArrayIndexOutOfBounds
474 // exception.
475
476 _entry_mut.unlock();
477 o.enqueue(message);
478 kumpf 1.116 _entry_mut.lock();
479
480 // After enqueue a message and the autoEntryMutex has been
481 // released and locked again, the array of _entries can be
482 // changed. The ArrayIterator has be reset with the original
483 // _entries.
484 entries.reset(_entries);
485 }
|
486 kumpf 1.68 }
487
|
488 kumpf 1.51 Uint32 _idleEntries = 0;
|
489 r.kieninger 1.83
|
490 a.arora 1.73 /*
|
491 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
492 to the kernel as a parameter to SELECT. This loop seems like a good
493 place to calculate the max file descriptor (maximum socket number)
494 because we have to traverse the entire array.
|
495 r.kieninger 1.83 */
|
496 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
497 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
498 mike 1.2 {
|
499 kumpf 1.116 if (maxSocketCurrentPass < entries[indx].socket)
500 maxSocketCurrentPass = entries[indx].socket;
|
501 a.arora 1.73
|
502 kumpf 1.116 if (entries[indx]._status.get() == _MonitorEntry::IDLE)
|
503 mday 1.25 {
|
504 david.dillard 1.95 _idleEntries++;
|
505 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
506 mday 1.25 }
|
507 mday 1.13 }
|
508 s.hills 1.62
|
509 a.arora 1.73 /*
|
510 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
511 descriptors starting at 0.
|
512 a.arora 1.73 */
513 maxSocketCurrentPass++;
514
|
515 mike 1.112 _entry_mut.unlock();
|
516 david.dillard 1.95
517 //
518 // The first argument to select() is ignored on Windows and it is not
519 // a socket value. The original code assumed that the number of sockets
520 // and a socket value have the same type. On Windows they do not.
521 //
522 #ifdef PEGASUS_OS_TYPE_WINDOWS
523 int events = select(0, &fdread, NULL, NULL, &tv);
524 #else
|
525 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
526 david.dillard 1.95 #endif
|
527 mike 1.112 _entry_mut.lock();
|
528 kumpf 1.116
529 // After enqueue a message and the autoEntryMutex has been released and
530 // locked again, the array of _entries can be changed. The ArrayIterator
531 // has be reset with the original _entries
|
532 r.kieninger 1.102 entries.reset(_entries);
|
533 mike 1.106
534 if (events == PEGASUS_SOCKET_ERROR)
|
535 mday 1.13 {
|
536 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
537 "Monitor::run - errorno = %d has occurred on select.", errno));
|
538 kumpf 1.116 // The EBADF error indicates that one or more or the file
539 // descriptions was not valid. This could indicate that
540 // the entries structure has been corrupted or that
541 // we have a synchronization error.
|
542 kumpf 1.50
|
543 kumpf 1.116 PEGASUS_ASSERT(errno != EBADF);
|
544 kumpf 1.50 }
545 else if (events)
546 {
|
547 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
548 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
549 "idle entries",
|
550 marek 1.118 events, _idleEntries));
|
551 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
552 {
553 // The Monitor should only look at entries in the table that are
554 // IDLE (i.e., owned by the Monitor).
555 if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
556 (FD_ISSET(entries[indx].socket, &fdread)))
557 {
558 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
559 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
560 kumpf 1.116 "Monitor::run indx = %d, queueId = %d, q = %p",
|
561 marek 1.118 indx, entries[indx].queueId, q));
|
562 kumpf 1.116 PEGASUS_ASSERT(q !=0);
563
564 try
|
565 david.dillard 1.95 {
|
566 kumpf 1.116 if (entries[indx]._type == Monitor::CONNECTION)
567 {
|
568 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
569 kumpf 1.116 "entries[indx].type for indx = %d is "
570 "Monitor::CONNECTION",
|
571 marek 1.118 indx));
|
572 kumpf 1.116 static_cast<HTTPConnection *>(q)->_entry_index = indx;
573
574 // Do not update the entry just yet. The entry gets
575 // updated once the request has been read.
576 //entries[indx]._status = _MonitorEntry::BUSY;
|
577 sushma.fernandes 1.78
|
578 kumpf 1.116 // If allocate_and_awaken failure, retry on next
579 // iteration
|
580 a.arora 1.73 /* Removed for PEP 183.
|
581 kumpf 1.116 if (!MessageQueueService::get_thread_pool()->
582 allocate_and_awaken((void *)q, _dispatch))
583 {
|
584 kumpf 1.119 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
585 Tracer::LEVEL2,
|
586 kumpf 1.116 "Monitor::run: Insufficient resources to "
587 "process request.");
588 entries[indx]._status = _MonitorEntry::IDLE;
589 return true;
590 }
|
591 a.arora 1.73 */
592 // Added for PEP 183
|
593 kumpf 1.116 HTTPConnection *dst =
594 reinterpret_cast<HTTPConnection *>(q);
|
595 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
596 kumpf 1.116 "Monitor::_dispatch: entering run() for "
597 "indx = %d, queueId = %d, q = %p",
598 dst->_entry_index,
599 dst->_monitor->_entries[dst->_entry_index].queueId,
|
600 marek 1.118 dst));
|
601 kumpf 1.116
602 try
603 {
604 dst->run(1);
605 }
606 catch (...)
607 {
|
608 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
609 kumpf 1.116 "Monitor::_dispatch: exception received");
610 }
|
611 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
612 kumpf 1.116 "Monitor::_dispatch: exited run() for index %d",
|
613 marek 1.118 dst->_entry_index));
|
614 kumpf 1.116
615 // It is possible the entry status may not be set to
616 // busy. The following will fail in that case.
617 // PEGASUS_ASSERT(dst->_monitor->_entries[
618 // dst->_entry_index]._status.get() ==
619 // _MonitorEntry::BUSY);
620 // Once the HTTPConnection thread has set the status
621 // value to either Monitor::DYING or Monitor::IDLE,
622 // it has returned control of the connection to the
623 // Monitor. It is no longer permissible to access
624 // the connection or the entry in the _entries table.
625
626 // The following is not relevant as the worker thread
627 // or the reader thread will update the status of the
628 // entry.
629 //if (dst->_connectionClosePending)
630 //{
631 // dst->_monitor->_entries[dst->_entry_index]._status =
632 // _MonitorEntry::DYING;
633 //}
634 //else
635 kumpf 1.116 //{
636 // dst->_monitor->_entries[dst->_entry_index]._status =
637 // _MonitorEntry::IDLE;
638 //}
|
639 r.kieninger 1.83 // end Added for PEP 183
|
640 kumpf 1.116 }
641 else if (entries[indx]._type == Monitor::INTERNAL)
642 {
643 // set ourself to BUSY,
|
644 r.kieninger 1.83 // read the data
|
645 a.arora 1.73 // and set ourself back to IDLE
|
646 r.kieninger 1.83
|
647 kumpf 1.116 entries[indx]._status = _MonitorEntry::BUSY;
648 static char buffer[2];
649 Sint32 amt =
650 Socket::read(entries[indx].socket,&buffer, 2);
651
652 if (amt == PEGASUS_SOCKET_ERROR &&
653 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
654 {
|
655 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
656 kumpf 1.116 "Monitor::run: Tickler socket got an IO error. "
657 "Going to re-create Socket and wait for "
658 "TCP/IP restart.");
|
659 kumpf 1.122 _tickler.uninitialize();
|
660 kumpf 1.116 initializeTickler();
661 }
662 else
663 {
664 entries[indx]._status = _MonitorEntry::IDLE;
665 }
666 }
667 else
668 {
|
669 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
670 kumpf 1.116 "Non-connection entry, indx = %d, has been "
671 "received.",
|
672 marek 1.118 indx));
|
673 kumpf 1.116 int events = 0;
674 events |= SocketMessage::READ;
675 Message* msg = new SocketMessage(
676 entries[indx].socket, events);
677 entries[indx]._status = _MonitorEntry::BUSY;
678 _entry_mut.unlock();
679 q->enqueue(msg);
680 _entry_mut.lock();
681
682 // After enqueue a message and the autoEntryMutex has
683 // been released and locked again, the array of
684 // entries can be changed. The ArrayIterator has be
685 // reset with the original _entries
686 entries.reset(_entries);
687 entries[indx]._status = _MonitorEntry::IDLE;
688 }
689 }
690 catch (...)
691 {
692 }
|
693 thilo.boehm 1.115 }
|
694 kumpf 1.116 }
|
695 mday 1.24 }
|
696 mike 1.2 }
697
|
698 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
699 kumpf 1.48 {
700 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
701 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
702 kumpf 1.48 _stopConnections = 1;
|
703 a.arora 1.73 tickle();
|
704 kumpf 1.48
|
705 chuck 1.74 if (wait)
|
706 a.arora 1.73 {
|
707 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
708 // caller of this function may unbind the ports while the monitor
709 // is still accepting connections on them.
|
710 kumpf 1.101 _stopConnectionsSem.wait();
|
711 a.arora 1.73 }
|
712 r.kieninger 1.83
|
713 kumpf 1.48 PEG_METHOD_EXIT();
714 }
|
715 mday 1.25
|
716 mday 1.37
|
717 kumpf 1.116 int Monitor::solicitSocketMessages(
|
718 mike 1.106 SocketHandle socket,
|
719 mike 1.2 Uint32 events,
|
720 r.kieninger 1.83 Uint32 queueId,
|
721 mday 1.8 int type)
|
722 mike 1.2 {
|
723 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
724 AutoMutex autoMut(_entry_mut);
725 // Check to see if we need to dynamically grow the _entries array
726 // We always want the _entries array to 2 bigger than the
727 // current connections requested
728 _solicitSocketCount++; // bump the count
729 int size = (int)_entries.size();
730 if ((int)_solicitSocketCount >= (size-1))
731 {
732 for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
733 {
734 _MonitorEntry entry(0, 0, 0);
735 _entries.append(entry);
736 }
737 }
|
738 a.arora 1.73
|
739 kumpf 1.116 int index;
740 for (index = 1; index < (int)_entries.size(); index++)
741 {
742 try
743 {
744 if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
745 {
746 _entries[index].socket = socket;
747 _entries[index].queueId = queueId;
748 _entries[index]._type = type;
749 _entries[index]._status = _MonitorEntry::IDLE;
750
751 return index;
752 }
753 }
754 catch (...)
755 {
756 }
757 }
758 // decrease the count, if we are here we didn't do anything meaningful
759 _solicitSocketCount--;
760 kumpf 1.116 PEG_METHOD_EXIT();
761 return -1;
|
762 mike 1.2 }
763
|
764 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
765 mike 1.2 {
|
766 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
767 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
768 a.arora 1.73
769 /*
|
770 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
771 never needs to be EMPTY;
|
772 a.arora 1.73 */
|
773 w.otsuka 1.89 unsigned int index;
|
774 kumpf 1.116 for (index = 1; index < _entries.size(); index++)
|
775 mike 1.2 {
|
776 kumpf 1.116 if (_entries[index].socket == socket)
777 {
778 _entries[index]._status = _MonitorEntry::EMPTY;
779 _entries[index].socket = PEGASUS_INVALID_SOCKET;
780 _solicitSocketCount--;
781 break;
782 }
|
783 mike 1.2 }
|
784 a.arora 1.73
785 /*
|
786 kumpf 1.116 Dynamic Contraction:
787 To remove excess entries we will start from the end of the _entries
788 array and remove all entries with EMPTY status until we find the
789 first NON EMPTY. This prevents the positions, of the NON EMPTY
790 entries, from being changed.
|
791 r.kieninger 1.83 */
|
792 a.arora 1.73 index = _entries.size() - 1;
|
793 kumpf 1.116 while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
794 {
795 if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
|
796 a.arora 1.73 _entries.remove(index);
|
797 kumpf 1.116 index--;
|
798 a.arora 1.73 }
|
799 kumpf 1.4 PEG_METHOD_EXIT();
|
800 mike 1.2 }
801
|
802 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
803 kumpf 1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
|
804 mday 1.7 {
|
805 kumpf 1.116 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
806 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
807 kumpf 1.116 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, "
808 "q = %p",
809 dst->_entry_index,
810 dst->_monitor->_entries[dst->_entry_index].queueId,
|
811 marek 1.118 dst));
|
812 kumpf 1.116
813 try
814 {
815 dst->run(1);
816 }
817 catch (...)
818 {
|
819 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
820 kumpf 1.116 "Monitor::_dispatch: exception received");
821 }
|
822 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
823 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
|
824 kumpf 1.116
825 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
826 _MonitorEntry::BUSY);
827
828 // Once the HTTPConnection thread has set the status value to either
829 // Monitor::DYING or Monitor::IDLE, it has returned control of the
830 // connection to the Monitor. It is no longer permissible to access the
831 // connection or the entry in the _entries table.
832 if (dst->_connectionClosePending)
833 {
834 dst->_monitor->_entries[dst->_entry_index]._status =
835 _MonitorEntry::DYING;
836 }
837 else
838 {
839 dst->_monitor->_entries[dst->_entry_index]._status =
840 _MonitorEntry::IDLE;
841 }
842 return 0;
|
843 mday 1.40 }
844
|
845 mike 1.2 PEGASUS_NAMESPACE_END
|