1 mday 1.1.2.17
|
2 mday 1.1.2.1 //%/////////////////////////////////////////////////////////////////////////////
3 //
|
4 rudy 1.1.2.13 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,
5 // Compaq Computer Corporation
|
6 mday 1.1.2.1 //
7 // Permission is hereby granted, free of charge, to any person obtaining a copy
8 // of this software and associated documentation files (the "Software"), to
9 // deal in the Software without restriction, including without limitation the
10 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
11 // sell copies of the Software, and to permit persons to whom the Software is
12 // furnished to do so, subject to the following conditions:
13 //
14 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
15 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
16 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
17 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
18 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
19 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
20 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
21 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
22 //
23 //==============================================================================
24 //
|
25 mday 1.1.2.2 // Author: Mike Day (mdday@us.ibm.com)
|
26 mday 1.1.2.1 //
|
27 rudy 1.1.2.13 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
28 // added nsk platform support
|
29 mday 1.1.2.1 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "Thread.h"
|
33 mday 1.1.2.4 #include <Pegasus/Common/IPC.h>
|
34 mday 1.1.2.1
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
36 mday 1.1.2.16 # include "ThreadWindows.cpp"
|
37 mday 1.1.2.1 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
|
39 rudy 1.1.2.13 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
|
41 mday 1.1.2.1 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
|
47 mday 1.1.2.8 void thread_data::default_delete(void * data)
48 {
49 if( data != NULL)
50 ::operator delete(data);
51 }
52
|
53 mday 1.1.2.3 Boolean Thread::_signals_blocked = false;
|
54 mday 1.1.2.1
55 // for non-native implementations
56 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
57 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
58 {
|
59 mday 1.1.2.5 cleanup_handler *cu = new cleanup_handler(routine, parm);
60 try
61 {
62 _cleanup.insert_first(cu);
63 }
64 catch(IPCException& e)
65 {
|
66 mday 1.1.2.7 delete cu;
|
67 mday 1.1.2.5 throw;
68 }
69 return;
|
70 mday 1.1.2.1 }
|
71 mday 1.1.2.5
|
72 kumpf 1.1.2.6 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
|
73 mday 1.1.2.1 {
|
74 mday 1.1.2.5 cleanup_handler *cu ;
75 try
76 {
77 cu = _cleanup.remove_first() ;
78 }
79 catch(IPCException& e)
|
80 mday 1.1.2.7 {
|
81 kumpf 1.1.2.6 PEGASUS_ASSERT(0);
|
82 mday 1.1.2.14 }
|
83 mday 1.1.2.5 if(execute == true)
84 cu->execute();
85 delete cu;
|
86 mday 1.1.2.1 }
|
87 mday 1.1.2.5
|
88 mday 1.1.2.3 #endif
|
89 mday 1.1.2.4
90
91 //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
92
|
93 mday 1.1.2.2
|
94 mday 1.1.2.3 #ifndef PEGASUS_THREAD_EXIT_NATIVE
|
95 mday 1.1.2.2 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
96 {
|
97 mday 1.1.2.5 // execute the cleanup stack and then return
|
98 mday 1.1.2.17 while( _cleanup.count() )
|
99 mday 1.1.2.5 {
100 try
101 {
102 cleanup_pop(true);
103 }
104 catch(IPCException& e)
105 {
|
106 mday 1.1.2.7 PEGASUS_ASSERT(0);
107 break;
|
108 mday 1.1.2.5 }
109 }
|
110 mday 1.1.2.7 _exit_code = exit_code;
111 exit_thread(exit_code);
|
112 mday 1.1.2.2 }
|
113 mday 1.1.2.4
|
114 mday 1.1.2.7
|
115 mday 1.1.2.1 #endif
|
116 mday 1.1.2.9
|
117 mday 1.1.2.14 ThreadPool::ThreadPool(Sint16 initial_size,
118 Sint8 *key,
119 Sint16 min,
120 Sint16 max,
121 struct timeval & alloc_wait,
122 struct timeval & dealloc_wait,
123 struct timeval & deadlock_detect)
|
124 mday 1.1.2.9 : _max_threads(max), _min_threads(min),
125 _current_threads(0), _waiters(initial_size),
126 _pool_sem(0), _pool(true), _running(true),
|
127 mday 1.1.2.14 _dead(true), _dying(0)
|
128 mday 1.1.2.9 {
|
129 mday 1.1.2.14 _allocate_wait.tv_sec = alloc_wait.tv_sec;
130 _allocate_wait.tv_usec = alloc_wait.tv_usec;
131 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
132 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
133 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
134 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
|
135 mday 1.1.2.9 memset(_key, 0x00, 17);
136 if(key != 0)
137 strncpy(_key, key, 16);
138 if(_max_threads < initial_size)
139 _max_threads = initial_size;
140 if(_min_threads > initial_size)
141 _min_threads = initial_size;
142
143 int i;
144 for(i = 0; i < initial_size; i++)
145 {
146 _link_pool(_init_thread());
147 }
148 }
149
|
150 mday 1.1.2.14
151
|
152 mday 1.1.2.9 ThreadPool::~ThreadPool(void)
153 {
154 _dying++;
155 Thread *th = _pool.remove_first();
156 while(th != 0)
|
157 mday 1.1.2.14 {
158 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
159
160 if(sleep_sem == 0)
161 {
162 th->dereference_tsd();
163 throw NullPointer();
164 }
165
166 sleep_sem->signal();
167 sleep_sem->signal();
168 th->dereference_tsd();
|
169 mday 1.1.2.9 // signal the thread's sleep semaphore
170 th->cancel();
171 th->join();
172 th->empty_tsd();
173 delete th;
174 th = _pool.remove_first();
175 }
|
176 mday 1.1.2.14
177 th = _running.remove_first();
178 while(th != 0)
179 {
180 // signal the thread's sleep semaphore
181 th->cancel();
182 th->join();
183 th->empty_tsd();
184 delete th;
185 th = _running.remove_first();
186 }
187
188 th = _dead.remove_first();
189 while(th != 0)
190 {
191 // signal the thread's sleep semaphore
192 th->cancel();
193 th->join();
194 th->empty_tsd();
195 delete th;
196 th = _dead.remove_first();
197 mday 1.1.2.14 }
|
198 mday 1.1.2.9 }
199
200 // make this static to the class
201 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
202 {
203 Thread *myself = (Thread *)parm;
204 if(myself == 0)
205 throw NullPointer();
206 ThreadPool *pool = (ThreadPool *)myself->get_parm();
207 if(pool == 0 )
208 throw NullPointer();
209 Semaphore *sleep_sem;
210 struct timeval *deadlock_timer;
211
212 try
213 {
214 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
215 myself->dereference_tsd();
216 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
217 myself->dereference_tsd();
218 }
219 mday 1.1.2.9 catch(IPCException & e)
220 {
221 myself->exit_self(0);
222 }
223 if(sleep_sem == 0 || deadlock_timer == 0)
224 throw NullPointer();
225
226 while(pool->_dying < 1)
227 {
228 sleep_sem->wait();
229 // when we awaken we reside on the running queue, not the pool queue
|
230 mday 1.1.2.14 if(pool->_dying > 0)
231 break;
|
232 mday 1.1.2.19
|
233 mday 1.1.2.9
234 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *);
235 void *parm;
236
237 try
238 {
239 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
240 myself->reference_tsd("work func");
241 myself->dereference_tsd();
242 parm = myself->reference_tsd("work parm");
243 myself->dereference_tsd();
244 }
245 catch(IPCException & e)
246 {
247 myself->exit_self(0);
248 }
249
250 if(_work == 0)
251 throw NullPointer();
|
252 mday 1.1.2.19 gettimeofday(deadlock_timer, NULL);
|
253 mday 1.1.2.9 _work(parm);
254
255 // put myself back onto the available list
256 try
257 {
258 pool->_running.remove((void *)myself);
259 pool->_link_pool(myself);
260 }
261 catch(IPCException & e)
262 {
263 myself->exit_self(0);
264 }
265 }
|
266 mday 1.1.2.14 // wait to be awakend by the thread pool destructor
267 sleep_sem->wait();
268 myself->test_cancel();
|
269 mday 1.1.2.9 myself->exit_self(0);
270 return((PEGASUS_THREAD_RETURN)0);
271 }
272
273
274 void ThreadPool::allocate_and_awaken(void *parm,
275 PEGASUS_THREAD_RETURN \
276 (PEGASUS_THREAD_CDECL *work)(void *))
277 throw(IPCException)
278 {
279 struct timeval start;
280 gettimeofday(&start, NULL);
281
282 Thread *th = _pool.remove_first();
283
284 while (th == 0 && _dying < 1)
285 {
286 try // we couldn't get a free thread from the pool
287 {
288 // wait for the right interval and try again
289 while(th == 0 && _dying < 1)
290 mday 1.1.2.9 {
291 _check_deadlock(&start);
292 Uint32 interval = _allocate_wait.tv_sec * 1000;
293 if(_allocate_wait.tv_usec > 0)
294 interval += (_deallocate_wait.tv_usec / 1000);
295 // will throw a timeout if no thread comes free
296 _pool_sem.time_wait(interval);
297 th = _pool.remove_first();
298 }
299 }
300 catch(TimeOut & to)
301 {
302 if(_current_threads < _max_threads)
303 {
304 th = _init_thread();
305 break;
306 }
307 }
308 // will throw a Deadlock Exception before falling out of the loop
|
309 mday 1.1.2.14
310 _check_deadlock(&start);
311
|
312 mday 1.1.2.9 } // while th == null
313
314 if(_dying < 1)
315 {
316 // initialize the thread data with the work function and parameters
317 th->remove_tsd("work func");
318 th->put_tsd("work func", NULL,
319 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
320 (void *)work);
321 th->remove_tsd("work parm");
322 th->put_tsd("work parm", NULL, sizeof(void *), parm);
323
324 // put the thread on the running list
325 _running.insert_first(th);
326
327 // signal the thread's sleep semaphore to awaken it
328 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
329 mday 1.1.2.14
|
330 mday 1.1.2.9 if(sleep_sem == 0)
|
331 mday 1.1.2.14 {
332 th->dereference_tsd();
|
333 mday 1.1.2.9 throw NullPointer();
|
334 mday 1.1.2.14 }
335
|
336 mday 1.1.2.9 sleep_sem->signal();
|
337 mday 1.1.2.14 th->dereference_tsd();
|
338 mday 1.1.2.9 }
339 else
340 _pool.insert_first(th);
341 }
342
343 // caller is responsible for only calling this routine during slack periods
344 // but should call it at least once per _deadlock_detect with the running q
345 // and at least once per _deallocate_wait for the pool q
346
|
347 mday 1.1.2.14 void ThreadPool::kill_dead_threads(void)
348 throw(IPCException)
|
349 mday 1.1.2.9 {
350 struct timeval now;
351 gettimeofday(&now, NULL);
352
|
353 mday 1.1.2.14
354 // first go thread the dead q and clean it up as much as possible
355 while(_dead.count() > 0)
|
356 mday 1.1.2.9 {
|
357 mday 1.1.2.14 Thread *dead = _dead.remove_first();
358 if(dead == 0)
359 throw NullPointer();
360 if(dead->_handle.thid != 0)
|
361 mday 1.1.2.9 {
|
362 mday 1.1.2.17 dead->detach();
|
363 mday 1.1.2.14 destroy_thread(dead->_handle.thid, 0);
|
364 mday 1.1.2.17 dead->_handle.thid = 0;
|
365 mday 1.1.2.14 while(dead->_cleanup.count() )
366 {
|
367 mday 1.1.2.18 // this may throw a permission exception,
368 // which I will remove from the code prior to stabilizing
|
369 mday 1.1.2.14 dead->cleanup_pop(true);
370 }
|
371 mday 1.1.2.9 }
|
372 mday 1.1.2.14 delete dead;
373 }
374
375 DQueue<Thread> * map[2] =
|
376 mday 1.1.2.9 {
|
377 mday 1.1.2.14 &_pool, &_running
378 };
379
380
381 DQueue<Thread> *q = 0;
382 int i = 0;
383 AtomicInt needed(0);
384
385 for( q = map[i] ; i < 2; i++, q = map[i])
386 {
387 if(q->count() > 0 )
|
388 mday 1.1.2.9 {
389 try
390 {
|
391 mday 1.1.2.14 q->try_lock();
|
392 mday 1.1.2.9 }
393 catch(AlreadyLocked & a)
394 {
|
395 mday 1.1.2.14 q++;
|
396 mday 1.1.2.9 continue;
397 }
|
398 mday 1.1.2.14
399 struct timeval dt = { 0, 0 };
400 struct timeval *dtp;
401 Thread *th = 0;
402 th = q->next(th);
403 while (th != 0 )
|
404 mday 1.1.2.9 {
|
405 mday 1.1.2.14 try
406 {
407 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
408 }
409 catch(AlreadyLocked & a)
410 {
411 th = q->next(th);
412 continue;
413 }
414
415 if(dtp != 0)
416 {
417 memcpy(&dt, dtp, sizeof(struct timeval));
|
418 mday 1.1.2.9
|
419 mday 1.1.2.14 }
420 th->dereference_tsd();
421 struct timeval deadlock_timeout;
422 if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))
|
423 mday 1.1.2.9 {
|
424 mday 1.1.2.14 // if we are deallocating from the pool, escape if we are
425 // down to the minimum thread count
426 if( _current_threads.value() <= (Uint32)_min_threads )
427 {
428 if( i == 1)
429 {
430 th = q->next(th);
431 continue;
432 }
433 else
434 {
435 // we are killing a hung thread and we will drop below the
436 // minimum. create another thread to make up for the one
437 // we are about to kill
438 needed++;
439 }
440 }
441
442 th = q->remove_no_lock((void *)th);
443
444 if(th != 0)
445 mday 1.1.2.14 {
446 th->remove_tsd("work func");
447 th->put_tsd("work func", NULL,
448 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
449 (void *)&_undertaker);
450 th->remove_tsd("work parm");
451 th->put_tsd("work parm", NULL, sizeof(void *), th);
452
453 // signal the thread's sleep semaphore to awaken it
454 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
455
456 if(sleep_sem == 0)
457 {
458 th->dereference_tsd();
459 throw NullPointer();
460 }
461 // put the thread on the dead list
462 _dead.insert_first(th);
463 sleep_sem->signal();
464 th->dereference_tsd();
465 th = 0;
466 mday 1.1.2.14 }
|
467 mday 1.1.2.9 }
|
468 mday 1.1.2.14 th = q->next(th);
469 }
470 q->unlock();
471 while (needed.value() > 0)
472 {
473 _link_pool(_init_thread());
474 needed--;
|
475 mday 1.1.2.9 }
476 }
477 }
478
|
479 mday 1.1.2.14
|
480 mday 1.1.2.9 return;
481 }
482
|
483 mday 1.1.2.14 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
|
484 mday 1.1.2.9 {
485 struct timeval now;
486 gettimeofday(&now, NULL);
487 if( (now.tv_sec - start->tv_sec) > interval->tv_sec ||
488 (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
489 ((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
490 return true;
491 else
492 return false;
|
493 mday 1.1.2.14 }
494
495
496 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
497 {
498 Thread *myself = reinterpret_cast<Thread *>(parm);
499 if(myself != 0)
500 {
501 myself->detach();
502 myself->_handle.thid = 0;
503 myself->cancel();
504 myself->test_cancel();
505 myself->exit_self(0);
506 }
507 return((PEGASUS_THREAD_RETURN)0);
|
508 mday 1.1.2.9 }
509
|
510 mday 1.1.2.1
511 PEGASUS_NAMESPACE_END
|
512 mday 1.1.2.7
|