1 mike 1.2 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "ThreadPool.h"
35 #include "Thread.h"
36 #include <exception>
37 #include <Pegasus/Common/Tracer.h>
38 #include "Time.h"
39
40 PEGASUS_USING_STD;
41
42 PEGASUS_NAMESPACE_BEGIN
43 mike 1.2
44 ///////////////////////////////////////////////////////////////////////////////
45 //
46 // ThreadPool
47 //
48 ///////////////////////////////////////////////////////////////////////////////
49
|
50 kumpf 1.4 ThreadPool::ThreadPool(
51 Sint16 initialSize,
52 const char* key,
53 Sint16 minThreads,
54 Sint16 maxThreads,
55 struct timeval
56 &deallocateWait)
57 : _maxThreads(maxThreads),
58 _minThreads(minThreads),
59 _currentThreads(0),
60 _idleThreads(),
61 _runningThreads(),
62 _dying(0)
|
63 mike 1.2 {
64 _deallocateWait.tv_sec = deallocateWait.tv_sec;
65 _deallocateWait.tv_usec = deallocateWait.tv_usec;
66
67 memset(_key, 0x00, 17);
68 if (key != 0)
69 {
70 strncpy(_key, key, 16);
71 }
72
73 if ((_maxThreads > 0) && (_maxThreads < initialSize))
74 {
75 _maxThreads = initialSize;
76 }
77
78 if (_minThreads > initialSize)
79 {
80 _minThreads = initialSize;
81 }
82
83 for (int i = 0; i < initialSize; i++)
84 mike 1.2 {
85 _addToIdleThreadsQueue(_initializeThread());
86 }
87 }
88
89 ThreadPool::~ThreadPool()
90 {
91 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
92
93 try
94 {
95 // Set the dying flag so all thread know the destructor has been
96 // entered
97 _dying++;
98 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
|
99 kumpf 1.4 "Cleaning up %d idle threads.", _currentThreads.get());
|
100 mike 1.2
101 while (_currentThreads.get() > 0)
102 {
|
103 kumpf 1.4 Thread* thread = _idleThreads.remove_front();
|
104 mike 1.2 if (thread != 0)
105 {
106 _cleanupThread(thread);
107 _currentThreads--;
108 }
109 else
110 {
111 Threads::yield();
112 }
113 }
114 }
|
115 kumpf 1.4 catch (...)
|
116 mike 1.2 {
117 }
118 }
119
|
120 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
|
121 mike 1.2 {
122 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123
124 try
125 {
126 Thread *myself = (Thread *) parm;
127 PEGASUS_ASSERT(myself != 0);
128
129 // Set myself into thread specific storage
130 // This will allow code to get its own Thread
131 Thread::setCurrent(myself);
132
133 ThreadPool *pool = (ThreadPool *) myself->get_parm();
134 PEGASUS_ASSERT(pool != 0);
135
136 Semaphore *sleep_sem = 0;
137 struct timeval *lastActivityTime = 0;
138
139 try
140 {
141 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike 1.2 myself->dereference_tsd();
143 PEGASUS_ASSERT(sleep_sem != 0);
144
145 lastActivityTime =
146 (struct timeval *) myself->
147 reference_tsd("last activity time");
148 myself->dereference_tsd();
149 PEGASUS_ASSERT(lastActivityTime != 0);
150 }
|
151 kumpf 1.4 catch (...)
|
152 mike 1.2 {
153 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
154 kumpf 1.4 "ThreadPool::_loop: Failure getting sleep_sem or "
155 "lastActivityTime.");
|
156 mike 1.2 PEGASUS_ASSERT(false);
157 pool->_idleThreads.remove(myself);
158 pool->_currentThreads--;
159 PEG_METHOD_EXIT();
|
160 kumpf 1.4 return (ThreadReturnType) 1;
|
161 mike 1.2 }
162
163 while (1)
164 {
165 try
166 {
167 sleep_sem->wait();
168 }
|
169 kumpf 1.4 catch (...)
|
170 mike 1.2 {
171 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
172 kumpf 1.4 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
173 mike 1.2 PEGASUS_ASSERT(false);
174 pool->_idleThreads.remove(myself);
175 pool->_currentThreads--;
176 PEG_METHOD_EXIT();
|
177 kumpf 1.4 return (ThreadReturnType) 1;
|
178 mike 1.2 }
179
180 // When we awaken we reside on the _runningThreads queue, not the
181 // _idleThreads queue.
182
183 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184 void *parm = 0;
185 Semaphore *blocking_sem = 0;
186
187 try
188 {
189 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190 myself->reference_tsd("work func");
191 myself->dereference_tsd();
192 parm = myself->reference_tsd("work parm");
193 myself->dereference_tsd();
194 blocking_sem =
195 (Semaphore *) myself->reference_tsd("blocking sem");
196 myself->dereference_tsd();
197 }
|
198 kumpf 1.4 catch (...)
|
199 mike 1.2 {
200 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
201 kumpf 1.4 "ThreadPool::_loop: Failure accessing work func, work "
202 "parm, or blocking sem.");
|
203 mike 1.2 PEGASUS_ASSERT(false);
204 pool->_idleThreads.remove(myself);
205 pool->_currentThreads--;
206 PEG_METHOD_EXIT();
|
207 kumpf 1.4 return (ThreadReturnType) 1;
|
208 mike 1.2 }
209
210 if (work == 0)
211 {
212 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
213 kumpf 1.4 "ThreadPool::_loop: work func is 0, meaning we should "
214 "exit.");
|
215 mike 1.2 break;
216 }
217
218 Time::gettimeofday(lastActivityTime);
219
220 try
221 {
222 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
223 "Work starting.");
224 work(parm);
225 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
226 "Work finished.");
227 }
|
228 kumpf 1.4 catch (Exception& e)
|
229 mike 1.2 {
230 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
231 kumpf 1.4 String("Exception from work in ThreadPool::_loop: ") +
232 e.getMessage());
|
233 mike 1.2 }
234 #if !defined(PEGASUS_OS_LSB)
|
235 kumpf 1.4 catch (const exception& e)
|
236 mike 1.2 {
237 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
238 kumpf 1.4 String("Exception from work in ThreadPool::_loop: ") +
239 e.what());
|
240 mike 1.2 }
241 #endif
|
242 kumpf 1.4 catch (...)
|
243 mike 1.2 {
244 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
245 kumpf 1.4 "Unknown exception from work in ThreadPool::_loop.");
|
246 mike 1.2 }
247
248 // put myself back onto the available list
249 try
250 {
251 Time::gettimeofday(lastActivityTime);
252 if (blocking_sem != 0)
253 {
254 blocking_sem->signal();
255 }
256
257 pool->_runningThreads.remove(myself);
258 pool->_idleThreads.insert_front(myself);
259 }
|
260 kumpf 1.4 catch (...)
|
261 mike 1.2 {
262 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
263 kumpf 1.4 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
264 mike 1.2 PEGASUS_ASSERT(false);
265 pool->_currentThreads--;
266 PEG_METHOD_EXIT();
|
267 kumpf 1.4 return (ThreadReturnType) 1;
|
268 mike 1.2 }
269 }
270 }
|
271 kumpf 1.4 catch (const Exception & e)
|
272 mike 1.2 {
273 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
274 kumpf 1.4 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
|
275 mike 1.2 }
|
276 kumpf 1.4 catch (...)
|
277 mike 1.2 {
278 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
279 kumpf 1.4 "Caught unrecognized exception. Exiting _loop.");
|
280 mike 1.2 }
281
282 PEG_METHOD_EXIT();
|
283 kumpf 1.4 return (ThreadReturnType) 0;
|
284 mike 1.2 }
285
|
286 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
287 void* parm,
288 ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
289 Semaphore* blocking)
|
290 mike 1.2 {
291 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
292
293 // Allocate_and_awaken will not run if the _dying flag is set.
294 // Once the lock is acquired, ~ThreadPool will not change
295 // the value of _dying until the lock is released.
296
297 try
298 {
299 if (_dying.get())
300 {
301 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
302 kumpf 1.4 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
303 mike 1.2 return PEGASUS_THREAD_UNAVAILABLE;
304 }
305 struct timeval start;
306 Time::gettimeofday(&start);
307 Thread *th = 0;
308
309 th = _idleThreads.remove_front();
310
311 if (th == 0)
312 {
313 if ((_maxThreads == 0) ||
314 (_currentThreads.get() < Uint32(_maxThreads)))
315 {
316 th = _initializeThread();
317 }
318 }
319
320 if (th == 0)
321 {
|
322 kumpf 1.3 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
323 "ThreadPool::allocate_and_awaken: Insufficient resources: "
324 " pool = %s, running threads = %d, idle threads = %d",
325 _key, _runningThreads.size(), _idleThreads.size());
|
326 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
327 }
328
329 // initialize the thread data with the work function and parameters
330 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
331 kumpf 1.4 "Initializing thread with work function and parameters: parm = %p",
332 parm);
|
333 mike 1.2
334 th->delete_tsd("work func");
335 th->put_tsd("work func", NULL,
336 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337 (void *)), (void *) work);
338 th->delete_tsd("work parm");
339 th->put_tsd("work parm", NULL, sizeof (void *), parm);
340 th->delete_tsd("blocking sem");
341 if (blocking != 0)
342 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343
344 // put the thread on the running list
345 _runningThreads.insert_front(th);
346
347 // signal the thread's sleep semaphore to awaken it
348 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349 PEGASUS_ASSERT(sleep_sem != 0);
350
351 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
352 sleep_sem->signal();
353 th->dereference_tsd();
354 mike 1.2 }
|
355 kumpf 1.4 catch (...)
|
356 mike 1.2 {
357 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
358 "ThreadPool::allocate_and_awaken: Operation Failed.");
359 PEG_METHOD_EXIT();
360 // ATTN: Error result has not yet been defined
361 return PEGASUS_THREAD_SETUP_FAILURE;
362 }
363 PEG_METHOD_EXIT();
364 return PEGASUS_THREAD_OK;
365 }
366
367 // caller is responsible for only calling this routine during slack periods
368 // but should call it at least once per _deallocateWait interval.
369
370 Uint32 ThreadPool::cleanupIdleThreads()
371 {
372 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
373
374 Uint32 numThreadsCleanedUp = 0;
375
376 Uint32 numIdleThreads = _idleThreads.size();
377 mike 1.2 for (Uint32 i = 0; i < numIdleThreads; i++)
378 {
379 // Do not dip below the minimum thread count
380 if (_currentThreads.get() <= (Uint32) _minThreads)
381 {
382 break;
383 }
384
385 Thread *thread = _idleThreads.remove_back();
386
387 // If there are no more threads in the _idleThreads queue, we're
388 // done.
389 if (thread == 0)
390 {
391 break;
392 }
393
394 struct timeval *lastActivityTime;
395 try
396 {
397 lastActivityTime =
398 mike 1.2 (struct timeval *) thread->
399 try_reference_tsd("last activity time");
400 PEGASUS_ASSERT(lastActivityTime != 0);
401 }
|
402 kumpf 1.4 catch (...)
|
403 mike 1.2 {
404 PEGASUS_ASSERT(false);
405 _idleThreads.insert_back(thread);
406 break;
407 }
408
409 Boolean cleanupThisThread =
410 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
411 thread->dereference_tsd();
412
413 if (cleanupThisThread)
414 {
415 _cleanupThread(thread);
416 _currentThreads--;
417 numThreadsCleanedUp++;
418 }
419 else
420 {
421 _idleThreads.insert_front(thread);
422 }
423 }
424 mike 1.2
425 PEG_METHOD_EXIT();
426 return numThreadsCleanedUp;
427 }
428
429 void ThreadPool::_cleanupThread(Thread * thread)
430 {
431 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
432
433 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
434 thread->delete_tsd("work func");
435 thread->put_tsd("work func", 0,
436 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
437 (void *)), (void *) 0);
438 thread->delete_tsd("work parm");
439 thread->put_tsd("work parm", 0, sizeof (void *), 0);
440
441 // signal the thread's sleep semaphore to awaken it
442 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
443 PEGASUS_ASSERT(sleep_sem != 0);
444 sleep_sem->signal();
445 mike 1.2 thread->dereference_tsd();
446
447 thread->join();
448 delete thread;
449
450 PEG_METHOD_EXIT();
451 }
452
|
453 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
454 struct timeval* start,
455 struct timeval* interval)
|
456 mike 1.2 {
457 // never time out if the interval is zero
458 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
459 {
460 return false;
461 }
462
463 struct timeval now, finish, remaining;
464 Uint32 usec;
465 Time::gettimeofday(&now);
466 Time::gettimeofday(&remaining); // Avoid valgrind error
467
468 finish.tv_sec = start->tv_sec + interval->tv_sec;
469 usec = start->tv_usec + interval->tv_usec;
470 finish.tv_sec += (usec / 1000000);
471 usec %= 1000000;
472 finish.tv_usec = usec;
473
474 return (Time::subtract(&remaining, &finish, &now) != 0);
475 }
476
477 mike 1.2 void ThreadPool::_deleteSemaphore(void *p)
478 {
479 delete(Semaphore *) p;
480 }
481
482 Thread *ThreadPool::_initializeThread()
483 {
484 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
485
486 Thread *th = (Thread *) new Thread(_loop, this, false);
487
488 // allocate a sleep semaphore and pass it in the thread context
489 // initial count is zero, loop function will sleep until
490 // we signal the semaphore
491 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
|
492 kumpf 1.4 th->put_tsd(
493 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
|
494 mike 1.2
|
495 kumpf 1.4 struct timeval* lastActivityTime =
|
496 mike 1.2 (struct timeval *)::operator new(sizeof (struct timeval));
497 Time::gettimeofday(lastActivityTime);
498
|
499 kumpf 1.4 th->put_tsd(
500 "last activity time",
501 thread_data::default_delete,
502 sizeof(struct timeval),
503 (void*) lastActivityTime);
|
504 mike 1.2 // thread will enter _loop() and sleep on sleep_sem until we signal it
505
506 if (th->run() != PEGASUS_THREAD_OK)
507 {
508 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
|
509 kumpf 1.4 "Could not create thread. Error code is %d.", errno);
|
510 mike 1.2 delete th;
511 return 0;
512 }
513 _currentThreads++;
514 Threads::yield();
515
516 PEG_METHOD_EXIT();
517 return th;
518 }
519
520 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
521 {
522 if (th == 0)
523 {
524 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
525 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
526 mike 1.2 throw NullPointer();
527 }
528
529 try
530 {
531 _idleThreads.insert_front(th);
532 }
|
533 kumpf 1.4 catch (...)
|
534 mike 1.2 {
535 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
536 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
537 "failed.");
|
538 mike 1.2 }
539 }
540
541 PEGASUS_NAMESPACE_END
|