1 karl 1.79 //%2004////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 mike 1.2 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
11 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
14 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
16 //
|
17 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
18 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
20 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
23 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Mike Brasher (mbrasher@bmc.com)
29 //
|
30 mday 1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
31 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
32 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
33 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
34 mike 1.2 //
35 //%/////////////////////////////////////////////////////////////////////////////
36
37 #include <Pegasus/Common/Config.h>
|
38 mday 1.40
|
39 mike 1.2 #include <cstring>
40 #include "Monitor.h"
41 #include "MessageQueue.h"
42 #include "Socket.h"
|
43 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
44 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
45 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
46 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
47 mike 1.2
48 #ifdef PEGASUS_OS_TYPE_WINDOWS
49 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
50 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
51 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
52 mday 1.25 <windows.h>). Finthe inclusion of that header which is visible to this \
|
53 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
54 otherwise, less than 64 clients (the default) will be able to connect to the \
55 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
56 mday 1.5
|
57 mike 1.2 # endif
58 # define FD_SETSIZE 1024
|
59 mday 1.5 # include <windows.h>
|
60 mike 1.2 #else
61 # include <sys/types.h>
62 # include <sys/socket.h>
63 # include <sys/time.h>
64 # include <netinet/in.h>
65 # include <netdb.h>
66 # include <arpa/inet.h>
67 #endif
68
69 PEGASUS_USING_STD;
70
71 PEGASUS_NAMESPACE_BEGIN
72
|
73 mday 1.18
|
74 mday 1.25 static AtomicInt _connections = 0;
75
76 static struct timeval create_time = {0, 1};
|
77 mday 1.38 static struct timeval destroy_time = {300, 0};
|
78 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
79 mday 1.18
|
80 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
81 //
82 // MonitorRep
83 //
84 ////////////////////////////////////////////////////////////////////////////////
85
86 struct MonitorRep
87 {
88 fd_set rd_fd_set;
89 fd_set wr_fd_set;
90 fd_set ex_fd_set;
91 fd_set active_rd_fd_set;
92 fd_set active_wr_fd_set;
93 fd_set active_ex_fd_set;
94 };
95
96 ////////////////////////////////////////////////////////////////////////////////
97 //
98 // Monitor
99 //
100 ////////////////////////////////////////////////////////////////////////////////
101 mike 1.2
|
102 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
103 mike 1.2 Monitor::Monitor()
|
104 a.arora 1.73 : _module_handle(0),
105 _controller(0),
106 _async(false),
107 _stopConnections(0),
108 _stopConnectionsSem(0),
|
109 a.dunfey 1.76 _solicitSocketCount(0),
|
110 a.dunfey 1.77 _tickle_client_socket(-1),
111 _tickle_server_socket(-1),
112 _tickle_peer_socket(-1)
|
113 mike 1.2 {
|
114 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
115 mike 1.2 Socket::initializeInterface();
|
116 mday 1.25 _rep = 0;
|
117 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
118 a.arora 1.73
119 // setup the tickler
120 initializeTickler();
121
122 // Start the count at 1 because initilizeTickler()
123 // has added an entry in the first position of the
124 // _entries array
125 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
126 mday 1.37 {
127 _MonitorEntry entry(0, 0, 0);
128 _entries.append(entry);
129 }
|
130 mike 1.2 }
131
|
132 mday 1.18 Monitor::Monitor(Boolean async)
|
133 a.arora 1.73 : _module_handle(0),
134 _controller(0),
135 _async(async),
136 _stopConnections(0),
137 _stopConnectionsSem(0),
|
138 a.dunfey 1.76 _solicitSocketCount(0),
|
139 a.dunfey 1.77 _tickle_client_socket(-1),
140 _tickle_server_socket(-1),
141 _tickle_peer_socket(-1)
|
142 mday 1.18 {
|
143 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
144 mday 1.18 Socket::initializeInterface();
|
145 mday 1.25 _rep = 0;
|
146 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
147 a.arora 1.73
148 // setup the tickler
149 initializeTickler();
150
151 // Start the count at 1 because initilizeTickler()
152 // has added an entry in the first position of the
153 // _entries array
154 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
155 mday 1.37 {
156 _MonitorEntry entry(0, 0, 0);
157 _entries.append(entry);
158 }
|
159 mday 1.18 }
|
160 mday 1.20
|
161 mike 1.2 Monitor::~Monitor()
162 {
|
163 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
164 "deregistering with module controller");
|
165 kumpf 1.10
|
166 kumpf 1.11 if(_module_handle != NULL)
|
167 mday 1.8 {
168 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
169 _controller = 0;
|
170 kumpf 1.10 delete _module_handle;
|
171 mday 1.8 }
|
172 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
173 kumpf 1.48
|
174 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
175 a.dunfey 1.76
176 try{
|
177 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
178 a.dunfey 1.76 {
179 Socket::close(_tickle_peer_socket);
180 }
|
181 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
182 a.dunfey 1.76 {
183 Socket::close(_tickle_client_socket);
184 }
|
185 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
186 a.dunfey 1.76 {
187 Socket::close(_tickle_server_socket);
188 }
189 }
190 catch(...)
191 {
192 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
193 "Failed to close tickle sockets");
194 }
195
|
196 mike 1.2 Socket::uninitializeInterface();
|
197 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
198 "returning from monitor destructor");
|
199 mday 1.18 }
200
|
201 a.arora 1.73 void Monitor::initializeTickler(){
202 /*
203 NOTE: On any errors trying to
204 setup out tickle connection,
205 throw an exception/end the server
206 */
207
208 /* setup the tickle server/listener */
209
210 // get a socket for the server side
211 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
212 //handle error
213 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
214 "Received error number $0 while creating the internal socket.",
215 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
216 errno);
217 #else
218 WSAGetLastError());
219 #endif
220 throw Exception(parms);
221 }
222 a.arora 1.73
223 // initialize the address
224 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
225 #ifdef PEGASUS_OS_ZOS
226 _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
227 #else
228 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
229 #pragma convert(37)
230 #endif
231 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
232 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
233 #pragma convert(0)
234 #endif
235 #endif
236 _tickle_server_addr.sin_family = PF_INET;
237 _tickle_server_addr.sin_port = 0;
238
239 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
240
241 // bind server side to socket
242 if((::bind(_tickle_server_socket,
243 a.arora 1.73 (struct sockaddr *)&_tickle_server_addr,
244 sizeof(_tickle_server_addr))) < 0){
245 // handle error
246 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
247 "Received error number $0 while binding the internal socket.",
248 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
249 errno);
250 #else
251 WSAGetLastError());
252 #endif
253 throw Exception(parms);
254 }
255
256 // tell the kernel we are a server
257 if((::listen(_tickle_server_socket,3)) < 0){
258 // handle error
259 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
260 "Received error number $0 while listening to the internal socket.",
261 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
262 errno);
263 #else
264 a.arora 1.73 WSAGetLastError());
265 #endif
266 throw Exception(parms);
267 }
268
269 // make sure we have the correct socket for our server
270 int sock = ::getsockname(_tickle_server_socket,
271 (struct sockaddr*)&_tickle_server_addr,
272 &_addr_size);
273 if(sock < 0){
274 // handle error
275 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
276 "Received error number $0 while getting the internal socket name.",
277 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
278 errno);
279 #else
280 WSAGetLastError());
281 #endif
282 throw Exception(parms);
283 }
284
285 a.arora 1.73 /* set up the tickle client/connector */
286
287 // get a socket for our tickle client
288 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
289 // handle error
290 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
291 "Received error number $0 while creating the internal client socket.",
292 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
293 errno);
294 #else
295 WSAGetLastError());
296 #endif
297 throw Exception(parms);
298 }
299
300 // setup the address of the client
301 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
302 #ifdef PEGASUS_OS_ZOS
303 _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
304 #else
305 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
306 a.arora 1.73 #pragma convert(37)
307 #endif
308 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
309 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
310 #pragma convert(0)
311 #endif
312 #endif
313 _tickle_client_addr.sin_family = PF_INET;
314 _tickle_client_addr.sin_port = 0;
315
316 // bind socket to client side
317 if((::bind(_tickle_client_socket,
318 (struct sockaddr*)&_tickle_client_addr,
319 sizeof(_tickle_client_addr))) < 0){
320 // handle error
321 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
322 "Received error number $0 while binding the internal client socket.",
323 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
324 errno);
325 #else
326 WSAGetLastError());
327 a.arora 1.73 #endif
328 throw Exception(parms);
329 }
330
331 // connect to server side
332 if((::connect(_tickle_client_socket,
333 (struct sockaddr*)&_tickle_server_addr,
334 sizeof(_tickle_server_addr))) < 0){
335 // handle error
336 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
337 "Received error number $0 while connecting the internal client socket.",
338 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
339 errno);
340 #else
341 WSAGetLastError());
342 #endif
343 throw Exception(parms);
344 }
345
346 /* set up the slave connection */
347 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
348 a.arora 1.73 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
349 pegasus_sleep(1);
350
351 // this call may fail, we will try a max of 20 times to establish this peer connection
352 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
353 (struct sockaddr*)&_tickle_peer_addr,
354 &peer_size)) < 0){
355 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
356 // Only retry on non-windows platforms.
357 if(_tickle_peer_socket == -1 && errno == EAGAIN)
358 {
359 int retries = 0;
360 do
361 {
362 pegasus_sleep(1);
363 _tickle_peer_socket = ::accept(_tickle_server_socket,
364 (struct sockaddr*)&_tickle_peer_addr,
365 &peer_size);
366 retries++;
367 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
368 }
369 a.arora 1.73 #endif
370 }
371 if(_tickle_peer_socket == -1){
372 // handle error
373 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
374 "Received error number $0 while accepting the internal socket connection.",
375 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
376 errno);
377 #else
378 WSAGetLastError());
379 #endif
380 throw Exception(parms);
381 }
382 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
383 // checks entries with IDLE state for events
384 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
385 entry._status = _MonitorEntry::IDLE;
386 _entries.append(entry);
387 }
388
389 void Monitor::tickle(void)
390 a.arora 1.73 {
|
391 sushma.fernandes 1.78 static char _buffer[] =
|
392 a.arora 1.73 {
393 '0','0'
394 };
395
|
396 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
397 Socket::disableBlocking(_tickle_client_socket);
398 Socket::write(_tickle_client_socket,&_buffer, 2);
399 Socket::enableBlocking(_tickle_client_socket);
400 }
401
402 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
403 {
404 // Set the state to requested state
405 _entries[index]._status = status;
|
406 a.arora 1.73 }
407
|
408 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
409 {
|
410 mday 1.18
|
411 mday 1.25 Boolean handled_events = false;
|
412 a.arora 1.73 int i = 0;
413
|
414 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
415 a.arora 1.73
|
416 mday 1.25 fd_set fdread;
417 FD_ZERO(&fdread);
|
418 a.arora 1.73
|
419 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
420 mday 1.13
|
421 kumpf 1.48 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
422 if (_stopConnections == 1)
423 {
424 for ( int indx = 0; indx < (int)_entries.size(); indx++)
425 {
426 if (_entries[indx]._type == Monitor::ACCEPTOR)
427 {
428 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
429 {
430 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
431 _entries[indx]._status.value() == _MonitorEntry::DYING )
432 {
433 // remove the entry
434 _entries[indx]._status = _MonitorEntry::EMPTY;
435 }
436 else
437 {
438 // set status to DYING
|
439 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
440 kumpf 1.48 }
441 }
442 }
443 }
444 _stopConnections = 0;
|
445 a.arora 1.73 _stopConnectionsSem.signal();
|
446 kumpf 1.48 }
|
447 kumpf 1.51
|
448 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
449 {
450 if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
451 (_entries[indx]._type == Monitor::CONNECTION))
452 {
453 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
454 PEGASUS_ASSERT(q != 0);
455 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
456 Message* message= new CloseConnectionMessage(_entries[indx].socket);
457 message->dest = o.getQueueId();
458
459 // HTTPAcceptor is responsible for closing the connection.
460 // The lock is released to allow HTTPAcceptor to call
461 // unsolicitSocketMessages to free the entry.
462 // Once HTTPAcceptor completes processing of the close
463 // connection, the lock is re-requested and processing of
464 // the for loop continues. This is safe with the current
465 // implementation of the _entries object. Note that the
466 // loop condition accesses the _entries.size() on each
467 // iteration, so that a change in size while the mutex is
468 // unlocked will not result in an ArrayIndexOutOfBounds
469 kumpf 1.68 // exception.
470
471 _entry_mut.unlock();
472 o.enqueue(message);
473 _entry_mut.lock(pegasus_thread_self());
474 }
475 }
476
|
477 kumpf 1.51 Uint32 _idleEntries = 0;
|
478 a.arora 1.73
479 /*
480 We will keep track of the maximum socket number and pass this value
481 to the kernel as a parameter to SELECT. This loop seems like a good
482 place to calculate the max file descriptor (maximum socket number)
483 because we have to traverse the entire array.
484 */
485 int maxSocketCurrentPass = 0;
|
486 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
487 mike 1.2 {
|
488 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
489 maxSocketCurrentPass = _entries[indx].socket;
490
|
491 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
492 mday 1.25 {
|
493 kumpf 1.51 _idleEntries++;
|
494 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
495 }
|
496 mday 1.13 }
|
497 s.hills 1.62
|
498 a.arora 1.73 /*
499 Add 1 then assign maxSocket accordingly. We add 1 to account for
500 descriptors starting at 0.
501 */
502 maxSocketCurrentPass++;
503
|
504 kumpf 1.51 _entry_mut.unlock();
|
505 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
506 kumpf 1.51 _entry_mut.lock(pegasus_thread_self());
|
507 mday 1.25
|
508 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
509 kumpf 1.50 if(events == SOCKET_ERROR)
|
510 mike 1.2 #else
|
511 kumpf 1.50 if(events == -1)
|
512 mike 1.2 #endif
|
513 mday 1.13 {
|
514 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
515 "Monitor::run - errorno = %d has occurred on select.", errno);
516 // The EBADF error indicates that one or more or the file
517 // descriptions was not valid. This could indicate that
518 // the _entries structure has been corrupted or that
519 // we have a synchronization error.
520
521 PEGASUS_ASSERT(errno != EBADF);
522 }
523 else if (events)
524 {
|
525 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
526 "Monitor::run select event received events = %d, monitoring %d idle entries",
527 events, _idleEntries);
|
528 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
529 {
|
530 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
531 // owned by the Monitor).
532 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
533 (FD_ISSET(_entries[indx].socket, &fdread)))
|
534 mday 1.25 {
535 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
536 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
537 "Monitor::run indx = %d, queueId = %d, q = %p",
538 indx, _entries[indx].queueId, q);
539 PEGASUS_ASSERT(q !=0);
|
540 mday 1.37
541 try
|
542 mday 1.25 {
|
543 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
544 {
|
545 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
546 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
547 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
548 sushma.fernandes 1.78
549 // Do not update the entry just yet. The entry gets updated once
550 // the request has been read.
551 //_entries[indx]._status = _MonitorEntry::BUSY;
552
|
553 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
554 a.arora 1.73 /* Removed for PEP 183.
|
555 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
556 (void *)q, _dispatch))
|
557 kumpf 1.67 {
558 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
559 "Monitor::run: Insufficient resources to process request.");
560 _entries[indx]._status = _MonitorEntry::IDLE;
561 _entry_mut.unlock();
562 return true;
563 }
|
564 a.arora 1.73 */
565 // Added for PEP 183
566 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
567 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
568 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
569 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
570 try
571 {
572 dst->run(1);
573 }
574 catch (...)
575 {
576 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
577 "Monitor::_dispatch: exception received");
578 }
579 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
580 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
581
|
582 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
583 // The following will fail in that case.
584 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
585 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
586 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
587 // to the Monitor. It is no longer permissible to access the connection
588 // or the entry in the _entries table.
|
589 sushma.fernandes 1.78
590 // The following is not relevant as the worker thread or the
591 // reader thread will update the status of the entry.
592 //if (dst->_connectionClosePending)
593 //{
594 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
595 //}
596 //else
597 //{
598 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
599 //}
600 // end Added for PEP 183
|
601 a.arora 1.73 }
602 else if( _entries[indx]._type == Monitor::INTERNAL){
603 // set ourself to BUSY,
604 // read the data
605 // and set ourself back to IDLE
606
607 _entries[indx]._status == _MonitorEntry::BUSY;
608 static char buffer[2];
609 Socket::disableBlocking(_entries[indx].socket);
610 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
611 Socket::enableBlocking(_entries[indx].socket);
612 _entries[indx]._status == _MonitorEntry::IDLE;
|
613 mday 1.37 }
614 else
|
615 mday 1.25 {
|
616 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
617 "Non-connection entry, indx = %d, has been received.", indx);
|
618 mday 1.37 int events = 0;
619 events |= SocketMessage::READ;
620 Message *msg = new SocketMessage(_entries[indx].socket, events);
621 _entries[indx]._status = _MonitorEntry::BUSY;
622 _entry_mut.unlock();
|
623 mday 1.27
|
624 mday 1.37 q->enqueue(msg);
625 _entries[indx]._status = _MonitorEntry::IDLE;
|
626 mday 1.25 return true;
627 }
628 }
|
629 mday 1.37 catch(...)
|
630 mday 1.25 {
631 }
632 handled_events = true;
633 }
634 }
|
635 mday 1.24 }
|
636 mday 1.37 _entry_mut.unlock();
|
637 mday 1.13 return(handled_events);
|
638 mike 1.2 }
639
|
640 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
641 kumpf 1.48 {
642 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
643 a.arora 1.73 // set boolean then tickle the server to recognize _stopConnections
|
644 kumpf 1.48 _stopConnections = 1;
|
645 a.arora 1.73 tickle();
|
646 kumpf 1.48
|
647 chuck 1.74 if (wait)
|
648 a.arora 1.73 {
|
649 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
650 // caller of this function may unbind the ports while the monitor
651 // is still accepting connections on them.
652 try
653 {
654 _stopConnectionsSem.time_wait(10000);
655 }
656 catch (TimeOut &)
657 {
658 // The monitor is probably busy processng a very long request, and is
659 // not accepting connections. Let the caller unbind the ports.
660 }
|
661 a.arora 1.73 }
662
|
663 kumpf 1.48 PEG_METHOD_EXIT();
664 }
|
665 mday 1.25
|
666 mday 1.37
|
667 mday 1.25 int Monitor::solicitSocketMessages(
|
668 mike 1.2 Sint32 socket,
669 Uint32 events,
|
670 mday 1.8 Uint32 queueId,
671 int type)
|
672 mike 1.2 {
|
673 a.arora 1.73 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
674 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
675 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
676 // We always want the _entries array to 2 bigger than the
677 // current connections requested
678 _solicitSocketCount++; // bump the count
679 int size = (int)_entries.size();
680 if(_solicitSocketCount >= (size-1)){
681 for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
682 _MonitorEntry entry(0, 0, 0);
683 _entries.append(entry);
684 }
685 }
|
686 kumpf 1.4
|
687 a.arora 1.73 int index;
688 for(index = 1; index < (int)_entries.size(); index++)
|
689 mday 1.25 {
|
690 a.arora 1.73 try
|
691 mday 1.37 {
|
692 a.arora 1.73 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
693 {
694 _entries[index].socket = socket;
695 _entries[index].queueId = queueId;
696 _entries[index]._type = type;
697 _entries[index]._status = _MonitorEntry::IDLE;
|
698 alagaraja 1.75
|
699 a.arora 1.73 return index;
700 }
|
701 mday 1.37 }
702 catch(...)
|
703 mday 1.25 {
704 }
705 }
|
706 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
707 mday 1.25 PEG_METHOD_EXIT();
|
708 kumpf 1.50 return -1;
|
709 a.arora 1.73
|
710 mike 1.2 }
711
|
712 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
713 mike 1.2 {
|
714 kumpf 1.50
|
715 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
716 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
717 a.arora 1.73
718 /*
719 Start at index = 1 because _entries[0] is the tickle entry which never needs
720 to be EMPTY;
721 */
722 int index;
723 for(index = 1; index < _entries.size(); index++)
|
724 mike 1.2 {
|
725 mday 1.25 if(_entries[index].socket == socket)
726 {
|
727 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
728 _entries[index].socket = -1;
729 _solicitSocketCount--;
730 break;
|
731 mday 1.25 }
|
732 mike 1.2 }
|
733 a.arora 1.73
734 /*
735 Dynamic Contraction:
736 To remove excess entries we will start from the end of the _entries array
737 and remove all entries with EMPTY status until we find the first NON EMPTY.
738 This prevents the positions, of the NON EMPTY entries, from being changed.
739 */
740 index = _entries.size() - 1;
741 while(_entries[index]._status == _MonitorEntry::EMPTY){
742 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
743 _entries.remove(index);
744 index--;
745 }
746
|
747 kumpf 1.4 PEG_METHOD_EXIT();
|
748 mike 1.2 }
749
|
750 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
751 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
752 {
|
753 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
754 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
755 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
756 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
757 kumpf 1.51 try
758 {
759 dst->run(1);
760 }
761 catch (...)
762 {
763 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
764 "Monitor::_dispatch: exception received");
765 }
766 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
767 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
768
|
769 kumpf 1.53 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
770 kumpf 1.68
771 // Once the HTTPConnection thread has set the status value to either
772 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
773 // to the Monitor. It is no longer permissible to access the connection
774 // or the entry in the _entries table.
|
775 kumpf 1.50 if (dst->_connectionClosePending)
776 {
|
777 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
778 }
779 else
780 {
781 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
782 kumpf 1.50 }
|
783 mday 1.8 return 0;
|
784 mday 1.40 }
785
786
787
788 ////************************* monitor 2 *****************************////
|
789 mday 1.43 ////************************* monitor 2 *****************************////
790 ////************************* monitor 2 *****************************////
791 ////************************* monitor 2 *****************************////
792 ////************************* monitor 2 *****************************////
793 ////************************* monitor 2 *****************************////
794 ////************************* monitor 2 *****************************////
|
795 mday 1.40
796
|
797 mday 1.56
798
799
|
800 mday 1.42 m2e_rep::m2e_rep(void)
|
801 mday 1.43 :Base(), state(IDLE)
802
|
803 mday 1.42 {
804 }
805
806 m2e_rep::m2e_rep(monitor_2_entry_type _type,
807 pegasus_socket _sock,
808 void* _accept,
809 void* _dispatch)
|
810 mday 1.43 : Base(), type(_type), state(IDLE), psock(_sock),
|
811 mday 1.42 accept_parm(_accept), dispatch_parm(_dispatch)
812 {
813
814 }
815
816 m2e_rep::~m2e_rep(void)
817 {
818 }
819
820 m2e_rep::m2e_rep(const m2e_rep& r)
821 : Base()
822 {
823 if(this != &r){
824 type = r.type;
825 psock = r.psock;
826 accept_parm = r.accept_parm;
827 dispatch_parm = r.dispatch_parm;
|
828 mday 1.43 state = IDLE;
829
|
830 mday 1.42 }
831 }
832
833
834 m2e_rep& m2e_rep::operator =(const m2e_rep& r)
835 {
836 if(this != &r) {
837 type = r.type;
838 psock = r.psock;
839 accept_parm = r.accept_parm;
840 dispatch_parm = r.dispatch_parm;
|
841 mday 1.43 state = IDLE;
|
842 mday 1.42 }
843 return *this;
844 }
845
846 Boolean m2e_rep::operator ==(const m2e_rep& r)
847 {
848 if(this == &r)
849 return true;
850 return false;
851 }
852
853 Boolean m2e_rep::operator ==(void* r)
854 {
855 if((void*)this == r)
856 return true;
857 return false;
858 }
859
860 m2e_rep::operator pegasus_socket() const
861 {
862 return psock;
863 mday 1.42 }
864
865
|
866 mday 1.40 monitor_2_entry::monitor_2_entry(void)
867 {
|
868 mday 1.42 _rep = new m2e_rep();
|
869 mday 1.40 }
870
|
871 mday 1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,
872 monitor_2_entry_type _type,
873 void* _accept_parm, void* _dispatch_parm)
|
874 mday 1.40 {
|
875 mday 1.42 _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
|
876 mday 1.40 }
877
878 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
879 {
880 if(this != &e){
|
881 mday 1.42 Inc(this->_rep = e._rep);
|
882 mday 1.40 }
883 }
884
885 monitor_2_entry::~monitor_2_entry(void)
886 {
|
887 a.dunfey 1.76
|
888 mday 1.42 Dec(_rep);
|
889 mday 1.40 }
890
891 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
892 {
893 if(this != &e){
|
894 mday 1.42 Dec(_rep);
895 Inc(this->_rep = e._rep);
|
896 mday 1.40 }
897 return *this;
898 }
899
|
900 mday 1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
|
901 mday 1.40 {
902 if(this == &me)
903 return true;
904 return false;
905 }
906
|
907 mday 1.42 Boolean monitor_2_entry::operator ==(void* k) const
|
908 mday 1.40 {
909 if((void *)this == k)
910 return true;
911 return false;
912 }
913
914
|
915 mday 1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
|
916 mday 1.40 {
|
917 mday 1.42 return _rep->type;
918 }
919
920 void monitor_2_entry::set_type(monitor_2_entry_type t)
921 {
922 _rep->type = t;
923 }
924
925
|
926 mday 1.43 monitor_2_entry_state monitor_2_entry::get_state(void) const
927 {
928 return (monitor_2_entry_state) _rep->state.value();
929 }
930
931 void monitor_2_entry::set_state(monitor_2_entry_state t)
932 {
933 _rep->state = t;
934 }
935
|
936 mday 1.42 void* monitor_2_entry::get_accept(void) const
937 {
938 return _rep->accept_parm;
939 }
940
941 void monitor_2_entry::set_accept(void* a)
942 {
943 _rep->accept_parm = a;
944 }
945
946
947 void* monitor_2_entry::get_dispatch(void) const
948 {
949 return _rep->dispatch_parm;
950 }
951
952 void monitor_2_entry::set_dispatch(void* a)
953 {
954 _rep->dispatch_parm = a;
955 }
956
957 mday 1.42 pegasus_socket monitor_2_entry::get_sock(void) const
958 {
959 return _rep->psock;
960 }
961
962
963 void monitor_2_entry::set_sock(pegasus_socket& s)
964 {
965 _rep->psock = s;
966
|
967 mday 1.40 }
968
|
969 mday 1.59 //static monitor_2* _m2_instance;
|
970 mday 1.40
|
971 mday 1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
972
|
973 mday 1.40 monitor_2::monitor_2(void)
|
974 mday 1.42 : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),
|
975 mday 1.49 _ready(true, 0), _die(0), _requestCount(0)
|
976 mday 1.40 {
977 try {
978
979 bsd_socket_factory _factory;
980
981 // set up the listener/acceptor
982 pegasus_socket temp = pegasus_socket(&_factory);
983
984 temp.socket(PF_INET, SOCK_STREAM, 0);
985 // initialize the address
986 memset(&_tickle_addr, 0, sizeof(_tickle_addr));
|
987 marek 1.47 #ifdef PEGASUS_OS_ZOS
988 _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
989 #else
|
990 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
991 #pragma convert(37)
992 #endif
|
993 mday 1.40 _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
994 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
995 #pragma convert(0)
996 #endif
|
997 marek 1.47 #endif
|
998 mday 1.40 _tickle_addr.sin_family = PF_INET;
999 _tickle_addr.sin_port = 0;
1000
1001 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
1002
1003 temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
1004 temp.listen(3);
1005 temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
1006
1007 // set up the connector
1008
1009 pegasus_socket tickler = pegasus_socket(&_factory);
1010 tickler.socket(PF_INET, SOCK_STREAM, 0);
1011 struct sockaddr_in _addr;
1012 memset(&_addr, 0, sizeof(_addr));
|
1013 kumpf 1.48 #ifdef PEGASUS_OS_ZOS
|
1014 marek 1.47 _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
1015 #else
|
1016 mday 1.40 _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
1017 marek 1.47 #endif
|
1018 mday 1.40 _addr.sin_family = PF_INET;
1019 _addr.sin_port = 0;
1020 tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
1021 tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
1022
|
1023 mday 1.42 _tickler.set_sock(tickler);
1024 _tickler.set_type(INTERNAL);
|
1025 mday 1.43 _tickler.set_state(BUSY);
1026
|
1027 mday 1.40 struct sockaddr_in peer;
1028 memset(&peer, 0, sizeof(peer));
1029 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1030
1031 pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
|
1032 mday 1.57
|
1033 mday 1.42 monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
|
1034 a.arora 1.71
1035 // No need to set _tickle's state as BUSY, since monitor_2::run() now
1036 // does a select only on sockets which are in IDLE (default) state.
1037 // _tickle->set_state(BUSY);
|
1038 mday 1.43
|
1039 mday 1.40 _listeners.insert_first(_tickle);
1040
1041 }
1042 catch(...){ }
1043 }
1044
1045 monitor_2::~monitor_2(void)
1046 {
|
1047 mday 1.60
1048 stop();
1049
|
1050 mday 1.41 try {
1051 monitor_2_entry* temp = _listeners.remove_first();
1052 while(temp){
1053 delete temp;
1054 temp = _listeners.remove_first();
1055 }
1056 }
|
1057 mday 1.60
|
1058 mday 1.41 catch(...){ }
|
1059 mday 1.60
1060
1061 try
1062 {
1063 HTTPConnection2* temp = _connections.remove_first();
1064 while(temp)
1065 {
1066 delete temp;
1067 temp = _connections.remove_first();
1068 }
1069 }
1070 catch(...)
1071 {
1072 }
1073
1074
|
1075 mday 1.40 }
1076
1077
1078 void monitor_2::run(void)
1079 {
1080 monitor_2_entry* temp;
|
1081 a.arora 1.71 int _nonIdle=0, _idleCount=0, events;
1082
|
1083 mday 1.40 while(_die.value() == 0) {
|
1084 a.arora 1.71 _nonIdle=_idleCount=0;
|
1085 mday 1.49
|
1086 mday 1.57 struct timeval tv_idle = { 60, 0 };
|
1087 mday 1.56
|
1088 mday 1.40 // place all sockets in the select set
1089 FD_ZERO(&rd_fd_set);
1090 try {
1091 _listeners.lock(pegasus_thread_self());
1092 temp = _listeners.next(0);
|
1093 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1094 "monitor_2::run:Creating New FD list for SELECT.");
|
1095 mday 1.40 while(temp != 0 ){
|
1096 mday 1.57 if(temp->get_state() == CLOSED ) {
|
1097 mday 1.43 monitor_2_entry* closed = temp;
|
1098 a.arora 1.72 temp = _listeners.next(closed);
|
1099 mday 1.43 _listeners.remove_no_lock(closed);
|
1100 a.arora 1.71
1101 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1102 "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
|
1103 mday 1.60
|
1104 mday 1.49 HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1105 delete cn;
|
1106 mday 1.43 delete closed;
1107 }
|
1108 mday 1.45 if(temp == 0)
1109 break;
|
1110 a.arora 1.71
1111
1112 //Count the number if IDLE sockets
1113 if(temp->get_state() != IDLE ) _nonIdle++;
1114 else _idleCount++;
1115
|
1116 mday 1.46 Sint32 fd = (Sint32) temp->get_sock();
|
1117 a.arora 1.71
1118 //Select should be called ONLY on the FDs which are in IDLE state
1119 if((fd >= 0) && (temp->get_state() == IDLE))
1120 {
1121 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1122 "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1123 FD_SET(fd , &rd_fd_set);
1124 }
1125 temp = _listeners.next(temp);
|
1126 mday 1.40 }
1127 _listeners.unlock();
1128 }
1129 catch(...){
1130 return;
1131 }
|
1132 a.arora 1.71
|
1133 mday 1.42 // important - the dispatch routine has pointers to all the
1134 // entries that are readable. These entries can be changed but
1135 // the pointer must not be tampered with.
|
1136 mday 1.56 if(_connections.count() )
|
1137 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
|
1138 mday 1.56 else
|
1139 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
|
1140 mday 1.57
1141 if(_die.value())
1142 {
1143 break;
1144 }
|
1145 a.arora 1.71
1146 #ifdef PEGASUS_OS_TYPE_WINDOWS
1147 if(events == SOCKET_ERROR)
1148 #else
1149 if(events == -1)
1150 #endif
1151 {
1152 Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1153 "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1154 // The EBADF error indicates that one or more or the file
1155 // descriptions was not valid. This could indicate that
1156 // the _entries structure has been corrupted or that
1157 // we have a synchronization error.
1158
1159 // Keeping the line below commented for time being.
1160 // PEGASUS_ASSERT(errno != EBADF);
1161 }
1162 else if (events)
1163 {
1164 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1165 "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1166 a.arora 1.71
|
1167 mday 1.57
|
1168 mday 1.40 try {
1169 _listeners.lock(pegasus_thread_self());
1170 temp = _listeners.next(0);
1171 while(temp != 0 ){
|
1172 a.arora 1.72 Sint32 fd = (Sint32) temp->get_sock();
1173 if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1174 if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
|
1175 mday 1.42 FD_CLR(fd, &rd_fd_set);
1176 monitor_2_entry* ready = new monitor_2_entry(*temp);
|
1177 mday 1.49 try
1178 {
1179 _ready.insert_first(ready);
1180 }
1181 catch(...)
1182 {
1183 }
1184
|
1185 mday 1.42 _requestCount++;
|
1186 mday 1.40 }
1187 temp = _listeners.next(temp);
1188 }
1189 _listeners.unlock();
1190 }
1191 catch(...){
1192 return;
1193 }
1194 // now handle the sockets that are ready to read
|
1195 mday 1.56 if(_ready.count())
1196 _dispatch();
1197 else
1198 {
1199 if(_connections.count() == 0 )
1200 _idle_dispatch(_idle_parm);
1201 }
|
1202 a.arora 1.71 } // if events
|
1203 mday 1.40 } // while alive
|
1204 a.arora 1.72 _die=0;
|
1205 mday 1.57
|
1206 mday 1.40 }
1207
|
1208 dj.gorey 1.70 int monitor_2::solicitSocketMessages(
1209 Sint32 socket,
1210 Uint32 events,
1211 Uint32 queueId,
1212 int type)
1213 {
1214
|
1215 a.arora 1.71 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
|
1216 dj.gorey 1.70
|
1217 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
1218 dj.gorey 1.70
1219 for(int index = 0; index < (int)_entries.size(); index++)
1220 {
1221 try
1222 {
1223 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1224 {
1225 _entries[index].socket = socket;
1226 //_entries[index].queueId = queueId;
1227 //_entries[index]._type = type;
|
1228 a.arora 1.71 _entries[index]._status = IDLE;
|
1229 dj.gorey 1.70
1230 return index;
1231 }
1232 }
1233 catch(...)
1234 {
1235 }
1236
1237 }
1238 PEG_METHOD_EXIT();
1239 return -1;
1240 }
1241
1242
1243 void monitor_2::unsolicitSocketMessages(Sint32 socket)
1244 {
1245
1246 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
|
1247 alagaraja 1.75 AutoMutex autoMut(_entry2_mut);
|
1248 dj.gorey 1.70
1249 for(int index = 0; index < (int)_entries2.size(); index++)
1250 {
1251 if(_entries2[index].socket == socket)
1252 {
1253 _entries2[index]._status = monitor_2_entry::EMPTY;
1254 _entries2[index].socket = -1;
1255 break;
1256 }
1257 }
1258 PEG_METHOD_EXIT();
1259 }
1260
|
1261 mday 1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
|
1262 mday 1.40 {
|
1263 mday 1.42 void* old = (void *)_session_dispatch;
|
1264 mday 1.40 _session_dispatch = dp;
1265 return old;
1266 }
1267
|
1268 mday 1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1269 {
1270 void* old = (void*)_accept_dispatch;
1271 _accept_dispatch = dp;
1272 return old;
|
1273 mday 1.56 }
1274
1275 void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1276 {
1277 void* old = (void*)_idle_dispatch;
1278 _idle_dispatch = dp;
1279 return old;
1280 }
1281
1282 void* monitor_2::set_idle_parm(void* parm)
1283 {
1284 void* old = _idle_parm;
1285 _idle_parm = parm;
1286 return old;
|
1287 mday 1.42 }
1288
|
1289 mday 1.40
|
1290 mday 1.60
1291 //-----------------------------------------------------------------
1292 // Note on deleting the monitor_2_entry nodes:
1293 // Each case: in the switch statement needs to handle the deletion
1294 // of the monitor_2_entry * node differently. A SESSION dispatch
1295 // routine MUST DELETE the entry during its dispatch handling.
1296 // All other dispatch routines MUST NOT delete the entry during the
1297 // dispatch handling, but must allow monitor_2::_dispatch to delete
1298 // the entry.
1299 //
1300 // The reason is pretty obscure and it is debatable whether or not
1301 // to even bother, but during cimserver shutdown the single monitor_2_entry*
1302 // will leak unless the _session_dispatch routine takes care of deleting it.
1303 //
1304 // The reason is that a shutdown messages completely stops everything and
1305 // the _session_dispatch routine never returns. So monitor_2::_dispatch is
1306 // never able to do its own cleanup.
1307 //
1308 // << Mon Oct 13 09:33:33 2003 mdd >>
1309 //-----------------------------------------------------------------
1310
|
1311 mday 1.40 void monitor_2::_dispatch(void)
1312 {
|
1313 mday 1.49 monitor_2_entry* entry;
1314
1315 try
1316 {
1317
1318 entry = _ready.remove_first();
1319 }
1320 catch(...)
1321 {
1322 }
1323
1324 while(entry != 0 ) {
|
1325 mday 1.42 switch(entry->get_type()) {
|
1326 mday 1.40 case INTERNAL:
1327 static char buffer[2];
|
1328 mday 1.49 entry->get_sock().disableBlocking();
|
1329 mday 1.42 entry->get_sock().read(&buffer, 2);
|
1330 mday 1.49 entry->get_sock().enableBlocking();
|
1331 a.arora 1.71 entry->set_state(IDLE); // Set state of the socket to IDLE so that
1332 // monitor_2::run can add to the list of FDs
1333 // on which select would be called.
1334
1335
1336
|
1337 mday 1.60 delete entry;
1338
|
1339 mday 1.40 break;
1340 case LISTEN:
1341 {
1342 static struct sockaddr peer;
1343 static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
|
1344 mday 1.49 entry->get_sock().disableBlocking();
|
1345 mday 1.42 pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
|
1346 a.arora 1.71 entry->set_state(IDLE); // Set state of the LISTEN socket to IDLE
|
1347 mday 1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1348 if((Sint32)connected == SOCKET_ERROR)
1349 #else
1350 if((Sint32)connected == -1 )
1351 #endif
1352 {
1353 delete entry;
1354 break;
1355 }
1356
|
1357 mday 1.49 entry->get_sock().enableBlocking();
|
1358 mday 1.42 monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1359 if(temp && _accept_dispatch != 0)
|
1360 mday 1.49 _accept_dispatch(temp);
|
1361 mday 1.60 delete entry;
1362
|
1363 mday 1.40 }
1364 break;
1365 case SESSION:
|
1366 a.arora 1.72 case CLIENTSESSION:
|
1367 mday 1.60 if(_session_dispatch != 0 )
1368 {
1369 // NOTE: _session_dispatch will delete entry - do not do it here
|
1370 a.arora 1.72 unsigned client=0;
1371 if(entry->get_type() == CLIENTSESSION) client = 1;
1372 Sint32 sock=(Sint32)(entry->get_sock());
1373
1374 _session_dispatch(entry);
1375
1376 if(client)
1377 {
1378 HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1379 if(cn) delete cn;
1380 // stop();
1381 _die=1;
1382 }
|
1383 mday 1.60 }
1384
|
1385 mday 1.40 else {
1386 static char buffer[4096];
|
1387 mday 1.42 int bytes = entry->get_sock().read(&buffer, 4096);
|
1388 mday 1.60 delete entry;
|
1389 mday 1.40 }
1390
1391 break;
1392 case UNTYPED:
1393 default:
|
1394 mday 1.60 delete entry;
|
1395 mday 1.40 break;
1396 }
|
1397 mday 1.42 _requestCount--;
|
1398 mday 1.49
1399 if(_ready.count() == 0 )
1400 break;
1401
1402 try
1403 {
1404 entry = _ready.remove_first();
1405 }
1406 catch(...)
1407 {
1408 }
1409
|
1410 mday 1.40 }
1411 }
1412
1413 void monitor_2::stop(void)
1414 {
1415 _die = 1;
1416 tickle();
1417 // shut down the listener list, free the list nodes
|
1418 mday 1.42 _tickler.get_sock().close();
|
1419 mday 1.40 _listeners.shutdown_queue();
1420 }
1421
1422 void monitor_2::tickle(void)
1423 {
1424 static char _buffer[] =
1425 {
1426 '0','0'
1427 };
1428
|
1429 mday 1.57 _tickler.get_sock().disableBlocking();
1430
|
1431 mday 1.42 _tickler.get_sock().write(&_buffer, 2);
|
1432 mday 1.57 _tickler.get_sock().enableBlocking();
1433
|
1434 mday 1.40 }
1435
1436
|
1437 mday 1.42 monitor_2_entry* monitor_2::add_entry(pegasus_socket& ps,
1438 monitor_2_entry_type type,
1439 void* accept_parm,
1440 void* dispatch_parm)
|
1441 mday 1.40 {
|
1442 a.arora 1.71 Sint32 fd1,fd2;
1443
1444 fd2=(Sint32) ps;
1445
|
1446 mday 1.42 monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
|
1447 a.arora 1.71
1448 // The purpose of the following piece of code is to avoid duplicate entries in
1449 // the _listeners list. Would it be too much of an overhead ?
1450 try {
1451
1452 monitor_2_entry* temp;
1453
1454 _listeners.lock(pegasus_thread_self());
1455 temp = _listeners.next(0);
1456 while(temp != 0 )
1457 {
1458 fd1=(Sint32) temp->get_sock();
1459
1460 if(fd1 == fd2)
1461 {
1462
1463 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1464 "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1465 if(temp->get_state() == CLOSED)
1466 {
1467 temp->set_state(IDLE);
1468 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1469 "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1470 }
1471 _listeners.unlock();
1472 delete m2e;
1473 return 0;
1474 }
1475 temp = _listeners.next(temp);
1476 }
1477 }
1478 catch(...)
1479 {
1480 delete m2e;
1481 return 0;
1482 }
1483
1484
1485 _listeners.unlock();
1486
1487
|
1488 mday 1.40 try{
1489 _listeners.insert_first(m2e);
1490 }
1491 catch(...){
1492 delete m2e;
|
1493 mday 1.42 return 0;
|
1494 mday 1.40 }
|
1495 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1496 "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
|
1497 mday 1.40 tickle();
|
1498 mday 1.42 return m2e;
|
1499 mday 1.40 }
1500
1501 Boolean monitor_2::remove_entry(Sint32 s)
1502 {
1503 monitor_2_entry* temp;
1504 try {
1505 _listeners.try_lock(pegasus_thread_self());
1506 temp = _listeners.next(0);
1507 while(temp != 0){
|
1508 mday 1.42 if(s == (Sint32)temp->_rep->psock ){
|
1509 mday 1.40 temp = _listeners.remove_no_lock(temp);
1510 delete temp;
1511 _listeners.unlock();
1512 return true;
1513 }
1514 temp = _listeners.next(temp);
1515 }
1516 _listeners.unlock();
1517 }
1518 catch(...){
1519 }
1520 return false;
|
1521 mday 1.7 }
|
1522 mday 1.37
|
1523 mday 1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1524 {
1525 return _requestCount.value();
1526
|
1527 mday 1.49 }
1528
1529
1530 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1531 {
1532
1533 HTTPConnection2* temp;
1534 try
1535 {
1536 monitor_2::_connections.lock(pegasus_thread_self());
1537 temp = monitor_2::_connections.next(0);
1538 while(temp != 0 )
1539 {
1540 if(sock == temp->getSocket())
1541 {
1542 temp = monitor_2::_connections.remove_no_lock(temp);
1543 monitor_2::_connections.unlock();
1544 return temp;
1545 }
1546 temp = monitor_2::_connections.next(temp);
1547 }
1548 mday 1.49 monitor_2::_connections.unlock();
1549 }
1550 catch(...)
1551 {
1552 }
1553 return 0;
1554 }
1555
1556 Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1557 {
1558 try
1559 {
1560 monitor_2::_connections.insert_first(connection);
1561 }
1562 catch(...)
1563 {
1564 return false;
1565 }
1566 return true;
|
1567 mday 1.42 }
|
1568 mday 1.7
|
1569 mike 1.2
1570 PEGASUS_NAMESPACE_END
|