1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
|
3 kumpf 1.17 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mike 1.2 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
7 chip 1.11 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
10 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 kumpf 1.17 //
|
13 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
14 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
16 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
19 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
27 chip 1.11 // added nsk platform support
|
28 mike 1.2 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
|
33 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
34 mike 1.2
35 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
36 chip 1.11 # include "ThreadWindows.cpp"
|
37 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
38 # include "ThreadUnix.cpp"
39 #elif defined(PEGASUS_OS_TYPE_NSK)
40 # include "ThreadNsk.cpp"
41 #else
42 # error "Unsupported platform"
43 #endif
44
45 PEGASUS_NAMESPACE_BEGIN
46
|
47 chip 1.11 void thread_data::default_delete(void * data)
48 {
|
49 mike 1.2 if( data != NULL)
|
50 chip 1.11 ::operator delete(data);
|
51 mike 1.2 }
52
53 Boolean Thread::_signals_blocked = false;
|
54 mday 1.36.4.1 // l10n
55 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
56 Boolean Thread::_key_initialized = false;
57
|
58 mike 1.2
59 // for non-native implementations
|
60 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
61 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
62 {
63 cleanup_handler *cu = new cleanup_handler(routine, parm);
|
64 chip 1.11 try
65 {
66 _cleanup.insert_first(cu);
67 }
68 catch(IPCException&)
|
69 mike 1.2 {
70 delete cu;
|
71 chip 1.11 throw;
|
72 mike 1.2 }
73 return;
74 }
|
75 chip 1.11
|
76 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
77 {
78 cleanup_handler *cu ;
|
79 chip 1.11 try
80 {
|
81 mike 1.2 cu = _cleanup.remove_first() ;
82 }
|
83 chip 1.11 catch(IPCException&)
|
84 mike 1.2 {
|
85 chip 1.11 PEGASUS_ASSERT(0);
|
86 mike 1.2 }
87 if(execute == true)
88 cu->execute();
89 delete cu;
90 }
|
91 chip 1.11
|
92 mike 1.2 #endif
93
94
|
95 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
96 mike 1.2
97
|
98 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
99 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
100 {
101 // execute the cleanup stack and then return
|
102 mike 1.2 while( _cleanup.count() )
103 {
|
104 chip 1.11 try
105 {
106 cleanup_pop(true);
107 }
108 catch(IPCException&)
109 {
110 PEGASUS_ASSERT(0);
111 break;
|
112 mike 1.2 }
113 }
114 _exit_code = exit_code;
115 exit_thread(exit_code);
|
116 mday 1.4 _handle.thid = 0;
|
117 mike 1.2 }
118
119
120 #endif
121
|
122 mday 1.36.4.1 // l10n start
123 Thread * Thread::getCurrent()
124 {
125 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::getCurrent");
126 if (!Thread::_key_initialized)
|
127 mday 1.36.4.2 return NULL;
128 PEG_METHOD_EXIT();
|
129 mday 1.36.4.1 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
130 }
131
132 AcceptLanguages * Thread::getLanguages()
133 {
134 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::getLanguages");
135
136 Thread * curThrd = Thread::getCurrent();
137 if (curThrd == NULL)
138 return NULL;
139 AcceptLanguages * acceptLangs =
140 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
141 curThrd->dereference_tsd();
142 PEG_METHOD_EXIT();
143 return acceptLangs;
144 }
145
146 void Thread::setLanguages(AcceptLanguages *langs) //l10n
147 {
148 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::setLanguages");
149
150 mday 1.36.4.1 Thread * currentThrd = Thread::getCurrent();
151 if (currentThrd != NULL)
152 {
153 // deletes the old tsd and creates a new one
154 currentThrd->put_tsd("acceptLanguages",
155 thread_data::default_delete,
156 sizeof(AcceptLanguages *),
157 langs);
158 }
159
160 PEG_METHOD_EXIT();
161 }
162
163 void Thread::clearLanguages() //l10n
164 {
165 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::clearLanguages");
166
167 Thread * currentThrd = Thread::getCurrent();
168 if (currentThrd != NULL)
169 {
170 // deletes the old tsd
171 mday 1.36.4.1 currentThrd->delete_tsd("acceptLanguages");
172 }
173
174 PEG_METHOD_EXIT();
175 }
176 // l10n end
177
|
178 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
179
180
181 void ThreadPool::kill_idle_threads(void)
182 {
183 static struct timeval now, last = {0, 0};
184
185 pegasus_gettimeofday(&now);
186 if(now.tv_sec - last.tv_sec > 5)
187 {
188 _pools.lock();
189 ThreadPool *p = _pools.next(0);
190 while(p != 0)
191 {
192 try
193 {
194 p->kill_dead_threads();
195 }
196 catch(...)
197 {
198 }
199 mday 1.20 p = _pools.next(p);
200 }
201 _pools.unlock();
202 pegasus_gettimeofday(&last);
203 }
204 }
205
206
|
207 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
208 kumpf 1.8 const Sint8 *key,
|
209 mike 1.2 Sint16 min,
210 Sint16 max,
211 struct timeval & alloc_wait,
|
212 chip 1.11 struct timeval & dealloc_wait,
|
213 mike 1.2 struct timeval & deadlock_detect)
214 : _max_threads(max), _min_threads(min),
|
215 mday 1.12 _current_threads(0),
216 _pool(true), _running(true),
|
217 mike 1.2 _dead(true), _dying(0)
218 {
219 _allocate_wait.tv_sec = alloc_wait.tv_sec;
220 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
221 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
222 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
223 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
224 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
225 memset(_key, 0x00, 17);
226 if(key != 0)
227 strncpy(_key, key, 16);
|
228 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
229 mike 1.2 _max_threads = initial_size;
230 if(_min_threads > initial_size)
231 _min_threads = initial_size;
|
232 chip 1.11
|
233 mike 1.2 int i;
234 for(i = 0; i < initial_size; i++)
235 {
236 _link_pool(_init_thread());
237 }
|
238 mday 1.20 _pools.insert_last(this);
239
|
240 mday 1.36.4.1 // l10n
241 if (!Thread::_key_initialized)
242 {
|
243 mday 1.36.4.2 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
244 {
245 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
246 "ThreadPool: able to create a thread key");
247 Thread::_key_initialized = true;
248 }
249 else
250 {
251 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
252 "ThreadPool: ERROR - unable to create a thread key");
253 printf("ThreadPool: ERROR - unable to create a thread key\n");
254 }
|
255 mday 1.36.4.1 }
|
256 mike 1.2 }
257
|
258 chip 1.11
|
259 mike 1.2
260 ThreadPool::~ThreadPool(void)
261 {
|
262 mday 1.35 try
|
263 chip 1.11 {
|
264 mday 1.35 _pools.remove(this);
265 _dying++;
266 Thread *th = 0;
267 th = _pool.remove_first();
268 while(th != 0)
|
269 mike 1.2 {
|
270 mday 1.35 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
271
272 if(sleep_sem == 0)
273 {
274 th->dereference_tsd();
275 throw NullPointer();
276 }
277
278 sleep_sem->signal();
279 sleep_sem->signal();
|
280 mike 1.2 th->dereference_tsd();
|
281 mday 1.35 // signal the thread's sleep semaphore
282 th->cancel();
283 th->join();
284 th->empty_tsd();
285 delete th;
286 th = _pool.remove_first();
|
287 mike 1.2 }
|
288 chip 1.11
|
289 mday 1.35 th = _running.remove_first();
290 while(th != 0)
291 {
292 // signal the thread's sleep semaphore
293 th->cancel();
294 th->join();
295 th->empty_tsd();
296 delete th;
297 th = _running.remove_first();
298 }
|
299 mike 1.2
|
300 mday 1.35 th = _dead.remove_first();
301 while(th != 0)
302 {
303 // signal the thread's sleep semaphore
304 th->cancel();
305 th->join();
306 th->empty_tsd();
307 delete th;
308 th = _dead.remove_first();
309 }
|
310 mike 1.2 }
|
311 mday 1.35 catch(...)
|
312 mike 1.2 {
313 }
314 }
315
|
316 chip 1.11 // make this static to the class
|
317 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
318 {
|
319 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
320
|
321 mike 1.2 Thread *myself = (Thread *)parm;
322 if(myself == 0)
|
323 kumpf 1.14 {
324 PEG_METHOD_EXIT();
|
325 mike 1.2 throw NullPointer();
|
326 kumpf 1.14 }
|
327 mday 1.36.4.1
328 // l10n
329 // Set myself into thread specific storage
|
330 mday 1.36.4.2 // This will allow code to get its own Thread
331 if (Thread::_key_initialized)
332 {
333 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
334 (void *) myself) == 0)
335 {
336 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
337 "just set myself into thread specific storage");
338 }
339 else
340 {
341 printf("ThreadPool: ERROR setting tls\n");
342 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
343 "ERROR: got error setting thread specific storage");
344 }
345 }
346 else
347 {
348 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
349 "ERROR: thread key is not initialized");
350 }
|
351 mday 1.36.4.1
|
352 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
353 kumpf 1.14 if(pool == 0 )
354 {
355 PEG_METHOD_EXIT();
|
356 mike 1.2 throw NullPointer();
|
357 kumpf 1.14 }
|
358 mike 1.5 Semaphore *sleep_sem = 0;
|
359 mday 1.13 Semaphore *blocking_sem = 0;
360
|
361 mike 1.5 struct timeval *deadlock_timer = 0;
|
362 chip 1.11
363 try
|
364 mike 1.2 {
365 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
366 myself->dereference_tsd();
367 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
368 mday 1.22 myself->dereference_tsd();
|
369 mike 1.2 }
|
370 mike 1.6 catch(IPCException &)
|
371 mike 1.2 {
|
372 kumpf 1.14 PEG_METHOD_EXIT();
|
373 mike 1.2 myself->exit_self(0);
374 }
|
375 mday 1.30 catch(...)
376 {
377 PEG_METHOD_EXIT();
378 myself->exit_self(0);
379 }
380
|
381 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
382 kumpf 1.14 {
383 PEG_METHOD_EXIT();
|
384 mike 1.2 throw NullPointer();
|
385 kumpf 1.14 }
|
386 mike 1.2
387 while(pool->_dying < 1)
388 {
389 sleep_sem->wait();
|
390 mday 1.35
|
391 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
392 if(pool->_dying > 0)
393 break;
|
394 chip 1.11
|
395 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
396 void *parm = 0;
|
397 mike 1.2
|
398 chip 1.11 try
|
399 mike 1.2 {
400 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
401 myself->reference_tsd("work func");
402 myself->dereference_tsd();
403 parm = myself->reference_tsd("work parm");
404 myself->dereference_tsd();
|
405 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
406 myself->dereference_tsd();
407
|
408 mike 1.2 }
|
409 mike 1.6 catch(IPCException &)
|
410 mike 1.2 {
|
411 kumpf 1.14 PEG_METHOD_EXIT();
|
412 mike 1.2 myself->exit_self(0);
413 }
|
414 chip 1.11
|
415 mike 1.2 if(_work == 0)
|
416 kumpf 1.14 {
417 PEG_METHOD_EXIT();
|
418 mike 1.2 throw NullPointer();
|
419 kumpf 1.14 }
|
420 kumpf 1.24
421 if(_work ==
422 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
423 {
|
424 mday 1.23 _work(parm);
|
425 kumpf 1.24 }
426
|
427 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
428 mday 1.20 try
429 {
430 _work(parm);
431 }
432 catch(...)
433 {
434 gettimeofday(deadlock_timer, NULL);
435 }
|
436 mday 1.36.4.1
|
437 mday 1.20 gettimeofday(deadlock_timer, NULL);
|
438 mday 1.13 if( blocking_sem != 0 )
439 blocking_sem->signal();
440
|
441 chip 1.11 // put myself back onto the available list
442 try
|
443 mike 1.2 {
444 pool->_running.remove((void *)myself);
445 pool->_link_pool(myself);
446 }
|
447 mike 1.6 catch(IPCException &)
|
448 mike 1.2 {
|
449 kumpf 1.14 PEG_METHOD_EXIT();
|
450 mike 1.2 myself->exit_self(0);
451 }
452 }
453 // wait to be awakend by the thread pool destructor
454 sleep_sem->wait();
455 myself->test_cancel();
|
456 kumpf 1.14
457 PEG_METHOD_EXIT();
|
458 mike 1.2 myself->exit_self(0);
459 return((PEGASUS_THREAD_RETURN)0);
460 }
461
462 void ThreadPool::allocate_and_awaken(void *parm,
463 PEGASUS_THREAD_RETURN \
|
464 mday 1.13 (PEGASUS_THREAD_CDECL *work)(void *),
465 Semaphore *blocking)
466
|
467 mike 1.2 throw(IPCException)
468 {
|
469 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
470 mike 1.2 struct timeval start;
471 gettimeofday(&start, NULL);
|
472 chip 1.11
|
473 mike 1.2 Thread *th = _pool.remove_first();
|
474 mday 1.12
|
475 mday 1.7 // wait for the right interval and try again
|
476 mday 1.33 while (th == 0 && _dying < 1)
|
477 mike 1.2 {
|
478 mday 1.12 _check_deadlock(&start) ;
479
|
480 mday 1.21 if(_max_threads == 0 || _current_threads < _max_threads)
|
481 mday 1.35 {
482 th = _init_thread();
483 continue;
484 }
485 pegasus_yield();
|
486 mday 1.7 th = _pool.remove_first();
487 }
|
488 chip 1.11
489
|
490 mike 1.2 if(_dying < 1)
491 {
492 // initialize the thread data with the work function and parameters
|
493 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
494 "Initializing thread with work function and parameters: parm = %p",
495 parm);
496
|
497 kumpf 1.15 th->delete_tsd("work func");
|
498 chip 1.11 th->put_tsd("work func", NULL,
|
499 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
500 (void *)work);
|
501 kumpf 1.15 th->delete_tsd("work parm");
|
502 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
503 kumpf 1.15 th->delete_tsd("blocking sem");
|
504 mday 1.13 if(blocking != 0 )
505 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
506
|
507 chip 1.11 // put the thread on the running list
|
508 mike 1.2 _running.insert_first(th);
509
510 // signal the thread's sleep semaphore to awaken it
511 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
512
513 if(sleep_sem == 0)
514 {
515 th->dereference_tsd();
|
516 kumpf 1.14 PEG_METHOD_EXIT();
|
517 mike 1.2 throw NullPointer();
518 }
|
519 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
|
520 mike 1.2 sleep_sem->signal();
521 th->dereference_tsd();
522 }
523 else
524 _pool.insert_first(th);
|
525 kumpf 1.14
526 PEG_METHOD_EXIT();
|
527 mike 1.2 }
528
// The caller is responsible for calling this routine only during slack
// periods, but should call it at least once per _deadlock_detect interval for
// the running queue and at least once per _deallocate_wait interval for the
// pool queue.
532
|
533 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
534 mike 1.2 throw(IPCException)
535 {
536 struct timeval now;
537 gettimeofday(&now, NULL);
|
538 mday 1.12 Uint32 bodies = 0;
539
|
540 mike 1.2 // first go thread the dead q and clean it up as much as possible
541 while(_dead.count() > 0)
542 {
|
543 mday 1.34 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
|
544 mike 1.2 Thread *dead = _dead.remove_first();
545 if(dead == 0)
546 throw NullPointer();
|
547 mday 1.30 dead->join();
|
548 mike 1.2 delete dead;
549 }
|
550 chip 1.11
551 DQueue<Thread> * map[2] =
|
552 mike 1.2 {
553 &_pool, &_running
554 };
|
555 chip 1.11
556
|
557 mike 1.2 DQueue<Thread> *q = 0;
558 int i = 0;
559 AtomicInt needed(0);
|
560 chip 1.11
|
561 kumpf 1.31 #ifdef PEGASUS_DISABLE_KILLING_HUNG_THREADS
562 // This change prevents the thread pool from killing "hung" threads.
563 // The definition of a "hung" thread is one that has been on the run queue
564 // for longer than the time interval set when the thread pool was created.
565 // Cancelling "hung" threads has proven to be problematic.
566
567 // With this change the thread pool will not cancel "hung" threads. This
568 // may prevent a crash depending upon the state of the "hung" thread. In
569 // the case that the thread is actually hung, this change causes the
570 // thread resources not to be reclaimed.
571
572 // Idle threads, those that have not executed a routine for a time
573 // interval, continue to be destroyed. This is normal and should not
574 // cause any problems.
575 for( ; i < 1; i++)
576 #else
|
577 mday 1.30 for( ; i < 2; i++)
|
578 kumpf 1.31 #endif
|
579 mday 1.21 {
580 q = map[i];
|
581 mike 1.2 if(q->count() > 0 )
582 {
|
583 chip 1.11 try
|
584 mike 1.2 {
585 q->try_lock();
586 }
|
587 mday 1.18 catch(...)
|
588 mike 1.2 {
|
589 mday 1.18 return bodies;
|
590 mike 1.2 }
591
592 struct timeval dt = { 0, 0 };
593 struct timeval *dtp;
594 Thread *th = 0;
595 th = q->next(th);
596 while (th != 0 )
597 {
|
598 chip 1.11 try
|
599 mike 1.2 {
600 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
601 }
|
602 mday 1.18 catch(...)
|
603 mike 1.2 {
|
604 kumpf 1.25 q->unlock();
|
605 mday 1.18 return bodies;
|
606 mike 1.2 }
|
607 chip 1.11
|
608 mike 1.2 if(dtp != 0)
609 {
610 memcpy(&dt, dtp, sizeof(struct timeval));
611 }
612 th->dereference_tsd();
613 struct timeval deadlock_timeout;
|
614 mday 1.18 Boolean too_long;
615 if( i == 0)
616 {
617 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
618 }
619 else
620 {
|
621 mday 1.22 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
|
622 mday 1.18 }
623
624 if( true == too_long)
|
625 mike 1.2 {
626 // if we are deallocating from the pool, escape if we are
|
627 chip 1.11 // down to the minimum thread count
|
628 mday 1.13 _current_threads--;
|
629 mday 1.18 if( _current_threads.value() < (Uint32)_min_threads )
|
630 mike 1.2 {
|
631 mday 1.13 if( i == 0)
|
632 mike 1.2 {
|
633 mday 1.13 _current_threads++;
|
634 mike 1.2 th = q->next(th);
635 continue;
636 }
|
637 chip 1.11 else
|
638 mike 1.2 {
|
639 chip 1.11 // we are killing a hung thread and we will drop below the
|
640 mike 1.2 // minimum. create another thread to make up for the one
641 // we are about to kill
642 needed++;
643 }
644 }
|
645 chip 1.11
|
646 mike 1.2 th = q->remove_no_lock((void *)th);
|
647 chip 1.11
|
648 mike 1.2 if(th != 0)
649 {
|
650 mday 1.30 if( i == 0 )
|
651 mike 1.2 {
|
652 mday 1.30 th->delete_tsd("work func");
653 th->put_tsd("work func", NULL,
654 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
655 (void *)&_undertaker);
656 th->delete_tsd("work parm");
657 th->put_tsd("work parm", NULL, sizeof(void *), th);
658
659 // signal the thread's sleep semaphore to awaken it
660 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
661
662 if(sleep_sem == 0)
663 {
664 q->unlock();
665 th->dereference_tsd();
666 throw NullPointer();
667 }
668
669 bodies++;
|
670 mike 1.2 th->dereference_tsd();
|
671 mday 1.30 _dead.insert_first(th);
672 sleep_sem->signal();
673 th = 0;
674 }
675 else
676 {
677 // deadlocked threads
|
678 mday 1.34 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Killing a deadlocked thread");
|
679 mday 1.30 th->cancel();
680 delete th;
|
681 mike 1.2 }
682 }
683 }
684 th = q->next(th);
|
685 mday 1.20 pegasus_sleep(1);
|
686 mike 1.2 }
687 q->unlock();
688 while (needed.value() > 0)
689 {
690 _link_pool(_init_thread());
691 needed--;
|
692 mday 1.20 pegasus_sleep(0);
|
693 mike 1.2 }
694 }
695 }
|
696 mday 1.18 return bodies;
|
697 mike 1.2 }
698
|
699 mday 1.12
|
700 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
701 {
|
702 mday 1.22 // never time out if the interval is zero
703 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
704 return false;
705
|
706 mday 1.36 struct timeval now, finish, remaining ;
|
707 mday 1.13 Uint32 usec;
|
708 mday 1.33 pegasus_gettimeofday(&now);
|
709 mday 1.36 /* remove valgrind error */
710 pegasus_gettimeofday(&remaining);
711
|
712 mday 1.13
713 finish.tv_sec = start->tv_sec + interval->tv_sec;
714 usec = start->tv_usec + interval->tv_usec;
715 finish.tv_sec += (usec / 1000000);
716 usec %= 1000000;
717 finish.tv_usec = usec;
718
719 if ( timeval_subtract(&remaining, &finish, &now) )
|
720 mike 1.2 return true;
721 else
722 return false;
723 }
724
725 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
726 {
|
727 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
728 return (PEGASUS_THREAD_RETURN)1;
|
729 mike 1.2 }
|
730 mday 1.19
731
732 void ThreadPool::_sleep_sem_del(void *p)
733 {
734 if(p != 0)
735 {
736 delete (Semaphore *)p;
737 }
738 }
739
740 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
741 {
742 if (true == check_time(start, &_deadlock_detect))
743 throw Deadlock(pegasus_thread_self());
744 return;
745 }
746
747
748 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
749 {
750 return(check_time(start, &_deadlock_detect));
751 mday 1.19 }
752
753 Boolean ThreadPool::_check_dealloc(struct timeval *start)
754 {
755 return(check_time(start, &_deallocate_wait));
756 }
757
758 Thread *ThreadPool::_init_thread(void) throw(IPCException)
759 {
760 Thread *th = (Thread *) new Thread(_loop, this, false);
761 // allocate a sleep semaphore and pass it in the thread context
762 // initial count is zero, loop function will sleep until
763 // we signal the semaphore
764 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
765 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
766
767 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
768 mday 1.35 pegasus_gettimeofday(dldt);
769
|
770 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
771 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
772 mday 1.36.4.1
|
773 mday 1.19 th->run();
774 _current_threads++;
775 pegasus_yield();
776
777 return th;
778 }
779
780 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
781 {
782 if(th == 0)
783 throw NullPointer();
784 _pool.insert_first(th);
785 }
|
786 mike 1.2
787
788 PEGASUS_NAMESPACE_END
789
|