1 karl 1.75 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 chip 1.11 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 kumpf 1.17 //
|
19 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
33 chip 1.11 // added nsk platform support
|
34 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
35 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
36 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
37 mike 1.2 //
38 //%/////////////////////////////////////////////////////////////////////////////
39
40 #include "Thread.h"
|
41 kumpf 1.68 #include <exception>
|
42 mike 1.2 #include <Pegasus/Common/IPC.h>
|
43 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
44 mike 1.2
45 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
46 chip 1.11 # include "ThreadWindows.cpp"
|
47 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
48 # include "ThreadUnix.cpp"
49 #elif defined(PEGASUS_OS_TYPE_NSK)
50 # include "ThreadNsk.cpp"
|
51 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
52 # include "ThreadVms.cpp"
|
53 mike 1.2 #else
54 # error "Unsupported platform"
55 #endif
56
|
57 kumpf 1.69 PEGASUS_USING_STD;
|
58 mike 1.2 PEGASUS_NAMESPACE_BEGIN
59
|
60 mday 1.42
|
61 chip 1.11 void thread_data::default_delete(void * data)
62 {
|
63 mike 1.2 if( data != NULL)
|
64 chip 1.11 ::operator delete(data);
|
65 mike 1.2 }
66
|
67 chuck 1.43 // l10n start
68 void language_delete(void * data)
69 {
70 if( data != NULL)
71 {
|
72 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
73 chuck 1.43 }
74 }
75 // l10n end
76
|
77 mike 1.2 Boolean Thread::_signals_blocked = false;
|
78 chuck 1.37 // l10n
|
79 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
80 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
81 marek 1.63 #else
82 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
83 #endif
|
84 chuck 1.37 Boolean Thread::_key_initialized = false;
|
85 chuck 1.41 Boolean Thread::_key_error = false;
|
86 chuck 1.37
|
87 mike 1.2
88 // for non-native implementations
|
89 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
90 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
91 {
|
92 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
93 a.arora 1.65 _cleanup.insert_first(cu.get());
|
94 a.arora 1.64 cu.release();
|
95 mike 1.2 return;
96 }
|
97 kumpf 1.81
|
98 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
99 {
|
100 a.arora 1.64 AutoPtr<cleanup_handler> cu ;
|
101 chip 1.11 try
102 {
|
103 kumpf 1.81 cu.reset(_cleanup.remove_first());
|
104 mike 1.2 }
|
105 chip 1.11 catch(IPCException&)
|
106 mike 1.2 {
|
107 kumpf 1.81 PEGASUS_ASSERT(0);
|
108 mike 1.2 }
109 if(execute == true)
|
110 kumpf 1.81 cu->execute();
|
111 mike 1.2 }
|
112 kumpf 1.81
|
113 mike 1.2 #endif
114
115
|
116 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
117 mike 1.2
118
|
119 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
120 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
121 {
122 // execute the cleanup stack and then return
|
123 mike 1.2 while( _cleanup.count() )
124 {
|
125 chip 1.11 try
126 {
|
127 kumpf 1.81 cleanup_pop(true);
|
128 chip 1.11 }
129 catch(IPCException&)
130 {
|
131 kumpf 1.81 PEGASUS_ASSERT(0);
132 break;
|
133 mike 1.2 }
134 }
135 _exit_code = exit_code;
136 exit_thread(exit_code);
|
137 mday 1.4 _handle.thid = 0;
|
138 mike 1.2 }
139
140
141 #endif
142
|
143 chuck 1.37 // l10n start
|
144 chuck 1.39 Sint8 Thread::initializeKey()
145 {
|
146 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
147 if (!Thread::_key_initialized)
148 {
149 if (Thread::_key_error)
150 {
151 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
152 "Thread: ERROR - thread key error");
153 return -1;
154 }
155
156 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
157 {
158 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
159 "Thread: able to create a thread key");
160 Thread::_key_initialized = true;
161 }
162 else
163 {
164 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
165 "Thread: ERROR - unable to create a thread key");
166 Thread::_key_error = true;
167 kumpf 1.81 return -1;
168 }
169 }
|
170 chuck 1.39
|
171 kumpf 1.81 PEG_METHOD_EXIT();
172 return 0;
|
173 chuck 1.39 }
174
|
175 chuck 1.37 Thread * Thread::getCurrent()
176 {
|
177 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
178 chuck 1.40 if (Thread::initializeKey() != 0)
|
179 chuck 1.39 {
|
180 kumpf 1.81 return NULL;
|
181 chuck 1.39 }
|
182 kumpf 1.81 PEG_METHOD_EXIT();
183 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
|
184 chuck 1.39 }
185
186 void Thread::setCurrent(Thread * thrd)
187 {
|
188 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
189 if (Thread::initializeKey() == 0)
190 {
191 if (pegasus_set_thread_specific(
192 Thread::_platform_thread_key, (void *) thrd) == 0)
|
193 chuck 1.39 {
|
194 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195 "Successful set Thread * into thread specific storage");
|
196 chuck 1.39 }
197 else
198 {
|
199 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
200 "ERROR: error setting Thread * into thread specific storage");
|
201 chuck 1.39 }
|
202 kumpf 1.81 }
203 PEG_METHOD_EXIT();
|
204 chuck 1.37 }
205
206 AcceptLanguages * Thread::getLanguages()
207 {
|
208 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
209
210 Thread * curThrd = Thread::getCurrent();
211 if (curThrd == NULL)
212 return NULL;
213 AcceptLanguages * acceptLangs =
214 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
215 curThrd->dereference_tsd();
216 PEG_METHOD_EXIT();
217 return acceptLangs;
|
218 chuck 1.37 }
219
220 void Thread::setLanguages(AcceptLanguages *langs) //l10n
221 {
|
222 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
223
224 Thread* currentThrd = Thread::getCurrent();
225 if (currentThrd != NULL)
226 {
227 // deletes the old tsd and creates a new one
228 currentThrd->put_tsd("acceptLanguages",
229 language_delete,
230 sizeof(AcceptLanguages *),
231 langs);
232 }
233
234 PEG_METHOD_EXIT();
|
235 chuck 1.37 }
236
237 void Thread::clearLanguages() //l10n
238 {
|
239 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
240
241 Thread * currentThrd = Thread::getCurrent();
242 if (currentThrd != NULL)
243 {
244 // deletes the old tsd
245 currentThrd->delete_tsd("acceptLanguages");
246 }
247
248 PEG_METHOD_EXIT();
|
249 chuck 1.37 }
|
250 kumpf 1.81 // l10n end
|
251 chuck 1.37
|
252 mday 1.52
|
253 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
254 //
255 // ThreadPool
256 //
257 ///////////////////////////////////////////////////////////////////////////////
258
259 ThreadPool::ThreadPool(
260 Sint16 initialSize,
261 const char* key,
262 Sint16 minThreads,
263 Sint16 maxThreads,
264 struct timeval& deallocateWait)
265 : _maxThreads(maxThreads),
266 _minThreads(minThreads),
267 _currentThreads(0),
268 _idleThreads(true),
269 _runningThreads(true),
270 _dying(0)
|
271 mday 1.58 {
|
272 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
273 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
274 mday 1.58
|
275 kumpf 1.81 memset(_key, 0x00, 17);
276 if (key != 0)
277 {
278 strncpy(_key, key, 16);
279 }
280
281 if ((_maxThreads > 0) && (_maxThreads < initialSize))
282 {
283 _maxThreads = initialSize;
284 }
|
285 mday 1.58
|
286 kumpf 1.81 if (_minThreads > initialSize)
287 {
288 _minThreads = initialSize;
289 }
|
290 mday 1.52
|
291 kumpf 1.81 for (int i = 0; i < initialSize; i++)
292 {
293 _addToIdleThreadsQueue(_initializeThread());
294 }
295 }
|
296 mday 1.20
|
297 kumpf 1.81 ThreadPool::~ThreadPool()
|
298 mday 1.20 {
|
299 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
300 try
301 {
302 // Set the dying flag so all thread know the destructor has been entered
303 _dying++;
304
305 while (_currentThreads.value() > 0)
306 {
307 Thread* thread = _idleThreads.remove_first();
308 if (thread != 0)
309 {
310 _cleanupThread(thread);
311 _currentThreads--;
312 }
313 else
314 {
315 pegasus_yield();
316 }
317 }
318 }
319 catch (...)
320 kumpf 1.81 {
321 }
|
322 mday 1.20 }
323
|
324 kumpf 1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
325 {
326 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
327
328 Thread* myself = (Thread *)parm;
329 PEGASUS_ASSERT(myself != 0);
|
330 mday 1.20
|
331 kumpf 1.81 // Set myself into thread specific storage
332 // This will allow code to get its own Thread
333 Thread::setCurrent(myself);
|
334 chip 1.11
|
335 kumpf 1.81 ThreadPool* pool = (ThreadPool *)myself->get_parm();
336 PEGASUS_ASSERT(pool != 0);
|
337 mike 1.2
|
338 kumpf 1.81 Semaphore* sleep_sem = 0;
339 struct timeval* lastActivityTime = 0;
|
340 chip 1.11
|
341 kumpf 1.81 try
342 {
343 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
344 myself->dereference_tsd();
345 PEGASUS_ASSERT(sleep_sem != 0);
346
347 lastActivityTime =
348 (struct timeval *)myself->reference_tsd("last activity time");
349 myself->dereference_tsd();
350 PEGASUS_ASSERT(lastActivityTime != 0);
351 }
352 catch (...)
353 {
354 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
355 "ThreadPool::_loop: Failure getting sleep_sem or "
356 "lastActivityTime.");
357 PEGASUS_ASSERT(false);
358 pool->_idleThreads.remove(myself);
359 pool->_currentThreads--;
360 PEG_METHOD_EXIT();
361 return((PEGASUS_THREAD_RETURN)1);
362 kumpf 1.81 }
363
364 while (1)
365 {
366 try
367 {
368 sleep_sem->wait();
369 }
370 catch (...)
371 {
372 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
373 "ThreadPool::_loop: failure on sleep_sem->wait().");
374 PEGASUS_ASSERT(false);
375 pool->_idleThreads.remove(myself);
376 pool->_currentThreads--;
377 PEG_METHOD_EXIT();
378 return((PEGASUS_THREAD_RETURN)1);
379 }
|
380 mike 1.2
|
381 kumpf 1.81 // When we awaken we reside on the _runningThreads queue, not the
382 // _idleThreads queue.
|
383 kumpf 1.14
|
384 kumpf 1.81 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
385 void* parm = 0;
386 Semaphore* blocking_sem = 0;
|
387 chuck 1.39
|
388 kumpf 1.81 try
389 {
390 work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
391 myself->reference_tsd("work func");
392 myself->dereference_tsd();
393 parm = myself->reference_tsd("work parm");
394 myself->dereference_tsd();
395 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
396 myself->dereference_tsd();
397 }
398 catch (...)
399 {
400 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
401 "ThreadPool::_loop: Failure accessing work func, work parm, "
402 "or blocking sem.");
403 PEGASUS_ASSERT(false);
404 pool->_idleThreads.remove(myself);
405 pool->_currentThreads--;
406 PEG_METHOD_EXIT();
407 return((PEGASUS_THREAD_RETURN)1);
408 }
|
409 mday 1.52
|
410 kumpf 1.81 if (work == 0)
411 {
412 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
413 "ThreadPool::_loop: work func is 0, meaning we should exit.");
414 break;
415 }
|
416 mike 1.2
|
417 kumpf 1.81 gettimeofday(lastActivityTime, NULL);
|
418 konrad.r 1.67
|
419 kumpf 1.81 try
420 {
421 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
422 work(parm);
423 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
424 }
425 catch (Exception & e)
426 {
|
427 konrad.r 1.67 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
428 kumpf 1.81 String("Exception from work in ThreadPool::_loop: ") +
429 e.getMessage());
430 }
|
431 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
432 kumpf 1.81 catch (exception& e)
433 {
|
434 kumpf 1.68 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
435 kumpf 1.81 String("Exception from work in ThreadPool::_loop: ") +
436 e.what());
437 }
|
438 kumpf 1.68 #endif
|
439 kumpf 1.81 catch (...)
440 {
441 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
442 "Unknown exception from work in ThreadPool::_loop.");
443 }
444
445 // put myself back onto the available list
446 try
447 {
448 gettimeofday(lastActivityTime, NULL);
449 if (blocking_sem != 0)
|
450 kumpf 1.57 {
|
451 kumpf 1.81 blocking_sem->signal();
|
452 kumpf 1.57 }
|
453 s.hills 1.49
|
454 kumpf 1.81 Boolean removed = pool->_runningThreads.remove((void *)myself);
455 PEGASUS_ASSERT(removed);
|
456 s.hills 1.49
|
457 kumpf 1.81 pool->_idleThreads.insert_first(myself);
458 }
459 catch (...)
460 {
461 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
462 "ThreadPool::_loop: Adding thread to idle pool failed.");
463 PEGASUS_ASSERT(false);
464 pool->_currentThreads--;
465 PEG_METHOD_EXIT();
466 return((PEGASUS_THREAD_RETURN)1);
467 }
468 }
|
469 kumpf 1.14
|
470 kumpf 1.81 PEG_METHOD_EXIT();
471 return((PEGASUS_THREAD_RETURN)0);
|
472 mike 1.2 }
473
|
474 kumpf 1.81 Boolean ThreadPool::allocate_and_awaken(
475 void* parm,
476 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
477 Semaphore* blocking)
|
478 mike 1.2 {
|
479 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
480
481 // Allocate_and_awaken will not run if the _dying flag is set.
482 // Once the lock is acquired, ~ThreadPool will not change
483 // the value of _dying until the lock is released.
484
485 try
486 {
487 if (_dying.value())
488 {
489 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
490 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
491 // ATTN: Error result has not yet been defined
492 return true;
493 }
494 struct timeval start;
495 gettimeofday(&start, NULL);
496 Thread* th = 0;
497
498 th = _idleThreads.remove_first();
|
499 kumpf 1.57
|
500 kumpf 1.81 if (th == 0)
501 {
502 if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
503 {
504 th = _initializeThread();
505 }
506 }
507
508 if (th == 0)
509 {
510 // ATTN-DME-P3-20031103: This trace message should not be
511 // be labeled TRC_DISCARDED_DATA, because it does not
512 // necessarily imply that a failure has occurred. However,
513 // this label is being used temporarily to help isolate
514 // the cause of client timeout problems.
|
515 kumpf 1.60
|
516 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
517 "ThreadPool::allocate_and_awaken: Insufficient resources: "
518 " pool = %s, running threads = %d, idle threads = %d",
519 _key, _runningThreads.count(), _idleThreads.count());
520 return false;
521 }
|
522 mike 1.2
|
523 kumpf 1.81 // initialize the thread data with the work function and parameters
524 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
525 "Initializing thread with work function and parameters: parm = %p",
526 parm);
|
527 mike 1.2
|
528 kumpf 1.81 th->delete_tsd("work func");
529 th->put_tsd("work func", NULL,
|
530 kumpf 1.70 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
|
531 kumpf 1.81 (void *)work);
532 th->delete_tsd("work parm");
533 th->put_tsd("work parm", NULL, sizeof(void *), parm);
534 th->delete_tsd("blocking sem");
535 if (blocking != 0)
536 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
537
538 // put the thread on the running list
539 _runningThreads.insert_first(th);
540
541 // signal the thread's sleep semaphore to awaken it
542 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
543 PEGASUS_ASSERT(sleep_sem != 0);
544
545 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
546 sleep_sem->signal();
547 th->dereference_tsd();
|
548 kumpf 1.57 }
|
549 mday 1.58 catch (...)
|
550 kumpf 1.57 {
|
551 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
552 "ThreadPool::allocate_and_awaken: Operation Failed.");
553 PEG_METHOD_EXIT();
554 // ATTN: Error result has not yet been defined
555 return true;
|
556 kumpf 1.57 }
|
557 kumpf 1.81 PEG_METHOD_EXIT();
558 return true;
|
559 mike 1.2 }
560
|
561 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
562 // but should call it at least once per _deallocateWait interval.
|
563 mday 1.12
|
564 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
565 mike 1.2 {
|
566 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
567
568 Uint32 numThreadsCleanedUp = 0;
569
570 Uint32 numIdleThreads = _idleThreads.count();
571 for (Uint32 i = 0; i < numIdleThreads; i++)
572 {
573 // Do not dip below the minimum thread count
574 if (_currentThreads.value() <= (Uint32)_minThreads)
575 {
576 break;
577 }
578
579 Thread* thread = _idleThreads.remove_last();
580
581 // If there are no more threads in the _idleThreads queue, we're done.
582 if (thread == 0)
583 {
584 break;
585 }
586
587 kumpf 1.81 struct timeval* lastActivityTime;
588 try
589 {
590 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
591 "last activity time");
592 PEGASUS_ASSERT(lastActivityTime != 0);
593 }
594 catch (...)
595 {
596 PEGASUS_ASSERT(false);
597 _idleThreads.insert_last(thread);
598 break;
599 }
600
601 Boolean cleanupThisThread =
602 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
603 thread->dereference_tsd();
604
605 if (cleanupThisThread)
606 {
607 _cleanupThread(thread);
608 kumpf 1.81 _currentThreads--;
609 numThreadsCleanedUp++;
610 }
611 else
612 {
613 _idleThreads.insert_first(thread);
614 }
|
615 konrad.r 1.67 }
|
616 kumpf 1.81
617 PEG_METHOD_EXIT();
618 return numThreadsCleanedUp;
|
619 konrad.r 1.67 }
|
620 mday 1.19
|
621 kumpf 1.81 void ThreadPool::_cleanupThread(Thread* thread)
|
622 mday 1.19 {
|
623 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
624
625 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
626 thread->delete_tsd("work func");
627 thread->put_tsd(
628 "work func", 0,
629 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
630 (void *) 0);
631 thread->delete_tsd("work parm");
632 thread->put_tsd("work parm", 0, sizeof(void *), 0);
633
634 // signal the thread's sleep semaphore to awaken it
635 Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
636 PEGASUS_ASSERT(sleep_sem != 0);
637 sleep_sem->signal();
638 thread->dereference_tsd();
639
640 thread->join();
641 delete thread;
642
643 PEG_METHOD_EXIT();
|
644 mday 1.19 }
645
|
646 kumpf 1.81 Boolean ThreadPool::_timeIntervalExpired(
647 struct timeval* start,
648 struct timeval* interval)
|
649 mday 1.19 {
|
650 kumpf 1.81 // never time out if the interval is zero
651 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
652 {
653 return false;
654 }
655
656 struct timeval now, finish, remaining;
657 Uint32 usec;
658 pegasus_gettimeofday(&now);
659 pegasus_gettimeofday(&remaining); // Avoid valgrind error
660
661 finish.tv_sec = start->tv_sec + interval->tv_sec;
662 usec = start->tv_usec + interval->tv_usec;
663 finish.tv_sec += (usec / 1000000);
664 usec %= 1000000;
665 finish.tv_usec = usec;
666
667 return (timeval_subtract(&remaining, &finish, &now) != 0);
|
668 mday 1.19 }
669
|
670 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
671 mday 1.19 {
|
672 kumpf 1.81 delete (Semaphore *)p;
|
673 mday 1.19 }
674
|
675 kumpf 1.81 Thread* ThreadPool::_initializeThread()
|
676 mday 1.19 {
|
677 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
678
679 Thread* th = (Thread *) new Thread(_loop, this, false);
680
681 // allocate a sleep semaphore and pass it in the thread context
682 // initial count is zero, loop function will sleep until
683 // we signal the semaphore
684 Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
685 th->put_tsd(
686 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
687
688 struct timeval* lastActivityTime =
689 (struct timeval *) ::operator new(sizeof(struct timeval));
690 pegasus_gettimeofday(lastActivityTime);
691
692 th->put_tsd("last activity time", thread_data::default_delete,
693 sizeof(struct timeval), (void *)lastActivityTime);
694 // thread will enter _loop() and sleep on sleep_sem until we signal it
695
696 if (!th->run())
697 {
698 kumpf 1.81 delete th;
699 return 0;
700 }
701 _currentThreads++;
702 pegasus_yield();
703
704 PEG_METHOD_EXIT();
705 return th;
|
706 mday 1.19 }
|
707 mike 1.2
|
708 kumpf 1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
709 {
710 if (th == 0)
711 {
712 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
713 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
714 throw NullPointer();
715 }
716
717 try
718 {
719 _idleThreads.insert_first(th);
720 }
721 catch (...)
722 {
723 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
725 "failed.");
726 }
727 }
|
728 mike 1.2
729 PEGASUS_NAMESPACE_END
|