1 karl 1.90 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 david.dillard 1.95 //
|
19 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Brasher (mbrasher@bmc.com)
31 //
|
32 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
33 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
34 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
35 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
36 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
37 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
38 mike 1.2 //
39 //%/////////////////////////////////////////////////////////////////////////////
40
41 #include <Pegasus/Common/Config.h>
|
42 mday 1.40
|
43 mike 1.2 #include <cstring>
44 #include "Monitor.h"
45 #include "MessageQueue.h"
46 #include "Socket.h"
|
47 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
48 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
49 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
50 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
51 mike 1.100 #include "ArrayIterator.h"
|
52 mike 1.2
53 #ifdef PEGASUS_OS_TYPE_WINDOWS
54 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
55 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
56 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
57 david.dillard 1.95 <windows.h>). Find inclusion of that header which is visible to this \
|
58 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
59 otherwise, less than 64 clients (the default) will be able to connect to the \
60 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
61 mday 1.5
|
62 mike 1.2 # endif
63 # define FD_SETSIZE 1024
|
64 mday 1.5 # include <windows.h>
|
65 mike 1.2 #else
66 # include <sys/types.h>
67 # include <sys/socket.h>
68 # include <sys/time.h>
69 # include <netinet/in.h>
70 # include <netdb.h>
71 # include <arpa/inet.h>
72 #endif
73
74 PEGASUS_USING_STD;
75
76 PEGASUS_NAMESPACE_BEGIN
77
|
78 mike 1.96 static AtomicInt _connections(0);
|
79 mday 1.25
|
80 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
81 //
82 // Monitor
83 //
84 ////////////////////////////////////////////////////////////////////////////////
85
|
86 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
87 mike 1.2 Monitor::Monitor()
|
88 kumpf 1.87 : _stopConnections(0),
|
89 a.arora 1.73 _stopConnectionsSem(0),
|
90 a.dunfey 1.76 _solicitSocketCount(0),
|
91 a.dunfey 1.77 _tickle_client_socket(-1),
92 _tickle_server_socket(-1),
93 _tickle_peer_socket(-1)
|
94 mike 1.2 {
|
95 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
96 mike 1.2 Socket::initializeInterface();
|
97 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
98 a.arora 1.73
99 // setup the tickler
100 initializeTickler();
101
|
102 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
103 a.arora 1.73 // has added an entry in the first position of the
104 // _entries array
105 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
106 mday 1.37 {
107 _MonitorEntry entry(0, 0, 0);
108 _entries.append(entry);
109 }
|
110 mday 1.18 }
|
111 mday 1.20
|
112 mike 1.2 Monitor::~Monitor()
113 {
|
114 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
115 a.dunfey 1.76
116 try{
|
117 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
118 a.dunfey 1.76 {
119 Socket::close(_tickle_peer_socket);
120 }
|
121 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
122 a.dunfey 1.76 {
123 Socket::close(_tickle_client_socket);
124 }
|
125 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
126 a.dunfey 1.76 {
127 Socket::close(_tickle_server_socket);
128 }
129 }
130 catch(...)
131 {
132 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
133 "Failed to close tickle sockets");
134 }
135
|
136 mike 1.2 Socket::uninitializeInterface();
|
137 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
138 "returning from monitor destructor");
|
139 mday 1.18 }
140
|
141 a.arora 1.73 void Monitor::initializeTickler(){
|
142 r.kieninger 1.83 /*
143 NOTE: On any errors trying to
144 setup out tickle connection,
|
145 a.arora 1.73 throw an exception/end the server
146 */
147
148 /* setup the tickle server/listener */
149
150 // get a socket for the server side
|
151 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
152 a.arora 1.73 //handle error
153 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
154 "Received error number $0 while creating the internal socket.",
155 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
156 errno);
157 #else
158 WSAGetLastError());
159 #endif
160 throw Exception(parms);
161 }
162
163 // initialize the address
164 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
165 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
166 #pragma convert(37)
167 #endif
168 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
169 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
170 #pragma convert(0)
171 #endif
172 _tickle_server_addr.sin_family = PF_INET;
173 a.arora 1.73 _tickle_server_addr.sin_port = 0;
174
|
175 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
176 a.arora 1.73
177 // bind server side to socket
178 if((::bind(_tickle_server_socket,
|
179 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
180 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
181 // handle error
|
182 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
183 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
184 "Received error:$0 while binding the internal socket.",strerror(errno));
185 #else
|
186 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
187 "Received error number $0 while binding the internal socket.",
188 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
189 errno);
190 #else
191 WSAGetLastError());
192 #endif
|
193 r.kieninger 1.83 #endif
|
194 a.arora 1.73 throw Exception(parms);
195 }
196
197 // tell the kernel we are a server
198 if((::listen(_tickle_server_socket,3)) < 0){
199 // handle error
200 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
201 "Received error number $0 while listening to the internal socket.",
202 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
203 errno);
204 #else
205 WSAGetLastError());
206 #endif
207 throw Exception(parms);
208 }
|
209 r.kieninger 1.83
|
210 a.arora 1.73 // make sure we have the correct socket for our server
211 int sock = ::getsockname(_tickle_server_socket,
|
212 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
213 &_addr_size);
|
214 a.arora 1.73 if(sock < 0){
215 // handle error
216 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
217 "Received error number $0 while getting the internal socket name.",
218 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
219 errno);
220 #else
221 WSAGetLastError());
222 #endif
223 throw Exception(parms);
224 }
225
226 /* set up the tickle client/connector */
|
227 r.kieninger 1.83
|
228 a.arora 1.73 // get a socket for our tickle client
|
229 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
230 a.arora 1.73 // handle error
231 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
232 "Received error number $0 while creating the internal client socket.",
233 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
234 errno);
235 #else
236 WSAGetLastError());
237 #endif
238 throw Exception(parms);
239 }
240
241 // setup the address of the client
242 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
243 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
244 #pragma convert(37)
245 #endif
246 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
247 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
248 #pragma convert(0)
249 #endif
250 _tickle_client_addr.sin_family = PF_INET;
251 a.arora 1.73 _tickle_client_addr.sin_port = 0;
252
253 // bind socket to client side
254 if((::bind(_tickle_client_socket,
|
255 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
256 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
257 // handle error
258 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
259 "Received error number $0 while binding the internal client socket.",
260 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
261 errno);
262 #else
263 WSAGetLastError());
264 #endif
265 throw Exception(parms);
266 }
267
268 // connect to server side
269 if((::connect(_tickle_client_socket,
|
270 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
271 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
272 // handle error
273 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
274 "Received error number $0 while connecting the internal client socket.",
275 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
276 errno);
277 #else
278 WSAGetLastError());
279 #endif
280 throw Exception(parms);
281 }
282
283 /* set up the slave connection */
284 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
285 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
286 r.kieninger 1.83 pegasus_sleep(1);
|
287 a.arora 1.73
288 // this call may fail, we will try a max of 20 times to establish this peer connection
289 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
290 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
291 &peer_size)) < 0){
|
292 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293 // Only retry on non-windows platforms.
294 if(_tickle_peer_socket == -1 && errno == EAGAIN)
295 {
|
296 r.kieninger 1.83 int retries = 0;
|
297 a.arora 1.73 do
298 {
299 pegasus_sleep(1);
300 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
301 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
302 &peer_size);
|
303 a.arora 1.73 retries++;
304 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
305 }
306 #endif
307 }
308 if(_tickle_peer_socket == -1){
309 // handle error
310 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
311 "Received error number $0 while accepting the internal socket connection.",
312 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
313 errno);
314 #else
315 WSAGetLastError());
316 #endif
317 throw Exception(parms);
318 }
319 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
320 // checks entries with IDLE state for events
321 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
322 entry._status = _MonitorEntry::IDLE;
323 _entries.append(entry);
324 a.arora 1.73 }
325
326 void Monitor::tickle(void)
327 {
|
328 sushma.fernandes 1.78 static char _buffer[] =
|
329 a.arora 1.73 {
330 '0','0'
331 };
|
332 r.kieninger 1.83
|
333 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
334 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
335 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
336 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
337 sushma.fernandes 1.78 }
338
339 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
340 {
341 // Set the state to requested state
342 _entries[index]._status = status;
|
343 a.arora 1.73 }
344
|
345 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
346 {
|
347 mday 1.18
|
348 mday 1.25 Boolean handled_events = false;
|
349 a.arora 1.73 int i = 0;
|
350 r.kieninger 1.83
|
351 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
352 a.arora 1.73
|
353 mday 1.25 fd_set fdread;
354 FD_ZERO(&fdread);
|
355 a.arora 1.73
|
356 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
357 r.kieninger 1.83
|
358 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
359
|
360 r.kieninger 1.83 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
361 mike 1.96 if (_stopConnections.get() == 1)
|
362 kumpf 1.48 {
|
363 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
364 kumpf 1.48 {
|
365 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
366 kumpf 1.48 {
|
367 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
368 kumpf 1.48 {
|
369 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
370 entries[indx]._status.get() == _MonitorEntry::DYING )
|
371 kumpf 1.48 {
372 // remove the entry
|
373 mike 1.100 entries[indx]._status = _MonitorEntry::EMPTY;
|
374 kumpf 1.48 }
375 else
376 {
377 // set status to DYING
|
378 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
379 kumpf 1.48 }
380 }
381 }
382 }
383 _stopConnections = 0;
|
384 a.arora 1.73 _stopConnectionsSem.signal();
|
385 kumpf 1.48 }
|
386 kumpf 1.51
|
387 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
388 kumpf 1.68 {
|
389 mike 1.100 const _MonitorEntry &entry = entries[indx];
|
390 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
391 brian.campbell 1.80 (entry._type == Monitor::CONNECTION))
|
392 kumpf 1.68 {
|
393 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
394 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
395 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
396 r.kieninger 1.83
|
397 brian.campbell 1.80 if (h._connectionClosePending == false)
398 continue;
399
400 // NOTE: do not attempt to delete while there are pending responses
|
401 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
402 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
403 // and then cause the monitor to rerun this code and clean up.
404 // (see HTTPConnection.cpp)
405
406 if (h._responsePending == true)
407 {
408 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
409 "Ignoring connection delete request because "
410 "responses are still pending. "
|
411 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
412 brian.campbell 1.81 (void *)&h, h.getSocket());
|
413 brian.campbell 1.80 continue;
414 }
415 h._connectionClosePending = false;
416 MessageQueue &o = h.get_owner();
417 Message* message= new CloseConnectionMessage(entry.socket);
|
418 kumpf 1.68 message->dest = o.getQueueId();
419
|
420 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
421 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
422 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
423 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
424 // connection, the lock is re-requested and processing of
425 // the for loop continues. This is safe with the current
|
426 mike 1.100 // implementation of the entries object. Note that the
427 // loop condition accesses the entries.size() on each
|
428 kumpf 1.68 // iteration, so that a change in size while the mutex is
429 // unlocked will not result in an ArrayIndexOutOfBounds
430 // exception.
431
|
432 kumpf 1.94 autoEntryMutex.unlock();
|
433 kumpf 1.68 o.enqueue(message);
|
434 kumpf 1.94 autoEntryMutex.lock();
|
435 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
436 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
437 entries.reset(_entries);
|
438 kumpf 1.68 }
439 }
440
|
441 kumpf 1.51 Uint32 _idleEntries = 0;
|
442 r.kieninger 1.83
|
443 a.arora 1.73 /*
|
444 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
445 to the kernel as a parameter to SELECT. This loop seems like a good
446 place to calculate the max file descriptor (maximum socket number)
447 because we have to traverse the entire array.
|
448 r.kieninger 1.83 */
|
449 david.dillard 1.95 PEGASUS_SOCKET maxSocketCurrentPass = 0;
|
450 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
451 mike 1.2 {
|
452 mike 1.100 if(maxSocketCurrentPass < entries[indx].socket)
453 maxSocketCurrentPass = entries[indx].socket;
|
454 a.arora 1.73
|
455 mike 1.100 if(entries[indx]._status.get() == _MonitorEntry::IDLE)
|
456 mday 1.25 {
|
457 david.dillard 1.95 _idleEntries++;
|
458 mike 1.100 FD_SET(entries[indx].socket, &fdread);
|
459 mday 1.25 }
|
460 mday 1.13 }
|
461 s.hills 1.62
|
462 a.arora 1.73 /*
|
463 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
464 descriptors starting at 0.
|
465 a.arora 1.73 */
466 maxSocketCurrentPass++;
467
|
468 kumpf 1.94 autoEntryMutex.unlock();
|
469 david.dillard 1.95
470 //
471 // The first argument to select() is ignored on Windows and it is not
472 // a socket value. The original code assumed that the number of sockets
473 // and a socket value have the same type. On Windows they do not.
474 //
475 #ifdef PEGASUS_OS_TYPE_WINDOWS
476 int events = select(0, &fdread, NULL, NULL, &tv);
477 #else
|
478 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
479 david.dillard 1.95 #endif
|
480 kumpf 1.94 autoEntryMutex.lock();
|
481 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
482 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
483 entries.reset(_entries);
|
484 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
485 kumpf 1.50 if(events == SOCKET_ERROR)
|
486 mike 1.2 #else
|
487 kumpf 1.50 if(events == -1)
|
488 mike 1.2 #endif
|
489 mday 1.13 {
|
490 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
491 "Monitor::run - errorno = %d has occurred on select.", errno);
492 // The EBADF error indicates that one or more or the file
493 // descriptions was not valid. This could indicate that
|
494 mike 1.100 // the entries structure has been corrupted or that
|
495 kumpf 1.50 // we have a synchronization error.
496
497 PEGASUS_ASSERT(errno != EBADF);
498 }
499 else if (events)
500 {
|
501 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
502 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
503 david.dillard 1.95 events, _idleEntries);
|
504 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
505 mday 1.25 {
|
506 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
507 // owned by the Monitor).
|
508 mike 1.100 if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
509 (FD_ISSET(entries[indx].socket, &fdread)))
|
510 david.dillard 1.95 {
|
511 mike 1.100 MessageQueue *q = MessageQueue::lookup(entries[indx].queueId);
|
512 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
513 "Monitor::run indx = %d, queueId = %d, q = %p",
|
514 mike 1.100 indx, entries[indx].queueId, q);
|
515 kumpf 1.53 PEGASUS_ASSERT(q !=0);
|
516 mday 1.37
|
517 david.dillard 1.95 try
518 {
|
519 mike 1.100 if(entries[indx]._type == Monitor::CONNECTION)
|
520 david.dillard 1.95 {
|
521 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
522 mike 1.100 "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
523 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
524 sushma.fernandes 1.78
525 // Do not update the entry just yet. The entry gets updated once
|
526 r.kieninger 1.83 // the request has been read.
|
527 mike 1.100 //entries[indx]._status = _MonitorEntry::BUSY;
|
528 sushma.fernandes 1.78
|
529 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
530 a.arora 1.73 /* Removed for PEP 183.
|
531 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
532 (void *)q, _dispatch))
|
533 kumpf 1.67 {
534 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
535 "Monitor::run: Insufficient resources to process request.");
|
536 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
537 kumpf 1.67 return true;
538 }
|
539 a.arora 1.73 */
540 // Added for PEP 183
|
541 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
542 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
543 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
544 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
545 try
546 {
547 dst->run(1);
548 }
|
549 david.dillard 1.95 catch (...)
550 {
551 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
552 "Monitor::_dispatch: exception received");
553 }
554 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
555 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
556
|
557 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
558 r.kieninger 1.83 // The following will fail in that case.
|
559 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
560 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
561 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
562 // to the Monitor. It is no longer permissible to access the connection
563 // or the entry in the _entries table.
|
564 sushma.fernandes 1.78
565 // The following is not relevant as the worker thread or the
566 // reader thread will update the status of the entry.
567 //if (dst->_connectionClosePending)
|
568 r.kieninger 1.83 //{
|
569 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
570 //}
571 //else
572 //{
573 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
574 r.kieninger 1.83 //}
575 // end Added for PEP 183
|
576 a.arora 1.73 }
|
577 mike 1.100 else if( entries[indx]._type == Monitor::INTERNAL){
|
578 r.kieninger 1.83 // set ourself to BUSY,
579 // read the data
|
580 a.arora 1.73 // and set ourself back to IDLE
|
581 r.kieninger 1.83
|
582 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
583 a.arora 1.73 static char buffer[2];
|
584 mike 1.100 Socket::disableBlocking(entries[indx].socket);
585 Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
586 Socket::enableBlocking(entries[indx].socket);
587 entries[indx]._status = _MonitorEntry::IDLE;
|
588 mday 1.37 }
589 else
|
590 mday 1.25 {
|
591 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
592 "Non-connection entry, indx = %d, has been received.", indx);
|
593 mday 1.37 int events = 0;
594 events |= SocketMessage::READ;
|
595 mike 1.100 Message *msg = new SocketMessage(entries[indx].socket, events);
596 entries[indx]._status = _MonitorEntry::BUSY;
|
597 kumpf 1.94 autoEntryMutex.unlock();
|
598 mday 1.37 q->enqueue(msg);
|
599 kumpf 1.94 autoEntryMutex.lock();
|
600 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
601 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
602 entries.reset(_entries);
|
603 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
604 kumpf 1.94
|
605 mday 1.25 return true;
606 }
607 }
|
608 mday 1.37 catch(...)
|
609 mday 1.25 {
610 }
611 handled_events = true;
612 }
613 }
|
614 mday 1.24 }
|
615 kumpf 1.94
|
616 mday 1.13 return(handled_events);
|
617 mike 1.2 }
618
|
619 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
620 kumpf 1.48 {
621 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
622 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
623 kumpf 1.48 _stopConnections = 1;
|
624 a.arora 1.73 tickle();
|
625 kumpf 1.48
|
626 chuck 1.74 if (wait)
|
627 a.arora 1.73 {
|
628 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
629 // caller of this function may unbind the ports while the monitor
630 // is still accepting connections on them.
|
631 kumpf 1.101 _stopConnectionsSem.wait();
|
632 a.arora 1.73 }
|
633 r.kieninger 1.83
|
634 kumpf 1.48 PEG_METHOD_EXIT();
635 }
|
636 mday 1.25
|
637 mday 1.37
|
638 mday 1.25 int Monitor::solicitSocketMessages(
|
639 david.dillard 1.95 PEGASUS_SOCKET socket,
|
640 mike 1.2 Uint32 events,
|
641 r.kieninger 1.83 Uint32 queueId,
|
642 mday 1.8 int type)
|
643 mike 1.2 {
|
644 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
645 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
646 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
647 // We always want the _entries array to 2 bigger than the
648 // current connections requested
649 _solicitSocketCount++; // bump the count
650 int size = (int)_entries.size();
|
651 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
652 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
653 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
654 _entries.append(entry);
655 }
656 }
|
657 kumpf 1.4
|
658 a.arora 1.73 int index;
659 for(index = 1; index < (int)_entries.size(); index++)
|
660 mday 1.25 {
|
661 a.arora 1.73 try
|
662 mday 1.37 {
|
663 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
664 a.arora 1.73 {
665 _entries[index].socket = socket;
666 _entries[index].queueId = queueId;
667 _entries[index]._type = type;
668 _entries[index]._status = _MonitorEntry::IDLE;
|
669 r.kieninger 1.83
|
670 a.arora 1.73 return index;
671 }
|
672 mday 1.37 }
673 catch(...)
|
674 mday 1.25 {
675 }
676 }
|
677 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
678 mday 1.25 PEG_METHOD_EXIT();
|
679 kumpf 1.50 return -1;
|
680 a.arora 1.73
|
681 mike 1.2 }
682
|
683 david.dillard 1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
|
684 mike 1.2 {
|
685 kumpf 1.50
|
686 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
687 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
688 a.arora 1.73
689 /*
690 Start at index = 1 because _entries[0] is the tickle entry which never needs
691 to be EMPTY;
692 */
|
693 w.otsuka 1.89 unsigned int index;
|
694 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
695 mike 1.2 {
|
696 mday 1.25 if(_entries[index].socket == socket)
697 {
|
698 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
699 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
700 a.arora 1.73 _solicitSocketCount--;
701 break;
|
702 mday 1.25 }
|
703 mike 1.2 }
|
704 a.arora 1.73
705 /*
706 Dynamic Contraction:
707 To remove excess entries we will start from the end of the _entries array
708 and remove all entries with EMPTY status until we find the first NON EMPTY.
709 This prevents the positions, of the NON EMPTY entries, from being changed.
|
710 r.kieninger 1.83 */
|
711 a.arora 1.73 index = _entries.size() - 1;
|
712 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
713 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
714 _entries.remove(index);
715 index--;
716 }
|
717 kumpf 1.4 PEG_METHOD_EXIT();
|
718 mike 1.2 }
719
|
720 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
721 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
722 {
|
723 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
724 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
725 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
726 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
727 kumpf 1.51 try
728 {
729 dst->run(1);
730 }
731 catch (...)
732 {
733 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
734 "Monitor::_dispatch: exception received");
735 }
736 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
737 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
738 r.kieninger 1.83
|
739 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
740 kumpf 1.68
741 // Once the HTTPConnection thread has set the status value to either
742 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
743 // to the Monitor. It is no longer permissible to access the connection
744 // or the entry in the _entries table.
|
745 kumpf 1.50 if (dst->_connectionClosePending)
746 {
|
747 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
748 }
749 else
750 {
751 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
752 kumpf 1.50 }
|
753 mday 1.8 return 0;
|
754 mday 1.40 }
755
|
756 mike 1.2 PEGASUS_NAMESPACE_END
|