1 karl 1.89 //%2006////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.71 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.56 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.71 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.75 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 karl 1.89 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
|
13 mike 1.2 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
15 chip 1.11 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
18 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
|
20 karl 1.89 //
|
21 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
24 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
27 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Day (mdday@us.ibm.com)
33 //
34 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
35 chip 1.11 // added nsk platform support
|
36 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
37 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
38 gs.keenan 1.76 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
|
39 david.dillard 1.83 // David Dillard, VERITAS Software Corp.
40 // (david.dillard@veritas.com)
|
41 mike 1.2 //
42 //%/////////////////////////////////////////////////////////////////////////////
43
44 #include "Thread.h"
|
45 kumpf 1.68 #include <exception>
|
46 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
47 mike 1.90.2.1 #include "Time.h"
|
48 mike 1.2
|
49 mike 1.90.2.6 PEGASUS_USING_STD;
50
51 PEGASUS_NAMESPACE_BEGIN
52
53 //==============================================================================
54 //
55 // POSIX Threads Implementation:
56 //
57 //==============================================================================
58
59 #if defined(PEGASUS_HAVE_PTHREADS)
60
|
61 mike 1.90.2.7 extern "C" void *_start_wrapper(void *arg_)
|
62 mike 1.90.2.6 {
|
63 mike 1.90.2.7 StartWrapperArg *arg = (StartWrapperArg *) arg_;
|
64 mike 1.90.2.6
|
65 mike 1.90.2.7 void *return_value = (*arg->start) (arg->arg);
|
66 mike 1.90.2.6 delete arg;
67
68 return return_value;
69 }
70
|
/**
    Fill *sig with a fixed set of signals and block them for the caller.

    The set is emptied first, then SIGHUP, SIGINT, SIGKILL, SIGABRT, SIGALRM
    and SIGPIPE are added. The mask is applied with pthread_sigmask(), or
    with sigprocmask() when PEGASUS_PLATFORM_ZOS_ZSERIES_IBM is defined.
    Should not be used for main().

    @param sig Caller-owned signal set to populate.
    @return The same pointer that was passed in.
*/
static sigset_t *block_signal_mask(sigset_t * sig)
{
    // SIGKILL is listed although it is probably useless: KILL can't be
    // blocked according to POSIX.
    static const int signalsToBlock[] =
    {
        SIGHUP, SIGINT, SIGKILL, SIGABRT, SIGALRM, SIGPIPE
    };
    const unsigned int count =
        sizeof (signalsToBlock) / sizeof (signalsToBlock[0]);

    sigemptyset(sig);
    for (unsigned int idx = 0; idx < count; idx++)
    {
        sigaddset(sig, signalsToBlock[idx]);
    }

    // Note: older versions of the linux pthreads library use SIGUSR1 and
    // SIGUSR2 internally to stop and start threads that are blocking, the
    // newer ones implement this through the kernel's real time signals.
    // SIGUSR1/SIGUSR2 are therefore deliberately left out of the set:
    // #if defined(PEGASUS_PLATFORM_LINUX_IX86_GNU)
    //     sigaddset(sig, SIGUSR1);
    //     sigaddset(sig, SIGUSR2);
    // #endif

#ifdef PEGASUS_PLATFORM_ZOS_ZSERIES_IBM
    sigprocmask(SIG_BLOCK, sig, NULL);
#else
    pthread_sigmask(SIG_BLOCK, sig, NULL);
#endif

    return sig;
}
|
101 mike 1.2
|
102 mike 1.90.2.7 Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *), void *parameter, Boolean detached):_is_detached(detached),
103 _cancel_enabled(true),
104 _cancelled(false),
105 _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0)
|
106 mike 1.90.2.6 {
107 Threads::clear(_handle.thid);
108 }
|
109 mike 1.2
|
110 mike 1.90.2.6 Thread::~Thread()
111 {
112 try
113 {
114 join();
115 empty_tsd();
116 }
|
117 mike 1.90.2.7 catch(...)
|
118 mike 1.90.2.6 {
119 // Do not allow the destructor to throw an exception
120 }
121 }
122
123 #endif /* PEGASUS_HAVE_PTHREADS */
124
125 //==============================================================================
126 //
127 // Windows Threads Implementation:
128 //
129 //==============================================================================
130
131 #if defined(PEGASUS_HAVE_WINDOWS_THREADS)
132
|
133 mike 1.90.2.7 Thread::Thread(ThreadReturnType(PEGASUS_THREAD_CDECL * start) (void *),
134 void *parameter,
135 Boolean detached):_is_detached(detached),
136 _cancel_enabled(true),
137 _cancelled(false),
138 _start(start), _cleanup(), _tsd(), _thread_parm(parameter), _exit_code(0)
|
139 mike 1.90.2.6 {
|
140 mike 1.90.2.7 Threads::clear(_handle.thid);
|
141 mike 1.90.2.6 }
142
143 Thread::~Thread()
144 {
|
145 mike 1.90.2.7 try
146 {
147 join();
148 empty_tsd();
149 }
150 catch(...)
151 {
152 }
|
153 mike 1.90.2.6 }
154
155 #endif /* PEGASUS_HAVE_WINDOWS_THREADS */
156
157 //==============================================================================
158 //
159 // Common implementation:
160 //
161 //==============================================================================
|
162 mday 1.42
|
163 mike 1.90.2.7 void thread_data::default_delete(void *data)
|
164 chip 1.11 {
|
165 mike 1.90.2.7 if (data != NULL)
166 ::operator delete(data);
|
167 mike 1.2 }
168
|
169 mike 1.90.2.7 void language_delete(void *data)
|
170 chuck 1.43 {
|
171 mike 1.90.2.7 if (data != NULL)
172 {
173 AutoPtr < AcceptLanguageList > al(static_cast <
174 AcceptLanguageList * >(data));
175 }
|
176 chuck 1.43 }
177
|
178 mike 1.2 Boolean Thread::_signals_blocked = false;
|
179 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
180 mike 1.90.2.1 TSDKeyType Thread::_platform_thread_key = TSDKeyType(-1);
|
181 marek 1.63 #else
|
182 mike 1.90.2.1 TSDKeyType Thread::_platform_thread_key;
|
183 marek 1.63 #endif
|
184 chuck 1.37 Boolean Thread::_key_initialized = false;
|
185 chuck 1.41 Boolean Thread::_key_error = false;
|
186 chuck 1.37
|
187 mike 1.90.2.7 void Thread::cleanup_push(void (*routine) (void *), void *parm)
|
188 mike 1.2 {
|
189 mike 1.90.2.7 AutoPtr < cleanup_handler > cu(new cleanup_handler(routine, parm));
|
190 mike 1.90 _cleanup.insert_front(cu.get());
|
191 a.arora 1.64 cu.release();
|
192 mike 1.2 return;
193 }
|
194 kumpf 1.81
|
195 david.dillard 1.83 void Thread::cleanup_pop(Boolean execute)
|
196 mike 1.2 {
|
197 mike 1.90.2.7 AutoPtr < cleanup_handler > cu;
|
198 chip 1.11 try
199 {
|
200 mike 1.90 cu.reset(_cleanup.remove_front());
|
201 mike 1.2 }
|
202 mike 1.90.2.7 catch(IPCException &)
|
203 mike 1.2 {
|
204 kumpf 1.81 PEGASUS_ASSERT(0);
|
205 mike 1.90.2.7 }
206 if (execute == true)
|
207 kumpf 1.81 cu->execute();
|
208 mike 1.2 }
|
209 kumpf 1.81
|
210 mike 1.2
|
211 david.dillard 1.83 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value)
|
212 mike 1.2
213
|
214 mike 1.90.2.1 void Thread::exit_self(ThreadReturnType exit_code)
|
215 chip 1.11 {
|
216 mike 1.90.2.6 #if defined(PEGASUS_PLATFORM_HPUX_ACC) || \
217 defined(PEGASUS_PLATFORM_LINUX_GENERIC_GNU)
218 // NOTE: pthread_exit exhibits unusual behavior on RHEL 3 U2, as
219 // documented in Bugzilla 3836. Where feasible, it may be advantageous
220 // to avoid using this function.
221 pthread_exit(exit_code);
222 #else
|
223 chip 1.11 // execute the cleanup stack and then return
|
224 mike 1.90.2.7 while (_cleanup.size())
225 {
226 try
227 {
228 cleanup_pop(true);
229 }
230 catch(IPCException &)
231 {
232 PEGASUS_ASSERT(0);
233 break;
234 }
235 }
236 _exit_code = exit_code;
237 Threads::exit(exit_code);
238 Threads::clear(_handle.thid);
|
239 mike 1.2 #endif
|
240 mike 1.90.2.6 }
|
241 mike 1.2
|
242 chuck 1.39 Sint8 Thread::initializeKey()
243 {
|
244 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
245 if (!Thread::_key_initialized)
246 {
247 if (Thread::_key_error)
248 {
249 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
250 mike 1.90.2.7 "Thread: ERROR - thread key error");
|
251 kumpf 1.81 return -1;
252 }
253
|
254 mike 1.90.2.1 if (TSDKey::create(&Thread::_platform_thread_key) == 0)
|
255 kumpf 1.81 {
256 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
257 mike 1.90.2.7 "Thread: able to create a thread key");
|
258 kumpf 1.81 Thread::_key_initialized = true;
259 }
260 else
261 {
262 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
263 mike 1.90.2.7 "Thread: ERROR - unable to create a thread key");
|
264 kumpf 1.81 Thread::_key_error = true;
265 return -1;
266 }
267 }
|
268 chuck 1.39
|
269 kumpf 1.81 PEG_METHOD_EXIT();
270 return 0;
|
271 chuck 1.39 }
272
|
273 mike 1.90.2.7 Thread *Thread::getCurrent()
|
274 chuck 1.37 {
|
275 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
276 chuck 1.40 if (Thread::initializeKey() != 0)
|
277 chuck 1.39 {
|
278 kumpf 1.81 return NULL;
|
279 chuck 1.39 }
|
280 kumpf 1.81 PEG_METHOD_EXIT();
|
281 mike 1.90.2.7 return (Thread *) TSDKey::get_thread_specific(_platform_thread_key);
|
282 chuck 1.39 }
283
284 void Thread::setCurrent(Thread * thrd)
285 {
|
286 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
287 if (Thread::initializeKey() == 0)
288 {
|
289 mike 1.90.2.7 if (TSDKey::
290 set_thread_specific(Thread::_platform_thread_key,
291 (void *) thrd) == 0)
|
292 chuck 1.39 {
|
293 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
294 "Successful set Thread * into thread specific storage");
|
295 chuck 1.39 }
296 else
297 {
|
298 kumpf 1.81 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
299 "ERROR: error setting Thread * into thread specific storage");
|
300 chuck 1.39 }
|
301 kumpf 1.81 }
302 PEG_METHOD_EXIT();
|
303 chuck 1.37 }
304
|
305 mike 1.90.2.7 AcceptLanguageList *Thread::getLanguages()
|
306 chuck 1.37 {
|
307 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
308
|
309 mike 1.90.2.7 Thread *curThrd = Thread::getCurrent();
|
310 kumpf 1.81 if (curThrd == NULL)
311 return NULL;
|
312 mike 1.90.2.7 AcceptLanguageList *acceptLangs =
313 (AcceptLanguageList *) curThrd->reference_tsd("acceptLanguages");
|
314 kumpf 1.81 curThrd->dereference_tsd();
315 PEG_METHOD_EXIT();
316 return acceptLangs;
|
317 chuck 1.37 }
318
|
319 mike 1.90.2.7 void Thread::setLanguages(AcceptLanguageList * langs) // l10n
|
320 chuck 1.37 {
|
321 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
322
|
323 mike 1.90.2.7 Thread *currentThrd = Thread::getCurrent();
|
324 kumpf 1.81 if (currentThrd != NULL)
325 {
326 // deletes the old tsd and creates a new one
327 currentThrd->put_tsd("acceptLanguages",
|
328 mike 1.90.2.7 language_delete,
329 sizeof (AcceptLanguageList *), langs);
|
330 kumpf 1.81 }
331
332 PEG_METHOD_EXIT();
|
333 chuck 1.37 }
334
|
335 mike 1.90.2.7 void Thread::clearLanguages() // l10n
|
336 chuck 1.37 {
|
337 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
338
|
339 mike 1.90.2.7 Thread *currentThrd = Thread::getCurrent();
|
340 kumpf 1.81 if (currentThrd != NULL)
341 {
342 // deletes the old tsd
343 currentThrd->delete_tsd("acceptLanguages");
344 }
345
346 PEG_METHOD_EXIT();
|
347 chuck 1.37 }
|
348 mday 1.52
|
349 kumpf 1.81 ///////////////////////////////////////////////////////////////////////////////
350 //
351 // ThreadPool
352 //
353 ///////////////////////////////////////////////////////////////////////////////
354
|
355 mike 1.90.2.7 ThreadPool::ThreadPool(Sint16 initialSize,
356 const char *key,
357 Sint16 minThreads,
358 Sint16 maxThreads,
359 struct timeval
360 &deallocateWait):_maxThreads(maxThreads),
361 _minThreads(minThreads), _currentThreads(0), _idleThreads(),
362 _runningThreads(), _dying(0)
|
363 mday 1.58 {
|
364 kumpf 1.81 _deallocateWait.tv_sec = deallocateWait.tv_sec;
365 _deallocateWait.tv_usec = deallocateWait.tv_usec;
|
366 mday 1.58
|
367 kumpf 1.81 memset(_key, 0x00, 17);
368 if (key != 0)
369 {
370 strncpy(_key, key, 16);
371 }
372
373 if ((_maxThreads > 0) && (_maxThreads < initialSize))
374 {
375 _maxThreads = initialSize;
376 }
|
377 mday 1.58
|
378 kumpf 1.81 if (_minThreads > initialSize)
379 {
380 _minThreads = initialSize;
381 }
|
382 mday 1.52
|
383 kumpf 1.81 for (int i = 0; i < initialSize; i++)
384 {
385 _addToIdleThreadsQueue(_initializeThread());
386 }
387 }
|
388 mday 1.20
|
389 kumpf 1.81 ThreadPool::~ThreadPool()
|
390 mday 1.20 {
|
391 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
|
392 konrad.r 1.86
|
393 kumpf 1.81 try
394 {
|
395 mike 1.90.2.7 // Set the dying flag so all thread know the destructor has been
396 // entered
|
397 kumpf 1.81 _dying++;
|
398 mike 1.90.2.7 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
399 "Cleaning up %d idle threads. ", _currentThreads.get());
|
400 mike 1.90.2.4
|
401 mike 1.87 while (_currentThreads.get() > 0)
|
402 kumpf 1.81 {
|
403 mike 1.90.2.7 Thread *thread = _idleThreads.remove_front();
|
404 kumpf 1.81 if (thread != 0)
405 {
406 _cleanupThread(thread);
407 _currentThreads--;
408 }
409 else
410 {
|
411 mike 1.90.2.1 Threads::yield();
|
412 kumpf 1.81 }
413 }
414 }
|
415 mike 1.90.2.7 catch(...)
|
416 kumpf 1.81 {
417 }
|
418 mday 1.20 }
419
|
420 mike 1.90.2.7 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
|
421 kumpf 1.81 {
422 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
423
424 try
425 {
|
426 mike 1.90.2.7 Thread *myself = (Thread *) parm;
|
427 kumpf 1.82 PEGASUS_ASSERT(myself != 0);
|
428 kumpf 1.81
|
429 kumpf 1.82 // Set myself into thread specific storage
430 // This will allow code to get its own Thread
431 Thread::setCurrent(myself);
|
432 kumpf 1.81
|
433 mike 1.90.2.7 ThreadPool *pool = (ThreadPool *) myself->get_parm();
|
434 kumpf 1.82 PEGASUS_ASSERT(pool != 0);
|
435 mike 1.2
|
436 mike 1.90.2.7 Semaphore *sleep_sem = 0;
437 struct timeval *lastActivityTime = 0;
|
438 chuck 1.39
|
439 kumpf 1.81 try
440 {
|
441 mike 1.90.2.7 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
|
442 kumpf 1.81 myself->dereference_tsd();
|
443 kumpf 1.82 PEGASUS_ASSERT(sleep_sem != 0);
444
445 lastActivityTime =
|
446 mike 1.90.2.7 (struct timeval *) myself->
447 reference_tsd("last activity time");
|
448 kumpf 1.81 myself->dereference_tsd();
|
449 kumpf 1.82 PEGASUS_ASSERT(lastActivityTime != 0);
|
450 kumpf 1.81 }
|
451 mike 1.90.2.7 catch(...)
|
452 kumpf 1.81 {
453 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
454 mike 1.90.2.7 "ThreadPool::_loop: Failure getting sleep_sem or "
455 "lastActivityTime.");
|
456 kumpf 1.81 PEGASUS_ASSERT(false);
457 pool->_idleThreads.remove(myself);
458 pool->_currentThreads--;
459 PEG_METHOD_EXIT();
|
460 mike 1.90.2.7 return ((ThreadReturnType) 1);
|
461 kumpf 1.81 }
|
462 mday 1.52
|
463 kumpf 1.82 while (1)
|
464 kumpf 1.81 {
|
465 kumpf 1.82 try
466 {
467 sleep_sem->wait();
468 }
|
469 mike 1.90.2.7 catch(...)
|
470 kumpf 1.82 {
471 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
472 mike 1.90.2.7 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
473 kumpf 1.82 PEGASUS_ASSERT(false);
474 pool->_idleThreads.remove(myself);
475 pool->_currentThreads--;
476 PEG_METHOD_EXIT();
|
477 mike 1.90.2.7 return ((ThreadReturnType) 1);
|
478 kumpf 1.82 }
479
480 // When we awaken we reside on the _runningThreads queue, not the
481 // _idleThreads queue.
482
|
483 mike 1.90.2.7 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
484 void *parm = 0;
485 Semaphore *blocking_sem = 0;
|
486 kumpf 1.82
487 try
488 {
|
489 mike 1.90.2.7 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
|
490 kumpf 1.82 myself->reference_tsd("work func");
491 myself->dereference_tsd();
492 parm = myself->reference_tsd("work parm");
493 myself->dereference_tsd();
|
494 mike 1.90.2.7 blocking_sem =
495 (Semaphore *) myself->reference_tsd("blocking sem");
|
496 kumpf 1.82 myself->dereference_tsd();
497 }
|
498 mike 1.90.2.7 catch(...)
|
499 kumpf 1.82 {
500 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
501 mike 1.90.2.7 "ThreadPool::_loop: Failure accessing work func, work parm, "
502 "or blocking sem.");
|
503 kumpf 1.82 PEGASUS_ASSERT(false);
504 pool->_idleThreads.remove(myself);
505 pool->_currentThreads--;
506 PEG_METHOD_EXIT();
|
507 mike 1.90.2.7 return ((ThreadReturnType) 1);
|
508 kumpf 1.82 }
509
510 if (work == 0)
511 {
|
512 carolann.graves 1.84 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
513 mike 1.90.2.7 "ThreadPool::_loop: work func is 0, meaning we should exit.");
|
514 kumpf 1.82 break;
515 }
|
516 mike 1.2
|
517 mike 1.90.2.1 Time::gettimeofday(lastActivityTime);
|
518 konrad.r 1.67
|
519 kumpf 1.82 try
520 {
|
521 mike 1.90.2.7 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
522 "Work starting.");
|
523 kumpf 1.82 work(parm);
|
524 mike 1.90.2.7 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
525 "Work finished.");
|
526 kumpf 1.82 }
|
527 mike 1.90.2.7 catch(Exception & e)
|
528 kumpf 1.82 {
529 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
530 mike 1.90.2.7 String
531 ("Exception from work in ThreadPool::_loop: ")
532 + e.getMessage());
|
533 kumpf 1.82 }
|
534 kumpf 1.68 #if !defined(PEGASUS_OS_LSB)
|
535 mike 1.90.2.7 catch(const exception & e)
|
536 kumpf 1.82 {
537 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
538 mike 1.90.2.7 String
539 ("Exception from work in ThreadPool::_loop: ")
540 + e.what());
|
541 kumpf 1.82 }
|
542 kumpf 1.68 #endif
|
543 mike 1.90.2.7 catch(...)
|
544 kumpf 1.82 {
545 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
546 mike 1.90.2.7 "Unknown exception from work in ThreadPool::_loop.");
|
547 kumpf 1.82 }
|
548 kumpf 1.81
|
549 kumpf 1.82 // put myself back onto the available list
550 try
|
551 kumpf 1.57 {
|
552 mike 1.90.2.1 Time::gettimeofday(lastActivityTime);
|
553 kumpf 1.82 if (blocking_sem != 0)
554 {
555 blocking_sem->signal();
556 }
|
557 s.hills 1.49
|
558 mike 1.90 pool->_runningThreads.remove(myself);
559 pool->_idleThreads.insert_front(myself);
|
560 kumpf 1.82 }
|
561 mike 1.90.2.7 catch(...)
|
562 kumpf 1.82 {
563 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
564 mike 1.90.2.7 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
565 kumpf 1.82 PEGASUS_ASSERT(false);
566 pool->_currentThreads--;
567 PEG_METHOD_EXIT();
|
568 mike 1.90.2.7 return ((ThreadReturnType) 1);
|
569 kumpf 1.82 }
|
570 kumpf 1.81 }
|
571 kumpf 1.82 }
|
572 mike 1.90.2.7 catch(const Exception & e)
|
573 kumpf 1.82 {
574 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
575 mike 1.90.2.7 "Caught exception: \"" + e.getMessage() +
576 "\". Exiting _loop.");
|
577 kumpf 1.82 }
|
578 mike 1.90.2.7 catch(...)
|
579 kumpf 1.82 {
580 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
581 mike 1.90.2.7 "Caught unrecognized exception. Exiting _loop.");
|
582 kumpf 1.81 }
|
583 kumpf 1.14
|
584 kumpf 1.81 PEG_METHOD_EXIT();
|
585 mike 1.90.2.7 return ((ThreadReturnType) 0);
|
586 mike 1.2 }
587
|
588 mike 1.90.2.7 ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
589 ThreadReturnType
590 (PEGASUS_THREAD_CDECL *
591 work) (void *),
592 Semaphore * blocking)
|
593 mike 1.2 {
|
594 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
595
596 // Allocate_and_awaken will not run if the _dying flag is set.
597 // Once the lock is acquired, ~ThreadPool will not change
598 // the value of _dying until the lock is released.
599
600 try
601 {
|
602 mike 1.87 if (_dying.get())
|
603 kumpf 1.81 {
604 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
605 mike 1.90.2.7 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
606 konrad.r 1.86 return PEGASUS_THREAD_UNAVAILABLE;
|
607 kumpf 1.81 }
608 struct timeval start;
|
609 mike 1.90.2.1 Time::gettimeofday(&start);
|
610 mike 1.90.2.7 Thread *th = 0;
|
611 kumpf 1.81
|
612 mike 1.90 th = _idleThreads.remove_front();
|
613 kumpf 1.57
|
614 kumpf 1.81 if (th == 0)
615 {
|
616 mike 1.90.2.7 if ((_maxThreads == 0) ||
617 (_currentThreads.get() < Uint32(_maxThreads)))
|
618 kumpf 1.81 {
619 th = _initializeThread();
620 }
621 }
622
623 if (th == 0)
624 {
625 // ATTN-DME-P3-20031103: This trace message should not be
626 // be labeled TRC_DISCARDED_DATA, because it does not
627 // necessarily imply that a failure has occurred. However,
628 // this label is being used temporarily to help isolate
629 // the cause of client timeout problems.
630 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
631 mike 1.90.2.7 "ThreadPool::allocate_and_awaken: Insufficient resources: "
632 " pool = %s, running threads = %d, idle threads = %d",
633 _key, _runningThreads.size(), _idleThreads.size());
|
634 konrad.r 1.86 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
|
635 kumpf 1.81 }
|
636 mike 1.2
|
637 kumpf 1.81 // initialize the thread data with the work function and parameters
638 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
639 mike 1.90.2.7 "Initializing thread with work function and parameters: parm = %p",
640 parm);
|
641 mike 1.2
|
642 kumpf 1.81 th->delete_tsd("work func");
643 th->put_tsd("work func", NULL,
|
644 mike 1.90.2.7 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
645 (void *)), (void *) work);
|
646 kumpf 1.81 th->delete_tsd("work parm");
|
647 mike 1.90.2.7 th->put_tsd("work parm", NULL, sizeof (void *), parm);
|
648 kumpf 1.81 th->delete_tsd("blocking sem");
649 if (blocking != 0)
|
650 mike 1.90.2.7 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
|
651 kumpf 1.81
652 // put the thread on the running list
|
653 mike 1.90 _runningThreads.insert_front(th);
|
654 kumpf 1.81
655 // signal the thread's sleep semaphore to awaken it
|
656 mike 1.90.2.7 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
|
657 kumpf 1.81 PEGASUS_ASSERT(sleep_sem != 0);
658
659 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
660 sleep_sem->signal();
661 th->dereference_tsd();
|
662 kumpf 1.57 }
|
663 mike 1.90.2.7 catch(...)
|
664 kumpf 1.57 {
|
665 kumpf 1.81 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
666 mike 1.90.2.7 "ThreadPool::allocate_and_awaken: Operation Failed.");
|
667 kumpf 1.81 PEG_METHOD_EXIT();
668 // ATTN: Error result has not yet been defined
|
669 konrad.r 1.86 return PEGASUS_THREAD_SETUP_FAILURE;
|
670 kumpf 1.57 }
|
671 kumpf 1.81 PEG_METHOD_EXIT();
|
672 konrad.r 1.86 return PEGASUS_THREAD_OK;
|
673 mike 1.2 }
674
|
675 kumpf 1.81 // caller is responsible for only calling this routine during slack periods
676 // but should call it at least once per _deallocateWait interval.
|
677 mday 1.12
|
678 kumpf 1.81 Uint32 ThreadPool::cleanupIdleThreads()
|
679 mike 1.2 {
|
680 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
681
682 Uint32 numThreadsCleanedUp = 0;
683
|
684 mike 1.90 Uint32 numIdleThreads = _idleThreads.size();
|
685 kumpf 1.81 for (Uint32 i = 0; i < numIdleThreads; i++)
686 {
687 // Do not dip below the minimum thread count
|
688 mike 1.90.2.7 if (_currentThreads.get() <= (Uint32) _minThreads)
|
689 kumpf 1.81 {
690 break;
691 }
692
|
693 mike 1.90.2.7 Thread *thread = _idleThreads.remove_back();
|
694 kumpf 1.81
|
695 mike 1.90.2.7 // If there are no more threads in the _idleThreads queue, we're
696 // done.
|
697 kumpf 1.81 if (thread == 0)
698 {
699 break;
700 }
701
|
702 mike 1.90.2.7 struct timeval *lastActivityTime;
|
703 kumpf 1.81 try
704 {
|
705 mike 1.90.2.7 lastActivityTime =
706 (struct timeval *) thread->
707 try_reference_tsd("last activity time");
|
708 kumpf 1.81 PEGASUS_ASSERT(lastActivityTime != 0);
709 }
|
710 mike 1.90.2.7 catch(...)
|
711 kumpf 1.81 {
712 PEGASUS_ASSERT(false);
|
713 mike 1.90 _idleThreads.insert_back(thread);
|
714 kumpf 1.81 break;
715 }
716
717 Boolean cleanupThisThread =
718 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
719 thread->dereference_tsd();
720
721 if (cleanupThisThread)
722 {
723 _cleanupThread(thread);
724 _currentThreads--;
725 numThreadsCleanedUp++;
726 }
727 else
728 {
|
729 mike 1.90 _idleThreads.insert_front(thread);
|
730 kumpf 1.81 }
|
731 konrad.r 1.67 }
|
732 kumpf 1.81
733 PEG_METHOD_EXIT();
734 return numThreadsCleanedUp;
|
735 konrad.r 1.67 }
|
736 mday 1.19
|
737 mike 1.90.2.7 void ThreadPool::_cleanupThread(Thread * thread)
|
738 mday 1.19 {
|
739 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
740
741 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
742 thread->delete_tsd("work func");
|
743 mike 1.90.2.7 thread->put_tsd("work func", 0,
744 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
745 (void *)), (void *) 0);
|
746 kumpf 1.81 thread->delete_tsd("work parm");
|
747 mike 1.90.2.7 thread->put_tsd("work parm", 0, sizeof (void *), 0);
|
748 kumpf 1.81
749 // signal the thread's sleep semaphore to awaken it
|
750 mike 1.90.2.7 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
|
751 kumpf 1.81 PEGASUS_ASSERT(sleep_sem != 0);
752 sleep_sem->signal();
753 thread->dereference_tsd();
754
755 thread->join();
756 delete thread;
757
758 PEG_METHOD_EXIT();
|
759 mday 1.19 }
760
|
761 mike 1.90.2.7 Boolean ThreadPool::_timeIntervalExpired(struct timeval *start,
762 struct timeval *interval)
|
763 mday 1.19 {
|
764 kumpf 1.81 // never time out if the interval is zero
765 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
766 {
767 return false;
768 }
769
770 struct timeval now, finish, remaining;
771 Uint32 usec;
|
772 mike 1.90.2.1 Time::gettimeofday(&now);
|
773 mike 1.90.2.7 Time::gettimeofday(&remaining); // Avoid valgrind error
|
774 kumpf 1.81
775 finish.tv_sec = start->tv_sec + interval->tv_sec;
776 usec = start->tv_usec + interval->tv_usec;
777 finish.tv_sec += (usec / 1000000);
778 usec %= 1000000;
779 finish.tv_usec = usec;
780
|
781 mike 1.90.2.1 return (Time::subtract(&remaining, &finish, &now) != 0);
|
782 mday 1.19 }
783
|
784 kumpf 1.81 void ThreadPool::_deleteSemaphore(void *p)
|
785 mday 1.19 {
|
786 mike 1.90.2.7 delete(Semaphore *) p;
|
787 mday 1.19 }
788
|
789 mike 1.90.2.7 Thread *ThreadPool::_initializeThread()
|
790 mday 1.19 {
|
791 kumpf 1.81 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
792
|
793 mike 1.90.2.7 Thread *th = (Thread *) new Thread(_loop, this, false);
|
794 kumpf 1.81
795 // allocate a sleep semaphore and pass it in the thread context
796 // initial count is zero, loop function will sleep until
797 // we signal the semaphore
|
798 mike 1.90.2.7 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
799 th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore),
800 (void *) sleep_sem);
|
801 kumpf 1.81
|
802 mike 1.90.2.7 struct timeval *lastActivityTime =
803 (struct timeval *)::operator new(sizeof (struct timeval));
|
804 mike 1.90.2.1 Time::gettimeofday(lastActivityTime);
|
805 kumpf 1.81
806 th->put_tsd("last activity time", thread_data::default_delete,
|
807 mike 1.90.2.7 sizeof (struct timeval), (void *) lastActivityTime);
|
808 kumpf 1.81 // thread will enter _loop() and sleep on sleep_sem until we signal it
809
|
810 konrad.r 1.86 if (th->run() != PEGASUS_THREAD_OK)
|
811 kumpf 1.81 {
|
812 mike 1.90.2.7 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
813 "Could not create thread. Error code is %d.", errno);
|
814 kumpf 1.81 delete th;
815 return 0;
816 }
817 _currentThreads++;
|
818 mike 1.90.2.1 Threads::yield();
|
819 kumpf 1.81
820 PEG_METHOD_EXIT();
821 return th;
|
822 mday 1.19 }
|
823 mike 1.2
|
824 mike 1.90.2.7 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
|
825 kumpf 1.81 {
826 if (th == 0)
827 {
828 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
829 mike 1.90.2.7 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
830 kumpf 1.81 throw NullPointer();
831 }
832
833 try
834 {
|
835 mike 1.90 _idleThreads.insert_front(th);
|
836 kumpf 1.81 }
|
837 mike 1.90.2.7 catch(...)
|
838 kumpf 1.81 {
839 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
840 mike 1.90.2.7 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
841 "failed.");
|
842 kumpf 1.81 }
843 }
|
844 mike 1.2
|
845 mike 1.90.2.1 // ATTN: not sure where to put this!
// Global flag defined only when PEGASUS_ZOS_SECURITY is set.
// NOTE(review): the initializer 99 is an integer assigned to a bool; with a
// built-in bool it converts to true. Presumably 99 was meant as a "not yet
// determined" marker - confirm against the code that reads this flag.
846 #ifdef PEGASUS_ZOS_SECURITY
|
847 mike 1.90.2.7 bool isEnhancedSecurity = 99;
|
848 mike 1.90.2.1 #endif
849
|
850 mike 1.2 PEGASUS_NAMESPACE_END
|