1 karl 1.103 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.79 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.64 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.79 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.90 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.103 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 mreddy 1.103.10.16 //
|
21 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Brasher (mbrasher@bmc.com)
33 //
|
34 r.kieninger 1.83 // Modified By: Mike Day (monitor_2) mdday@us.ibm.com
|
35 a.arora 1.71 // Amit K Arora (Bug#1153) amita@in.ibm.com
|
36 alagaraja 1.75 // Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
|
37 sushma.fernandes 1.78 // Sushma Fernandes (sushma@hp.com) for Bug#2057
|
38 joyce.j 1.84 // Josephine Eskaline Joyce (jojustin@in.ibm.com) for PEP#101
|
39 kumpf 1.85 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
40 mike 1.2 //
41 //%/////////////////////////////////////////////////////////////////////////////
42
43 #include <Pegasus/Common/Config.h>
|
44 mday 1.40
|
45 mike 1.2 #include <cstring>
46 #include "Monitor.h"
47 #include "MessageQueue.h"
48 #include "Socket.h"
|
49 kumpf 1.4 #include <Pegasus/Common/Tracer.h>
|
50 mday 1.7 #include <Pegasus/Common/HTTPConnection.h>
|
51 kumpf 1.69 #include <Pegasus/Common/MessageQueueService.h>
|
52 a.arora 1.73 #include <Pegasus/Common/Exception.h>
|
53 mike 1.100 #include "ArrayIterator.h"
|
54 mike 1.2
|
55 w.white 1.103.10.4
56
|
57 j.alex 1.103.10.20 //const static DWORD MAX_BUFFER_SIZE = 4096; // 4 kilobytes
|
58 w.white 1.103.10.4
|
59 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
60 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
61 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
62 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
63 david.dillard 1.95 <windows.h>). Find inclusion of that header which is visible to this \
|
64 mike 1.2 compilation unit and #define FD_SETZIE to 1024 prior to that inclusion; \
65 otherwise, less than 64 clients (the default) will be able to connect to the \
66 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
67 mday 1.5
|
68 mike 1.2 # endif
69 # define FD_SETSIZE 1024
|
70 mday 1.5 # include <windows.h>
|
71 mike 1.2 #else
72 # include <sys/types.h>
73 # include <sys/socket.h>
74 # include <sys/time.h>
75 # include <netinet/in.h>
76 # include <netdb.h>
77 # include <arpa/inet.h>
78 #endif
79
80 PEGASUS_USING_STD;
81
82 PEGASUS_NAMESPACE_BEGIN
83
|
84 mike 1.96 static AtomicInt _connections(0);
|
85 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
86 mreddy 1.103.10.9 Mutex Monitor::_cout_mut;
|
87 j.alex 1.103.10.22 #endif
|
88 w.white 1.103.10.1
|
89 j.alex 1.103.10.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
90 #define PIPE_INCREMENT 1
91 #endif
|
92 w.white 1.103.10.1
|
93 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
94 //
95 // Monitor
96 //
97 ////////////////////////////////////////////////////////////////////////////////
98
|
99 kumpf 1.54 #define MAX_NUMBER_OF_MONITOR_ENTRIES 32
|
100 mike 1.2 Monitor::Monitor()
|
101 kumpf 1.87 : _stopConnections(0),
|
102 a.arora 1.73 _stopConnectionsSem(0),
|
103 a.dunfey 1.76 _solicitSocketCount(0),
|
104 a.dunfey 1.77 _tickle_client_socket(-1),
105 _tickle_server_socket(-1),
106 _tickle_peer_socket(-1)
|
107 mike 1.2 {
|
108 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
109 mreddy 1.103.10.9 {
110 AutoMutex automut(Monitor::_cout_mut);
111 PEGASUS_STD(cout) << "Entering: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
112 }
|
113 j.alex 1.103.10.22 #endif
|
114 kumpf 1.54 int numberOfMonitorEntriesToAllocate = MAX_NUMBER_OF_MONITOR_ENTRIES;
|
115 mike 1.2 Socket::initializeInterface();
|
116 kumpf 1.54 _entries.reserveCapacity(numberOfMonitorEntriesToAllocate);
|
117 a.arora 1.73
118 // setup the tickler
119 initializeTickler();
120
|
121 r.kieninger 1.83 // Start the count at 1 because initilizeTickler()
|
122 a.arora 1.73 // has added an entry in the first position of the
123 // _entries array
124 for( int i = 1; i < numberOfMonitorEntriesToAllocate; i++ )
|
125 mday 1.37 {
126 _MonitorEntry entry(0, 0, 0);
127 _entries.append(entry);
128 }
|
129 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
130 mreddy 1.103.10.9 {
131 AutoMutex automut(Monitor::_cout_mut);
132 PEGASUS_STD(cout) << "Exiting: Monitor::Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
133 }
|
134 j.alex 1.103.10.22 #endif
|
135 mday 1.18 }
|
136 mday 1.20
|
137 mike 1.2 Monitor::~Monitor()
138 {
|
139 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
140 mreddy 1.103.10.9 {
141 AutoMutex automut(Monitor::_cout_mut);
142 PEGASUS_STD(cout) << "Entering: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
143 }
|
144 j.alex 1.103.10.22 #endif
|
145 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
146 a.dunfey 1.76
147 try{
|
148 a.dunfey 1.77 if(_tickle_peer_socket >= 0)
|
149 a.dunfey 1.76 {
150 Socket::close(_tickle_peer_socket);
151 }
|
152 a.dunfey 1.77 if(_tickle_client_socket >= 0)
|
153 a.dunfey 1.76 {
154 Socket::close(_tickle_client_socket);
155 }
|
156 a.dunfey 1.77 if(_tickle_server_socket >= 0)
|
157 a.dunfey 1.76 {
158 Socket::close(_tickle_server_socket);
159 }
160 }
161 catch(...)
162 {
163 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
164 "Failed to close tickle sockets");
165 }
166
|
167 mike 1.2 Socket::uninitializeInterface();
|
168 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
169 "returning from monitor destructor");
|
170 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
171 mreddy 1.103.10.9 {
172 AutoMutex automut(Monitor::_cout_mut);
173 PEGASUS_STD(cout) << "Exiting: Monitor::~Monitor(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
174 }
|
175 j.alex 1.103.10.22 #endif
|
176 mday 1.18 }
177
|
178 a.arora 1.73 void Monitor::initializeTickler(){
|
179 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
180 mreddy 1.103.10.9 {
181 AutoMutex automut(Monitor::_cout_mut);
182 PEGASUS_STD(cout) << "Entering: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
183 }
|
184 j.alex 1.103.10.22 #endif
|
185 r.kieninger 1.83 /*
186 NOTE: On any errors trying to
187 setup out tickle connection,
|
188 a.arora 1.73 throw an exception/end the server
189 */
190
191 /* setup the tickle server/listener */
192
193 // get a socket for the server side
|
194 david.dillard 1.95 if((_tickle_server_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
195 a.arora 1.73 //handle error
196 MessageLoaderParms parms("Common.Monitor.TICKLE_CREATE",
197 "Received error number $0 while creating the internal socket.",
198 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
199 errno);
200 #else
201 WSAGetLastError());
202 #endif
203 throw Exception(parms);
204 }
205
206 // initialize the address
207 memset(&_tickle_server_addr, 0, sizeof(_tickle_server_addr));
208 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
209 #pragma convert(37)
210 #endif
211 _tickle_server_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
212 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
213 #pragma convert(0)
214 #endif
215 _tickle_server_addr.sin_family = PF_INET;
216 a.arora 1.73 _tickle_server_addr.sin_port = 0;
217
|
218 kumpf 1.86 PEGASUS_SOCKLEN_T _addr_size = sizeof(_tickle_server_addr);
|
219 a.arora 1.73
220 // bind server side to socket
221 if((::bind(_tickle_server_socket,
|
222 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
223 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
224 // handle error
|
225 r.kieninger 1.83 #ifdef PEGASUS_OS_ZOS
226 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND_LONG",
227 "Received error:$0 while binding the internal socket.",strerror(errno));
228 #else
|
229 a.arora 1.73 MessageLoaderParms parms("Common.Monitor.TICKLE_BIND",
230 "Received error number $0 while binding the internal socket.",
231 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
232 errno);
233 #else
234 WSAGetLastError());
235 #endif
|
236 r.kieninger 1.83 #endif
|
237 a.arora 1.73 throw Exception(parms);
238 }
239
240 // tell the kernel we are a server
241 if((::listen(_tickle_server_socket,3)) < 0){
242 // handle error
243 MessageLoaderParms parms("Common.Monitor.TICKLE_LISTEN",
244 "Received error number $0 while listening to the internal socket.",
245 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
246 errno);
247 #else
248 WSAGetLastError());
249 #endif
250 throw Exception(parms);
251 }
|
252 r.kieninger 1.83
|
253 a.arora 1.73 // make sure we have the correct socket for our server
254 int sock = ::getsockname(_tickle_server_socket,
|
255 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
256 &_addr_size);
|
257 a.arora 1.73 if(sock < 0){
258 // handle error
259 MessageLoaderParms parms("Common.Monitor.TICKLE_SOCKNAME",
260 "Received error number $0 while getting the internal socket name.",
261 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
262 errno);
263 #else
264 WSAGetLastError());
265 #endif
266 throw Exception(parms);
267 }
268
269 /* set up the tickle client/connector */
|
270 r.kieninger 1.83
|
271 a.arora 1.73 // get a socket for our tickle client
|
272 david.dillard 1.95 if((_tickle_client_socket = ::socket(PF_INET, SOCK_STREAM, 0)) == PEGASUS_INVALID_SOCKET){
|
273 a.arora 1.73 // handle error
274 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CREATE",
275 "Received error number $0 while creating the internal client socket.",
276 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
277 errno);
278 #else
279 WSAGetLastError());
280 #endif
281 throw Exception(parms);
282 }
283
284 // setup the address of the client
285 memset(&_tickle_client_addr, 0, sizeof(_tickle_client_addr));
286 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
287 #pragma convert(37)
288 #endif
289 _tickle_client_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
290 #ifdef PEGASUS_PLATFORM_OS400_ISERIES_IBM
291 #pragma convert(0)
292 #endif
293 _tickle_client_addr.sin_family = PF_INET;
294 a.arora 1.73 _tickle_client_addr.sin_port = 0;
295
296 // bind socket to client side
297 if((::bind(_tickle_client_socket,
|
298 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_client_addr),
|
299 a.arora 1.73 sizeof(_tickle_client_addr))) < 0){
300 // handle error
301 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_BIND",
302 "Received error number $0 while binding the internal client socket.",
303 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
304 errno);
305 #else
306 WSAGetLastError());
307 #endif
308 throw Exception(parms);
309 }
310
311 // connect to server side
312 if((::connect(_tickle_client_socket,
|
313 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_server_addr),
|
314 a.arora 1.73 sizeof(_tickle_server_addr))) < 0){
315 // handle error
316 MessageLoaderParms parms("Common.Monitor.TICKLE_CLIENT_CONNECT",
317 "Received error number $0 while connecting the internal client socket.",
318 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
319 errno);
320 #else
321 WSAGetLastError());
322 #endif
323 throw Exception(parms);
324 }
325
326 /* set up the slave connection */
327 memset(&_tickle_peer_addr, 0, sizeof(_tickle_peer_addr));
|
328 kumpf 1.86 PEGASUS_SOCKLEN_T peer_size = sizeof(_tickle_peer_addr);
|
329 r.kieninger 1.83 pegasus_sleep(1);
|
330 a.arora 1.73
331 // this call may fail, we will try a max of 20 times to establish this peer connection
332 if((_tickle_peer_socket = ::accept(_tickle_server_socket,
|
333 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
334 &peer_size)) < 0){
|
335 a.arora 1.73 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
336 // Only retry on non-windows platforms.
337 if(_tickle_peer_socket == -1 && errno == EAGAIN)
338 {
|
339 r.kieninger 1.83 int retries = 0;
|
340 a.arora 1.73 do
341 {
342 pegasus_sleep(1);
343 _tickle_peer_socket = ::accept(_tickle_server_socket,
|
344 kumpf 1.88 reinterpret_cast<struct sockaddr*>(&_tickle_peer_addr),
345 &peer_size);
|
346 a.arora 1.73 retries++;
347 } while(_tickle_peer_socket == -1 && errno == EAGAIN && retries < 20);
348 }
349 #endif
350 }
351 if(_tickle_peer_socket == -1){
352 // handle error
353 MessageLoaderParms parms("Common.Monitor.TICKLE_ACCEPT",
354 "Received error number $0 while accepting the internal socket connection.",
355 #if !defined(PEGASUS_OS_TYPE_WINDOWS)
356 errno);
357 #else
358 WSAGetLastError());
359 #endif
360 throw Exception(parms);
361 }
362 // add the tickler to the list of entries to be monitored and set to IDLE because Monitor only
363 // checks entries with IDLE state for events
364 _MonitorEntry entry(_tickle_peer_socket, 1, INTERNAL);
365 entry._status = _MonitorEntry::IDLE;
366 _entries.append(entry);
|
367 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
368 mreddy 1.103.10.9 {
369 AutoMutex automut(Monitor::_cout_mut);
370 PEGASUS_STD(cout) << "Exiting: Monitor::initializeTickler(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
371 }
|
372 j.alex 1.103.10.22 #endif
|
373 a.arora 1.73 }
374
375 void Monitor::tickle(void)
376 {
|
377 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
378 mreddy 1.103.10.9 {
379 AutoMutex automut(Monitor::_cout_mut);
380 PEGASUS_STD(cout) << "Entering: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
381 }
|
382 j.alex 1.103.10.22 #endif
|
383 sushma.fernandes 1.78 static char _buffer[] =
|
384 a.arora 1.73 {
385 '0','0'
386 };
|
387 r.kieninger 1.83
|
388 sushma.fernandes 1.78 AutoMutex autoMutex(_tickle_mutex);
|
389 r.kieninger 1.83 Socket::disableBlocking(_tickle_client_socket);
|
390 sushma.fernandes 1.78 Socket::write(_tickle_client_socket,&_buffer, 2);
|
391 r.kieninger 1.83 Socket::enableBlocking(_tickle_client_socket);
|
392 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
393 mreddy 1.103.10.9 {
394 AutoMutex automut(Monitor::_cout_mut);
395 PEGASUS_STD(cout) << "Exiting: Monitor::tickle(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
396 }
|
397 j.alex 1.103.10.22 #endif
|
398 sushma.fernandes 1.78 }
399
400 void Monitor::setState( Uint32 index, _MonitorEntry::entry_status status )
401 {
402 // Set the state to requested state
403 _entries[index]._status = status;
|
404 a.arora 1.73 }
405
|
406 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
407 {
|
408 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
409 mreddy 1.103.10.9 {
410 AutoMutex automut(Monitor::_cout_mut);
411 PEGASUS_STD(cout) << "Entering: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
412 }
|
413 j.alex 1.103.10.22 #endif
|
414 mday 1.18
|
415 mday 1.25 Boolean handled_events = false;
|
416 a.arora 1.73 int i = 0;
|
417 r.kieninger 1.83
|
418 kumpf 1.36 struct timeval tv = {milliseconds/1000, milliseconds%1000*1000};
|
419 a.arora 1.73
|
420 mday 1.25 fd_set fdread;
421 FD_ZERO(&fdread);
|
422 a.arora 1.73
|
423 kumpf 1.94 AutoMutex autoEntryMutex(_entry_mut);
|
424 r.kieninger 1.83
|
425 mike 1.100 ArrayIterator<_MonitorEntry> entries(_entries);
426
|
427 r.kieninger 1.83 // Check the stopConnections flag. If set, clear the Acceptor monitor entries
|
428 mike 1.96 if (_stopConnections.get() == 1)
|
429 kumpf 1.48 {
|
430 mike 1.100 for ( int indx = 0; indx < (int)entries.size(); indx++)
|
431 kumpf 1.48 {
|
432 mike 1.100 if (entries[indx]._type == Monitor::ACCEPTOR)
|
433 kumpf 1.48 {
|
434 mike 1.100 if ( entries[indx]._status.get() != _MonitorEntry::EMPTY)
|
435 kumpf 1.48 {
|
436 mike 1.100 if ( entries[indx]._status.get() == _MonitorEntry::IDLE ||
437 entries[indx]._status.get() == _MonitorEntry::DYING )
|
438 kumpf 1.48 {
439 // remove the entry
|
440 j.alex 1.103.10.19 entries[indx]._status = _MonitorEntry::EMPTY;
|
441 kumpf 1.48 }
442 else
443 {
444 // set status to DYING
|
445 mike 1.100 entries[indx]._status = _MonitorEntry::DYING;
|
446 kumpf 1.48 }
447 }
448 }
449 }
450 _stopConnections = 0;
|
451 a.arora 1.73 _stopConnectionsSem.signal();
|
452 kumpf 1.48 }
|
453 kumpf 1.51
|
454 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
455 kumpf 1.68 {
|
456 j.alex 1.103.10.19 const _MonitorEntry &entry = entries[indx];
|
457 mike 1.96 if ((entry._status.get() == _MonitorEntry::DYING) &&
|
458 j.alex 1.103.10.19 (entry._type == Monitor::CONNECTION))
|
459 kumpf 1.68 {
|
460 brian.campbell 1.80 MessageQueue *q = MessageQueue::lookup(entry.queueId);
|
461 kumpf 1.68 PEGASUS_ASSERT(q != 0);
|
462 brian.campbell 1.80 HTTPConnection &h = *static_cast<HTTPConnection *>(q);
|
463 r.kieninger 1.83
|
464 brian.campbell 1.80 if (h._connectionClosePending == false)
465 continue;
466
467 // NOTE: do not attempt to delete while there are pending responses
|
468 r.kieninger 1.83 // coming thru. The last response to come thru after a
|
469 brian.campbell 1.80 // _connectionClosePending will reset _responsePending to false
470 // and then cause the monitor to rerun this code and clean up.
471 // (see HTTPConnection.cpp)
472
473 if (h._responsePending == true)
474 {
|
475 j.alex 1.103.10.14 if (!entry.namedPipeConnection)
476 {
477 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
|
478 brian.campbell 1.80 "Ignoring connection delete request because "
479 "responses are still pending. "
|
480 r.kieninger 1.83 "connection=0x%p, socket=%d\n",
|
481 brian.campbell 1.81 (void *)&h, h.getSocket());
|
482 mreddy 1.103.10.16 }
483 else
|
484 j.alex 1.103.10.14 {
485 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "Monitor::run - "
486 "Ignoring connection delete request because "
487 "responses are still pending. "
488 "connection=0x%p, NamedPipe=%d\n",
489 (void *)&h, h.getNamedPipe().getPipe());
490 }
|
491 brian.campbell 1.80 continue;
492 }
493 h._connectionClosePending = false;
494 MessageQueue &o = h.get_owner();
|
495 j.alex 1.103.10.14 Message* message;
496 if (!entry.namedPipeConnection)
497 {
498 message= new CloseConnectionMessage(entry.socket);
499 }
500 else
501 {
502 message= new CloseConnectionMessage(entry.namedPipe);
503
504 }
|
505 kumpf 1.68 message->dest = o.getQueueId();
506
|
507 r.kieninger 1.83 // HTTPAcceptor is responsible for closing the connection.
|
508 kumpf 1.68 // The lock is released to allow HTTPAcceptor to call
|
509 r.kieninger 1.83 // unsolicitSocketMessages to free the entry.
|
510 kumpf 1.68 // Once HTTPAcceptor completes processing of the close
511 // connection, the lock is re-requested and processing of
512 // the for loop continues. This is safe with the current
|
513 mike 1.100 // implementation of the entries object. Note that the
514 // loop condition accesses the entries.size() on each
|
515 kumpf 1.68 // iteration, so that a change in size while the mutex is
516 // unlocked will not result in an ArrayIndexOutOfBounds
517 // exception.
518
|
519 kumpf 1.94 autoEntryMutex.unlock();
|
520 kumpf 1.68 o.enqueue(message);
|
521 kumpf 1.94 autoEntryMutex.lock();
|
522 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
523 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries.
524 entries.reset(_entries);
|
525 kumpf 1.68 }
526 }
527
|
528 kumpf 1.51 Uint32 _idleEntries = 0;
|
529 r.kieninger 1.83
|
530 a.arora 1.73 /*
|
531 david.dillard 1.95 We will keep track of the maximum socket number and pass this value
532 to the kernel as a parameter to SELECT. This loop seems like a good
533 place to calculate the max file descriptor (maximum socket number)
534 because we have to traverse the entire array.
|
535 r.kieninger 1.83 */
|
536 w.white 1.103.10.1 //Array<HANDLE> pipeEventArray;
|
537 j.alex 1.103.10.2 PEGASUS_SOCKET maxSocketCurrentPass = 0;
|
538 w.white 1.103.10.1 int indx;
539
|
540 j.alex 1.103.10.2
541 #ifdef PEGASUS_OS_TYPE_WINDOWS
542
|
543 w.white 1.103.10.1 //This array associates named pipe connections to their place in [indx]
544 //in the entries array. The value in poition zero of the array is the
545 //index of the fist named pipe connection in the entries array
546 Array <Uint32> indexPipeCountAssociator;
|
547 mreddy 1.103.10.16 int pipeEntryCount=0;
|
548 j.alex 1.103.10.2 int MaxPipes = PIPE_INCREMENT;
|
549 mreddy 1.103.10.16 HANDLE* hEvents = new HANDLE[PIPE_INCREMENT];
|
550 j.alex 1.103.10.2
551 #endif
|
552 w.white 1.103.10.1
553 for( indx = 0; indx < (int)entries.size(); indx++)
|
554 mike 1.2 {
|
555 a.arora 1.73
|
556 j.alex 1.103.10.2
557 #ifdef PEGASUS_OS_TYPE_WINDOWS
558 if(entries[indx].isNamedPipeConnection())
559 {
|
560 w.white 1.103.10.5
|
561 j.alex 1.103.10.2 //entering this clause mean that a Named Pipe connection is at entries[indx]
|
562 w.white 1.103.10.1 //cout << "In Monitor::run in clause to to create array of for WaitformultipuleObjects" << endl;
|
563 j.alex 1.103.10.2
|
564 w.white 1.103.10.6 //cout << "In Monitor::run - pipe being added to array is " << entries[indx].namedPipe.getName() << endl;
|
565 mreddy 1.103.10.16
|
566 j.alex 1.103.10.2 entries[indx].pipeSet = false;
|
567 w.white 1.103.10.5
|
568 j.alex 1.103.10.2 // We can Keep a counter in the Monitor class for the number of named pipes ...
569 // Which can be used here to create the array size for hEvents..( obviously before this for loop.:-) )
570 if (pipeEntryCount >= MaxPipes)
571 {
|
572 w.white 1.103.10.6 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' begining - pipeEntryCount=" <<
573 // pipeEntryCount << " MaxPipes=" << MaxPipes << endl;
|
574 j.alex 1.103.10.2 MaxPipes += PIPE_INCREMENT;
|
575 w.white 1.103.10.5 HANDLE* temp_hEvents = new HANDLE[MaxPipes];
576
|
577 j.alex 1.103.10.2 for (Uint32 i =0;i<pipeEntryCount;i++)
578 {
579 temp_hEvents[i] = hEvents[i];
580 }
|
581 w.white 1.103.10.5
|
582 j.alex 1.103.10.2 delete [] hEvents;
|
583 w.white 1.103.10.5
|
584 j.alex 1.103.10.2 hEvents = temp_hEvents;
|
585 w.white 1.103.10.6 // cout << "Monitor::run 'if (pipeEntryCount >= MaxPipes)' ending"<< endl;
|
586 w.white 1.103.10.5
|
587 mreddy 1.103.10.16 }
|
588 j.alex 1.103.10.2
|
589 w.white 1.103.10.1 //pipeEventArray.append((entries[indx].namedPipe.getOverlap()).hEvent);
|
590 mreddy 1.103.10.15 hEvents[pipeEntryCount] = entries[indx].namedPipe.getOverlap()->hEvent;
|
591 mreddy 1.103.10.16
|
592 w.white 1.103.10.1 indexPipeCountAssociator.append(indx);
|
593 w.white 1.103.10.17
|
594 j.alex 1.103.10.22 pipeEntryCount++;
|
595 w.white 1.103.10.4
596
597
|
598 mday 1.25 }
|
599 j.alex 1.103.10.2 else
|
600 mreddy 1.103.10.16
|
601 j.alex 1.103.10.2 #endif
602 {
|
603 mreddy 1.103.10.16
|
604 j.alex 1.103.10.2 if(maxSocketCurrentPass < entries[indx].socket)
605 maxSocketCurrentPass = entries[indx].socket;
606
607 if(entries[indx]._status.get() == _MonitorEntry::IDLE)
608 {
609 _idleEntries++;
610 FD_SET(entries[indx].socket, &fdread);
611 }
612
613 }
614 }
|
615 s.hills 1.62
|
616 a.arora 1.73 /*
|
617 david.dillard 1.95 Add 1 then assign maxSocket accordingly. We add 1 to account for
618 descriptors starting at 0.
|
619 a.arora 1.73 */
620 maxSocketCurrentPass++;
621
|
622 kumpf 1.94 autoEntryMutex.unlock();
|
623 david.dillard 1.95
624 //
625 // The first argument to select() is ignored on Windows and it is not
626 // a socket value. The original code assumed that the number of sockets
627 // and a socket value have the same type. On Windows they do not.
628 //
|
629 w.white 1.103.10.13
630 int events;
631 int pEvents;
632
|
633 david.dillard 1.95 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
634 mreddy 1.103.10.16
|
635 w.white 1.103.10.13 // events = select(0, &fdread, NULL, NULL, &tv);
|
636 mreddy 1.103.10.16
|
637 w.white 1.103.10.13 //if (events == NULL)
|
638 mreddy 1.103.10.16 //{ // This connection uses namedPipes
639
|
640 w.white 1.103.10.13 events = 0;
|
641 j.alex 1.103.10.2 DWORD dwWait=NULL;
|
642 w.white 1.103.10.13 pEvents = 0;
|
643 w.white 1.103.10.1
|
644 j.alex 1.103.10.22
645 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
646 {
|
647 w.white 1.103.10.17 AutoMutex automut(Monitor::_cout_mut);
648 cout << "Monitor::run - Calling WaitForMultipleObjects\n";
649 }
|
650 j.alex 1.103.10.22 #endif
651 // }
|
652 w.white 1.103.10.13 //this should be in a try block
|
653 w.white 1.103.10.1
|
654 j.alex 1.103.10.19 dwWait = WaitForMultipleObjects(
655 MaxPipes,
656 hEvents, //ABB:- array of event objects
657 FALSE, // ABB:-does not wait for all
658 milliseconds); //ABB:- timeout value //WW this may need be shorter
|
659 w.white 1.103.10.1
660 if(dwWait == WAIT_TIMEOUT)
661 {
|
662 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
663 w.white 1.103.10.17 {
664 AutoMutex automut(Monitor::_cout_mut);
|
665 w.white 1.103.10.1 cout << "Wait WAIT_TIMEOUT\n";
|
666 w.white 1.103.10.17 cout << "Monitor::run before the select in TIMEOUT clause events = " << events << endl;
667 }
|
668 j.alex 1.103.10.22 #endif
669 events = select(0, &fdread, NULL, NULL, &tv);
670 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
671 j.alex 1.103.10.23 {
|
672 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
673 cout << "Monitor::run after the select in TIMEOUT clause events = " << events << endl;
|
674 j.alex 1.103.10.23 }
|
675 j.alex 1.103.10.22 #endif
676
|
677 w.white 1.103.10.13
|
678 j.alex 1.103.10.2 // Sleep(2000);
|
679 mreddy 1.103.10.16 //continue;
680
681 //return false; // I think we do nothing.... Mybe there is a socket connection... so
|
682 j.alex 1.103.10.2 // cant return.
|
683 w.white 1.103.10.1 }
684 else if (dwWait == WAIT_FAILED)
685 {
|
686 w.white 1.103.10.13 if (GetLastError() == 6) //WW this may be too specific
687 {
|
688 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
689 j.alex 1.103.10.23 {
|
690 w.white 1.103.10.13 AutoMutex automut(Monitor::_cout_mut);
691 cout << "Monitor::run about to call 'select since waitForMultipleObjects failed\n";
|
692 j.alex 1.103.10.23 }
|
693 j.alex 1.103.10.22 #endif
|
694 w.white 1.103.10.17 /********* NOTE
695 this time (tv) combined with the waitForMulitpleObjects timeout is
696 too long it will cause the client side to time out
697 ******************/
|
698 w.white 1.103.10.13 events = select(0, &fdread, NULL, NULL, &tv);
699
700 }
701 else
702 {
|
703 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
704 j.alex 1.103.10.23 {
|
705 w.white 1.103.10.13 AutoMutex automut(Monitor::_cout_mut);
706 cout << "Wait Failed returned\n";
707 cout << "failed with " << GetLastError() << "." << endl;
|
708 j.alex 1.103.10.23 }
|
709 j.alex 1.103.10.22 #endif
|
710 w.white 1.103.10.13 pEvents = -1;
711 return false;
712 }
|
713 w.white 1.103.10.1 }
714 else
715 {
716 int pCount = dwWait - WAIT_OBJECT_0; // determines which pipe
|
717 w.white 1.103.10.17 {
|
718 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
719 w.white 1.103.10.17 {
720 AutoMutex automut(Monitor::_cout_mut);
721 // cout << endl << "****************************" <<
722 // "Monitor::run WaitForMultiPleObject returned activity on server pipe: "<<
723 // pCount<< endl << endl;
724 cout << "Monitor::run WaitForMultiPleObject returned activity pipeEntrycount is " <<
725 pipeEntryCount <<
726 " this is the type " << entries[indexPipeCountAssociator[pCount]]._type << " this is index " << indexPipeCountAssociator[pCount] << endl;
727 }
|
728 j.alex 1.103.10.22 #endif
|
729 w.white 1.103.10.17
730 /* There is a timeing problem here sometimes the wite in HTTPConnection i s
731 not all the way done (has not _monitor->setState (_entry_index, _MonitorEntry::IDLE) )
732 there for that should be done here if it is not done alread*/
733
734 if (entries[indexPipeCountAssociator[pCount]]._status.get() != _MonitorEntry::IDLE)
735 {
736 this->setState(indexPipeCountAssociator[pCount], _MonitorEntry::IDLE);
|
737 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
738 j.alex 1.103.10.23 {
|
739 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
|
740 j.alex 1.103.10.23 cout << "setting state of index " << indexPipeCountAssociator[pCount] << " to IDLE" << endl;
741 }
|
742 j.alex 1.103.10.22 #endif
|
743 w.white 1.103.10.17 }
744
745
|
746 w.white 1.103.10.13 }
|
747 w.white 1.103.10.1
|
748 j.alex 1.103.10.2 pEvents = 1;
|
749 w.white 1.103.10.1
750 //this statment gets the pipe entry that was trigered
751 entries[indexPipeCountAssociator[pCount]].pipeSet = true;
|
752 mreddy 1.103.10.16
|
753 w.white 1.103.10.1 }
|
754 david.dillard 1.95 #else
|
755 w.white 1.103.10.13 events = select(maxSocketCurrentPass, &fdread, NULL, NULL, &tv);
|
756 david.dillard 1.95 #endif
|
757 kumpf 1.94 autoEntryMutex.lock();
|
758 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
759 // the array of _entries can be changed. The ArrayIterator has be reset with the original _entries
760 entries.reset(_entries);
|
761 w.white 1.103.10.1
|
762 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
763 j.alex 1.103.10.2 if(pEvents == -1)
764 {
765 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
766 "Monitor::run - errorno = %d has occurred on select.",GetLastError() );
767 // The EBADF error indicates that one or more or the file
768 // descriptions was not valid. This could indicate that
769 // the entries structure has been corrupted or that
770 // we have a synchronization error.
771
772 // We need to generate an assert here...
773 PEGASUS_ASSERT(GetLastError()!= EBADF);
774
775
776 }
|
777 mreddy 1.103.10.16
|
778 kumpf 1.50 if(events == SOCKET_ERROR)
|
779 mike 1.2 #else
|
780 kumpf 1.50 if(events == -1)
|
781 mike 1.2 #endif
|
782 mday 1.13 {
|
783 j.alex 1.103.10.2
784 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
785 kumpf 1.50 "Monitor::run - errorno = %d has occurred on select.", errno);
786 // The EBADF error indicates that one or more or the file
787 // descriptions was not valid. This could indicate that
|
788 mike 1.100 // the entries structure has been corrupted or that
|
789 kumpf 1.50 // we have a synchronization error.
790
791 PEGASUS_ASSERT(errno != EBADF);
792 }
|
793 j.alex 1.103.10.2 else if ((events)||(pEvents))
|
794 kumpf 1.50 {
|
795 w.white 1.103.10.1
|
796 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
797 w.white 1.103.10.17 {
798 AutoMutex automut(Monitor::_cout_mut);
799 cout << "IN Monior::run events= " << events << " pEvents= " << pEvents<< endl;
800 }
|
801 j.alex 1.103.10.22 #endif
|
802 w.white 1.103.10.17
803 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
804 r.kieninger 1.83 "Monitor::run select event received events = %d, monitoring %d idle entries",
|
805 david.dillard 1.95 events, _idleEntries);
|
806 mike 1.100 for( int indx = 0; indx < (int)entries.size(); indx++)
|
807 mday 1.25 {
|
808 mreddy 1.103.10.16 //cout << "Monitor::run at start of 'for( int indx = 0; indx ' - index = " << indx << endl;
|
809 kumpf 1.53 // The Monitor should only look at entries in the table that are IDLE (i.e.,
810 // owned by the Monitor).
|
811 w.white 1.103.10.17 // cout << endl << " status of entry " << indx << " is " << entries[indx]._status.get() << endl;
812 if((entries[indx]._status.get() == _MonitorEntry::IDLE) &&
813 ((FD_ISSET(entries[indx].socket, &fdread)&& (events)) ||
814 (entries[indx].isNamedPipeConnection() && entries[indx].pipeSet && (pEvents))))
|
815 david.dillard 1.95 {
|
816 w.white 1.103.10.17
|
817 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
818 w.white 1.103.10.17 {
819 AutoMutex automut(Monitor::_cout_mut);
820 cout <<"Monitor::run - index " << indx << " just got into 'if' statement" << endl;
821 }
|
822 j.alex 1.103.10.22 #endif
|
823 w.white 1.103.10.4 MessageQueue *q;
|
824 w.white 1.103.10.12 try{
|
825 mreddy 1.103.10.16
|
826 w.white 1.103.10.4 q = MessageQueue::lookup(entries[indx].queueId);
827 }
828 catch (Exception e)
829 {
|
830 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
831 j.alex 1.103.10.23 {
|
832 w.white 1.103.10.17 AutoMutex automut(Monitor::_cout_mut);
|
833 w.white 1.103.10.4 cout << " this is what lookup gives - " << e.getMessage() << endl;
|
834 j.alex 1.103.10.23 }
|
835 j.alex 1.103.10.22 #endif
|
836 w.white 1.103.10.4 exit(1);
837 }
838 catch(...)
839 {
|
840 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
841 j.alex 1.103.10.23 {
|
842 w.white 1.103.10.17 AutoMutex automut(Monitor::_cout_mut);
|
843 w.white 1.103.10.4 cout << "MessageQueue::lookup gives strange exception " << endl;
|
844 j.alex 1.103.10.23 }
|
845 j.alex 1.103.10.22 #endif
|
846 w.white 1.103.10.4 exit(1);
847 }
848
849
850
|
851 mreddy 1.103.10.16
|
852 w.white 1.103.10.4 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
853 kumpf 1.53 "Monitor::run indx = %d, queueId = %d, q = %p",
|
854 mike 1.100 indx, entries[indx].queueId, q);
|
855 w.white 1.103.10.17 // printf("Monitor::run indx = %d, queueId = %d, q = %p",
856 // indx, entries[indx].queueId, q);
857 //cout << "Monitor::run before PEGASUS_ASSerT(q !=0) " << endl;
858 PEGASUS_ASSERT(q !=0);
|
859 mday 1.37
|
860 w.white 1.103.10.17
|
861 david.dillard 1.95 try
|
862 mreddy 1.103.10.16 {
|
863 w.white 1.103.10.17 /* {
|
864 w.white 1.103.10.12 AutoMutex automut(Monitor::_cout_mut);
|
865 w.white 1.103.10.1 cout <<" this is the type " << entries[indx]._type <<
|
866 w.white 1.103.10.17 " for index " << indx << endl;
|
867 w.white 1.103.10.1 cout << "IN Monior::run right before entries[indx]._type == Monitor::CONNECTION" << endl;
|
868 w.white 1.103.10.17 }*/
|
869 w.white 1.103.10.12 if(entries[indx]._type == Monitor::CONNECTION)
|
870 david.dillard 1.95 {
|
871 j.alex 1.103.10.22
872 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
873 j.alex 1.103.10.23 {
874 AutoMutex automut(Monitor::_cout_mut);
875 cout << "In Monitor::run Monitor::CONNECTION clause" << endl;
876 }
|
877 j.alex 1.103.10.22 #endif
|
878 w.white 1.103.10.4
879 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
880 mike 1.100 "entries[indx].type for indx = %d is Monitor::CONNECTION", indx);
|
881 david.dillard 1.95 static_cast<HTTPConnection *>(q)->_entry_index = indx;
|
882 sushma.fernandes 1.78
883 // Do not update the entry just yet. The entry gets updated once
|
884 r.kieninger 1.83 // the request has been read.
|
885 mike 1.100 //entries[indx]._status = _MonitorEntry::BUSY;
|
886 sushma.fernandes 1.78
|
887 kumpf 1.66 // If allocate_and_awaken failure, retry on next iteration
|
888 a.arora 1.73 /* Removed for PEP 183.
|
889 kumpf 1.69 if (!MessageQueueService::get_thread_pool()->allocate_and_awaken(
890 (void *)q, _dispatch))
|
891 kumpf 1.67 {
892 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
893 "Monitor::run: Insufficient resources to process request.");
|
894 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
895 kumpf 1.67 return true;
896 }
|
897 a.arora 1.73 */
898 // Added for PEP 183
|
899 david.dillard 1.95 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(q);
900 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
901 a.arora 1.73 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
902 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
903 w.white 1.103.10.6
904 /*In the case of named Pipes, the request has already been read from the pipe
905 therefor this section passed the request data to the HTTPConnection
906 NOTE: not sure if this would be better suited in a sparate private method
907 */
|
908 w.white 1.103.10.17
|
909 w.white 1.103.10.7 dst->setNamedPipe(entries[indx].namedPipe); //this step shouldn't be needd
|
910 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
911 w.white 1.103.10.17 {
912 AutoMutex automut(Monitor::_cout_mut);
913 cout << "In Monitor::run after dst->setNamedPipe string read is " << entries[indx].namedPipe.raw << endl;
914 }
|
915 j.alex 1.103.10.22 #endif
|
916 a.arora 1.73 try
|
917 mreddy 1.103.10.16 {
|
918 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
919 w.white 1.103.10.12 {
920 AutoMutex automut(Monitor::_cout_mut);
|
921 w.white 1.103.10.3 cout << "In Monitor::run about to call 'dst->run(1)' " << endl;
|
922 w.white 1.103.10.12 }
|
923 j.alex 1.103.10.22 #endif
|
924 a.arora 1.73 dst->run(1);
925 }
|
926 david.dillard 1.95 catch (...)
927 {
|
928 j.alex 1.103.10.22
|
929 david.dillard 1.95 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
930 "Monitor::_dispatch: exception received");
931 }
932 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
933 a.arora 1.73 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
934
|
935 w.white 1.103.10.13 if (entries[indx].isNamedPipeConnection())
936 {
937 entries[indx]._type = Monitor::ACCEPTOR;
938 }
|
939 w.white 1.103.10.17
|
940 sushma.fernandes 1.78 // It is possible the entry status may not be set to busy.
|
941 r.kieninger 1.83 // The following will fail in that case.
|
942 mike 1.96 // PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
943 a.arora 1.73 // Once the HTTPConnection thread has set the status value to either
944 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
945 // to the Monitor. It is no longer permissible to access the connection
946 // or the entry in the _entries table.
|
947 sushma.fernandes 1.78
948 // The following is not relevant as the worker thread or the
949 // reader thread will update the status of the entry.
950 //if (dst->_connectionClosePending)
|
951 r.kieninger 1.83 //{
|
952 sushma.fernandes 1.78 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
953 //}
954 //else
955 //{
956 // dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
957 r.kieninger 1.83 //}
958 // end Added for PEP 183
|
959 a.arora 1.73 }
|
960 mike 1.100 else if( entries[indx]._type == Monitor::INTERNAL){
|
961 r.kieninger 1.83 // set ourself to BUSY,
962 // read the data
|
963 a.arora 1.73 // and set ourself back to IDLE
|
964 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
965 j.alex 1.103.10.23 {
|
966 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
|
967 w.white 1.103.10.17 cout << endl << " in - entries[indx]._type == Monitor::INTERNAL- " << endl << endl;
|
968 j.alex 1.103.10.23 }
|
969 j.alex 1.103.10.22 #endif
|
970 w.white 1.103.10.17 if (!entries[indx].isNamedPipeConnection())
971 {
972 entries[indx]._status = _MonitorEntry::BUSY;
973 static char buffer[2];
|
974 mike 1.100 Socket::disableBlocking(entries[indx].socket);
975 Sint32 amt = Socket::read(entries[indx].socket,&buffer, 2);
976 Socket::enableBlocking(entries[indx].socket);
|
977 w.white 1.103.10.17 entries[indx]._status = _MonitorEntry::IDLE;
978 }
|
979 mday 1.37 }
980 else
|
981 mday 1.25 {
|
982 mreddy 1.103.10.16
|
983 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
984 {
|
985 w.white 1.103.10.12 AutoMutex automut(Monitor::_cout_mut);
|
986 w.white 1.103.10.1 cout << "In Monitor::run else clause of CONNECTION if statments" << endl;
|
987 j.alex 1.103.10.22 }
988 #endif
|
989 w.white 1.103.10.17 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
990 kumpf 1.51 "Non-connection entry, indx = %d, has been received.", indx);
|
991 mday 1.37 int events = 0;
|
992 w.white 1.103.10.1 Message *msg;
|
993 j.alex 1.103.10.22
994 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
995 {
|
996 w.white 1.103.10.12 AutoMutex automut(Monitor::_cout_mut);
|
997 j.alex 1.103.10.2 cout << " In Monitor::run Just before checking if NamedPipeConnection" << "for Index "<<indx<< endl;
|
998 w.white 1.103.10.12 }
|
999 j.alex 1.103.10.22 #endif
|
1000 w.white 1.103.10.1 if (entries[indx].isNamedPipeConnection())
1001 {
|
1002 w.white 1.103.10.4 if(!entries[indx].namedPipe.isConnectionPipe)
|
1003 mreddy 1.103.10.16 { /*if we enter this clasue it means that the named pipe that we are
1004 looking at has recived a connection but is not the pipe we get connection requests over.
|
1005 w.white 1.103.10.13 therefore we need to change the _type to CONNECTION and wait for a CIM Operations request*/
|
1006 w.white 1.103.10.4 entries[indx]._type = Monitor::CONNECTION;
1007
1008
|
1009 w.white 1.103.10.8 /* This is a test - this shows that the read file needs to be done
|
1010 w.white 1.103.10.4 before we call wiatForMultipleObjects*/
1011 /******************************************************
1012 ********************************************************/
|
1013 w.white 1.103.10.17
1014
1015
|
1016 j.alex 1.103.10.21 memset(entries[indx].namedPipe.raw,'\0',NAMEDPIPE_MAX_BUFFER_SIZE);
|
1017 w.white 1.103.10.4 BOOL rc = ::ReadFile(
1018 entries[indx].namedPipe.getPipe(),
|
1019 w.white 1.103.10.6 &entries[indx].namedPipe.raw,
|
1020 j.alex 1.103.10.20 NAMEDPIPE_MAX_BUFFER_SIZE,
|
1021 w.white 1.103.10.7 &entries[indx].namedPipe.bytesRead,
|
1022 mreddy 1.103.10.15 entries[indx].namedPipe.getOverlap());
|
1023 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1024 w.white 1.103.10.12 {
1025 AutoMutex automut(Monitor::_cout_mut);
|
1026 mreddy 1.103.10.16 cout << "Monitor::run just called read on index " << indx << endl;
|
1027 w.white 1.103.10.12 }
|
1028 j.alex 1.103.10.22 #endif
|
1029 mreddy 1.103.10.16
1030 //&entries[indx].namedPipe.bytesRead = &size;
|
1031 w.white 1.103.10.4 if(!rc)
1032 {
|
1033 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1034 j.alex 1.103.10.23 {
|
1035 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
|
1036 w.white 1.103.10.4 cout << "ReadFile failed for : " << GetLastError() << "."<< endl;
|
1037 j.alex 1.103.10.23 }
|
1038 j.alex 1.103.10.22 #endif
|
1039 w.white 1.103.10.4 }
1040
|
1041 mreddy 1.103.10.16
|
1042 w.white 1.103.10.4
1043 /******************************************************
1044 ********************************************************/
1045
1046
1047
1048
|
1049 w.white 1.103.10.17 continue;
1050
|
1051 w.white 1.103.10.4
1052 }
|
1053 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1054 w.white 1.103.10.12 {
1055 AutoMutex automut(Monitor::_cout_mut);
1056 cout << " In Monitor::run about to create a Pipe message" << endl;
1057
1058 }
|
1059 j.alex 1.103.10.22 #endif
|
1060 w.white 1.103.10.1 events |= NamedPipeMessage::READ;
1061 msg = new NamedPipeMessage(entries[indx].namedPipe, events);
1062 }
1063 else
|
1064 mreddy 1.103.10.16 {
|
1065 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1066 mreddy 1.103.10.16 {
|
1067 w.white 1.103.10.12 AutoMutex automut(Monitor::_cout_mut);
|
1068 j.alex 1.103.10.2 cout << " In Monitor::run ..its a socket message" << endl;
|
1069 w.white 1.103.10.12 }
|
1070 j.alex 1.103.10.22 #endif
|
1071 w.white 1.103.10.1 events |= SocketMessage::READ;
1072 msg = new SocketMessage(entries[indx].socket, events);
1073 }
1074
|
1075 mike 1.100 entries[indx]._status = _MonitorEntry::BUSY;
|
1076 kumpf 1.94 autoEntryMutex.unlock();
|
1077 mday 1.37 q->enqueue(msg);
|
1078 kumpf 1.94 autoEntryMutex.lock();
|
1079 r.kieninger 1.102 // After enqueue a message and the autoEntryMutex has been released and locked again,
1080 // the array of entries can be changed. The ArrayIterator has be reset with the original _entries
1081 entries.reset(_entries);
|
1082 mike 1.100 entries[indx]._status = _MonitorEntry::IDLE;
|
1083 kumpf 1.94
|
1084 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1085 mreddy 1.103.10.9 {
1086 AutoMutex automut(Monitor::_cout_mut);
1087 PEGASUS_STD(cout) << "Exiting: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1088 }
|
1089 j.alex 1.103.10.22 #endif
|
1090 mday 1.25 return true;
1091 }
1092 }
|
1093 mday 1.37 catch(...)
|
1094 mday 1.25 {
1095 }
1096 handled_events = true;
1097 }
1098 }
|
1099 mday 1.24 }
|
1100 kumpf 1.94
|
1101 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1102 mreddy 1.103.10.9 {
1103 AutoMutex automut(Monitor::_cout_mut);
1104 PEGASUS_STD(cout) << "Exiting: Monitor::run(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1105 }
|
1106 j.alex 1.103.10.22 #endif
|
1107 mday 1.13 return(handled_events);
|
1108 mike 1.2 }
1109
|
1110 chuck 1.74 void Monitor::stopListeningForConnections(Boolean wait)
|
1111 kumpf 1.48 {
|
1112 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1113 mreddy 1.103.10.9 {
1114 AutoMutex automut(Monitor::_cout_mut);
1115 PEGASUS_STD(cout) << "Entering: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1116 }
|
1117 j.alex 1.103.10.22 #endif
|
1118 kumpf 1.48 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::stopListeningForConnections()");
|
1119 r.kieninger 1.83 // set boolean then tickle the server to recognize _stopConnections
|
1120 kumpf 1.48 _stopConnections = 1;
|
1121 a.arora 1.73 tickle();
|
1122 kumpf 1.48
|
1123 chuck 1.74 if (wait)
|
1124 a.arora 1.73 {
|
1125 chuck 1.74 // Wait for the monitor to notice _stopConnections. Otherwise the
1126 // caller of this function may unbind the ports while the monitor
1127 // is still accepting connections on them.
|
1128 kumpf 1.101 _stopConnectionsSem.wait();
|
1129 a.arora 1.73 }
|
1130 r.kieninger 1.83
|
1131 kumpf 1.48 PEG_METHOD_EXIT();
|
1132 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1133 mreddy 1.103.10.9 {
1134 AutoMutex automut(Monitor::_cout_mut);
1135 PEGASUS_STD(cout) << "Exiting: Monitor::stopListeningForConnections(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1136 }
|
1137 j.alex 1.103.10.22 #endif
|
1138 kumpf 1.48 }
|
1139 mday 1.25
|
1140 mday 1.37
|
1141 mday 1.25 int Monitor::solicitSocketMessages(
|
1142 david.dillard 1.95 PEGASUS_SOCKET socket,
|
1143 mike 1.2 Uint32 events,
|
1144 r.kieninger 1.83 Uint32 queueId,
|
1145 mday 1.8 int type)
|
1146 mike 1.2 {
|
1147 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1148 mreddy 1.103.10.9 {
1149 AutoMutex automut(Monitor::_cout_mut);
1150 PEGASUS_STD(cout) << "Entering: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1151 }
|
1152 j.alex 1.103.10.22 #endif
|
1153 r.kieninger 1.83 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitSocketMessages");
|
1154 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
1155 a.arora 1.73 // Check to see if we need to dynamically grow the _entries array
1156 // We always want the _entries array to 2 bigger than the
1157 // current connections requested
1158 _solicitSocketCount++; // bump the count
1159 int size = (int)_entries.size();
|
1160 w.otsuka 1.89 if((int)_solicitSocketCount >= (size-1)){
1161 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
|
1162 a.arora 1.73 _MonitorEntry entry(0, 0, 0);
1163 _entries.append(entry);
1164 }
1165 }
|
1166 kumpf 1.4
|
1167 a.arora 1.73 int index;
1168 for(index = 1; index < (int)_entries.size(); index++)
|
1169 mday 1.25 {
|
1170 a.arora 1.73 try
|
1171 mday 1.37 {
|
1172 mike 1.96 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
|
1173 a.arora 1.73 {
1174 _entries[index].socket = socket;
1175 _entries[index].queueId = queueId;
1176 _entries[index]._type = type;
1177 _entries[index]._status = _MonitorEntry::IDLE;
|
1178 r.kieninger 1.83
|
1179 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1180 mreddy 1.103.10.9 {
1181 AutoMutex automut(Monitor::_cout_mut);
1182 PEGASUS_STD(cout) << "Exiting: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1183 }
|
1184 j.alex 1.103.10.22 #endif
|
1185 a.arora 1.73 return index;
1186 }
|
1187 mday 1.37 }
1188 catch(...)
|
1189 mday 1.25 {
1190 }
1191 }
|
1192 a.arora 1.73 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
|
1193 mday 1.25 PEG_METHOD_EXIT();
|
1194 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1195 mreddy 1.103.10.9 {
1196 AutoMutex automut(Monitor::_cout_mut);
1197 PEGASUS_STD(cout) << "Exiting: Monitor::solicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1198 }
|
1199 j.alex 1.103.10.22 #endif
|
1200 kumpf 1.50 return -1;
|
1201 a.arora 1.73
|
1202 mike 1.2 }
1203
|
1204 david.dillard 1.95 void Monitor::unsolicitSocketMessages(PEGASUS_SOCKET socket)
|
1205 mike 1.2 {
|
1206 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1207 mreddy 1.103.10.9 {
1208 AutoMutex automut(Monitor::_cout_mut);
1209 PEGASUS_STD(cout) << "Entering: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1210 }
|
1211 j.alex 1.103.10.22 #endif
|
1212 kumpf 1.50
|
1213 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
1214 alagaraja 1.75 AutoMutex autoMut(_entry_mut);
|
1215 a.arora 1.73
1216 /*
1217 Start at index = 1 because _entries[0] is the tickle entry which never needs
1218 to be EMPTY;
1219 */
|
1220 w.otsuka 1.89 unsigned int index;
|
1221 a.arora 1.73 for(index = 1; index < _entries.size(); index++)
|
1222 mike 1.2 {
|
1223 mday 1.25 if(_entries[index].socket == socket)
1224 {
|
1225 a.arora 1.73 _entries[index]._status = _MonitorEntry::EMPTY;
|
1226 david.dillard 1.95 _entries[index].socket = PEGASUS_INVALID_SOCKET;
|
1227 a.arora 1.73 _solicitSocketCount--;
1228 break;
|
1229 mday 1.25 }
|
1230 mike 1.2 }
|
1231 a.arora 1.73
1232 /*
1233 Dynamic Contraction:
1234 To remove excess entries we will start from the end of the _entries array
1235 and remove all entries with EMPTY status until we find the first NON EMPTY.
1236 This prevents the positions, of the NON EMPTY entries, from being changed.
|
1237 r.kieninger 1.83 */
|
1238 a.arora 1.73 index = _entries.size() - 1;
|
1239 mike 1.96 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
1240 a.arora 1.73 if(_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES)
1241 _entries.remove(index);
1242 index--;
1243 }
|
1244 kumpf 1.4 PEG_METHOD_EXIT();
|
1245 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1246 mreddy 1.103.10.9 {
1247 AutoMutex automut(Monitor::_cout_mut);
1248 PEGASUS_STD(cout) << "Exiting: Monitor::unsolicitSocketMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1249 }
|
1250 j.alex 1.103.10.22 #endif
|
1251 mike 1.2 }
1252
|
1253 a.arora 1.73 // Note: this is no longer called with PEP 183.
|
1254 mday 1.7 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
1255 {
|
1256 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1257 mreddy 1.103.10.9 {
1258 AutoMutex automut(Monitor::_cout_mut);
1259 PEGASUS_STD(cout) << "Entering: Monitor::_dispatch(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1260 }
|
1261 j.alex 1.103.10.22 #endif
|
1262 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
1263 kumpf 1.51 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
|
1264 kumpf 1.53 "Monitor::_dispatch: entering run() for indx = %d, queueId = %d, q = %p",
1265 dst->_entry_index, dst->_monitor->_entries[dst->_entry_index].queueId, dst);
|
1266 kumpf 1.51 try
1267 {
1268 dst->run(1);
1269 }
1270 catch (...)
1271 {
1272 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1273 "Monitor::_dispatch: exception received");
1274 }
1275 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
1276 "Monitor::_dispatch: exited run() for index %d", dst->_entry_index);
|
1277 r.kieninger 1.83
|
1278 mike 1.96 PEGASUS_ASSERT(dst->_monitor->_entries[dst->_entry_index]._status.get() == _MonitorEntry::BUSY);
|
1279 kumpf 1.68
1280 // Once the HTTPConnection thread has set the status value to either
1281 // Monitor::DYING or Monitor::IDLE, it has returned control of the connection
1282 // to the Monitor. It is no longer permissible to access the connection
1283 // or the entry in the _entries table.
|
1284 kumpf 1.50 if (dst->_connectionClosePending)
1285 {
|
1286 kumpf 1.68 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::DYING;
1287 }
1288 else
1289 {
1290 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
|
1291 kumpf 1.50 }
|
1292 mday 1.8 return 0;
|
1293 mday 1.40 }
1294
|
1295 w.white 1.103.10.1
1296 //This method is anlogsu to solicitSocketMessages. It does the same thing for named Pipes
1297 int Monitor::solicitPipeMessages(
1298 NamedPipe namedPipe,
1299 Uint32 events, //not sure what has to change for this enum
1300 Uint32 queueId,
1301 int type)
1302 {
1303 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solicitPipeMessages");
|
1304 j.alex 1.103.10.22
|
1305 w.white 1.103.10.1 AutoMutex autoMut(_entry_mut);
1306 // Check to see if we need to dynamically grow the _entries array
1307 // We always want the _entries array to 2 bigger than the
1308 // current connections requested
|
1309 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1310 j.alex 1.103.10.23 {
|
1311 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
|
1312 w.white 1.103.10.1 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages at the begining" << PEGASUS_STD(endl);
|
1313 j.alex 1.103.10.23 }
|
1314 j.alex 1.103.10.22 #endif
|
1315 w.white 1.103.10.1
|
1316 mreddy 1.103.10.16
|
1317 w.white 1.103.10.1 _solicitSocketCount++; // bump the count
1318 int size = (int)_entries.size();
1319 if((int)_solicitSocketCount >= (size-1)){
1320 for(int i = 0; i < ((int)_solicitSocketCount - (size-1)); i++){
1321 _MonitorEntry entry(0, 0, 0);
1322 _entries.append(entry);
1323 }
1324 }
1325
1326 int index;
1327 for(index = 1; index < (int)_entries.size(); index++)
1328 {
1329 try
1330 {
1331 if(_entries[index]._status.get() == _MonitorEntry::EMPTY)
1332 {
1333 _entries[index].socket = NULL;
1334 _entries[index].namedPipe = namedPipe;
1335 _entries[index].namedPipeConnection = true;
1336 _entries[index].queueId = queueId;
1337 _entries[index]._type = type;
1338 w.white 1.103.10.1 _entries[index]._status = _MonitorEntry::IDLE;
|
1339 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1340 j.alex 1.103.10.23 {
|
1341 j.alex 1.103.10.22 AutoMutex automut(Monitor::_cout_mut);
|
1342 w.white 1.103.10.1 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages after seting up _entries[index] index = " << index << PEGASUS_STD(endl);
|
1343 j.alex 1.103.10.23 }
|
1344 j.alex 1.103.10.22 #endif
|
1345 j.alex 1.103.10.23
|
1346 w.white 1.103.10.1 return index;
1347 }
1348 }
1349 catch(...)
1350 {
1351 }
1352
1353 }
1354 _solicitSocketCount--; // decrease the count, if we are here we didnt do anything meaningful
1355 PEGASUS_STD(cout) << "In Monitor::solicitPipeMessages nothing happed - it didn't work" << PEGASUS_STD(endl);
1356
1357 PEG_METHOD_EXIT();
1358 return -1;
1359
1360 }
1361
|
1362 j.alex 1.103.10.14 void Monitor::unsolicitPipeMessages(NamedPipe namedPipe)
1363 {
|
1364 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1365 j.alex 1.103.10.14 {
1366 AutoMutex automut(Monitor::_cout_mut);
1367 PEGASUS_STD(cout) << "Entering: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1368 }
|
1369 j.alex 1.103.10.22 #endif
|
1370 j.alex 1.103.10.14
1371 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitPipeMessages");
1372 AutoMutex autoMut(_entry_mut);
1373
1374 /*
1375 Start at index = 1 because _entries[0] is the tickle entry which never needs
1376 to be EMPTY;
1377 */
1378 unsigned int index;
1379 for(index = 1; index < _entries.size(); index++)
1380 {
1381 if(_entries[index].namedPipe.getPipe() == namedPipe.getPipe())
1382 {
1383 _entries[index]._status = _MonitorEntry::EMPTY;
1384 //_entries[index].namedPipe = PEGASUS_INVALID_SOCKET;
1385 _solicitSocketCount--;
1386 break;
1387 }
1388 }
1389
1390 /*
1391 j.alex 1.103.10.14 Dynamic Contraction:
1392 To remove excess entries we will start from the end of the _entries array
1393 and remove all entries with EMPTY status until we find the first NON EMPTY.
1394 This prevents the positions, of the NON EMPTY entries, from being changed.
1395 */
1396 index = _entries.size() - 1;
1397 while(_entries[index]._status.get() == _MonitorEntry::EMPTY){
|
1398 j.alex 1.103.10.19 if((_entries[index].namedPipe.getPipe() == namedPipe.getPipe()) ||
1399 (_entries.size() > MAX_NUMBER_OF_MONITOR_ENTRIES))
1400 {
1401 _entries.remove(index);
1402 }
|
1403 j.alex 1.103.10.14 index--;
1404 }
1405 PEG_METHOD_EXIT();
|
1406 j.alex 1.103.10.22 #ifdef PEGASUS_LOCALDOMAINSOCKET_DEBUG
|
1407 j.alex 1.103.10.14 {
1408 AutoMutex automut(Monitor::_cout_mut);
1409 PEGASUS_STD(cout) << "Exiting: Monitor::unsolicitPipeMessages(): (tid:" << Uint32(pegasus_thread_self()) << ")" << PEGASUS_STD(endl);
1410 }
|
1411 j.alex 1.103.10.22 #endif
|
1412 j.alex 1.103.10.14 }
1413
1414
|
1415 w.white 1.103.10.1
|
1416 mike 1.2 PEGASUS_NAMESPACE_END
|