1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.103 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Brasher (mbrasher@bmc.com)
33 //
|
34 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
35 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
36 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
37 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
38 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
39 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
40 mike 1.2 //
41 //%/////////////////////////////////////////////////////////////////////////////
42
43 #include <Pegasus/Common/Config.h>
|
44 mday 1.40
|
45 mike 1.2 #include <cstring>
46 #include "Monitor.h"
47 #include "MessageQueue.h"
48 #include "Socket.h"
|
49 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
50 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
51 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
52 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
53 mike 1.100 #include "ArrayIterator.h"
|
54 mike 1.2
|
55 w.white 1.103.10.4
56
57 const static DWORD MAX_BUFFER_SIZE = 4096; // 4 kilobytes
58
|
59 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
60 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
61 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
62 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
63 david.dillard 1.95 <windows.h>). Find inclusion of that header which is visible to this \
|
64 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
65 otherwise, less than 64 clients (the default) will be able to connect to the \
66 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
67 mday 1.5
|
68 mike 1.2 # endif
69 # define FD_SETSIZE 1024
|
70 mday 1.5 # include <windows.h>
|
71 mike 1.2 #else
72 # include <sys/types.h>
73 # include <sys/socket.h>
74 # include <sys/time.h>
75 # include <netinet/in.h>
76 # include <netdb.h>
77 # include <arpa/inet.h>
78 #endif
79
80 PEGASUS_USING_STD;
81
82 PEGASUS_NAMESPACE_BEGIN
83
|
84 mike 1.96 static AtomicInt _connections(0);
|
85 mreddy 1.103.10.9 Mutex Monitor::_cout_mut;
|
86 w.white 1.103.10.1
|
87 j.alex 1.103.10.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
88 #define PIPE_INCREMENT 1
89 #endif
|
90 w.white 1.103.10.1
|
91 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
92 //
93 // Monitor
94 //
95 ////////////////////////////////////////////////////////////////////////////////
96
|
97 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
98 mike 1.2 Monitor::Monitor()
|
99 kumpf 1.87 : _stopConnections(0),
|
100 a.arora 1.73 _stopConnectionsSem(0),
|
101 a.dunfey 1.76 _solicitSocketCount(0),
|
102 a.dunfey 1.77 _tickle_client_socket(-1),
103 _tickle_server_socket(-1),
104 _tickle_peer_socket(-1)
|
105 mike 1.2 {
|
106 mreddy 1.103.10.9 {
107 AutoMutex automut(Monitor::_cout_mut);
108 PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
109 }
|
110 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
111 mike 1.2 Socket::initializeInterface();
|
112 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
113 a.arora 1.73
114 // setup the tickler
115 initializeTickler();
116
|
117 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
118 a.arora 1.73 // has added an entry in the first position of the
119 // _entries array
120 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
121 mday 1.37 {
122 _MonitorEntry entry(0, 0, 0);
123 _entries.append(entry);
124 }
|
125 mreddy 1.103.10.9 {
126 AutoMutex automut(Monitor::_cout_mut);
127 PEGASUS_STD(cout) << "Exiting: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
128 }
|
129 mday 1.18 }
|
130 mday 1.20
|
131 mike 1.2 Monitor::~Monitor()
132 {
|
133 mreddy 1.103.10.9 {
134 AutoMutex automut(Monitor::_cout_mut);
135 PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
136 }
|
137 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
138 a.dunfey 1.76
139 try{
|
140 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
141 a.dunfey 1.76 {
142 Socket::close(_tickle_peer_socket);
143 }
|
144 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
145 a.dunfey 1.76 {
146 Socket::close(_tickle_client_socket);
147 }
|
148 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
149 a.dunfey 1.76 {
150 Socket::close(_tickle_server_socket);
151 }
152 }
153 catch(...)
154 {
155 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
156 "Failed to close tickle sockets");
157 }
158
|
159 mike 1.2 Socket::uninitializeInterface();
|
160 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
161 "returning from monitor destructor");
|
162 mreddy 1.103.10.9 {
163 AutoMutex automut(Monitor::_cout_mut);
164 PEGASUS_STD(cout) << "Exiting: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
165 }
|
166 mday 1.18 }
167
|
168 a.arora 1.73 void Monitor::initializeTickler(){
|
169 mreddy 1.103.10.9 {
170 AutoMutex automut(Monitor::_cout_mut);
171 PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
172 }
|
173 r.kieninger 1.83 /*
174 NOTE: On any errors trying to
175 setup out tickle connection,
|
176 a.arora 1.73 throw an exception/end the server
177 */
178
179 /* setup the tickle server/listener */
180
181 // get a socket for the server side
|
182 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
183 a.arora 1.73 //handle error
184 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
185 "Received error number $0 while creating the internal socket.",
186 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
187 errno);
188 #else
189 WSAGetLastError());
190 #endif
191 throw Exception(parms);
192 }
193
194 // initialize the address
195 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
196 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
197 #pragma convert(37)
198 #endif
199 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
200 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
201 #pragma convert(0)
202 #endif
203 _tickle_server_addr.sin_family = PF_INET;
204 a.arora 1.73 _tickle_server_addr.sin_port = 0;
205
|
206 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
207 a.arora 1.73
208 // bind server side to socket
209 if((::bind(_tickle_server_socket,
|
210 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
211 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
212 // handle error
|
213 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
214 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
215 "Received error:$0 while binding the internal socket.",strerror(errno));
216 #else
|
217 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
218 "Received error number $0 while binding the internal socket.",
219 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
220 errno);
221 #else
222 WSAGetLastError());
223 #endif
|
224 r.kieninger 1.83 #endif
|
225 a.arora 1.73 throw Exception(parms);
226 }
227
228 // tell the kernel we are a server
229 if((::listen(_tickle_server_socket,3)) < 0){
230 // handle error
231 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
232 "Received error number $0 while listening to the internal socket.",
233 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
234 errno);
235 #else
236 WSAGetLastError());
237 #endif
238 throw Exception(parms);
239 }
|
240 r.kieninger 1.83
|
241 a.arora 1.73 // make sure we have the correct socket for our server
242 int sock = ::getsockname(_tickle_server_socket,
|
243 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
244 &_addr_size);
|
245 a.arora 1.73 if(sock < 0){
246 // handle error
247 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
248 "Received error number $0 while getting the internal socket name.",
249 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
250 errno);
251 #else
252 WSAGetLastError());
253 #endif
254 throw Exception(parms);
255 }
256
257 /* set up the tickle client/connector */
|
258 r.kieninger 1.83
|
259 a.arora 1.73 // get a socket for our tickle client
|
260 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
261 a.arora 1.73 // handle error
262 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
263 "Received error number $0 while creating the internal client socket.",
264 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
265 errno);
266 #else
267 WSAGetLastError());
268 #endif
269 throw Exception(parms);
270 }
271
272 // setup the address of the client
273 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
274 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
275 #pragma convert(37)
276 #endif
277 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
278 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
279 #pragma convert(0)
280 #endif
281 _tickle_client_addr.sin_family = PF_INET;
282 a.arora 1.73 _tickle_client_addr.sin_port = 0;
283
284 // bind socket to client side
285 if((::bind(_tickle_client_socket,
|
286 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
287 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
288 // handle error
289 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
290 "Received error number $0 while binding the internal client socket.",
291 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
292 errno);
293 #else
294 WSAGetLastError());
295 #endif
296 throw Exception(parms);
297 }
298
299 // connect to server side
300 if((::connect(_tickle_client_socket,
|
301 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
302 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
303 // handle error
304 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
305 "Received error number $0 while connecting the internal client socket.",
306 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
307 errno);
308 #else
309 WSAGetLastError());
310 #endif
311 throw Exception(parms);
312 }
313
314 /* set up the slave connection */
315 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
316 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
317 r.kieninger 1.83 pegasus_sleep(1);
|
318 a.arora 1.73
319 // this call may fail, we will try a max of 20 times to establish this peer connection
320 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
321 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
322 &peer_size)) < 0){
|
323 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
324 // Only retry on non-windows platforms.
325 if(_tickle_peer_socket == -1 && errno == EAGAIN)
326 {
|
327 r.kieninger 1.83 int retries = 0;
|
328 a.arora 1.73 do
329 {
330 pegasus_sleep(1);
331 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
332 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
333 &peer_size);
|
334 a.arora 1.73 retries++;
335 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
336 }
337 #endif
338 }
339 if(_tickle_peer_socket == -1){
340 // handle error
341 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
342 "Received error number $0 while accepting the internal socket connection.",
343 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
344 errno);
345 #else
346 WSAGetLastError());
347 #endif
348 throw Exception(parms);
349 }
350 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
351 // checks entries with IDLE state for events
352 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
353 entry._status = _MonitorEntry::IDLE;
354 _entries.append(entry);
|
355 mreddy 1.103.10.9 {
356 AutoMutex automut(Monitor::_cout_mut);
357 PEGASUS_STD(cout) << "Exiting: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
358 }
|
359 a.arora 1.73 }
360
361 void Monitor::tickle(void)
362 {
|
363 mreddy 1.103.10.9 {
364 AutoMutex automut(Monitor::_cout_mut);
365 PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
366 }
|
367 sushma.fernandes 1.78 static char _buffer[] =
|
368 a.arora 1.73 {
369 '0','0'
370 };
|
371 r.kieninger 1.83
|
372 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
373 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
374 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
375 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
376 mreddy 1.103.10.9 {
377 AutoMutex automut(Monitor::_cout_mut);
378 PEGASUS_STD(cout) << "Exiting: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
379 }
|
380 sushma.fernandes 1.78 }
381
382 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
383 {
384 // Set the state to requested state
385 _entries[index]._status = status;
|
386 a.arora 1.73 }
387
|
388 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
389 {
|
390 mreddy 1.103.10.9 {
391 AutoMutex automut(Monitor::_cout_mut);
392 PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
393 }
|
394 mday 1.18
|
395 mday 1.25 Boolean handled_events = false;
|
396 a.arora 1.73 int i = 0;
|
397 r.kieninger 1.83
|
398 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
399 a.arora 1.73
|
400 mday 1.25 fd_set fdread;
401 FD_ZERO(&fdread);
|
402 a.arora 1.73
|
403 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
404 r.kieninger 1.83
|
405 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
406
|
407 r.kieninger 1.83 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
408 mike 1.96 if (_stopConnections.get() == 1)
|
409 kumpf 1.48 {
|
410 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
411 kumpf 1.48 {
|
412 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
413 kumpf 1.48 {
|
414 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
415 kumpf 1.48 {
|
416 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
417 entries[indx]._status.get() == _MonitorEntry::DYING )
|
418 kumpf 1.48 {
419 // remove the entry
|
420 mike 1.100 entries[indx]._status = _MonitorEntry::EMPTY;
|
421 kumpf 1.48 }
422 else
423 {
424 // set status to DYING
|
425 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
426 kumpf 1.48 }
427 }
428 }
429 }
430 _stopConnections = 0;
|
431 a.arora 1.73 _stopConnectionsSem.signal();
|
432 kumpf 1.48 }
|
433 kumpf 1.51
|
434 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
435 kumpf 1.68 {
|
436 mike 1.100 const _MonitorEntry &entry = entries[indx];
|
437 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
438 brian.campbell 1.80 (entry._type == Monitor::CONNECTION))
|
439 kumpf 1.68 {
|
440 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
441 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
442 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
443 r.kieninger 1.83
|
444 brian.campbell 1.80 if (h._connectionClosePending == false)
445 continue;
446
447 // NOTE: do not attempt to delete while there are pending responses
|
448 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
449 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
450 // and then cause the monitor to rerun this code and clean up.
451 // (see HTTPConnection.cpp)
452
453 if (h._responsePending == true)
454 {
455 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
456 "Ignoring connection delete request because "
457 "responses are still pending. "
|
458 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
459 brian.campbell 1.81 (void *)&h, h.getSocket());
|
460 brian.campbell 1.80 continue;
461 }
462 h._connectionClosePending = false;
463 MessageQueue &o = h.get_owner();
464 Message* message= new CloseConnectionMessage(entry.socket);
|
465 kumpf 1.68 message->dest = o.getQueueId();
466
|
467 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
468 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
469 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
470 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
471 // connection, the lock is re-requested and processing of
472 // the for loop continues. This is safe with the current
|
473 mike 1.100 // implementation of the entries object. Note that the
474 // loop condition accesses the entries.size() on each
|
475 kumpf 1.68 // iteration, so that a change in size while the mutex is
476 // unlocked will not result in an ArrayIndexOutOfBounds
477 // exception.
478
|
479 kumpf 1.94 autoEntryMutex.unlock();
|
480 kumpf 1.68 o.enqueue(message);
|
481 kumpf 1.94 autoEntryMutex.lock();
|
482 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
483 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
484 entries.reset(_entries);
|
485 kumpf 1.68 }
486 }
487
|
488 kumpf 1.51 Uint32 _idleEntries = 0;
|
489 r.kieninger 1.83
|
490 a.arora 1.73 /*
|
491 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
492 to the kernel as a parameter to SELECT. This loop seems like a good
493 place to calculate the max file descriptor (maximum socket number)
494 because we have to traverse the entire array.
|
495 r.kieninger 1.83 */
|
496 w.white 1.103.10.1 //Array<HANDLE> pipeEventArray;
|
497 j.alex 1.103.10.2 PEGASUS_SOCKET maxSocketCurrentPass = 0;
|
498 w.white 1.103.10.1 int indx;
499
|
500 j.alex 1.103.10.2
501 #ifdef PEGASUS_OS_TYPE_WINDOWS
502
|
503 w.white 1.103.10.1 //This array associates named pipe connections to their place in [indx]
504 //in the entries array. The value in poition zero of the array is the
505 //index of the fist named pipe connection in the entries array
506 Array <Uint32> indexPipeCountAssociator;
|
507 j.alex 1.103.10.2 int pipeEntryCount=0;
508 int MaxPipes = PIPE_INCREMENT;
509 HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];
510
511 #endif
|
512 w.white 1.103.10.1
513 for( indx = 0; indx < (int)entries.size(); indx++)
|
514 mike 1.2 {
|
515 a.arora 1.73
|
516 j.alex 1.103.10.2
517 #ifdef PEGASUS_OS_TYPE_WINDOWS
518 if(entries[indx].isNamedPipeConnection())
519 {
|
520 w.white 1.103.10.5
|
521 j.alex 1.103.10.2 //entering this clause mean that a Named Pipe connection is at entries[indx]
|
522 w.white 1.103.10.1 //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;
|
523 j.alex 1.103.10.2
|
524 w.white 1.103.10.6 //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;
|
525 w.white 1.103.10.5
|
526 j.alex 1.103.10.2 entries[indx].pipeSet = false;
|
527 w.white 1.103.10.5
|
528 j.alex 1.103.10.2 // We can Keep a counter in the Monitor class for the number of named pipes ...
529 // Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
530 if (pipeEntryCount >= MaxPipes)
531 {
|
532 w.white 1.103.10.6 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<
533 // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;
|
534 j.alex 1.103.10.2 MaxPipes += PIPE_INCREMENT;
|
535 w.white 1.103.10.5 HANDLE* temp_hEvents = new HANDLE[MaxPipes];
536
|
537 j.alex 1.103.10.2 for (Uint32 i =0;i<pipeEntryCount;i++)
538 {
539 temp_hEvents[i] = hEvents[i];
540 }
|
541 w.white 1.103.10.5
|
542 j.alex 1.103.10.2 delete [] hEvents;
|
543 w.white 1.103.10.5
|
544 j.alex 1.103.10.2 hEvents = temp_hEvents;
|
545 w.white 1.103.10.6 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;
|
546 w.white 1.103.10.5
|
547 j.alex 1.103.10.2 }
548
|
549 w.white 1.103.10.1 //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
|
550 j.alex 1.103.10.2 hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap().hEvent;
|
551 w.white 1.103.10.4
|
552 w.white 1.103.10.1 indexPipeCountAssociator.append(indx);
|
553 w.white 1.103.10.4
554 pipeEntryCount++;
555
556
557
|
558 w.white 1.103.10.7 // cout << "Monitor::run pipeEntrycount is " << pipeEntryCount <<
559 // " this is the type " << entries[indx]._type << " this is index " << indx << endl;
|
560 w.white 1.103.10.1
|
561 mday 1.25 }
|
562 j.alex 1.103.10.2 else
563
564 #endif
565 {
566
567 if(maxSocketCurrentPass < entries[indx].socket)
568 maxSocketCurrentPass = entries[indx].socket;
569
570 if(entries[indx]._status.get() == _MonitorEntry::IDLE)
571 {
572 _idleEntries++;
573 FD_SET(entries[indx].socket, &fdread);
574 }
575
576 }
577 }
|
578 s.hills 1.62
|
579 a.arora 1.73 /*
|
580 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
581 descriptors starting at 0.
|
582 a.arora 1.73 */
583 maxSocketCurrentPass++;
584
|
585 kumpf 1.94 autoEntryMutex.unlock();
|
586 david.dillard 1.95
587 //
588 // The first argument to select() is ignored on Windows and it is not
589 // a socket value. The original code assumed that the number of sockets
590 // and a socket value have the same type. On Windows they do not.
591 //
592 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
593 w.white 1.103.10.1 //int events = select(0, &fdread, NULL, NULL, &tv);
|
594 j.alex 1.103.10.2 int events = 0;
595 DWORD dwWait=NULL;
596 int pEvents = 0;
|
597 w.white 1.103.10.1
|
598 w.white 1.103.10.7 // cout << "events after select" << events << endl;
|
599 w.white 1.103.10.1 cout << "Calling WaitForMultipleObjects\n";
600
601 //this should be in a try block
602
603 dwWait = WaitForMultipleObjects(MaxPipes,
604 hEvents, //ABB:- array of event objects
605 FALSE, // ABB:-does not wait for all
606 20000); //ABB:- timeout value
607
608 if(dwWait == WAIT_TIMEOUT)
609 {
610 cout << "Wait WAIT_TIMEOUT\n";
|
611 j.alex 1.103.10.2
612 // Sleep(2000);
|
613 w.white 1.103.10.1 //continue;
|
614 j.alex 1.103.10.2
615 //return false; // I think we do nothing.... Mybe there is a socket connection... so
616 // cant return.
|
617 w.white 1.103.10.1 }
618 else if (dwWait == WAIT_FAILED)
619 {
620 cout << "Wait Failed returned\n";
621 cout << "failed with " << GetLastError() << "." << endl;
|
622 j.alex 1.103.10.2 pEvents = -1;
|
623 w.white 1.103.10.3 return false;
|
624 w.white 1.103.10.1 }
625 else
626 {
627 int pCount = dwWait - WAIT_OBJECT_0; // determines which pipe
|
628 w.white 1.103.10.7 // cout << " WaitForMultiPleObject returned activity on server pipe: "<<
629 // pCount<< endl;
|
630 w.white 1.103.10.1
|
631 j.alex 1.103.10.2 pEvents = 1;
|
632 w.white 1.103.10.1
633 //this statment gets the pipe entry that was trigered
634 entries[indexPipeCountAssociator[pCount]].pipeSet = true;
635
636 if (pCount > 0) //this means activity on pipe is CIMOperation reques
637 {
|
638 w.white 1.103.10.7 // cout << "In Monitor::run got Operation request" << endl;
|
639 w.white 1.103.10.1 //entries[indx]._type = Monitor::CONNECTION;
640 }
641 else //this clause my not be needed in production but is used for testing
642 {
|
643 w.white 1.103.10.7 // cout << "In Monitor::run got Connection request" << endl;
|
644 w.white 1.103.10.1
645 }
646
647 }
648 //
649
|
650 j.alex 1.103.10.2
|
651 w.white 1.103.10.1 // Sleep(2000);
652
653 //int events = 1;
654 /*if (dwWait)
655 {
656 cout << "in Monitor::run about to call handlePipeConnectionEvent" << endl;
657 _handlePipeConnectionEvent(dwWait);
658 }*/
|
659 david.dillard 1.95 #else
|
660 a.arora 1.73 int events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
661 david.dillard 1.95 #endif
|
662 kumpf 1.94 autoEntryMutex.lock();
|
663 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
664 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
665 entries.reset(_entries);
|
666 w.white 1.103.10.1
|
667 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
668 j.alex 1.103.10.2 if(pEvents == -1)
669 {
670 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
671 "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
672 // The EBADF error indicates that one or more or the file
673 // descriptions was not valid. This could indicate that
674 // the entries structure has been corrupted or that
675 // we have a synchronization error.
676
677 // We need to generate an assert here...
678 PEGASUS_ASSERT(GetLastError()!= EBADF);
679
680
681 }
682
|
683 kumpf 1.50 if(events == SOCKET_ERROR)
|
684 mike 1.2 #else
|
685 kumpf 1.50 if(events == -1)
|
686 mike 1.2 #endif
|
687 mday 1.13 {
|
688 j.alex 1.103.10.2
689 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
690 kumpf 1.50 "Monitor::run - errorno = %d has occurred on select.", errno);
691 // The EBADF error indicates that one or more or the file
692 // descriptions was not valid. This could indicate that
|
693 mike 1.100 // the entries structure has been corrupted or that
|
694 kumpf 1.50 // we have a synchronization error.
695
696 PEGASUS_ASSERT(errno != EBADF);
697 }
|
698 j.alex 1.103.10.2 else if ((events)||(pEvents))
|
699 kumpf 1.50 {
|
700 w.white 1.103.10.1
|
701 w.white 1.103.10.7 // cout << "IN Monior::run 'else if (events)' clause - array size is " <<
702 // (int)entries.size() << endl;
|
703 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
704 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
705 david.dillard 1.95 events, _idleEntries);
|
706 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
707 mday 1.25 {
|
708 w.white 1.103.10.5 //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;
|
709 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
710 // owned by the Monitor).
|
711 w.white 1.103.10.1 if(((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
|
712 j.alex 1.103.10.2 FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||
713 (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents)))
|
714 david.dillard 1.95 {
|
715 w.white 1.103.10.4 MessageQueue *q;
716 cout << "IN Monior::run inside - for int indx = " <<indx <<
717 "and queue ID is " << entries[indx].queueId << endl;
718 try{
719
720 q = MessageQueue::lookup(entries[indx].queueId);
721 }
722 catch (Exception e)
723 {
724 cout << " this is what lookup gives - " << e.getMessage() << endl;
725 exit(1);
726 }
727 catch(...)
728 {
729 cout << "MessageQueue::lookup gives strange exception " << endl;
730 exit(1);
731 }
732
733
734
735 cout << "Monitor::run after MessageQueue::lookup(entries[indx].queueId)" << endl;
736 w.white 1.103.10.4 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
737 kumpf 1.53 "Monitor::run indx = %d, queueId = %d, q = %p",
|
738 mike 1.100 indx, entries[indx].queueId, q);
|
739 w.white 1.103.10.4 cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;
|
740 kumpf 1.53 PEGASUS_ASSERT(q !=0);
|
741 mday 1.37
|
742 david.dillard 1.95 try
743 {
|
744 w.white 1.103.10.1 cout <<" this is the type " << entries[indx]._type <<
745 "for index " << indx << endl;
746 cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;
|
747 mike 1.100 if(entries[indx]._type == Monitor::CONNECTION)
|
748 david.dillard 1.95 {
|
749 w.white 1.103.10.4 cout << "In Monitor::run Monitor::CONNECTION clause" << endl;
750
751 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
752 mike 1.100 "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
753 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
754 sushma.fernandes 1.78
755 // Do not update the entry just yet. The entry gets updated once
|
756 r.kieninger 1.83 // the request has been read.
|
757 mike 1.100 //entries[indx]._status = _MonitorEntry::BUSY;
|
758 sushma.fernandes 1.78
|
759 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
760 a.arora 1.73 /* Removed for PEP 183.
|
761 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
762 (void *)q, _dispatch))
|
763 kumpf 1.67 {
764 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
765 "Monitor::run: Insufficient resources to process request.");
|
766 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
767 kumpf 1.67 return true;
768 }
|
769 a.arora 1.73 */
770 // Added for PEP 183
|
771 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
772 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
773 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
774 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
775 w.white 1.103.10.6
776 /*In the case of named Pipes, the request has already been read from the pipe
777 therefor this section passed the request data to the HTTPConnection
778 NOTE: not sure if this would be better suited in a sparate private method
779 */
|
780 w.white 1.103.10.7 dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd
|
781 w.white 1.103.10.8 cout << "In Monitor::run after dst->setNamedPipe string read is " <<
782 entries[indx].namedPipe.raw << endl;
|
783 w.white 1.103.10.6
|
784 a.arora 1.73 try
785 {
|
786 w.white 1.103.10.3 cout << "In Monitor::run about to call 'dst->run(1)' " << endl;
|
787 a.arora 1.73 dst->run(1);
788 }
|
789 david.dillard 1.95 catch (...)
790 {
791 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
792 "Monitor::_dispatch: exception received");
793 }
794 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
795 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
796
|
797 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
798 r.kieninger 1.83 // The following will fail in that case.
|
799 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
800 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
801 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
802 // to the Monitor. It is no longer permissible to access the connection
803 // or the entry in the _entries table.
|
804 sushma.fernandes 1.78
805 // The following is not relevant as the worker thread or the
806 // reader thread will update the status of the entry.
807 //if (dst->_connectionClosePending)
|
808 r.kieninger 1.83 //{
|
809 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
810 //}
811 //else
812 //{
813 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
814 r.kieninger 1.83 //}
815 // end Added for PEP 183
|
816 a.arora 1.73 }
|
817 mike 1.100 else if( entries[indx]._type == Monitor::INTERNAL){
|
818 r.kieninger 1.83 // set ourself to BUSY,
819 // read the data
|
820 a.arora 1.73 // and set ourself back to IDLE
|
821 w.white 1.103.10.1 cout << " in - entries[indx]._type == Monitor::INTERNAL- " << endl;
|
822 r.kieninger 1.83
|
823 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
824 a.arora 1.73 static char buffer[2];
|
825 mike 1.100 Socket::disableBlocking(entries[indx].socket);
826 Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
827 Socket::enableBlocking(entries[indx].socket);
828 entries[indx]._status = _MonitorEntry::IDLE;
|
829 mday 1.37 }
830 else
|
831 mday 1.25 {
|
832 w.white 1.103.10.1 cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
|
833 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
834 "Non-connection entry, indx = %d, has been received.", indx);
|
835 mday 1.37 int events = 0;
|
836 w.white 1.103.10.1 Message *msg;
|
837 j.alex 1.103.10.2 cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;
|
838 w.white 1.103.10.1
839 if (entries[indx].isNamedPipeConnection())
840 {
|
841 w.white 1.103.10.4 if(!entries[indx].namedPipe.isConnectionPipe)
842 { /*if we enter this clasue it means that the named pipe that we are
843 looking at has recived a connection but is not the pipe we get connection requests over.
844 therefore we neew to change the _type to CONNECTION and wait for a CIM Operations request*/
845 entries[indx]._type = Monitor::CONNECTION;
846
847
|
848 w.white 1.103.10.8 /* This is a test - this shows that the read file needs to be done
|
849 w.white 1.103.10.4 before we call wiatForMultipleObjects*/
850 /******************************************************
851 ********************************************************/
852
|
853 w.white 1.103.10.7 //DWORD size = 0;
|
854 w.white 1.103.10.4
855 BOOL rc = ::ReadFile(
856 entries[indx].namedPipe.getPipe(),
|
857 w.white 1.103.10.6 &entries[indx].namedPipe.raw,
858 MAX_BUFFER_SIZE,
|
859 w.white 1.103.10.7 &entries[indx].namedPipe.bytesRead,
|
860 w.white 1.103.10.4 &entries[indx].namedPipe.getOverlap());
|
861 w.white 1.103.10.6
862 cout << "just called read on index " << indx << endl;
863
|
864 w.white 1.103.10.7 //&entries[indx].namedPipe.bytesRead = &size;
|
865 w.white 1.103.10.4 if(!rc)
866 {
867
868 cout << "ReadFile failed for : " << GetLastError() << "."<< endl;
869
870 }
871
872
873
874 /******************************************************
875 ********************************************************/
876
877
878
879
880 continue;
881
882 }
|
883 w.white 1.103.10.1 cout << " In Monitor::run about to create a Pipe message" << endl;
884 events |= NamedPipeMessage::READ;
885 msg = new NamedPipeMessage(entries[indx].namedPipe, events);
886 }
887 else
888 {
|
889 j.alex 1.103.10.2 cout << " In Monitor::run ..its a socket message" << endl;
|
890 w.white 1.103.10.1 events |= SocketMessage::READ;
891 msg = new SocketMessage(entries[indx].socket, events);
892 }
893
|
894 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
895 kumpf 1.94 autoEntryMutex.unlock();
|
896 mday 1.37 q->enqueue(msg);
|
897 kumpf 1.94 autoEntryMutex.lock();
|
898 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
899 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
900 entries.reset(_entries);
|
901 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
902 kumpf 1.94
|
903 mreddy 1.103.10.9 {
904 AutoMutex automut(Monitor::_cout_mut);
905 PEGASUS_STD(cout) << "Exiting: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
906 }
|
907 mday 1.25 return true;
908 }
909 }
|
910 mday 1.37 catch(...)
|
911 mday 1.25 {
912 }
913 handled_events = true;
914 }
915 }
|
916 mday 1.24 }
|
917 kumpf 1.94
|
918 mreddy 1.103.10.9 {
919 AutoMutex automut(Monitor::_cout_mut);
920 PEGASUS_STD(cout) << "Exiting: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
921 }
|
922 mday 1.13 return(handled_events);
|
923 mike 1.2 }
924
|
925 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
926 kumpf 1.48 {
|
927 mreddy 1.103.10.9 {
928 AutoMutex automut(Monitor::_cout_mut);
929 PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
930 }
|
931 kumpf 1.48 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
932 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
933 kumpf 1.48 _stopConnections = 1;
|
934 a.arora 1.73 tickle();
|
935 kumpf 1.48
|
936 chuck 1.74 if (wait)
|
937 a.arora 1.73 {
|
938 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
939 // caller of this function may unbind the ports while the monitor
940 // is still accepting connections on them.
|
941 kumpf 1.101 _stopConnectionsSem.wait();
|
942 a.arora 1.73 }
|
943 r.kieninger 1.83
|
944 kumpf 1.48 PEG_METHOD_EXIT();
|
945 mreddy 1.103.10.9 {
946 AutoMutex automut(Monitor::_cout_mut);
947 PEGASUS_STD(cout) << "Exiting: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
948 }
|
949 kumpf 1.48 }
|
950 mday 1.25
|
951 mday 1.37
|
952 mday 1.25 int Monitor::solicitSocketMessages(
|
953 david.dillard 1.95 PEGASUS_SOCKET socket,
|
954 mike 1.2 Uint32 events,
|
955 r.kieninger 1.83 Uint32 queueId,
|
956 mday 1.8 int type)
|
957 mike 1.2 {
|
958 mreddy 1.103.10.9 {
959 AutoMutex automut(Monitor::_cout_mut);
960 PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
961 }
|
962 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
963 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
964 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
965 // We always want the _entries array to 2 bigger than the
966 // current connections requested
967 _solicitSocketCount++; // bump the count
968 int size = (int)_entries.size();
|
969 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
970 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
971 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
972 _entries.append(entry);
973 }
974 }
|
975 kumpf 1.4
|
976 a.arora 1.73 int index;
977 for(index = 1; index < (int)_entries.size(); index++)
|
978 mday 1.25 {
|
979 a.arora 1.73 try
|
980 mday 1.37 {
|
981 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
982 a.arora 1.73 {
983 _entries[index].socket = socket;
984 _entries[index].queueId = queueId;
985 _entries[index]._type = type;
986 _entries[index]._status = _MonitorEntry::IDLE;
|
987 r.kieninger 1.83
|
988 mreddy 1.103.10.9 {
989 AutoMutex automut(Monitor::_cout_mut);
990 PEGASUS_STD(cout) << "Exiting: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
991 }
|
992 a.arora 1.73 return index;
993 }
|
994 mday 1.37 }
995 catch(...)
|
996 mday 1.25 {
997 }
998 }
|
999 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
1000 mday 1.25 PEG_METHOD_EXIT();
|
1001 mreddy 1.103.10.9 {
1002 AutoMutex automut(Monitor::_cout_mut);
1003 PEGASUS_STD(cout) << "Exiting: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1004 }
|
1005 kumpf 1.50 return -1;
|
1006 a.arora 1.73
|
1007 mike 1.2 }
1008
|
1009 david.dillard 1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
|
1010 mike 1.2 {
|
1011 mreddy 1.103.10.9 {
1012 AutoMutex automut(Monitor::_cout_mut);
1013 PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1014 }
|
1015 kumpf 1.50
|
1016 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
1017 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
1018 a.arora 1.73
1019 /*
1020 Start at index = 1 because _entries[0] is the tickle entry which never needs
1021 to be EMPTY;
1022 */
|
1023 w.otsuka 1.89 unsigned int index;
|
1024 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
1025 mike 1.2 {
|
1026 mday 1.25 if(_entries[index].socket == socket)
1027 {
|
1028 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
1029 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
1030 a.arora 1.73 _solicitSocketCount--;
1031 break;
|
1032 mday 1.25 }
|
1033 mike 1.2 }
|
1034 a.arora 1.73
1035 /*
1036 Dynamic Contraction:
1037 To remove excess entries we will start from the end of the _entries array
1038 and remove all entries with EMPTY status until we find the first NON EMPTY.
1039 This prevents the positions, of the NON EMPTY entries, from being changed.
|
1040 r.kieninger 1.83 */
|
1041 a.arora 1.73 index = _entries.size() - 1;
|
1042 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
1043 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
1044 _entries.remove(index);
1045 index--;
1046 }
|
1047 kumpf 1.4 PEG_METHOD_EXIT();
|
1048 mreddy 1.103.10.9 {
1049 AutoMutex automut(Monitor::_cout_mut);
1050 PEGASUS_STD(cout) << "Exiting: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1051 }
|
1052 mike 1.2 }
1053
|
1054 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
1055 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
1056 {
|
1057 mreddy 1.103.10.9 {
1058 AutoMutex automut(Monitor::_cout_mut);
1059 PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1060 }
|
1061 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
1062 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
1063 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
1064 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
1065 kumpf 1.51 try
1066 {
1067 dst->run(1);
1068 }
1069 catch (...)
1070 {
1071 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1072 "Monitor::_dispatch: exception received");
1073 }
1074 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1075 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
1076 r.kieninger 1.83
|
1077 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
1078 kumpf 1.68
1079 // Once the HTTPConnection thread has set the status value to either
1080 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
1081 // to the Monitor. It is no longer permissible to access the connection
1082 // or the entry in the _entries table.
|
1083 kumpf 1.50 if (dst->_connectionClosePending)
1084 {
|
1085 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
1086 }
1087 else
1088 {
1089 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
1090 kumpf 1.50 }
|
1091 mday 1.8 return 0;
|
1092 mday 1.40 }
1093
|
1094 w.white 1.103.10.1
1095 //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes
1096 int Monitor::solicitPipeMessages(
1097 NamedPipe namedPipe,
1098 Uint32 events, //not sure what has to change for this enum
1099 Uint32 queueId,
1100 int type)
1101 {
1102 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
1103 AutoMutex autoMut(_entry_mut);
1104 // Check to see if we need to dynamically grow the _entries array
1105 // We always want the _entries array to 2 bigger than the
1106 // current connections requested
1107 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
1108
1109
1110
1111 _solicitSocketCount++; // bump the count
1112 int size = (int)_entries.size();
1113 if((int)_solicitSocketCount >= (size-1)){
1114 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
1115 w.white 1.103.10.1 _MonitorEntry entry(0, 0, 0);
1116 _entries.append(entry);
1117 }
1118 }
1119
1120 int index;
1121 for(index = 1; index < (int)_entries.size(); index++)
1122 {
1123 try
1124 {
1125 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
1126 {
1127 _entries[index].socket = NULL;
1128 _entries[index].namedPipe = namedPipe;
1129 _entries[index].namedPipeConnection = true;
1130 _entries[index].queueId = queueId;
1131 _entries[index]._type = type;
1132 _entries[index]._status = _MonitorEntry::IDLE;
1133
1134 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up _entries[index] index = " << index << PEGASUS_STD(endl);
1135
1136 w.white 1.103.10.1 return index;
1137 }
1138 }
1139 catch(...)
1140 {
1141 }
1142
1143 }
1144 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
1145 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
1146
1147 PEG_METHOD_EXIT();
1148 return -1;
1149
1150 }
1151
1152
|
1153 mike 1.2 PEGASUS_NAMESPACE_END
|