1 martin 1.22 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.23 //
|
3 martin 1.22 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.23 //
|
10 martin 1.22 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.23 //
|
17 martin 1.22 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.23 //
|
20 martin 1.22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.23 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.23 //
|
28 martin 1.22 //////////////////////////////////////////////////////////////////////////
|
29 mike 1.2 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "ThreadPool.h"
33 #include "Thread.h"
34 #include <exception>
35 #include <Pegasus/Common/Tracer.h>
36 #include "Time.h"
37
38 PEGASUS_USING_STD;
39
40 PEGASUS_NAMESPACE_BEGIN
41
42 ///////////////////////////////////////////////////////////////////////////////
43 //
44 // ThreadPool
45 //
46 ///////////////////////////////////////////////////////////////////////////////
47
|
48 kumpf 1.4 ThreadPool::ThreadPool(
49 Sint16 initialSize,
50 const char* key,
51 Sint16 minThreads,
52 Sint16 maxThreads,
53 struct timeval
54 &deallocateWait)
55 : _maxThreads(maxThreads),
56 _minThreads(minThreads),
57 _currentThreads(0),
58 _idleThreads(),
59 _runningThreads(),
60 _dying(0)
|
61 mike 1.2 {
62 _deallocateWait.tv_sec = deallocateWait.tv_sec;
63 _deallocateWait.tv_usec = deallocateWait.tv_usec;
64
65 memset(_key, 0x00, 17);
66 if (key != 0)
67 {
68 strncpy(_key, key, 16);
69 }
70
71 if ((_maxThreads > 0) && (_maxThreads < initialSize))
72 {
73 _maxThreads = initialSize;
74 }
75
76 if (_minThreads > initialSize)
77 {
78 _minThreads = initialSize;
79 }
80
81 for (int i = 0; i < initialSize; i++)
82 mike 1.2 {
83 _addToIdleThreadsQueue(_initializeThread());
84 }
85 }
86
87 ThreadPool::~ThreadPool()
88 {
89 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
90
91 try
92 {
93 // Set the dying flag so all thread know the destructor has been
94 // entered
95 _dying++;
|
96 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
|
97 marek 1.9 "Cleaning up %d idle threads.", _currentThreads.get()));
|
98 mike 1.2
99 while (_currentThreads.get() > 0)
100 {
|
101 kumpf 1.4 Thread* thread = _idleThreads.remove_front();
|
102 mike 1.2 if (thread != 0)
103 {
104 _cleanupThread(thread);
105 _currentThreads--;
106 }
107 else
108 {
109 Threads::yield();
110 }
111 }
112 }
|
113 kumpf 1.4 catch (...)
|
114 mike 1.2 {
115 }
116 }
117
|
118 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
|
119 mike 1.2 {
120 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
121
122 try
123 {
124 Thread *myself = (Thread *) parm;
125 PEGASUS_ASSERT(myself != 0);
126
127 // Set myself into thread specific storage
128 // This will allow code to get its own Thread
129 Thread::setCurrent(myself);
130
131 ThreadPool *pool = (ThreadPool *) myself->get_parm();
132 PEGASUS_ASSERT(pool != 0);
133
134 Semaphore *sleep_sem = 0;
135 struct timeval *lastActivityTime = 0;
136
137 try
138 {
|
139 mike 1.21 sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM);
|
140 mike 1.2 myself->dereference_tsd();
141 PEGASUS_ASSERT(sleep_sem != 0);
142
143 lastActivityTime =
144 (struct timeval *) myself->
|
145 mike 1.21 reference_tsd(TSD_LAST_ACTIVITY_TIME);
|
146 mike 1.2 myself->dereference_tsd();
147 PEGASUS_ASSERT(lastActivityTime != 0);
148 }
|
149 kumpf 1.4 catch (...)
|
150 mike 1.2 {
|
151 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
152 kumpf 1.4 "ThreadPool::_loop: Failure getting sleep_sem or "
153 "lastActivityTime.");
|
154 mike 1.2 pool->_idleThreads.remove(myself);
155 pool->_currentThreads--;
156 PEG_METHOD_EXIT();
|
157 kumpf 1.4 return (ThreadReturnType) 1;
|
158 mike 1.2 }
159
160 while (1)
161 {
162 try
163 {
164 sleep_sem->wait();
165 }
|
166 kumpf 1.4 catch (...)
|
167 mike 1.2 {
|
168 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
169 kumpf 1.4 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
170 mike 1.2 pool->_idleThreads.remove(myself);
171 pool->_currentThreads--;
172 PEG_METHOD_EXIT();
|
173 kumpf 1.4 return (ThreadReturnType) 1;
|
174 mike 1.2 }
175
176 // When we awaken we reside on the _runningThreads queue, not the
177 // _idleThreads queue.
178
179 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
|
180 kumpf 1.16 void *workParm = 0;
|
181 mike 1.2 Semaphore *blocking_sem = 0;
182
183 try
184 {
185 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
|
186 mike 1.21 myself->reference_tsd(TSD_WORK_FUNC);
|
187 mike 1.2 myself->dereference_tsd();
|
188 mike 1.21 workParm = myself->reference_tsd(TSD_WORK_PARM);
|
189 mike 1.2 myself->dereference_tsd();
190 blocking_sem =
|
191 mike 1.21 (Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM);
|
192 mike 1.2 myself->dereference_tsd();
193 }
|
194 kumpf 1.4 catch (...)
|
195 mike 1.2 {
|
196 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
197 kumpf 1.4 "ThreadPool::_loop: Failure accessing work func, work "
198 "parm, or blocking sem.");
|
199 mike 1.2 pool->_idleThreads.remove(myself);
200 pool->_currentThreads--;
201 PEG_METHOD_EXIT();
|
202 kumpf 1.4 return (ThreadReturnType) 1;
|
203 mike 1.2 }
204
205 if (work == 0)
206 {
|
207 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
208 kumpf 1.4 "ThreadPool::_loop: work func is 0, meaning we should "
209 "exit.");
|
210 mike 1.2 break;
211 }
212
213 Time::gettimeofday(lastActivityTime);
214
215 try
216 {
|
217 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
218 mike 1.2 "Work starting.");
|
219 kumpf 1.16 work(workParm);
|
220 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
221 mike 1.2 "Work finished.");
222 }
|
223 kumpf 1.4 catch (Exception& e)
|
224 mike 1.2 {
|
225 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
226 "Exception from work in ThreadPool::_loop: %s",
227 (const char*)e.getMessage().getCString()));
|
228 mike 1.2 }
|
229 kumpf 1.4 catch (const exception& e)
|
230 mike 1.2 {
|
231 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
232 "Exception from work in ThreadPool::_loop: %s",e.what()));
|
233 mike 1.2 }
|
234 kumpf 1.4 catch (...)
|
235 mike 1.2 {
|
236 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
237 kumpf 1.4 "Unknown exception from work in ThreadPool::_loop.");
|
238 mike 1.2 }
239
240 // put myself back onto the available list
241 try
242 {
243 Time::gettimeofday(lastActivityTime);
244 if (blocking_sem != 0)
245 {
246 blocking_sem->signal();
247 }
248
249 pool->_runningThreads.remove(myself);
250 pool->_idleThreads.insert_front(myself);
251 }
|
252 kumpf 1.4 catch (...)
|
253 mike 1.2 {
|
254 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
255 kumpf 1.4 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
256 mike 1.2 pool->_currentThreads--;
257 PEG_METHOD_EXIT();
|
258 kumpf 1.4 return (ThreadReturnType) 1;
|
259 mike 1.2 }
260 }
261 }
|
262 kumpf 1.4 catch (const Exception & e)
|
263 mike 1.2 {
|
264 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
265 "Caught exception: \"%s\". Exiting _loop.",
266 (const char*)e.getMessage().getCString()));
|
267 mike 1.2 }
|
268 kumpf 1.4 catch (...)
|
269 mike 1.2 {
|
270 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
271 kumpf 1.4 "Caught unrecognized exception. Exiting _loop.");
|
272 mike 1.2 }
273
274 PEG_METHOD_EXIT();
|
275 kumpf 1.4 return (ThreadReturnType) 0;
|
276 mike 1.2 }
277
|
278 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
279 void* parm,
280 ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
281 Semaphore* blocking)
|
282 mike 1.2 {
283 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
284
285 // Allocate_and_awaken will not run if the _dying flag is set.
286 // Once the lock is acquired, ~ThreadPool will not change
287 // the value of _dying until the lock is released.
288
289 try
290 {
291 if (_dying.get())
292 {
|
293 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
|
294 kumpf 1.4 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
295 mike 1.2 return PEGASUS_THREAD_UNAVAILABLE;
296 }
297 struct timeval start;
298 Time::gettimeofday(&start);
299 Thread *th = 0;
300
301 th = _idleThreads.remove_front();
302
303 if (th == 0)
304 {
305 if ((_maxThreads == 0) ||
306 (_currentThreads.get() < Uint32(_maxThreads)))
307 {
308 th = _initializeThread();
309 }
310 }
311
312 if (th == 0)
313 {
|
314 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
315 kumpf 1.3 "ThreadPool::allocate_and_awaken: Insufficient resources: "
316 " pool = %s, running threads = %d, idle threads = %d",
|
317 marek 1.9 _key, _runningThreads.size(), _idleThreads.size()));
|
318 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
319 }
320
321 // initialize the thread data with the work function and parameters
|
322 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
|
323 marek 1.14 "Initializing thread(%s)"
324 " with work function and parameters: parm = %p",
325 Threads::id(th->getThreadHandle().thid).buffer,
|
326 marek 1.9 parm));
|
327 mike 1.2
|
328 mike 1.21 th->delete_tsd(TSD_WORK_FUNC);
329 th->put_tsd(TSD_WORK_FUNC, NULL,
|
330 mike 1.2 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
331 (void *)), (void *) work);
|
332 mike 1.21 th->delete_tsd(TSD_WORK_PARM);
333 th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm);
334 th->delete_tsd(TSD_BLOCKING_SEM);
|
335 mike 1.2 if (blocking != 0)
|
336 mike 1.21 th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking);
|
337 mike 1.2
338 // put the thread on the running list
339 _runningThreads.insert_front(th);
340
341 // signal the thread's sleep semaphore to awaken it
|
342 mike 1.21 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM);
|
343 mike 1.2 PEGASUS_ASSERT(sleep_sem != 0);
344
|
345 kumpf 1.10 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
346 "Signal thread to awaken");
|
347 mike 1.2 sleep_sem->signal();
348 th->dereference_tsd();
349 }
|
350 kumpf 1.4 catch (...)
|
351 mike 1.2 {
|
352 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
353 mike 1.2 "ThreadPool::allocate_and_awaken: Operation Failed.");
354 PEG_METHOD_EXIT();
355 // ATTN: Error result has not yet been defined
356 return PEGASUS_THREAD_SETUP_FAILURE;
357 }
358 PEG_METHOD_EXIT();
359 return PEGASUS_THREAD_OK;
360 }
361
362 // caller is responsible for only calling this routine during slack periods
363 // but should call it at least once per _deallocateWait interval.
364
365 Uint32 ThreadPool::cleanupIdleThreads()
366 {
367 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
368
369 Uint32 numThreadsCleanedUp = 0;
370
|
371 kumpf 1.7 Uint32 numIdleThreads = _idleThreads.size();
372 for (Uint32 i = 0; i < numIdleThreads; i++)
|
373 mike 1.2 {
374 // Do not dip below the minimum thread count
375 if (_currentThreads.get() <= (Uint32) _minThreads)
376 {
377 break;
378 }
379
380 Thread *thread = _idleThreads.remove_back();
381
382 // If there are no more threads in the _idleThreads queue, we're
383 // done.
384 if (thread == 0)
385 {
386 break;
387 }
388
|
389 mike 1.21 void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME);
|
390 kumpf 1.20 struct timeval *lastActivityTime =
391 reinterpret_cast<struct timeval*>(tsd);
|
392 kumpf 1.17 PEGASUS_ASSERT(lastActivityTime != 0);
|
393 mike 1.2
394 Boolean cleanupThisThread =
395 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
396 thread->dereference_tsd();
397
398 if (cleanupThisThread)
399 {
400 _cleanupThread(thread);
401 _currentThreads--;
402 numThreadsCleanedUp++;
403 }
404 else
405 {
406 _idleThreads.insert_front(thread);
407 }
408 }
409
410 PEG_METHOD_EXIT();
411 return numThreadsCleanedUp;
412 }
413
414 mike 1.2 void ThreadPool::_cleanupThread(Thread * thread)
415 {
416 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
417
|
418 mike 1.21 // Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit.
419 thread->delete_tsd(TSD_WORK_FUNC);
420 thread->put_tsd(TSD_WORK_FUNC, 0,
|
421 mike 1.2 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
422 (void *)), (void *) 0);
|
423 mike 1.21 thread->delete_tsd(TSD_WORK_PARM);
424 thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0);
|
425 mike 1.2
426 // signal the thread's sleep semaphore to awaken it
|
427 mike 1.21 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM);
|
428 mike 1.2 PEGASUS_ASSERT(sleep_sem != 0);
429 sleep_sem->signal();
430 thread->dereference_tsd();
431
432 thread->join();
433 delete thread;
434
435 PEG_METHOD_EXIT();
436 }
437
|
438 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
439 struct timeval* start,
440 struct timeval* interval)
|
441 mike 1.2 {
|
442 kumpf 1.6 PEGASUS_ASSERT(interval != 0);
443
|
444 mike 1.2 // never time out if the interval is zero
|
445 kumpf 1.6 if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
|
446 mike 1.2 {
447 return false;
448 }
449
450 struct timeval now, finish, remaining;
451 Uint32 usec;
452 Time::gettimeofday(&now);
|
453 karl 1.19
454 memset(&remaining, 0, sizeof(remaining));
|
455 mike 1.2
456 finish.tv_sec = start->tv_sec + interval->tv_sec;
457 usec = start->tv_usec + interval->tv_usec;
458 finish.tv_sec += (usec / 1000000);
459 usec %= 1000000;
460 finish.tv_usec = usec;
461
462 return (Time::subtract(&remaining, &finish, &now) != 0);
463 }
464
465 void ThreadPool::_deleteSemaphore(void *p)
466 {
467 delete(Semaphore *) p;
468 }
469
470 Thread *ThreadPool::_initializeThread()
471 {
472 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
473
474 Thread *th = (Thread *) new Thread(_loop, this, false);
475
476 mike 1.2 // allocate a sleep semaphore and pass it in the thread context
477 // initial count is zero, loop function will sleep until
478 // we signal the semaphore
479 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
|
480 kumpf 1.4 th->put_tsd(
|
481 mike 1.21 TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
|
482 mike 1.2
|
483 kumpf 1.4 struct timeval* lastActivityTime =
|
484 mike 1.2 (struct timeval *)::operator new(sizeof (struct timeval));
485 Time::gettimeofday(lastActivityTime);
486
|
487 kumpf 1.4 th->put_tsd(
|
488 mike 1.21 TSD_LAST_ACTIVITY_TIME,
|
489 kumpf 1.4 thread_data::default_delete,
490 sizeof(struct timeval),
491 (void*) lastActivityTime);
|
492 mike 1.2 // thread will enter _loop() and sleep on sleep_sem until we signal it
493
494 if (th->run() != PEGASUS_THREAD_OK)
495 {
|
496 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
497 marek 1.9 "Could not create thread. Error code is %d.", errno));
|
498 mike 1.2 delete th;
499 return 0;
500 }
501 _currentThreads++;
502
503 PEG_METHOD_EXIT();
504 return th;
505 }
506
507 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
508 {
509 if (th == 0)
510 {
|
511 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
512 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
513 mike 1.2 throw NullPointer();
514 }
515
516 try
517 {
518 _idleThreads.insert_front(th);
519 }
|
520 kumpf 1.4 catch (...)
|
521 mike 1.2 {
|
522 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
523 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
524 "failed.");
|
525 mike 1.2 }
526 }
527
528 PEGASUS_NAMESPACE_END
|