1 karl 1.71 //%2004////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 mike 1.2 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
11 chip 1.11 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
14 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
|
16 kumpf 1.17 //
|
17 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
18 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
20 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
23 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Mike Day (mdday@us.ibm.com)
29 //
30 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
31 chip 1.11 // added nsk platform support
|
32 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
33 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
34 mike 1.2 //
35 //%/////////////////////////////////////////////////////////////////////////////
36
37 #include "Thread.h"
|
38 kumpf 1.68 #include <exception>
|
39 mike 1.2 #include <Pegasus/Common/IPC.h>
|
40 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
41 mike 1.2
42 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
43 chip 1.11 # include "ThreadWindows.cpp"
|
44 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
45 # include "ThreadUnix.cpp"
46 #elif defined(PEGASUS_OS_TYPE_NSK)
47 # include "ThreadNsk.cpp"
48 #else
49 # error "Unsupported platform"
50 #endif
51
|
52 kumpf 1.69 PEGASUS_USING_STD;
|
53 mike 1.2 PEGASUS_NAMESPACE_BEGIN
54
|
55 mday 1.42
|
56 chip 1.11 void thread_data::default_delete(void * data)
57 {
|
58 mike 1.2 if( data != NULL)
|
59 chip 1.11 ::operator delete(data);
|
60 mike 1.2 }
61
|
62 chuck 1.43 // l10n start
63 void language_delete(void * data)
64 {
65 if( data != NULL)
66 {
|
67 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
68 chuck 1.43 }
69 }
70 // l10n end
71
|
72 mike 1.2 Boolean Thread::_signals_blocked = false;
|
73 chuck 1.37 // l10n
|
74 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
75 w.otsuka 1.71.2.2 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key =
76 PEGASUS_THREAD_KEY_TYPE(-1);
|
77 marek 1.63 #else
78 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
79 #endif
|
80 chuck 1.37 Boolean Thread::_key_initialized = false;
|
81 chuck 1.41 Boolean Thread::_key_error = false;
|
82 chuck 1.37
|
83 mike 1.2
84 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
85 {
|
86 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
87 a.arora 1.65 _cleanup.insert_first(cu.get());
|
88 a.arora 1.64 cu.release();
|
89 mike 1.2 return;
90 }
|
91 chip 1.11
|
92 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
93 {
|
94 a.arora 1.64 AutoPtr<cleanup_handler> cu ;
|
95 chip 1.11 try
96 {
|
97 a.arora 1.64 cu.reset(_cleanup.remove_first());
|
98 mike 1.2 }
|
99 chip 1.11 catch(IPCException&)
|
100 mike 1.2 {
|
101 chip 1.11 PEGASUS_ASSERT(0);
|
102 mike 1.2 }
103 if(execute == true)
104 cu->execute();
105 }
|
106 kumpf 1.71.2.3
|
107 mike 1.2
|
108 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
109 mike 1.2
110
|
111 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
112 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
113 {
114 // execute the cleanup stack and then return
|
115 mike 1.2 while( _cleanup.count() )
116 {
|
117 chip 1.11 try
118 {
119 cleanup_pop(true);
120 }
121 catch(IPCException&)
122 {
123 PEGASUS_ASSERT(0);
124 break;
|
125 mike 1.2 }
126 }
127 _exit_code = exit_code;
128 exit_thread(exit_code);
|
129 mday 1.4 _handle.thid = 0;
|
130 mike 1.2 }
131
132
133 #endif
134
|
135 chuck 1.37 // l10n start
|
136 chuck 1.39 Sint8 Thread::initializeKey()
137 {
138 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
139 if (!Thread::_key_initialized)
140 {
|
141 chuck 1.41 if (Thread::_key_error)
142 {
143 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
144 "Thread: ERROR - thread key error");
145 return -1;
146 }
147
|
148 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
149 {
150 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
151 "Thread: able to create a thread key");
152 Thread::_key_initialized = true;
153 }
154 else
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: ERROR - unable to create a thread key");
|
158 chuck 1.41 Thread::_key_error = true;
|
159 chuck 1.39 return -1;
160 }
161 }
162
163 PEG_METHOD_EXIT();
164 return 0;
165 }
166
|
167 chuck 1.37 Thread * Thread::getCurrent()
168 {
|
169 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
170 chuck 1.40 if (Thread::initializeKey() != 0)
|
171 chuck 1.39 {
172 return NULL;
173 }
|
174 chuck 1.38 PEG_METHOD_EXIT();
|
175 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
176 }
177
178 void Thread::setCurrent(Thread * thrd)
179 {
180 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
181 if (Thread::initializeKey() == 0)
182 {
183 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
184 (void *) thrd) == 0)
185 {
186 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
187 "Successful set Thread * into thread specific storage");
188 }
189 else
190 {
191 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
192 "ERROR: got error setting Thread * into thread specific storage");
193 }
194 }
195 PEG_METHOD_EXIT();
|
196 chuck 1.37 }
197
198 AcceptLanguages * Thread::getLanguages()
199 {
|
200 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
201 chuck 1.37
202 Thread * curThrd = Thread::getCurrent();
203 if (curThrd == NULL)
204 return NULL;
205 AcceptLanguages * acceptLangs =
206 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
207 curThrd->dereference_tsd();
208 PEG_METHOD_EXIT();
209 return acceptLangs;
210 }
211
212 void Thread::setLanguages(AcceptLanguages *langs) //l10n
213 {
|
214 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
215 chuck 1.37
216 Thread * currentThrd = Thread::getCurrent();
217 if (currentThrd != NULL)
218 {
219 // deletes the old tsd and creates a new one
220 currentThrd->put_tsd("acceptLanguages",
|
221 chuck 1.43 language_delete,
|
222 chuck 1.37 sizeof(AcceptLanguages *),
223 langs);
224 }
225
226 PEG_METHOD_EXIT();
227 }
228
229 void Thread::clearLanguages() //l10n
230 {
|
231 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
232 chuck 1.37
233 Thread * currentThrd = Thread::getCurrent();
234 if (currentThrd != NULL)
235 {
236 // deletes the old tsd
237 currentThrd->delete_tsd("acceptLanguages");
238 }
239
240 PEG_METHOD_EXIT();
241 }
242 // l10n end
243
|
#if 0
// two special synchronization classes for ThreadPool
//

// (Compiled out.)  Scoped helper that calls timed_lock() on the given Mutex
// in its constructor and unlock() in its destructor.
class timed_mutex
{
public:
    timed_mutex(Mutex* mut, int msec)
        : _mut(mut)
    {
        _mut->timed_lock(msec, pegasus_thread_self());
    }

    ~timed_mutex(void)
    {
        _mut->unlock();
    }

    Mutex* _mut;
};
#endif
|
263 mday 1.52
264 class try_mutex
265 {
266 public:
267 try_mutex(Mutex* mut)
268 :_mut(mut)
269 {
270 _mut->try_lock(pegasus_thread_self());
271 }
272 ~try_mutex(void)
273 {
274 _mut->unlock();
275 }
276
277 Mutex* _mut;
278 };
279
|
280 mday 1.58 class auto_int
281 {
282 public:
283 auto_int(AtomicInt* num)
284 : _int(num)
285 {
286 _int->operator++();
287 }
288 ~auto_int(void)
289 {
290 _int->operator--();
291 }
292 AtomicInt *_int;
293 };
294
295
296 AtomicInt _idle_control;
|
297 mday 1.52
|
298 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
299
300 void ThreadPool::kill_idle_threads(void)
301 {
302 static struct timeval now, last = {0, 0};
303
304 pegasus_gettimeofday(&now);
305 if(now.tv_sec - last.tv_sec > 5)
306 {
307 _pools.lock();
308 ThreadPool *p = _pools.next(0);
309 while(p != 0)
310 {
311 try
312 {
313 p->kill_dead_threads();
314 }
315 catch(...)
316 {
317 }
318 p = _pools.next(p);
319 mday 1.20 }
320 _pools.unlock();
321 pegasus_gettimeofday(&last);
322 }
323 }
324
325
|
326 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
327 kumpf 1.8 const Sint8 *key,
|
328 mike 1.2 Sint16 min,
329 Sint16 max,
330 struct timeval & alloc_wait,
|
331 chip 1.11 struct timeval & dealloc_wait,
|
332 mike 1.2 struct timeval & deadlock_detect)
333 : _max_threads(max), _min_threads(min),
|
334 mday 1.12 _current_threads(0),
335 _pool(true), _running(true),
|
336 mike 1.2 _dead(true), _dying(0)
337 {
338 _allocate_wait.tv_sec = alloc_wait.tv_sec;
339 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
340 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
341 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
342 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
343 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
344 memset(_key, 0x00, 17);
345 if(key != 0)
346 strncpy(_key, key, 16);
|
347 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
348 mike 1.2 _max_threads = initial_size;
349 if(_min_threads > initial_size)
350 _min_threads = initial_size;
|
351 chip 1.11
|
352 mike 1.2 int i;
353 for(i = 0; i < initial_size; i++)
354 {
355 _link_pool(_init_thread());
356 }
|
357 mday 1.20 _pools.insert_last(this);
|
358 mike 1.2 }
359
360 ThreadPool::~ThreadPool(void)
361 {
|
362 kumpf 1.57 PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
|
363 mday 1.35 try
|
364 mday 1.47 {
|
365 kumpf 1.57 // Set the dying flag so all thread know the destructor has been entered
|
366 mday 1.58 _dying++;
367
|
368 mday 1.52 // remove from the global pools list
|
369 mday 1.35 _pools.remove(this);
|
370 mday 1.52
|
371 kumpf 1.71.2.6 while(_current_threads.value() > 0)
|
372 mike 1.2 {
|
373 kumpf 1.71.2.6 Thread* thread = _pool.remove_first();
374 if (thread != 0)
|
375 kumpf 1.57 {
|
376 kumpf 1.71.2.6 _cleanupThread(thread);
377 _current_threads--;
|
378 kumpf 1.57 }
379
380 else
381 {
|
382 kumpf 1.71.2.6 pegasus_yield();
|
383 kumpf 1.57 }
|
384 mday 1.35 }
|
385 mike 1.2 }
|
386 mday 1.35 catch(...)
|
387 mike 1.2 {
388 }
389 }
390
|
391 chip 1.11 // make this static to the class
|
392 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
393 {
|
394 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
395
|
396 mike 1.2 Thread *myself = (Thread *)parm;
397 if(myself == 0)
|
398 kumpf 1.14 {
|
399 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
400 "ThreadPool::_loop: Thread pointer is null");
|
401 kumpf 1.14 PEG_METHOD_EXIT();
|
402 mike 1.2 throw NullPointer();
|
403 kumpf 1.14 }
|
404 chuck 1.37
405 // l10n
406 // Set myself into thread specific storage
|
407 chuck 1.38 // This will allow code to get its own Thread
|
408 chuck 1.39 Thread::setCurrent(myself);
409
|
410 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
411 kumpf 1.14 if(pool == 0 )
412 {
|
413 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
414 "ThreadPool::_loop: ThreadPool pointer is null");
|
415 kumpf 1.14 PEG_METHOD_EXIT();
|
416 mike 1.2 throw NullPointer();
|
417 kumpf 1.14 }
|
418 mday 1.52
|
419 mike 1.5 Semaphore *sleep_sem = 0;
|
420 mday 1.13 Semaphore *blocking_sem = 0;
421
|
422 mike 1.5 struct timeval *deadlock_timer = 0;
|
423 mday 1.47
|
424 chip 1.11 try
|
425 mike 1.2 {
426 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
427 myself->dereference_tsd();
|
428 kumpf 1.71.2.6 PEGASUS_ASSERT(sleep_sem != 0);
429
|
430 mike 1.2 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
431 mday 1.22 myself->dereference_tsd();
|
432 kumpf 1.71.2.6 PEGASUS_ASSERT(deadlock_timer != 0);
|
433 mike 1.2 }
|
434 mday 1.30 catch(...)
435 {
|
436 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
437 konrad.r 1.67 "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer.");
|
438 kumpf 1.71.2.6 PEGASUS_ASSERT(false);
439 pool->_pool.remove(myself);
440 pool->_current_threads--;
|
441 mday 1.30 PEG_METHOD_EXIT();
|
442 kumpf 1.71.2.6 return((PEGASUS_THREAD_RETURN)1);
|
443 mday 1.30 }
444
|
445 mday 1.54 while(1)
|
446 mike 1.2 {
|
447 mday 1.52 try
448 {
|
449 kumpf 1.71.2.6 sleep_sem->wait();
|
450 konrad.r 1.67 }
|
451 kumpf 1.71.2.6 catch(...)
|
452 mday 1.52 {
|
453 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
454 konrad.r 1.67 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
455 kumpf 1.71.2.6 PEGASUS_ASSERT(false);
456 pool->_pool.remove(myself);
457 pool->_current_threads--;
|
458 mday 1.52 PEG_METHOD_EXIT();
|
459 kumpf 1.71.2.6 return((PEGASUS_THREAD_RETURN)1);
|
460 mday 1.52 }
461
|
462 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
463 konrad.r 1.67 /* Hence no need to move the thread to the _dead queue, as the _running
464 * queue is only dused by kill_dead_threads which makes sure that the
465 * the threads are cleaned up (unlocking any locked lists in the TSD, etc)
466 * before killing it.
467 */
|
468 mday 1.47
|
469 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
470 void *parm = 0;
|
471 kumpf 1.71.2.6 Semaphore* blocking_sem = 0;
|
472 mike 1.2
|
473 chip 1.11 try
|
474 mike 1.2 {
475 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
476 myself->reference_tsd("work func");
477 myself->dereference_tsd();
478 parm = myself->reference_tsd("work parm");
479 myself->dereference_tsd();
|
480 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
481 myself->dereference_tsd();
482
|
483 mike 1.2 }
|
484 kumpf 1.71.2.6 catch(...)
|
485 mike 1.2 {
|
486 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
487 "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
|
488 kumpf 1.71.2.6 PEGASUS_ASSERT(false);
489 pool->_pool.remove(myself);
490 pool->_current_threads--;
|
491 kumpf 1.14 PEG_METHOD_EXIT();
|
492 kumpf 1.71.2.6 return((PEGASUS_THREAD_RETURN)1);
|
493 mike 1.2 }
|
494 mday 1.52
|
495 mike 1.2 if(_work == 0)
|
496 kumpf 1.14 {
|
497 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
498 kumpf 1.71.2.6 "ThreadPool::_loop: work func is 0, meaning we should exit.");
499 break;
|
500 kumpf 1.24 }
501
|
502 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
503 kumpf 1.57
|
504 kumpf 1.71.2.6 try
|
505 mday 1.20 {
|
506 kumpf 1.71.2.6 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
507 "Worker started");
508 _work(parm);
509 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
510 "Worker finished");
511 }
512 catch(Exception & e)
513 {
514 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
515 String("Exception from _work in ThreadPool::_loop: ") +
516 e.getMessage());
517 }
|
518 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
519 kumpf 1.71.2.6 catch (exception& e)
520 {
521 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
522 String("Exception from _work in ThreadPool::_loop: ") +
523 e.what());
524 }
|
525 kumpf 1.68 #endif
|
526 kumpf 1.71.2.6 catch(...)
527 {
528 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529 "ThreadPool::_loop: execution of _work failed.");
530 }
|
531 chuck 1.37
|
532 chip 1.11 // put myself back onto the available list
533 try
|
534 mike 1.2 {
|
535 kumpf 1.71.2.6 gettimeofday(deadlock_timer, NULL);
536 if( blocking_sem != 0 )
537 blocking_sem->signal();
538
539 Boolean removed = pool->_running.remove((void *)myself);
540 PEGASUS_ASSERT(removed);
541
542 pool->_pool.insert_first(myself);
|
543 mike 1.2 }
|
544 mday 1.52 catch(...)
|
545 mike 1.2 {
|
546 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
547 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
548 kumpf 1.71.2.6 PEGASUS_ASSERT(false);
549 pool->_current_threads--;
|
550 kumpf 1.14 PEG_METHOD_EXIT();
|
551 kumpf 1.71.2.6 return((PEGASUS_THREAD_RETURN)1);
|
552 mike 1.2 }
|
553 mday 1.51
|
554 mike 1.2 }
|
555 s.hills 1.49
|
556 kumpf 1.14 PEG_METHOD_EXIT();
|
557 mike 1.2 return((PEGASUS_THREAD_RETURN)0);
558 }
559
|
560 denise.eckstein 1.71.2.5 ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
561 PEGASUS_THREAD_RETURN \
562 (PEGASUS_THREAD_CDECL *work)(void *),
563 Semaphore *blocking)
564
|
565 mike 1.2 {
|
566 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
567 kumpf 1.57
568 // Allocate_and_awaken will not run if the _dying flag is set.
569 // Once the lock is acquired, ~ThreadPool will not change
570 // the value of _dying until the lock is released.
|
571 mday 1.47
|
572 kumpf 1.57 try
|
573 mday 1.47 {
|
574 kumpf 1.57 if (_dying.value())
|
575 mday 1.47 {
|
576 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
577 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
578 denise.eckstein 1.71.2.5 return PEGASUS_THREAD_UNAVAILABLE;
|
579 kumpf 1.57 }
580 struct timeval start;
581 gettimeofday(&start, NULL);
582 Thread *th = 0;
583
|
584 mday 1.47 th = _pool.remove_first();
585
|
586 kumpf 1.59 if (th == 0)
|
587 kumpf 1.57 {
588 // will throw an IPCException&
589 _check_deadlock(&start) ;
|
590 mday 1.12
|
591 kumpf 1.57 if(_max_threads == 0 || _current_threads < _max_threads)
592 {
593 th = _init_thread();
594 }
|
595 kumpf 1.59 }
596
597 if (th == 0)
598 {
|
599 kumpf 1.60 // ATTN-DME-P3-20031103: This trace message should not be
600 // be labeled TRC_DISCARDED_DATA, because it does not
601 // necessarily imply that a failure has occurred. However,
602 // this label is being used temporarily to help isolate
603 // the cause of client timeout problems.
604
605 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
606 "ThreadPool::allocate_and_awaken: Insufficient resources: "
607 " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
608 _key, _running.count(), _pool.count(), _dead.count());
|
609 denise.eckstein 1.71.2.5 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
|
610 mday 1.47 }
|
611 chip 1.11
|
612 mike 1.2 // initialize the thread data with the work function and parameters
|
613 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
614 kumpf 1.57 "Initializing thread with work function and parameters: parm = %p",
|
615 kumpf 1.14 parm);
616
|
617 kumpf 1.15 th->delete_tsd("work func");
|
618 chip 1.11 th->put_tsd("work func", NULL,
|
619 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
620 (void *)work);
|
621 kumpf 1.15 th->delete_tsd("work parm");
|
622 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
623 kumpf 1.15 th->delete_tsd("blocking sem");
|
624 mday 1.13 if(blocking != 0 )
|
625 kumpf 1.57 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
626 mday 1.47
|
627 kumpf 1.57 // put the thread on the running list
628 _running.insert_first(th);
|
629 mike 1.2
630 // signal the thread's sleep semaphore to awaken it
|
631 kumpf 1.57 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
632 denise.eckstein 1.71.2.5 PEGASUS_ASSERT(sleep_sem != 0);
633
|
634 kumpf 1.57 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
635 sleep_sem->signal();
636 th->dereference_tsd();
|
637 mike 1.2 }
|
638 kumpf 1.57 catch (...)
|
639 mday 1.47 {
|
640 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
641 "ThreadPool::allocate_and_awaken: Operation Failed.");
642 PEG_METHOD_EXIT();
|
643 kumpf 1.59 // ATTN: Error result has not yet been defined
|
644 denise.eckstein 1.71.2.5 return PEGASUS_THREAD_SETUP_FAILURE;
|
645 mday 1.47 }
|
646 kumpf 1.14 PEG_METHOD_EXIT();
|
647 denise.eckstein 1.71.2.5 return PEGASUS_THREAD_OK;
|
648 mike 1.2 }
649
650 // caller is responsible for only calling this routine during slack periods
651 // but should call it at least once per _deadlock_detect with the running q
652 // and at least once per _deallocate_wait for the pool q
653
|
654 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
655 mike 1.2 throw(IPCException)
656 {
|
657 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::kill_dead_threads");
|
658 kumpf 1.57
|
659 kumpf 1.71.2.6 Uint32 numThreadsCleanedUp = 0;
660
661 Uint32 numIdleThreads = _pool.count();
662 for (Uint32 i = 0; i < numIdleThreads; i++)
|
663 kumpf 1.57 {
|
664 kumpf 1.71.2.6 // Do not dip below the minimum thread count
665 if (_current_threads.value() <= (Uint32)_min_threads)
666 {
667 break;
668 }
669
670 Thread* thread = _pool.remove_last();
671
672 // If there are no more threads in the _pool queue, we're done.
673 if (thread == 0)
674 {
675 break;
676 }
677
678 struct timeval* lastActivityTime;
679 try
680 {
681 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
682 "deadlock timer");
683 PEGASUS_ASSERT(lastActivityTime != 0);
684 }
685 kumpf 1.71.2.6 catch (...)
686 {
687 PEGASUS_ASSERT(false);
688 _pool.insert_last(thread);
689 break;
690 }
691
692 Boolean cleanupThisThread =
693 check_time(lastActivityTime, &_deallocate_wait);
694 thread->dereference_tsd();
695
696 if (cleanupThisThread)
697 {
698 _cleanupThread(thread);
699 _current_threads--;
700 numThreadsCleanedUp++;
701 }
702 else
703 {
704 _pool.insert_first(thread);
705 }
|
706 kumpf 1.57 }
|
707 kumpf 1.71.2.6
708 PEG_METHOD_EXIT();
709 return numThreadsCleanedUp;
|
710 mike 1.2 }
711
|
712 kumpf 1.71.2.6 void ThreadPool::_cleanupThread(Thread* th)
713 {
714 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
715
716 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
717 th->delete_tsd("work func");
718 th->put_tsd(
719 "work func", NULL,
720 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
721 (void *) 0);
722 th->delete_tsd("work parm");
723 th->put_tsd("work parm", NULL, sizeof(void *), 0);
724
725 // signal the thread's sleep semaphore to awaken it
726 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
727 PEGASUS_ASSERT(sleep_sem != 0);
728 sleep_sem->signal();
729 th->dereference_tsd();
730
731 th->join();
732 delete th;
733 kumpf 1.71.2.6
734 PEG_METHOD_EXIT();
735 }
|
736 mday 1.12
|
737 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
738 {
|
739 mday 1.22 // never time out if the interval is zero
740 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
741 return false;
742
|
743 mday 1.55 struct timeval now , finish , remaining ;
|
744 mday 1.13 Uint32 usec;
|
745 mday 1.33 pegasus_gettimeofday(&now);
|
746 mday 1.36 /* remove valgrind error */
747 pegasus_gettimeofday(&remaining);
748
|
749 mday 1.13
750 finish.tv_sec = start->tv_sec + interval->tv_sec;
751 usec = start->tv_usec + interval->tv_usec;
752 finish.tv_sec += (usec / 1000000);
753 usec %= 1000000;
754 finish.tv_usec = usec;
755
756 if ( timeval_subtract(&remaining, &finish, &now) )
|
757 mike 1.2 return true;
758 else
759 return false;
760 }
761
762 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
763 {
|
764 konrad.r 1.67
765 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_undertaker");
|
766 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
|
767 konrad.r 1.67 PEG_METHOD_EXIT();
|
768 mday 1.30 return (PEGASUS_THREAD_RETURN)1;
|
769 mike 1.2 }
|
770 mday 1.19
|
771 konrad.r 1.67 PEGASUS_THREAD_RETURN ThreadPool::_graveyard(Thread *t)
772 {
773 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_graveyard");
774 ThreadPool *pool = (ThreadPool *)t->get_parm();
775 if(pool == 0 ) {
776 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
777 "Could not obtain the pool information from the Thread.", t);
778
779 return (PEGASUS_THREAD_RETURN)1;
780 }
781 if (pool->_pool.exists(t))
782 {
783 if (pool->_pool.remove( (void *) t) != 0)
784 {
785 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
786 "Moving thread %p", t);
787 /* We are moving the thread to the _running queue b/c
788 _only_ kill_dead_threads has enough logic to take care
789 of cleaning up the threads.*/
790
791 pool->_running.insert_first( t );
792 konrad.r 1.67 }
793 else
794 {
795 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
796 "Could not move Thread %p from _pool to _runing queue.", t);
797 return (PEGASUS_THREAD_RETURN)1;
798 }
799 }
800
801 else if (pool->_running.exists(t))
802 {
803 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
804 "Thread %p is on _running queue. Letting kill_dead_threads take care of the problem.", t);
805 return (PEGASUS_THREAD_RETURN)1;
806 }
807 if (!pool->_dead.exists(t))
808 {
809 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
810 "Thread is not on any queue! Moving it to the running queue.");
811 pool->_running.insert_first( t );
812 }
813 konrad.r 1.67 PEG_METHOD_EXIT();
814 return (PEGASUS_THREAD_RETURN)0;
815 }
|
816 mday 1.19
817 void ThreadPool::_sleep_sem_del(void *p)
818 {
819 if(p != 0)
820 {
821 delete (Semaphore *)p;
822 }
823 }
824
825 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
826 {
827 if (true == check_time(start, &_deadlock_detect))
828 throw Deadlock(pegasus_thread_self());
829 return;
830 }
831
832
833 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
834 {
835 return(check_time(start, &_deadlock_detect));
836 }
837 mday 1.19
838 Boolean ThreadPool::_check_dealloc(struct timeval *start)
839 {
840 return(check_time(start, &_deallocate_wait));
841 }
842
843 Thread *ThreadPool::_init_thread(void) throw(IPCException)
844 {
|
845 konrad.r 1.67 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_init_thread");
|
846 mday 1.19 Thread *th = (Thread *) new Thread(_loop, this, false);
847 // allocate a sleep semaphore and pass it in the thread context
848 // initial count is zero, loop function will sleep until
849 // we signal the semaphore
850 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
851 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
852
853 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
854 mday 1.35 pegasus_gettimeofday(dldt);
855
|
856 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
857 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
858 denise.eckstein 1.71.2.5
859 if (th->run() != PEGASUS_THREAD_OK)
|
860 kumpf 1.59 {
|
861 denise.eckstein 1.71.2.5 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
862 "Could not create thread. Error code is %d.", errno);
863 delete th;
864 return 0;
|
865 kumpf 1.59 }
|
866 mday 1.19 _current_threads++;
867 pegasus_yield();
|
868 denise.eckstein 1.71.2.5
869 PEG_METHOD_EXIT();
|
870 mday 1.19 return th;
871 }
872
873 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
874 {
875 if(th == 0)
|
876 kumpf 1.57 {
877 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
878 "ThreadPool::_link_pool: Thread pointer is null.");
|
879 mday 1.19 throw NullPointer();
|
880 kumpf 1.57 }
|
881 mday 1.47 try
882 {
883 _pool.insert_first(th);
884 }
885 catch(...)
886 {
|
887 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
888 "ThreadPool::_link_pool: _pool.insert_first failed.");
|
889 mday 1.47 }
|
890 mday 1.19 }
|
891 mike 1.2
892
893 PEGASUS_NAMESPACE_END
894
|