1 karl 1.75 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 chip 1.11 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 david.dillard 1.83 //
|
19 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
33 chip 1.11 // added nsk platform support
|
34 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
35 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
36 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
37 david.dillard 1.83 // David Dillard, VERITAS Software Corp.
38 // (david.dillard@veritas.com)
|
39 mike 1.2 //
40 //%/////////////////////////////////////////////////////////////////////////////
41
42 #include "Thread.h"
|
43 kumpf 1.68 #include <exception>
|
44 mike 1.2 #include <Pegasus/Common/IPC.h>
|
45 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
46 mike 1.2
47 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
48 chip 1.11 # include "ThreadWindows.cpp"
|
49 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
50 # include "ThreadUnix.cpp"
51 #elif defined(PEGASUS_OS_TYPE_NSK)
52 # include "ThreadNsk.cpp"
|
53 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
54 # include "ThreadVms.cpp"
|
55 mike 1.2 #else
56 # error "Unsupported platform"
57 #endif
58
|
59 kumpf 1.69 PEGASUS_USING_STD;
|
60 mike 1.2 PEGASUS_NAMESPACE_BEGIN
61
|
62 mday 1.42
|
63 chip 1.11 void thread_data::default_delete(void * data)
64 {
|
65 mike 1.2 if( data != NULL)
|
66 chip 1.11 ::operator delete(data);
|
67 mike 1.2 }
68
|
69 chuck 1.43 // l10n start
70 void language_delete(void * data)
71 {
72 if( data != NULL)
73 {
|
74 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
75 chuck 1.43 }
76 }
77 // l10n end
78
|
79 mike 1.2 Boolean Thread::_signals_blocked = false;
|
80 chuck 1.37 // l10n
|
81 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
82 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
83 marek 1.63 #else
84 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
85 #endif
|
86 chuck 1.37 Boolean Thread::_key_initialized = false;
|
87 chuck 1.41 Boolean Thread::_key_error = false;
|
88 chuck 1.37
|
89 mike 1.2
|
90 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
|
91 mike 1.2 {
|
92 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
93 a.arora 1.65 _cleanup.insert_first(cu.get());
|
94 a.arora 1.64 cu.release();
|
95 mike 1.2 return;
96 }
|
97 kumpf 1.81
|
98 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
|
99 mike 1.2 {
|
100 david.dillard 1.83 AutoPtr<cleanup_handler> cu;
|
101 chip 1.11 try
102 {
|
103 kumpf 1.81 cu.reset(_cleanup.remove_first());
|
104 mike 1.2 }
|
105 chip 1.11 catch(IPCException&)
|
106 mike 1.2 {
|
107 kumpf 1.81 PEGASUS_ASSERT(0);
|
108 mike 1.2 }
109 if(execute == true)
|
110 kumpf 1.81 cu->execute();
|
111 mike 1.2 }
|
112 kumpf 1.81
|
113 mike 1.2
|
114 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
|
115 mike 1.2
116
|
117 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119 {
120 // execute the cleanup stack and then return
|
121 mike 1.2 while( _cleanup.count() )
122 {
|
123 chip 1.11 try
124 {
|
125 kumpf 1.81 cleanup_pop(true);
|
126 chip 1.11 }
127 catch(IPCException&)
128 {
|
129 kumpf 1.81 PEGASUS_ASSERT(0);
130 break;
|
131 mike 1.2 }
132 }
133 _exit_code = exit_code;
134 exit_thread(exit_code);
|
135 mday 1.4 _handle.thid = 0;
|
136 mike 1.2 }
137
138
139 #endif
140
|
141 chuck 1.37 // l10n start
|
142 chuck 1.39 Sint8 Thread::initializeKey()
143 {
|
144 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145 if (!Thread::_key_initialized)
146 {
147 if (Thread::_key_error)
148 {
149 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150 "Thread: ERROR - thread key error");
151 return -1;
152 }
153
154 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: able to create a thread key");
158 Thread::_key_initialized = true;
159 }
160 else
161 {
162 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163 "Thread: ERROR - unable to create a thread key");
164 Thread::_key_error = true;
165 kumpf 1.81 return -1;
166 }
167 }
|
168 chuck 1.39
|
169 kumpf 1.81 PEG_METHOD_EXIT();
170 return 0;
|
171 chuck 1.39 }
172
|
173 chuck 1.37 Thread * Thread::getCurrent()
174 {
|
175 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
176 chuck 1.40 if (Thread::initializeKey() != 0)
|
177 chuck 1.39 {
|
178 kumpf 1.81 return NULL;
|
179 chuck 1.39 }
|
180 kumpf 1.81 PEG_METHOD_EXIT();
181 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
|
182 chuck 1.39 }
183
184 void Thread::setCurrent(Thread * thrd)
185 {
|
186 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187 if (Thread::initializeKey() == 0)
188 {
189 if (pegasus_set_thread_specific(
190 Thread::_platform_thread_key, (void *) thrd) == 0)
|
191 chuck 1.39 {
|
192 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193 "Successful set Thread * into thread specific storage");
|
194 chuck 1.39 }
195 else
196 {
|
197 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198 "ERROR: error setting Thread * into thread specific storage");
|
199 chuck 1.39 }
|
200 kumpf 1.81 }
201 PEG_METHOD_EXIT();
|
202 chuck 1.37 }
203
204 AcceptLanguages * Thread::getLanguages()
205 {
|
206 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
207
208 Thread * curThrd = Thread::getCurrent();
209 if (curThrd == NULL)
210 return NULL;
211 AcceptLanguages * acceptLangs =
212 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213 curThrd->dereference_tsd();
214 PEG_METHOD_EXIT();
215 return acceptLangs;
|
216 chuck 1.37 }
217
218 void Thread::setLanguages(AcceptLanguages *langs) //l10n
219 {
|
220 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
221
222 Thread* currentThrd = Thread::getCurrent();
223 if (currentThrd != NULL)
224 {
225 // deletes the old tsd and creates a new one
226 currentThrd->put_tsd("acceptLanguages",
227 language_delete,
228 sizeof(AcceptLanguages *),
229 langs);
230 }
231
232 PEG_METHOD_EXIT();
|
233 chuck 1.37 }
234
235 void Thread::clearLanguages() //l10n
236 {
|
237 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
238
239 Thread * currentThrd = Thread::getCurrent();
240 if (currentThrd != NULL)
241 {
242 // deletes the old tsd
243 currentThrd->delete_tsd("acceptLanguages");
244 }
245
246 PEG_METHOD_EXIT();
|
247 chuck 1.37 }
|
248 kumpf 1.81 // l10n end
|
249 chuck 1.37
|
250 mday 1.52
|
251 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
252 //
253 // ThreadPool
254 //
255 ///////////////////////////////////////////////////////////////////////////////
256
257 ThreadPool::ThreadPool(
258 Sint16 initialSize,
259 const char* key,
260 Sint16 minThreads,
261 Sint16 maxThreads,
262 struct timeval& deallocateWait)
263 : _maxThreads(maxThreads),
264 _minThreads(minThreads),
265 _currentThreads(0),
266 _idleThreads(true),
267 _runningThreads(true),
268 _dying(0)
|
269 mday 1.58 {
|
270 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
271 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
272 mday 1.58
|
273 kumpf 1.81 memset(_key, 0x00, 17);
274 if (key != 0)
275 {
276 strncpy(_key, key, 16);
277 }
278
279 if ((_maxThreads > 0) && (_maxThreads < initialSize))
280 {
281 _maxThreads = initialSize;
282 }
|
283 mday 1.58
|
284 kumpf 1.81 if (_minThreads > initialSize)
285 {
286 _minThreads = initialSize;
287 }
|
288 mday 1.52
|
289 kumpf 1.81 for (int i = 0; i < initialSize; i++)
290 {
291 _addToIdleThreadsQueue(_initializeThread());
292 }
293 }
|
294 mday 1.20
|
295 kumpf 1.81 ThreadPool::~ThreadPool()
|
296 mday 1.20 {
|
297 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
298 try
299 {
300 // Set the dying flag so all thread know the destructor has been entered
301 _dying++;
302
303 while (_currentThreads.value() > 0)
304 {
305 Thread* thread = _idleThreads.remove_first();
306 if (thread != 0)
307 {
308 _cleanupThread(thread);
309 _currentThreads--;
310 }
311 else
312 {
313 pegasus_yield();
314 }
315 }
316 }
317 catch (...)
318 kumpf 1.81 {
319 }
|
320 mday 1.20 }
321
|
322 kumpf 1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
323 {
324 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
325
326 try
327 {
|
328 kumpf 1.82 Thread* myself = (Thread *)parm;
329 PEGASUS_ASSERT(myself != 0);
|
330 kumpf 1.81
|
331 kumpf 1.82 // Set myself into thread specific storage
332 // This will allow code to get its own Thread
333 Thread::setCurrent(myself);
|
334 kumpf 1.81
|
335 kumpf 1.82 ThreadPool* pool = (ThreadPool *)myself->get_parm();
336 PEGASUS_ASSERT(pool != 0);
|
337 mike 1.2
|
338 kumpf 1.82 Semaphore* sleep_sem = 0;
339 struct timeval* lastActivityTime = 0;
|
340 chuck 1.39
|
341 kumpf 1.81 try
342 {
|
343 kumpf 1.82 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
|
344 kumpf 1.81 myself->dereference_tsd();
|
345 kumpf 1.82 PEGASUS_ASSERT(sleep_sem != 0);
346
347 lastActivityTime =
348 (struct timeval *)myself->reference_tsd("last activity time");
|
349 kumpf 1.81 myself->dereference_tsd();
|
350 kumpf 1.82 PEGASUS_ASSERT(lastActivityTime != 0);
|
351 kumpf 1.81 }
352 catch (...)
353 {
354 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
355 kumpf 1.82 "ThreadPool::_loop: Failure getting sleep_sem or "
356 "lastActivityTime.");
|
357 kumpf 1.81 PEGASUS_ASSERT(false);
358 pool->_idleThreads.remove(myself);
359 pool->_currentThreads--;
360 PEG_METHOD_EXIT();
361 return((PEGASUS_THREAD_RETURN)1);
362 }
|
363 mday 1.52
|
364 kumpf 1.82 while (1)
|
365 kumpf 1.81 {
|
366 kumpf 1.82 try
367 {
368 sleep_sem->wait();
369 }
370 catch (...)
371 {
372 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
373 "ThreadPool::_loop: failure on sleep_sem->wait().");
374 PEGASUS_ASSERT(false);
375 pool->_idleThreads.remove(myself);
376 pool->_currentThreads--;
377 PEG_METHOD_EXIT();
378 return((PEGASUS_THREAD_RETURN)1);
379 }
380
381 // When we awaken we reside on the _runningThreads queue, not the
382 // _idleThreads queue.
383
384 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
385 void* parm = 0;
386 Semaphore* blocking_sem = 0;
387 kumpf 1.82
388 try
389 {
390 work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
391 myself->reference_tsd("work func");
392 myself->dereference_tsd();
393 parm = myself->reference_tsd("work parm");
394 myself->dereference_tsd();
395 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
396 myself->dereference_tsd();
397 }
398 catch (...)
399 {
400 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
401 "ThreadPool::_loop: Failure accessing work func, work parm, "
402 "or blocking sem.");
403 PEGASUS_ASSERT(false);
404 pool->_idleThreads.remove(myself);
405 pool->_currentThreads--;
406 PEG_METHOD_EXIT();
407 return((PEGASUS_THREAD_RETURN)1);
408 kumpf 1.82 }
409
410 if (work == 0)
411 {
|
412 carolann.graves 1.84 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
413 kumpf 1.82 "ThreadPool::_loop: work func is 0, meaning we should exit.");
414 break;
415 }
|
416 mike 1.2
|
417 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
|
418 konrad.r 1.67
|
419 kumpf 1.82 try
420 {
421 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
422 work(parm);
423 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
424 }
425 catch (Exception & e)
426 {
427 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
428 String("Exception from work in ThreadPool::_loop: ") +
429 e.getMessage());
430 }
|
431 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
432 kumpf 1.82 catch (exception& e)
433 {
434 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
435 String("Exception from work in ThreadPool::_loop: ") +
436 e.what());
437 }
|
438 kumpf 1.68 #endif
|
439 kumpf 1.82 catch (...)
440 {
441 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
442 "Unknown exception from work in ThreadPool::_loop.");
443 }
|
444 kumpf 1.81
|
445 kumpf 1.82 // put myself back onto the available list
446 try
|
447 kumpf 1.57 {
|
448 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
449 if (blocking_sem != 0)
450 {
451 blocking_sem->signal();
452 }
|
453 s.hills 1.49
|
454 kumpf 1.82 Boolean removed = pool->_runningThreads.remove((void *)myself);
455 PEGASUS_ASSERT(removed);
|
456 s.hills 1.49
|
457 kumpf 1.82 pool->_idleThreads.insert_first(myself);
458 }
459 catch (...)
460 {
461 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
462 "ThreadPool::_loop: Adding thread to idle pool failed.");
463 PEGASUS_ASSERT(false);
464 pool->_currentThreads--;
465 PEG_METHOD_EXIT();
466 return((PEGASUS_THREAD_RETURN)1);
467 }
|
468 kumpf 1.81 }
|
469 kumpf 1.82 }
470 catch (const Exception& e)
471 {
472 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
473 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
474 }
475 catch (...)
476 {
477 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
478 "Caught unrecognized exception. Exiting _loop.");
|
479 kumpf 1.81 }
|
480 kumpf 1.14
|
481 kumpf 1.81 PEG_METHOD_EXIT();
482 return((PEGASUS_THREAD_RETURN)0);
|
483 mike 1.2 }
484
|
485 kumpf 1.81 Boolean ThreadPool::allocate_and_awaken(
486 void* parm,
487 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
488 Semaphore* blocking)
|
489 mike 1.2 {
|
490 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
491
492 // Allocate_and_awaken will not run if the _dying flag is set.
493 // Once the lock is acquired, ~ThreadPool will not change
494 // the value of _dying until the lock is released.
495
496 try
497 {
498 if (_dying.value())
499 {
500 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
501 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
502 // ATTN: Error result has not yet been defined
503 return true;
504 }
505 struct timeval start;
506 gettimeofday(&start, NULL);
507 Thread* th = 0;
508
509 th = _idleThreads.remove_first();
|
510 kumpf 1.57
|
511 kumpf 1.81 if (th == 0)
512 {
513 if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
514 {
515 th = _initializeThread();
516 }
517 }
518
519 if (th == 0)
520 {
521 // ATTN-DME-P3-20031103: This trace message should not be
522 // be labeled TRC_DISCARDED_DATA, because it does not
523 // necessarily imply that a failure has occurred. However,
524 // this label is being used temporarily to help isolate
525 // the cause of client timeout problems.
|
526 kumpf 1.60
|
527 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
528 "ThreadPool::allocate_and_awaken: Insufficient resources: "
529 " pool = %s, running threads = %d, idle threads = %d",
530 _key, _runningThreads.count(), _idleThreads.count());
531 return false;
532 }
|
533 mike 1.2
|
534 kumpf 1.81 // initialize the thread data with the work function and parameters
535 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
536 "Initializing thread with work function and parameters: parm = %p",
537 parm);
|
538 mike 1.2
|
539 kumpf 1.81 th->delete_tsd("work func");
540 th->put_tsd("work func", NULL,
|
541 kumpf 1.70 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
|
542 kumpf 1.81 (void *)work);
543 th->delete_tsd("work parm");
544 th->put_tsd("work parm", NULL, sizeof(void *), parm);
545 th->delete_tsd("blocking sem");
546 if (blocking != 0)
547 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
548
549 // put the thread on the running list
550 _runningThreads.insert_first(th);
551
552 // signal the thread's sleep semaphore to awaken it
553 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
554 PEGASUS_ASSERT(sleep_sem != 0);
555
556 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
557 sleep_sem->signal();
558 th->dereference_tsd();
|
559 kumpf 1.57 }
|
560 mday 1.58 catch (...)
|
561 kumpf 1.57 {
|
562 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
563 "ThreadPool::allocate_and_awaken: Operation Failed.");
564 PEG_METHOD_EXIT();
565 // ATTN: Error result has not yet been defined
566 return true;
|
567 kumpf 1.57 }
|
568 kumpf 1.81 PEG_METHOD_EXIT();
569 return true;
|
570 mike 1.2 }
571
|
572 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
573 // but should call it at least once per _deallocateWait interval.
|
574 mday 1.12
|
575 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
576 mike 1.2 {
|
577 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
578
579 Uint32 numThreadsCleanedUp = 0;
580
581 Uint32 numIdleThreads = _idleThreads.count();
582 for (Uint32 i = 0; i < numIdleThreads; i++)
583 {
584 // Do not dip below the minimum thread count
585 if (_currentThreads.value() <= (Uint32)_minThreads)
586 {
587 break;
588 }
589
590 Thread* thread = _idleThreads.remove_last();
591
592 // If there are no more threads in the _idleThreads queue, we're done.
593 if (thread == 0)
594 {
595 break;
596 }
597
598 kumpf 1.81 struct timeval* lastActivityTime;
599 try
600 {
601 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
602 "last activity time");
603 PEGASUS_ASSERT(lastActivityTime != 0);
604 }
605 catch (...)
606 {
607 PEGASUS_ASSERT(false);
608 _idleThreads.insert_last(thread);
609 break;
610 }
611
612 Boolean cleanupThisThread =
613 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
614 thread->dereference_tsd();
615
616 if (cleanupThisThread)
617 {
618 _cleanupThread(thread);
619 kumpf 1.81 _currentThreads--;
620 numThreadsCleanedUp++;
621 }
622 else
623 {
624 _idleThreads.insert_first(thread);
625 }
|
626 konrad.r 1.67 }
|
627 kumpf 1.81
628 PEG_METHOD_EXIT();
629 return numThreadsCleanedUp;
|
630 konrad.r 1.67 }
|
631 mday 1.19
|
632 kumpf 1.81 void ThreadPool::_cleanupThread(Thread* thread)
|
633 mday 1.19 {
|
634 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
635
636 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
637 thread->delete_tsd("work func");
638 thread->put_tsd(
639 "work func", 0,
640 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
641 (void *) 0);
642 thread->delete_tsd("work parm");
643 thread->put_tsd("work parm", 0, sizeof(void *), 0);
644
645 // signal the thread's sleep semaphore to awaken it
646 Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
647 PEGASUS_ASSERT(sleep_sem != 0);
648 sleep_sem->signal();
649 thread->dereference_tsd();
650
651 thread->join();
652 delete thread;
653
654 PEG_METHOD_EXIT();
|
655 mday 1.19 }
656
|
657 kumpf 1.81 Boolean ThreadPool::_timeIntervalExpired(
658 struct timeval* start,
659 struct timeval* interval)
|
660 mday 1.19 {
|
661 kumpf 1.81 // never time out if the interval is zero
662 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
663 {
664 return false;
665 }
666
667 struct timeval now, finish, remaining;
668 Uint32 usec;
669 pegasus_gettimeofday(&now);
670 pegasus_gettimeofday(&remaining); // Avoid valgrind error
671
672 finish.tv_sec = start->tv_sec + interval->tv_sec;
673 usec = start->tv_usec + interval->tv_usec;
674 finish.tv_sec += (usec / 1000000);
675 usec %= 1000000;
676 finish.tv_usec = usec;
677
678 return (timeval_subtract(&remaining, &finish, &now) != 0);
|
679 mday 1.19 }
680
|
681 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
682 mday 1.19 {
|
683 kumpf 1.81 delete (Semaphore *)p;
|
684 mday 1.19 }
685
|
686 kumpf 1.81 Thread* ThreadPool::_initializeThread()
|
687 mday 1.19 {
|
688 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
689
690 Thread* th = (Thread *) new Thread(_loop, this, false);
691
692 // allocate a sleep semaphore and pass it in the thread context
693 // initial count is zero, loop function will sleep until
694 // we signal the semaphore
695 Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
696 th->put_tsd(
697 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
698
699 struct timeval* lastActivityTime =
700 (struct timeval *) ::operator new(sizeof(struct timeval));
701 pegasus_gettimeofday(lastActivityTime);
702
703 th->put_tsd("last activity time", thread_data::default_delete,
704 sizeof(struct timeval), (void *)lastActivityTime);
705 // thread will enter _loop() and sleep on sleep_sem until we signal it
706
707 if (!th->run())
708 {
709 kumpf 1.81 delete th;
710 return 0;
711 }
712 _currentThreads++;
713 pegasus_yield();
714
715 PEG_METHOD_EXIT();
716 return th;
|
717 mday 1.19 }
|
718 mike 1.2
|
719 kumpf 1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
720 {
721 if (th == 0)
722 {
723 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
725 throw NullPointer();
726 }
727
728 try
729 {
730 _idleThreads.insert_first(th);
731 }
732 catch (...)
733 {
734 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
735 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
736 "failed.");
737 }
738 }
|
739 mike 1.2
740 PEGASUS_NAMESPACE_END
|