1 mday 1.1.2.1 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mday 1.1.2.1 //
|
23 mday 1.1.2.2 // Author: Mike Day (mdday@us.ibm.com)
|
24 mday 1.1.2.1 //
|
25 mday 1.1.2.2 // Modified By:
|
26 mday 1.1.2.1 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "Thread.h"
|
30 mday 1.1.2.4 #include <Pegasus/Common/IPC.h>
|
31 mday 1.1.2.1
32 #if defined(PEGASUS_OS_TYPE_WINDOWS)
33 # include "ThreadWindows.cpp"
34 #elif defined(PEGASUS_OS_TYPE_UNIX)
35 # include "ThreadUnix.cpp"
36 #else
37 # error "Unsupported platform"
38 #endif
39
40 PEGASUS_NAMESPACE_BEGIN
41
|
42 mday 1.1.2.8 void thread_data::default_delete(void * data)
43 {
44 if( data != NULL)
45 ::operator delete(data);
46 }
47
|
48 mday 1.1.2.3 Boolean Thread::_signals_blocked = false;
|
49 mday 1.1.2.1
50 // for non-native implementations
51 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
52 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
53 {
|
54 mday 1.1.2.5 cleanup_handler *cu = new cleanup_handler(routine, parm);
55 try
56 {
57 _cleanup.insert_first(cu);
58 }
59 catch(IPCException& e)
60 {
|
61 mday 1.1.2.7 delete cu;
|
62 mday 1.1.2.5 throw;
63 }
64 return;
|
65 mday 1.1.2.1 }
|
66 mday 1.1.2.5
|
67 kumpf 1.1.2.6 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
|
68 mday 1.1.2.1 {
|
69 mday 1.1.2.5 cleanup_handler *cu ;
70 try
71 {
72 cu = _cleanup.remove_first() ;
73 }
74 catch(IPCException& e)
|
75 mday 1.1.2.7 {
|
76 kumpf 1.1.2.6 PEGASUS_ASSERT(0);
|
77 mday 1.1.2.5 }
78 if(execute == true)
79 cu->execute();
80 delete cu;
|
81 mday 1.1.2.1 }
|
82 mday 1.1.2.5
|
83 mday 1.1.2.3 #endif
|
84 mday 1.1.2.4
85
86 //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
87
|
88 mday 1.1.2.2
|
89 mday 1.1.2.3 #ifndef PEGASUS_THREAD_EXIT_NATIVE
|
90 mday 1.1.2.2 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
91 {
|
92 mday 1.1.2.5 // execute the cleanup stack and then return
93 while( _cleanup.count() )
94 {
95 try
96 {
97 cleanup_pop(true);
98 }
99 catch(IPCException& e)
100 {
|
101 mday 1.1.2.7 PEGASUS_ASSERT(0);
102 break;
|
103 mday 1.1.2.5 }
104 }
|
105 mday 1.1.2.7 _exit_code = exit_code;
106 exit_thread(exit_code);
|
107 mday 1.1.2.2 }
|
108 mday 1.1.2.4
|
109 mday 1.1.2.7
|
110 mday 1.1.2.1 #endif
|
111 mday 1.1.2.9
112
113 ThreadPool::ThreadPool(Sint16 initial_size,
114 Sint16 max,
115 Sint16 min,
116 Sint8 *key)
117 : _max_threads(max), _min_threads(min),
118 _current_threads(0), _waiters(initial_size),
119 _pool_sem(0), _pool(true), _running(true),
120 _dying(0)
121 {
122 _allocate_wait.tv_sec = 1;
123 _allocate_wait.tv_usec = 0;
124 _deallocate_wait.tv_sec = 30;
125 _deallocate_wait.tv_usec = 0;
126 _deadlock_detect.tv_sec = 60;
127 _deadlock_detect.tv_usec = 0;
128 memset(_key, 0x00, 17);
129 if(key != 0)
130 strncpy(_key, key, 16);
131 if(_max_threads < initial_size)
132 mday 1.1.2.9 _max_threads = initial_size;
133 if(_min_threads > initial_size)
134 _min_threads = initial_size;
135
136 int i;
137 for(i = 0; i < initial_size; i++)
138 {
139 _link_pool(_init_thread());
140 }
141 }
142
143 ThreadPool::~ThreadPool(void)
144 {
145 _dying++;
146 Thread *th = _pool.remove_first();
147 while(th != 0)
148 {
149 // signal the thread's sleep semaphore
150 th->cancel();
151 th->join();
152 th->empty_tsd();
153 mday 1.1.2.9 delete th;
154 th = _pool.remove_first();
155 }
156 }
157
158 // make this static to the class
159 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
160 {
161 Thread *myself = (Thread *)parm;
162 if(myself == 0)
163 throw NullPointer();
164 ThreadPool *pool = (ThreadPool *)myself->get_parm();
165 if(pool == 0 )
166 throw NullPointer();
167 Semaphore *sleep_sem;
168 struct timeval *deadlock_timer;
169
170 try
171 {
172 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
173 myself->dereference_tsd();
174 mday 1.1.2.9 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
175 myself->dereference_tsd();
176 }
177 catch(IPCException & e)
178 {
179 myself->exit_self(0);
180 }
181 if(sleep_sem == 0 || deadlock_timer == 0)
182 throw NullPointer();
183
184 while(pool->_dying < 1)
185 {
186 myself->test_cancel();
187 sleep_sem->wait();
188 // when we awaken we reside on the running queue, not the pool queue
189 myself->test_cancel();
190 gettimeofday(deadlock_timer, NULL);
191
192 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);
193 void *parm;
194
195 mday 1.1.2.9 try
196 {
197 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
198 myself->reference_tsd("work func");
199 myself->dereference_tsd();
200 parm = myself->reference_tsd("work parm");
201 myself->dereference_tsd();
202 }
203 catch(IPCException & e)
204 {
205 myself->exit_self(0);
206 }
207
208 if(_work == 0)
209 throw NullPointer();
210 _work(parm);
211
212 // put myself back onto the available list
213 try
214 {
215 pool->_running.remove((void *)myself);
216 mday 1.1.2.9 pool->_link_pool(myself);
217 }
218 catch(IPCException & e)
219 {
220 myself->exit_self(0);
221 }
222 }
223 myself->exit_self(0);
224 return((PEGASUS_THREAD_RETURN)0);
225 }
226
227
228 void ThreadPool::allocate_and_awaken(void *parm,
229 PEGASUS_THREAD_RETURN \
230 (PEGASUS_THREAD_CDECL *work)(void *))
231 throw(IPCException)
232 {
233 struct timeval start;
234 gettimeofday(&start, NULL);
235
236 Thread *th = _pool.remove_first();
237 mday 1.1.2.9
238 while (th == 0 && _dying < 1)
239 {
240 try // we couldn't get a free thread from the pool
241 {
242 // wait for the right interval and try again
243 while(th == 0 && _dying < 1)
244 {
245 _check_deadlock(&start);
246 Uint32 interval = _allocate_wait.tv_sec * 1000;
247 if(_allocate_wait.tv_usec > 0)
248 interval += (_deallocate_wait.tv_usec / 1000);
249 // will throw a timeout if no thread comes free
250 _pool_sem.time_wait(interval);
251 th = _pool.remove_first();
252 }
253 }
254 catch(TimeOut & to)
255 {
256 if(_current_threads < _max_threads)
257 {
258 mday 1.1.2.9 th = _init_thread();
259 break;
260 }
261 }
262 // will throw a Deadlock Exception before falling out of the loop
263 _check_deadlock(&start);
264 } // while th == null
265
266 if(_dying < 1)
267 {
268 // initialize the thread data with the work function and parameters
269 th->remove_tsd("work func");
270 th->put_tsd("work func", NULL,
271 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
272 (void *)work);
273 th->remove_tsd("work parm");
274 th->put_tsd("work parm", NULL, sizeof(void *), parm);
275
276 // put the thread on the running list
277 _running.insert_first(th);
278
279 mday 1.1.2.9 // signal the thread's sleep semaphore to awaken it
280 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
281 if(sleep_sem == 0)
282 throw NullPointer();
283 sleep_sem->signal();
284 }
285 else
286 _pool.insert_first(th);
287 }
288
289 // caller is responsible for only calling this routine during slack periods
290 // but should call it at least once per _deadlock_detect with the running q
291 // and at least once per _deallocate_wait for the pool q
292
293 void ThreadPool::_kill_dead_threads(DQueue<Thread> *q, Boolean (*check)(struct timeval *))
294 throw(IPCException)
295 {
296 struct timeval now;
297 gettimeofday(&now, NULL);
298
|
299 mday 1.1.2.11 DQueue<Thread> dead(true) ;
|
300 mday 1.1.2.9
301 if(q->count() > 0 )
302 {
303 try
304 {
305 q->try_lock();
306 }
307 catch(AlreadyLocked & a)
308 {
309 return;
310 }
311
312 Thread *context = 0;
313 struct timeval dt = { 0, 0 };
314 struct timeval *dtp;
315 Thread *th = q->next(context);
316 while (th != 0 )
317 {
318 try
319 {
320 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
321 mday 1.1.2.9 }
322 catch(AlreadyLocked & a)
323 {
324 context = th;
325 th = q->next(context);
326 continue;
327 }
328
329 if(dtp != 0)
330 {
331 memcpy(&dt, dtp, sizeof(struct timeval));
332
333 }
334 th->dereference_tsd();
335 if( true == check(&dt))
336 {
337 th = q->remove_no_lock((void *)th);
338
339 if(th != 0)
340 {
341 dead.insert_first(th);
342 mday 1.1.2.9 th = 0;
343 }
344 }
345 context = th;
346 th = q->next(context);
347 }
348 q->unlock();
349 }
350
351 if(dead.count())
352 {
353 Thread *th = dead.remove_first();
354 while(th != 0)
355 {
356 th->cancel();
357 th->join();
358 delete th;
359 th = dead.remove_first();
360 }
361 }
362 return;
363 mday 1.1.2.9 }
364
365 Boolean ThreadPool::_check_time(struct timeval *start, struct timeval *interval)
366 {
367 struct timeval now;
368 gettimeofday(&now, NULL);
369 if( (now.tv_sec - start->tv_sec) > interval->tv_sec ||
370 (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
371 ((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
372 return true;
373 else
374 return false;
375 }
376
|
377 mday 1.1.2.1
378 PEGASUS_NAMESPACE_END
|
379 mday 1.1.2.7
|