1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 chip 1.11 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 kumpf 1.17 //
|
13 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
27 chip 1.11 // added nsk platform support
|
28 mike 1.2 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
|
33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
34 mike 1.2
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
36 chip 1.11 # include "ThreadWindows.cpp"
|
37 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
41 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
|
47 mday 1.42
|
48 chip 1.11 void thread_data::default_delete(void * data)
49 {
|
50 mike 1.2 if( data != NULL)
|
51 chip 1.11 ::operator delete(data);
|
52 mike 1.2 }
53
|
54 chuck 1.43 // l10n start
55 void language_delete(void * data)
56 {
57 if( data != NULL)
58 {
59 AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
|
60 chuck 1.44 delete al;
|
61 chuck 1.43 }
62 }
63 // l10n end
64
|
65 mike 1.2 Boolean Thread::_signals_blocked = false;
|
66 chuck 1.37 // l10n
|
67 mday 1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
|
68 chuck 1.37 Boolean Thread::_key_initialized = false;
|
69 chuck 1.41 Boolean Thread::_key_error = false;
|
70 chuck 1.37
|
71 mike 1.2
72 // for non-native implementations
|
73 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
74 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
75 {
76 cleanup_handler *cu = new cleanup_handler(routine, parm);
|
77 chip 1.11 try
78 {
79 _cleanup.insert_first(cu);
80 }
81 catch(IPCException&)
|
82 mike 1.2 {
83 delete cu;
|
84 chip 1.11 throw;
|
85 mike 1.2 }
86 return;
87 }
|
88 chip 1.11
|
89 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
90 {
91 cleanup_handler *cu ;
|
92 chip 1.11 try
93 {
|
94 mike 1.2 cu = _cleanup.remove_first() ;
95 }
|
96 chip 1.11 catch(IPCException&)
|
97 mike 1.2 {
|
98 chip 1.11 PEGASUS_ASSERT(0);
|
99 mike 1.2 }
100 if(execute == true)
101 cu->execute();
102 delete cu;
103 }
|
104 chip 1.11
|
105 mike 1.2 #endif
106
107
|
108 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
109 mike 1.2
110
|
111 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113 {
114 // execute the cleanup stack and then return
|
115 mike 1.2 while( _cleanup.count() )
116 {
|
117 chip 1.11 try
118 {
119 cleanup_pop(true);
120 }
121 catch(IPCException&)
122 {
123 PEGASUS_ASSERT(0);
124 break;
|
125 mike 1.2 }
126 }
127 _exit_code = exit_code;
128 exit_thread(exit_code);
|
129 mday 1.4 _handle.thid = 0;
|
130 mike 1.2 }
131
132
133 #endif
134
|
135 chuck 1.37 // l10n start
|
136 chuck 1.39 Sint8 Thread::initializeKey()
137 {
138 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139 if (!Thread::_key_initialized)
140 {
|
141 chuck 1.41 if (Thread::_key_error)
142 {
143 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144 "Thread: ERROR - thread key error");
145 return -1;
146 }
147
|
148 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149 {
150 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151 "Thread: able to create a thread key");
152 Thread::_key_initialized = true;
153 }
154 else
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: ERROR - unable to create a thread key");
|
158 chuck 1.41 Thread::_key_error = true;
|
159 chuck 1.39 return -1;
160 }
161 }
162
163 PEG_METHOD_EXIT();
164 return 0;
165 }
166
|
167 chuck 1.37 Thread * Thread::getCurrent()
168 {
|
169 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
170 chuck 1.40 if (Thread::initializeKey() != 0)
|
171 chuck 1.39 {
172 return NULL;
173 }
|
174 chuck 1.38 PEG_METHOD_EXIT();
|
175 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
176 }
177
178 void Thread::setCurrent(Thread * thrd)
179 {
180 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181 if (Thread::initializeKey() == 0)
182 {
183 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184 (void *) thrd) == 0)
185 {
186 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187 "Successful set Thread * into thread specific storage");
188 }
189 else
190 {
191 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192 "ERROR: got error setting Thread * into thread specific storage");
193 }
194 }
195 PEG_METHOD_EXIT();
|
196 chuck 1.37 }
197
198 AcceptLanguages * Thread::getLanguages()
199 {
|
200 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
201 chuck 1.37
202 Thread * curThrd = Thread::getCurrent();
203 if (curThrd == NULL)
204 return NULL;
205 AcceptLanguages * acceptLangs =
206 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207 curThrd->dereference_tsd();
208 PEG_METHOD_EXIT();
209 return acceptLangs;
210 }
211
212 void Thread::setLanguages(AcceptLanguages *langs) //l10n
213 {
|
214 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
215 chuck 1.37
216 Thread * currentThrd = Thread::getCurrent();
217 if (currentThrd != NULL)
218 {
219 // deletes the old tsd and creates a new one
220 currentThrd->put_tsd("acceptLanguages",
|
221 chuck 1.43 language_delete,
|
222 chuck 1.37 sizeof(AcceptLanguages *),
223 langs);
224 }
225
226 PEG_METHOD_EXIT();
227 }
228
229 void Thread::clearLanguages() //l10n
230 {
|
231 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
232 chuck 1.37
233 Thread * currentThrd = Thread::getCurrent();
234 if (currentThrd != NULL)
235 {
236 // deletes the old tsd
237 currentThrd->delete_tsd("acceptLanguages");
238 }
239
240 PEG_METHOD_EXIT();
241 }
242 // l10n end
243
|
244 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
245
246
247 void ThreadPool::kill_idle_threads(void)
248 {
249 static struct timeval now, last = {0, 0};
250
251 pegasus_gettimeofday(&now);
252 if(now.tv_sec - last.tv_sec > 5)
253 {
254 _pools.lock();
255 ThreadPool *p = _pools.next(0);
256 while(p != 0)
257 {
258 try
259 {
260 p->kill_dead_threads();
261 }
262 catch(...)
263 {
264 }
265 mday 1.20 p = _pools.next(p);
266 }
267 _pools.unlock();
268 pegasus_gettimeofday(&last);
269 }
270 }
271
272
|
273 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
274 kumpf 1.8 const Sint8 *key,
|
275 mike 1.2 Sint16 min,
276 Sint16 max,
277 struct timeval & alloc_wait,
|
278 chip 1.11 struct timeval & dealloc_wait,
|
279 mike 1.2 struct timeval & deadlock_detect)
280 : _max_threads(max), _min_threads(min),
|
281 mday 1.12 _current_threads(0),
282 _pool(true), _running(true),
|
283 mike 1.2 _dead(true), _dying(0)
284 {
285 _allocate_wait.tv_sec = alloc_wait.tv_sec;
286 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
287 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
288 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
289 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
290 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
291 memset(_key, 0x00, 17);
292 if(key != 0)
293 strncpy(_key, key, 16);
|
294 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
295 mike 1.2 _max_threads = initial_size;
296 if(_min_threads > initial_size)
297 _min_threads = initial_size;
|
298 chip 1.11
|
299 mike 1.2 int i;
300 for(i = 0; i < initial_size; i++)
301 {
302 _link_pool(_init_thread());
303 }
|
304 mday 1.20 _pools.insert_last(this);
|
305 mike 1.2 }
306
|
307 chip 1.11
|
308 mike 1.2
309 ThreadPool::~ThreadPool(void)
310 {
|
311 mday 1.47
|
312 mday 1.35 try
|
313 mday 1.47 {
314 {
315 auto_mutex(&(this->_monitor));
316 _dying++;
317 }
318
|
319 mday 1.35 _pools.remove(this);
320 Thread *th = 0;
321 th = _pool.remove_first();
322 while(th != 0)
|
323 mike 1.2 {
|
324 mday 1.35 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
325
326 if(sleep_sem == 0)
327 {
328 th->dereference_tsd();
329 throw NullPointer();
330 }
|
331 mday 1.47
|
332 s.hills 1.49 // Signal to get the thread out of the work loop.
333 sleep_sem->signal();
334 // Signal to get the thread past the end. See the comment
335 // "wait to be awakend by the thread pool destructor"
336 // Note: the current implementation of Thread for Windows
337 // does not implement "pthread" cancelation points so this
338 // is needed.
|
339 mday 1.35 sleep_sem->signal();
|
340 s.hills 1.49
|
341 mike 1.2 th->dereference_tsd();
|
342 mday 1.35 // signal the thread's sleep semaphore
343 th->cancel();
344 th->join();
345 th->empty_tsd();
346 delete th;
347 th = _pool.remove_first();
|
348 mike 1.2 }
|
349 mday 1.47
350 th = _dead.remove_first();
|
351 mday 1.35 while(th != 0)
352 {
|
353 mday 1.47 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
354
355 if(sleep_sem == 0)
356 {
357 th->dereference_tsd();
358 throw NullPointer();
359 }
360
361
362 sleep_sem->signal();
363 th->dereference_tsd();
364
|
365 mday 1.35 // signal the thread's sleep semaphore
366 th->cancel();
367 th->join();
368 th->empty_tsd();
369 delete th;
|
370 mday 1.47 th = _dead.remove_first();
|
371 mday 1.35 }
|
372 mday 1.47 {
373
374 auto_mutex(&(this->_monitor));
375 th = _running.remove_first();
|
376 mday 1.35 while(th != 0)
|
377 mday 1.47 {
|
378 mday 1.35 // signal the thread's sleep semaphore
|
379 mday 1.47 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
380 if(sleep_sem == 0 )
381 {
382 th->dereference_tsd();
383 throw NullPointer();
384 }
385
386 sleep_sem->signal();
387 th->dereference_tsd();
388
|
389 mday 1.35 th->cancel();
|
390 mday 1.47
391 // ensure that th->run() has a chance to execute so that the join will not
392 // block
|
393 mday 1.35 th->join();
394 th->empty_tsd();
395 delete th;
|
396 mday 1.47 th = _running.remove_first();
397 }
|
398 mday 1.35 }
|
399 mday 1.47
|
400 mike 1.2 }
|
401 mday 1.47
|
402 mday 1.35 catch(...)
|
403 mike 1.2 {
404 }
|
405 mday 1.47
|
406 mike 1.2 }
407
|
408 chip 1.11 // make this static to the class
|
409 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
410 {
|
411 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
412
|
413 mike 1.2 Thread *myself = (Thread *)parm;
414 if(myself == 0)
|
415 kumpf 1.14 {
416 PEG_METHOD_EXIT();
|
417 mike 1.2 throw NullPointer();
|
418 kumpf 1.14 }
|
419 chuck 1.37
420 // l10n
421 // Set myself into thread specific storage
|
422 chuck 1.38 // This will allow code to get its own Thread
|
423 chuck 1.39 Thread::setCurrent(myself);
424
|
425 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
426 kumpf 1.14 if(pool == 0 )
427 {
428 PEG_METHOD_EXIT();
|
429 mike 1.2 throw NullPointer();
|
430 kumpf 1.14 }
|
431 mday 1.47
|
432 mike 1.5 Semaphore *sleep_sem = 0;
|
433 mday 1.13 Semaphore *blocking_sem = 0;
434
|
435 mike 1.5 struct timeval *deadlock_timer = 0;
|
436 mday 1.47
|
437 chip 1.11 try
|
438 mike 1.2 {
439 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
440 myself->dereference_tsd();
441 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
442 mday 1.22 myself->dereference_tsd();
|
443 mike 1.2 }
|
444 mike 1.6 catch(IPCException &)
|
445 mike 1.2 {
|
446 kumpf 1.14 PEG_METHOD_EXIT();
|
447 mday 1.47 return(0);
|
448 mike 1.2 }
|
449 mday 1.30 catch(...)
450 {
451 PEG_METHOD_EXIT();
|
452 mday 1.47 return(0);
|
453 mday 1.30 }
454
|
455 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
456 kumpf 1.14 {
457 PEG_METHOD_EXIT();
|
458 mike 1.2 throw NullPointer();
|
459 kumpf 1.14 }
|
460 mike 1.2
|
461 mday 1.47 while(pool->_dying.value() < 1)
|
462 mike 1.2 {
463 sleep_sem->wait();
|
464 mday 1.35
|
465 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
466 chip 1.11
|
467 mday 1.47
|
468 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
469 void *parm = 0;
|
470 mike 1.2
|
471 chip 1.11 try
|
472 mike 1.2 {
473 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
474 myself->reference_tsd("work func");
475 myself->dereference_tsd();
476 parm = myself->reference_tsd("work parm");
477 myself->dereference_tsd();
|
478 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
479 myself->dereference_tsd();
480
|
481 mike 1.2 }
|
482 mike 1.6 catch(IPCException &)
|
483 mike 1.2 {
|
484 kumpf 1.14 PEG_METHOD_EXIT();
|
485 mday 1.47 return(0);
|
486 mike 1.2 }
|
487 chip 1.11
|
488 mike 1.2 if(_work == 0)
|
489 kumpf 1.14 {
490 PEG_METHOD_EXIT();
|
491 mike 1.2 throw NullPointer();
|
492 kumpf 1.14 }
|
493 kumpf 1.24
494 if(_work ==
495 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
496 {
|
497 mday 1.23 _work(parm);
|
498 kumpf 1.24 }
499
|
500 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
501 mday 1.20 try
502 {
|
503 mday 1.47 {
504 auto_mutex(&(pool->_monitor));
505 if(pool->_dying.value())
506 {
507 break;
508 }
509 }
|
510 mday 1.20 _work(parm);
511 }
512 catch(...)
513 {
|
514 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
515 mday 1.20 }
|
516 chuck 1.37
|
517 mday 1.47
|
518 mday 1.13
|
519 chip 1.11 // put myself back onto the available list
520 try
|
521 mike 1.2 {
|
522 mday 1.47 auto_mutex(&(pool->_monitor));
523 if(pool->_dying.value() == 0)
524 {
525 gettimeofday(deadlock_timer, NULL);
526 if( blocking_sem != 0 )
527 blocking_sem->signal();
528
529 pool->_running.remove((void *)myself);
530 pool->_pool.insert_first(myself);
531 }
532 else
533 {
534 PEG_METHOD_EXIT();
535 return((PEGASUS_THREAD_RETURN)0);
536 }
|
537 mike 1.2 }
|
538 mike 1.6 catch(IPCException &)
|
539 mike 1.2 {
|
540 kumpf 1.14 PEG_METHOD_EXIT();
|
541 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
542 mike 1.2 }
|
543 mday 1.51 catch(...)
544 {
545 return((PEGASUS_THREAD_RETURN)0);
546 }
547
|
548 mike 1.2 }
|
549 s.hills 1.49
550 // TODO: Why is this needed? Why not just continue?
|
551 mike 1.2 // wait to be awakend by the thread pool destructor
|
552 mday 1.50 //sleep_sem->wait();
|
553 s.hills 1.49
|
554 mike 1.2 myself->test_cancel();
|
555 kumpf 1.14
556 PEG_METHOD_EXIT();
|
557 mike 1.2 myself->exit_self(0);
558 return((PEGASUS_THREAD_RETURN)0);
559 }
560
561 void ThreadPool::allocate_and_awaken(void *parm,
562 PEGASUS_THREAD_RETURN \
|
563 mday 1.13 (PEGASUS_THREAD_CDECL *work)(void *),
564 Semaphore *blocking)
565
|
566 mike 1.2 throw(IPCException)
567 {
|
568 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
569 mike 1.2 struct timeval start;
570 gettimeofday(&start, NULL);
|
571 mday 1.47 Thread *th = 0;
572
573 try
574 {
575 auto_mutex(&(this->_monitor));
576 if(_dying.value())
577 {
578 return;
579 }
580 th = _pool.remove_first();
581 }
582 catch(...)
583 {
584 return;
585
586 }
587
|
588 mday 1.12
|
589 mday 1.7 // wait for the right interval and try again
|
590 mday 1.47 while (th == 0 && _dying.value() < 1)
|
591 mike 1.2 {
|
592 mday 1.47 // will throw an IPCException&
|
593 mday 1.12 _check_deadlock(&start) ;
594
|
595 mday 1.21 if(_max_threads == 0 || _current_threads < _max_threads)
|
596 mday 1.35 {
597 th = _init_thread();
598 continue;
599 }
600 pegasus_yield();
|
601 mday 1.47 try
602 {
603 auto_mutex(&(this->_monitor));
604 if(_dying.value())
605 {
606 return;
607 }
608 th = _pool.remove_first();
609 }
610 catch(...)
611 {
612 return ;
613 }
|
614 mday 1.7 }
|
615 chip 1.11
|
616 mday 1.47 if(_dying.value() < 1)
|
617 mike 1.2 {
618 // initialize the thread data with the work function and parameters
|
619 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
620 "Initializing thread with work function and parameters: parm = %p",
621 parm);
622
|
623 kumpf 1.15 th->delete_tsd("work func");
|
624 chip 1.11 th->put_tsd("work func", NULL,
|
625 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
626 (void *)work);
|
627 kumpf 1.15 th->delete_tsd("work parm");
|
628 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
629 kumpf 1.15 th->delete_tsd("blocking sem");
|
630 mday 1.13 if(blocking != 0 )
631 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
632 mday 1.47 try
633 {
634 auto_mutex(&(this->_monitor));
635 if(_dying.value())
636 {
637 th->cancel();
638 th->join();
639 delete th;
640 return;
641 }
642
643 // put the thread on the running list
644
|
645 mike 1.2
|
646 mday 1.47 _running.insert_first(th);
|
647 mike 1.2 // signal the thread's sleep semaphore to awaken it
|
648 mday 1.47 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
649
650 if(sleep_sem == 0)
651 {
652 th->dereference_tsd();
653 PEG_METHOD_EXIT();
654 throw NullPointer();
655 }
656 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
657 sleep_sem->signal();
658 th->dereference_tsd();
659 }
660 catch(...)
|
661 mike 1.2 {
|
662 mday 1.47 PEG_METHOD_EXIT();
663 return;
|
664 mike 1.2 }
|
665 mday 1.47
|
666 mike 1.2 }
667 else
|
668 mday 1.47 {
669 th->cancel();
670 th->join();
671 delete th;
672 }
673
|
674 kumpf 1.14 PEG_METHOD_EXIT();
|
675 mike 1.2 }
676
677 // caller is responsible for only calling this routine during slack periods
678 // but should call it at least once per _deadlock_detect with the running q
679 // and at least once per _deallocate_wait for the pool q
680
|
681 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
682 mike 1.2 throw(IPCException)
683 {
684 struct timeval now;
685 gettimeofday(&now, NULL);
|
686 mday 1.12 Uint32 bodies = 0;
687
|
688 mike 1.2 // first go thread the dead q and clean it up as much as possible
|
689 mday 1.47 try
690 {
691 auto_mutex(&(this->_monitor));
692 if(_dying.value() )
693 {
694 return 0;
695 }
696
697 while(_dead.count() > 0 && _dying.value() == 0 )
698 {
699 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
700 Thread *dead = _dead.remove_first();
701
702 if(dead == 0)
703 throw NullPointer();
704 dead->join();
705 delete dead;
706 }
707 }
708 catch(...)
|
709 mike 1.2 {
710 }
|
711 mday 1.47
712
|
713 chip 1.11 DQueue<Thread> * map[2] =
|
714 mike 1.2 {
715 &_pool, &_running
716 };
|
717 chip 1.11
718
|
719 mike 1.2 DQueue<Thread> *q = 0;
720 int i = 0;
721 AtomicInt needed(0);
|
722 chip 1.11
|
723 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
724 // This change prevents the thread pool from killing "hung" threads.
725 // The definition of a "hung" thread is one that has been on the run queue
726 // for longer than the time interval set when the thread pool was created.
727 // Cancelling "hung" threads has proven to be problematic.
728
729 // With this change the thread pool will not cancel "hung" threads. This
730 // may prevent a crash depending upon the state of the "hung" thread. In
731 // the case that the thread is actually hung, this change causes the
732 // thread resources not to be reclaimed.
733
734 // Idle threads, those that have not executed a routine for a time
735 // interval, continue to be destroyed. This is normal and should not
736 // cause any problems.
737 for( ; i < 1; i++)
738 #else
|
739 mday 1.30 for( ; i < 2; i++)
|
740 kumpf 1.31 #endif
|
741 mday 1.47 {
742 auto_mutex(&(this->_monitor));
|
743 mday 1.21 q = map[i];
|
744 mike 1.2 if(q->count() > 0 )
745 {
|
746 chip 1.11 try
|
747 mike 1.2 {
|
748 mday 1.47 if(_dying.value())
749 {
750 return bodies;
751 }
752
|
753 mike 1.2 q->try_lock();
754 }
|
755 mday 1.18 catch(...)
|
756 mike 1.2 {
|
757 mday 1.18 return bodies;
|
758 mike 1.2 }
759
760 struct timeval dt = { 0, 0 };
761 struct timeval *dtp;
762 Thread *th = 0;
763 th = q->next(th);
764 while (th != 0 )
765 {
|
766 chip 1.11 try
|
767 mike 1.2 {
768 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
769 }
|
770 mday 1.18 catch(...)
|
771 mike 1.2 {
|
772 kumpf 1.25 q->unlock();
|
773 mday 1.18 return bodies;
|
774 mike 1.2 }
|
775 chip 1.11
|
776 mike 1.2 if(dtp != 0)
777 {
778 memcpy(&dt, dtp, sizeof(struct timeval));
779 }
780 th->dereference_tsd();
781 struct timeval deadlock_timeout;
|
782 mday 1.18 Boolean too_long;
783 if( i == 0)
784 {
785 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
786 }
787 else
788 {
|
789 mday 1.22 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
|
790 mday 1.18 }
791
792 if( true == too_long)
|
793 mike 1.2 {
794 // if we are deallocating from the pool, escape if we are
|
795 chip 1.11 // down to the minimum thread count
|
796 mday 1.13 _current_threads--;
|
797 mday 1.18 if( _current_threads.value() < (Uint32)_min_threads )
|
798 mike 1.2 {
|
799 mday 1.13 if( i == 0)
|
800 mike 1.2 {
|
801 mday 1.13 _current_threads++;
|
802 mike 1.2 th = q->next(th);
803 continue;
804 }
|
805 chip 1.11 else
|
806 mike 1.2 {
|
807 chip 1.11 // we are killing a hung thread and we will drop below the
|
808 mike 1.2 // minimum. create another thread to make up for the one
809 // we are about to kill
810 needed++;
811 }
812 }
|
813 chip 1.11
|
814 mike 1.2 th = q->remove_no_lock((void *)th);
|
815 chip 1.11
|
816 mike 1.2 if(th != 0)
817 {
|
818 mday 1.30 if( i == 0 )
|
819 mike 1.2 {
|
820 mday 1.30 th->delete_tsd("work func");
821 th->put_tsd("work func", NULL,
822 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
823 (void *)&_undertaker);
824 th->delete_tsd("work parm");
825 th->put_tsd("work parm", NULL, sizeof(void *), th);
826
827 // signal the thread's sleep semaphore to awaken it
828 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
829
830 if(sleep_sem == 0)
831 {
832 q->unlock();
833 th->dereference_tsd();
834 throw NullPointer();
835 }
836
837 bodies++;
|
838 mike 1.2 th->dereference_tsd();
|
839 mday 1.30 _dead.insert_first(th);
840 sleep_sem->signal();
841 th = 0;
842 }
843 else
844 {
845 // deadlocked threads
|
846 mday 1.34 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
|
847 mday 1.30 th->cancel();
848 delete th;
|
849 mike 1.2 }
850 }
851 }
852 th = q->next(th);
|
853 mday 1.20 pegasus_sleep(1);
|
854 mike 1.2 }
855 q->unlock();
856 }
857 }
|
858 mday 1.47 if(_dying.value() )
859 return bodies;
860
861 while (needed.value() > 0) {
862 _link_pool(_init_thread());
863 needed--;
864 pegasus_sleep(0);
865 }
|
866 mday 1.18 return bodies;
|
867 mike 1.2 }
868
|
869 mday 1.12
|
870 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
871 {
|
872 mday 1.22 // never time out if the interval is zero
873 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
874 return false;
875
|
876 mday 1.36 struct timeval now, finish, remaining ;
|
877 mday 1.13 Uint32 usec;
|
878 mday 1.33 pegasus_gettimeofday(&now);
|
879 mday 1.36 /* remove valgrind error */
880 pegasus_gettimeofday(&remaining);
881
|
882 mday 1.13
883 finish.tv_sec = start->tv_sec + interval->tv_sec;
884 usec = start->tv_usec + interval->tv_usec;
885 finish.tv_sec += (usec / 1000000);
886 usec %= 1000000;
887 finish.tv_usec = usec;
888
889 if ( timeval_subtract(&remaining, &finish, &now) )
|
890 mike 1.2 return true;
891 else
892 return false;
893 }
894
895 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
896 {
|
897 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
898 return (PEGASUS_THREAD_RETURN)1;
|
899 mike 1.2 }
|
900 mday 1.19
901
902 void ThreadPool::_sleep_sem_del(void *p)
903 {
904 if(p != 0)
905 {
906 delete (Semaphore *)p;
907 }
908 }
909
910 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
911 {
912 if (true == check_time(start, &_deadlock_detect))
913 throw Deadlock(pegasus_thread_self());
914 return;
915 }
916
917
918 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
919 {
920 return(check_time(start, &_deadlock_detect));
921 mday 1.19 }
922
923 Boolean ThreadPool::_check_dealloc(struct timeval *start)
924 {
925 return(check_time(start, &_deallocate_wait));
926 }
927
928 Thread *ThreadPool::_init_thread(void) throw(IPCException)
929 {
930 Thread *th = (Thread *) new Thread(_loop, this, false);
931 // allocate a sleep semaphore and pass it in the thread context
932 // initial count is zero, loop function will sleep until
933 // we signal the semaphore
934 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
935 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
936
937 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
938 mday 1.35 pegasus_gettimeofday(dldt);
939
|
940 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
941 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
942 chuck 1.37
|
943 mday 1.19 th->run();
944 _current_threads++;
945 pegasus_yield();
946
947 return th;
948 }
949
950 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
951 {
952 if(th == 0)
953 throw NullPointer();
|
954 mday 1.47 try
955 {
956
957 auto_mutex(&(this->_monitor));
958 if(_dying.value())
959 {
960 th->cancel();
961 th->join();
962 delete th;
963 }
964
965 _pool.insert_first(th);
966
967 }
968 catch(...)
969 {
970 }
|
971 mday 1.19 }
|
972 mike 1.2
973
974 PEGASUS_NAMESPACE_END
975
|