1 mike 1.2 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 // Author: Mike Day (mdday@us.ibm.com)
33 //
34 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
35 // added nsk platform support
36 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
37 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
38 // Sean Keenan, Hewlett-Packard Company (sean.keenan@hp.com)
39 // David Dillard, VERITAS Software Corp.
40 // (david.dillard@veritas.com)
41 //
42 //%/////////////////////////////////////////////////////////////////////////////
43 mike 1.2
44 #include "ThreadPool.h"
45 #include "Thread.h"
46 #include <exception>
47 #include <Pegasus/Common/Tracer.h>
48 #include "Time.h"
49
50 PEGASUS_USING_STD;
51
52 PEGASUS_NAMESPACE_BEGIN
53
54 ///////////////////////////////////////////////////////////////////////////////
55 //
56 // ThreadPool
57 //
58 ///////////////////////////////////////////////////////////////////////////////
59
60 ThreadPool::ThreadPool(Sint16 initialSize,
61 const char *key,
62 Sint16 minThreads,
63 Sint16 maxThreads,
64 mike 1.2 struct timeval
65 &deallocateWait):_maxThreads(maxThreads),
66 _minThreads(minThreads), _currentThreads(0), _idleThreads(),
67 _runningThreads(), _dying(0)
68 {
69 _deallocateWait.tv_sec = deallocateWait.tv_sec;
70 _deallocateWait.tv_usec = deallocateWait.tv_usec;
71
72 memset(_key, 0x00, 17);
73 if (key != 0)
74 {
75 strncpy(_key, key, 16);
76 }
77
78 if ((_maxThreads > 0) && (_maxThreads < initialSize))
79 {
80 _maxThreads = initialSize;
81 }
82
83 if (_minThreads > initialSize)
84 {
85 mike 1.2 _minThreads = initialSize;
86 }
87
88 for (int i = 0; i < initialSize; i++)
89 {
90 _addToIdleThreadsQueue(_initializeThread());
91 }
92 }
93
94 ThreadPool::~ThreadPool()
95 {
96 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
97
98 try
99 {
100 // Set the dying flag so all thread know the destructor has been
101 // entered
102 _dying++;
103 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
104 "Cleaning up %d idle threads. ", _currentThreads.get());
105
106 mike 1.2 while (_currentThreads.get() > 0)
107 {
108 Thread *thread = _idleThreads.remove_front();
109 if (thread != 0)
110 {
111 _cleanupThread(thread);
112 _currentThreads--;
113 }
114 else
115 {
116 Threads::yield();
117 }
118 }
119 }
120 catch(...)
121 {
122 }
123 }
124
125 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
126 {
127 mike 1.2 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
128
129 try
130 {
131 Thread *myself = (Thread *) parm;
132 PEGASUS_ASSERT(myself != 0);
133
134 // Set myself into thread specific storage
135 // This will allow code to get its own Thread
136 Thread::setCurrent(myself);
137
138 ThreadPool *pool = (ThreadPool *) myself->get_parm();
139 PEGASUS_ASSERT(pool != 0);
140
141 Semaphore *sleep_sem = 0;
142 struct timeval *lastActivityTime = 0;
143
144 try
145 {
146 sleep_sem = (Semaphore *) myself->reference_tsd("sleep sem");
147 myself->dereference_tsd();
148 mike 1.2 PEGASUS_ASSERT(sleep_sem != 0);
149
150 lastActivityTime =
151 (struct timeval *) myself->
152 reference_tsd("last activity time");
153 myself->dereference_tsd();
154 PEGASUS_ASSERT(lastActivityTime != 0);
155 }
156 catch(...)
157 {
158 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
159 "ThreadPool::_loop: Failure getting sleep_sem or "
160 "lastActivityTime.");
161 PEGASUS_ASSERT(false);
162 pool->_idleThreads.remove(myself);
163 pool->_currentThreads--;
164 PEG_METHOD_EXIT();
165 return ((ThreadReturnType) 1);
166 }
167
168 while (1)
169 mike 1.2 {
170 try
171 {
172 sleep_sem->wait();
173 }
174 catch(...)
175 {
176 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
177 "ThreadPool::_loop: failure on sleep_sem->wait().");
178 PEGASUS_ASSERT(false);
179 pool->_idleThreads.remove(myself);
180 pool->_currentThreads--;
181 PEG_METHOD_EXIT();
182 return ((ThreadReturnType) 1);
183 }
184
185 // When we awaken we reside on the _runningThreads queue, not the
186 // _idleThreads queue.
187
188 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
189 void *parm = 0;
190 mike 1.2 Semaphore *blocking_sem = 0;
191
192 try
193 {
194 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
195 myself->reference_tsd("work func");
196 myself->dereference_tsd();
197 parm = myself->reference_tsd("work parm");
198 myself->dereference_tsd();
199 blocking_sem =
200 (Semaphore *) myself->reference_tsd("blocking sem");
201 myself->dereference_tsd();
202 }
203 catch(...)
204 {
205 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
206 "ThreadPool::_loop: Failure accessing work func, work parm, "
207 "or blocking sem.");
208 PEGASUS_ASSERT(false);
209 pool->_idleThreads.remove(myself);
210 pool->_currentThreads--;
211 mike 1.2 PEG_METHOD_EXIT();
212 return ((ThreadReturnType) 1);
213 }
214
215 if (work == 0)
216 {
217 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
218 "ThreadPool::_loop: work func is 0, meaning we should exit.");
219 break;
220 }
221
222 Time::gettimeofday(lastActivityTime);
223
224 try
225 {
226 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
227 "Work starting.");
228 work(parm);
229 PEG_TRACE_STRING(TRC_THREAD, Tracer::LEVEL4,
230 "Work finished.");
231 }
232 mike 1.2 catch(Exception & e)
233 {
234 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
235 String
236 ("Exception from work in ThreadPool::_loop: ")
237 + e.getMessage());
238 }
239 #if !defined(PEGASUS_OS_LSB)
240 catch(const exception & e)
241 {
242 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
243 String
244 ("Exception from work in ThreadPool::_loop: ")
245 + e.what());
246 }
247 #endif
248 catch(...)
249 {
250 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
251 "Unknown exception from work in ThreadPool::_loop.");
252 }
253 mike 1.2
254 // put myself back onto the available list
255 try
256 {
257 Time::gettimeofday(lastActivityTime);
258 if (blocking_sem != 0)
259 {
260 blocking_sem->signal();
261 }
262
263 pool->_runningThreads.remove(myself);
264 pool->_idleThreads.insert_front(myself);
265 }
266 catch(...)
267 {
268 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
269 "ThreadPool::_loop: Adding thread to idle pool failed.");
270 PEGASUS_ASSERT(false);
271 pool->_currentThreads--;
272 PEG_METHOD_EXIT();
273 return ((ThreadReturnType) 1);
274 mike 1.2 }
275 }
276 }
277 catch(const Exception & e)
278 {
279 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
280 "Caught exception: \"" + e.getMessage() +
281 "\". Exiting _loop.");
282 }
283 catch(...)
284 {
285 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
286 "Caught unrecognized exception. Exiting _loop.");
287 }
288
289 PEG_METHOD_EXIT();
290 return ((ThreadReturnType) 0);
291 }
292
293 ThreadStatus ThreadPool::allocate_and_awaken(void *parm,
294 ThreadReturnType
295 mike 1.2 (PEGASUS_THREAD_CDECL *
296 work) (void *),
297 Semaphore * blocking)
298 {
299 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
300
301 // Allocate_and_awaken will not run if the _dying flag is set.
302 // Once the lock is acquired, ~ThreadPool will not change
303 // the value of _dying until the lock is released.
304
305 try
306 {
307 if (_dying.get())
308 {
309 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
310 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
311 return PEGASUS_THREAD_UNAVAILABLE;
312 }
313 struct timeval start;
314 Time::gettimeofday(&start);
315 Thread *th = 0;
316 mike 1.2
317 th = _idleThreads.remove_front();
318
319 if (th == 0)
320 {
321 if ((_maxThreads == 0) ||
322 (_currentThreads.get() < Uint32(_maxThreads)))
323 {
324 th = _initializeThread();
325 }
326 }
327
328 if (th == 0)
329 {
|
334 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
335 }
336
337 // initialize the thread data with the work function and parameters
338 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
339 "Initializing thread with work function and parameters: parm = %p",
340 parm);
341
342 th->delete_tsd("work func");
343 th->put_tsd("work func", NULL,
344 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
345 (void *)), (void *) work);
346 th->delete_tsd("work parm");
347 th->put_tsd("work parm", NULL, sizeof (void *), parm);
348 th->delete_tsd("blocking sem");
349 if (blocking != 0)
350 th->put_tsd("blocking sem", NULL, sizeof (Semaphore *), blocking);
351
352 // put the thread on the running list
353 _runningThreads.insert_front(th);
354
355 mike 1.2 // signal the thread's sleep semaphore to awaken it
356 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd("sleep sem");
357 PEGASUS_ASSERT(sleep_sem != 0);
358
359 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
360 sleep_sem->signal();
361 th->dereference_tsd();
362 }
363 catch(...)
364 {
365 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
366 "ThreadPool::allocate_and_awaken: Operation Failed.");
367 PEG_METHOD_EXIT();
368 // ATTN: Error result has not yet been defined
369 return PEGASUS_THREAD_SETUP_FAILURE;
370 }
371 PEG_METHOD_EXIT();
372 return PEGASUS_THREAD_OK;
373 }
374
375 // caller is responsible for only calling this routine during slack periods
376 mike 1.2 // but should call it at least once per _deallocateWait interval.
377
378 Uint32 ThreadPool::cleanupIdleThreads()
379 {
380 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
381
382 Uint32 numThreadsCleanedUp = 0;
383
384 Uint32 numIdleThreads = _idleThreads.size();
385 for (Uint32 i = 0; i < numIdleThreads; i++)
386 {
387 // Do not dip below the minimum thread count
388 if (_currentThreads.get() <= (Uint32) _minThreads)
389 {
390 break;
391 }
392
393 Thread *thread = _idleThreads.remove_back();
394
395 // If there are no more threads in the _idleThreads queue, we're
396 // done.
397 mike 1.2 if (thread == 0)
398 {
399 break;
400 }
401
402 struct timeval *lastActivityTime;
403 try
404 {
405 lastActivityTime =
406 (struct timeval *) thread->
407 try_reference_tsd("last activity time");
408 PEGASUS_ASSERT(lastActivityTime != 0);
409 }
410 catch(...)
411 {
412 PEGASUS_ASSERT(false);
413 _idleThreads.insert_back(thread);
414 break;
415 }
416
417 Boolean cleanupThisThread =
418 mike 1.2 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
419 thread->dereference_tsd();
420
421 if (cleanupThisThread)
422 {
423 _cleanupThread(thread);
424 _currentThreads--;
425 numThreadsCleanedUp++;
426 }
427 else
428 {
429 _idleThreads.insert_front(thread);
430 }
431 }
432
433 PEG_METHOD_EXIT();
434 return numThreadsCleanedUp;
435 }
436
437 void ThreadPool::_cleanupThread(Thread * thread)
438 {
439 mike 1.2 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
440
441 // Set the "work func" and "work parm" to 0 so _loop() knows to exit.
442 thread->delete_tsd("work func");
443 thread->put_tsd("work func", 0,
444 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
445 (void *)), (void *) 0);
446 thread->delete_tsd("work parm");
447 thread->put_tsd("work parm", 0, sizeof (void *), 0);
448
449 // signal the thread's sleep semaphore to awaken it
450 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd("sleep sem");
451 PEGASUS_ASSERT(sleep_sem != 0);
452 sleep_sem->signal();
453 thread->dereference_tsd();
454
455 thread->join();
456 delete thread;
457
458 PEG_METHOD_EXIT();
459 }
460 mike 1.2
461 Boolean ThreadPool::_timeIntervalExpired(struct timeval *start,
462 struct timeval *interval)
463 {
464 // never time out if the interval is zero
465 if (interval && (interval->tv_sec == 0) && (interval->tv_usec == 0))
466 {
467 return false;
468 }
469
470 struct timeval now, finish, remaining;
471 Uint32 usec;
472 Time::gettimeofday(&now);
473 Time::gettimeofday(&remaining); // Avoid valgrind error
474
475 finish.tv_sec = start->tv_sec + interval->tv_sec;
476 usec = start->tv_usec + interval->tv_usec;
477 finish.tv_sec += (usec / 1000000);
478 usec %= 1000000;
479 finish.tv_usec = usec;
480
481 mike 1.2 return (Time::subtract(&remaining, &finish, &now) != 0);
482 }
483
484 void ThreadPool::_deleteSemaphore(void *p)
485 {
486 delete(Semaphore *) p;
487 }
488
489 Thread *ThreadPool::_initializeThread()
490 {
491 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
492
493 Thread *th = (Thread *) new Thread(_loop, this, false);
494
495 // allocate a sleep semaphore and pass it in the thread context
496 // initial count is zero, loop function will sleep until
497 // we signal the semaphore
498 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
499 th->put_tsd("sleep sem", &_deleteSemaphore, sizeof (Semaphore),
500 (void *) sleep_sem);
501
502 mike 1.2 struct timeval *lastActivityTime =
503 (struct timeval *)::operator new(sizeof (struct timeval));
504 Time::gettimeofday(lastActivityTime);
505
506 th->put_tsd("last activity time", thread_data::default_delete,
507 sizeof (struct timeval), (void *) lastActivityTime);
508 // thread will enter _loop() and sleep on sleep_sem until we signal it
509
510 if (th->run() != PEGASUS_THREAD_OK)
511 {
512 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
513 "Could not create thread. Error code is %d.", errno);
514 delete th;
515 return 0;
516 }
517 _currentThreads++;
518 Threads::yield();
519
520 PEG_METHOD_EXIT();
521 return th;
522 }
523 mike 1.2
524 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
525 {
526 if (th == 0)
527 {
528 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
529 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
530 throw NullPointer();
531 }
532
533 try
534 {
535 _idleThreads.insert_front(th);
536 }
537 catch(...)
538 {
539 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
540 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
541 "failed.");
542 }
543 }
544 mike 1.2
545 PEGASUS_NAMESPACE_END
|