1 karl 1.75 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 chip 1.11 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 david.dillard 1.83 //
|
19 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
33 chip 1.11 // added nsk platform support
|
34 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
35 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
36 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
37 david.dillard 1.83 // David Dillard, VERITAS Software Corp.
38 // (david.dillard@veritas.com)
|
39 mike 1.2 //
40 //%/////////////////////////////////////////////////////////////////////////////
41
42 #include "Thread.h"
|
43 kumpf 1.68 #include <exception>
|
44 mike 1.2 #include <Pegasus/Common/IPC.h>
|
45 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
46 mike 1.2
47 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
48 chip 1.11 # include "ThreadWindows.cpp"
|
49 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
50 # include "ThreadUnix.cpp"
51 #elif defined(PEGASUS_OS_TYPE_NSK)
52 # include "ThreadNsk.cpp"
|
53 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
54 # include "ThreadVms.cpp"
|
55 mike 1.2 #else
56 # error "Unsupported platform"
57 #endif
58
|
59 kumpf 1.69 PEGASUS_USING_STD;
|
60 mike 1.2 PEGASUS_NAMESPACE_BEGIN
61
|
62 mday 1.42
|
63 chip 1.11 void thread_data::default_delete(void * data)
64 {
|
65 mike 1.2 if( data != NULL)
|
66 chip 1.11 ::operator delete(data);
|
67 mike 1.2 }
68
|
69 chuck 1.43 // l10n start
70 void language_delete(void * data)
71 {
72 if( data != NULL)
73 {
|
74 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
75 chuck 1.43 }
76 }
77 // l10n end
78
|
79 mike 1.2 Boolean Thread::_signals_blocked = false;
|
80 chuck 1.37 // l10n
|
81 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
82 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
83 marek 1.63 #else
84 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
85 #endif
|
86 chuck 1.37 Boolean Thread::_key_initialized = false;
|
87 chuck 1.41 Boolean Thread::_key_error = false;
|
88 chuck 1.37
|
89 mike 1.2
90 // for non-native implementations
|
91 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
92 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
|
93 mike 1.2 {
|
94 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
95 a.arora 1.65 _cleanup.insert_first(cu.get());
|
96 a.arora 1.64 cu.release();
|
97 mike 1.2 return;
98 }
|
99 kumpf 1.81
|
100 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
|
101 mike 1.2 {
|
102 david.dillard 1.83 AutoPtr<cleanup_handler> cu;
|
103 chip 1.11 try
104 {
|
105 kumpf 1.81 cu.reset(_cleanup.remove_first());
|
106 mike 1.2 }
|
107 chip 1.11 catch(IPCException&)
|
108 mike 1.2 {
|
109 kumpf 1.81 PEGASUS_ASSERT(0);
|
110 mike 1.2 }
111 if(execute == true)
|
112 kumpf 1.81 cu->execute();
|
113 mike 1.2 }
|
114 kumpf 1.81
|
115 mike 1.2 #endif
116
117
|
118 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
|
119 mike 1.2
120
|
121 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
122 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
123 {
124 // execute the cleanup stack and then return
|
125 mike 1.2 while( _cleanup.count() )
126 {
|
127 chip 1.11 try
128 {
|
129 kumpf 1.81 cleanup_pop(true);
|
130 chip 1.11 }
131 catch(IPCException&)
132 {
|
133 kumpf 1.81 PEGASUS_ASSERT(0);
134 break;
|
135 mike 1.2 }
136 }
137 _exit_code = exit_code;
138 exit_thread(exit_code);
|
139 mday 1.4 _handle.thid = 0;
|
140 mike 1.2 }
141
142
143 #endif
144
|
145 chuck 1.37 // l10n start
|
146 chuck 1.39 Sint8 Thread::initializeKey()
147 {
|
148 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
149 if (!Thread::_key_initialized)
150 {
151 if (Thread::_key_error)
152 {
153 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
154 "Thread: ERROR - thread key error");
155 return -1;
156 }
157
158 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
159 {
160 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
161 "Thread: able to create a thread key");
162 Thread::_key_initialized = true;
163 }
164 else
165 {
166 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
167 "Thread: ERROR - unable to create a thread key");
168 Thread::_key_error = true;
169 kumpf 1.81 return -1;
170 }
171 }
|
172 chuck 1.39
|
173 kumpf 1.81 PEG_METHOD_EXIT();
174 return 0;
|
175 chuck 1.39 }
176
|
177 chuck 1.37 Thread * Thread::getCurrent()
178 {
|
179 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
180 chuck 1.40 if (Thread::initializeKey() != 0)
|
181 chuck 1.39 {
|
182 kumpf 1.81 return NULL;
|
183 chuck 1.39 }
|
184 kumpf 1.81 PEG_METHOD_EXIT();
185 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
|
186 chuck 1.39 }
187
188 void Thread::setCurrent(Thread * thrd)
189 {
|
190 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
191 if (Thread::initializeKey() == 0)
192 {
193 if (pegasus_set_thread_specific(
194 Thread::_platform_thread_key, (void *) thrd) == 0)
|
195 chuck 1.39 {
|
196 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
197 "Successful set Thread * into thread specific storage");
|
198 chuck 1.39 }
199 else
200 {
|
201 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
202 "ERROR: error setting Thread * into thread specific storage");
|
203 chuck 1.39 }
|
204 kumpf 1.81 }
205 PEG_METHOD_EXIT();
|
206 chuck 1.37 }
207
208 AcceptLanguages * Thread::getLanguages()
209 {
|
210 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
211
212 Thread * curThrd = Thread::getCurrent();
213 if (curThrd == NULL)
214 return NULL;
215 AcceptLanguages * acceptLangs =
216 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
217 curThrd->dereference_tsd();
218 PEG_METHOD_EXIT();
219 return acceptLangs;
|
220 chuck 1.37 }
221
222 void Thread::setLanguages(AcceptLanguages *langs) //l10n
223 {
|
224 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
225
226 Thread* currentThrd = Thread::getCurrent();
227 if (currentThrd != NULL)
228 {
229 // deletes the old tsd and creates a new one
230 currentThrd->put_tsd("acceptLanguages",
231 language_delete,
232 sizeof(AcceptLanguages *),
233 langs);
234 }
235
236 PEG_METHOD_EXIT();
|
237 chuck 1.37 }
238
239 void Thread::clearLanguages() //l10n
240 {
|
241 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
242
243 Thread * currentThrd = Thread::getCurrent();
244 if (currentThrd != NULL)
245 {
246 // deletes the old tsd
247 currentThrd->delete_tsd("acceptLanguages");
248 }
249
250 PEG_METHOD_EXIT();
|
251 chuck 1.37 }
|
252 kumpf 1.81 // l10n end
|
253 chuck 1.37
|
254 mday 1.52
|
255 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
256 //
257 // ThreadPool
258 //
259 ///////////////////////////////////////////////////////////////////////////////
260
261 ThreadPool::ThreadPool(
262 Sint16 initialSize,
263 const char* key,
264 Sint16 minThreads,
265 Sint16 maxThreads,
266 struct timeval& deallocateWait)
267 : _maxThreads(maxThreads),
268 _minThreads(minThreads),
269 _currentThreads(0),
270 _idleThreads(true),
271 _runningThreads(true),
272 _dying(0)
|
273 mday 1.58 {
|
274 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
275 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
276 mday 1.58
|
277 kumpf 1.81 memset(_key, 0x00, 17);
278 if (key != 0)
279 {
280 strncpy(_key, key, 16);
281 }
282
283 if ((_maxThreads > 0) && (_maxThreads < initialSize))
284 {
285 _maxThreads = initialSize;
286 }
|
287 mday 1.58
|
288 kumpf 1.81 if (_minThreads > initialSize)
289 {
290 _minThreads = initialSize;
291 }
|
292 mday 1.52
|
293 kumpf 1.81 for (int i = 0; i < initialSize; i++)
294 {
295 _addToIdleThreadsQueue(_initializeThread());
296 }
297 }
|
298 mday 1.20
|
299 kumpf 1.81 ThreadPool::~ThreadPool()
|
300 mday 1.20 {
|
301 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
302 try
303 {
304 // Set the dying flag so all thread know the destructor has been entered
305 _dying++;
306
307 while (_currentThreads.value() > 0)
308 {
309 Thread* thread = _idleThreads.remove_first();
310 if (thread != 0)
311 {
312 _cleanupThread(thread);
313 _currentThreads--;
314 }
315 else
316 {
317 pegasus_yield();
318 }
319 }
320 }
321 catch (...)
322 kumpf 1.81 {
323 }
|
324 mday 1.20 }
325
|
326 kumpf 1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
327 {
328 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
329
330 try
331 {
|
332 kumpf 1.82 Thread* myself = (Thread *)parm;
333 PEGASUS_ASSERT(myself != 0);
|
334 kumpf 1.81
|
335 kumpf 1.82 // Set myself into thread specific storage
336 // This will allow code to get its own Thread
337 Thread::setCurrent(myself);
|
338 kumpf 1.81
|
339 kumpf 1.82 ThreadPool* pool = (ThreadPool *)myself->get_parm();
340 PEGASUS_ASSERT(pool != 0);
|
341 mike 1.2
|
342 kumpf 1.82 Semaphore* sleep_sem = 0;
343 struct timeval* lastActivityTime = 0;
|
344 chuck 1.39
|
345 kumpf 1.81 try
346 {
|
347 kumpf 1.82 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
|
348 kumpf 1.81 myself->dereference_tsd();
|
349 kumpf 1.82 PEGASUS_ASSERT(sleep_sem != 0);
350
351 lastActivityTime =
352 (struct timeval *)myself->reference_tsd("last activity time");
|
353 kumpf 1.81 myself->dereference_tsd();
|
354 kumpf 1.82 PEGASUS_ASSERT(lastActivityTime != 0);
|
355 kumpf 1.81 }
356 catch (...)
357 {
358 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
359 kumpf 1.82 "ThreadPool::_loop: Failure getting sleep_sem or "
360 "lastActivityTime.");
|
361 kumpf 1.81 PEGASUS_ASSERT(false);
362 pool->_idleThreads.remove(myself);
363 pool->_currentThreads--;
364 PEG_METHOD_EXIT();
365 return((PEGASUS_THREAD_RETURN)1);
366 }
|
367 mday 1.52
|
368 kumpf 1.82 while (1)
|
369 kumpf 1.81 {
|
370 kumpf 1.82 try
371 {
372 sleep_sem->wait();
373 }
374 catch (...)
375 {
376 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
377 "ThreadPool::_loop: failure on sleep_sem->wait().");
378 PEGASUS_ASSERT(false);
379 pool->_idleThreads.remove(myself);
380 pool->_currentThreads--;
381 PEG_METHOD_EXIT();
382 return((PEGASUS_THREAD_RETURN)1);
383 }
384
385 // When we awaken we reside on the _runningThreads queue, not the
386 // _idleThreads queue.
387
388 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
389 void* parm = 0;
390 Semaphore* blocking_sem = 0;
391 kumpf 1.82
392 try
393 {
394 work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
395 myself->reference_tsd("work func");
396 myself->dereference_tsd();
397 parm = myself->reference_tsd("work parm");
398 myself->dereference_tsd();
399 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
400 myself->dereference_tsd();
401 }
402 catch (...)
403 {
404 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
405 "ThreadPool::_loop: Failure accessing work func, work parm, "
406 "or blocking sem.");
407 PEGASUS_ASSERT(false);
408 pool->_idleThreads.remove(myself);
409 pool->_currentThreads--;
410 PEG_METHOD_EXIT();
411 return((PEGASUS_THREAD_RETURN)1);
412 kumpf 1.82 }
413
414 if (work == 0)
415 {
416 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
417 "ThreadPool::_loop: work func is 0, meaning we should exit.");
418 break;
419 }
|
420 mike 1.2
|
421 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
|
422 konrad.r 1.67
|
423 kumpf 1.82 try
424 {
425 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
426 work(parm);
427 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
428 }
429 catch (Exception & e)
430 {
431 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
432 String("Exception from work in ThreadPool::_loop: ") +
433 e.getMessage());
434 }
|
435 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
436 kumpf 1.82 catch (exception& e)
437 {
438 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
439 String("Exception from work in ThreadPool::_loop: ") +
440 e.what());
441 }
|
442 kumpf 1.68 #endif
|
443 kumpf 1.82 catch (...)
444 {
445 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
446 "Unknown exception from work in ThreadPool::_loop.");
447 }
|
448 kumpf 1.81
|
449 kumpf 1.82 // put myself back onto the available list
450 try
|
451 kumpf 1.57 {
|
452 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
453 if (blocking_sem != 0)
454 {
455 blocking_sem->signal();
456 }
|
457 s.hills 1.49
|
458 kumpf 1.82 Boolean removed = pool->_runningThreads.remove((void *)myself);
459 PEGASUS_ASSERT(removed);
|
460 s.hills 1.49
|
461 kumpf 1.82 pool->_idleThreads.insert_first(myself);
462 }
463 catch (...)
464 {
465 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
466 "ThreadPool::_loop: Adding thread to idle pool failed.");
467 PEGASUS_ASSERT(false);
468 pool->_currentThreads--;
469 PEG_METHOD_EXIT();
470 return((PEGASUS_THREAD_RETURN)1);
471 }
|
472 kumpf 1.81 }
|
473 kumpf 1.82 }
474 catch (const Exception& e)
475 {
476 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
477 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
478 }
479 catch (...)
480 {
481 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
482 "Caught unrecognized exception. Exiting _loop.");
|
483 kumpf 1.81 }
|
484 kumpf 1.14
|
485 kumpf 1.81 PEG_METHOD_EXIT();
486 return((PEGASUS_THREAD_RETURN)0);
|
487 mike 1.2 }
488
|
489 kumpf 1.81 Boolean ThreadPool::allocate_and_awaken(
490 void* parm,
491 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
492 Semaphore* blocking)
|
493 mike 1.2 {
|
494 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
495
496 // Allocate_and_awaken will not run if the _dying flag is set.
497 // Once the lock is acquired, ~ThreadPool will not change
498 // the value of _dying until the lock is released.
499
500 try
501 {
502 if (_dying.value())
503 {
504 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
505 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
506 // ATTN: Error result has not yet been defined
507 return true;
508 }
509 struct timeval start;
510 gettimeofday(&start, NULL);
511 Thread* th = 0;
512
513 th = _idleThreads.remove_first();
|
514 kumpf 1.57
|
515 kumpf 1.81 if (th == 0)
516 {
517 if ((_maxThreads == 0) || (_currentThreads < _maxThreads))
518 {
519 th = _initializeThread();
520 }
521 }
522
523 if (th == 0)
524 {
525 // ATTN-DME-P3-20031103: This trace message should not be
526 // be labeled TRC_DISCARDED_DATA, because it does not
527 // necessarily imply that a failure has occurred. However,
528 // this label is being used temporarily to help isolate
529 // the cause of client timeout problems.
|
530 kumpf 1.60
|
531 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
532 "ThreadPool::allocate_and_awaken: Insufficient resources: "
533 " pool = %s, running threads = %d, idle threads = %d",
534 _key, _runningThreads.count(), _idleThreads.count());
535 return false;
536 }
|
537 mike 1.2
|
538 kumpf 1.81 // initialize the thread data with the work function and parameters
539 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
540 "Initializing thread with work function and parameters: parm = %p",
541 parm);
|
542 mike 1.2
|
543 kumpf 1.81 th->delete_tsd("work func");
544 th->put_tsd("work func", NULL,
|
545 kumpf 1.70 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
|
546 kumpf 1.81 (void *)work);
547 th->delete_tsd("work parm");
548 th->put_tsd("work parm", NULL, sizeof(void *), parm);
549 th->delete_tsd("blocking sem");
550 if (blocking != 0)
551 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
552
553 // put the thread on the running list
554 _runningThreads.insert_first(th);
555
556 // signal the thread's sleep semaphore to awaken it
557 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
558 PEGASUS_ASSERT(sleep_sem != 0);
559
560 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
561 sleep_sem->signal();
562 th->dereference_tsd();
|
563 kumpf 1.57 }
|
564 mday 1.58 catch (...)
|
565 kumpf 1.57 {
|
566 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
567 "ThreadPool::allocate_and_awaken: Operation Failed.");
568 PEG_METHOD_EXIT();
569 // ATTN: Error result has not yet been defined
570 return true;
|
571 kumpf 1.57 }
|
572 kumpf 1.81 PEG_METHOD_EXIT();
573 return true;
|
574 mike 1.2 }
575
|
576 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
577 // but should call it at least once per _deallocateWait interval.
|
578 mday 1.12
|
579 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
580 mike 1.2 {
|
581 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
582
583 Uint32 numThreadsCleanedUp = 0;
584
585 Uint32 numIdleThreads = _idleThreads.count();
586 for (Uint32 i = 0; i < numIdleThreads; i++)
587 {
588 // Do not dip below the minimum thread count
589 if (_currentThreads.value() <= (Uint32)_minThreads)
590 {
591 break;
592 }
593
594 Thread* thread = _idleThreads.remove_last();
595
596 // If there are no more threads in the _idleThreads queue, we're done.
597 if (thread == 0)
598 {
599 break;
600 }
601
602 kumpf 1.81 struct timeval* lastActivityTime;
603 try
604 {
605 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
606 "last activity time");
607 PEGASUS_ASSERT(lastActivityTime != 0);
608 }
609 catch (...)
610 {
611 PEGASUS_ASSERT(false);
612 _idleThreads.insert_last(thread);
613 break;
614 }
615
616 Boolean cleanupThisThread =
617 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
618 thread->dereference_tsd();
619
620 if (cleanupThisThread)
621 {
622 _cleanupThread(thread);
623 kumpf 1.81 _currentThreads--;
624 numThreadsCleanedUp++;
625 }
626 else
627 {
628 _idleThreads.insert_first(thread);
629 }
|
630 konrad.r 1.67 }
|
631 kumpf 1.81
632 PEG_METHOD_EXIT();
633 return numThreadsCleanedUp;
|
634 konrad.r 1.67 }
|
635 mday 1.19
|
636 kumpf 1.81 void ThreadPool::_cleanupThread(Thread* thread)
|
637 mday 1.19 {
|
638 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
639
640 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
641 thread->delete_tsd("work func");
642 thread->put_tsd(
643 "work func", 0,
644 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
645 (void *) 0);
646 thread->delete_tsd("work parm");
647 thread->put_tsd("work parm", 0, sizeof(void *), 0);
648
649 // signal the thread's sleep semaphore to awaken it
650 Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
651 PEGASUS_ASSERT(sleep_sem != 0);
652 sleep_sem->signal();
653 thread->dereference_tsd();
654
655 thread->join();
656 delete thread;
657
658 PEG_METHOD_EXIT();
|
659 mday 1.19 }
660
|
661 kumpf 1.81 Boolean ThreadPool::_timeIntervalExpired(
662 struct timeval* start,
663 struct timeval* interval)
|
664 mday 1.19 {
|
665 kumpf 1.81 // never time out if the interval is zero
666 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
667 {
668 return false;
669 }
670
671 struct timeval now, finish, remaining;
672 Uint32 usec;
673 pegasus_gettimeofday(&now);
674 pegasus_gettimeofday(&remaining); // Avoid valgrind error
675
676 finish.tv_sec = start->tv_sec + interval->tv_sec;
677 usec = start->tv_usec + interval->tv_usec;
678 finish.tv_sec += (usec / 1000000);
679 usec %= 1000000;
680 finish.tv_usec = usec;
681
682 return (timeval_subtract(&remaining, &finish, &now) != 0);
|
683 mday 1.19 }
684
|
685 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
686 mday 1.19 {
|
687 kumpf 1.81 delete (Semaphore *)p;
|
688 mday 1.19 }
689
|
690 kumpf 1.81 Thread* ThreadPool::_initializeThread()
|
691 mday 1.19 {
|
692 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
693
694 Thread* th = (Thread *) new Thread(_loop, this, false);
695
696 // allocate a sleep semaphore and pass it in the thread context
697 // initial count is zero, loop function will sleep until
698 // we signal the semaphore
699 Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
700 th->put_tsd(
701 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
702
703 struct timeval* lastActivityTime =
704 (struct timeval *) ::operator new(sizeof(struct timeval));
705 pegasus_gettimeofday(lastActivityTime);
706
707 th->put_tsd("last activity time", thread_data::default_delete,
708 sizeof(struct timeval), (void *)lastActivityTime);
709 // thread will enter _loop() and sleep on sleep_sem until we signal it
710
711 if (!th->run())
712 {
713 kumpf 1.81 delete th;
714 return 0;
715 }
716 _currentThreads++;
717 pegasus_yield();
718
719 PEG_METHOD_EXIT();
720 return th;
|
721 mday 1.19 }
|
722 mike 1.2
|
723 kumpf 1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
724 {
725 if (th == 0)
726 {
727 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
728 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
729 throw NullPointer();
730 }
731
732 try
733 {
734 _idleThreads.insert_first(th);
735 }
736 catch (...)
737 {
738 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
739 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
740 "failed.");
741 }
742 }
|
743 mike 1.2
744 PEGASUS_NAMESPACE_END
|