1 mike 1.2 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #include "ThreadPool.h"
35 #include "Thread.h"
36 #include <exception>
37 #include <Pegasus/Common/Tracer.h>
38 #include "Time.h"
39
40 PEGASUS_USING_STD;
41
42 PEGASUS_NAMESPACE_BEGIN
43 mike 1.2
44 ///////////////////////////////////////////////////////////////////////////////
45 //
46 // ThreadPool
47 //
48 ///////////////////////////////////////////////////////////////////////////////
49
|
50 kumpf 1.4 ThreadPool::ThreadPool(
51 Sint16 initialSize,
52 const char* key,
53 Sint16 minThreads,
54 Sint16 maxThreads,
55 struct timeval
56 &deallocateWait)
57 : _maxThreads(maxThreads),
58 _minThreads(minThreads),
59 _currentThreads(0),
60 _idleThreads(),
61 _runningThreads(),
62 _dying(0)
|
63 mike 1.2 {
64 _deallocateWait.tv_sec = deallocateWait.tv_sec;
65 _deallocateWait.tv_usec = deallocateWait.tv_usec;
66
67 memset(_key, 0x00, 17);
68 if (key != 0)
69 {
70 strncpy(_key, key, 16);
71 }
72
73 if ((_maxThreads > 0) && (_maxThreads < initialSize))
74 {
75 _maxThreads = initialSize;
76 }
77
78 if (_minThreads > initialSize)
79 {
80 _minThreads = initialSize;
81 }
82
83 for (int i = 0; i < initialSize; i++)
84 mike 1.2 {
85 _addToIdleThreadsQueue(_initializeThread());
86 }
87 }
88
89 ThreadPool::~ThreadPool()
90 {
91 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
92
93 try
94 {
95 // Set the dying flag so all thread know the destructor has been
96 // entered
97 _dying++;
|
98 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
99 "Cleaning up %d idle threads.", _currentThreads.get()));
|
100 mike 1.2
101 while (_currentThreads.get() > 0)
102 {
|
103 kumpf 1.4 Thread* thread = _idleThreads.remove_front();
|
104 mike 1.2 if (thread != 0)
105 {
106 _cleanupThread(thread);
107 _currentThreads--;
108 }
109 else
110 {
111 Threads::yield();
112 }
113 }
114 }
|
115 kumpf 1.4 catch (...)
|
116 mike 1.2 {
117 }
118 }
119
|
120 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
|
121 mike 1.2 {
122 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
123
124 try
125 {
126 Thread *myself = (Thread *) parm;
127 PEGASUS_ASSERT(myself != 0);
128
129 // Set myself into thread specific storage
130 // This will allow code to get its own Thread
131 Thread::setCurrent(myself);
132
133 ThreadPool *pool = (ThreadPool *) myself->get_parm();
134 PEGASUS_ASSERT(pool != 0);
135
136 Semaphore *sleep_sem = 0;
137 struct timeval *lastActivityTime = 0;
138
139 try
140 {
141 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
142 mike 1.2 myself->dereference_tsd();
143 PEGASUS_ASSERT(sleep_sem != 0);
144
145 lastActivityTime =
146 (struct timeval *) myself->
147 reference_tsd("last activity time");
148 myself->dereference_tsd();
149 PEGASUS_ASSERT(lastActivityTime != 0);
150 }
|
151 kumpf 1.4 catch (...)
|
152 mike 1.2 {
|
153 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
154 kumpf 1.4 "ThreadPool::_loop: Failure getting sleep_sem or "
155 "lastActivityTime.");
|
156 mike 1.2 PEGASUS_ASSERT(false);
157 pool->_idleThreads.remove(myself);
158 pool->_currentThreads--;
159 PEG_METHOD_EXIT();
|
160 kumpf 1.4 return (ThreadReturnType) 1;
|
161 mike 1.2 }
162
163 while (1)
164 {
165 try
166 {
167 sleep_sem->wait();
168 }
|
169 kumpf 1.4 catch (...)
|
170 mike 1.2 {
|
171 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
172 kumpf 1.4 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
173 mike 1.2 PEGASUS_ASSERT(false);
174 pool->_idleThreads.remove(myself);
175 pool->_currentThreads--;
176 PEG_METHOD_EXIT();
|
177 kumpf 1.4 return (ThreadReturnType) 1;
|
178 mike 1.2 }
179
180 // When we awaken we reside on the _runningThreads queue, not the
181 // _idleThreads queue.
182
183 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
184 void *parm = 0;
185 Semaphore *blocking_sem = 0;
186
187 try
188 {
189 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
190 myself->reference_tsd("work func");
191 myself->dereference_tsd();
192 parm = myself->reference_tsd("work parm");
193 myself->dereference_tsd();
194 blocking_sem =
195 (Semaphore *) myself->reference_tsd("blocking sem");
196 myself->dereference_tsd();
197 }
|
198 kumpf 1.4 catch (...)
|
199 mike 1.2 {
|
200 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
201 kumpf 1.4 "ThreadPool::_loop: Failure accessing work func, work "
202 "parm, or blocking sem.");
|
203 mike 1.2 PEGASUS_ASSERT(false);
204 pool->_idleThreads.remove(myself);
205 pool->_currentThreads--;
206 PEG_METHOD_EXIT();
|
207 kumpf 1.4 return (ThreadReturnType) 1;
|
208 mike 1.2 }
209
210 if (work == 0)
211 {
|
212 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
213 kumpf 1.4 "ThreadPool::_loop: work func is 0, meaning we should "
214 "exit.");
|
215 mike 1.2 break;
216 }
217
218 Time::gettimeofday(lastActivityTime);
219
220 try
221 {
|
222 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
223 mike 1.2 "Work starting.");
224 work(parm);
|
225 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
226 mike 1.2 "Work finished.");
227 }
|
228 kumpf 1.4 catch (Exception& e)
|
229 mike 1.2 {
230 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
231 kumpf 1.4 String("Exception from work in ThreadPool::_loop: ") +
232 e.getMessage());
|
233 mike 1.2 }
|
234 kumpf 1.4 catch (const exception& e)
|
235 mike 1.2 {
236 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
237 kumpf 1.4 String("Exception from work in ThreadPool::_loop: ") +
238 e.what());
|
239 mike 1.2 }
|
240 kumpf 1.4 catch (...)
|
241 mike 1.2 {
|
242 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
243 kumpf 1.4 "Unknown exception from work in ThreadPool::_loop.");
|
244 mike 1.2 }
245
246 // put myself back onto the available list
247 try
248 {
249 Time::gettimeofday(lastActivityTime);
250 if (blocking_sem != 0)
251 {
252 blocking_sem->signal();
253 }
254
255 pool->_runningThreads.remove(myself);
256 pool->_idleThreads.insert_front(myself);
257 }
|
258 kumpf 1.4 catch (...)
|
259 mike 1.2 {
|
260 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
261 kumpf 1.4 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
262 mike 1.2 PEGASUS_ASSERT(false);
263 pool->_currentThreads--;
264 PEG_METHOD_EXIT();
|
265 kumpf 1.4 return (ThreadReturnType) 1;
|
266 mike 1.2 }
267 }
268 }
|
269 kumpf 1.4 catch (const Exception & e)
|
270 mike 1.2 {
271 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
272 kumpf 1.4 "Caught exception: \"" + e.getMessage() + "\". Exiting _loop.");
|
273 mike 1.2 }
|
274 kumpf 1.4 catch (...)
|
275 mike 1.2 {
|
276 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
277 kumpf 1.4 "Caught unrecognized exception. Exiting _loop.");
|
278 mike 1.2 }
279
280 PEG_METHOD_EXIT();
|
281 kumpf 1.4 return (ThreadReturnType) 0;
|
282 mike 1.2 }
283
|
284 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
285 void* parm,
286 ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
287 Semaphore* blocking)
|
288 mike 1.2 {
289 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
290
291 // Allocate_and_awaken will not run if the _dying flag is set.
292 // Once the lock is acquired, ~ThreadPool will not change
293 // the value of _dying until the lock is released.
294
295 try
296 {
297 if (_dying.get())
298 {
|
299 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
300 kumpf 1.4 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
301 mike 1.2 return PEGASUS_THREAD_UNAVAILABLE;
302 }
303 struct timeval start;
304 Time::gettimeofday(&start);
305 Thread *th = 0;
306
307 th = _idleThreads.remove_front();
308
309 if (th == 0)
310 {
311 if ((_maxThreads == 0) ||
312 (_currentThreads.get() < Uint32(_maxThreads)))
313 {
314 th = _initializeThread();
315 }
316 }
317
318 if (th == 0)
319 {
|
320 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
|
321 kumpf 1.3 "ThreadPool::allocate_and_awaken: Insufficient resources: "
322 " pool = %s, running threads = %d, idle threads = %d",
|
323 marek 1.9 _key, _runningThreads.size(), _idleThreads.size()));
|
324 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
325 }
326
327 // initialize the thread data with the work function and parameters
|
328 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
|
329 marek 1.14 "Initializing thread(%s)"
330 " with work function and parameters: parm = %p",
331 Threads::id(th->getThreadHandle().thid).buffer,
|
332 marek 1.9 parm));
|
333 mike 1.2
334 th->delete_tsd("work func");
335 th->put_tsd("work func", NULL,
336 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
337 (void *)), (void *) work);
338 th->delete_tsd("work parm");
339 th->put_tsd("work parm", NULL, sizeof (void *), parm);
340 th->delete_tsd("blocking sem");
341 if (blocking != 0)
342 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
343
344 // put the thread on the running list
345 _runningThreads.insert_front(th);
346
347 // signal the thread's sleep semaphore to awaken it
348 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
349 PEGASUS_ASSERT(sleep_sem != 0);
350
|
351 kumpf 1.10 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
352 "Signal thread to awaken");
|
353 mike 1.2 sleep_sem->signal();
354 th->dereference_tsd();
355 }
|
356 kumpf 1.4 catch (...)
|
357 mike 1.2 {
|
358 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
359 mike 1.2 "ThreadPool::allocate_and_awaken: Operation Failed.");
360 PEG_METHOD_EXIT();
361 // ATTN: Error result has not yet been defined
362 return PEGASUS_THREAD_SETUP_FAILURE;
363 }
364 PEG_METHOD_EXIT();
365 return PEGASUS_THREAD_OK;
366 }
367
368 // caller is responsible for only calling this routine during slack periods
369 // but should call it at least once per _deallocateWait interval.
370
371 Uint32 ThreadPool::cleanupIdleThreads()
372 {
373 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
374
375 Uint32 numThreadsCleanedUp = 0;
376
|
377 kumpf 1.7 Uint32 numIdleThreads = _idleThreads.size();
378 for (Uint32 i = 0; i < numIdleThreads; i++)
|
379 mike 1.2 {
380 // Do not dip below the minimum thread count
381 if (_currentThreads.get() <= (Uint32) _minThreads)
382 {
383 break;
384 }
385
386 Thread *thread = _idleThreads.remove_back();
387
388 // If there are no more threads in the _idleThreads queue, we're
389 // done.
390 if (thread == 0)
391 {
392 break;
393 }
394
395 struct timeval *lastActivityTime;
396 try
397 {
398 lastActivityTime =
399 (struct timeval *) thread->
400 mike 1.2 try_reference_tsd("last activity time");
401 PEGASUS_ASSERT(lastActivityTime != 0);
402 }
|
403 kumpf 1.4 catch (...)
|
404 mike 1.2 {
405 PEGASUS_ASSERT(false);
406 _idleThreads.insert_back(thread);
407 break;
408 }
409
410 Boolean cleanupThisThread =
411 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
412 thread->dereference_tsd();
413
414 if (cleanupThisThread)
415 {
416 _cleanupThread(thread);
417 _currentThreads--;
418 numThreadsCleanedUp++;
419 }
420 else
421 {
422 _idleThreads.insert_front(thread);
423 }
424 }
425 mike 1.2
426 PEG_METHOD_EXIT();
427 return numThreadsCleanedUp;
428 }
429
430 void ThreadPool::_cleanupThread(Thread * thread)
431 {
432 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
433
434 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
435 thread->delete_tsd("work func");
436 thread->put_tsd("work func", 0,
437 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
438 (void *)), (void *) 0);
439 thread->delete_tsd("work parm");
440 thread->put_tsd("work parm", 0, sizeof (void *), 0);
441
442 // signal the thread's sleep semaphore to awaken it
443 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
444 PEGASUS_ASSERT(sleep_sem != 0);
445 sleep_sem->signal();
446 mike 1.2 thread->dereference_tsd();
447
448 thread->join();
449 delete thread;
450
451 PEG_METHOD_EXIT();
452 }
453
|
454 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
455 struct timeval* start,
456 struct timeval* interval)
|
457 mike 1.2 {
|
458 kumpf 1.6 PEGASUS_ASSERT(interval != 0);
459
|
460 mike 1.2 // never time out if the interval is zero
|
461 kumpf 1.6 if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
|
462 mike 1.2 {
463 return false;
464 }
465
466 struct timeval now, finish, remaining;
467 Uint32 usec;
468 Time::gettimeofday(&now);
469 Time::gettimeofday(&remaining); // Avoid valgrind error
470
471 finish.tv_sec = start->tv_sec + interval->tv_sec;
472 usec = start->tv_usec + interval->tv_usec;
473 finish.tv_sec += (usec / 1000000);
474 usec %= 1000000;
475 finish.tv_usec = usec;
476
477 return (Time::subtract(&remaining, &finish, &now) != 0);
478 }
479
480 void ThreadPool::_deleteSemaphore(void *p)
481 {
482 delete(Semaphore *) p;
483 mike 1.2 }
484
485 Thread *ThreadPool::_initializeThread()
486 {
487 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
488
489 Thread *th = (Thread *) new Thread(_loop, this, false);
490
491 // allocate a sleep semaphore and pass it in the thread context
492 // initial count is zero, loop function will sleep until
493 // we signal the semaphore
494 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
|
495 kumpf 1.4 th->put_tsd(
496 "sleep sem", &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
|
497 mike 1.2
|
498 kumpf 1.4 struct timeval* lastActivityTime =
|
499 mike 1.2 (struct timeval *)::operator new(sizeof (struct timeval));
500 Time::gettimeofday(lastActivityTime);
501
|
502 kumpf 1.4 th->put_tsd(
503 "last activity time",
504 thread_data::default_delete,
505 sizeof(struct timeval),
506 (void*) lastActivityTime);
|
507 mike 1.2 // thread will enter _loop() and sleep on sleep_sem until we signal it
508
509 if (th->run() != PEGASUS_THREAD_OK)
510 {
|
511 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL2,
512 "Could not create thread. Error code is %d.", errno));
|
513 mike 1.2 delete th;
514 return 0;
515 }
516 _currentThreads++;
517
518 PEG_METHOD_EXIT();
519 return th;
520 }
521
522 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
523 {
524 if (th == 0)
525 {
|
526 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
527 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
528 mike 1.2 throw NullPointer();
529 }
530
531 try
532 {
533 _idleThreads.insert_front(th);
534 }
|
535 kumpf 1.4 catch (...)
|
536 mike 1.2 {
|
537 marek 1.9 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
|
538 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
539 "failed.");
|
540 mike 1.2 }
541 }
542
543 PEGASUS_NAMESPACE_END
|