1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 kumpf 1.17 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
|
13 kumpf 1.17 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 kumpf 1.17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Brasher (mbrasher@bmc.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
|
30 mday 1.32.2.2 #include "Monitor.h"
|
31 mike 1.2
32 #ifdef PEGASUS_OS_TYPE_WINDOWS
33 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
34 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
35 of <winsock.h>. It may have been indirectly included (e.g., by including \
|
36 mday 1.25 <windows.h>). Find the inclusion of that header which is visible to this \
|
37 mike 1.2 compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
38 otherwise, less than 64 clients (the default) will be able to connect to the \
39 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
|
40 mday 1.5
|
41 mike 1.2 # endif
42 # define FD_SETSIZE 1024
|
43 mday 1.5 # include <windows.h>
|
44 mike 1.2 #else
45 # include <sys/types.h>
46 # include <sys/socket.h>
47 # include <sys/time.h>
48 # include <netinet/in.h>
49 # include <netdb.h>
50 # include <arpa/inet.h>
|
51 mday 1.32.2.1 # include <unistd.h>
|
52 mike 1.2 #endif
53
54 PEGASUS_USING_STD;
55
56 PEGASUS_NAMESPACE_BEGIN
57
|
58 mday 1.18
|
59 mday 1.25 static AtomicInt _connections = 0;
60
61
62 static struct timeval create_time = {0, 1};
|
63 mday 1.26 static struct timeval destroy_time = {15, 0};
|
64 mday 1.32.2.5 static struct timeval deadlock_time = {300, 0};
|
65 mday 1.18
|
66 mike 1.2 ////////////////////////////////////////////////////////////////////////////////
67 //
68 // MonitorRep
69 //
70 ////////////////////////////////////////////////////////////////////////////////
71
72 struct MonitorRep
73 {
74 fd_set rd_fd_set;
75 fd_set wr_fd_set;
76 fd_set ex_fd_set;
77 fd_set active_rd_fd_set;
78 fd_set active_wr_fd_set;
79 fd_set active_ex_fd_set;
80 };
81
82 ////////////////////////////////////////////////////////////////////////////////
83 //
84 // Monitor
85 //
86 ////////////////////////////////////////////////////////////////////////////////
87 mike 1.2
88 Monitor::Monitor()
|
89 mday 1.32.2.2 : _async(false)
|
90 mike 1.2 {
91 Socket::initializeInterface();
|
92 mday 1.25 _rep = 0;
|
93 mday 1.32.2.1 _entries.reserveCapacity(32);
94 int i = 0;
95 for( ; i < 32; i++ )
96 {
97 _MonitorEntry entry(0, 0, 0);
98 _entries.append(entry);
99 }
|
100 mike 1.2 }
101
|
102 mday 1.18 Monitor::Monitor(Boolean async)
|
103 mday 1.32.2.2 : _async(async)
|
104 mday 1.18 {
105 Socket::initializeInterface();
|
106 mday 1.25 _rep = 0;
|
107 mday 1.32.2.3 _entries.reserveCapacity(32);
108 int i = 0;
109 for( ; i < 32; i++ )
110 {
111 _MonitorEntry entry(0, 0, 0);
112 _entries.append(entry);
113 }
|
114 mday 1.32.2.1
|
115 mday 1.19 if( _async == true )
116 {
117 _thread_pool = new ThreadPool(0,
118 "Monitor",
|
119 mday 1.32.2.1 1,
|
120 mday 1.25 0,
|
121 mday 1.19 create_time,
122 destroy_time,
123 deadlock_time);
124 }
|
125 mday 1.20 else
126 _thread_pool = 0;
|
127 mday 1.18 }
|
128 mday 1.20
|
129 mike 1.2 Monitor::~Monitor()
130 {
|
131 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
132 "deregistering with module controller");
|
133 kumpf 1.10
|
134 mday 1.32.2.2
|
135 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "deleting rep");
|
136 mday 1.8
|
137 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4, "uninitializing interface");
|
138 mike 1.2 Socket::uninitializeInterface();
|
139 kumpf 1.11 Tracer::trace(TRC_HTTP, Tracer::LEVEL4,
140 "returning from monitor destructor");
|
141 mday 1.21 if(_async == true)
|
142 mday 1.20 delete _thread_pool;
|
143 mike 1.2 }
144
|
145 mday 1.7
|
146 mday 1.18 int Monitor::kill_idle_threads()
147 {
148 static struct timeval now, last;
149 gettimeofday(&now, NULL);
|
150 mday 1.20 int dead_threads = 0;
|
151 mday 1.18
|
152 mday 1.32.2.1 if( now.tv_sec - last.tv_sec > 300 )
|
153 mday 1.18 {
|
154 mday 1.32.2.1 PEGASUS_STD(cout) << "Monitor Thread Pool currently has " <<
155 _thread_pool->running_count() +
156 _thread_pool->pool_count() << " Threads." << PEGASUS_STD(endl);
|
157 mday 1.18 gettimeofday(&last, NULL);
|
158 mday 1.20 try
159 {
160 dead_threads = _thread_pool->kill_dead_threads();
161 }
162 catch(IPCException& )
163 {
164 }
165
|
166 mday 1.18 }
|
167 mday 1.20 return dead_threads;
|
168 mday 1.18 }
169
|
170 mday 1.7
|
171 mike 1.2 Boolean Monitor::run(Uint32 milliseconds)
172 {
|
173 mday 1.18
|
174 mday 1.25 Boolean handled_events = false;
175 int i = 0;
|
176 mday 1.18
|
177 mday 1.25 struct timeval tv = {0,1};
178 fd_set fdread;
179 FD_ZERO(&fdread);
|
180 mday 1.32.2.4 _entry_mut.lock(pegasus_thread_self());
181
|
182 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
|
183 mike 1.2 {
|
184 mday 1.32.2.1 if(_entries[indx]._status.value() == _MonitorEntry::IDLE)
|
185 mday 1.25 {
186 FD_SET(_entries[indx].socket, &fdread);
187 }
|
188 mday 1.13 }
|
189 mday 1.32.2.1
|
190 mday 1.32.2.4
|
191 mday 1.25 int events = select(FD_SETSIZE, &fdread, NULL, NULL, &tv);
192
|
193 mike 1.2 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
194 mday 1.25 if(events && events != SOCKET_ERROR )
|
195 mike 1.2 #else
|
196 mday 1.25 if(events && events != -1 )
|
197 mike 1.2 #endif
|
198 mday 1.13 {
|
199 mday 1.25 for( int indx = 0; indx < (int)_entries.size(); indx++)
200 {
201 if(FD_ISSET(_entries[indx].socket, &fdread))
202 {
203 MessageQueue *q = MessageQueue::lookup(_entries[indx].queueId);
204 if(q == 0)
205 {
|
206 mday 1.32.2.1 PEGASUS_STD(cout) << "Monitor:: found an empty connection slot" << PEGASUS_STD(endl);
|
207 mday 1.32.2.5 try
|
208 mday 1.32.2.1 {
|
209 mday 1.32.2.5 _entries[indx]._status = _MonitorEntry::EMPTY;
|
210 mday 1.32.2.1 }
211 catch(...)
212 {
|
213 mday 1.32.2.5
|
214 mday 1.32.2.1 }
|
215 mday 1.32.2.5 continue;
|
216 mday 1.25 }
|
217 mday 1.32.2.1 try
|
218 mday 1.25 {
|
219 mday 1.32.2.1 if(_entries[indx]._type == Monitor::CONNECTION)
|
220 mday 1.25 {
|
221 mday 1.32.2.1 static_cast<HTTPConnection *>(q)->_entry_index = indx;
222 if(static_cast<HTTPConnection *>(q)->_dying.value() > 0 )
223 {
224 _entries[indx]._status = _MonitorEntry::DYING;
225 MessageQueue & o = static_cast<HTTPConnection *>(q)->get_owner();
226 Message* message= new CloseConnectionMessage(_entries[indx].socket);
227 message->dest = o.getQueueId();
|
228 mday 1.32.2.4 _entry_mut.unlock();
|
229 mday 1.32.2.1 o.enqueue(message);
230 return true;
231 }
232 _entries[indx]._status = _MonitorEntry::BUSY;
233 _thread_pool->allocate_and_awaken((void *)q, _dispatch);
234 }
235 else
236 {
237 int events = 0;
238 events |= SocketMessage::READ;
239 Message *msg = new SocketMessage(_entries[indx].socket, events);
240 _entries[indx]._status = _MonitorEntry::BUSY;
|
241 mday 1.32.2.4 _entry_mut.unlock();
|
242 mday 1.32.2.1 q->enqueue(msg);
243 _entries[indx]._status = _MonitorEntry::IDLE;
|
244 mday 1.25 return true;
245 }
246 }
|
247 mday 1.32.2.1 catch(...)
|
248 mday 1.25 {
249 }
250 handled_events = true;
251 }
252 }
|
253 mday 1.24 }
|
254 mday 1.32.2.4 _entry_mut.unlock();
|
255 mday 1.13 return(handled_events);
|
256 mike 1.2 }
257
|
258 mday 1.25
259 int Monitor::solicitSocketMessages(
|
260 mike 1.2 Sint32 socket,
261 Uint32 events,
|
262 mday 1.8 Uint32 queueId,
263 int type)
|
264 mike 1.2 {
|
265 kumpf 1.4
|
266 mday 1.32.2.1 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::solictSocketMessage");
|
267 mike 1.2
|
268 mday 1.32.2.1 int index = -1;
|
269 mday 1.32.2.4 _entry_mut.lock(pegasus_thread_self());
270
|
271 mday 1.25 for(index = 0; index < (int)_entries.size(); index++)
272 {
|
273 mday 1.32.2.1 try
274 {
275 if(_entries[index]._status.value() == _MonitorEntry::EMPTY)
276 {
277 _entries[index].socket = socket;
278 _entries[index].queueId = queueId;
279 _entries[index]._type = type;
280 _entries[index]._status = _MonitorEntry::IDLE;
|
281 mday 1.32.2.4 _entry_mut.unlock();
282
|
283 mday 1.32.2.1 return index;
284 }
285 }
286 catch(...)
|
287 mday 1.25 {
288 }
|
289 mday 1.32.2.4
|
290 mday 1.25 }
|
291 mday 1.32.2.4 _entry_mut.unlock();
|
292 mday 1.25 PEG_METHOD_EXIT();
293 return index;
|
294 mike 1.2 }
295
|
296 mday 1.25 void Monitor::unsolicitSocketMessages(Sint32 socket)
|
297 mike 1.2 {
|
298 mday 1.25 PEG_METHOD_ENTER(TRC_HTTP, "Monitor::unsolicitSocketMessages");
|
299 mday 1.32.2.4 _entry_mut.lock(pegasus_thread_self());
|
300 mday 1.27
|
301 mday 1.25 for(int index = 0; index < (int)_entries.size(); index++)
|
302 mike 1.2 {
|
303 mday 1.25 if(_entries[index].socket == socket)
304 {
305 _entries[index]._status = _MonitorEntry::EMPTY;
|
306 mday 1.32.2.4 break;
|
307 mday 1.25 }
|
308 mike 1.2 }
|
309 mday 1.32.2.4
310 _entry_mut.unlock();
|
311 mday 1.25
|
312 mday 1.32.2.1 PEG_METHOD_EXIT();
313 if( _async == true )
314 PEGASUS_STD(cout) << "Monitor:: running " << _thread_pool->running_count() <<
315 " idle " << _thread_pool->pool_count() << PEGASUS_STD(endl);
|
316 mike 1.2
|
317 mday 1.32.2.1 }
|
318 mday 1.7
319
320 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL Monitor::_dispatch(void *parm)
321 {
|
322 mday 1.8 HTTPConnection *dst = reinterpret_cast<HTTPConnection *>(parm);
|
323 mday 1.32.2.1
|
324 mday 1.25 dst->run(1);
|
325 mday 1.32.2.1 if( dst->_monitor->_entries.size() > (Uint32)dst->_entry_index )
326 dst->_monitor->_entries[dst->_entry_index]._status = _MonitorEntry::IDLE;
327
|
328 mday 1.8 return 0;
|
329 mday 1.7 }
330
|
331 mike 1.2
332 PEGASUS_NAMESPACE_END
|