1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 chip 1.11 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 kumpf 1.17 //
|
13 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
27 chip 1.11 // added nsk platform support
|
28 mike 1.2 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
|
33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
34 mike 1.2
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
36 chip 1.11 # include "ThreadWindows.cpp"
|
37 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
41 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
|
47 chip 1.11 void thread_data::default_delete(void * data)
48 {
|
49 mike 1.2 if( data != NULL)
|
50 chip 1.11 ::operator delete(data);
|
51 mike 1.2 }
52
53 Boolean Thread::_signals_blocked = false;
54
55 // for non-native implementations
|
56 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
57 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
58 {
59 cleanup_handler *cu = new cleanup_handler(routine, parm);
|
60 chip 1.11 try
61 {
62 _cleanup.insert_first(cu);
63 }
64 catch(IPCException&)
|
65 mike 1.2 {
66 delete cu;
|
67 chip 1.11 throw;
|
68 mike 1.2 }
69 return;
70 }
|
71 chip 1.11
|
72 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
73 {
74 cleanup_handler *cu ;
|
75 chip 1.11 try
76 {
|
77 mike 1.2 cu = _cleanup.remove_first() ;
78 }
|
79 chip 1.11 catch(IPCException&)
|
80 mike 1.2 {
|
81 chip 1.11 PEGASUS_ASSERT(0);
|
82 mike 1.2 }
83 if(execute == true)
84 cu->execute();
85 delete cu;
86 }
|
87 chip 1.11
|
88 mike 1.2 #endif
89
90
|
91 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
92 mike 1.2
93
|
94 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
95 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
96 {
97 // execute the cleanup stack and then return
|
98 mike 1.2 while( _cleanup.count() )
99 {
|
100 chip 1.11 try
101 {
102 cleanup_pop(true);
103 }
104 catch(IPCException&)
105 {
106 PEGASUS_ASSERT(0);
107 break;
|
108 mike 1.2 }
109 }
110 _exit_code = exit_code;
111 exit_thread(exit_code);
|
112 mday 1.4 _handle.thid = 0;
|
113 mike 1.2 }
114
115
116 #endif
117
|
118 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
119
120
121 void ThreadPool::kill_idle_threads(void)
122 {
123 static struct timeval now, last = {0, 0};
124
125 pegasus_gettimeofday(&now);
126 if(now.tv_sec - last.tv_sec > 5)
127 {
128 _pools.lock();
129 ThreadPool *p = _pools.next(0);
130 while(p != 0)
131 {
132 try
133 {
134 p->kill_dead_threads();
135 }
136 catch(...)
137 {
138 }
139 mday 1.20 p = _pools.next(p);
140 }
141 _pools.unlock();
142 pegasus_gettimeofday(&last);
143 }
144 }
145
146
|
147 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
148 kumpf 1.8 const Sint8 *key,
|
149 mike 1.2 Sint16 min,
150 Sint16 max,
151 struct timeval & alloc_wait,
|
152 chip 1.11 struct timeval & dealloc_wait,
|
153 mike 1.2 struct timeval & deadlock_detect)
154 : _max_threads(max), _min_threads(min),
|
155 mday 1.12 _current_threads(0),
156 _pool(true), _running(true),
|
157 mike 1.2 _dead(true), _dying(0)
158 {
159 _allocate_wait.tv_sec = alloc_wait.tv_sec;
160 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
161 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
162 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
163 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
164 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
165 memset(_key, 0x00, 17);
166 if(key != 0)
167 strncpy(_key, key, 16);
|
168 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
169 mike 1.2 _max_threads = initial_size;
170 if(_min_threads > initial_size)
171 _min_threads = initial_size;
|
172 chip 1.11
|
173 mike 1.2 int i;
174 for(i = 0; i < initial_size; i++)
175 {
176 _link_pool(_init_thread());
177 }
|
178 mday 1.20 _pools.insert_last(this);
179
|
180 mike 1.2 }
181
|
182 chip 1.11
|
183 mike 1.2
184 ThreadPool::~ThreadPool(void)
185 {
|
186 mday 1.19
|
187 mday 1.33 try {
|
188 mday 1.20 _pools.remove(this);
|
189 mike 1.2 _dying++;
|
190 mday 1.12 Thread *th = 0;
|
191 kumpf 1.16 th = _pool.remove_first();
|
192 mike 1.2 while(th != 0)
|
193 chip 1.11 {
|
194 mike 1.2 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
195
196 if(sleep_sem == 0)
197 {
198 th->dereference_tsd();
199 throw NullPointer();
200 }
|
201 chip 1.11
|
202 mike 1.2 sleep_sem->signal();
203 sleep_sem->signal();
204 th->dereference_tsd();
205 // signal the thread's sleep semaphore
206 th->cancel();
207 th->join();
208 th->empty_tsd();
209 delete th;
210 th = _pool.remove_first();
211 }
212
213 th = _running.remove_first();
214 while(th != 0)
215 {
216 // signal the thread's sleep semaphore
217 th->cancel();
218 th->join();
219 th->empty_tsd();
220 delete th;
221 th = _running.remove_first();
222 }
223 mike 1.2
224 th = _dead.remove_first();
225 while(th != 0)
226 {
227 // signal the thread's sleep semaphore
228 th->cancel();
229 th->join();
230 th->empty_tsd();
231 delete th;
232 th = _dead.remove_first();
233 }
|
234 mday 1.33 }
235 catch(...){}
|
236 mike 1.2 }
237
|
238 chip 1.11 // make this static to the class
|
239 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
240 {
|
241 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
242
|
243 mday 1.33 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool::_loop entered");
|
244 mike 1.2 Thread *myself = (Thread *)parm;
245 if(myself == 0)
|
246 kumpf 1.14 {
247 PEG_METHOD_EXIT();
|
248 mike 1.2 throw NullPointer();
|
249 kumpf 1.14 }
|
250 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
251 kumpf 1.14 if(pool == 0 )
252 {
253 PEG_METHOD_EXIT();
|
254 mike 1.2 throw NullPointer();
|
255 kumpf 1.14 }
|
256 mike 1.5 Semaphore *sleep_sem = 0;
|
257 mday 1.13 Semaphore *blocking_sem = 0;
258
|
259 mike 1.5 struct timeval *deadlock_timer = 0;
|
260 chip 1.11
261 try
|
262 mike 1.2 {
263 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
264 myself->dereference_tsd();
265 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
266 mday 1.22 myself->dereference_tsd();
|
267 mike 1.2 }
|
268 mike 1.6 catch(IPCException &)
|
269 mike 1.2 {
|
270 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
271 mday 1.33 "IPCException Caught - EXITING");
|
272 kumpf 1.14 PEG_METHOD_EXIT();
|
273 mike 1.2 myself->exit_self(0);
274 }
|
275 mday 1.30 catch(...)
276 {
|
277 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
278 mday 1.33 "Unknown Exception Caught - EXITING");
|
279 mday 1.30 PEG_METHOD_EXIT();
280 myself->exit_self(0);
281 }
282
|
283 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
284 kumpf 1.14 {
|
285 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
286 mday 1.33 "NULL Semaphore - EXITING");
|
287 kumpf 1.14 PEG_METHOD_EXIT();
|
288 mike 1.2 throw NullPointer();
|
289 kumpf 1.14 }
|
290 mike 1.2
291 while(pool->_dying < 1)
292 {
|
293 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
294 mday 1.33 "ThreadPool::_loop - waiting on semaphore");
|
295 mike 1.2 sleep_sem->wait();
|
296 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
297 mday 1.33 "ThreadPool::_loop - awakened from semaphore");
|
298 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
299 if(pool->_dying > 0)
300 break;
|
301 mday 1.32
|
302 chip 1.11
|
303 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
304 void *parm = 0;
|
305 mike 1.2
|
306 chip 1.11 try
|
307 mike 1.2 {
308 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
309 myself->reference_tsd("work func");
310 myself->dereference_tsd();
311 parm = myself->reference_tsd("work parm");
312 myself->dereference_tsd();
|
313 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
314 myself->dereference_tsd();
315
|
316 mike 1.2 }
|
317 mike 1.6 catch(IPCException &)
|
318 mike 1.2 {
|
319 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
320 mday 1.33 "IPCException Caught - EXITING");
|
321 kumpf 1.14 PEG_METHOD_EXIT();
|
322 mike 1.2 myself->exit_self(0);
323 }
|
324 chip 1.11
|
325 mike 1.2 if(_work == 0)
|
326 kumpf 1.14 {
|
327 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
328 mday 1.33 "NULL work pointer - EXITING");
|
329 kumpf 1.14 PEG_METHOD_EXIT();
|
330 mike 1.2 throw NullPointer();
|
331 kumpf 1.14 }
|
332 kumpf 1.24
333 if(_work ==
334 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
335 {
|
336 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
337 mday 1.33 "Calling the Undertaker");
|
338 mday 1.23 _work(parm);
|
339 kumpf 1.24 }
340
|
341 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
342 mday 1.20 try
343 {
|
344 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
345 mday 1.33 "ThreadPool::_loop - calling work routine");
|
346 mday 1.20 _work(parm);
|
347 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
348 mday 1.33 "ThreadPool::_loop - returned from work routine");
|
349 mday 1.20 }
350 catch(...)
351 {
|
352 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
353 mday 1.33 "Unknown Exception Caught - EXITING");
|
354 mday 1.20 gettimeofday(deadlock_timer, NULL);
355 }
356 gettimeofday(deadlock_timer, NULL);
|
357 mday 1.13 if( blocking_sem != 0 )
358 blocking_sem->signal();
359
|
360 chip 1.11 // put myself back onto the available list
361 try
|
362 mike 1.2 {
363 pool->_running.remove((void *)myself);
364 pool->_link_pool(myself);
365 }
|
366 mike 1.6 catch(IPCException &)
|
367 mike 1.2 {
|
368 mday 1.32 Tracer::trace(__FILE__, __LINE__, TRC_THREAD, Tracer::LEVEL4,
|
369 mday 1.33 "IPCException Caught - EXITING");
|
370 kumpf 1.14 PEG_METHOD_EXIT();
|
371 mike 1.2 myself->exit_self(0);
372 }
373 }
374 // wait to be awakend by the thread pool destructor
375 sleep_sem->wait();
376 myself->test_cancel();
|
377 kumpf 1.14
378 PEG_METHOD_EXIT();
|
379 mike 1.2 myself->exit_self(0);
380 return((PEGASUS_THREAD_RETURN)0);
381 }
382
383 void ThreadPool::allocate_and_awaken(void *parm,
384 PEGASUS_THREAD_RETURN \
|
385 mday 1.13 (PEGASUS_THREAD_CDECL *work)(void *),
386 Semaphore *blocking)
387
|
388 mike 1.2 throw(IPCException)
389 {
|
390 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
391 mike 1.2 struct timeval start;
392 gettimeofday(&start, NULL);
|
393 chip 1.11
|
394 mike 1.2 Thread *th = _pool.remove_first();
|
395 mday 1.12
|
396 mday 1.7 // wait for the right interval and try again
|
397 mday 1.33 while (th == 0 && _dying < 1)
|
398 mike 1.2 {
|
399 mday 1.12 _check_deadlock(&start) ;
400
|
401 mday 1.21 if(_max_threads == 0 || _current_threads < _max_threads)
|
402 mday 1.33 {
403 th = _init_thread();
404 continue;
405 }
406
407 pegasus_sleep(1);
|
408 mday 1.7 th = _pool.remove_first();
409 }
|
410 chip 1.11
411
|
412 mike 1.2 if(_dying < 1)
413 {
414 // initialize the thread data with the work function and parameters
|
415 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
416 "Initializing thread with work function and parameters: parm = %p",
417 parm);
418
|
419 kumpf 1.15 th->delete_tsd("work func");
|
420 chip 1.11 th->put_tsd("work func", NULL,
|
421 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
422 (void *)work);
|
423 kumpf 1.15 th->delete_tsd("work parm");
|
424 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
425 kumpf 1.15 th->delete_tsd("blocking sem");
|
426 mday 1.13 if(blocking != 0 )
427 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
428
|
429 chip 1.11 // put the thread on the running list
|
430 mike 1.2 _running.insert_first(th);
431
432 // signal the thread's sleep semaphore to awaken it
433 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
434
435 if(sleep_sem == 0)
436 {
437 th->dereference_tsd();
|
438 kumpf 1.14 PEG_METHOD_EXIT();
|
439 mike 1.2 throw NullPointer();
440 }
|
441 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
|
442 mike 1.2 sleep_sem->signal();
443 th->dereference_tsd();
444 }
445 else
446 _pool.insert_first(th);
|
447 kumpf 1.14
448 PEG_METHOD_EXIT();
|
449 mike 1.2 }
450
451 // caller is responsible for only calling this routine during slack periods
452 // but should call it at least once per _deadlock_detect with the running q
453 // and at least once per _deallocate_wait for the pool q
454
|
455 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
456 mike 1.2 throw(IPCException)
457 {
458 struct timeval now;
459 gettimeofday(&now, NULL);
|
460 mday 1.12 Uint32 bodies = 0;
461
|
462 mike 1.2 // first go thread the dead q and clean it up as much as possible
463 while(_dead.count() > 0)
464 {
|
465 mday 1.32
|
466 kumpf 1.31 #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
|
467 mday 1.30 PEGASUS_STD(cout) << "ThreadPool:: removing and joining dead thread" << PEGASUS_STD(endl);
|
468 kumpf 1.31 #endif
|
469 mike 1.2 Thread *dead = _dead.remove_first();
470 if(dead == 0)
471 throw NullPointer();
|
472 mday 1.30 dead->join();
|
473 mike 1.2 delete dead;
474 }
|
475 chip 1.11
476 DQueue<Thread> * map[2] =
|
477 mike 1.2 {
478 &_pool, &_running
479 };
|
480 chip 1.11
481
|
482 mike 1.2 DQueue<Thread> *q = 0;
483 int i = 0;
484 AtomicInt needed(0);
|
485 chip 1.11
|
486 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
487 // This change prevents the thread pool from killing "hung" threads.
488 // The definition of a "hung" thread is one that has been on the run queue
489 // for longer than the time interval set when the thread pool was created.
490 // Cancelling "hung" threads has proven to be problematic.
491
492 // With this change the thread pool will not cancel "hung" threads. This
493 // may prevent a crash depending upon the state of the "hung" thread. In
494 // the case that the thread is actually hung, this change causes the
495 // thread resources not to be reclaimed.
496
497 // Idle threads, those that have not executed a routine for a time
498 // interval, continue to be destroyed. This is normal and should not
499 // cause any problems.
500 for( ; i < 1; i++)
501 #else
|
502 mday 1.30 for( ; i < 2; i++)
|
503 kumpf 1.31 #endif
|
504 mday 1.21 {
505 q = map[i];
|
506 mike 1.2 if(q->count() > 0 )
507 {
|
508 chip 1.11 try
|
509 mike 1.2 {
510 q->try_lock();
511 }
|
512 mday 1.18 catch(...)
|
513 mike 1.2 {
|
514 mday 1.18 return bodies;
|
515 mike 1.2 }
516
517 struct timeval dt = { 0, 0 };
518 struct timeval *dtp;
519 Thread *th = 0;
520 th = q->next(th);
521 while (th != 0 )
522 {
|
523 chip 1.11 try
|
524 mike 1.2 {
525 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
526 }
|
527 mday 1.18 catch(...)
|
528 mike 1.2 {
|
529 kumpf 1.25 q->unlock();
|
530 mday 1.18 return bodies;
|
531 mike 1.2 }
|
532 chip 1.11
|
533 mike 1.2 if(dtp != 0)
534 {
535 memcpy(&dt, dtp, sizeof(struct timeval));
536 }
537 th->dereference_tsd();
538 struct timeval deadlock_timeout;
|
539 mday 1.18 Boolean too_long;
540 if( i == 0)
541 {
542 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
543 }
544 else
545 {
|
546 mday 1.22 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
|
547 mday 1.18 }
548
549 if( true == too_long)
|
550 mike 1.2 {
551 // if we are deallocating from the pool, escape if we are
|
552 chip 1.11 // down to the minimum thread count
|
553 mday 1.13 _current_threads--;
|
554 mday 1.18 if( _current_threads.value() < (Uint32)_min_threads )
|
555 mike 1.2 {
|
556 mday 1.13 if( i == 0)
|
557 mike 1.2 {
|
558 mday 1.13 _current_threads++;
|
559 mike 1.2 th = q->next(th);
560 continue;
561 }
|
562 chip 1.11 else
|
563 mike 1.2 {
|
564 chip 1.11 // we are killing a hung thread and we will drop below the
|
565 mike 1.2 // minimum. create another thread to make up for the one
566 // we are about to kill
567 needed++;
568 }
569 }
|
570 chip 1.11
|
571 mike 1.2 th = q->remove_no_lock((void *)th);
|
572 chip 1.11
|
573 mike 1.2 if(th != 0)
574 {
|
575 mday 1.30 if( i == 0 )
|
576 mike 1.2 {
|
577 mday 1.30 th->delete_tsd("work func");
578 th->put_tsd("work func", NULL,
579 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
580 (void *)&_undertaker);
581 th->delete_tsd("work parm");
582 th->put_tsd("work parm", NULL, sizeof(void *), th);
583
584 // signal the thread's sleep semaphore to awaken it
585 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
586
587 if(sleep_sem == 0)
588 {
589 q->unlock();
590 th->dereference_tsd();
591 throw NullPointer();
592 }
593
594 bodies++;
|
595 mike 1.2 th->dereference_tsd();
|
596 mday 1.30 _dead.insert_first(th);
597 sleep_sem->signal();
598 th = 0;
599 }
600 else
601 {
602 // deadlocked threads
|
603 kumpf 1.31 #if !defined(PEGASUS_PLATFORM_HPUX_ACC) && !defined(PEGASUS_PLATFORM_LINUX_IA64_GNU)
|
604 mday 1.30 PEGASUS_STD(cout) << "Killing a deadlocked thread" << PEGASUS_STD(endl);
|
605 kumpf 1.31 #endif
|
606 mday 1.30 th->cancel();
607 delete th;
|
608 mike 1.2 }
609 }
610 }
611 th = q->next(th);
|
612 mday 1.20 pegasus_sleep(1);
|
613 mike 1.2 }
614 q->unlock();
615 while (needed.value() > 0)
616 {
617 _link_pool(_init_thread());
618 needed--;
|
619 mday 1.20 pegasus_sleep(0);
|
620 mike 1.2 }
621 }
622 }
|
623 mday 1.18 return bodies;
|
624 mike 1.2 }
625
|
626 mday 1.12
|
627 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
628 {
|
629 mday 1.22 // never time out if the interval is zero
630 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
631 return false;
632
|
633 mday 1.13 struct timeval now, finish, remaining;
634 Uint32 usec;
|
635 mday 1.33 pegasus_gettimeofday(&now);
|
636 mday 1.13
637 finish.tv_sec = start->tv_sec + interval->tv_sec;
638 usec = start->tv_usec + interval->tv_usec;
639 finish.tv_sec += (usec / 1000000);
640 usec %= 1000000;
641 finish.tv_usec = usec;
642
643 if ( timeval_subtract(&remaining, &finish, &now) )
|
644 mike 1.2 return true;
645 else
646 return false;
647 }
648
649 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
650 {
|
651 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
652 return (PEGASUS_THREAD_RETURN)1;
|
653 mike 1.2 }
|
654 mday 1.19
655
656 void ThreadPool::_sleep_sem_del(void *p)
657 {
658 if(p != 0)
659 {
660 delete (Semaphore *)p;
661 }
662 }
663
664 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
665 {
666 if (true == check_time(start, &_deadlock_detect))
667 throw Deadlock(pegasus_thread_self());
668 return;
669 }
670
671
672 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
673 {
674 return(check_time(start, &_deadlock_detect));
675 mday 1.19 }
676
677 Boolean ThreadPool::_check_dealloc(struct timeval *start)
678 {
679 return(check_time(start, &_deallocate_wait));
680 }
681
682 Thread *ThreadPool::_init_thread(void) throw(IPCException)
683 {
|
684 mday 1.32 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
|
685 mday 1.19 Thread *th = (Thread *) new Thread(_loop, this, false);
686 // allocate a sleep semaphore and pass it in the thread context
687 // initial count is zero, loop function will sleep until
688 // we signal the semaphore
689 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
690 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
691
692 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
693 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
694 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
695 th->run();
696 _current_threads++;
697 pegasus_yield();
|
698 mday 1.32 PEG_METHOD_EXIT();
|
699 mday 1.19
700 return th;
701 }
702
703 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
704 {
705 if(th == 0)
706 throw NullPointer();
707 _pool.insert_first(th);
708 }
709
|
710 mike 1.2
711
712 PEGASUS_NAMESPACE_END
713
|