1 martin 1.22 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.23 //
|
3 martin 1.22 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.23 //
|
10 martin 1.22 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.23 //
|
17 martin 1.22 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.23 //
|
20 martin 1.22 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.23 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.22 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.23 //
|
28 martin 1.22 //////////////////////////////////////////////////////////////////////////
|
29 mike 1.2 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "ThreadPool.h"
33 #include "Thread.h"
34 #include <exception>
35 #include <Pegasus/Common/Tracer.h>
36 #include "Time.h"
37
38 PEGASUS_USING_STD;
39
40 PEGASUS_NAMESPACE_BEGIN
41
42 ///////////////////////////////////////////////////////////////////////////////
43 //
44 // ThreadPool
45 //
46 ///////////////////////////////////////////////////////////////////////////////
47
|
48 kumpf 1.4 ThreadPool::ThreadPool(
49 Sint16 initialSize,
50 const char* key,
51 Sint16 minThreads,
52 Sint16 maxThreads,
53 struct timeval
54 &deallocateWait)
55 : _maxThreads(maxThreads),
56 _minThreads(minThreads),
57 _currentThreads(0),
58 _idleThreads(),
59 _runningThreads(),
60 _dying(0)
|
61 mike 1.2 {
62 _deallocateWait.tv_sec = deallocateWait.tv_sec;
63 _deallocateWait.tv_usec = deallocateWait.tv_usec;
64
65 memset(_key, 0x00, 17);
66 if (key != 0)
67 {
68 strncpy(_key, key, 16);
69 }
70
71 if ((_maxThreads > 0) && (_maxThreads < initialSize))
72 {
73 _maxThreads = initialSize;
74 }
75
76 if (_minThreads > initialSize)
77 {
78 _minThreads = initialSize;
79 }
80
81 for (int i = 0; i < initialSize; i++)
82 mike 1.2 {
83 _addToIdleThreadsQueue(_initializeThread());
84 }
85 }
86
87 ThreadPool::~ThreadPool()
88 {
89 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::~ThreadPool");
90
91 try
92 {
93 // Set the dying flag so all thread know the destructor has been
94 // entered
95 _dying++;
|
96 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL3,
|
97 marek 1.9 "Cleaning up %d idle threads.", _currentThreads.get()));
|
98 mike 1.2
99 while (_currentThreads.get() > 0)
100 {
|
101 kumpf 1.4 Thread* thread = _idleThreads.remove_front();
|
102 mike 1.2 if (thread != 0)
103 {
104 _cleanupThread(thread);
105 _currentThreads--;
106 }
107 else
108 {
109 Threads::yield();
110 }
111 }
112 }
|
113 kumpf 1.4 catch (...)
|
114 mike 1.2 {
115 }
116 }
117
|
118 kumpf 1.4 ThreadReturnType PEGASUS_THREAD_CDECL ThreadPool::_loop(void* parm)
|
119 mike 1.2 {
120 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
121
122 try
123 {
124 Thread *myself = (Thread *) parm;
125 PEGASUS_ASSERT(myself != 0);
126
127 // Set myself into thread specific storage
128 // This will allow code to get its own Thread
129 Thread::setCurrent(myself);
130
131 ThreadPool *pool = (ThreadPool *) myself->get_parm();
132 PEGASUS_ASSERT(pool != 0);
133
134 Semaphore *sleep_sem = 0;
135 struct timeval *lastActivityTime = 0;
136
137 try
138 {
|
139 mike 1.21 sleep_sem = (Semaphore *) myself->reference_tsd(TSD_SLEEP_SEM);
|
140 mike 1.2 myself->dereference_tsd();
141 PEGASUS_ASSERT(sleep_sem != 0);
142
143 lastActivityTime =
144 (struct timeval *) myself->
|
145 mike 1.21 reference_tsd(TSD_LAST_ACTIVITY_TIME);
|
146 mike 1.2 myself->dereference_tsd();
147 PEGASUS_ASSERT(lastActivityTime != 0);
148 }
|
149 kumpf 1.4 catch (...)
|
150 mike 1.2 {
|
151 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
152 kumpf 1.4 "ThreadPool::_loop: Failure getting sleep_sem or "
153 "lastActivityTime.");
|
154 mike 1.2 PEGASUS_ASSERT(false);
155 pool->_idleThreads.remove(myself);
156 pool->_currentThreads--;
157 PEG_METHOD_EXIT();
|
158 kumpf 1.4 return (ThreadReturnType) 1;
|
159 mike 1.2 }
160
161 while (1)
162 {
163 try
164 {
165 sleep_sem->wait();
166 }
|
167 kumpf 1.4 catch (...)
|
168 mike 1.2 {
|
169 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
170 kumpf 1.4 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
171 mike 1.2 PEGASUS_ASSERT(false);
172 pool->_idleThreads.remove(myself);
173 pool->_currentThreads--;
174 PEG_METHOD_EXIT();
|
175 kumpf 1.4 return (ThreadReturnType) 1;
|
176 mike 1.2 }
177
178 // When we awaken we reside on the _runningThreads queue, not the
179 // _idleThreads queue.
180
181 ThreadReturnType(PEGASUS_THREAD_CDECL * work) (void *) = 0;
|
182 kumpf 1.16 void *workParm = 0;
|
183 mike 1.2 Semaphore *blocking_sem = 0;
184
185 try
186 {
187 work = (ThreadReturnType(PEGASUS_THREAD_CDECL *) (void *))
|
188 mike 1.21 myself->reference_tsd(TSD_WORK_FUNC);
|
189 mike 1.2 myself->dereference_tsd();
|
190 mike 1.21 workParm = myself->reference_tsd(TSD_WORK_PARM);
|
191 mike 1.2 myself->dereference_tsd();
192 blocking_sem =
|
193 mike 1.21 (Semaphore *) myself->reference_tsd(TSD_BLOCKING_SEM);
|
194 mike 1.2 myself->dereference_tsd();
195 }
|
196 kumpf 1.4 catch (...)
|
197 mike 1.2 {
|
198 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
199 kumpf 1.4 "ThreadPool::_loop: Failure accessing work func, work "
200 "parm, or blocking sem.");
|
201 mike 1.2 PEGASUS_ASSERT(false);
202 pool->_idleThreads.remove(myself);
203 pool->_currentThreads--;
204 PEG_METHOD_EXIT();
|
205 kumpf 1.4 return (ThreadReturnType) 1;
|
206 mike 1.2 }
207
208 if (work == 0)
209 {
|
210 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
211 kumpf 1.4 "ThreadPool::_loop: work func is 0, meaning we should "
212 "exit.");
|
213 mike 1.2 break;
214 }
215
216 Time::gettimeofday(lastActivityTime);
217
218 try
219 {
|
220 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
221 mike 1.2 "Work starting.");
|
222 kumpf 1.16 work(workParm);
|
223 marek 1.9 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
|
224 mike 1.2 "Work finished.");
225 }
|
226 kumpf 1.4 catch (Exception& e)
|
227 mike 1.2 {
|
228 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
229 "Exception from work in ThreadPool::_loop: %s",
230 (const char*)e.getMessage().getCString()));
|
231 mike 1.2 }
|
232 kumpf 1.4 catch (const exception& e)
|
233 mike 1.2 {
|
234 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
235 "Exception from work in ThreadPool::_loop: %s",e.what()));
|
236 mike 1.2 }
|
237 kumpf 1.4 catch (...)
|
238 mike 1.2 {
|
239 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
240 kumpf 1.4 "Unknown exception from work in ThreadPool::_loop.");
|
241 mike 1.2 }
242
243 // put myself back onto the available list
244 try
245 {
246 Time::gettimeofday(lastActivityTime);
247 if (blocking_sem != 0)
248 {
249 blocking_sem->signal();
250 }
251
252 pool->_runningThreads.remove(myself);
253 pool->_idleThreads.insert_front(myself);
254 }
|
255 kumpf 1.4 catch (...)
|
256 mike 1.2 {
|
257 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
258 kumpf 1.4 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
259 mike 1.2 PEGASUS_ASSERT(false);
260 pool->_currentThreads--;
261 PEG_METHOD_EXIT();
|
262 kumpf 1.4 return (ThreadReturnType) 1;
|
263 mike 1.2 }
264 }
265 }
|
266 kumpf 1.4 catch (const Exception & e)
|
267 mike 1.2 {
|
268 thilo.boehm 1.18 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
269 "Caught exception: \"%s\". Exiting _loop.",
270 (const char*)e.getMessage().getCString()));
|
271 mike 1.2 }
|
272 kumpf 1.4 catch (...)
|
273 mike 1.2 {
|
274 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
275 kumpf 1.4 "Caught unrecognized exception. Exiting _loop.");
|
276 mike 1.2 }
277
278 PEG_METHOD_EXIT();
|
279 kumpf 1.4 return (ThreadReturnType) 0;
|
280 mike 1.2 }
281
|
282 kumpf 1.4 ThreadStatus ThreadPool::allocate_and_awaken(
283 void* parm,
284 ThreadReturnType (PEGASUS_THREAD_CDECL* work) (void*),
285 Semaphore* blocking)
|
286 mike 1.2 {
287 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
288
289 // Allocate_and_awaken will not run if the _dying flag is set.
290 // Once the lock is acquired, ~ThreadPool will not change
291 // the value of _dying until the lock is released.
292
293 try
294 {
295 if (_dying.get())
296 {
|
297 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL3,
|
298 kumpf 1.4 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
299 mike 1.2 return PEGASUS_THREAD_UNAVAILABLE;
300 }
301 struct timeval start;
302 Time::gettimeofday(&start);
303 Thread *th = 0;
304
305 th = _idleThreads.remove_front();
306
307 if (th == 0)
308 {
309 if ((_maxThreads == 0) ||
310 (_currentThreads.get() < Uint32(_maxThreads)))
311 {
312 th = _initializeThread();
313 }
314 }
315
316 if (th == 0)
317 {
|
318 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
319 kumpf 1.3 "ThreadPool::allocate_and_awaken: Insufficient resources: "
320 " pool = %s, running threads = %d, idle threads = %d",
|
321 marek 1.9 _key, _runningThreads.size(), _idleThreads.size()));
|
322 mike 1.2 return PEGASUS_THREAD_INSUFFICIENT_RESOURCES;
323 }
324
325 // initialize the thread data with the work function and parameters
|
326 marek 1.9 PEG_TRACE((TRC_THREAD, Tracer::LEVEL4,
|
327 marek 1.14 "Initializing thread(%s)"
328 " with work function and parameters: parm = %p",
329 Threads::id(th->getThreadHandle().thid).buffer,
|
330 marek 1.9 parm));
|
331 mike 1.2
|
332 mike 1.21 th->delete_tsd(TSD_WORK_FUNC);
333 th->put_tsd(TSD_WORK_FUNC, NULL,
|
334 mike 1.2 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
335 (void *)), (void *) work);
|
336 mike 1.21 th->delete_tsd(TSD_WORK_PARM);
337 th->put_tsd(TSD_WORK_PARM, NULL, sizeof (void *), parm);
338 th->delete_tsd(TSD_BLOCKING_SEM);
|
339 mike 1.2 if (blocking != 0)
|
340 mike 1.21 th->put_tsd(TSD_BLOCKING_SEM, NULL, sizeof (Semaphore *), blocking);
|
341 mike 1.2
342 // put the thread on the running list
343 _runningThreads.insert_front(th);
344
345 // signal the thread's sleep semaphore to awaken it
|
346 mike 1.21 Semaphore *sleep_sem = (Semaphore *) th->reference_tsd(TSD_SLEEP_SEM);
|
347 mike 1.2 PEGASUS_ASSERT(sleep_sem != 0);
348
|
349 kumpf 1.10 PEG_TRACE_CSTRING(TRC_THREAD, Tracer::LEVEL4,
350 "Signal thread to awaken");
|
351 mike 1.2 sleep_sem->signal();
352 th->dereference_tsd();
353 }
|
354 kumpf 1.4 catch (...)
|
355 mike 1.2 {
|
356 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
357 mike 1.2 "ThreadPool::allocate_and_awaken: Operation Failed.");
358 PEG_METHOD_EXIT();
359 // ATTN: Error result has not yet been defined
360 return PEGASUS_THREAD_SETUP_FAILURE;
361 }
362 PEG_METHOD_EXIT();
363 return PEGASUS_THREAD_OK;
364 }
365
366 // caller is responsible for only calling this routine during slack periods
367 // but should call it at least once per _deallocateWait interval.
368
369 Uint32 ThreadPool::cleanupIdleThreads()
370 {
371 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupIdleThreads");
372
373 Uint32 numThreadsCleanedUp = 0;
374
|
375 kumpf 1.7 Uint32 numIdleThreads = _idleThreads.size();
376 for (Uint32 i = 0; i < numIdleThreads; i++)
|
377 mike 1.2 {
378 // Do not dip below the minimum thread count
379 if (_currentThreads.get() <= (Uint32) _minThreads)
380 {
381 break;
382 }
383
384 Thread *thread = _idleThreads.remove_back();
385
386 // If there are no more threads in the _idleThreads queue, we're
387 // done.
388 if (thread == 0)
389 {
390 break;
391 }
392
|
393 mike 1.21 void* tsd = thread->reference_tsd(TSD_LAST_ACTIVITY_TIME);
|
394 kumpf 1.20 struct timeval *lastActivityTime =
395 reinterpret_cast<struct timeval*>(tsd);
|
396 kumpf 1.17 PEGASUS_ASSERT(lastActivityTime != 0);
|
397 mike 1.2
398 Boolean cleanupThisThread =
399 _timeIntervalExpired(lastActivityTime, &_deallocateWait);
400 thread->dereference_tsd();
401
402 if (cleanupThisThread)
403 {
404 _cleanupThread(thread);
405 _currentThreads--;
406 numThreadsCleanedUp++;
407 }
408 else
409 {
410 _idleThreads.insert_front(thread);
411 }
412 }
413
414 PEG_METHOD_EXIT();
415 return numThreadsCleanedUp;
416 }
417
418 mike 1.2 void ThreadPool::_cleanupThread(Thread * thread)
419 {
420 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::cleanupThread");
421
|
422 mike 1.21 // Set the TSD_WORK_FUNC and TSD_WORK_PARM to 0 so _loop() knows to exit.
423 thread->delete_tsd(TSD_WORK_FUNC);
424 thread->put_tsd(TSD_WORK_FUNC, 0,
|
425 mike 1.2 sizeof (ThreadReturnType(PEGASUS_THREAD_CDECL *)
426 (void *)), (void *) 0);
|
427 mike 1.21 thread->delete_tsd(TSD_WORK_PARM);
428 thread->put_tsd(TSD_WORK_PARM, 0, sizeof (void *), 0);
|
429 mike 1.2
430 // signal the thread's sleep semaphore to awaken it
|
431 mike 1.21 Semaphore *sleep_sem = (Semaphore *) thread->reference_tsd(TSD_SLEEP_SEM);
|
432 mike 1.2 PEGASUS_ASSERT(sleep_sem != 0);
433 sleep_sem->signal();
434 thread->dereference_tsd();
435
436 thread->join();
437 delete thread;
438
439 PEG_METHOD_EXIT();
440 }
441
|
442 kumpf 1.4 Boolean ThreadPool::_timeIntervalExpired(
443 struct timeval* start,
444 struct timeval* interval)
|
445 mike 1.2 {
|
446 kumpf 1.6 PEGASUS_ASSERT(interval != 0);
447
|
448 mike 1.2 // never time out if the interval is zero
|
449 kumpf 1.6 if ((interval->tv_sec == 0) && (interval->tv_usec == 0))
|
450 mike 1.2 {
451 return false;
452 }
453
454 struct timeval now, finish, remaining;
455 Uint32 usec;
456 Time::gettimeofday(&now);
|
457 karl 1.19
458 memset(&remaining, 0, sizeof(remaining));
|
459 mike 1.2
460 finish.tv_sec = start->tv_sec + interval->tv_sec;
461 usec = start->tv_usec + interval->tv_usec;
462 finish.tv_sec += (usec / 1000000);
463 usec %= 1000000;
464 finish.tv_usec = usec;
465
466 return (Time::subtract(&remaining, &finish, &now) != 0);
467 }
468
469 void ThreadPool::_deleteSemaphore(void *p)
470 {
471 delete(Semaphore *) p;
472 }
473
474 Thread *ThreadPool::_initializeThread()
475 {
476 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_initializeThread");
477
478 Thread *th = (Thread *) new Thread(_loop, this, false);
479
480 mike 1.2 // allocate a sleep semaphore and pass it in the thread context
481 // initial count is zero, loop function will sleep until
482 // we signal the semaphore
483 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
|
484 kumpf 1.4 th->put_tsd(
|
485 mike 1.21 TSD_SLEEP_SEM, &_deleteSemaphore, sizeof(Semaphore), (void*) sleep_sem);
|
486 mike 1.2
|
487 kumpf 1.4 struct timeval* lastActivityTime =
|
488 mike 1.2 (struct timeval *)::operator new(sizeof (struct timeval));
489 Time::gettimeofday(lastActivityTime);
490
|
491 kumpf 1.4 th->put_tsd(
|
492 mike 1.21 TSD_LAST_ACTIVITY_TIME,
|
493 kumpf 1.4 thread_data::default_delete,
494 sizeof(struct timeval),
495 (void*) lastActivityTime);
|
496 mike 1.2 // thread will enter _loop() and sleep on sleep_sem until we signal it
497
498 if (th->run() != PEGASUS_THREAD_OK)
499 {
|
500 marek 1.15 PEG_TRACE((TRC_THREAD, Tracer::LEVEL1,
|
501 marek 1.9 "Could not create thread. Error code is %d.", errno));
|
502 mike 1.2 delete th;
503 return 0;
504 }
505 _currentThreads++;
506
507 PEG_METHOD_EXIT();
508 return th;
509 }
510
511 void ThreadPool::_addToIdleThreadsQueue(Thread * th)
512 {
513 if (th == 0)
514 {
|
515 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
516 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: Thread pointer is null.");
|
517 mike 1.2 throw NullPointer();
518 }
519
520 try
521 {
522 _idleThreads.insert_front(th);
523 }
|
524 kumpf 1.4 catch (...)
|
525 mike 1.2 {
|
526 marek 1.15 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
527 kumpf 1.4 "ThreadPool::_addToIdleThreadsQueue: _idleThreads.insert_front "
528 "failed.");
|
529 mike 1.2 }
530 }
531
532 PEGASUS_NAMESPACE_END
|