1 karl 1.75 //%2005////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mike 1.2 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
13 chip 1.11 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
16 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 david.dillard 1.83 //
|
19 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
20 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
22 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
25 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
33 chip 1.11 // added nsk platform support
|
34 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
35 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
36 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
37 david.dillard 1.83 // David Dillard, VERITAS Software Corp.
38 // (david.dillard@veritas.com)
|
39 mike 1.2 //
40 //%/////////////////////////////////////////////////////////////////////////////
41
42 #include "Thread.h"
|
43 kumpf 1.68 #include <exception>
|
44 mike 1.2 #include <Pegasus/Common/IPC.h>
|
45 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
46 mike 1.2
47 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
48 chip 1.11 # include "ThreadWindows.cpp"
|
49 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
50 # include "ThreadUnix.cpp"
51 #elif defined(PEGASUS_OS_TYPE_NSK)
52 # include "ThreadNsk.cpp"
|
53 gs.keenan 1.76 #elif defined(PEGASUS_OS_VMS)
54 # include "ThreadVms.cpp"
|
55 mike 1.2 #else
56 # error "Unsupported platform"
57 #endif
58
|
59 kumpf 1.69 PEGASUS_USING_STD;
|
60 mike 1.2 PEGASUS_NAMESPACE_BEGIN
61
|
62 mday 1.42
|
63 chip 1.11 void thread_data::default_delete(void * data)
64 {
|
65 mike 1.2 if( data != NULL)
|
66 chip 1.11 ::operator delete(data);
|
67 mike 1.2 }
68
|
69 chuck 1.43 // l10n start
70 void language_delete(void * data)
71 {
72 if( data != NULL)
73 {
|
74 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
75 chuck 1.43 }
76 }
77 // l10n end
78
|
79 mike 1.2 Boolean Thread::_signals_blocked = false;
|
80 chuck 1.37 // l10n
|
81 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
82 w.otsuka 1.74 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = PEGASUS_THREAD_KEY_TYPE(-1);
|
83 marek 1.63 #else
84 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
85 #endif
|
86 chuck 1.37 Boolean Thread::_key_initialized = false;
|
87 chuck 1.41 Boolean Thread::_key_error = false;
|
88 chuck 1.37
|
89 mike 1.2
|
90 david.dillard 1.83 void Thread::cleanup_push( void (*routine)(void *), void *parm)
|
91 mike 1.2 {
|
92 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
93 a.arora 1.65 _cleanup.insert_first(cu.get());
|
94 a.arora 1.64 cu.release();
|
95 mike 1.2 return;
96 }
|
97 kumpf 1.81
|
98 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
|
99 mike 1.2 {
|
100 david.dillard 1.83 AutoPtr<cleanup_handler> cu;
|
101 chip 1.11 try
102 {
|
103 kumpf 1.81 cu.reset(_cleanup.remove_first());
|
104 mike 1.2 }
|
105 chip 1.11 catch(IPCException&)
|
106 mike 1.2 {
|
107 kumpf 1.81 PEGASUS_ASSERT(0);
|
108 mike 1.2 }
109 if(execute == true)
|
110 kumpf 1.81 cu->execute();
|
111 mike 1.2 }
|
112 kumpf 1.81
|
113 mike 1.2
|
114 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
|
115 mike 1.2
116
|
117 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119 {
120 // execute the cleanup stack and then return
|
121 mike 1.2 while( _cleanup.count() )
122 {
|
123 chip 1.11 try
124 {
|
125 kumpf 1.81 cleanup_pop(true);
|
126 chip 1.11 }
127 catch(IPCException&)
128 {
|
129 kumpf 1.81 PEGASUS_ASSERT(0);
130 break;
|
131 mike 1.2 }
132 }
133 _exit_code = exit_code;
134 exit_thread(exit_code);
|
135 mday 1.4 _handle.thid = 0;
|
136 mike 1.2 }
137
138
139 #endif
140
|
141 chuck 1.37 // l10n start
|
142 chuck 1.39 Sint8 Thread::initializeKey()
143 {
|
144 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145 if (!Thread::_key_initialized)
146 {
147 if (Thread::_key_error)
148 {
149 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150 "Thread: ERROR - thread key error");
151 return -1;
152 }
153
154 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: able to create a thread key");
158 Thread::_key_initialized = true;
159 }
160 else
161 {
162 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163 "Thread: ERROR - unable to create a thread key");
164 Thread::_key_error = true;
165 kumpf 1.81 return -1;
166 }
167 }
|
168 chuck 1.39
|
169 kumpf 1.81 PEG_METHOD_EXIT();
170 return 0;
|
171 chuck 1.39 }
172
|
173 chuck 1.37 Thread * Thread::getCurrent()
174 {
|
175 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
176 chuck 1.40 if (Thread::initializeKey() != 0)
|
177 chuck 1.39 {
|
178 kumpf 1.81 return NULL;
|
179 chuck 1.39 }
|
180 kumpf 1.81 PEG_METHOD_EXIT();
181 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
|
182 chuck 1.39 }
183
184 void Thread::setCurrent(Thread * thrd)
185 {
|
186 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187 if (Thread::initializeKey() == 0)
188 {
189 if (pegasus_set_thread_specific(
190 Thread::_platform_thread_key, (void *) thrd) == 0)
|
191 chuck 1.39 {
|
192 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193 "Successful set Thread * into thread specific storage");
|
194 chuck 1.39 }
195 else
196 {
|
197 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198 "ERROR: error setting Thread * into thread specific storage");
|
199 chuck 1.39 }
|
200 kumpf 1.81 }
201 PEG_METHOD_EXIT();
|
202 chuck 1.37 }
203
204 AcceptLanguages * Thread::getLanguages()
205 {
|
206 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
207
208 Thread * curThrd = Thread::getCurrent();
209 if (curThrd == NULL)
210 return NULL;
211 AcceptLanguages * acceptLangs =
212 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213 curThrd->dereference_tsd();
214 PEG_METHOD_EXIT();
215 return acceptLangs;
|
216 chuck 1.37 }
217
218 void Thread::setLanguages(AcceptLanguages *langs) //l10n
219 {
|
220 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
221
222 Thread* currentThrd = Thread::getCurrent();
223 if (currentThrd != NULL)
224 {
225 // deletes the old tsd and creates a new one
226 currentThrd->put_tsd("acceptLanguages",
227 language_delete,
228 sizeof(AcceptLanguages *),
229 langs);
230 }
231
232 PEG_METHOD_EXIT();
|
233 chuck 1.37 }
234
235 void Thread::clearLanguages() //l10n
236 {
|
237 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
238
239 Thread * currentThrd = Thread::getCurrent();
240 if (currentThrd != NULL)
241 {
242 // deletes the old tsd
243 currentThrd->delete_tsd("acceptLanguages");
244 }
245
246 PEG_METHOD_EXIT();
|
247 chuck 1.37 }
|
248 kumpf 1.81 // l10n end
|
249 chuck 1.37
|
250 mday 1.52
|
251 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
252 //
253 // ThreadPool
254 //
255 ///////////////////////////////////////////////////////////////////////////////
256
257 ThreadPool::ThreadPool(
258 Sint16 initialSize,
259 const char* key,
260 Sint16 minThreads,
261 Sint16 maxThreads,
262 struct timeval& deallocateWait)
263 : _maxThreads(maxThreads),
264 _minThreads(minThreads),
265 _currentThreads(0),
266 _idleThreads(true),
267 _runningThreads(true),
268 _dying(0)
|
269 mday 1.58 {
|
270 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
271 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
272 mday 1.58
|
273 kumpf 1.81 memset(_key, 0x00, 17);
274 if (key != 0)
275 {
276 strncpy(_key, key, 16);
277 }
278
279 if ((_maxThreads > 0) && (_maxThreads < initialSize))
280 {
281 _maxThreads = initialSize;
282 }
|
283 mday 1.58
|
284 kumpf 1.81 if (_minThreads > initialSize)
285 {
286 _minThreads = initialSize;
287 }
|
288 mday 1.52
|
289 kumpf 1.81 for (int i = 0; i < initialSize; i++)
290 {
291 _addToIdleThreadsQueue(_initializeThread());
292 }
293 }
|
294 mday 1.20
|
295 kumpf 1.81 ThreadPool::~ThreadPool()
|
296 mday 1.20 {
|
297 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
|
298 konrad.r 1.86
|
299 kumpf 1.81 try
300 {
301 // Set the dying flag so all thread know the destructor has been entered
302 _dying++;
|
303 konrad.r 1.86 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
304 "Cleaning up %d idle threads. ", _currentThreads.value());
|
305 kumpf 1.81 while (_currentThreads.value() > 0)
306 {
307 Thread* thread = _idleThreads.remove_first();
308 if (thread != 0)
309 {
310 _cleanupThread(thread);
311 _currentThreads--;
312 }
313 else
314 {
315 pegasus_yield();
316 }
317 }
318 }
319 catch (...)
320 {
321 }
|
322 mday 1.20 }
323
|
324 kumpf 1.81 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
325 {
326 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
327
328 try
329 {
|
330 kumpf 1.82 Thread* myself = (Thread *)parm;
331 PEGASUS_ASSERT(myself != 0);
|
332 kumpf 1.81
|
333 kumpf 1.82 // Set myself into thread specific storage
334 // This will allow code to get its own Thread
335 Thread::setCurrent(myself);
|
336 kumpf 1.81
|
337 kumpf 1.82 ThreadPool* pool = (ThreadPool *)myself->get_parm();
338 PEGASUS_ASSERT(pool != 0);
|
339 mike 1.2
|
340 kumpf 1.82 Semaphore* sleep_sem = 0;
341 struct timeval* lastActivityTime = 0;
|
342 chuck 1.39
|
343 kumpf 1.81 try
344 {
|
345 kumpf 1.82 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
|
346 kumpf 1.81 myself->dereference_tsd();
|
347 kumpf 1.82 PEGASUS_ASSERT(sleep_sem != 0);
348
349 lastActivityTime =
350 (struct timeval *)myself->reference_tsd("last activity time");
|
351 kumpf 1.81 myself->dereference_tsd();
|
352 kumpf 1.82 PEGASUS_ASSERT(lastActivityTime != 0);
|
353 kumpf 1.81 }
354 catch (...)
355 {
356 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
357 kumpf 1.82 "ThreadPool::_loop: Failure getting sleep_sem or "
358 "lastActivityTime.");
|
359 kumpf 1.81 PEGASUS_ASSERT(false);
360 pool->_idleThreads.remove(myself);
361 pool->_currentThreads--;
362 PEG_METHOD_EXIT();
363 return((PEGASUS_THREAD_RETURN)1);
364 }
|
365 mday 1.52
|
366 kumpf 1.82 while (1)
|
367 kumpf 1.81 {
|
368 kumpf 1.82 try
369 {
370 sleep_sem->wait();
371 }
372 catch (...)
373 {
374 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
375 "ThreadPool::_loop: failure on sleep_sem->wait().");
376 PEGASUS_ASSERT(false);
377 pool->_idleThreads.remove(myself);
378 pool->_currentThreads--;
379 PEG_METHOD_EXIT();
380 return((PEGASUS_THREAD_RETURN)1);
381 }
382
383 // When we awaken we reside on the _runningThreads queue, not the
384 // _idleThreads queue.
385
386 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *) = 0;
387 void* parm = 0;
388 Semaphore* blocking_sem = 0;
389 kumpf 1.82
390 try
391 {
392 work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *))
393 myself->reference_tsd("work func");
394 myself->dereference_tsd();
395 parm = myself->reference_tsd("work parm");
396 myself->dereference_tsd();
397 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
398 myself->dereference_tsd();
399 }
400 catch (...)
401 {
402 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
403 "ThreadPool::_loop: Failure accessing work func, work parm, "
404 "or blocking sem.");
405 PEGASUS_ASSERT(false);
406 pool->_idleThreads.remove(myself);
407 pool->_currentThreads--;
408 PEG_METHOD_EXIT();
409 return((PEGASUS_THREAD_RETURN)1);
410 kumpf 1.82 }
411
412 if (work == 0)
413 {
|
414 carolann.graves 1.84 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
415 kumpf 1.82 "ThreadPool::_loop: work func is 0, meaning we should exit.");
416 break;
417 }
|
418 mike 1.2
|
419 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
|
420 konrad.r 1.67
|
421 kumpf 1.82 try
422 {
423 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work starting.");
424 work(parm);
425 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4, "Work finished.");
426 }
427 catch (Exception & e)
428 {
429 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
430 String("Exception from work in ThreadPool::_loop: ") +
431 e.getMessage());
432 }
|
433 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
434 konrad.r 1.86 catch (const exception& e)
|
435 kumpf 1.82 {
436 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
437 String("Exception from work in ThreadPool::_loop: ") +
438 e.what());
439 }
|
440 kumpf 1.68 #endif
|
441 kumpf 1.82 catch (...)
442 {
443 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
444 "Unknown exception from work in ThreadPool::_loop.");
445 }
|
446 kumpf 1.81
|
447 kumpf 1.82 // put myself back onto the available list
448 try
|
449 kumpf 1.57 {
|
450 kumpf 1.82 gettimeofday(lastActivityTime, NULL);
451 if (blocking_sem != 0)
452 {
453 blocking_sem->signal();
454 }
|
455 s.hills 1.49
|
456 kumpf 1.82 Boolean removed = pool->_runningThreads.remove((void *)myself);
457 PEGASUS_ASSERT(removed);
|
458 s.hills 1.49
|
459 kumpf 1.82 pool->_idleThreads.insert_first(myself);
460 }
461 catch (...)
462 {
463 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
464 "ThreadPool::_loop: Adding thread to idle pool failed.");
465 PEGASUS_ASSERT(false);
466 pool->_currentThreads--;
467 PEG_METHOD_EXIT();
468 return((PEGASUS_THREAD_RETURN)1);
469 }
|
470 kumpf 1.81 }
|
471 kumpf 1.82 }
472 catch (const Exception& e)
473 {
474 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
475 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
476 }
477 catch (...)
478 {
479 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
480 "Caught unrecognized exception. Exiting _loop.");
|
481 kumpf 1.81 }
|
482 kumpf 1.14
|
483 kumpf 1.81 PEG_METHOD_EXIT();
484 return((PEGASUS_THREAD_RETURN)0);
|
485 mike 1.2 }
486
|
487 konrad.r 1.86 ThreadStatus ThreadPool::allocate_and_awaken(
|
488 kumpf 1.81 void* parm,
489 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL* work)(void *),
490 Semaphore* blocking)
|
491 mike 1.2 {
|
492 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
493
494 // Allocate_and_awaken will not run if the _dying flag is set.
495 // Once the lock is acquired, ~ThreadPool will not change
496 // the value of _dying until the lock is released.
497
498 try
499 {
500 if (_dying.value())
501 {
502 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
504 konrad.r 1.86 return PEGASUS_THREAD_UNAVAILABLE;
|
505 kumpf 1.81 }
506 struct timeval start;
507 gettimeofday(&start, NULL);
508 Thread* th = 0;
509
510 th = _idleThreads.remove_first();
|
511 kumpf 1.57
|
512 kumpf 1.81 if (th == 0)
513 {
|
514 mike 1.86.8.1 if ((_maxThreads == 0) ||
515 (_currentThreads.value() < Uint32(_maxThreads)))
|
516 kumpf 1.81 {
517 th = _initializeThread();
518 }
519 }
520
521 if (th == 0)
522 {
523 // ATTN-DME-P3-20031103: This trace message should not be
524 // be labeled TRC_DISCARDED_DATA, because it does not
525 // necessarily imply that a failure has occurred. However,
526 // this label is being used temporarily to help isolate
527 // the cause of client timeout problems.
528 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529 "ThreadPool::allocate_and_awaken: Insufficient resources: "
530 " pool = %s, running threads = %d, idle threads = %d",
531 _key, _runningThreads.count(), _idleThreads.count());
|
532 konrad.r 1.86 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
|
533 kumpf 1.81 }
|
534 mike 1.2
|
535 kumpf 1.81 // initialize the thread data with the work function and parameters
536 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
537 "Initializing thread with work function and parameters: parm = %p",
538 parm);
|
539 mike 1.2
|
540 kumpf 1.81 th->delete_tsd("work func");
541 th->put_tsd("work func", NULL,
|
542 kumpf 1.70 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
|
543 kumpf 1.81 (void *)work);
544 th->delete_tsd("work parm");
545 th->put_tsd("work parm", NULL, sizeof(void *), parm);
546 th->delete_tsd("blocking sem");
547 if (blocking != 0)
548 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
549
550 // put the thread on the running list
551 _runningThreads.insert_first(th);
552
553 // signal the thread's sleep semaphore to awaken it
554 Semaphore* sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
555 PEGASUS_ASSERT(sleep_sem != 0);
556
557 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
558 sleep_sem->signal();
559 th->dereference_tsd();
|
560 kumpf 1.57 }
|
561 mday 1.58 catch (...)
|
562 kumpf 1.57 {
|
563 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
564 "ThreadPool::allocate_and_awaken: Operation Failed.");
565 PEG_METHOD_EXIT();
566 // ATTN: Error result has not yet been defined
|
567 konrad.r 1.86 return PEGASUS_THREAD_SETUP_FAILURE;
|
568 kumpf 1.57 }
|
569 kumpf 1.81 PEG_METHOD_EXIT();
|
570 konrad.r 1.86 return PEGASUS_THREAD_OK;
|
571 mike 1.2 }
572
|
573 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
574 // but should call it at least once per _deallocateWait interval.
|
575 mday 1.12
|
576 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
577 mike 1.2 {
|
578 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
579
580 Uint32 numThreadsCleanedUp = 0;
581
582 Uint32 numIdleThreads = _idleThreads.count();
583 for (Uint32 i = 0; i < numIdleThreads; i++)
584 {
585 // Do not dip below the minimum thread count
586 if (_currentThreads.value() <= (Uint32)_minThreads)
587 {
588 break;
589 }
590
591 Thread* thread = _idleThreads.remove_last();
592
593 // If there are no more threads in the _idleThreads queue, we're done.
594 if (thread == 0)
595 {
596 break;
597 }
598
599 kumpf 1.81 struct timeval* lastActivityTime;
600 try
601 {
602 lastActivityTime = (struct timeval *)thread->try_reference_tsd(
603 "last activity time");
604 PEGASUS_ASSERT(lastActivityTime != 0);
605 }
606 catch (...)
607 {
608 PEGASUS_ASSERT(false);
609 _idleThreads.insert_last(thread);
610 break;
611 }
612
613 Boolean cleanupThisThread =
614 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
615 thread->dereference_tsd();
616
617 if (cleanupThisThread)
618 {
619 _cleanupThread(thread);
620 kumpf 1.81 _currentThreads--;
621 numThreadsCleanedUp++;
622 }
623 else
624 {
625 _idleThreads.insert_first(thread);
626 }
|
627 konrad.r 1.67 }
|
628 kumpf 1.81
629 PEG_METHOD_EXIT();
630 return numThreadsCleanedUp;
|
631 konrad.r 1.67 }
|
632 mday 1.19
|
633 kumpf 1.81 void ThreadPool::_cleanupThread(Thread* thread)
|
634 mday 1.19 {
|
635 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
636
637 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
638 thread->delete_tsd("work func");
639 thread->put_tsd(
640 "work func", 0,
641 sizeof(PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
642 (void *) 0);
643 thread->delete_tsd("work parm");
644 thread->put_tsd("work parm", 0, sizeof(void *), 0);
645
646 // signal the thread's sleep semaphore to awaken it
647 Semaphore* sleep_sem = (Semaphore *)thread->reference_tsd("sleep sem");
648 PEGASUS_ASSERT(sleep_sem != 0);
649 sleep_sem->signal();
650 thread->dereference_tsd();
651
652 thread->join();
653 delete thread;
654
655 PEG_METHOD_EXIT();
|
656 mday 1.19 }
657
|
658 kumpf 1.81 Boolean ThreadPool::_timeIntervalExpired(
659 struct timeval* start,
660 struct timeval* interval)
|
661 mday 1.19 {
|
662 kumpf 1.81 // never time out if the interval is zero
663 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
664 {
665 return false;
666 }
667
668 struct timeval now, finish, remaining;
669 Uint32 usec;
670 pegasus_gettimeofday(&now);
671 pegasus_gettimeofday(&remaining); // Avoid valgrind error
672
673 finish.tv_sec = start->tv_sec + interval->tv_sec;
674 usec = start->tv_usec + interval->tv_usec;
675 finish.tv_sec += (usec / 1000000);
676 usec %= 1000000;
677 finish.tv_usec = usec;
678
679 return (timeval_subtract(&remaining, &finish, &now) != 0);
|
680 mday 1.19 }
681
|
682 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
683 mday 1.19 {
|
684 kumpf 1.81 delete (Semaphore *)p;
|
685 mday 1.19 }
686
|
687 kumpf 1.81 Thread* ThreadPool::_initializeThread()
|
688 mday 1.19 {
|
689 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
690
691 Thread* th = (Thread *) new Thread(_loop, this, false);
692
693 // allocate a sleep semaphore and pass it in the thread context
694 // initial count is zero, loop function will sleep until
695 // we signal the semaphore
696 Semaphore* sleep_sem = (Semaphore *) new Semaphore(0);
697 th->put_tsd(
698 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void *)sleep_sem);
699
700 struct timeval* lastActivityTime =
701 (struct timeval *) ::operator new(sizeof(struct timeval));
702 pegasus_gettimeofday(lastActivityTime);
703
704 th->put_tsd("last activity time", thread_data::default_delete,
705 sizeof(struct timeval), (void *)lastActivityTime);
706 // thread will enter _loop() and sleep on sleep_sem until we signal it
707
|
708 konrad.r 1.86 if (th->run() != PEGASUS_THREAD_OK)
|
709 kumpf 1.81 {
|
710 konrad.r 1.86 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
711 "Could not create thread. Error code is %d.", errno);
|
712 kumpf 1.81 delete th;
713 return 0;
714 }
715 _currentThreads++;
716 pegasus_yield();
717
718 PEG_METHOD_EXIT();
719 return th;
|
720 mday 1.19 }
|
721 mike 1.2
|
722 kumpf 1.81 void ThreadPool::_addToIdleThreadsQueue(Thread* th)
723 {
724 if (th == 0)
725 {
726 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
727 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
728 throw NullPointer();
729 }
730
731 try
732 {
733 _idleThreads.insert_first(th);
734 }
735 catch (...)
736 {
737 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
738 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_first "
739 "failed.");
740 }
741 }
|
742 mike 1.2
743 PEGASUS_NAMESPACE_END
|