1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
|
34 mike 1.107 #include "Network.h"
|
35 mike 1.2 #include <Pegasus/Common/Config.h>
36 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.100 #include "ArrayIterator.h"
|
45 mike 1.113 #include <errno.h>
|
46 mike 1.2
47 PEGASUS_USING_STD;
48
49 PEGASUS_NAMESPACE_BEGIN
50
|
51 mike 1.96 static AtomicInt _connections(0);
|
52 mday 1.25
|
53 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
54 //
55 // Monitor
56 //
57 ////////////////////////////////////////////////////////////////////////////////
58
|
59 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
60 mike 1.2 Monitor::Monitor()
|
61 kumpf 1.87 : _stopConnections(0),
|
62 a.arora 1.73 _stopConnectionsSem(0),
|
63 a.dunfey 1.76 _solicitSocketCount(0),
|
64 a.dunfey 1.77 _tickle_client_socket(-1),
65 _tickle_server_socket(-1),
66 _tickle_peer_socket(-1)
|
67 mike 1.2 {
|
68 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
69 mike 1.2 Socket::initializeInterface();
|
70 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
71 a.arora 1.73
72 // setup the tickler
73 initializeTickler();
74
|
75 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
76 a.arora 1.73 // has added an entry in the first position of the
77 // _entries array
|
78 kumpf 1.116 for (int i = 1; i < numberOfMonitorEntriesToAllocate; i++)
|
79 mday 1.37 {
80 _MonitorEntry entry(0, 0, 0);
81 _entries.append(entry);
82 }
|
83 mday 1.18 }
|
84 mday 1.20
|
85 mike 1.2 Monitor::~Monitor()
86 {
|
87 thilo.boehm 1.115 uninitializeTickler();
88 Socket::uninitializeInterface();
|
89 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
90 thilo.boehm 1.115 "returning from monitor destructor");
91 }
|
92 kumpf 1.116 void Monitor::uninitializeTickler()
93 {
|
94 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
95 a.dunfey 1.76
|
96 kumpf 1.116 try
97 {
98 if (_tickle_peer_socket >= 0)
|
99 a.dunfey 1.76 {
100 Socket::close(_tickle_peer_socket);
101 }
|
102 kumpf 1.116 if (_tickle_client_socket >= 0)
|
103 a.dunfey 1.76 {
104 Socket::close(_tickle_client_socket);
105 }
|
106 kumpf 1.116 if (_tickle_server_socket >= 0)
|
107 a.dunfey 1.76 {
108 Socket::close(_tickle_server_socket);
109 }
110 }
|
111 kumpf 1.116 catch (...)
|
112 a.dunfey 1.76 {
|
113 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
114 a.dunfey 1.76 "Failed to close tickle sockets");
115 }
116
|
117 mday 1.18 }
118
|
119 kumpf 1.116 void Monitor::initializeTickler()
120 {
|
121 r.kieninger 1.83 /*
122 NOTE: On any errors trying to
123 setup out tickle connection,
|
124 a.arora 1.73 throw an exception/end the server
125 */
126
127 /* setup the tickle server/listener */
|
128 kumpf 1.116 // try until the tcpip is restarted
|
129 thilo.boehm 1.115 do
130 {
131 // get a socket for the server side
|
132 kumpf 1.116 if ((_tickle_server_socket =
133 Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
134 PEGASUS_INVALID_SOCKET)
135 {
136 MessageLoaderParms parms(
137 "Common.Monitor.TICKLE_CREATE",
138 "Received error number $0 while creating the internal socket.",
139 getSocketError());
|
140 thilo.boehm 1.115 throw Exception(parms);
141 }
|
142 a.arora 1.73
|
143 thilo.boehm 1.115 // initialize the address
144 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
|
145 ouyang.jian 1.120
|
146 thilo.boehm 1.115 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
147 _tickle_server_addr.sin_family = PF_INET;
148 _tickle_server_addr.sin_port = 0;
|
149 a.arora 1.73
|
150 thilo.boehm 1.115 SocketLength _addr_size = sizeof(_tickle_server_addr);
|
151 a.arora 1.73
|
152 thilo.boehm 1.115 // bind server side to socket
|
153 kumpf 1.116 if ((::bind(_tickle_server_socket,
154 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
155 sizeof(_tickle_server_addr))) < 0)
|
156 thilo.boehm 1.115 {
|
157 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
|
158 kumpf 1.116 MessageLoaderParms parms(
159 "Common.Monitor.TICKLE_BIND_LONG",
160 "Received error:$0 while binding the internal socket.",
161 strerror(errno));
|
162 r.kieninger 1.83 #else
|
163 kumpf 1.116 MessageLoaderParms parms(
164 "Common.Monitor.TICKLE_BIND",
165 "Received error number $0 while binding the internal socket.",
166 getSocketError());
|
167 r.kieninger 1.83 #endif
|
168 thilo.boehm 1.115 throw Exception(parms);
169 }
170
171 // tell the kernel we are a server
|
172 kumpf 1.116 if ((::listen(_tickle_server_socket, 3)) < 0)
|
173 thilo.boehm 1.115 {
|
174 kumpf 1.116 MessageLoaderParms parms(
175 "Common.Monitor.TICKLE_LISTEN",
176 "Received error number $0 while listening to the internal "
177 "socket.",
178 getSocketError());
|
179 thilo.boehm 1.115 throw Exception(parms);
180 }
181
182 // make sure we have the correct socket for our server
|
183 kumpf 1.116 int sock = ::getsockname(
184 _tickle_server_socket,
185 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
186 &_addr_size);
187 if (sock < 0)
188 {
189 MessageLoaderParms parms(
190 "Common.Monitor.TICKLE_SOCKNAME",
191 "Received error number $0 while getting the internal socket "
192 "name.",
193 getSocketError());
|
194 thilo.boehm 1.115 throw Exception(parms);
195 }
|
196 a.arora 1.73
|
197 thilo.boehm 1.115 /* set up the tickle client/connector */
|
198 r.kieninger 1.83
|
199 thilo.boehm 1.115 // get a socket for our tickle client
|
200 kumpf 1.116 if ((_tickle_client_socket =
201 Socket::createSocket(PF_INET, SOCK_STREAM, 0)) ==
202 PEGASUS_INVALID_SOCKET)
203 {
204 MessageLoaderParms parms(
205 "Common.Monitor.TICKLE_CLIENT_CREATE",
206 "Received error number $0 while creating the internal client "
207 "socket.",
208 getSocketError());
|
209 thilo.boehm 1.115 throw Exception(parms);
210 }
|
211 a.arora 1.73
|
212 thilo.boehm 1.115 // setup the address of the client
213 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
|
214 ouyang.jian 1.120
|
215 thilo.boehm 1.115 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
216 _tickle_client_addr.sin_family = PF_INET;
217 _tickle_client_addr.sin_port = 0;
|
218 a.arora 1.73
|
219 thilo.boehm 1.115 // bind socket to client side
|
220 kumpf 1.116 if ((::bind(_tickle_client_socket,
221 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
222 sizeof(_tickle_client_addr))) < 0)
223 {
224 MessageLoaderParms parms(
225 "Common.Monitor.TICKLE_CLIENT_BIND",
226 "Received error number $0 while binding the internal client "
227 "socket.",
228 getSocketError());
|
229 thilo.boehm 1.115 throw Exception(parms);
230 }
231
232 // connect to server side
|
233 kumpf 1.116 if ((::connect(_tickle_client_socket,
234 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
235 sizeof(_tickle_server_addr))) < 0)
236 {
237 MessageLoaderParms parms(
238 "Common.Monitor.TICKLE_CLIENT_CONNECT",
239 "Received error number $0 while connecting the internal "
240 "client socket.",
241 getSocketError());
|
242 thilo.boehm 1.115 throw Exception(parms);
243 }
244
245 /* set up the slave connection */
246 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
247 SocketLength peer_size = sizeof(_tickle_peer_addr);
248 Threads::sleep(1);
249
|
250 kumpf 1.116 // this call may fail, we will try a max of 20 times to establish
251 // this peer connection
252 if ((_tickle_peer_socket = ::accept(_tickle_server_socket,
253 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
254 &peer_size)) < 0)
|
255 thilo.boehm 1.115 {
|
256 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
257 getSocketError() == PEGASUS_NETWORK_TRYAGAIN)
|
258 thilo.boehm 1.115 {
259 int retries = 0;
260 do
261 {
262 Threads::sleep(1);
|
263 kumpf 1.116 _tickle_peer_socket = ::accept(
264 _tickle_server_socket,
265 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
266 &peer_size);
|
267 thilo.boehm 1.115 retries++;
|
268 kumpf 1.116 } while (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
269 getSocketError() == PEGASUS_NETWORK_TRYAGAIN &&
270 retries < 20);
|
271 thilo.boehm 1.115 }
272 // TCP/IP is down, destroy sockets and retry again.
|
273 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR &&
274 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
|
275 thilo.boehm 1.115 {
276 // destroy everything
277 uninitializeTickler();
278 // retry again.
279 continue;
280 }
281 }
|
282 kumpf 1.116 if (_tickle_peer_socket == PEGASUS_SOCKET_ERROR)
|
283 thilo.boehm 1.115 {
|
284 kumpf 1.116 MessageLoaderParms parms(
285 "Common.Monitor.TICKLE_ACCEPT",
286 "Received error number $0 while accepting the internal "
287 "socket connection.",
288 getSocketError());
|
289 thilo.boehm 1.115 throw Exception(parms);
|
290 kumpf 1.116 }
291 else
|
292 thilo.boehm 1.115 {
293 // socket is ok
294 break;
|
295 a.arora 1.73 }
|
296 kumpf 1.116 } while (1); // try until TCP/IP is restarted
|
297 marek 1.111
298 Socket::disableBlocking(_tickle_peer_socket);
299 Socket::disableBlocking(_tickle_client_socket);
300
|
301 kumpf 1.116 // add the tickler to the list of entries to be monitored and set to
302 // IDLE because Monitor only
|
303 a.arora 1.73 // checks entries with IDLE state for events
304 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
305 entry._status = _MonitorEntry::IDLE;
|
306 thilo.boehm 1.115
307 // is the tickler initalized as first socket on startup ?
308 if (_entries.size()==0)
309 {
310 // if yes, append a new entry
311 _entries.append(entry);
312 }
313 else
314 {
315 // if not, overwrite the tickler entry with new socket
316 _entries[0]=entry;
317 }
|
318 a.arora 1.73 }
319
|
320 kumpf 1.116 void Monitor::tickle()
|
321 a.arora 1.73 {
|
322 sushma.fernandes 1.78 static char _buffer[] =
|
323 a.arora 1.73 {
324 '0','0'
325 };
|
326 r.kieninger 1.83
|
327 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
328 Socket::write(_tickle_client_socket,&_buffer, 2);
329 }
330
|
331 kumpf 1.121 void Monitor::setState(
332 Uint32 index,
333 _MonitorEntry::entry_status status)
|
334 sushma.fernandes 1.78 {
|
335 kumpf 1.121 AutoMutex autoEntryMutex(_entry_mut);
|
336 sushma.fernandes 1.78 // Set the state to requested state
337 _entries[index]._status = status;
|
338 a.arora 1.73 }
339
|
340 kumpf 1.114 void Monitor::run(Uint32 milliseconds)
|
341 mike 1.2 {
|
342 mday 1.18
|
343 r.kieninger 1.83
|
344 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
345 a.arora 1.73
|
346 mday 1.25 fd_set fdread;
347 FD_ZERO(&fdread);
|
348 a.arora 1.73
|
349 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
350 r.kieninger 1.83
|
351 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
352
|
353 kumpf 1.116 // Check the stopConnections flag. If set, clear the Acceptor monitor
354 // entries
|
355 mike 1.96 if (_stopConnections.get() == 1)
|
356 kumpf 1.48 {
|
357 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
358 kumpf 1.48 {
|
359 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
360 kumpf 1.48 {
|
361 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
362 kumpf 1.48 {
|
363 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
364 entries[indx]._status.get() == _MonitorEntry::DYING )
|
365 kumpf 1.48 {
366 // remove the entry
|
367 kumpf 1.116 entries[indx]._status = _MonitorEntry::EMPTY;
|
368 kumpf 1.48 }
369 else
370 {
371 // set status to DYING
|
372 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
373 kumpf 1.48 }
374 }
375 }
376 }
377 _stopConnections = 0;
|
378 kumpf 1.116 _stopConnectionsSem.signal();
|
379 kumpf 1.48 }
|
380 kumpf 1.51
|
381 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
382 kumpf 1.68 {
|
383 kumpf 1.116 const _MonitorEntry &entry = entries[indx];
384 if ((entry._status.get() == _MonitorEntry::DYING) &&
385 (entry._type == Monitor::CONNECTION))
386 {
387 MessageQueue *q = MessageQueue::lookup(entry.queueId);
388 PEGASUS_ASSERT(q != 0);
389 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
390
391 if (h._connectionClosePending == false)
392 continue;
393
394 // NOTE: do not attempt to delete while there are pending responses
395 // coming thru. The last response to come thru after a
396 // _connectionClosePending will reset _responsePending to false
397 // and then cause the monitor to rerun this code and clean up.
398 // (see HTTPConnection.cpp)
399
400 if (h._responsePending == true)
401 {
|
402 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
403 kumpf 1.116 "Monitor::run - Ignoring connection delete request "
404 "because responses are still pending. "
405 "connection=0x%p, socket=%d\n",
|
406 marek 1.118 (void *)&h, h.getSocket()));
|
407 kumpf 1.116 continue;
408 }
409 h._connectionClosePending = false;
410 MessageQueue &o = h.get_owner();
411 Message* message= new CloseConnectionMessage(entry.socket);
412 message->dest = o.getQueueId();
413
414 // HTTPAcceptor is responsible for closing the connection.
415 // The lock is released to allow HTTPAcceptor to call
416 // unsolicitSocketMessages to free the entry.
417 // Once HTTPAcceptor completes processing of the close
418 // connection, the lock is re-requested and processing of
419 // the for loop continues. This is safe with the current
420 // implementation of the entries object. Note that the
421 // loop condition accesses the entries.size() on each
422 // iteration, so that a change in size while the mutex is
423 // unlocked will not result in an ArrayIndexOutOfBounds
424 // exception.
425
426 _entry_mut.unlock();
427 o.enqueue(message);
428 kumpf 1.116 _entry_mut.lock();
429
430 // After enqueue a message and the autoEntryMutex has been
431 // released and locked again, the array of _entries can be
432 // changed. The ArrayIterator has be reset with the original
433 // _entries.
434 entries.reset(_entries);
435 }
|
436 kumpf 1.68 }
437
|
438 kumpf 1.51 Uint32 _idleEntries = 0;
|
439 r.kieninger 1.83
|
440 a.arora 1.73 /*
|
441 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
442 to the kernel as a parameter to SELECT. This loop seems like a good
443 place to calculate the max file descriptor (maximum socket number)
444 because we have to traverse the entire array.
|
445 r.kieninger 1.83 */
|
446 mike 1.106 SocketHandle maxSocketCurrentPass = 0;
|
447 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
|
448 mike 1.2 {
|
449 kumpf 1.116 if (maxSocketCurrentPass < entries[indx].socket)
450 maxSocketCurrentPass = entries[indx].socket;
|
451 a.arora 1.73
|
452 kumpf 1.116 if (entries[indx]._status.get() == _MonitorEntry::IDLE)
|
453 mday 1.25 {
|
454 david.dillard 1.95 _idleEntries++;
|
455 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
456 mday 1.25 }
|
457 mday 1.13 }
|
458 s.hills 1.62
|
459 a.arora 1.73 /*
|
460 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
461 descriptors starting at 0.
|
462 a.arora 1.73 */
463 maxSocketCurrentPass++;
464
|
465 mike 1.112 _entry_mut.unlock();
|
466 david.dillard 1.95
467 //
468 // The first argument to select() is ignored on Windows and it is not
469 // a socket value. The original code assumed that the number of sockets
470 // and a socket value have the same type. On Windows they do not.
471 //
472 #ifdef PEGASUS_OS_TYPE_WINDOWS
473 int events = select(0, &fdread, NULL, NULL, &tv);
474 #else
|
475 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
476 david.dillard 1.95 #endif
|
477 mike 1.112 _entry_mut.lock();
|
478 kumpf 1.116
479 // After enqueue a message and the autoEntryMutex has been released and
480 // locked again, the array of _entries can be changed. The ArrayIterator
481 // has be reset with the original _entries
|
482 r.kieninger 1.102 entries.reset(_entries);
|
483 mike 1.106
484 if (events == PEGASUS_SOCKET_ERROR)
|
485 mday 1.13 {
|
486 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
487 "Monitor::run - errorno = %d has occurred on select.", errno));
|
488 kumpf 1.116 // The EBADF error indicates that one or more or the file
489 // descriptions was not valid. This could indicate that
490 // the entries structure has been corrupted or that
491 // we have a synchronization error.
|
492 kumpf 1.50
|
493 kumpf 1.116 PEGASUS_ASSERT(errno != EBADF);
|
494 kumpf 1.50 }
495 else if (events)
496 {
|
497 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
498 kumpf 1.116 "Monitor::run select event received events = %d, monitoring %d "
499 "idle entries",
|
500 marek 1.118 events, _idleEntries));
|
501 kumpf 1.116 for (int indx = 0; indx < (int)entries.size(); indx++)
502 {
503 // The Monitor should only look at entries in the table that are
504 // IDLE (i.e., owned by the Monitor).
505 if ((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
506 (FD_ISSET(entries[indx].socket, &fdread)))
507 {
508 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
509 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
510 kumpf 1.116 "Monitor::run indx = %d, queueId = %d, q = %p",
|
511 marek 1.118 indx, entries[indx].queueId, q));
|
512 kumpf 1.116 PEGASUS_ASSERT(q !=0);
513
514 try
|
515 david.dillard 1.95 {
|
516 kumpf 1.116 if (entries[indx]._type == Monitor::CONNECTION)
517 {
|
518 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
519 kumpf 1.116 "entries[indx].type for indx = %d is "
520 "Monitor::CONNECTION",
|
521 marek 1.118 indx));
|
522 kumpf 1.116 static_cast<HTTPConnection *>(q)->_entry_index = indx;
523
524 // Do not update the entry just yet. The entry gets
525 // updated once the request has been read.
526 //entries[indx]._status = _MonitorEntry::BUSY;
|
527 sushma.fernandes 1.78
|
528 kumpf 1.116 // If allocate_and_awaken failure, retry on next
529 // iteration
|
530 a.arora 1.73 /* Removed for PEP 183.
|
531 kumpf 1.116 if (!MessageQueueService::get_thread_pool()->
532 allocate_and_awaken((void *)q, _dispatch))
533 {
|
534 kumpf 1.119 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA,
535 Tracer::LEVEL2,
|
536 kumpf 1.116 "Monitor::run: Insufficient resources to "
537 "process request.");
538 entries[indx]._status = _MonitorEntry::IDLE;
539 return true;
540 }
|
541 a.arora 1.73 */
542 // Added for PEP 183
|
543 kumpf 1.116 HTTPConnection *dst =
544 reinterpret_cast<HTTPConnection *>(q);
|
545 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
546 kumpf 1.116 "Monitor::_dispatch: entering run() for "
547 "indx = %d, queueId = %d, q = %p",
548 dst->_entry_index,
549 dst->_monitor->_entries[dst->_entry_index].queueId,
|
550 marek 1.118 dst));
|
551 kumpf 1.116
552 try
553 {
554 dst->run(1);
555 }
556 catch (...)
557 {
|
558 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
559 kumpf 1.116 "Monitor::_dispatch: exception received");
560 }
|
561 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
562 kumpf 1.116 "Monitor::_dispatch: exited run() for index %d",
|
563 marek 1.118 dst->_entry_index));
|
564 kumpf 1.116
565 // It is possible the entry status may not be set to
566 // busy. The following will fail in that case.
567 // PEGASUS_ASSERT(dst->_monitor->_entries[
568 // dst->_entry_index]._status.get() ==
569 // _MonitorEntry::BUSY);
570 // Once the HTTPConnection thread has set the status
571 // value to either Monitor::DYING or Monitor::IDLE,
572 // it has returned control of the connection to the
573 // Monitor. It is no longer permissible to access
574 // the connection or the entry in the _entries table.
575
576 // The following is not relevant as the worker thread
577 // or the reader thread will update the status of the
578 // entry.
579 //if (dst->_connectionClosePending)
580 //{
581 // dst->_monitor->_entries[dst->_entry_index]._status =
582 // _MonitorEntry::DYING;
583 //}
584 //else
585 kumpf 1.116 //{
586 // dst->_monitor->_entries[dst->_entry_index]._status =
587 // _MonitorEntry::IDLE;
588 //}
|
589 r.kieninger 1.83 // end Added for PEP 183
|
590 kumpf 1.116 }
591 else if (entries[indx]._type == Monitor::INTERNAL)
592 {
593 // set ourself to BUSY,
|
594 r.kieninger 1.83 // read the data
|
595 a.arora 1.73 // and set ourself back to IDLE
|
596 r.kieninger 1.83
|
597 kumpf 1.116 entries[indx]._status = _MonitorEntry::BUSY;
598 static char buffer[2];
599 Sint32 amt =
600 Socket::read(entries[indx].socket,&buffer, 2);
601
602 if (amt == PEGASUS_SOCKET_ERROR &&
603 getSocketError() == PEGASUS_NETWORK_TCPIP_STOPPED)
604 {
|
605 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
606 kumpf 1.116 "Monitor::run: Tickler socket got an IO error. "
607 "Going to re-create Socket and wait for "
608 "TCP/IP restart.");
609 uninitializeTickler();
610 initializeTickler();
611 }
612 else
613 {
614 entries[indx]._status = _MonitorEntry::IDLE;
615 }
616 }
617 else
618 {
|
619 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
620 kumpf 1.116 "Non-connection entry, indx = %d, has been "
621 "received.",
|
622 marek 1.118 indx));
|
623 kumpf 1.116 int events = 0;
624 events |= SocketMessage::READ;
625 Message* msg = new SocketMessage(
626 entries[indx].socket, events);
627 entries[indx]._status = _MonitorEntry::BUSY;
628 _entry_mut.unlock();
629 q->enqueue(msg);
630 _entry_mut.lock();
631
632 // After enqueue a message and the autoEntryMutex has
633 // been released and locked again, the array of
634 // entries can be changed. The ArrayIterator has be
635 // reset with the original _entries
636 entries.reset(_entries);
637 entries[indx]._status = _MonitorEntry::IDLE;
638 }
639 }
640 catch (...)
641 {
642 }
|
643 thilo.boehm 1.115 }
|
644 kumpf 1.116 }
|
645 mday 1.24 }
|
646 mike 1.2 }
647
|
648 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
649 kumpf 1.48 {
650 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
651 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
652 kumpf 1.48 _stopConnections = 1;
|
653 a.arora 1.73 tickle();
|
654 kumpf 1.48
|
655 chuck 1.74 if (wait)
|
656 a.arora 1.73 {
|
657 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
658 // caller of this function may unbind the ports while the monitor
659 // is still accepting connections on them.
|
660 kumpf 1.101 _stopConnectionsSem.wait();
|
661 a.arora 1.73 }
|
662 r.kieninger 1.83
|
663 kumpf 1.48 PEG_METHOD_EXIT();
664 }
|
665 mday 1.25
|
666 mday 1.37
|
667 kumpf 1.116 int Monitor::solicitSocketMessages(
|
668 mike 1.106 SocketHandle socket,
|
669 mike 1.2 Uint32 events,
|
670 r.kieninger 1.83 Uint32 queueId,
|
671 mday 1.8 int type)
|
672 mike 1.2 {
|
673 kumpf 1.116 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
674 AutoMutex autoMut(_entry_mut);
675 // Check to see if we need to dynamically grow the _entries array
676 // We always want the _entries array to 2 bigger than the
677 // current connections requested
678 _solicitSocketCount++; // bump the count
679 int size = (int)_entries.size();
680 if ((int)_solicitSocketCount >= (size-1))
681 {
682 for (int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++)
683 {
684 _MonitorEntry entry(0, 0, 0);
685 _entries.append(entry);
686 }
687 }
|
688 a.arora 1.73
|
689 kumpf 1.116 int index;
690 for (index = 1; index < (int)_entries.size(); index++)
691 {
692 try
693 {
694 if (_entries[index]._status.get() == _MonitorEntry::EMPTY)
695 {
696 _entries[index].socket = socket;
697 _entries[index].queueId = queueId;
698 _entries[index]._type = type;
699 _entries[index]._status = _MonitorEntry::IDLE;
700
701 return index;
702 }
703 }
704 catch (...)
705 {
706 }
707 }
708 // decrease the count, if we are here we didn't do anything meaningful
709 _solicitSocketCount--;
710 kumpf 1.116 PEG_METHOD_EXIT();
711 return -1;
|
712 mike 1.2 }
713
|
714 mike 1.106 void Monitor::unsolicitSocketMessages(SocketHandle socket)
|
715 mike 1.2 {
|
716 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
717 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
718 a.arora 1.73
719 /*
|
720 kumpf 1.116 Start at index = 1 because _entries[0] is the tickle entry which
721 never needs to be EMPTY;
|
722 a.arora 1.73 */
|
723 w.otsuka 1.89 unsigned int index;
|
724 kumpf 1.116 for (index = 1; index < _entries.size(); index++)
|
725 mike 1.2 {
|
726 kumpf 1.116 if (_entries[index].socket == socket)
727 {
728 _entries[index]._status = _MonitorEntry::EMPTY;
729 _entries[index].socket = PEGASUS_INVALID_SOCKET;
730 _solicitSocketCount--;
731 break;
732 }
|
733 mike 1.2 }
|
734 a.arora 1.73
735 /*
|
736 kumpf 1.116 Dynamic Contraction:
737 To remove excess entries we will start from the end of the _entries
738 array and remove all entries with EMPTY status until we find the
739 first NON EMPTY. This prevents the positions, of the NON EMPTY
740 entries, from being changed.
|
741 r.kieninger 1.83 */
|
742 a.arora 1.73 index = _entries.size() - 1;
|
743 kumpf 1.116 while (_entries[index]._status.get() == _MonitorEntry::EMPTY)
744 {
745 if (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
|
746 a.arora 1.73 _entries.remove(index);
|
747 kumpf 1.116 index--;
|
748 a.arora 1.73 }
|
749 kumpf 1.4 PEG_METHOD_EXIT();
|
750 mike 1.2 }
751
|
752 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
753 kumpf 1.116 ThreadReturnType PEGASUS_THREAD_CDECL Monitor::_dispatch(void* parm)
|
754 mday 1.7 {
|
755 kumpf 1.116 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
756 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
|
757 kumpf 1.116 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, "
758 "q = %p",
759 dst->_entry_index,
760 dst->_monitor->_entries[dst->_entry_index].queueId,
|
761 marek 1.118 dst));
|
762 kumpf 1.116
763 try
764 {
765 dst->run(1);
766 }
767 catch (...)
768 {
|
769 marek 1.118 PEG_TRACE_CSTRING(TRC_HTTP, Tracer::LEVEL4,
|
770 kumpf 1.116 "Monitor::_dispatch: exception received");
771 }
|
772 marek 1.118 PEG_TRACE((TRC_HTTP, Tracer::LEVEL4,
773 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index));
|
774 kumpf 1.116
775 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() ==
776 _MonitorEntry::BUSY);
777
778 // Once the HTTPConnection thread has set the status value to either
779 // Monitor::DYING or Monitor::IDLE, it has returned control of the
780 // connection to the Monitor. It is no longer permissible to access the
781 // connection or the entry in the _entries table.
782 if (dst->_connectionClosePending)
783 {
784 dst->_monitor->_entries[dst->_entry_index]._status =
785 _MonitorEntry::DYING;
786 }
787 else
788 {
789 dst->_monitor->_entries[dst->_entry_index]._status =
790 _MonitorEntry::IDLE;
791 }
792 return 0;
|
793 mday 1.40 }
794
|
795 mike 1.2 PEGASUS_NAMESPACE_END
|