1 karl 1.89 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.89 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 chip 1.11 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.89 //
|
21 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Day (mdday@us.ibm.com)
33 //
34 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
35 chip 1.11 // added nsk platform support
|
36 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
37 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
38 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
39 david.dillard 1.83 // David Dillard, VERITAS Software Corp.
40 // (david.dillard@veritas.com)
|
41 mike 1.2 //
42 //%/////////////////////////////////////////////////////////////////////////////
43
44 #include "Thread.h"
|
45 kumpf 1.68 #include <exception>
|
46 mike 1.2 #include <Pegasus/Common/IPC.h>
|
47 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
48 mike 1.2
49 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
50 chip 1.11 # include "ThreadWindows.cpp"
|
51 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
52 # include "ThreadUnix.cpp"
53 #elif defined(PEGASUS_OS_TYPE_NSK)
54 # include "ThreadNsk.cpp"
|
55 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
56 # include "ThreadVms.cpp"
|
57 mike 1.2 #else
58 # error "Unsupported platform"
59 #endif
60
|
61 kumpf 1.69 PEGASUS_USING_STD;
|
62 mike 1.2 PEGASUS_NAMESPACE_BEGIN
63
|
64 mday 1.42
|
65 chip 1.11 void thread_data::default_delete(void * data)
66 {
|
67 mike 1.2 if( data != NULL)
|
68 chip 1.11 ::operator delete(data);
|
69 mike 1.2 }
70
|
71 chuck 1.43 // l10n start
72 void language_delete(void * data)
73 {
74 if( data != NULL)
75 {
|
76 kumpf 1.88 AutoPtr<AcceptLanguageList> al(static_cast<AcceptLanguageList *>(data));
|
77 chuck 1.43 }
78 }
79 // l10n end
80
|
81 mike 1.2 Boolean Thread::_signals_blocked = false;
|
82 chuck 1.37 // l10n
|
83 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
84 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
85 marek 1.63 #else
86 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
87 #endif
|
88 chuck 1.37 Boolean Thread::_key_initialized = false;
|
89 chuck 1.41 Boolean Thread::_key_error = false;
|
90 chuck 1.37
|
91 mike 1.2
|
92 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
|
93 mike 1.2 {
|
94 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
95 a.arora 1.65 _cleanup.insert_first(cu.get());
|
96 a.arora 1.64 cu.release();
|
97 mike 1.2 return;
98 }
|
99 kumpf 1.81
|
100 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
|
101 mike 1.2 {
|
102 david.dillard 1.83 AutoPtr<cleanup_handler> cu;
|
103 chip 1.11 try
104 {
|
105 kumpf 1.81 cu.reset(_cleanup.remove_first());
|
106 mike 1.2 }
|
107 chip 1.11 catch(IPCException&)
|
108 mike 1.2 {
|
109 kumpf 1.81 PEGASUS_ASSERT(0);
|
110 mike 1.2 }
111 if(execute == true)
|
112 kumpf 1.81 cu->execute();
|
113 mike 1.2 }
|
114 kumpf 1.81
|
115 mike 1.2
|
116 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
|
117 mike 1.2
118
|
119 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
120 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
121 {
122 // execute the cleanup stack and then return
|
123 mike 1.2 while( _cleanup.count() )
124 {
|
125 chip 1.11 try
126 {
|
127 kumpf 1.81 cleanup_pop(true);
|
128 chip 1.11 }
129 catch(IPCException&)
130 {
|
131 kumpf 1.81 PEGASUS_ASSERT(0);
132 break;
|
133 mike 1.2 }
134 }
135 _exit_code = exit_code;
136 exit_thread(exit_code);
|
137 mday 1.4 _handle.thid = 0;
|
138 mike 1.2 }
139
140
141 #endif
142
|
143 chuck 1.37 // l10n start
|
144 chuck 1.39 Sint8 Thread::initializeKey()
145 {
|
146 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
147 if (!Thread::_key_initialized)
148 {
149 if (Thread::_key_error)
150 {
151 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
152 "Thread: ERROR - thread key error");
153 return -1;
154 }
155
156 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
157 {
158 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
159 "Thread: able to create a thread key");
160 Thread::_key_initialized = true;
161 }
162 else
163 {
164 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
165 "Thread: ERROR - unable to create a thread key");
166 Thread::_key_error = true;
167 kumpf 1.81 return -1;
168 }
169 }
|
170 chuck 1.39
|
171 kumpf 1.81 PEG_METHOD_EXIT();
172 return 0;
|
173 chuck 1.39 }
174
|
175 chuck 1.37 Thread * Thread::getCurrent()
176 {
|
177 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
178 chuck 1.40 if (Thread::initializeKey() != 0)
|
179 chuck 1.39 {
|
180 kumpf 1.81 return NULL;
|
181 chuck 1.39 }
|
182 kumpf 1.81 PEG_METHOD_EXIT();
183 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
|
184 chuck 1.39 }
185
186 void Thread::setCurrent(Thread * thrd)
187 {
|
188 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
189 if (Thread::initializeKey() == 0)
190 {
191 if (pegasus_set_thread_specific(
192 Thread::_platform_thread_key, (void *) thrd) == 0)
|
193 chuck 1.39 {
|
194 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
195 "Successful set Thread * into thread specific storage");
|
196 chuck 1.39 }
197 else
198 {
|
199 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
200 "ERROR: error setting Thread * into thread specific storage");
|
201 chuck 1.39 }
|
202 kumpf 1.81 }
203 PEG_METHOD_EXIT();
|
204 chuck 1.37 }
205
|
206 kumpf 1.88 AcceptLanguageList * Thread::getLanguages()
|
207 chuck 1.37 {
|
208 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
209
210 Thread * curThrd = Thread::getCurrent();
211 if (curThrd == NULL)
212 return NULL;
|
213 kumpf 1.88 AcceptLanguageList * acceptLangs =
214 (AcceptLanguageList *)curThrd->reference_tsd("acceptLanguages");
|
215 kumpf 1.81 curThrd->dereference_tsd();
216 PEG_METHOD_EXIT();
217 return acceptLangs;
|
218 chuck 1.37 }
219
|
220 kumpf 1.88 void Thread::setLanguages(AcceptLanguageList *langs) //l10n
|
221 chuck 1.37 {
|
222 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
223
224 Thread* currentThrd = Thread::getCurrent();
225 if (currentThrd != NULL)
226 {
227 // deletes the old tsd and creates a new one
228 currentThrd->put_tsd("acceptLanguages",
229 language_delete,
|
230 kumpf 1.88 sizeof(AcceptLanguageList *),
|
231 kumpf 1.81 langs);
232 }
233
234 PEG_METHOD_EXIT();
|
235 chuck 1.37 }
236
237 void Thread::clearLanguages() //l10n
238 {
|
239 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
240
241 Thread * currentThrd = Thread::getCurrent();
242 if (currentThrd != NULL)
243 {
244 // deletes the old tsd
245 currentThrd->delete_tsd("acceptLanguages");
246 }
247
248 PEG_METHOD_EXIT();
|
249 chuck 1.37 }
|
250 kumpf 1.81 // l10n end
|
251 chuck 1.37
|
252 mday 1.52
|
253 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
254 //
255 // ThreadPool
256 //
257 ///////////////////////////////////////////////////////////////////////////////
258
259 ThreadPool::ThreadPool(
260 Sint16 initialSize,
261 const char* key,
262 Sint16 minThreads,
263 Sint16 maxThreads,
264 struct timeval& deallocateWait)
265 : _maxThreads(maxThreads),
266 _minThreads(minThreads),
267 _currentThreads(0),
268 _idleThreads(true),
269 _runningThreads(true),
270 _dying(0)
|
271 mday 1.58 {
|
272 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
273 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
274 mday 1.58
|
275 kumpf 1.81 memset(_key, 0x00, 17);
276 if (key != 0)
277 {
278 strncpy(_key, key, 16);
279 }
280
281 if ((_maxThreads > 0) && (_maxThreads < initialSize))
282 {
283 _maxThreads = initialSize;
284 }
|
285 mday 1.58
|
286 kumpf 1.81 if (_minThreads > initialSize)
287 {
288 _minThreads = initialSize;
289 }
|
290 mday 1.52
|
291 kumpf 1.81 for (int i = 0; i < initialSize; i++)
292 {
293 _addToIdleThreadsQueue(_initializeThread());
294 }
295 }
|
296 mday 1.20
|
297 kumpf 1.81 ThreadPool::~ThreadPool()
|
298 mday 1.20 {
|
299 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
|
300 konrad.r 1.86
|
301 kumpf 1.81 try
302 {
303 // Set the dying flag so all thread know the destructor has been entered
304 _dying++;
|
305 konrad.r 1.86 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
|
306 mike 1.87 "Cleaning up %d idle threads. ", _currentThreads.get());
307 while (_currentThreads.get() > 0)
|
308 kumpf 1.81 {
309 Thread* thread = _idleThreads.remove_first();
310 if (thread != 0)
311 {
312 _cleanupThread(thread);
313 _currentThreads--;
314 }
315 else
316 {
317 pegasus_yield();
318 }
319 }
320 }
321 catch (...)
322 {
323 }
|
324 mday 1.20 }
325
|
326 kumpf 1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
327 {
328 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
329
330 try
331 {
|
332 kumpf 1.82 Thread* myself = (Thread *)parm;
333 PEGASUS_ASSERT(myself != 0);
|
334 kumpf 1.81
|
335 kumpf 1.82 // Set myself into thread specific storage
336 // This will allow code to get its own Thread
337 Thread::setCurrent(myself);
|
338 kumpf 1.81
|
339 kumpf 1.82 ThreadPool* pool = (ThreadPool *)myself->get_parm();
340 PEGASUS_ASSERT(pool != 0);
|
341 mike 1.2
|
342 kumpf 1.82 Semaphore* sleep_sem = 0;
343 struct timeval* lastActivityTime = 0;
|
344 chuck 1.39
|
345 kumpf 1.81 try
346 {
|
347 kumpf 1.82 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
|
348 kumpf 1.81 myself->dereference_tsd();
|
349 kumpf 1.82 PEGASUS_ASSERT(sleep_sem != 0);
350
351 lastActivityTime =
352 (struct timeval *)myself->reference_tsd("last activity time");
|
353 kumpf 1.81 myself->dereference_tsd();
|
354 kumpf 1.82 PEGASUS_ASSERT(lastActivityTime != 0);
|
355 kumpf 1.81 }
356 catch (...)
357 {
358 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
359 kumpf 1.82 "ThreadPool::_loop: Failure getting sleep_sem or "
360 "lastActivityTime.");
|
361 kumpf 1.81 PEGASUS_ASSERT(false);
362 pool->_idleThreads.remove(myself);
363 pool->_currentThreads--;
364 PEG_METHOD_EXIT();
365 return((PEGASUS_THREAD_RETURN)1);
366 }
|
367 mday 1.52
|
368 kumpf 1.82 while (1)
|
369 kumpf 1.81 {
|
370 kumpf 1.82 try
371 {
372 sleep_sem->wait();
373 }
374 catch (...)
375 {
376 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
377 "ThreadPool::_loop: failure on sleep_sem->wait().");
378 PEGASUS_ASSERT(false);
379 pool->_idleThreads.remove(myself);
380 pool->_currentThreads--;
381 PEG_METHOD_EXIT();
382 return((PEGASUS_THREAD_RETURN)1);
383 }
384
385 // When we awaken we reside on the _runningThreads queue, not the
386 // _idleThreads queue.
387
388 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
389 void* parm = 0;
390 Semaphore* blocking_sem = 0;
391 kumpf 1.82
392 try
393 {
394 work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
395 myself->reference_tsd("work func");
396 myself->dereference_tsd();
397 parm = myself->reference_tsd("work parm");
398 myself->dereference_tsd();
399 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
400 myself->dereference_tsd();
401 }
402 catch (...)
403 {
404 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
405 "ThreadPool::_loop: Failure accessing work func, work parm, "
406 "or blocking sem.");
407 PEGASUS_ASSERT(false);
408 pool->_idleThreads.remove(myself);
409 pool->_currentThreads--;
410 PEG_METHOD_EXIT();
411 return((PEGASUS_THREAD_RETURN)1);
412 kumpf 1.82 }
413
414 if (work == 0)
415 {
|
416 carolann.graves 1.84 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
417 kumpf 1.82 "ThreadPool::_loop: work func is 0, meaning we should exit.");
418 break;
419 }
|
420 mike 1.2
|
421 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
|
422 konrad.r 1.67
|
423 kumpf 1.82 try
424 {
425 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
426 work(parm);
427 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
428 }
429 catch (Exception & e)
430 {
431 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
432 String("Exception from work in ThreadPool::_loop: ") +
433 e.getMessage());
434 }
|
435 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
436 konrad.r 1.86 catch (const exception& e)
|
437 kumpf 1.82 {
438 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
439 String("Exception from work in ThreadPool::_loop: ") +
440 e.what());
441 }
|
442 kumpf 1.68 #endif
|
443 kumpf 1.82 catch (...)
444 {
445 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
446 "Unknown exception from work in ThreadPool::_loop.");
447 }
|
448 kumpf 1.81
|
449 kumpf 1.82 // put myself back onto the available list
450 try
|
451 kumpf 1.57 {
|
452 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
453 if (blocking_sem != 0)
454 {
455 blocking_sem->signal();
456 }
|
457 s.hills 1.49
|
458 kumpf 1.82 Boolean removed = pool->_runningThreads.remove((void *)myself);
459 PEGASUS_ASSERT(removed);
|
460 s.hills 1.49
|
461 kumpf 1.82 pool->_idleThreads.insert_first(myself);
462 }
463 catch (...)
464 {
465 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
466 "ThreadPool::_loop: Adding thread to idle pool failed.");
467 PEGASUS_ASSERT(false);
468 pool->_currentThreads--;
469 PEG_METHOD_EXIT();
470 return((PEGASUS_THREAD_RETURN)1);
471 }
|
472 kumpf 1.81 }
|
473 kumpf 1.82 }
474 catch (const Exception& e)
475 {
476 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
477 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
478 }
479 catch (...)
480 {
481 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
482 "Caught unrecognized exception. Exiting _loop.");
|
483 kumpf 1.81 }
|
484 kumpf 1.14
|
485 kumpf 1.81 PEG_METHOD_EXIT();
486 return((PEGASUS_THREAD_RETURN)0);
|
487 mike 1.2 }
488
|
489 konrad.r 1.86 ThreadStatus ThreadPool::allocate_and_awaken(
|
490 kumpf 1.81 void* parm,
491 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
492 Semaphore* blocking)
|
493 mike 1.2 {
|
494 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
495
496 // Allocate_and_awaken will not run if the _dying flag is set.
497 // Once the lock is acquired, ~ThreadPool will not change
498 // the value of _dying until the lock is released.
499
500 try
501 {
|
502 mike 1.87 if (_dying.get())
|
503 kumpf 1.81 {
504 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
505 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
506 konrad.r 1.86 return PEGASUS_THREAD_UNAVAILABLE;
|
507 kumpf 1.81 }
508 struct timeval start;
509 gettimeofday(&start, NULL);
510 Thread* th = 0;
511
512 th = _idleThreads.remove_first();
|
513 kumpf 1.57
|
514 kumpf 1.81 if (th == 0)
515 {
|
516 mike 1.87 if ((_maxThreads == 0) ||
517 (_currentThreads.get() < Uint32(_maxThreads)))
|
518 kumpf 1.81 {
519 th = _initializeThread();
520 }
521 }
522
523 if (th == 0)
524 {
|
525 kumpf 1.89.2.1 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
|
526 kumpf 1.81 "ThreadPool::allocate_and_awaken: Insufficient resources: "
527 " pool = %s, running threads = %d, idle threads = %d",
528 _key, _runningThreads.count(), _idleThreads.count());
|
529 konrad.r 1.86 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
|
530 kumpf 1.81 }
|
531 mike 1.2
|
532 kumpf 1.81 // initialize the thread data with the work function and parameters
533 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
534 "Initializing thread with work function and parameters: parm = %p",
535 parm);
|
536 mike 1.2
|
537 kumpf 1.81 th->delete_tsd("work func");
538 th->put_tsd("work func", NULL,
|
539 kumpf 1.70 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
|
540 kumpf 1.81 (void *)work);
541 th->delete_tsd("work parm");
542 th->put_tsd("work parm", NULL, sizeof(void *), parm);
543 th->delete_tsd("blocking sem");
544 if (blocking != 0)
545 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
546
547 // put the thread on the running list
548 _runningThreads.insert_first(th);
549
550 // signal the thread's sleep semaphore to awaken it
551 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
552 PEGASUS_ASSERT(sleep_sem != 0);
553
554 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
555 sleep_sem->signal();
556 th->dereference_tsd();
|
557 kumpf 1.57 }
|
558 mday 1.58 catch (...)
|
559 kumpf 1.57 {
|
560 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
561 "ThreadPool::allocate_and_awaken: Operation Failed.");
562 PEG_METHOD_EXIT();
563 // ATTN: Error result has not yet been defined
|
564 konrad.r 1.86 return PEGASUS_THREAD_SETUP_FAILURE;
|
565 kumpf 1.57 }
|
566 kumpf 1.81 PEG_METHOD_EXIT();
|
567 konrad.r 1.86 return PEGASUS_THREAD_OK;
|
568 mike 1.2 }
569
|
570 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
571 // but should call it at least once per _deallocateWait interval.
|
572 mday 1.12
|
573 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
574 mike 1.2 {
|
575 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
576
577 Uint32 numThreadsCleanedUp = 0;
578
579 Uint32 numIdleThreads = _idleThreads.count();
580 for (Uint32 i = 0; i < numIdleThreads; i++)
581 {
582 // Do not dip below the minimum thread count
|
583 mike 1.87 if (_currentThreads.get() <= (Uint32)_minThreads)
|
584 kumpf 1.81 {
585 break;
586 }
587
588 Thread* thread = _idleThreads.remove_last();
589
590 // If there are no more threads in the _idleThreads queue, we're done.
591 if (thread == 0)
592 {
593 break;
594 }
595
596 struct timeval* lastActivityTime;
597 try
598 {
599 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
600 "last activity time");
601 PEGASUS_ASSERT(lastActivityTime != 0);
602 }
603 catch (...)
604 {
605 kumpf 1.81 PEGASUS_ASSERT(false);
606 _idleThreads.insert_last(thread);
607 break;
608 }
609
610 Boolean cleanupThisThread =
611 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
612 thread->dereference_tsd();
613
614 if (cleanupThisThread)
615 {
616 _cleanupThread(thread);
617 _currentThreads--;
618 numThreadsCleanedUp++;
619 }
620 else
621 {
622 _idleThreads.insert_first(thread);
623 }
|
624 konrad.r 1.67 }
|
625 kumpf 1.81
626 PEG_METHOD_EXIT();
627 return numThreadsCleanedUp;
|
628 konrad.r 1.67 }
|
629 mday 1.19
|
630 kumpf 1.81 void ThreadPool::_cleanupThread(Thread* thread)
|
631 mday 1.19 {
|
632 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
633
634 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
635 thread->delete_tsd("work func");
636 thread->put_tsd(
637 "work func", 0,
638 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
639 (void *) 0);
640 thread->delete_tsd("work parm");
641 thread->put_tsd("work parm", 0, sizeof(void *), 0);
642
643 // signal the thread's sleep semaphore to awaken it
644 Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
645 PEGASUS_ASSERT(sleep_sem != 0);
646 sleep_sem->signal();
647 thread->dereference_tsd();
648
649 thread->join();
650 delete thread;
651
652 PEG_METHOD_EXIT();
|
653 mday 1.19 }
654
|
655 kumpf 1.81 Boolean ThreadPool::_timeIntervalExpired(
656 struct timeval* start,
657 struct timeval* interval)
|
658 mday 1.19 {
|
659 kumpf 1.81 // never time out if the interval is zero
660 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
661 {
662 return false;
663 }
664
665 struct timeval now, finish, remaining;
666 Uint32 usec;
667 pegasus_gettimeofday(&now);
668 pegasus_gettimeofday(&remaining); // Avoid valgrind error
669
670 finish.tv_sec = start->tv_sec + interval->tv_sec;
671 usec = start->tv_usec + interval->tv_usec;
672 finish.tv_sec += (usec / 1000000);
673 usec %= 1000000;
674 finish.tv_usec = usec;
675
676 return (timeval_subtract(&remaining, &finish, &now) != 0);
|
677 mday 1.19 }
678
|
679 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
680 mday 1.19 {
|
681 kumpf 1.81 delete (Semaphore *)p;
|
682 mday 1.19 }
683
|
684 kumpf 1.81 Thread* ThreadPool::_initializeThread()
|
685 mday 1.19 {
|
686 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
687
688 Thread* th = (Thread *) new Thread(_loop, this, false);
689
690 // allocate a sleep semaphore and pass it in the thread context
691 // initial count is zero, loop function will sleep until
692 // we signal the semaphore
693 Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
694 th->put_tsd(
695 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
696
697 struct timeval* lastActivityTime =
698 (struct timeval *) ::operator new(sizeof(struct timeval));
699 pegasus_gettimeofday(lastActivityTime);
700
701 th->put_tsd("last activity time", thread_data::default_delete,
702 sizeof(struct timeval), (void *)lastActivityTime);
703 // thread will enter _loop() and sleep on sleep_sem until we signal it
704
|
705 konrad.r 1.86 if (th->run() != PEGASUS_THREAD_OK)
|
706 kumpf 1.81 {
|
707 konrad.r 1.86 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
708 "Could not create thread. Error code is %d.", errno);
|
709 kumpf 1.81 delete th;
710 return 0;
711 }
712 _currentThreads++;
713 pegasus_yield();
714
715 PEG_METHOD_EXIT();
716 return th;
|
717 mday 1.19 }
|
718 mike 1.2
|
719 kumpf 1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
720 {
721 if (th == 0)
722 {
723 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
724 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
725 throw NullPointer();
726 }
727
728 try
729 {
730 _idleThreads.insert_first(th);
731 }
732 catch (...)
733 {
734 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
735 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
736 "failed.");
737 }
738 }
|
739 mike 1.2
740 PEGASUS_NAMESPACE_END
|