1 karl 1.71 //%2004////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 mike 1.2 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
11 chip 1.11 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
14 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
|
16 kumpf 1.17 //
|
17 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
18 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
20 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
23 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Mike Day (mdday@us.ibm.com)
29 //
30 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
31 chip 1.11 // added nsk platform support
|
32 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
33 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
34 mike 1.2 //
35 //%/////////////////////////////////////////////////////////////////////////////
36
37 #include "Thread.h"
|
38 kumpf 1.68 #include <exception>
|
39 mike 1.2 #include <Pegasus/Common/IPC.h>
|
40 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
41 mike 1.2
42 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
43 chip 1.11 # include "ThreadWindows.cpp"
|
44 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
45 # include "ThreadUnix.cpp"
46 #elif defined(PEGASUS_OS_TYPE_NSK)
47 # include "ThreadNsk.cpp"
48 #else
49 # error "Unsupported platform"
50 #endif
51
|
52 kumpf 1.69 PEGASUS_USING_STD;
|
53 mike 1.2 PEGASUS_NAMESPACE_BEGIN
54
|
55 mday 1.42
|
56 chip 1.11 void thread_data::default_delete(void * data)
57 {
|
58 mike 1.2 if( data != NULL)
|
59 chip 1.11 ::operator delete(data);
|
60 mike 1.2 }
61
|
62 chuck 1.43 // l10n start
63 void language_delete(void * data)
64 {
65 if( data != NULL)
66 {
|
67 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
68 chuck 1.43 }
69 }
70 // l10n end
71
|
72 mike 1.2 Boolean Thread::_signals_blocked = false;
|
73 chuck 1.37 // l10n
|
74 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
75 mday 1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
|
76 marek 1.63 #else
77 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
78 #endif
|
79 chuck 1.37 Boolean Thread::_key_initialized = false;
|
80 chuck 1.41 Boolean Thread::_key_error = false;
|
81 chuck 1.37
|
82 mike 1.2
83 // for non-native implementations
|
84 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
85 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
86 {
|
87 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
88 a.arora 1.65 _cleanup.insert_first(cu.get());
|
89 a.arora 1.64 cu.release();
|
90 mike 1.2 return;
91 }
|
92 chip 1.11
|
93 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
94 {
|
95 a.arora 1.64 AutoPtr<cleanup_handler> cu ;
|
96 chip 1.11 try
97 {
|
98 a.arora 1.64 cu.reset(_cleanup.remove_first());
|
99 mike 1.2 }
|
100 chip 1.11 catch(IPCException&)
|
101 mike 1.2 {
|
102 chip 1.11 PEGASUS_ASSERT(0);
|
103 mike 1.2 }
104 if(execute == true)
105 cu->execute();
106 }
|
107 chip 1.11
|
108 mike 1.2 #endif
109
110
|
111 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
112 mike 1.2
113
|
114 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
115 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
116 {
117 // execute the cleanup stack and then return
|
118 mike 1.2 while( _cleanup.count() )
119 {
|
120 chip 1.11 try
121 {
122 cleanup_pop(true);
123 }
124 catch(IPCException&)
125 {
126 PEGASUS_ASSERT(0);
127 break;
|
128 mike 1.2 }
129 }
130 _exit_code = exit_code;
131 exit_thread(exit_code);
|
132 mday 1.4 _handle.thid = 0;
|
133 mike 1.2 }
134
135
136 #endif
137
|
138 chuck 1.37 // l10n start
|
139 chuck 1.39 Sint8 Thread::initializeKey()
140 {
141 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
142 if (!Thread::_key_initialized)
143 {
|
144 chuck 1.41 if (Thread::_key_error)
145 {
146 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
147 "Thread: ERROR - thread key error");
148 return -1;
149 }
150
|
151 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
152 {
153 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
154 "Thread: able to create a thread key");
155 Thread::_key_initialized = true;
156 }
157 else
158 {
159 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
160 "Thread: ERROR - unable to create a thread key");
|
161 chuck 1.41 Thread::_key_error = true;
|
162 chuck 1.39 return -1;
163 }
164 }
165
166 PEG_METHOD_EXIT();
167 return 0;
168 }
169
|
170 chuck 1.37 Thread * Thread::getCurrent()
171 {
|
172 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
173 chuck 1.40 if (Thread::initializeKey() != 0)
|
174 chuck 1.39 {
175 return NULL;
176 }
|
177 chuck 1.38 PEG_METHOD_EXIT();
|
178 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
179 }
180
181 void Thread::setCurrent(Thread * thrd)
182 {
183 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
184 if (Thread::initializeKey() == 0)
185 {
186 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
187 (void *) thrd) == 0)
188 {
189 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
190 "Successful set Thread * into thread specific storage");
191 }
192 else
193 {
194 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195 "ERROR: got error setting Thread * into thread specific storage");
196 }
197 }
198 PEG_METHOD_EXIT();
|
199 chuck 1.37 }
200
201 AcceptLanguages * Thread::getLanguages()
202 {
|
203 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
204 chuck 1.37
205 Thread * curThrd = Thread::getCurrent();
206 if (curThrd == NULL)
207 return NULL;
208 AcceptLanguages * acceptLangs =
209 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
210 curThrd->dereference_tsd();
211 PEG_METHOD_EXIT();
212 return acceptLangs;
213 }
214
215 void Thread::setLanguages(AcceptLanguages *langs) //l10n
216 {
|
217 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
218 chuck 1.37
219 Thread * currentThrd = Thread::getCurrent();
220 if (currentThrd != NULL)
221 {
222 // deletes the old tsd and creates a new one
223 currentThrd->put_tsd("acceptLanguages",
|
224 chuck 1.43 language_delete,
|
225 chuck 1.37 sizeof(AcceptLanguages *),
226 langs);
227 }
228
229 PEG_METHOD_EXIT();
230 }
231
232 void Thread::clearLanguages() //l10n
233 {
|
234 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
235 chuck 1.37
236 Thread * currentThrd = Thread::getCurrent();
237 if (currentThrd != NULL)
238 {
239 // deletes the old tsd
240 currentThrd->delete_tsd("acceptLanguages");
241 }
242
243 PEG_METHOD_EXIT();
244 }
245 // l10n end
246
|
247 kumpf 1.57 #if 0
|
248 mday 1.52 // two special synchronization classes for ThreadPool
|
249 kumpf 1.57 //
|
250 mday 1.52
|
251 kumpf 1.57 class timed_mutex
|
252 mday 1.52 {
253 public:
254 timed_mutex(Mutex* mut, int msec)
|
255 kumpf 1.57 :_mut(mut)
|
256 mday 1.52 {
|
257 kumpf 1.57 _mut->timed_lock(msec, pegasus_thread_self());
|
258 mday 1.52 }
259 ~timed_mutex(void)
260 {
|
261 kumpf 1.57 _mut->unlock();
|
262 mday 1.52 }
263 Mutex* _mut;
264 };
|
265 kumpf 1.57 #endif
|
266 mday 1.52
267 class try_mutex
268 {
269 public:
270 try_mutex(Mutex* mut)
271 :_mut(mut)
272 {
273 _mut->try_lock(pegasus_thread_self());
274 }
275 ~try_mutex(void)
276 {
277 _mut->unlock();
278 }
279
280 Mutex* _mut;
281 };
282
|
283 mday 1.58 class auto_int
284 {
285 public:
286 auto_int(AtomicInt* num)
287 : _int(num)
288 {
289 _int->operator++();
290 }
291 ~auto_int(void)
292 {
293 _int->operator--();
294 }
295 AtomicInt *_int;
296 };
297
298
299 AtomicInt _idle_control;
|
300 mday 1.52
|
301 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
302
303 void ThreadPool::kill_idle_threads(void)
304 {
305 static struct timeval now, last = {0, 0};
306
307 pegasus_gettimeofday(&now);
308 if(now.tv_sec - last.tv_sec > 5)
309 {
310 _pools.lock();
311 ThreadPool *p = _pools.next(0);
312 while(p != 0)
313 {
314 try
315 {
316 p->kill_dead_threads();
317 }
318 catch(...)
319 {
320 }
321 p = _pools.next(p);
322 mday 1.20 }
323 _pools.unlock();
324 pegasus_gettimeofday(&last);
325 }
326 }
327
328
|
329 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
330 david.dillard 1.73 const char *key,
|
331 mike 1.2 Sint16 min,
332 Sint16 max,
333 struct timeval & alloc_wait,
|
334 chip 1.11 struct timeval & dealloc_wait,
|
335 mike 1.2 struct timeval & deadlock_detect)
336 : _max_threads(max), _min_threads(min),
|
337 mday 1.12 _current_threads(0),
338 _pool(true), _running(true),
|
339 mike 1.2 _dead(true), _dying(0)
340 {
341 _allocate_wait.tv_sec = alloc_wait.tv_sec;
342 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
343 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
344 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
345 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
346 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
347 memset(_key, 0x00, 17);
348 if(key != 0)
349 strncpy(_key, key, 16);
|
350 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
351 mike 1.2 _max_threads = initial_size;
352 if(_min_threads > initial_size)
353 _min_threads = initial_size;
|
354 chip 1.11
|
355 mike 1.2 int i;
356 for(i = 0; i < initial_size; i++)
357 {
358 _link_pool(_init_thread());
359 }
|
360 mday 1.20 _pools.insert_last(this);
|
361 mike 1.2 }
362
|
363 chip 1.11
|
364 mday 1.52 // Note: <<< Fri Oct 17 09:19:03 2003 mdd >>>
365 // the pegasus_yield() calls that preceed each th->join() are to
366 // give a thread on the running list a chance to reach a cancellation
367 // point before the join
|
368 mike 1.2
369 ThreadPool::~ThreadPool(void)
370 {
|
371 kumpf 1.57 PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
|
372 mday 1.35 try
|
373 mday 1.47 {
|
374 kumpf 1.57 // Set the dying flag so all thread know the destructor has been entered
|
375 mday 1.58 _dying++;
376
|
377 mday 1.52 // remove from the global pools list
|
378 mday 1.35 _pools.remove(this);
|
379 mday 1.52
380 // start with idle threads.
|
381 mday 1.35 Thread *th = 0;
382 th = _pool.remove_first();
|
383 mday 1.52 Semaphore* sleep_sem;
384
|
385 mday 1.35 while(th != 0)
|
386 mike 1.2 {
|
387 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
388 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
389
|
390 mday 1.35 if(sleep_sem == 0)
391 {
392 th->dereference_tsd();
393 }
|
394 kumpf 1.57 else
395 {
396 // Signal to get the thread out of the work loop.
397 sleep_sem->signal();
|
398 mday 1.52
|
399 kumpf 1.57 // Signal to get the thread past the end. See the comment
400 // "wait to be awakend by the thread pool destructor"
401 // Note: the current implementation of Thread for Windows
402 // does not implement "pthread" cancelation points so this
403 // is needed.
404 sleep_sem->signal();
405 th->dereference_tsd();
406 th->join();
407 delete th;
408 }
|
409 mday 1.35 th = _pool.remove_first();
|
410 mike 1.2 }
|
411 kumpf 1.57
|
412 mday 1.58 while(_idle_control.value())
413 pegasus_yield();
414
|
415 mday 1.47 th = _dead.remove_first();
|
416 mday 1.35 while(th != 0)
417 {
|
418 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
419 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
|
420 mday 1.58
|
421 mday 1.47 if(sleep_sem == 0)
422 {
423 th->dereference_tsd();
424 }
|
425 kumpf 1.57 else
426 {
427 //ATTN-DME-P3-20030322: _dead queue processing in
428 //ThreadPool::~ThreadPool is inconsistent with the
429 //processing in kill_dead_threads. Is this correct?
|
430 mday 1.58
|
431 kumpf 1.57 // signal the thread's sleep semaphore
432 sleep_sem->signal();
433 sleep_sem->signal();
434 th->dereference_tsd();
435 th->join();
436 delete th;
437 }
|
438 mday 1.47 th = _dead.remove_first();
|
439 mday 1.35 }
|
440 mday 1.52
|
441 mday 1.47 {
|
442 mday 1.52 th = _running.remove_first();
443 while(th != 0)
444 {
445 // signal the thread's sleep semaphore
446
447 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
448 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
449
|
450 mday 1.52 if(sleep_sem == 0 )
451 {
452 th->dereference_tsd();
453 }
|
454 kumpf 1.57 else
455 {
456 sleep_sem->signal();
457 sleep_sem->signal();
458 th->dereference_tsd();
|
459 kumpf 1.70 //th->cancel();
|
460 kumpf 1.57 pegasus_yield();
|
461 mday 1.52
|
462 kumpf 1.57 th->join();
463 delete th;
464 }
|
465 mday 1.52 th = _running.remove_first();
|
466 kumpf 1.57 }
467 }
|
468 mike 1.2 }
|
469 mday 1.52
|
470 mday 1.35 catch(...)
|
471 mike 1.2 {
472 }
473 }
474
|
475 chip 1.11 // make this static to the class
|
476 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
477 {
|
478 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
479
|
480 mike 1.2 Thread *myself = (Thread *)parm;
481 if(myself == 0)
|
482 kumpf 1.14 {
|
483 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
484 "ThreadPool::_loop: Thread pointer is null");
|
485 kumpf 1.14 PEG_METHOD_EXIT();
|
486 mike 1.2 throw NullPointer();
|
487 kumpf 1.14 }
|
488 chuck 1.37
489 // l10n
490 // Set myself into thread specific storage
|
491 chuck 1.38 // This will allow code to get its own Thread
|
492 chuck 1.39 Thread::setCurrent(myself);
493
|
494 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
495 kumpf 1.14 if(pool == 0 )
496 {
|
497 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
498 "ThreadPool::_loop: ThreadPool pointer is null");
|
499 kumpf 1.14 PEG_METHOD_EXIT();
|
500 mike 1.2 throw NullPointer();
|
501 kumpf 1.14 }
|
502 mday 1.52 if(pool->_dying.value())
503 {
|
504 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
505 "ThreadPool::_loop: ThreadPool is dying(1)");
|
506 mday 1.52 PEG_METHOD_EXIT();
507 return((PEGASUS_THREAD_RETURN)0);
508 }
509
|
510 mike 1.5 Semaphore *sleep_sem = 0;
|
511 mday 1.13 Semaphore *blocking_sem = 0;
512
|
513 mike 1.5 struct timeval *deadlock_timer = 0;
|
514 mday 1.47
|
515 chip 1.11 try
|
516 mike 1.2 {
517 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
518 myself->dereference_tsd();
519 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
520 mday 1.22 myself->dereference_tsd();
|
521 mike 1.2 }
|
522 mday 1.52
|
523 mday 1.30 catch(...)
524 {
|
525 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
526 konrad.r 1.67 "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
527 _graveyard(myself);
|
528 mday 1.30 PEG_METHOD_EXIT();
|
529 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
530 mday 1.30 }
531
|
532 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
533 kumpf 1.14 {
|
534 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
535 "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
|
536 konrad.r 1.67 _graveyard(myself);
|
537 kumpf 1.14 PEG_METHOD_EXIT();
|
538 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
539 kumpf 1.14 }
|
540 mike 1.2
|
541 mday 1.54 while(1)
|
542 mike 1.2 {
|
543 mday 1.58 if(pool->_dying.value())
544 break;
545
|
546 mday 1.52 try
547 {
|
548 brian.campbell 1.72 Boolean ignoreInterrupt = false;
549 sleep_sem->wait(ignoreInterrupt);
|
550 mday 1.52 }
|
551 konrad.r 1.67 catch (WaitInterrupted &e)
552 {
553 /* From the sem_wait manpage:
554 The sem_trywait() and sem_wait() functions may fail if:
555
556 EINTR A signal interrupted this function.
557 */
558 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
559 "Sleep semaphore wait failed. Doing a continue");
560 continue;
561 }
|
562 mday 1.52 catch(IPCException& )
563 {
|
564 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
565 konrad.r 1.67 "ThreadPool::_loop: failure on sleep_sem->wait().");
566 _graveyard(myself);
|
567 mday 1.52 PEG_METHOD_EXIT();
568 return((PEGASUS_THREAD_RETURN)0);
569 }
570
|
571 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
572 konrad.r 1.67 /* Hence no need to move the thread to the _dead queue, as the _running
573 * queue is only dused by kill_dead_threads which makes sure that the
574 * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
575 * before killing it.
576 */
|
577 mday 1.47
|
578 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
579 void *parm = 0;
|
580 mike 1.2
|
581 chip 1.11 try
|
582 mike 1.2 {
583 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
584 myself->reference_tsd("work func");
585 myself->dereference_tsd();
586 parm = myself->reference_tsd("work parm");
587 myself->dereference_tsd();
|
588 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
589 myself->dereference_tsd();
590
|
591 mike 1.2 }
|
592 mike 1.6 catch(IPCException &)
|
593 mike 1.2 {
|
594 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
595 "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
|
596 konrad.r 1.67 /*
597 * We cannot move ourselves to the dead queue b/c the TSD might be still
598 * locked and _graveyard is not equipped to de-lock (dereference_tsd) the TSD.
599 * Only the kill_dead_threads has enough logic to handle such situations.
600 _graveyard( myself);
601 */
|
602 kumpf 1.14 PEG_METHOD_EXIT();
|
603 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
604 mike 1.2 }
|
605 mday 1.52
|
606 mike 1.2 if(_work == 0)
|
607 kumpf 1.14 {
|
608 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
609 "ThreadPool::_loop: work func is null.");
|
610 kumpf 1.14 PEG_METHOD_EXIT();
|
611 kumpf 1.57 return((PEGASUS_THREAD_RETURN)0);
|
612 kumpf 1.14 }
|
613 kumpf 1.24
614 if(_work ==
615 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
616 {
|
617 konrad.r 1.67 /*
618 * The undertaker is set by ThreadPool::kill_dead_threads which awakens this thread,
619 * joins it and then removes it from the queue. Hence no reason to go to the
620 _graveyard( myself);
621 */
|
622 kumpf 1.57 PEG_METHOD_EXIT();
|
623 mday 1.23 _work(parm);
|
624 kumpf 1.24 }
625
|
626 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
627 kumpf 1.57
628 if (pool->_dying.value() == 0)
|
629 mday 1.20 {
|
630 kumpf 1.57 try
631 {
|
632 konrad.r 1.67 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
633 "Worker started");
|
634 kumpf 1.57 _work(parm);
|
635 konrad.r 1.67 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
636 "Worker finished");
|
637 kumpf 1.57 }
|
638 kumpf 1.59 catch(Exception & e)
639 {
640 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
641 String("Exception from _work in ThreadPool::_loop: ") +
642 e.getMessage());
643 PEG_METHOD_EXIT();
644 return((PEGASUS_THREAD_RETURN)0);
645 }
|
646 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
647 catch (exception& e)
648 {
649 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
650 String("Exception from _work in ThreadPool::_loop: ") +
651 e.what());
652 PEG_METHOD_EXIT();
653 return((PEGASUS_THREAD_RETURN)0);
654 }
655 #endif
|
656 kumpf 1.57 catch(...)
657 {
658 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
659 "ThreadPool::_loop: execution of _work failed.");
660 PEG_METHOD_EXIT();
661 return((PEGASUS_THREAD_RETURN)0);
662 }
663 }
|
664 chuck 1.37
|
665 chip 1.11 // put myself back onto the available list
666 try
|
667 mike 1.2 {
|
668 mday 1.47 if(pool->_dying.value() == 0)
669 {
670 gettimeofday(deadlock_timer, NULL);
671 if( blocking_sem != 0 )
672 blocking_sem->signal();
673
|
674 s.hills 1.53 // If we are not on _running then ~ThreadPool has removed
675 // us and now "owns" our pointer.
|
676 kumpf 1.57 if ( pool->_running.remove((void *)myself) != 0 )
677 {
678 pool->_pool.insert_first(myself);
679 }
680 else
681 {
|
682 kumpf 1.60 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
683 "ThreadPool::_loop: Failed to remove thread from running queue.");
|
684 kumpf 1.57 PEG_METHOD_EXIT();
|
685 mday 1.54 return((PEGASUS_THREAD_RETURN)0);
|
686 kumpf 1.57 }
|
687 mday 1.47 }
688 else
689 {
690 PEG_METHOD_EXIT();
691 return((PEGASUS_THREAD_RETURN)0);
692 }
|
693 mike 1.2 }
|
694 mday 1.52 catch(...)
|
695 mike 1.2 {
|
696 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
697 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
698 kumpf 1.14 PEG_METHOD_EXIT();
|
699 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
700 mike 1.2 }
|
701 mday 1.51
|
702 mike 1.2 }
|
703 s.hills 1.49
704 // TODO: Why is this needed? Why not just continue?
|
705 mike 1.2 // wait to be awakend by the thread pool destructor
|
706 mday 1.50 //sleep_sem->wait();
|
707 s.hills 1.49
|
708 mike 1.2 myself->test_cancel();
|
709 kumpf 1.14
710 PEG_METHOD_EXIT();
|
711 mike 1.2 return((PEGASUS_THREAD_RETURN)0);
712 }
713
|
714 kumpf 1.59 Boolean ThreadPool::allocate_and_awaken(void *parm,
715 PEGASUS_THREAD_RETURN \
716 (PEGASUS_THREAD_CDECL *work)(void *),
717 Semaphore *blocking)
|
718 mike 1.2 throw(IPCException)
719 {
|
720 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
721 kumpf 1.57
722 // Allocate_and_awaken will not run if the _dying flag is set.
723 // Once the lock is acquired, ~ThreadPool will not change
724 // the value of _dying until the lock is released.
|
725 mday 1.47
|
726 kumpf 1.57 try
|
727 mday 1.47 {
|
728 kumpf 1.57 if (_dying.value())
|
729 mday 1.47 {
|
730 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
731 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
732 kumpf 1.59 // ATTN: Error result has not yet been defined
733 return true;
|
734 kumpf 1.57 }
735 struct timeval start;
736 gettimeofday(&start, NULL);
737 Thread *th = 0;
738
|
739 mday 1.47 th = _pool.remove_first();
740
|
741 kumpf 1.59 if (th == 0)
|
742 kumpf 1.57 {
743 // will throw an IPCException&
744 _check_deadlock(&start) ;
|
745 mday 1.12
|
746 kumpf 1.57 if(_max_threads == 0 || _current_threads < _max_threads)
747 {
748 th = _init_thread();
749 }
|
750 kumpf 1.59 }
751
752 if (th == 0)
753 {
|
754 kumpf 1.60 // ATTN-DME-P3-20031103: This trace message should not be
755 // be labeled TRC_DISCARDED_DATA, because it does not
756 // necessarily imply that a failure has occurred. However,
757 // this label is being used temporarily to help isolate
758 // the cause of client timeout problems.
759
760 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
761 "ThreadPool::allocate_and_awaken: Insufficient resources: "
762 " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
763 _key, _running.count(), _pool.count(), _dead.count());
|
764 kumpf 1.59 return false;
|
765 mday 1.47 }
|
766 chip 1.11
|
767 mike 1.2 // initialize the thread data with the work function and parameters
|
768 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
769 kumpf 1.57 "Initializing thread with work function and parameters: parm = %p",
|
770 kumpf 1.14 parm);
771
|
772 kumpf 1.15 th->delete_tsd("work func");
|
773 chip 1.11 th->put_tsd("work func", NULL,
|
774 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
775 (void *)work);
|
776 kumpf 1.15 th->delete_tsd("work parm");
|
777 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
778 kumpf 1.15 th->delete_tsd("blocking sem");
|
779 mday 1.13 if(blocking != 0 )
|
780 kumpf 1.57 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
781 mday 1.47
|
782 kumpf 1.57 // put the thread on the running list
783 _running.insert_first(th);
|
784 mike 1.2
785 // signal the thread's sleep semaphore to awaken it
|
786 kumpf 1.57 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
787 mday 1.47
|
788 kumpf 1.57 if(sleep_sem == 0)
|
789 mike 1.2 {
|
790 kumpf 1.57 th->dereference_tsd();
791 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
792 "ThreadPool::allocate_and_awaken: thread data is corrupted.");
793 PEG_METHOD_EXIT();
794 throw NullPointer();
|
795 mike 1.2 }
|
796 kumpf 1.57 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
797 sleep_sem->signal();
798 th->dereference_tsd();
|
799 mike 1.2 }
|
800 kumpf 1.57 catch (...)
|
801 mday 1.47 {
|
802 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
803 "ThreadPool::allocate_and_awaken: Operation Failed.");
804 PEG_METHOD_EXIT();
|
805 kumpf 1.59 // ATTN: Error result has not yet been defined
806 return true;
|
807 mday 1.47 }
|
808 kumpf 1.14 PEG_METHOD_EXIT();
|
809 kumpf 1.59 return true;
|
810 mike 1.2 }
811
812 // caller is responsible for only calling this routine during slack periods
813 // but should call it at least once per _deadlock_detect with the running q
814 // and at least once per _deallocate_wait for the pool q
815
|
816 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
817 mike 1.2 throw(IPCException)
818 {
|
819 kumpf 1.57 // Since the kill_dead_threads, ThreadPool or allocate_and_awaken
820 // manipulate the threads on the ThreadPool queues, they should never
821 // be allowed to run at the same time.
822
|
823 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
|
824 mday 1.58 // << Thu Oct 23 14:41:02 2003 mdd >>
825 // not true, the queues are thread safe. they are syncrhonized.
|
826 kumpf 1.57
|
827 mday 1.58 auto_int do_not_destruct(&_idle_control);
828
|
829 kumpf 1.57 try
|
830 mday 1.47 {
|
831 kumpf 1.57 if (_dying.value())
|
832 mday 1.47 {
|
833 kumpf 1.57 return 0;
|
834 mday 1.47 }
|
835 mday 1.58
|
836 kumpf 1.57 struct timeval now;
837 gettimeofday(&now, NULL);
838 Uint32 bodies = 0;
|
839 konrad.r 1.67 AtomicInt needed(0);
|
840 kumpf 1.57
841 // first go thread the dead q and clean it up as much as possible
842 try
|
843 mday 1.47 {
|
844 mday 1.58 while(_dying.value() == 0 && _dead.count() > 0)
|
845 kumpf 1.57 {
846 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
847 Thread *dead = _dead.remove_first();
|
848 mday 1.58
849 if(dead )
850 {
851 dead->join();
852 delete dead;
853 }
|
854 kumpf 1.57 }
855 }
856 catch(...)
857 {
|
858 konrad.r 1.67 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Exception when deleting dead");
|
859 mday 1.47 }
860
|
861 mday 1.58 if (_dying.value())
862 {
863 return 0;
864 }
|
865 mday 1.47
|
866 mday 1.58 Thread *th = 0;
867 internal_dq idq;
868
|
869 kumpf 1.70 if(_pool.count() > 0 )
|
870 mday 1.52 {
|
871 kumpf 1.70 try
872 {
873 _pool.try_lock();
874 }
875 catch(...)
876 {
877 return bodies;
878 }
879
880 struct timeval dt = { 0, 0 };
881 struct timeval *dtp;
882
883 th = _pool.next(th);
884 while (th != 0 )
885 {
|
886 chip 1.11 try
|
887 mike 1.2 {
|
888 kumpf 1.70 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
|
889 mike 1.2 }
|
890 mday 1.18 catch(...)
|
891 mike 1.2 {
|
892 kumpf 1.70 _pool.unlock();
|
893 mday 1.18 return bodies;
|
894 mike 1.2 }
|
895 kumpf 1.57
|
896 kumpf 1.70 if(dtp != 0)
897 {
898 memcpy(&dt, dtp, sizeof(struct timeval));
899 }
900 th->dereference_tsd();
901 struct timeval deadlock_timeout;
902 Boolean too_long;
903 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
|
904 mday 1.58
|
905 kumpf 1.70 if( true == too_long)
|
906 kumpf 1.57 {
|
907 kumpf 1.70 // escape if we are down to the minimum thread count
908 _current_threads--;
909 if( _current_threads.value() < (Uint32)_min_threads )
|
910 kumpf 1.57 {
|
911 kumpf 1.70 _current_threads++;
912 th = _pool.next(th);
913 continue;
|
914 kumpf 1.57 }
|
915 kumpf 1.70
916 th = _pool.remove_no_lock((void *)th);
917 idq.insert_first((void*)th);
|
918 mike 1.2 }
|
919 kumpf 1.70 th = _pool.next(th);
920 }
921 _pool.unlock();
922 }
|
923 mday 1.58
|
924 kumpf 1.70 th = (Thread*)idq.remove_last();
925 while(th != 0)
926 {
927 th->delete_tsd("work func");
928 th->put_tsd("work func", NULL,
929 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
930 (void *)&_undertaker);
931 th->delete_tsd("work parm");
932 th->put_tsd("work parm", NULL, sizeof(void *), th);
|
933 mday 1.58
|
934 kumpf 1.70 // signal the thread's sleep semaphore to awaken it
935 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
936 PEGASUS_ASSERT(sleep_sem != 0);
|
937 mday 1.58
|
938 kumpf 1.70 bodies++;
939 th->dereference_tsd();
940 sleep_sem->signal();
941 th->join(); // Note: Clean up the thread here rather than
942 delete th; // leave it sitting unused on the _dead queue
943 th = (Thread*)idq.remove_last();
|
944 mike 1.2 }
|
945 kumpf 1.57
|
946 konrad.r 1.67 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
947 "We need %u new threads", needed.value());
|
948 kumpf 1.57 while (needed.value() > 0) {
949 _link_pool(_init_thread());
950 needed--;
951 pegasus_sleep(0);
952 }
953 return bodies;
954 }
|
955 mday 1.58 catch (...)
|
956 kumpf 1.57 {
957 }
|
958 konrad.r 1.67 PEG_METHOD_EXIT();
|
959 kumpf 1.57 return 0;
|
960 mike 1.2 }
961
|
962 mday 1.12
|
963 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
964 {
|
965 mday 1.22 // never time out if the interval is zero
966 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
967 return false;
968
|
969 mday 1.55 struct timeval now , finish , remaining ;
|
970 mday 1.13 Uint32 usec;
|
971 mday 1.33 pegasus_gettimeofday(&now);
|
972 mday 1.36 /* remove valgrind error */
973 pegasus_gettimeofday(&remaining);
974
|
975 mday 1.13
976 finish.tv_sec = start->tv_sec + interval->tv_sec;
977 usec = start->tv_usec + interval->tv_usec;
978 finish.tv_sec += (usec / 1000000);
979 usec %= 1000000;
980 finish.tv_usec = usec;
981
982 if ( timeval_subtract(&remaining, &finish, &now) )
|
983 mike 1.2 return true;
984 else
985 return false;
986 }
987
988 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
989 {
|
990 konrad.r 1.67
991 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
|
992 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
|
993 konrad.r 1.67 PEG_METHOD_EXIT();
|
994 mday 1.30 return (PEGASUS_THREAD_RETURN)1;
|
995 mike 1.2 }
|
996 mday 1.19
|
997 konrad.r 1.67 PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
998 {
999 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
1000 ThreadPool *pool = (ThreadPool *)t->get_parm();
1001 if(pool == 0 ) {
1002 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
1003 "Could not obtain the pool information from the Thread.", t);
1004
1005 return (PEGASUS_THREAD_RETURN)1;
1006 }
1007 if (pool->_pool.exists(t))
1008 {
1009 if (pool->_pool.remove( (void *) t) != 0)
1010 {
1011 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1012 "Moving thread %p", t);
1013 /* We are moving the thread to the _running queue b/c
1014 _only_ kill_dead_threads has enough logic to take care
1015 of cleaning up the threads.*/
1016
1017 pool->_running.insert_first( t );
1018 konrad.r 1.67 }
1019 else
1020 {
1021 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1022 "Could not move Thread %p from _pool to _runing queue.", t);
1023 return (PEGASUS_THREAD_RETURN)1;
1024 }
1025 }
1026
1027 else if (pool->_running.exists(t))
1028 {
1029 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
1030 "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
1031 return (PEGASUS_THREAD_RETURN)1;
1032 }
1033 if (!pool->_dead.exists(t))
1034 {
1035 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
1036 "Thread is not on any queue! Moving it to the running queue.");
1037 pool->_running.insert_first( t );
1038 }
1039 konrad.r 1.67 PEG_METHOD_EXIT();
1040 return (PEGASUS_THREAD_RETURN)0;
1041 }
|
1042 mday 1.19
1043 void ThreadPool::_sleep_sem_del(void *p)
1044 {
1045 if(p != 0)
1046 {
1047 delete (Semaphore *)p;
1048 }
1049 }
1050
1051 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
1052 {
1053 if (true == check_time(start, &_deadlock_detect))
1054 throw Deadlock(pegasus_thread_self());
1055 return;
1056 }
1057
1058
1059 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
1060 {
1061 return(check_time(start, &_deadlock_detect));
1062 }
1063 mday 1.19
1064 Boolean ThreadPool::_check_dealloc(struct timeval *start)
1065 {
1066 return(check_time(start, &_deallocate_wait));
1067 }
1068
1069 Thread *ThreadPool::_init_thread(void) throw(IPCException)
1070 {
|
1071 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
|
1072 mday 1.19 Thread *th = (Thread *) new Thread(_loop, this, false);
1073 // allocate a sleep semaphore and pass it in the thread context
1074 // initial count is zero, loop function will sleep until
1075 // we signal the semaphore
1076 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1077 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
1078
1079 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
1080 mday 1.35 pegasus_gettimeofday(dldt);
1081
|
1082 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1083 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
1084 chuck 1.37
|
1085 kumpf 1.59 if (!th->run())
1086 {
1087 delete th;
1088 return 0;
1089 }
|
1090 mday 1.19 _current_threads++;
1091 pegasus_yield();
|
1092 konrad.r 1.67 PEG_METHOD_EXIT();
|
1093 mday 1.19
1094 return th;
1095 }
1096
1097 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1098 {
1099 if(th == 0)
|
1100 kumpf 1.57 {
1101 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1102 "ThreadPool::_link_pool: Thread pointer is null.");
|
1103 mday 1.19 throw NullPointer();
|
1104 kumpf 1.57 }
|
1105 mday 1.47 try
1106 {
1107 _pool.insert_first(th);
1108 }
1109 catch(...)
1110 {
|
1111 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1112 "ThreadPool::_link_pool: _pool.insert_first failed.");
|
1113 mday 1.47 }
|
1114 mday 1.19 }
|
1115 mike 1.2
1116
1117 PEGASUS_NAMESPACE_END
1118
|