1 karl 1.64 //%2003////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.64 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.2 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
9 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
12 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
14 //
|
15 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
16 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
18 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
21 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Brasher (mbrasher@bmc.com)
27 //
|
28 mday 1.49 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
29 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
30 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
31 mike 1.2 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include <Pegasus/Common/Config.h>
|
35 mday 1.40
|
36 mike 1.2 #include <cstring>
37 #include "Monitor.h"
38 #include "MessageQueue.h"
39 #include "Socket.h"
|
40 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
41 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
42 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
43 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
44 mike 1.2
// Platform socket headers.
//
// On Windows FD_SETSIZE has to be 1024 before <winsock.h> is first seen by
// this compilation unit; a different, already-defined value is reported as a
// hard error. Other platforms pull in the BSD socket headers.
#ifdef PEGASUS_OS_TYPE_WINDOWS
# if defined(FD_SETSIZE) && FD_SETSIZE != 1024
# error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
of <winsock.h>. It may have been indirectly included (e.g., by including \
<windows.h>). Find the inclusion of that header which is visible to this \
compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
otherwise, less than 64 clients (the default) will be able to connect to the \
CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
# endif
# define FD_SETSIZE 1024
# include <windows.h>
#else
# include <sys/types.h>
# include <sys/socket.h>
# include <sys/time.h>
# include <netinet/in.h>
# include <netdb.h>
# include <arpa/inet.h>
#endif
65
66 PEGASUS_USING_STD;
67
68 PEGASUS_NAMESPACE_BEGIN
69
|
70 mday 1.18
|
71 mday 1.25 static AtomicInt _connections = 0;
72
73 static struct timeval create_time = {0, 1};
|
74 mday 1.38 static struct timeval destroy_time = {300, 0};
|
75 mday 1.26 static struct timeval deadlock_time = {0, 0};
|
76 mday 1.18
|
77 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
78 //
79 // MonitorRep
80 //
81 ////////////////////////////////////////////////////////////////////////////////
82
83 struct MonitorRep
84 {
85 fd_set rd_fd_set;
86 fd_set wr_fd_set;
87 fd_set ex_fd_set;
88 fd_set active_rd_fd_set;
89 fd_set active_wr_fd_set;
90 fd_set active_ex_fd_set;
91 };
92
93 ////////////////////////////////////////////////////////////////////////////////
94 //
95 // Monitor
96 //
97 ////////////////////////////////////////////////////////////////////////////////
98 mike 1.2
|
99 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
100 mike 1.2 Monitor::Monitor()
|
101 a.arora 1.73 : _module_handle(0),
102 _controller(0),
103 _async(false),
104 _stopConnections(0),
105 _stopConnectionsSem(0),
|
106 a.dunfey 1.76 _solicitSocketCount(0),
107 _tickle_client_socket(0),
108 _tickle_server_socket(0),
109 _tickle_peer_socket(0)
|
110 mike 1.2 {
|
111 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
112 mike 1.2 Socket::initializeInterface();
|
113 mday 1.25 _rep = 0;
|
114 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
115 a.arora 1.73
116 // setup the tickler
117 initializeTickler();
118
119 // Start the count at 1 because initilizeTickler()
120 // has added an entry in the first position of the
121 // _entries array
122 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
123 mday 1.37 {
124 _MonitorEntry entry(0, 0, 0);
125 _entries.append(entry);
126 }
|
127 mike 1.2 }
128
|
129 mday 1.18 Monitor::Monitor(Boolean async)
|
130 a.arora 1.73 : _module_handle(0),
131 _controller(0),
132 _async(async),
133 _stopConnections(0),
134 _stopConnectionsSem(0),
|
135 a.dunfey 1.76 _solicitSocketCount(0),
136 _tickle_client_socket(0),
137 _tickle_server_socket(0),
138 _tickle_peer_socket(0)
|
139 mday 1.18 {
|
140 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
141 mday 1.18 Socket::initializeInterface();
|
142 mday 1.25 _rep = 0;
|
143 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
144 a.arora 1.73
145 // setup the tickler
146 initializeTickler();
147
148 // Start the count at 1 because initilizeTickler()
149 // has added an entry in the first position of the
150 // _entries array
151 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
152 mday 1.37 {
153 _MonitorEntry entry(0, 0, 0);
154 _entries.append(entry);
155 }
|
156 mday 1.18 }
|
157 mday 1.20
|
158 mike 1.2 Monitor::~Monitor()
159 {
|
160 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
161 "deregistering with module controller");
|
162 kumpf 1.10
|
163 kumpf 1.11 if(_module_handle != NULL)
|
164 mday 1.8 {
165 _controller->deregister_module(PEGASUS_MODULENAME_MONITOR);
166 _controller = 0;
|
167 kumpf 1.10 delete _module_handle;
|
168 mday 1.8 }
|
169 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
170 kumpf 1.48
|
171 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
172 a.dunfey 1.76
173 try{
174 if(_tickle_peer_socket > 0)
175 {
176 Socket::close(_tickle_peer_socket);
177 }
178 if(_tickle_client_socket > 0)
179 {
180 Socket::close(_tickle_client_socket);
181 }
182 if(_tickle_server_socket > 0)
183 {
184 Socket::close(_tickle_server_socket);
185 }
186 }
187 catch(...)
188 {
189 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
190 "Failed to close tickle sockets");
191 }
192
|
193 mike 1.2 Socket::uninitializeInterface();
|
194 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
195 "returning from monitor destructor");
|
196 mday 1.18 }
197
|
198 a.arora 1.73 void Monitor::initializeTickler(){
199 /*
200 NOTE: On any errors trying to
201 setup out tickle connection,
202 throw an exception/end the server
203 */
204
205 /* setup the tickle server/listener */
206
207 // get a socket for the server side
208 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
209 //handle error
210 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
211 "Received error number $0 while creating the internal socket.",
212 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
213 errno);
214 #else
215 WSAGetLastError());
216 #endif
217 throw Exception(parms);
218 }
219 a.arora 1.73
220 // initialize the address
221 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
222 #ifdef PEGASUS_OS_ZOS
223 _tickle_server_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
224 #else
225 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
226 #pragma convert(37)
227 #endif
228 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
229 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
230 #pragma convert(0)
231 #endif
232 #endif
233 _tickle_server_addr.sin_family = PF_INET;
234 _tickle_server_addr.sin_port = 0;
235
236 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_server_addr);
237
238 // bind server side to socket
239 if((::bind(_tickle_server_socket,
240 a.arora 1.73 (struct sockaddr *)&_tickle_server_addr,
241 sizeof(_tickle_server_addr))) < 0){
242 // handle error
243 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
244 "Received error number $0 while binding the internal socket.",
245 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
246 errno);
247 #else
248 WSAGetLastError());
249 #endif
250 throw Exception(parms);
251 }
252
253 // tell the kernel we are a server
254 if((::listen(_tickle_server_socket,3)) < 0){
255 // handle error
256 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
257 "Received error number $0 while listening to the internal socket.",
258 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
259 errno);
260 #else
261 a.arora 1.73 WSAGetLastError());
262 #endif
263 throw Exception(parms);
264 }
265
266 // make sure we have the correct socket for our server
267 int sock = ::getsockname(_tickle_server_socket,
268 (struct sockaddr*)&_tickle_server_addr,
269 &_addr_size);
270 if(sock < 0){
271 // handle error
272 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
273 "Received error number $0 while getting the internal socket name.",
274 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
275 errno);
276 #else
277 WSAGetLastError());
278 #endif
279 throw Exception(parms);
280 }
281
282 a.arora 1.73 /* set up the tickle client/connector */
283
284 // get a socket for our tickle client
285 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) < 0){
286 // handle error
287 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
288 "Received error number $0 while creating the internal client socket.",
289 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
290 errno);
291 #else
292 WSAGetLastError());
293 #endif
294 throw Exception(parms);
295 }
296
297 // setup the address of the client
298 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
299 #ifdef PEGASUS_OS_ZOS
300 _tickle_client_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
301 #else
302 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
303 a.arora 1.73 #pragma convert(37)
304 #endif
305 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
306 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
307 #pragma convert(0)
308 #endif
309 #endif
310 _tickle_client_addr.sin_family = PF_INET;
311 _tickle_client_addr.sin_port = 0;
312
313 // bind socket to client side
314 if((::bind(_tickle_client_socket,
315 (struct sockaddr*)&_tickle_client_addr,
316 sizeof(_tickle_client_addr))) < 0){
317 // handle error
318 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
319 "Received error number $0 while binding the internal client socket.",
320 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
321 errno);
322 #else
323 WSAGetLastError());
324 a.arora 1.73 #endif
325 throw Exception(parms);
326 }
327
328 // connect to server side
329 if((::connect(_tickle_client_socket,
330 (struct sockaddr*)&_tickle_server_addr,
331 sizeof(_tickle_server_addr))) < 0){
332 // handle error
333 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
334 "Received error number $0 while connecting the internal client socket.",
335 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
336 errno);
337 #else
338 WSAGetLastError());
339 #endif
340 throw Exception(parms);
341 }
342
343 /* set up the slave connection */
344 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
345 a.arora 1.73 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(_tickle_peer_addr);
346 pegasus_sleep(1);
347
348 // this call may fail, we will try a max of 20 times to establish this peer connection
349 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
350 (struct sockaddr*)&_tickle_peer_addr,
351 &peer_size)) < 0){
352 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
353 // Only retry on non-windows platforms.
354 if(_tickle_peer_socket == -1 && errno == EAGAIN)
355 {
356 int retries = 0;
357 do
358 {
359 pegasus_sleep(1);
360 _tickle_peer_socket = ::accept(_tickle_server_socket,
361 (struct sockaddr*)&_tickle_peer_addr,
362 &peer_size);
363 retries++;
364 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
365 }
366 a.arora 1.73 #endif
367 }
368 if(_tickle_peer_socket == -1){
369 // handle error
370 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
371 "Received error number $0 while accepting the internal socket connection.",
372 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
373 errno);
374 #else
375 WSAGetLastError());
376 #endif
377 throw Exception(parms);
378 }
379 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
380 // checks entries with IDLE state for events
381 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
382 entry._status = _MonitorEntry::IDLE;
383 _entries.append(entry);
384 }
385
386 void Monitor::tickle(void)
387 a.arora 1.73 {
388 static char _buffer[] =
389 {
390 '0','0'
391 };
392
393 Socket::disableBlocking(_tickle_client_socket);
394 Socket::write(_tickle_client_socket,&_buffer, 2);
395 Socket::enableBlocking(_tickle_client_socket);
396 }
397
|
398 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
399 {
|
400 mday 1.18
|
401 mday 1.25 Boolean handled_events = false;
|
402 a.arora 1.73 int i = 0;
403
|
404 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
405 a.arora 1.73
|
406 mday 1.25 fd_set fdread;
407 FD_ZERO(&fdread);
|
408 a.arora 1.73
|
409 mday 1.37 _entry_mut.lock(pegasus_thread_self());
|
410 mday 1.13
|
411 kumpf 1.48 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
412 if (_stopConnections == 1)
413 {
414 for ( int indx = 0; indx < (int)_entries.size(); indx++)
415 {
416 if (_entries[indx]._type == Monitor::ACCEPTOR)
417 {
418 if ( _entries[indx]._status.value() != _MonitorEntry::EMPTY)
419 {
420 if ( _entries[indx]._status.value() == _MonitorEntry::IDLE ||
421 _entries[indx]._status.value() == _MonitorEntry::DYING )
422 {
423 // remove the entry
424 _entries[indx]._status = _MonitorEntry::EMPTY;
425 }
426 else
427 {
428 // set status to DYING
|
429 kumpf 1.52 _entries[indx]._status = _MonitorEntry::DYING;
|
430 kumpf 1.48 }
431 }
432 }
433 }
434 _stopConnections = 0;
|
435 a.arora 1.73 _stopConnectionsSem.signal();
|
436 kumpf 1.48 }
|
437 kumpf 1.51
|
438 kumpf 1.68 for( int indx = 0; indx < (int)_entries.size(); indx++)
439 {
440 if ((_entries[indx]._status.value() == _MonitorEntry::DYING) &&
441 (_entries[indx]._type == Monitor::CONNECTION))
442 {
443 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
444 PEGASUS_ASSERT(q != 0);
445 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
446 Message* message= new CloseConnectionMessage(_entries[indx].socket);
447 message->dest = o.getQueueId();
448
449 // HTTPAcceptor is responsible for closing the connection.
450 // The lock is released to allow HTTPAcceptor to call
451 // unsolicitSocketMessages to free the entry.
452 // Once HTTPAcceptor completes processing of the close
453 // connection, the lock is re-requested and processing of
454 // the for loop continues. This is safe with the current
455 // implementation of the _entries object. Note that the
456 // loop condition accesses the _entries.size() on each
457 // iteration, so that a change in size while the mutex is
458 // unlocked will not result in an ArrayIndexOutOfBounds
459 kumpf 1.68 // exception.
460
461 _entry_mut.unlock();
462 o.enqueue(message);
463 _entry_mut.lock(pegasus_thread_self());
464 }
465 }
466
|
467 kumpf 1.51 Uint32 _idleEntries = 0;
|
468 a.arora 1.73
469 /*
470 We will keep track of the maximum socket number and pass this value
471 to the kernel as a parameter to SELECT. This loop seems like a good
472 place to calculate the max file descriptor (maximum socket number)
473 because we have to traverse the entire array.
474 */
475 int maxSocketCurrentPass = 0;
|
476 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
477 mike 1.2 {
|
478 a.arora 1.73 if(maxSocketCurrentPass < _entries[indx].socket)
479 maxSocketCurrentPass = _entries[indx].socket;
480
|
481 mday 1.37 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
482 mday 1.25 {
|
483 kumpf 1.51 _idleEntries++;
|
484 mday 1.25 FD_SET(_entries[indx].socket, &fdread);
485 }
|
486 mday 1.13 }
|
487 s.hills 1.62
|
488 a.arora 1.73 /*
489 Add 1 then assign maxSocket accordingly. We add 1 to account for
490 descriptors starting at 0.
491 */
492 maxSocketCurrentPass++;
493
|
494 kumpf 1.51 _entry_mut.unlock();
|
495 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
496 kumpf 1.51 _entry_mut.lock(pegasus_thread_self());
|
497 mday 1.25
|
498 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
499 kumpf 1.50 if(events == SOCKET_ERROR)
|
500 mike 1.2 #else
|
501 kumpf 1.50 if(events == -1)
|
502 mike 1.2 #endif
|
503 mday 1.13 {
|
504 kumpf 1.50 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
505 "Monitor::run - errorno = %d has occurred on select.", errno);
506 // The EBADF error indicates that one or more or the file
507 // descriptions was not valid. This could indicate that
508 // the _entries structure has been corrupted or that
509 // we have a synchronization error.
510
511 PEGASUS_ASSERT(errno != EBADF);
512 }
513 else if (events)
514 {
|
515 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
516 "Monitor::run select event received events = %d, monitoring %d idle entries",
517 events, _idleEntries);
|
518 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
519 {
|
520 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
521 // owned by the Monitor).
522 if((_entries[indx]._status.value() == _MonitorEntry::IDLE) &&
523 (FD_ISSET(_entries[indx].socket, &fdread)))
|
524 mday 1.25 {
525 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
|
526 kumpf 1.53 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
527 "Monitor::run indx = %d, queueId = %d, q = %p",
528 indx, _entries[indx].queueId, q);
529 PEGASUS_ASSERT(q !=0);
|
530 mday 1.37
531 try
|
532 mday 1.25 {
|
533 mday 1.37 if(_entries[indx]._type == Monitor::CONNECTION)
534 {
|
535 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
536 "_entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
537 mday 1.37 static_cast<HTTPConnection *>(q)->_entry_index = indx;
538 _entries[indx]._status = _MonitorEntry::BUSY;
|
539 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
540 a.arora 1.73 /* Removed for PEP 183.
|
541 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
542 (void *)q, _dispatch))
|
543 kumpf 1.67 {
544 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
545 "Monitor::run: Insufficient resources to process request.");
546 _entries[indx]._status = _MonitorEntry::IDLE;
547 _entry_mut.unlock();
548 return true;
549 }
|
550 a.arora 1.73 */
551 // Added for PEP 183
552 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
553 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
554 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
555 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
556 try
557 {
558 dst->run(1);
559 }
560 catch (...)
561 {
562 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
563 "Monitor::_dispatch: exception received");
564 }
565 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
566 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
567
568 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
569 // Once the HTTPConnection thread has set the status value to either
570 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
571 a.arora 1.73 // to the Monitor. It is no longer permissible to access the connection
572 // or the entry in the _entries table.
573 if (dst->_connectionClosePending)
574 {
575 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
576 }
577 else
578 {
579 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
580 }
581 // end Added for PEP 183
582 }
583 else if( _entries[indx]._type == Monitor::INTERNAL){
584 // set ourself to BUSY,
585 // read the data
586 // and set ourself back to IDLE
587
588 _entries[indx]._status == _MonitorEntry::BUSY;
589 static char buffer[2];
590 Socket::disableBlocking(_entries[indx].socket);
591 Sint32 amt = Socket::read(_entries[indx].socket,&buffer, 2);
592 a.arora 1.73 Socket::enableBlocking(_entries[indx].socket);
593 _entries[indx]._status == _MonitorEntry::IDLE;
|
594 mday 1.37 }
595 else
|
596 mday 1.25 {
|
597 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
598 "Non-connection entry, indx = %d, has been received.", indx);
|
599 mday 1.37 int events = 0;
600 events |= SocketMessage::READ;
601 Message *msg = new SocketMessage(_entries[indx].socket, events);
602 _entries[indx]._status = _MonitorEntry::BUSY;
603 _entry_mut.unlock();
|
604 mday 1.27
|
605 mday 1.37 q->enqueue(msg);
606 _entries[indx]._status = _MonitorEntry::IDLE;
|
607 mday 1.25 return true;
608 }
609 }
|
610 mday 1.37 catch(...)
|
611 mday 1.25 {
612 }
613 handled_events = true;
614 }
615 }
|
616 mday 1.24 }
|
617 mday 1.37 _entry_mut.unlock();
|
618 mday 1.13 return(handled_events);
|
619 mike 1.2 }
620
|
621 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
622 kumpf 1.48 {
623 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
624 a.arora 1.73 // set boolean then tickle the server to recognize _stopConnections
|
625 kumpf 1.48 _stopConnections = 1;
|
626 a.arora 1.73 tickle();
|
627 kumpf 1.48
|
628 chuck 1.74 if (wait)
|
629 a.arora 1.73 {
|
630 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
631 // caller of this function may unbind the ports while the monitor
632 // is still accepting connections on them.
633 try
634 {
635 _stopConnectionsSem.time_wait(10000);
636 }
637 catch (TimeOut &)
638 {
639 // The monitor is probably busy processng a very long request, and is
640 // not accepting connections. Let the caller unbind the ports.
641 }
|
642 a.arora 1.73 }
643
|
644 kumpf 1.48 PEG_METHOD_EXIT();
645 }
|
646 mday 1.25
|
647 mday 1.37
|
648 mday 1.25 int Monitor::solicitSocketMessages(
|
649 mike 1.2 Sint32 socket,
650 Uint32 events,
|
651 mday 1.8 Uint32 queueId,
652 int type)
|
653 mike 1.2 {
|
654 a.arora 1.73 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
655 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
656 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
657 // We always want the _entries array to 2 bigger than the
658 // current connections requested
659 _solicitSocketCount++; // bump the count
660 int size = (int)_entries.size();
661 if(_solicitSocketCount >= (size-1)){
662 for(int i = 0; i < (_solicitSocketCount - (size-1)); i++){
663 _MonitorEntry entry(0, 0, 0);
664 _entries.append(entry);
665 }
666 }
|
667 kumpf 1.4
|
668 a.arora 1.73 int index;
669 for(index = 1; index < (int)_entries.size(); index++)
|
670 mday 1.25 {
|
671 a.arora 1.73 try
|
672 mday 1.37 {
|
673 a.arora 1.73 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
674 {
675 _entries[index].socket = socket;
676 _entries[index].queueId = queueId;
677 _entries[index]._type = type;
678 _entries[index]._status = _MonitorEntry::IDLE;
|
679 alagaraja 1.75
|
680 a.arora 1.73 return index;
681 }
|
682 mday 1.37 }
683 catch(...)
|
684 mday 1.25 {
685 }
686 }
|
687 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
688 mday 1.25 PEG_METHOD_EXIT();
|
689 kumpf 1.50 return -1;
|
690 a.arora 1.73
|
691 mike 1.2 }
692
|
693 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
694 mike 1.2 {
|
695 kumpf 1.50
|
696 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
697 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
698 a.arora 1.73
699 /*
700 Start at index = 1 because _entries[0] is the tickle entry which never needs
701 to be EMPTY;
702 */
703 int index;
704 for(index = 1; index < _entries.size(); index++)
|
705 mike 1.2 {
|
706 mday 1.25 if(_entries[index].socket == socket)
707 {
|
708 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
709 _entries[index].socket = -1;
710 _solicitSocketCount--;
711 break;
|
712 mday 1.25 }
|
713 mike 1.2 }
|
714 a.arora 1.73
715 /*
716 Dynamic Contraction:
717 To remove excess entries we will start from the end of the _entries array
718 and remove all entries with EMPTY status until we find the first NON EMPTY.
719 This prevents the positions, of the NON EMPTY entries, from being changed.
720 */
721 index = _entries.size() - 1;
722 while(_entries[index]._status == _MonitorEntry::EMPTY){
723 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
724 _entries.remove(index);
725 index--;
726 }
727
|
728 kumpf 1.4 PEG_METHOD_EXIT();
|
729 mike 1.2 }
730
|
731 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
732 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
733 {
|
734 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
735 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
736 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
737 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
738 kumpf 1.51 try
739 {
740 dst->run(1);
741 }
742 catch (...)
743 {
744 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
745 "Monitor::_dispatch: exception received");
746 }
747 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
748 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
749
|
750 kumpf 1.53 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.value() == _MonitorEntry::BUSY);
|
751 kumpf 1.68
752 // Once the HTTPConnection thread has set the status value to either
753 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
754 // to the Monitor. It is no longer permissible to access the connection
755 // or the entry in the _entries table.
|
756 kumpf 1.50 if (dst->_connectionClosePending)
757 {
|
758 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
759 }
760 else
761 {
762 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
763 kumpf 1.50 }
|
764 mday 1.8 return 0;
|
765 mday 1.40 }
766
767
768
769 ////************************* monitor 2 *****************************////
|
770 mday 1.43 ////************************* monitor 2 *****************************////
771 ////************************* monitor 2 *****************************////
772 ////************************* monitor 2 *****************************////
773 ////************************* monitor 2 *****************************////
774 ////************************* monitor 2 *****************************////
775 ////************************* monitor 2 *****************************////
|
776 mday 1.40
777
|
778 mday 1.56
779
780
|
781 mday 1.42 m2e_rep::m2e_rep(void)
|
782 mday 1.43 :Base(), state(IDLE)
783
|
784 mday 1.42 {
785 }
786
787 m2e_rep::m2e_rep(monitor_2_entry_type _type,
788 pegasus_socket _sock,
789 void* _accept,
790 void* _dispatch)
|
791 mday 1.43 : Base(), type(_type), state(IDLE), psock(_sock),
|
792 mday 1.42 accept_parm(_accept), dispatch_parm(_dispatch)
793 {
794
795 }
796
797 m2e_rep::~m2e_rep(void)
798 {
799 }
800
801 m2e_rep::m2e_rep(const m2e_rep& r)
802 : Base()
803 {
804 if(this != &r){
805 type = r.type;
806 psock = r.psock;
807 accept_parm = r.accept_parm;
808 dispatch_parm = r.dispatch_parm;
|
809 mday 1.43 state = IDLE;
810
|
811 mday 1.42 }
812 }
813
814
815 m2e_rep& m2e_rep::operator =(const m2e_rep& r)
816 {
817 if(this != &r) {
818 type = r.type;
819 psock = r.psock;
820 accept_parm = r.accept_parm;
821 dispatch_parm = r.dispatch_parm;
|
822 mday 1.43 state = IDLE;
|
823 mday 1.42 }
824 return *this;
825 }
826
827 Boolean m2e_rep::operator ==(const m2e_rep& r)
828 {
829 if(this == &r)
830 return true;
831 return false;
832 }
833
834 Boolean m2e_rep::operator ==(void* r)
835 {
836 if((void*)this == r)
837 return true;
838 return false;
839 }
840
841 m2e_rep::operator pegasus_socket() const
842 {
843 return psock;
844 mday 1.42 }
845
846
|
847 mday 1.40 monitor_2_entry::monitor_2_entry(void)
848 {
|
849 mday 1.42 _rep = new m2e_rep();
|
850 mday 1.40 }
851
|
852 mday 1.42 monitor_2_entry::monitor_2_entry(pegasus_socket& _psock,
853 monitor_2_entry_type _type,
854 void* _accept_parm, void* _dispatch_parm)
|
855 mday 1.40 {
|
856 mday 1.42 _rep = new m2e_rep(_type, _psock, _accept_parm, _dispatch_parm);
|
857 mday 1.40 }
858
859 monitor_2_entry::monitor_2_entry(const monitor_2_entry& e)
860 {
861 if(this != &e){
|
862 mday 1.42 Inc(this->_rep = e._rep);
|
863 mday 1.40 }
864 }
865
866 monitor_2_entry::~monitor_2_entry(void)
867 {
|
868 a.dunfey 1.76
|
869 mday 1.42 Dec(_rep);
|
870 mday 1.40 }
871
872 monitor_2_entry& monitor_2_entry::operator=(const monitor_2_entry& e)
873 {
874 if(this != &e){
|
875 mday 1.42 Dec(_rep);
876 Inc(this->_rep = e._rep);
|
877 mday 1.40 }
878 return *this;
879 }
880
|
881 mday 1.42 Boolean monitor_2_entry::operator ==(const monitor_2_entry& me) const
|
882 mday 1.40 {
883 if(this == &me)
884 return true;
885 return false;
886 }
887
|
888 mday 1.42 Boolean monitor_2_entry::operator ==(void* k) const
|
889 mday 1.40 {
890 if((void *)this == k)
891 return true;
892 return false;
893 }
894
895
|
896 mday 1.42 monitor_2_entry_type monitor_2_entry::get_type(void) const
|
897 mday 1.40 {
|
898 mday 1.42 return _rep->type;
899 }
900
901 void monitor_2_entry::set_type(monitor_2_entry_type t)
902 {
903 _rep->type = t;
904 }
905
906
|
907 mday 1.43 monitor_2_entry_state monitor_2_entry::get_state(void) const
908 {
909 return (monitor_2_entry_state) _rep->state.value();
910 }
911
912 void monitor_2_entry::set_state(monitor_2_entry_state t)
913 {
914 _rep->state = t;
915 }
916
|
917 mday 1.42 void* monitor_2_entry::get_accept(void) const
918 {
919 return _rep->accept_parm;
920 }
921
922 void monitor_2_entry::set_accept(void* a)
923 {
924 _rep->accept_parm = a;
925 }
926
927
928 void* monitor_2_entry::get_dispatch(void) const
929 {
930 return _rep->dispatch_parm;
931 }
932
933 void monitor_2_entry::set_dispatch(void* a)
934 {
935 _rep->dispatch_parm = a;
936 }
937
938 mday 1.42 pegasus_socket monitor_2_entry::get_sock(void) const
939 {
940 return _rep->psock;
941 }
942
943
944 void monitor_2_entry::set_sock(pegasus_socket& s)
945 {
946 _rep->psock = s;
947
|
948 mday 1.40 }
949
|
950 mday 1.59 //static monitor_2* _m2_instance;
|
951 mday 1.40
|
952 mday 1.49 AsyncDQueue<HTTPConnection2> monitor_2::_connections(true, 0);
953
|
954 mday 1.40 monitor_2::monitor_2(void)
|
955 mday 1.42 : _session_dispatch(0), _accept_dispatch(0), _listeners(true, 0),
|
956 mday 1.49 _ready(true, 0), _die(0), _requestCount(0)
|
957 mday 1.40 {
958 try {
959
960 bsd_socket_factory _factory;
961
962 // set up the listener/acceptor
963 pegasus_socket temp = pegasus_socket(&_factory);
964
965 temp.socket(PF_INET, SOCK_STREAM, 0);
966 // initialize the address
967 memset(&_tickle_addr, 0, sizeof(_tickle_addr));
|
968 marek 1.47 #ifdef PEGASUS_OS_ZOS
969 _tickle_addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
970 #else
|
971 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
972 #pragma convert(37)
973 #endif
|
974 mday 1.40 _tickle_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
975 chuck 1.55 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
976 #pragma convert(0)
977 #endif
|
978 marek 1.47 #endif
|
979 mday 1.40 _tickle_addr.sin_family = PF_INET;
980 _tickle_addr.sin_port = 0;
981
982 PEGASUS_SOCKLEN_SIZE _addr_size = sizeof(_tickle_addr);
983
984 temp.bind((struct sockaddr *)&_tickle_addr, sizeof(_tickle_addr));
985 temp.listen(3);
986 temp.getsockname((struct sockaddr*)&_tickle_addr, &_addr_size);
987
988 // set up the connector
989
990 pegasus_socket tickler = pegasus_socket(&_factory);
991 tickler.socket(PF_INET, SOCK_STREAM, 0);
992 struct sockaddr_in _addr;
993 memset(&_addr, 0, sizeof(_addr));
|
994 kumpf 1.48 #ifdef PEGASUS_OS_ZOS
|
995 marek 1.47 _addr.sin_addr.s_addr = inet_addr_ebcdic("127.0.0.1");
996 #else
|
997 mday 1.40 _addr.sin_addr.s_addr = inet_addr("127.0.0.1");
|
998 marek 1.47 #endif
|
999 mday 1.40 _addr.sin_family = PF_INET;
1000 _addr.sin_port = 0;
1001 tickler.bind((struct sockaddr*)&_addr, sizeof(_addr));
1002 tickler.connect((struct sockaddr*)&_tickle_addr, sizeof(_tickle_addr));
1003
|
1004 mday 1.42 _tickler.set_sock(tickler);
1005 _tickler.set_type(INTERNAL);
|
1006 mday 1.43 _tickler.set_state(BUSY);
1007
|
1008 mday 1.40 struct sockaddr_in peer;
1009 memset(&peer, 0, sizeof(peer));
1010 PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
1011
1012 pegasus_socket accepted = temp.accept((struct sockaddr*)&peer, &peer_size);
|
1013 mday 1.57
|
1014 mday 1.42 monitor_2_entry* _tickle = new monitor_2_entry(accepted, INTERNAL, 0, 0);
|
1015 a.arora 1.71
1016 // No need to set _tickle's state as BUSY, since monitor_2::run() now
1017 // does a select only on sockets which are in IDLE (default) state.
1018 // _tickle->set_state(BUSY);
|
1019 mday 1.43
|
1020 mday 1.40 _listeners.insert_first(_tickle);
1021
1022 }
1023 catch(...){ }
1024 }
1025
1026 monitor_2::~monitor_2(void)
1027 {
|
1028 mday 1.60
1029 stop();
1030
|
1031 mday 1.41 try {
1032 monitor_2_entry* temp = _listeners.remove_first();
1033 while(temp){
1034 delete temp;
1035 temp = _listeners.remove_first();
1036 }
1037 }
|
1038 mday 1.60
|
1039 mday 1.41 catch(...){ }
|
1040 mday 1.60
1041
1042 try
1043 {
1044 HTTPConnection2* temp = _connections.remove_first();
1045 while(temp)
1046 {
1047 delete temp;
1048 temp = _connections.remove_first();
1049 }
1050 }
1051 catch(...)
1052 {
1053 }
1054
1055
|
1056 mday 1.40 }
1057
1058
1059 void monitor_2::run(void)
1060 {
1061 monitor_2_entry* temp;
|
1062 a.arora 1.71 int _nonIdle=0, _idleCount=0, events;
1063
|
1064 mday 1.40 while(_die.value() == 0) {
|
1065 a.arora 1.71 _nonIdle=_idleCount=0;
|
1066 mday 1.49
|
1067 mday 1.57 struct timeval tv_idle = { 60, 0 };
|
1068 mday 1.56
|
1069 mday 1.40 // place all sockets in the select set
1070 FD_ZERO(&rd_fd_set);
1071 try {
1072 _listeners.lock(pegasus_thread_self());
1073 temp = _listeners.next(0);
|
1074 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1075 "monitor_2::run:Creating New FD list for SELECT.");
|
1076 mday 1.40 while(temp != 0 ){
|
1077 mday 1.57 if(temp->get_state() == CLOSED ) {
|
1078 mday 1.43 monitor_2_entry* closed = temp;
|
1079 a.arora 1.72 temp = _listeners.next(closed);
|
1080 mday 1.43 _listeners.remove_no_lock(closed);
|
1081 a.arora 1.71
1082 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1083 "monitor_2::run:Deleteing CLOSED socket fd=%d.",(Sint32)closed->get_sock());
|
1084 mday 1.60
|
1085 mday 1.49 HTTPConnection2 *cn = monitor_2::remove_connection((Sint32)(closed->get_sock()));
1086 delete cn;
|
1087 mday 1.43 delete closed;
1088 }
|
1089 mday 1.45 if(temp == 0)
1090 break;
|
1091 a.arora 1.71
1092
1093 //Count the number if IDLE sockets
1094 if(temp->get_state() != IDLE ) _nonIdle++;
1095 else _idleCount++;
1096
|
1097 mday 1.46 Sint32 fd = (Sint32) temp->get_sock();
|
1098 a.arora 1.71
1099 //Select should be called ONLY on the FDs which are in IDLE state
1100 if((fd >= 0) && (temp->get_state() == IDLE))
1101 {
1102 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1103 "monitor_2::run:Adding FD %d to the list for SELECT.",fd);
1104 FD_SET(fd , &rd_fd_set);
1105 }
1106 temp = _listeners.next(temp);
|
1107 mday 1.40 }
1108 _listeners.unlock();
1109 }
1110 catch(...){
1111 return;
1112 }
|
1113 a.arora 1.71
|
1114 mday 1.42 // important - the dispatch routine has pointers to all the
1115 // entries that are readable. These entries can be changed but
1116 // the pointer must not be tampered with.
|
1117 mday 1.56 if(_connections.count() )
|
1118 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, NULL);
|
1119 mday 1.56 else
|
1120 a.arora 1.71 events = select(FD_SETSIZE, &rd_fd_set, NULL, NULL, &tv_idle);
|
1121 mday 1.57
1122 if(_die.value())
1123 {
1124 break;
1125 }
|
1126 a.arora 1.71
1127 #ifdef PEGASUS_OS_TYPE_WINDOWS
1128 if(events == SOCKET_ERROR)
1129 #else
1130 if(events == -1)
1131 #endif
1132 {
1133 Tracer::trace(TRC_HTTP, Tracer::LEVEL2,
1134 "monitor_2:run:INVALID FD. errorno = %d on select.", errno);
1135 // The EBADF error indicates that one or more or the file
1136 // descriptions was not valid. This could indicate that
1137 // the _entries structure has been corrupted or that
1138 // we have a synchronization error.
1139
1140 // Keeping the line below commented for time being.
1141 // PEGASUS_ASSERT(errno != EBADF);
1142 }
1143 else if (events)
1144 {
1145 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1146 "monitor_2::run select event received events = %d, monitoring %d idle entries", events, _idleCount);
1147 a.arora 1.71
|
1148 mday 1.57
|
1149 mday 1.40 try {
1150 _listeners.lock(pegasus_thread_self());
1151 temp = _listeners.next(0);
1152 while(temp != 0 ){
|
1153 a.arora 1.72 Sint32 fd = (Sint32) temp->get_sock();
1154 if(fd >= 0 && FD_ISSET(fd, &rd_fd_set)) {
1155 if(temp->get_type() != CLIENTSESSION) temp->set_state(BUSY);
|
1156 mday 1.42 FD_CLR(fd, &rd_fd_set);
1157 monitor_2_entry* ready = new monitor_2_entry(*temp);
|
1158 mday 1.49 try
1159 {
1160 _ready.insert_first(ready);
1161 }
1162 catch(...)
1163 {
1164 }
1165
|
1166 mday 1.42 _requestCount++;
|
1167 mday 1.40 }
1168 temp = _listeners.next(temp);
1169 }
1170 _listeners.unlock();
1171 }
1172 catch(...){
1173 return;
1174 }
1175 // now handle the sockets that are ready to read
|
1176 mday 1.56 if(_ready.count())
1177 _dispatch();
1178 else
1179 {
1180 if(_connections.count() == 0 )
1181 _idle_dispatch(_idle_parm);
1182 }
|
1183 a.arora 1.71 } // if events
|
1184 mday 1.40 } // while alive
|
1185 a.arora 1.72 _die=0;
|
1186 mday 1.57
|
1187 mday 1.40 }
1188
|
1189 dj.gorey 1.70 int monitor_2::solicitSocketMessages(
1190 Sint32 socket,
1191 Uint32 events,
1192 Uint32 queueId,
1193 int type)
1194 {
1195
|
1196 a.arora 1.71 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::solicitSocketMessages");
|
1197 dj.gorey 1.70
|
1198 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
1199 dj.gorey 1.70
1200 for(int index = 0; index < (int)_entries.size(); index++)
1201 {
1202 try
1203 {
1204 if(_entries[index]._status.value() == monitor_2_entry::EMPTY)
1205 {
1206 _entries[index].socket = socket;
1207 //_entries[index].queueId = queueId;
1208 //_entries[index]._type = type;
|
1209 a.arora 1.71 _entries[index]._status = IDLE;
|
1210 dj.gorey 1.70
1211 return index;
1212 }
1213 }
1214 catch(...)
1215 {
1216 }
1217
1218 }
1219 PEG_METHOD_EXIT();
1220 return -1;
1221 }
1222
1223
1224 void monitor_2::unsolicitSocketMessages(Sint32 socket)
1225 {
1226
1227 PEG_METHOD_ENTER(TRC_HTTP, "monitor_2::unsolicitSocketMessages");
|
1228 alagaraja 1.75 AutoMutex autoMut(_entry2_mut);
|
1229 dj.gorey 1.70
1230 for(int index = 0; index < (int)_entries2.size(); index++)
1231 {
1232 if(_entries2[index].socket == socket)
1233 {
1234 _entries2[index]._status = monitor_2_entry::EMPTY;
1235 _entries2[index].socket = -1;
1236 break;
1237 }
1238 }
1239 PEG_METHOD_EXIT();
1240 }
1241
|
1242 mday 1.42 void* monitor_2::set_session_dispatch(void (*dp)(monitor_2_entry*))
|
1243 mday 1.40 {
|
1244 mday 1.42 void* old = (void *)_session_dispatch;
|
1245 mday 1.40 _session_dispatch = dp;
1246 return old;
1247 }
1248
|
1249 mday 1.42 void* monitor_2::set_accept_dispatch(void (*dp)(monitor_2_entry*))
1250 {
1251 void* old = (void*)_accept_dispatch;
1252 _accept_dispatch = dp;
1253 return old;
|
1254 mday 1.56 }
1255
1256 void* monitor_2::set_idle_dispatch(void (*dp)(void*))
1257 {
1258 void* old = (void*)_idle_dispatch;
1259 _idle_dispatch = dp;
1260 return old;
1261 }
1262
1263 void* monitor_2::set_idle_parm(void* parm)
1264 {
1265 void* old = _idle_parm;
1266 _idle_parm = parm;
1267 return old;
|
1268 mday 1.42 }
1269
|
1270 mday 1.40
|
1271 mday 1.60
//-----------------------------------------------------------------
// Note on deleting the monitor_2_entry nodes:
// Each case in the switch statement needs to handle the deletion
// of the monitor_2_entry* node differently. A SESSION dispatch
// routine MUST DELETE the entry during its dispatch handling.
// All other dispatch routines MUST NOT delete the entry during the
// dispatch handling, but must allow monitor_2::_dispatch to delete
// the entry.
//
// The reason is pretty obscure, and it is debatable whether or not
// to even bother, but during cimserver shutdown the single monitor_2_entry*
// will leak unless the _session_dispatch routine takes care of deleting it.
//
// The reason is that a shutdown message completely stops everything and
// the _session_dispatch routine never returns, so monitor_2::_dispatch is
// never able to do its own cleanup.
//
// << Mon Oct 13 09:33:33 2003 mdd >>
//-----------------------------------------------------------------
1291
|
1292 mday 1.40 void monitor_2::_dispatch(void)
1293 {
|
1294 mday 1.49 monitor_2_entry* entry;
1295
1296 try
1297 {
1298
1299 entry = _ready.remove_first();
1300 }
1301 catch(...)
1302 {
1303 }
1304
1305 while(entry != 0 ) {
|
1306 mday 1.42 switch(entry->get_type()) {
|
1307 mday 1.40 case INTERNAL:
1308 static char buffer[2];
|
1309 mday 1.49 entry->get_sock().disableBlocking();
|
1310 mday 1.42 entry->get_sock().read(&buffer, 2);
|
1311 mday 1.49 entry->get_sock().enableBlocking();
|
1312 a.arora 1.71 entry->set_state(IDLE); // Set state of the socket to IDLE so that
1313 // monitor_2::run can add to the list of FDs
1314 // on which select would be called.
1315
1316
1317
|
1318 mday 1.60 delete entry;
1319
|
1320 mday 1.40 break;
1321 case LISTEN:
1322 {
1323 static struct sockaddr peer;
1324 static PEGASUS_SOCKLEN_SIZE peer_size = sizeof(peer);
|
1325 mday 1.49 entry->get_sock().disableBlocking();
|
1326 mday 1.42 pegasus_socket connected = entry->get_sock().accept(&peer, &peer_size);
|
1327 a.arora 1.71 entry->set_state(IDLE); // Set state of the LISTEN socket to IDLE
|
1328 mday 1.65 #ifdef PEGASUS_OS_TYPE_WINDOWS
1329 if((Sint32)connected == SOCKET_ERROR)
1330 #else
1331 if((Sint32)connected == -1 )
1332 #endif
1333 {
1334 delete entry;
1335 break;
1336 }
1337
|
1338 mday 1.49 entry->get_sock().enableBlocking();
|
1339 mday 1.42 monitor_2_entry *temp = add_entry(connected, SESSION, entry->get_accept(), entry->get_dispatch());
1340 if(temp && _accept_dispatch != 0)
|
1341 mday 1.49 _accept_dispatch(temp);
|
1342 mday 1.60 delete entry;
1343
|
1344 mday 1.40 }
1345 break;
1346 case SESSION:
|
1347 a.arora 1.72 case CLIENTSESSION:
|
1348 mday 1.60 if(_session_dispatch != 0 )
1349 {
1350 // NOTE: _session_dispatch will delete entry - do not do it here
|
1351 a.arora 1.72 unsigned client=0;
1352 if(entry->get_type() == CLIENTSESSION) client = 1;
1353 Sint32 sock=(Sint32)(entry->get_sock());
1354
1355 _session_dispatch(entry);
1356
1357 if(client)
1358 {
1359 HTTPConnection2 *cn = monitor_2::remove_connection(sock);
1360 if(cn) delete cn;
1361 // stop();
1362 _die=1;
1363 }
|
1364 mday 1.60 }
1365
|
1366 mday 1.40 else {
1367 static char buffer[4096];
|
1368 mday 1.42 int bytes = entry->get_sock().read(&buffer, 4096);
|
1369 mday 1.60 delete entry;
|
1370 mday 1.40 }
1371
1372 break;
1373 case UNTYPED:
1374 default:
|
1375 mday 1.60 delete entry;
|
1376 mday 1.40 break;
1377 }
|
1378 mday 1.42 _requestCount--;
|
1379 mday 1.49
1380 if(_ready.count() == 0 )
1381 break;
1382
1383 try
1384 {
1385 entry = _ready.remove_first();
1386 }
1387 catch(...)
1388 {
1389 }
1390
|
1391 mday 1.40 }
1392 }
1393
1394 void monitor_2::stop(void)
1395 {
1396 _die = 1;
1397 tickle();
1398 // shut down the listener list, free the list nodes
|
1399 mday 1.42 _tickler.get_sock().close();
|
1400 mday 1.40 _listeners.shutdown_queue();
1401 }
1402
1403 void monitor_2::tickle(void)
1404 {
1405 static char _buffer[] =
1406 {
1407 '0','0'
1408 };
1409
|
1410 mday 1.57 _tickler.get_sock().disableBlocking();
1411
|
1412 mday 1.42 _tickler.get_sock().write(&_buffer, 2);
|
1413 mday 1.57 _tickler.get_sock().enableBlocking();
1414
|
1415 mday 1.40 }
1416
1417
|
1418 mday 1.42 monitor_2_entry* monitor_2::add_entry(pegasus_socket& ps,
1419 monitor_2_entry_type type,
1420 void* accept_parm,
1421 void* dispatch_parm)
|
1422 mday 1.40 {
|
1423 a.arora 1.71 Sint32 fd1,fd2;
1424
1425 fd2=(Sint32) ps;
1426
|
1427 mday 1.42 monitor_2_entry* m2e = new monitor_2_entry(ps, type, accept_parm, dispatch_parm);
|
1428 a.arora 1.71
1429 // The purpose of the following piece of code is to avoid duplicate entries in
1430 // the _listeners list. Would it be too much of an overhead ?
1431 try {
1432
1433 monitor_2_entry* temp;
1434
1435 _listeners.lock(pegasus_thread_self());
1436 temp = _listeners.next(0);
1437 while(temp != 0 )
1438 {
1439 fd1=(Sint32) temp->get_sock();
1440
1441 if(fd1 == fd2)
1442 {
1443
1444 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1445 "monitor_2::add_entry:Request for duplicate entry in _listeners for %d FD.", fd1);
1446 if(temp->get_state() == CLOSED)
1447 {
1448 temp->set_state(IDLE);
1449 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL3,
1450 "monitor_2::add_entry:CLOSED state changed to IDLE for %d.", fd1);
1451 }
1452 _listeners.unlock();
1453 delete m2e;
1454 return 0;
1455 }
1456 temp = _listeners.next(temp);
1457 }
1458 }
1459 catch(...)
1460 {
1461 delete m2e;
1462 return 0;
1463 }
1464
1465
1466 _listeners.unlock();
1467
1468
|
1469 mday 1.40 try{
1470 _listeners.insert_first(m2e);
1471 }
1472 catch(...){
1473 delete m2e;
|
1474 mday 1.42 return 0;
|
1475 mday 1.40 }
|
1476 a.arora 1.71 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1477 "monitor_2::add_entry:SUCCESSFULLY added to _listeners list. FD = %d.", fd2);
|
1478 mday 1.40 tickle();
|
1479 mday 1.42 return m2e;
|
1480 mday 1.40 }
1481
1482 Boolean monitor_2::remove_entry(Sint32 s)
1483 {
1484 monitor_2_entry* temp;
1485 try {
1486 _listeners.try_lock(pegasus_thread_self());
1487 temp = _listeners.next(0);
1488 while(temp != 0){
|
1489 mday 1.42 if(s == (Sint32)temp->_rep->psock ){
|
1490 mday 1.40 temp = _listeners.remove_no_lock(temp);
1491 delete temp;
1492 _listeners.unlock();
1493 return true;
1494 }
1495 temp = _listeners.next(temp);
1496 }
1497 _listeners.unlock();
1498 }
1499 catch(...){
1500 }
1501 return false;
|
1502 mday 1.7 }
|
1503 mday 1.37
|
1504 mday 1.42 Uint32 monitor_2::getOutstandingRequestCount(void)
1505 {
1506 return _requestCount.value();
1507
|
1508 mday 1.49 }
1509
1510
1511 HTTPConnection2* monitor_2::remove_connection(Sint32 sock)
1512 {
1513
1514 HTTPConnection2* temp;
1515 try
1516 {
1517 monitor_2::_connections.lock(pegasus_thread_self());
1518 temp = monitor_2::_connections.next(0);
1519 while(temp != 0 )
1520 {
1521 if(sock == temp->getSocket())
1522 {
1523 temp = monitor_2::_connections.remove_no_lock(temp);
1524 monitor_2::_connections.unlock();
1525 return temp;
1526 }
1527 temp = monitor_2::_connections.next(temp);
1528 }
1529 mday 1.49 monitor_2::_connections.unlock();
1530 }
1531 catch(...)
1532 {
1533 }
1534 return 0;
1535 }
1536
1537 Boolean monitor_2::insert_connection(HTTPConnection2* connection)
1538 {
1539 try
1540 {
1541 monitor_2::_connections.insert_first(connection);
1542 }
1543 catch(...)
1544 {
1545 return false;
1546 }
1547 return true;
|
1548 mday 1.42 }
|
1549 mday 1.7
|
1550 mike 1.2
1551 PEGASUS_NAMESPACE_END
|