1 mike 1.2 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "ThreadPool.h"
35 #include "Thread.h"
36 #include <exception>
37 #include <Pegasus/Common/Tracer.h>
38 #include "Time.h"
39
40 PEGASUS_USING_STD;
41
42 PEGASUS_NAMESPACE_BEGIN
43 mike 1.2
44 ///////////////////////////////////////////////////////////////////////////////
45 //
46 // ThreadPool
47 //
48 ///////////////////////////////////////////////////////////////////////////////
49
|
50 kumpf 1.4 ThreadPool::ThreadPool(
51 Sint16 initialSize,
52 const char* key,
53 Sint16 minThreads,
54 Sint16 maxThreads,
55 struct timeval
56 &deallocateWait)
57 : _maxThreads(maxThreads),
58 _minThreads(minThreads),
59 _currentThreads(0),
60 _idleThreads(),
61 _runningThreads(),
62 _dying(0)
|
63 mike 1.2 {
64 _deallocateWait.tv_sec = deallocateWait.tv_sec;
65 _deallocateWait.tv_usec = deallocateWait.tv_usec;
66
67 memset(_key, 0x00, 17);
68 if (key != 0)
69 {
70 strncpy(_key, key, 16);
71 }
72
73 if ((_maxThreads > 0) && (_maxThreads < initialSize))
74 {
75 _maxThreads = initialSize;
76 }
77
78 if (_minThreads > initialSize)
79 {
80 _minThreads = initialSize;
81 }
82
83 for (int i = 0; i < initialSize; i++)
84 mike 1.2 {
85 _addToIdleThreadsQueue(_initializeThread());
86 }
87 }
88
89 ThreadPool::~ThreadPool()
90 {
91 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
92
93 try
94 {
95 // Set the dying flag so all thread know the destructor has been
96 // entered
97 _dying++;
|
98 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
|
99 marek 1.9 "Cleaning up %d idle threads.", _currentThreads.get()));
|
100 mike 1.2
101 while (_currentThreads.get() > 0)
102 {
|
103 kumpf 1.4 Thread* thread = _idleThreads.remove_front();
|
104 mike 1.2 if (thread != 0)
105 {
106 _cleanupThread(thread);
107 _currentThreads--;
108 }
109 else
110 {
111 Threads::yield();
112 }
113 }
114 }
|
115 kumpf 1.4 catch (...)
|
116 mike 1.2 {
117 }
118 }
119
|
120 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
|
121 mike 1.2 {
122 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123
124 try
125 {
126 Thread *myself = (Thread *) parm;
127 PEGASUS_ASSERT(myself != 0);
128
129 // Set myself into thread specific storage
130 // This will allow code to get its own Thread
131 Thread::setCurrent(myself);
132
133 ThreadPool *pool = (ThreadPool *) myself->get_parm();
134 PEGASUS_ASSERT(pool != 0);
135
136 Semaphore *sleep_sem = 0;
137 struct timeval *lastActivityTime = 0;
138
139 try
140 {
141 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike 1.2 myself->dereference_tsd();
143 PEGASUS_ASSERT(sleep_sem != 0);
144
145 lastActivityTime =
146 (struct timeval *) myself->
147 reference_tsd("last activity time");
148 myself->dereference_tsd();
149 PEGASUS_ASSERT(lastActivityTime != 0);
150 }
|
151 kumpf 1.4 catch (...)
|
152 mike 1.2 {
|
153 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
154 kumpf 1.4 "ThreadPool::_loop: Failure getting sleep_sem or "
155 "lastActivityTime.");
|
156 mike 1.2 PEGASUS_ASSERT(false);
157 pool->_idleThreads.remove(myself);
158 pool->_currentThreads--;
159 PEG_METHOD_EXIT();
|
160 kumpf 1.4 return (ThreadReturnType) 1;
|
161 mike 1.2 }
162
163 while (1)
164 {
165 try
166 {
167 sleep_sem->wait();
168 }
|
169 kumpf 1.4 catch (...)
|
170 mike 1.2 {
|
171 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
172 kumpf 1.4 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
173 mike 1.2 PEGASUS_ASSERT(false);
174 pool->_idleThreads.remove(myself);
175 pool->_currentThreads--;
176 PEG_METHOD_EXIT();
|
177 kumpf 1.4 return (ThreadReturnType) 1;
|
178 mike 1.2 }
179
180 // When we awaken we reside on the _runningThreads queue, not the
181 // _idleThreads queue.
182
183 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
|
184 kumpf 1.16 void *workParm = 0;
|
185 mike 1.2 Semaphore *blocking_sem = 0;
186
187 try
188 {
189 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190 myself->reference_tsd("work func");
191 myself->dereference_tsd();
|
192 kumpf 1.16 workParm = myself->reference_tsd("work parm");
|
193 mike 1.2 myself->dereference_tsd();
194 blocking_sem =
195 (Semaphore *) myself->reference_tsd("blocking sem");
196 myself->dereference_tsd();
197 }
|
198 kumpf 1.4 catch (...)
|
199 mike 1.2 {
|
200 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
201 kumpf 1.4 "ThreadPool::_loop: Failure accessing work func, work "
202 "parm, or blocking sem.");
|
203 mike 1.2 PEGASUS_ASSERT(false);
204 pool->_idleThreads.remove(myself);
205 pool->_currentThreads--;
206 PEG_METHOD_EXIT();
|
207 kumpf 1.4 return (ThreadReturnType) 1;
|
208 mike 1.2 }
209
210 if (work == 0)
211 {
|
212 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
213 kumpf 1.4 "ThreadPool::_loop: work func is 0, meaning we should "
214 "exit.");
|
215 mike 1.2 break;
216 }
217
218 Time::gettimeofday(lastActivityTime);
219
220 try
221 {
|
222 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
223 mike 1.2 "Work starting.");
|
224 kumpf 1.16 work(workParm);
|
225 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
226 mike 1.2 "Work finished.");
227 }
|
228 kumpf 1.4 catch (Exception& e)
|
229 mike 1.2 {
|
230 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
231 "Exception from work in ThreadPool::_loop: %s",
232 (const char*)e.getMessage().getCString()));
|
233 mike 1.2 }
|
234 kumpf 1.4 catch (const exception& e)
|
235 mike 1.2 {
|
236 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
237 "Exception from work in ThreadPool::_loop: %s",e.what()));
|
238 mike 1.2 }
|
239 kumpf 1.4 catch (...)
|
240 mike 1.2 {
|
241 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
242 kumpf 1.4 "Unknown exception from work in ThreadPool::_loop.");
|
243 mike 1.2 }
244
245 // put myself back onto the available list
246 try
247 {
248 Time::gettimeofday(lastActivityTime);
249 if (blocking_sem != 0)
250 {
251 blocking_sem->signal();
252 }
253
254 pool->_runningThreads.remove(myself);
255 pool->_idleThreads.insert_front(myself);
256 }
|
257 kumpf 1.4 catch (...)
|
258 mike 1.2 {
|
259 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
260 kumpf 1.4 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
261 mike 1.2 PEGASUS_ASSERT(false);
262 pool->_currentThreads--;
263 PEG_METHOD_EXIT();
|
264 kumpf 1.4 return (ThreadReturnType) 1;
|
265 mike 1.2 }
266 }
267 }
|
268 kumpf 1.4 catch (const Exception & e)
|
269 mike 1.2 {
|
270 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
271 "Caught exception: \"%s\". Exiting _loop.",
272 (const char*)e.getMessage().getCString()));
|
273 mike 1.2 }
|
274 kumpf 1.4 catch (...)
|
275 mike 1.2 {
|
276 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
277 kumpf 1.4 "Caught unrecognized exception. Exiting _loop.");
|
278 mike 1.2 }
279
280 PEG_METHOD_EXIT();
|
281 kumpf 1.4 return (ThreadReturnType) 0;
|
282 mike 1.2 }
283
|
284 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
285 void* parm,
286 ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
287 Semaphore* blocking)
|
288 mike 1.2 {
289 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
290
291 // Allocate_and_awaken will not run if the _dying flag is set.
292 // Once the lock is acquired, ~ThreadPool will not change
293 // the value of _dying until the lock is released.
294
295 try
296 {
297 if (_dying.get())
298 {
|
299 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
|
300 kumpf 1.4 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
301 mike 1.2 return PEGASUS_THREAD_UNAVAILABLE;
302 }
303 struct timeval start;
304 Time::gettimeofday(&start);
305 Thread *th = 0;
306
307 th = _idleThreads.remove_front();
308
309 if (th == 0)
310 {
311 if ((_maxThreads == 0) ||
312 (_currentThreads.get() < Uint32(_maxThreads)))
313 {
314 th = _initializeThread();
315 }
316 }
317
318 if (th == 0)
319 {
|
320 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
321 kumpf 1.3 "ThreadPool::allocate_and_awaken: Insufficient resources: "
322 " pool = %s, running threads = %d, idle threads = %d",
|
323 marek 1.9 _key, _runningThreads.size(), _idleThreads.size()));
|
324 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
325 }
326
327 // initialize the thread data with the work function and parameters
|
328 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
|
329 marek 1.14 "Initializing thread(%s)"
330 " with work function and parameters: parm = %p",
331 Threads::id(th->getThreadHandle().thid).buffer,
|
332 marek 1.9 parm));
|
333 mike 1.2
334 th->delete_tsd("work func");
335 th->put_tsd("work func", NULL,
336 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337 (void *)), (void *) work);
338 th->delete_tsd("work parm");
339 th->put_tsd("work parm", NULL, sizeof (void *), parm);
340 th->delete_tsd("blocking sem");
341 if (blocking != 0)
342 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343
344 // put the thread on the running list
345 _runningThreads.insert_front(th);
346
347 // signal the thread's sleep semaphore to awaken it
348 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349 PEGASUS_ASSERT(sleep_sem != 0);
350
|
351 kumpf 1.10 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
352 "Signal thread to awaken");
|
353 mike 1.2 sleep_sem->signal();
354 th->dereference_tsd();
355 }
|
356 kumpf 1.4 catch (...)
|
357 mike 1.2 {
|
358 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
359 mike 1.2 "ThreadPool::allocate_and_awaken: Operation Failed.");
360 PEG_METHOD_EXIT();
361 // ATTN: Error result has not yet been defined
362 return PEGASUS_THREAD_SETUP_FAILURE;
363 }
364 PEG_METHOD_EXIT();
365 return PEGASUS_THREAD_OK;
366 }
367
368 // caller is responsible for only calling this routine during slack periods
369 // but should call it at least once per _deallocateWait interval.
370
371 Uint32 ThreadPool::cleanupIdleThreads()
372 {
373 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
374
375 Uint32 numThreadsCleanedUp = 0;
376
|
377 kumpf 1.7 Uint32 numIdleThreads = _idleThreads.size();
378 for (Uint32 i = 0; i < numIdleThreads; i++)
|
379 mike 1.2 {
380 // Do not dip below the minimum thread count
381 if (_currentThreads.get() <= (Uint32) _minThreads)
382 {
383 break;
384 }
385
386 Thread *thread = _idleThreads.remove_back();
387
388 // If there are no more threads in the _idleThreads queue, we're
389 // done.
390 if (thread == 0)
391 {
392 break;
393 }
394
|
395 kumpf 1.20 void* tsd = 0;
396 if (!thread->try_reference_tsd("last activity time", &tsd))
|
397 mike 1.2 {
398 PEGASUS_ASSERT(false);
399 _idleThreads.insert_back(thread);
400 break;
401 }
|
402 kumpf 1.20 struct timeval *lastActivityTime =
403 reinterpret_cast<struct timeval*>(tsd);
|
404 kumpf 1.17 PEGASUS_ASSERT(lastActivityTime != 0);
|
405 mike 1.2
406 Boolean cleanupThisThread =
407 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
408 thread->dereference_tsd();
409
410 if (cleanupThisThread)
411 {
412 _cleanupThread(thread);
413 _currentThreads--;
414 numThreadsCleanedUp++;
415 }
416 else
417 {
418 _idleThreads.insert_front(thread);
419 }
420 }
421
422 PEG_METHOD_EXIT();
423 return numThreadsCleanedUp;
424 }
425
426 mike 1.2 void ThreadPool::_cleanupThread(Thread * thread)
427 {
428 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
429
430 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
431 thread->delete_tsd("work func");
432 thread->put_tsd("work func", 0,
433 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
434 (void *)), (void *) 0);
435 thread->delete_tsd("work parm");
436 thread->put_tsd("work parm", 0, sizeof (void *), 0);
437
438 // signal the thread's sleep semaphore to awaken it
439 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
440 PEGASUS_ASSERT(sleep_sem != 0);
441 sleep_sem->signal();
442 thread->dereference_tsd();
443
444 thread->join();
445 delete thread;
446
447 mike 1.2 PEG_METHOD_EXIT();
448 }
449
|
450 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
451 struct timeval* start,
452 struct timeval* interval)
|
453 mike 1.2 {
|
454 kumpf 1.6 PEGASUS_ASSERT(interval != 0);
455
|
456 mike 1.2 // never time out if the interval is zero
|
457 kumpf 1.6 if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
|
458 mike 1.2 {
459 return false;
460 }
461
462 struct timeval now, finish, remaining;
463 Uint32 usec;
464 Time::gettimeofday(&now);
|
465 karl 1.19
466 memset(&remaining, 0, sizeof(remaining));
|
467 mike 1.2
468 finish.tv_sec = start->tv_sec + interval->tv_sec;
469 usec = start->tv_usec + interval->tv_usec;
470 finish.tv_sec += (usec / 1000000);
471 usec %= 1000000;
472 finish.tv_usec = usec;
473
474 return (Time::subtract(&remaining, &finish, &now) != 0);
475 }
476
477 void ThreadPool::_deleteSemaphore(void *p)
478 {
479 delete(Semaphore *) p;
480 }
481
482 Thread *ThreadPool::_initializeThread()
483 {
484 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
485
486 Thread *th = (Thread *) new Thread(_loop, this, false);
487
488 mike 1.2 // allocate a sleep semaphore and pass it in the thread context
489 // initial count is zero, loop function will sleep until
490 // we signal the semaphore
491 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
|
492 kumpf 1.4 th->put_tsd(
493 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
|
494 mike 1.2
|
495 kumpf 1.4 struct timeval* lastActivityTime =
|
496 mike 1.2 (struct timeval *)::operator new(sizeof (struct timeval));
497 Time::gettimeofday(lastActivityTime);
498
|
499 kumpf 1.4 th->put_tsd(
500 "last activity time",
501 thread_data::default_delete,
502 sizeof(struct timeval),
503 (void*) lastActivityTime);
|
504 mike 1.2 // thread will enter _loop() and sleep on sleep_sem until we signal it
505
506 if (th->run() != PEGASUS_THREAD_OK)
507 {
|
508 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
509 marek 1.9 "Could not create thread. Error code is %d.", errno));
|
510 mike 1.2 delete th;
511 return 0;
512 }
513 _currentThreads++;
514
515 PEG_METHOD_EXIT();
516 return th;
517 }
518
519 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
520 {
521 if (th == 0)
522 {
|
523 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
524 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
525 mike 1.2 throw NullPointer();
526 }
527
528 try
529 {
530 _idleThreads.insert_front(th);
531 }
|
532 kumpf 1.4 catch (...)
|
533 mike 1.2 {
|
534 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
535 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
536 "failed.");
|
537 mike 1.2 }
538 }
539
540 PEGASUS_NAMESPACE_END
|