1 karl 1.75 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 chip 1.11 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 kumpf 1.17 //
|
19 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
33 chip 1.11 // added nsk platform support
|
34 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
35 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
36 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
37 mike 1.2 //
38 //%/////////////////////////////////////////////////////////////////////////////
39
40 #include "Thread.h"
|
41 kumpf 1.68 #include <exception>
|
42 mike 1.2 #include <Pegasus/Common/IPC.h>
|
43 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
44 mike 1.2
45 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
46 chip 1.11 # include "ThreadWindows.cpp"
|
47 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
48 # include "ThreadUnix.cpp"
49 #elif defined(PEGASUS_OS_TYPE_NSK)
50 # include "ThreadNsk.cpp"
|
51 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
52 # include "ThreadVms.cpp"
|
53 mike 1.2 #else
54 # error "Unsupported platform"
55 #endif
56
|
57 kumpf 1.69 PEGASUS_USING_STD;
|
58 mike 1.2 PEGASUS_NAMESPACE_BEGIN
59
|
60 mday 1.42
|
61 chip 1.11 void thread_data::default_delete(void * data)
62 {
|
63 mike 1.2 if( data != NULL)
|
64 chip 1.11 ::operator delete(data);
|
65 mike 1.2 }
66
|
67 chuck 1.43 // l10n start
68 void language_delete(void * data)
69 {
70 if( data != NULL)
71 {
|
72 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
73 chuck 1.43 }
74 }
75 // l10n end
76
|
77 mike 1.2 Boolean Thread::_signals_blocked = false;
|
78 chuck 1.37 // l10n
|
79 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
80 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
81 marek 1.63 #else
82 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
83 #endif
|
84 chuck 1.37 Boolean Thread::_key_initialized = false;
|
85 chuck 1.41 Boolean Thread::_key_error = false;
|
86 chuck 1.37
|
87 mike 1.2
88 // for non-native implementations
|
89 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
90 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
91 {
|
92 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
93 a.arora 1.65 _cleanup.insert_first(cu.get());
|
94 a.arora 1.64 cu.release();
|
95 mike 1.2 return;
96 }
|
97 chip 1.11
|
98 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
99 {
|
100 a.arora 1.64 AutoPtr<cleanup_handler> cu ;
|
101 chip 1.11 try
102 {
|
103 a.arora 1.64 cu.reset(_cleanup.remove_first());
|
104 mike 1.2 }
|
105 chip 1.11 catch(IPCException&)
|
106 mike 1.2 {
|
107 chip 1.11 PEGASUS_ASSERT(0);
|
108 mike 1.2 }
109 if(execute == true)
110 cu->execute();
111 }
|
112 chip 1.11
|
113 mike 1.2 #endif
114
115
|
116 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
117 mike 1.2
118
|
119 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
120 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
121 {
122 // execute the cleanup stack and then return
|
123 mike 1.2 while( _cleanup.count() )
124 {
|
125 chip 1.11 try
126 {
127 cleanup_pop(true);
128 }
129 catch(IPCException&)
130 {
131 PEGASUS_ASSERT(0);
132 break;
|
133 mike 1.2 }
134 }
135 _exit_code = exit_code;
136 exit_thread(exit_code);
|
137 mday 1.4 _handle.thid = 0;
|
138 mike 1.2 }
139
140
141 #endif
142
|
143 chuck 1.37 // l10n start
|
144 chuck 1.39 Sint8 Thread::initializeKey()
145 {
146 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
147 if (!Thread::_key_initialized)
148 {
|
149 chuck 1.41 if (Thread::_key_error)
150 {
151 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
152 "Thread: ERROR - thread key error");
153 return -1;
154 }
155
|
156 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
157 {
158 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
159 "Thread: able to create a thread key");
160 Thread::_key_initialized = true;
161 }
162 else
163 {
164 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
165 "Thread: ERROR - unable to create a thread key");
|
166 chuck 1.41 Thread::_key_error = true;
|
167 chuck 1.39 return -1;
168 }
169 }
170
171 PEG_METHOD_EXIT();
172 return 0;
173 }
174
|
175 chuck 1.37 Thread * Thread::getCurrent()
176 {
|
177 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
178 chuck 1.40 if (Thread::initializeKey() != 0)
|
179 chuck 1.39 {
180 return NULL;
181 }
|
182 chuck 1.38 PEG_METHOD_EXIT();
|
183 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
184 }
185
186 void Thread::setCurrent(Thread * thrd)
187 {
188 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
189 if (Thread::initializeKey() == 0)
190 {
191 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
192 (void *) thrd) == 0)
193 {
194 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195 "Successful set Thread * into thread specific storage");
196 }
197 else
198 {
199 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
200 "ERROR: got error setting Thread * into thread specific storage");
201 }
202 }
203 PEG_METHOD_EXIT();
|
204 chuck 1.37 }
205
206 AcceptLanguages * Thread::getLanguages()
207 {
|
208 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
209 chuck 1.37
210 Thread * curThrd = Thread::getCurrent();
211 if (curThrd == NULL)
212 return NULL;
213 AcceptLanguages * acceptLangs =
214 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
215 curThrd->dereference_tsd();
216 PEG_METHOD_EXIT();
217 return acceptLangs;
218 }
219
220 void Thread::setLanguages(AcceptLanguages *langs) //l10n
221 {
|
222 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
223 chuck 1.37
224 Thread * currentThrd = Thread::getCurrent();
225 if (currentThrd != NULL)
226 {
227 // deletes the old tsd and creates a new one
228 currentThrd->put_tsd("acceptLanguages",
|
229 chuck 1.43 language_delete,
|
230 chuck 1.37 sizeof(AcceptLanguages *),
231 langs);
232 }
233
234 PEG_METHOD_EXIT();
235 }
236
237 void Thread::clearLanguages() //l10n
238 {
|
239 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
240 chuck 1.37
241 Thread * currentThrd = Thread::getCurrent();
242 if (currentThrd != NULL)
243 {
244 // deletes the old tsd
245 currentThrd->delete_tsd("acceptLanguages");
246 }
247
248 PEG_METHOD_EXIT();
249 }
250 // l10n end
251
|
252 kumpf 1.57 #if 0
|
253 mday 1.52 // two special synchronization classes for ThreadPool
|
254 kumpf 1.57 //
|
255 mday 1.52
|
256 kumpf 1.57 class timed_mutex
|
257 mday 1.52 {
258 public:
259 timed_mutex(Mutex* mut, int msec)
|
260 kumpf 1.57 :_mut(mut)
|
261 mday 1.52 {
|
262 kumpf 1.57 _mut->timed_lock(msec, pegasus_thread_self());
|
263 mday 1.52 }
264 ~timed_mutex(void)
265 {
|
266 kumpf 1.57 _mut->unlock();
|
267 mday 1.52 }
268 Mutex* _mut;
269 };
|
270 kumpf 1.57 #endif
|
271 mday 1.52
272 class try_mutex
273 {
274 public:
275 try_mutex(Mutex* mut)
276 :_mut(mut)
277 {
278 _mut->try_lock(pegasus_thread_self());
279 }
280 ~try_mutex(void)
281 {
282 _mut->unlock();
283 }
284
285 Mutex* _mut;
286 };
287
|
288 mday 1.58 class auto_int
289 {
290 public:
291 auto_int(AtomicInt* num)
292 : _int(num)
293 {
294 _int->operator++();
295 }
296 ~auto_int(void)
297 {
298 _int->operator--();
299 }
300 AtomicInt *_int;
301 };
302
303
304 AtomicInt _idle_control;
|
305 mday 1.52
|
306 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
307
308 void ThreadPool::kill_idle_threads(void)
309 {
310 static struct timeval now, last = {0, 0};
311
312 pegasus_gettimeofday(&now);
313 if(now.tv_sec - last.tv_sec > 5)
314 {
315 _pools.lock();
316 ThreadPool *p = _pools.next(0);
317 while(p != 0)
318 {
319 try
320 {
321 p->kill_dead_threads();
322 }
323 catch(...)
324 {
325 }
326 p = _pools.next(p);
327 mday 1.20 }
328 _pools.unlock();
329 pegasus_gettimeofday(&last);
330 }
331 }
332
333
|
334 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
335 david.dillard 1.73 const char *key,
|
336 mike 1.2 Sint16 min,
337 Sint16 max,
338 struct timeval & alloc_wait,
|
339 chip 1.11 struct timeval & dealloc_wait,
|
340 mike 1.2 struct timeval & deadlock_detect)
341 : _max_threads(max), _min_threads(min),
|
342 mday 1.12 _current_threads(0),
343 _pool(true), _running(true),
|
344 mike 1.2 _dead(true), _dying(0)
345 {
346 _allocate_wait.tv_sec = alloc_wait.tv_sec;
347 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
348 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
349 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
350 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
351 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
352 memset(_key, 0x00, 17);
353 if(key != 0)
354 strncpy(_key, key, 16);
|
355 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
356 mike 1.2 _max_threads = initial_size;
357 if(_min_threads > initial_size)
358 _min_threads = initial_size;
|
359 chip 1.11
|
360 mike 1.2 int i;
361 for(i = 0; i < initial_size; i++)
362 {
363 _link_pool(_init_thread());
364 }
|
365 mday 1.20 _pools.insert_last(this);
|
366 mike 1.2 }
367
|
368 chip 1.11
|
369 mday 1.52 // Note: <<< Fri Oct 17 09:19:03 2003 mdd >>>
370 // the pegasus_yield() calls that preceed each th->join() are to
371 // give a thread on the running list a chance to reach a cancellation
372 // point before the join
|
373 mike 1.2
374 ThreadPool::~ThreadPool(void)
375 {
|
376 kumpf 1.57 PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
|
377 mday 1.35 try
|
378 mday 1.47 {
|
379 kumpf 1.57 // Set the dying flag so all thread know the destructor has been entered
|
380 mday 1.58 _dying++;
381
|
382 mday 1.52 // remove from the global pools list
|
383 mday 1.35 _pools.remove(this);
|
384 mday 1.52
385 // start with idle threads.
|
386 mday 1.35 Thread *th = 0;
387 th = _pool.remove_first();
|
388 mday 1.52 Semaphore* sleep_sem;
389
|
390 mday 1.35 while(th != 0)
|
391 mike 1.2 {
|
392 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
393 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
394
|
395 mday 1.35 if(sleep_sem == 0)
396 {
397 th->dereference_tsd();
398 }
|
399 kumpf 1.57 else
400 {
401 // Signal to get the thread out of the work loop.
402 sleep_sem->signal();
|
403 mday 1.52
|
404 kumpf 1.57 // Signal to get the thread past the end. See the comment
405 // "wait to be awakend by the thread pool destructor"
406 // Note: the current implementation of Thread for Windows
407 // does not implement "pthread" cancelation points so this
408 // is needed.
409 sleep_sem->signal();
410 th->dereference_tsd();
411 th->join();
412 delete th;
413 }
|
414 mday 1.35 th = _pool.remove_first();
|
415 mike 1.2 }
|
416 kumpf 1.57
|
417 mday 1.58 while(_idle_control.value())
418 pegasus_yield();
419
|
420 mday 1.47 th = _dead.remove_first();
|
421 mday 1.35 while(th != 0)
422 {
|
423 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
424 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
|
425 mday 1.58
|
426 mday 1.47 if(sleep_sem == 0)
427 {
428 th->dereference_tsd();
429 }
|
430 kumpf 1.57 else
431 {
432 //ATTN-DME-P3-20030322: _dead queue processing in
433 //ThreadPool::~ThreadPool is inconsistent with the
434 //processing in kill_dead_threads. Is this correct?
|
435 mday 1.58
|
436 kumpf 1.57 // signal the thread's sleep semaphore
437 sleep_sem->signal();
438 sleep_sem->signal();
439 th->dereference_tsd();
440 th->join();
441 delete th;
442 }
|
443 mday 1.47 th = _dead.remove_first();
|
444 mday 1.35 }
|
445 mday 1.52
|
446 mday 1.47 {
|
447 mday 1.52 th = _running.remove_first();
448 while(th != 0)
449 {
450 // signal the thread's sleep semaphore
451
452 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
453 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
454
|
455 mday 1.52 if(sleep_sem == 0 )
456 {
457 th->dereference_tsd();
458 }
|
459 kumpf 1.57 else
460 {
461 sleep_sem->signal();
462 sleep_sem->signal();
463 th->dereference_tsd();
|
464 kumpf 1.70 //th->cancel();
|
465 kumpf 1.57 pegasus_yield();
|
466 mday 1.52
|
467 kumpf 1.57 th->join();
468 delete th;
469 }
|
470 mday 1.52 th = _running.remove_first();
|
471 kumpf 1.57 }
472 }
|
473 mike 1.2 }
|
474 mday 1.52
|
475 mday 1.35 catch(...)
|
476 mike 1.2 {
477 }
478 }
479
|
480 chip 1.11 // make this static to the class
|
481 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
482 {
|
483 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
484
|
485 mike 1.2 Thread *myself = (Thread *)parm;
486 if(myself == 0)
|
487 kumpf 1.14 {
|
488 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
489 "ThreadPool::_loop: Thread pointer is null");
|
490 kumpf 1.14 PEG_METHOD_EXIT();
|
491 mike 1.2 throw NullPointer();
|
492 kumpf 1.14 }
|
493 chuck 1.37
494 // l10n
495 // Set myself into thread specific storage
|
496 chuck 1.38 // This will allow code to get its own Thread
|
497 chuck 1.39 Thread::setCurrent(myself);
498
|
499 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
500 kumpf 1.14 if(pool == 0 )
501 {
|
502 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503 "ThreadPool::_loop: ThreadPool pointer is null");
|
504 kumpf 1.14 PEG_METHOD_EXIT();
|
505 mike 1.2 throw NullPointer();
|
506 kumpf 1.14 }
|
507 mday 1.52 if(pool->_dying.value())
508 {
|
509 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
510 "ThreadPool::_loop: ThreadPool is dying(1)");
|
511 mday 1.52 PEG_METHOD_EXIT();
512 return((PEGASUS_THREAD_RETURN)0);
513 }
514
|
515 mike 1.5 Semaphore *sleep_sem = 0;
|
516 mday 1.13 Semaphore *blocking_sem = 0;
517
|
518 mike 1.5 struct timeval *deadlock_timer = 0;
|
519 mday 1.47
|
520 chip 1.11 try
|
521 mike 1.2 {
522 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
523 myself->dereference_tsd();
524 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
525 mday 1.22 myself->dereference_tsd();
|
526 mike 1.2 }
|
527 mday 1.52
|
528 mday 1.30 catch(...)
529 {
|
530 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
531 konrad.r 1.67 "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
532 _graveyard(myself);
|
533 mday 1.30 PEG_METHOD_EXIT();
|
534 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
535 mday 1.30 }
536
|
537 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
538 kumpf 1.14 {
|
539 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
540 "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
|
541 konrad.r 1.67 _graveyard(myself);
|
542 kumpf 1.14 PEG_METHOD_EXIT();
|
543 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
544 kumpf 1.14 }
|
545 mike 1.2
|
546 mday 1.54 while(1)
|
547 mike 1.2 {
|
548 mday 1.58 if(pool->_dying.value())
549 break;
550
|
551 mday 1.52 try
552 {
|
553 brian.campbell 1.72 Boolean ignoreInterrupt = false;
554 sleep_sem->wait(ignoreInterrupt);
|
555 mday 1.52 }
|
556 konrad.r 1.67 catch (WaitInterrupted &e)
557 {
558 /* From the sem_wait manpage:
559 The sem_trywait() and sem_wait() functions may fail if:
560
561 EINTR A signal interrupted this function.
562 */
563 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
564 "Sleep semaphore wait failed. Doing a continue");
565 continue;
566 }
|
567 mday 1.52 catch(IPCException& )
568 {
|
569 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
570 konrad.r 1.67 "ThreadPool::_loop: failure on sleep_sem->wait().");
571 _graveyard(myself);
|
572 mday 1.52 PEG_METHOD_EXIT();
573 return((PEGASUS_THREAD_RETURN)0);
574 }
575
|
576 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
577 konrad.r 1.67 /* Hence no need to move the thread to the _dead queue, as the _running
578 * queue is only dused by kill_dead_threads which makes sure that the
579 * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
580 * before killing it.
581 */
|
582 mday 1.47
|
583 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
584 void *parm = 0;
|
585 mike 1.2
|
586 chip 1.11 try
|
587 mike 1.2 {
588 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
589 myself->reference_tsd("work func");
590 myself->dereference_tsd();
591 parm = myself->reference_tsd("work parm");
592 myself->dereference_tsd();
|
593 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
594 myself->dereference_tsd();
595
|
596 mike 1.2 }
|
597 mike 1.6 catch(IPCException &)
|
598 mike 1.2 {
|
599 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
600 "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
|
601 konrad.r 1.67 /*
602 * We cannot move ourselves to the dead queue b/c the TSD might be still
603 * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.
604 * Only the kill_dead_threads has enough logic to handle such situations.
605 _graveyard( myself);
606 */
|
607 kumpf 1.14 PEG_METHOD_EXIT();
|
608 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
609 mike 1.2 }
|
610 mday 1.52
|
611 mike 1.2 if(_work == 0)
|
612 kumpf 1.14 {
|
613 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
614 "ThreadPool::_loop: work func is null.");
|
615 kumpf 1.14 PEG_METHOD_EXIT();
|
616 kumpf 1.57 return((PEGASUS_THREAD_RETURN)0);
|
617 kumpf 1.14 }
|
618 kumpf 1.24
619 if(_work ==
620 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
621 {
|
622 konrad.r 1.67 /*
623 * The undertaker is set by ThreadPool::kill_dead_threads which awakens this thread,
624 * joins it and then removes it from the queue. Hence no reason to go to the
625 _graveyard( myself);
626 */
|
627 kumpf 1.57 PEG_METHOD_EXIT();
|
628 mday 1.23 _work(parm);
|
629 kumpf 1.24 }
630
|
631 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
632 kumpf 1.57
633 if (pool->_dying.value() == 0)
|
634 mday 1.20 {
|
635 kumpf 1.57 try
636 {
|
637 konrad.r 1.67 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
638 "Worker started");
|
639 kumpf 1.57 _work(parm);
|
640 konrad.r 1.67 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
641 "Worker finished");
|
642 kumpf 1.57 }
|
643 kumpf 1.59 catch(Exception & e)
644 {
645 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
646 String("Exception from _work in ThreadPool::_loop: ") +
647 e.getMessage());
648 PEG_METHOD_EXIT();
649 return((PEGASUS_THREAD_RETURN)0);
650 }
|
651 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
652 catch (exception& e)
653 {
654 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
655 String("Exception from _work in ThreadPool::_loop: ") +
656 e.what());
657 PEG_METHOD_EXIT();
658 return((PEGASUS_THREAD_RETURN)0);
659 }
660 #endif
|
661 kumpf 1.57 catch(...)
662 {
663 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
664 "ThreadPool::_loop: execution of _work failed.");
665 PEG_METHOD_EXIT();
666 return((PEGASUS_THREAD_RETURN)0);
667 }
668 }
|
669 chuck 1.37
|
670 chip 1.11 // put myself back onto the available list
671 try
|
672 mike 1.2 {
|
673 mday 1.47 if(pool->_dying.value() == 0)
674 {
675 gettimeofday(deadlock_timer, NULL);
676 if( blocking_sem != 0 )
677 blocking_sem->signal();
678
|
679 s.hills 1.53 // If we are not on _running then ~ThreadPool has removed
680 // us and now "owns" our pointer.
|
681 kumpf 1.57 if ( pool->_running.remove((void *)myself) != 0 )
682 {
683 pool->_pool.insert_first(myself);
684 }
685 else
686 {
|
687 kumpf 1.60 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
688 "ThreadPool::_loop: Failed to remove thread from running queue.");
|
689 kumpf 1.57 PEG_METHOD_EXIT();
|
690 mday 1.54 return((PEGASUS_THREAD_RETURN)0);
|
691 kumpf 1.57 }
|
692 mday 1.47 }
693 else
694 {
695 PEG_METHOD_EXIT();
696 return((PEGASUS_THREAD_RETURN)0);
697 }
|
698 mike 1.2 }
|
699 mday 1.52 catch(...)
|
700 mike 1.2 {
|
701 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
702 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
703 kumpf 1.14 PEG_METHOD_EXIT();
|
704 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
705 mike 1.2 }
|
706 mday 1.51
|
707 mike 1.2 }
|
708 s.hills 1.49
709 // TODO: Why is this needed? Why not just continue?
|
710 mike 1.2 // wait to be awakend by the thread pool destructor
|
711 mday 1.50 //sleep_sem->wait();
|
712 s.hills 1.49
|
713 mike 1.2 myself->test_cancel();
|
714 kumpf 1.14
715 PEG_METHOD_EXIT();
|
716 mike 1.2 return((PEGASUS_THREAD_RETURN)0);
717 }
718
|
719 kumpf 1.59 Boolean ThreadPool::allocate_and_awaken(void *parm,
720 PEGASUS_THREAD_RETURN \
721 (PEGASUS_THREAD_CDECL *work)(void *),
722 Semaphore *blocking)
|
723 mike 1.2 throw(IPCException)
724 {
|
725 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
726 kumpf 1.57
727 // Allocate_and_awaken will not run if the _dying flag is set.
728 // Once the lock is acquired, ~ThreadPool will not change
729 // the value of _dying until the lock is released.
|
730 mday 1.47
|
731 kumpf 1.57 try
|
732 mday 1.47 {
|
733 kumpf 1.57 if (_dying.value())
|
734 mday 1.47 {
|
735 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
736 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
737 kumpf 1.59 // ATTN: Error result has not yet been defined
738 return true;
|
739 kumpf 1.57 }
740 struct timeval start;
741 gettimeofday(&start, NULL);
742 Thread *th = 0;
743
|
744 mday 1.47 th = _pool.remove_first();
745
|
746 kumpf 1.59 if (th == 0)
|
747 kumpf 1.57 {
748 // will throw an IPCException&
749 _check_deadlock(&start) ;
|
750 mday 1.12
|
751 kumpf 1.57 if(_max_threads == 0 || _current_threads < _max_threads)
752 {
753 th = _init_thread();
754 }
|
755 kumpf 1.59 }
756
757 if (th == 0)
758 {
|
759 kumpf 1.60 // ATTN-DME-P3-20031103: This trace message should not be
760 // be labeled TRC_DISCARDED_DATA, because it does not
761 // necessarily imply that a failure has occurred. However,
762 // this label is being used temporarily to help isolate
763 // the cause of client timeout problems.
764
765 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
766 "ThreadPool::allocate_and_awaken: Insufficient resources: "
767 " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
768 _key, _running.count(), _pool.count(), _dead.count());
|
769 kumpf 1.59 return false;
|
770 mday 1.47 }
|
771 chip 1.11
|
772 mike 1.2 // initialize the thread data with the work function and parameters
|
773 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
774 kumpf 1.57 "Initializing thread with work function and parameters: parm = %p",
|
775 kumpf 1.14 parm);
776
|
777 kumpf 1.15 th->delete_tsd("work func");
|
778 chip 1.11 th->put_tsd("work func", NULL,
|
779 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
780 (void *)work);
|
781 kumpf 1.15 th->delete_tsd("work parm");
|
782 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
783 kumpf 1.15 th->delete_tsd("blocking sem");
|
784 mday 1.13 if(blocking != 0 )
|
785 kumpf 1.57 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
786 mday 1.47
|
787 kumpf 1.57 // put the thread on the running list
788 _running.insert_first(th);
|
789 mike 1.2
790 // signal the thread's sleep semaphore to awaken it
|
791 kumpf 1.57 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
792 mday 1.47
|
793 kumpf 1.57 if(sleep_sem == 0)
|
794 mike 1.2 {
|
795 kumpf 1.57 th->dereference_tsd();
796 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
797 "ThreadPool::allocate_and_awaken: thread data is corrupted.");
798 PEG_METHOD_EXIT();
799 throw NullPointer();
|
800 mike 1.2 }
|
801 kumpf 1.57 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
802 sleep_sem->signal();
803 th->dereference_tsd();
|
804 mike 1.2 }
|
805 kumpf 1.57 catch (...)
|
806 mday 1.47 {
|
807 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
808 "ThreadPool::allocate_and_awaken: Operation Failed.");
809 PEG_METHOD_EXIT();
|
810 kumpf 1.59 // ATTN: Error result has not yet been defined
811 return true;
|
812 mday 1.47 }
|
813 kumpf 1.14 PEG_METHOD_EXIT();
|
814 kumpf 1.59 return true;
|
815 mike 1.2 }
816
817 // caller is responsible for only calling this routine during slack periods
818 // but should call it at least once per _deadlock_detect with the running q
819 // and at least once per _deallocate_wait for the pool q
820
|
821 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
822 mike 1.2 throw(IPCException)
823 {
|
824 kumpf 1.57 // Since the kill_dead_threads, ThreadPool or allocate_and_awaken
825 // manipulate the threads on the ThreadPool queues, they should never
826 // be allowed to run at the same time.
827
|
828 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
|
829 mday 1.58 // << Thu Oct 23 14:41:02 2003 mdd >>
830 // not true, the queues are thread safe. they are syncrhonized.
|
831 kumpf 1.57
|
832 mday 1.58 auto_int do_not_destruct(&_idle_control);
833
|
834 kumpf 1.57 try
|
835 mday 1.47 {
|
836 kumpf 1.57 if (_dying.value())
|
837 mday 1.47 {
|
838 kumpf 1.57 return 0;
|
839 mday 1.47 }
|
840 mday 1.58
|
841 kumpf 1.57 struct timeval now;
842 gettimeofday(&now, NULL);
843 Uint32 bodies = 0;
|
844 konrad.r 1.67 AtomicInt needed(0);
|
845 kumpf 1.57
846 // first go thread the dead q and clean it up as much as possible
847 try
|
848 mday 1.47 {
|
849 mday 1.58 while(_dying.value() == 0 && _dead.count() > 0)
|
850 kumpf 1.57 {
851 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
852 Thread *dead = _dead.remove_first();
|
853 mday 1.58
854 if(dead )
855 {
856 dead->join();
857 delete dead;
858 }
|
859 kumpf 1.57 }
860 }
861 catch(...)
862 {
|
863 konrad.r 1.67 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");
|
864 mday 1.47 }
865
|
866 mday 1.58 if (_dying.value())
867 {
868 return 0;
869 }
|
870 mday 1.47
|
871 mday 1.58 Thread *th = 0;
872 internal_dq idq;
873
|
874 kumpf 1.70 if(_pool.count() > 0 )
|
875 mday 1.52 {
|
876 kumpf 1.70 try
877 {
878 _pool.try_lock();
879 }
880 catch(...)
881 {
882 return bodies;
883 }
884
885 struct timeval dt = { 0, 0 };
886 struct timeval *dtp;
887
888 th = _pool.next(th);
889 while (th != 0 )
890 {
|
891 chip 1.11 try
|
892 mike 1.2 {
|
893 kumpf 1.70 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
|
894 mike 1.2 }
|
895 mday 1.18 catch(...)
|
896 mike 1.2 {
|
897 kumpf 1.70 _pool.unlock();
|
898 mday 1.18 return bodies;
|
899 mike 1.2 }
|
900 kumpf 1.57
|
901 kumpf 1.70 if(dtp != 0)
902 {
903 memcpy(&dt, dtp, sizeof(struct timeval));
904 }
905 th->dereference_tsd();
906 struct timeval deadlock_timeout;
907 Boolean too_long;
908 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
|
909 mday 1.58
|
910 kumpf 1.70 if( true == too_long)
|
911 kumpf 1.57 {
|
912 kumpf 1.70 // escape if we are down to the minimum thread count
913 _current_threads--;
914 if( _current_threads.value() < (Uint32)_min_threads )
|
915 kumpf 1.57 {
|
916 kumpf 1.70 _current_threads++;
917 th = _pool.next(th);
918 continue;
|
919 kumpf 1.57 }
|
920 kumpf 1.70
921 th = _pool.remove_no_lock((void *)th);
922 idq.insert_first((void*)th);
|
923 mike 1.2 }
|
924 kumpf 1.70 th = _pool.next(th);
925 }
926 _pool.unlock();
927 }
|
928 mday 1.58
|
929 kumpf 1.70 th = (Thread*)idq.remove_last();
930 while(th != 0)
931 {
932 th->delete_tsd("work func");
933 th->put_tsd("work func", NULL,
934 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
935 (void *)&_undertaker);
936 th->delete_tsd("work parm");
937 th->put_tsd("work parm", NULL, sizeof(void *), th);
|
938 mday 1.58
|
939 kumpf 1.70 // signal the thread's sleep semaphore to awaken it
940 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
941 PEGASUS_ASSERT(sleep_sem != 0);
|
942 mday 1.58
|
943 kumpf 1.70 bodies++;
944 th->dereference_tsd();
945 sleep_sem->signal();
946 th->join(); // Note: Clean up the thread here rather than
947 delete th; // leave it sitting unused on the _dead queue
948 th = (Thread*)idq.remove_last();
|
949 mike 1.2 }
|
950 kumpf 1.57
|
951 konrad.r 1.67 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
952 "We need %u new threads", needed.value());
|
953 kumpf 1.57 while (needed.value() > 0) {
954 _link_pool(_init_thread());
955 needed--;
956 pegasus_sleep(0);
957 }
958 return bodies;
959 }
|
960 mday 1.58 catch (...)
|
961 kumpf 1.57 {
962 }
|
963 konrad.r 1.67 PEG_METHOD_EXIT();
|
964 kumpf 1.57 return 0;
|
965 mike 1.2 }
966
|
967 mday 1.12
|
968 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
969 {
|
970 mday 1.22 // never time out if the interval is zero
971 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
972 return false;
973
|
974 mday 1.55 struct timeval now , finish , remaining ;
|
975 mday 1.13 Uint32 usec;
|
976 mday 1.33 pegasus_gettimeofday(&now);
|
977 mday 1.36 /* remove valgrind error */
978 pegasus_gettimeofday(&remaining);
979
|
980 mday 1.13
981 finish.tv_sec = start->tv_sec + interval->tv_sec;
982 usec = start->tv_usec + interval->tv_usec;
983 finish.tv_sec += (usec / 1000000);
984 usec %= 1000000;
985 finish.tv_usec = usec;
986
987 if ( timeval_subtract(&remaining, &finish, &now) )
|
988 mike 1.2 return true;
989 else
990 return false;
991 }
992
993 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
994 {
|
995 konrad.r 1.67
996 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
|
997 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
|
998 konrad.r 1.67 PEG_METHOD_EXIT();
|
999 mday 1.30 return (PEGASUS_THREAD_RETURN)1;
|
1000 mike 1.2 }
|
1001 mday 1.19
|
1002 konrad.r 1.67 PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
1003 {
1004 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
1005 ThreadPool *pool = (ThreadPool *)t->get_parm();
1006 if(pool == 0 ) {
1007 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
1008 "Could not obtain the pool information from the Thread.", t);
1009
1010 return (PEGASUS_THREAD_RETURN)1;
1011 }
1012 if (pool->_pool.exists(t))
1013 {
1014 if (pool->_pool.remove( (void *) t) != 0)
1015 {
1016 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1017 "Moving thread %p", t);
1018 /* We are moving the thread to the _running queue b/c
1019 _only_ kill_dead_threads has enough logic to take care
1020 of cleaning up the threads.*/
1021
1022 pool->_running.insert_first( t );
1023 konrad.r 1.67 }
1024 else
1025 {
1026 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1027 "Could not move Thread %p from _pool to _runing queue.", t);
1028 return (PEGASUS_THREAD_RETURN)1;
1029 }
1030 }
1031
1032 else if (pool->_running.exists(t))
1033 {
1034 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1035 "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
1036 return (PEGASUS_THREAD_RETURN)1;
1037 }
1038 if (!pool->_dead.exists(t))
1039 {
1040 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
1041 "Thread is not on any queue! Moving it to the running queue.");
1042 pool->_running.insert_first( t );
1043 }
1044 konrad.r 1.67 PEG_METHOD_EXIT();
1045 return (PEGASUS_THREAD_RETURN)0;
1046 }
|
1047 mday 1.19
1048 void ThreadPool::_sleep_sem_del(void *p)
1049 {
1050 if(p != 0)
1051 {
1052 delete (Semaphore *)p;
1053 }
1054 }
1055
1056 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
1057 {
1058 if (true == check_time(start, &_deadlock_detect))
1059 throw Deadlock(pegasus_thread_self());
1060 return;
1061 }
1062
1063
1064 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
1065 {
1066 return(check_time(start, &_deadlock_detect));
1067 }
1068 mday 1.19
1069 Boolean ThreadPool::_check_dealloc(struct timeval *start)
1070 {
1071 return(check_time(start, &_deallocate_wait));
1072 }
1073
1074 Thread *ThreadPool::_init_thread(void) throw(IPCException)
1075 {
|
1076 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
|
1077 mday 1.19 Thread *th = (Thread *) new Thread(_loop, this, false);
1078 // allocate a sleep semaphore and pass it in the thread context
1079 // initial count is zero, loop function will sleep until
1080 // we signal the semaphore
1081 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1082 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
1083
1084 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
1085 mday 1.35 pegasus_gettimeofday(dldt);
1086
|
1087 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1088 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
1089 chuck 1.37
|
1090 kumpf 1.59 if (!th->run())
1091 {
1092 delete th;
1093 return 0;
1094 }
|
1095 mday 1.19 _current_threads++;
1096 pegasus_yield();
|
1097 konrad.r 1.67 PEG_METHOD_EXIT();
|
1098 mday 1.19
1099 return th;
1100 }
1101
1102 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1103 {
1104 if(th == 0)
|
1105 kumpf 1.57 {
1106 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1107 "ThreadPool::_link_pool: Thread pointer is null.");
|
1108 mday 1.19 throw NullPointer();
|
1109 kumpf 1.57 }
|
1110 mday 1.47 try
1111 {
1112 _pool.insert_first(th);
1113 }
1114 catch(...)
1115 {
|
1116 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1117 "ThreadPool::_link_pool: _pool.insert_first failed.");
|
1118 mday 1.47 }
|
1119 mday 1.19 }
|
1120 mike 1.2
1121
1122 PEGASUS_NAMESPACE_END
1123
|