1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 chip 1.11 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 kumpf 1.17 //
|
13 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
27 chip 1.11 // added nsk platform support
|
28 mike 1.2 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
|
33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
34 mike 1.2
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
36 chip 1.11 # include "ThreadWindows.cpp"
|
37 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
41 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
|
47 mday 1.42
|
48 chip 1.11 void thread_data::default_delete(void * data)
49 {
|
50 mike 1.2 if( data != NULL)
|
51 chip 1.11 ::operator delete(data);
|
52 mike 1.2 }
53
|
54 chuck 1.43 // l10n start
55 void language_delete(void * data)
56 {
57 if( data != NULL)
58 {
59 AcceptLanguages * al = static_cast<AcceptLanguages *>(data);
|
60 chuck 1.44 delete al;
|
61 chuck 1.43 }
62 }
63 // l10n end
64
|
65 mike 1.2 Boolean Thread::_signals_blocked = false;
|
66 chuck 1.37 // l10n
|
67 mday 1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
|
68 chuck 1.37 Boolean Thread::_key_initialized = false;
|
69 chuck 1.41 Boolean Thread::_key_error = false;
|
70 chuck 1.37
|
71 mike 1.2
72 // for non-native implementations
|
73 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
74 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
75 {
76 cleanup_handler *cu = new cleanup_handler(routine, parm);
|
77 chip 1.11 try
78 {
79 _cleanup.insert_first(cu);
80 }
81 catch(IPCException&)
|
82 mike 1.2 {
83 delete cu;
|
84 chip 1.11 throw;
|
85 mike 1.2 }
86 return;
87 }
|
88 chip 1.11
|
89 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
90 {
91 cleanup_handler *cu ;
|
92 chip 1.11 try
93 {
|
94 mike 1.2 cu = _cleanup.remove_first() ;
95 }
|
96 chip 1.11 catch(IPCException&)
|
97 mike 1.2 {
|
98 chip 1.11 PEGASUS_ASSERT(0);
|
99 mike 1.2 }
100 if(execute == true)
101 cu->execute();
102 delete cu;
103 }
|
104 chip 1.11
|
105 mike 1.2 #endif
106
107
|
108 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
109 mike 1.2
110
|
111 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113 {
114 // execute the cleanup stack and then return
|
115 mike 1.2 while( _cleanup.count() )
116 {
|
117 chip 1.11 try
118 {
119 cleanup_pop(true);
120 }
121 catch(IPCException&)
122 {
123 PEGASUS_ASSERT(0);
124 break;
|
125 mike 1.2 }
126 }
127 _exit_code = exit_code;
128 exit_thread(exit_code);
|
129 mday 1.4 _handle.thid = 0;
|
130 mike 1.2 }
131
132
133 #endif
134
|
135 chuck 1.37 // l10n start
|
136 chuck 1.39 Sint8 Thread::initializeKey()
137 {
138 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139 if (!Thread::_key_initialized)
140 {
|
141 chuck 1.41 if (Thread::_key_error)
142 {
143 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144 "Thread: ERROR - thread key error");
145 return -1;
146 }
147
|
148 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149 {
150 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151 "Thread: able to create a thread key");
152 Thread::_key_initialized = true;
153 }
154 else
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: ERROR - unable to create a thread key");
|
158 chuck 1.41 Thread::_key_error = true;
|
159 chuck 1.39 return -1;
160 }
161 }
162
163 PEG_METHOD_EXIT();
164 return 0;
165 }
166
|
167 chuck 1.37 Thread * Thread::getCurrent()
168 {
|
169 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
170 chuck 1.40 if (Thread::initializeKey() != 0)
|
171 chuck 1.39 {
172 return NULL;
173 }
|
174 chuck 1.38 PEG_METHOD_EXIT();
|
175 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
176 }
177
178 void Thread::setCurrent(Thread * thrd)
179 {
180 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181 if (Thread::initializeKey() == 0)
182 {
183 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184 (void *) thrd) == 0)
185 {
186 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187 "Successful set Thread * into thread specific storage");
188 }
189 else
190 {
191 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192 "ERROR: got error setting Thread * into thread specific storage");
193 }
194 }
195 PEG_METHOD_EXIT();
|
196 chuck 1.37 }
197
198 AcceptLanguages * Thread::getLanguages()
199 {
|
200 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
201 chuck 1.37
202 Thread * curThrd = Thread::getCurrent();
203 if (curThrd == NULL)
204 return NULL;
205 AcceptLanguages * acceptLangs =
206 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207 curThrd->dereference_tsd();
208 PEG_METHOD_EXIT();
209 return acceptLangs;
210 }
211
212 void Thread::setLanguages(AcceptLanguages *langs) //l10n
213 {
|
214 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
215 chuck 1.37
216 Thread * currentThrd = Thread::getCurrent();
217 if (currentThrd != NULL)
218 {
219 // deletes the old tsd and creates a new one
220 currentThrd->put_tsd("acceptLanguages",
|
221 chuck 1.43 language_delete,
|
222 chuck 1.37 sizeof(AcceptLanguages *),
223 langs);
224 }
225
226 PEG_METHOD_EXIT();
227 }
228
229 void Thread::clearLanguages() //l10n
230 {
|
231 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
232 chuck 1.37
233 Thread * currentThrd = Thread::getCurrent();
234 if (currentThrd != NULL)
235 {
236 // deletes the old tsd
237 currentThrd->delete_tsd("acceptLanguages");
238 }
239
240 PEG_METHOD_EXIT();
241 }
242 // l10n end
243
|
244 mday 1.52
245
246 // two special synchronization classes for ThreadPool
247 //
248
249 class timed_mutex
250 {
251 public:
252 timed_mutex(Mutex* mut, int msec)
253 :_mut(mut)
254 {
255 _mut->timed_lock(msec, pegasus_thread_self());
256 }
257 ~timed_mutex(void)
258 {
259 _mut->unlock();
260 }
261 Mutex* _mut;
262 };
263
264
265 mday 1.52 class try_mutex
266 {
267 public:
268 try_mutex(Mutex* mut)
269 :_mut(mut)
270 {
271 _mut->try_lock(pegasus_thread_self());
272 }
273 ~try_mutex(void)
274 {
275 _mut->unlock();
276 }
277
278 Mutex* _mut;
279 };
280
281
|
282 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
283
284
285 void ThreadPool::kill_idle_threads(void)
286 {
287 static struct timeval now, last = {0, 0};
288
289 pegasus_gettimeofday(&now);
290 if(now.tv_sec - last.tv_sec > 5)
291 {
292 _pools.lock();
293 ThreadPool *p = _pools.next(0);
294 while(p != 0)
295 {
296 try
297 {
298 p->kill_dead_threads();
299 }
300 catch(...)
301 {
302 }
303 mday 1.20 p = _pools.next(p);
304 }
305 _pools.unlock();
306 pegasus_gettimeofday(&last);
307 }
308 }
309
310
|
311 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
312 kumpf 1.8 const Sint8 *key,
|
313 mike 1.2 Sint16 min,
314 Sint16 max,
315 struct timeval & alloc_wait,
|
316 chip 1.11 struct timeval & dealloc_wait,
|
317 mike 1.2 struct timeval & deadlock_detect)
318 : _max_threads(max), _min_threads(min),
|
319 mday 1.12 _current_threads(0),
320 _pool(true), _running(true),
|
321 mike 1.2 _dead(true), _dying(0)
322 {
323 _allocate_wait.tv_sec = alloc_wait.tv_sec;
324 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
325 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
326 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
327 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
328 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
329 memset(_key, 0x00, 17);
330 if(key != 0)
331 strncpy(_key, key, 16);
|
332 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
333 mike 1.2 _max_threads = initial_size;
334 if(_min_threads > initial_size)
335 _min_threads = initial_size;
|
336 chip 1.11
|
337 mike 1.2 int i;
338 for(i = 0; i < initial_size; i++)
339 {
340 _link_pool(_init_thread());
341 }
|
342 mday 1.20 _pools.insert_last(this);
|
343 mike 1.2 }
344
|
345 chip 1.11
|
346 mday 1.52 // Note: <<< Fri Oct 17 09:19:03 2003 mdd >>>
347 // the pegasus_yield() calls that precede each th->join() are to
348 // give a thread on the running list a chance to reach a cancellation
349 // point before the join
|
350 mike 1.2
351 ThreadPool::~ThreadPool(void)
352 {
|
353 mday 1.35 try
|
354 mday 1.47 {
|
355 mday 1.52 // set the dying flag so all thread know the destructor has been entered
|
356 mday 1.47 {
357 auto_mutex(&(this->_monitor));
358 _dying++;
359 }
|
360 mday 1.52 // remove from the global pools list
|
361 mday 1.35 _pools.remove(this);
|
362 mday 1.52
363 // start with idle threads.
|
364 mday 1.35 Thread *th = 0;
365 th = _pool.remove_first();
|
366 mday 1.52 Semaphore* sleep_sem;
367
|
368 mday 1.35 while(th != 0)
|
369 mike 1.2 {
|
370 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
371 mday 1.35 if(sleep_sem == 0)
372 {
373 th->dereference_tsd();
374 throw NullPointer();
375 }
|
376 mday 1.52
|
377 s.hills 1.49 // Signal to get the thread out of the work loop.
378 sleep_sem->signal();
|
379 mday 1.52
|
380 s.hills 1.49 // Signal to get the thread past the end. See the comment
381 // "wait to be awakend by the thread pool destructor"
382 // Note: the current implementation of Thread for Windows
383 // does not implement "pthread" cancelation points so this
384 // is needed.
|
385 mday 1.35 sleep_sem->signal();
|
386 mike 1.2 th->dereference_tsd();
|
387 mday 1.35 th->cancel();
388 th->join();
389 delete th;
390 th = _pool.remove_first();
|
391 mike 1.2 }
|
392 mday 1.47 th = _dead.remove_first();
|
393 mday 1.35 while(th != 0)
394 {
|
395 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
396
|
397 mday 1.47 if(sleep_sem == 0)
398 {
399 th->dereference_tsd();
400 throw NullPointer();
401 }
402
|
403 mday 1.52 // signal the thread's sleep semaphore
404 sleep_sem->signal();
|
405 mday 1.47 sleep_sem->signal();
|
406 mday 1.52 th->dereference_tsd();
|
407 mday 1.35 th->cancel();
408 th->join();
409 delete th;
|
410 mday 1.47 th = _dead.remove_first();
|
411 mday 1.35 }
|
412 mday 1.52
|
413 mday 1.47 {
|
414 mday 1.52 th = _running.remove_first();
415 while(th != 0)
416 {
417 // signal the thread's sleep semaphore
418
419 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
420 if(sleep_sem == 0 )
421 {
422 th->dereference_tsd();
423 throw NullPointer();
424 }
425
426 sleep_sem->signal();
427 sleep_sem->signal();
|
428 mday 1.47 th->dereference_tsd();
|
429 mday 1.52 th->cancel();
430 pegasus_yield();
431
432 th->join();
433 delete th;
434 th = _running.remove_first();
|
435 mday 1.47 }
|
436 mday 1.35 }
|
437 mday 1.47
|
438 mike 1.2 }
|
439 mday 1.52
|
440 mday 1.35 catch(...)
|
441 mike 1.2 {
442 }
443 }
444
|
445 chip 1.11 // make this static to the class
|
446 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
447 {
|
448 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
449
|
450 mike 1.2 Thread *myself = (Thread *)parm;
451 if(myself == 0)
|
452 kumpf 1.14 {
453 PEG_METHOD_EXIT();
|
454 mike 1.2 throw NullPointer();
|
455 kumpf 1.14 }
|
456 chuck 1.37
457 // l10n
458 // Set myself into thread specific storage
|
459 chuck 1.38 // This will allow code to get its own Thread
|
460 chuck 1.39 Thread::setCurrent(myself);
461
|
462 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
463 kumpf 1.14 if(pool == 0 )
464 {
465 PEG_METHOD_EXIT();
|
466 mike 1.2 throw NullPointer();
|
467 kumpf 1.14 }
|
468 mday 1.52 if(pool->_dying.value())
469 {
470 PEG_METHOD_EXIT();
471 return((PEGASUS_THREAD_RETURN)0);
472 }
473
|
474 mike 1.5 Semaphore *sleep_sem = 0;
|
475 mday 1.13 Semaphore *blocking_sem = 0;
476
|
477 mike 1.5 struct timeval *deadlock_timer = 0;
|
478 mday 1.47
|
479 chip 1.11 try
|
480 mike 1.2 {
481 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
482 myself->dereference_tsd();
483 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
484 mday 1.22 myself->dereference_tsd();
|
485 mike 1.2 }
|
486 mday 1.52
|
487 mday 1.30 catch(...)
488 {
489 PEG_METHOD_EXIT();
|
490 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
491 mday 1.30 }
492
|
493 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
494 kumpf 1.14 {
495 PEG_METHOD_EXIT();
|
496 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
497 kumpf 1.14 }
|
498 mike 1.2
|
499 mday 1.47 while(pool->_dying.value() < 1)
|
500 mike 1.2 {
|
501 mday 1.52 try
502 {
503 sleep_sem->wait();
504 }
505 catch(IPCException& )
506 {
507 PEG_METHOD_EXIT();
508 return((PEGASUS_THREAD_RETURN)0);
509 }
510
|
511 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
512 mday 1.52 if(pool->_dying.value())
513 {
514 PEG_METHOD_EXIT();
515 return((PEGASUS_THREAD_RETURN)0);
516 }
517
|
518 mday 1.47
|
519 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
520 void *parm = 0;
|
521 mike 1.2
|
522 chip 1.11 try
|
523 mike 1.2 {
524 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
525 myself->reference_tsd("work func");
526 myself->dereference_tsd();
527 parm = myself->reference_tsd("work parm");
528 myself->dereference_tsd();
|
529 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
530 myself->dereference_tsd();
531
|
532 mike 1.2 }
|
533 mike 1.6 catch(IPCException &)
|
534 mike 1.2 {
|
535 kumpf 1.14 PEG_METHOD_EXIT();
|
536 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
537 mike 1.2 }
|
538 mday 1.52
|
539 mike 1.2 if(_work == 0)
|
540 kumpf 1.14 {
541 PEG_METHOD_EXIT();
|
542 mike 1.2 throw NullPointer();
|
543 kumpf 1.14 }
|
544 kumpf 1.24
545 if(_work ==
546 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
547 {
|
548 mday 1.23 _work(parm);
|
549 kumpf 1.24 }
550
|
551 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
552 mday 1.20 try
553 {
|
554 mday 1.47 {
|
555 mday 1.52 timed_mutex(&(pool->_monitor), 1000);
|
556 mday 1.47 if(pool->_dying.value())
557 {
|
558 mday 1.52 _undertaker(parm);
|
559 mday 1.47 }
560 }
|
561 mday 1.20 _work(parm);
562 }
563 catch(...)
564 {
|
565 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
566 mday 1.20 }
|
567 chuck 1.37
|
568 chip 1.11 // put myself back onto the available list
569 try
|
570 mike 1.2 {
|
571 mday 1.52 timed_mutex(&(pool->_monitor), 1000);
|
572 mday 1.47 if(pool->_dying.value() == 0)
573 {
574 gettimeofday(deadlock_timer, NULL);
575 if( blocking_sem != 0 )
576 blocking_sem->signal();
577
|
578 s.hills 1.53 // If we are not on _running then ~ThreadPool has removed
579 // us and now "owns" our pointer.
580 if( pool->_running.remove((void *)myself) != 0 )
581 pool->_pool.insert_first(myself);
|
582 mday 1.47 }
583 else
584 {
585 PEG_METHOD_EXIT();
586 return((PEGASUS_THREAD_RETURN)0);
587 }
|
588 mike 1.2 }
|
589 mday 1.52 catch(...)
|
590 mike 1.2 {
|
591 kumpf 1.14 PEG_METHOD_EXIT();
|
592 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
593 mike 1.2 }
|
594 mday 1.51
|
595 mike 1.2 }
|
596 s.hills 1.49
597 // TODO: Why is this needed? Why not just continue?
|
598 mike 1.2 // wait to be awakend by the thread pool destructor
|
599 mday 1.50 //sleep_sem->wait();
|
600 s.hills 1.49
|
601 mike 1.2 myself->test_cancel();
|
602 kumpf 1.14
603 PEG_METHOD_EXIT();
|
604 mike 1.2 return((PEGASUS_THREAD_RETURN)0);
605 }
606
607 void ThreadPool::allocate_and_awaken(void *parm,
608 PEGASUS_THREAD_RETURN \
|
609 mday 1.13 (PEGASUS_THREAD_CDECL *work)(void *),
610 Semaphore *blocking)
611
|
612 mike 1.2 throw(IPCException)
613 {
|
614 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
615 mike 1.2 struct timeval start;
616 gettimeofday(&start, NULL);
|
617 mday 1.47 Thread *th = 0;
618
619 try
620 {
|
621 mday 1.52 timed_mutex(&(this->_monitor), 1000);
|
622 mday 1.47 if(_dying.value())
623 {
624 return;
625 }
626 th = _pool.remove_first();
627 }
628 catch(...)
629 {
630 return;
631
632 }
633
|
634 mday 1.12
|
635 mday 1.7 // wait for the right interval and try again
|
636 mday 1.47 while (th == 0 && _dying.value() < 1)
|
637 mike 1.2 {
|
638 mday 1.47 // will throw an IPCException&
|
639 mday 1.12 _check_deadlock(&start) ;
640
|
641 mday 1.21 if(_max_threads == 0 || _current_threads < _max_threads)
|
642 mday 1.35 {
643 th = _init_thread();
644 continue;
645 }
646 pegasus_yield();
|
647 mday 1.47 try
648 {
|
649 mday 1.52 timed_mutex(&(this->_monitor), 1000);
|
650 mday 1.47 if(_dying.value())
651 {
652 return;
653 }
654 th = _pool.remove_first();
655 }
656 catch(...)
657 {
658 return ;
659 }
|
660 mday 1.7 }
|
661 chip 1.11
|
662 mday 1.47 if(_dying.value() < 1)
|
663 mike 1.2 {
664 // initialize the thread data with the work function and parameters
|
665 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
666 "Initializing thread with work function and parameters: parm = %p",
667 parm);
668
|
669 kumpf 1.15 th->delete_tsd("work func");
|
670 chip 1.11 th->put_tsd("work func", NULL,
|
671 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
672 (void *)work);
|
673 kumpf 1.15 th->delete_tsd("work parm");
|
674 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
675 kumpf 1.15 th->delete_tsd("blocking sem");
|
676 mday 1.13 if(blocking != 0 )
677 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
678 mday 1.47 try
679 {
|
680 mday 1.52 timed_mutex(&(this->_monitor), 1000);
|
681 mday 1.47 if(_dying.value())
682 {
683 th->cancel();
684 th->join();
685 delete th;
686 return;
687 }
688
689 // put the thread on the running list
690
|
691 mike 1.2
|
692 mday 1.47 _running.insert_first(th);
|
693 mike 1.2 // signal the thread's sleep semaphore to awaken it
|
694 mday 1.47 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
695
696 if(sleep_sem == 0)
697 {
698 th->dereference_tsd();
699 PEG_METHOD_EXIT();
700 throw NullPointer();
701 }
702 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
703 sleep_sem->signal();
704 th->dereference_tsd();
705 }
706 catch(...)
|
707 mike 1.2 {
|
708 mday 1.47 PEG_METHOD_EXIT();
709 return;
|
710 mike 1.2 }
|
711 mday 1.47
|
712 mike 1.2 }
713 else
|
714 mday 1.47 {
715 th->cancel();
716 th->join();
717 delete th;
718 }
719
|
720 kumpf 1.14 PEG_METHOD_EXIT();
|
721 mike 1.2 }
722
723 // caller is responsible for only calling this routine during slack periods
724 // but should call it at least once per _deadlock_detect with the running q
725 // and at least once per _deallocate_wait for the pool q
726
|
727 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
728 mike 1.2 throw(IPCException)
729 {
730 struct timeval now;
731 gettimeofday(&now, NULL);
|
732 mday 1.12 Uint32 bodies = 0;
733
|
734 mike 1.2 // first go thread the dead q and clean it up as much as possible
|
735 mday 1.47 try
736 {
|
737 mday 1.52 timed_mutex(&(this->_monitor), 1000);
|
738 mday 1.47 if(_dying.value() )
739 {
740 return 0;
741 }
742
743 while(_dead.count() > 0 && _dying.value() == 0 )
744 {
745 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
746 Thread *dead = _dead.remove_first();
747
748 if(dead == 0)
749 throw NullPointer();
750 dead->join();
751 delete dead;
752 }
753 }
754 catch(...)
|
755 mike 1.2 {
756 }
|
757 mday 1.47
758
|
759 chip 1.11 DQueue<Thread> * map[2] =
|
760 mike 1.2 {
761 &_pool, &_running
762 };
|
763 chip 1.11
764
|
765 mike 1.2 DQueue<Thread> *q = 0;
766 int i = 0;
767 AtomicInt needed(0);
|
768 chip 1.11
|
769 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
770 // This change prevents the thread pool from killing "hung" threads.
771 // The definition of a "hung" thread is one that has been on the run queue
772 // for longer than the time interval set when the thread pool was created.
773 // Cancelling "hung" threads has proven to be problematic.
774
775 // With this change the thread pool will not cancel "hung" threads. This
776 // may prevent a crash depending upon the state of the "hung" thread. In
777 // the case that the thread is actually hung, this change causes the
778 // thread resources not to be reclaimed.
779
780 // Idle threads, those that have not executed a routine for a time
781 // interval, continue to be destroyed. This is normal and should not
782 // cause any problems.
783 for( ; i < 1; i++)
784 #else
|
785 mday 1.30 for( ; i < 2; i++)
|
786 kumpf 1.31 #endif
|
787 mday 1.47 {
|
788 mday 1.52 try
789 {
790 try_mutex(&(this->_monitor));
791 }
792 catch(IPCException&)
793 {
794 return bodies;
795 }
796
|
797 mday 1.21 q = map[i];
|
798 mike 1.2 if(q->count() > 0 )
799 {
|
800 chip 1.11 try
|
801 mike 1.2 {
|
802 mday 1.47 if(_dying.value())
803 {
804 return bodies;
805 }
806
|
807 mike 1.2 q->try_lock();
808 }
|
809 mday 1.18 catch(...)
|
810 mike 1.2 {
|
811 mday 1.18 return bodies;
|
812 mike 1.2 }
813
814 struct timeval dt = { 0, 0 };
815 struct timeval *dtp;
816 Thread *th = 0;
817 th = q->next(th);
818 while (th != 0 )
819 {
|
820 chip 1.11 try
|
821 mike 1.2 {
822 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
823 }
|
824 mday 1.18 catch(...)
|
825 mike 1.2 {
|
826 kumpf 1.25 q->unlock();
|
827 mday 1.18 return bodies;
|
828 mike 1.2 }
|
829 chip 1.11
|
830 mike 1.2 if(dtp != 0)
831 {
832 memcpy(&dt, dtp, sizeof(struct timeval));
833 }
834 th->dereference_tsd();
835 struct timeval deadlock_timeout;
|
836 mday 1.18 Boolean too_long;
837 if( i == 0)
838 {
839 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
840 }
841 else
842 {
|
843 mday 1.22 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
|
844 mday 1.18 }
845
846 if( true == too_long)
|
847 mike 1.2 {
848 // if we are deallocating from the pool, escape if we are
|
849 chip 1.11 // down to the minimum thread count
|
850 mday 1.13 _current_threads--;
|
851 mday 1.18 if( _current_threads.value() < (Uint32)_min_threads )
|
852 mike 1.2 {
|
853 mday 1.13 if( i == 0)
|
854 mike 1.2 {
|
855 mday 1.13 _current_threads++;
|
856 mike 1.2 th = q->next(th);
857 continue;
858 }
|
859 chip 1.11 else
|
860 mike 1.2 {
|
861 chip 1.11 // we are killing a hung thread and we will drop below the
|
862 mike 1.2 // minimum. create another thread to make up for the one
863 // we are about to kill
864 needed++;
865 }
866 }
|
867 chip 1.11
|
868 mike 1.2 th = q->remove_no_lock((void *)th);
|
869 chip 1.11
|
870 mike 1.2 if(th != 0)
871 {
|
872 mday 1.30 if( i == 0 )
|
873 mike 1.2 {
|
874 mday 1.30 th->delete_tsd("work func");
875 th->put_tsd("work func", NULL,
876 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
877 (void *)&_undertaker);
878 th->delete_tsd("work parm");
879 th->put_tsd("work parm", NULL, sizeof(void *), th);
880
881 // signal the thread's sleep semaphore to awaken it
882 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
883
884 if(sleep_sem == 0)
885 {
886 q->unlock();
887 th->dereference_tsd();
888 throw NullPointer();
889 }
890
891 bodies++;
|
892 mike 1.2 th->dereference_tsd();
|
893 mday 1.30 _dead.insert_first(th);
894 sleep_sem->signal();
895 th = 0;
896 }
897 else
898 {
899 // deadlocked threads
|
900 mday 1.34 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
|
901 mday 1.30 th->cancel();
902 delete th;
|
903 mike 1.2 }
904 }
905 }
906 th = q->next(th);
|
907 mday 1.52 pegasus_yield();
|
908 mike 1.2 }
909 q->unlock();
910 }
911 }
|
912 mday 1.47 if(_dying.value() )
913 return bodies;
914
915 while (needed.value() > 0) {
916 _link_pool(_init_thread());
917 needed--;
918 pegasus_sleep(0);
919 }
|
920 mday 1.18 return bodies;
|
921 mike 1.2 }
922
|
923 mday 1.12
|
924 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
925 {
|
926 mday 1.22 // never time out if the interval is zero
927 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
928 return false;
929
|
930 mday 1.36 struct timeval now, finish, remaining ;
|
931 mday 1.13 Uint32 usec;
|
932 mday 1.33 pegasus_gettimeofday(&now);
|
933 mday 1.36 /* remove valgrind error */
934 pegasus_gettimeofday(&remaining);
935
|
936 mday 1.13
937 finish.tv_sec = start->tv_sec + interval->tv_sec;
938 usec = start->tv_usec + interval->tv_usec;
939 finish.tv_sec += (usec / 1000000);
940 usec %= 1000000;
941 finish.tv_usec = usec;
942
943 if ( timeval_subtract(&remaining, &finish, &now) )
|
944 mike 1.2 return true;
945 else
946 return false;
947 }
948
949 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
950 {
|
951 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
952 return (PEGASUS_THREAD_RETURN)1;
|
953 mike 1.2 }
|
954 mday 1.19
955
956 void ThreadPool::_sleep_sem_del(void *p)
957 {
958 if(p != 0)
959 {
960 delete (Semaphore *)p;
961 }
962 }
963
964 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
965 {
966 if (true == check_time(start, &_deadlock_detect))
967 throw Deadlock(pegasus_thread_self());
968 return;
969 }
970
971
972 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
973 {
974 return(check_time(start, &_deadlock_detect));
975 mday 1.19 }
976
977 Boolean ThreadPool::_check_dealloc(struct timeval *start)
978 {
979 return(check_time(start, &_deallocate_wait));
980 }
981
982 Thread *ThreadPool::_init_thread(void) throw(IPCException)
983 {
984 Thread *th = (Thread *) new Thread(_loop, this, false);
985 // allocate a sleep semaphore and pass it in the thread context
986 // initial count is zero, loop function will sleep until
987 // we signal the semaphore
988 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
989 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
990
991 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
992 mday 1.35 pegasus_gettimeofday(dldt);
993
|
994 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
995 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
996 chuck 1.37
|
997 mday 1.19 th->run();
998 _current_threads++;
999 pegasus_yield();
1000
1001 return th;
1002 }
1003
1004 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1005 {
1006 if(th == 0)
1007 throw NullPointer();
|
1008 mday 1.47 try
1009 {
1010
|
1011 mday 1.52 timed_mutex(&(this->_monitor), 1000);
|
1012 mday 1.47 if(_dying.value())
1013 {
1014 th->cancel();
1015 th->join();
1016 delete th;
1017 }
1018
1019 _pool.insert_first(th);
1020
1021 }
1022 catch(...)
1023 {
1024 }
|
1025 mday 1.19 }
|
1026 mike 1.2
1027
1028 PEGASUS_NAMESPACE_END
1029
|