1 karl 1.56 //%2003////////////////////////////////////////////////////////////////////////
|
2 mike 1.2 //
|
3 karl 1.56 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.2 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
|
9 chip 1.11 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
12 mike 1.2 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
|
14 kumpf 1.17 //
|
15 chip 1.11 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
|
16 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
|
18 chip 1.11 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
21 mike 1.2 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Day (mdday@us.ibm.com)
27 //
28 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
|
29 chip 1.11 // added nsk platform support
|
30 kumpf 1.59 // Roger Kumpf, Hewlett-Packard Company (roger_kumpf@hp.com)
|
31 a.arora 1.64 // Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
|
32 mike 1.2 //
33 //%/////////////////////////////////////////////////////////////////////////////
34
35 #include "Thread.h"
36 #include <Pegasus/Common/IPC.h>
|
37 kumpf 1.14 #include <Pegasus/Common/Tracer.h>
|
38 mike 1.2
39 #if defined(PEGASUS_OS_TYPE_WINDOWS)
|
40 chip 1.11 # include "ThreadWindows.cpp"
|
41 mike 1.2 #elif defined(PEGASUS_OS_TYPE_UNIX)
42 # include "ThreadUnix.cpp"
43 #elif defined(PEGASUS_OS_TYPE_NSK)
44 # include "ThreadNsk.cpp"
45 #else
46 # error "Unsupported platform"
47 #endif
48
49 PEGASUS_NAMESPACE_BEGIN
50
|
51 mday 1.42
|
52 chip 1.11 void thread_data::default_delete(void * data)
53 {
|
54 mike 1.2 if( data != NULL)
|
55 chip 1.11 ::operator delete(data);
|
56 mike 1.2 }
57
|
58 chuck 1.43 // l10n start
59 void language_delete(void * data)
60 {
61 if( data != NULL)
62 {
|
63 a.arora 1.64 AutoPtr<AcceptLanguages> al(static_cast<AcceptLanguages *>(data));
|
64 chuck 1.43 }
65 }
66 // l10n end
67
|
68 mike 1.2 Boolean Thread::_signals_blocked = false;
|
69 chuck 1.37 // l10n
|
70 marek 1.63 #ifndef PEGASUS_OS_ZOS
|
71 mday 1.48 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key = -1;
|
72 marek 1.63 #else
73 PEGASUS_THREAD_KEY_TYPE Thread::_platform_thread_key;
74 #endif
|
75 chuck 1.37 Boolean Thread::_key_initialized = false;
|
76 chuck 1.41 Boolean Thread::_key_error = false;
|
77 chuck 1.37
|
78 mike 1.2
79 // for non-native implementations
|
80 chip 1.11 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
|
81 mike 1.2 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
82 {
|
83 a.arora 1.64 AutoPtr<cleanup_handler> cu(new cleanup_handler(routine, parm));
|
84 chip 1.11 try
85 {
|
86 a.arora 1.64 _cleanup.insert_first(cu.get());
|
87 chip 1.11 }
88 catch(IPCException&)
|
89 mike 1.2 {
|
90 chip 1.11 throw;
|
91 mike 1.2 }
|
92 a.arora 1.64 cu.release();
|
93 mike 1.2 return;
94 }
|
95 chip 1.11
|
96 mike 1.2 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
97 {
|
98 a.arora 1.64 AutoPtr<cleanup_handler> cu ;
|
99 chip 1.11 try
100 {
|
101 a.arora 1.64 cu.reset(_cleanup.remove_first());
|
102 mike 1.2 }
|
103 chip 1.11 catch(IPCException&)
|
104 mike 1.2 {
|
105 chip 1.11 PEGASUS_ASSERT(0);
|
106 mike 1.2 }
107 if(execute == true)
108 cu->execute();
109 }
|
110 chip 1.11
|
111 mike 1.2 #endif
112
113
|
114 kumpf 1.8 //thread_data *Thread::put_tsd(const Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
|
115 mike 1.2
116
|
117 chip 1.11 #ifndef PEGASUS_THREAD_EXIT_NATIVE
118 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
119 {
120 // execute the cleanup stack and then return
|
121 mike 1.2 while( _cleanup.count() )
122 {
|
123 chip 1.11 try
124 {
125 cleanup_pop(true);
126 }
127 catch(IPCException&)
128 {
129 PEGASUS_ASSERT(0);
130 break;
|
131 mike 1.2 }
132 }
133 _exit_code = exit_code;
134 exit_thread(exit_code);
|
135 mday 1.4 _handle.thid = 0;
|
136 mike 1.2 }
137
138
139 #endif
140
|
141 chuck 1.37 // l10n start
|
142 chuck 1.39 Sint8 Thread::initializeKey()
143 {
144 PEG_METHOD_ENTER(TRC_THREAD, "Thread::initializeKey");
145 if (!Thread::_key_initialized)
146 {
|
147 chuck 1.41 if (Thread::_key_error)
148 {
149 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
150 "Thread: ERROR - thread key error");
151 return -1;
152 }
153
|
154 chuck 1.39 if (pegasus_key_create(&Thread::_platform_thread_key) == 0)
155 {
156 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
157 "Thread: able to create a thread key");
158 Thread::_key_initialized = true;
159 }
160 else
161 {
162 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
163 "Thread: ERROR - unable to create a thread key");
|
164 chuck 1.41 Thread::_key_error = true;
|
165 chuck 1.39 return -1;
166 }
167 }
168
169 PEG_METHOD_EXIT();
170 return 0;
171 }
172
|
173 chuck 1.37 Thread * Thread::getCurrent()
174 {
|
175 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getCurrent");
|
176 chuck 1.40 if (Thread::initializeKey() != 0)
|
177 chuck 1.39 {
178 return NULL;
179 }
|
180 chuck 1.38 PEG_METHOD_EXIT();
|
181 chuck 1.39 return (Thread *)pegasus_get_thread_specific(_platform_thread_key);
182 }
183
184 void Thread::setCurrent(Thread * thrd)
185 {
186 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setCurrent");
187 if (Thread::initializeKey() == 0)
188 {
189 if (pegasus_set_thread_specific(Thread::_platform_thread_key,
190 (void *) thrd) == 0)
191 {
192 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
193 "Successful set Thread * into thread specific storage");
194 }
195 else
196 {
197 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
198 "ERROR: got error setting Thread * into thread specific storage");
199 }
200 }
201 PEG_METHOD_EXIT();
|
202 chuck 1.37 }
203
204 AcceptLanguages * Thread::getLanguages()
205 {
|
206 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::getLanguages");
|
207 chuck 1.37
208 Thread * curThrd = Thread::getCurrent();
209 if (curThrd == NULL)
210 return NULL;
211 AcceptLanguages * acceptLangs =
212 (AcceptLanguages *)curThrd->reference_tsd("acceptLanguages");
213 curThrd->dereference_tsd();
214 PEG_METHOD_EXIT();
215 return acceptLangs;
216 }
217
218 void Thread::setLanguages(AcceptLanguages *langs) //l10n
219 {
|
220 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::setLanguages");
|
221 chuck 1.37
222 Thread * currentThrd = Thread::getCurrent();
223 if (currentThrd != NULL)
224 {
225 // deletes the old tsd and creates a new one
226 currentThrd->put_tsd("acceptLanguages",
|
227 chuck 1.43 language_delete,
|
228 chuck 1.37 sizeof(AcceptLanguages *),
229 langs);
230 }
231
232 PEG_METHOD_EXIT();
233 }
234
235 void Thread::clearLanguages() //l10n
236 {
|
237 chuck 1.39 PEG_METHOD_ENTER(TRC_THREAD, "Thread::clearLanguages");
|
238 chuck 1.37
239 Thread * currentThrd = Thread::getCurrent();
240 if (currentThrd != NULL)
241 {
242 // deletes the old tsd
243 currentThrd->delete_tsd("acceptLanguages");
244 }
245
246 PEG_METHOD_EXIT();
247 }
248 // l10n end
249
|
#if 0
// two special synchronization classes for ThreadPool
//
// (Compiled out.) Scope guard: the constructor calls timed_lock() on the
// given Mutex for the calling thread with a timeout in milliseconds, and
// the destructor calls unlock() on it.
class timed_mutex
{
public:
    Mutex* _mut;

    timed_mutex(Mutex* target, int timeout_msec)
        : _mut(target)
    {
        _mut->timed_lock(timeout_msec, pegasus_thread_self());
    }

    ~timed_mutex(void)
    {
        _mut->unlock();
    }
};
#endif
|
269 mday 1.52
270 class try_mutex
271 {
272 public:
273 try_mutex(Mutex* mut)
274 :_mut(mut)
275 {
276 _mut->try_lock(pegasus_thread_self());
277 }
278 ~try_mutex(void)
279 {
280 _mut->unlock();
281 }
282
283 Mutex* _mut;
284 };
285
|
286 mday 1.58 class auto_int
287 {
288 public:
289 auto_int(AtomicInt* num)
290 : _int(num)
291 {
292 _int->operator++();
293 }
294 ~auto_int(void)
295 {
296 _int->operator--();
297 }
298 AtomicInt *_int;
299 };
300
301
302 AtomicInt _idle_control;
|
303 mday 1.52
|
304 mday 1.20 DQueue<ThreadPool> ThreadPool::_pools(true);
305
306 void ThreadPool::kill_idle_threads(void)
307 {
308 static struct timeval now, last = {0, 0};
309
310 pegasus_gettimeofday(&now);
311 if(now.tv_sec - last.tv_sec > 5)
312 {
313 _pools.lock();
314 ThreadPool *p = _pools.next(0);
315 while(p != 0)
316 {
317 try
318 {
319 p->kill_dead_threads();
320 }
321 catch(...)
322 {
323 }
324 p = _pools.next(p);
325 mday 1.20 }
326 _pools.unlock();
327 pegasus_gettimeofday(&last);
328 }
329 }
330
331
|
332 mike 1.2 ThreadPool::ThreadPool(Sint16 initial_size,
|
333 kumpf 1.8 const Sint8 *key,
|
334 mike 1.2 Sint16 min,
335 Sint16 max,
336 struct timeval & alloc_wait,
|
337 chip 1.11 struct timeval & dealloc_wait,
|
338 mike 1.2 struct timeval & deadlock_detect)
339 : _max_threads(max), _min_threads(min),
|
340 mday 1.12 _current_threads(0),
341 _pool(true), _running(true),
|
342 mike 1.2 _dead(true), _dying(0)
343 {
344 _allocate_wait.tv_sec = alloc_wait.tv_sec;
345 _allocate_wait.tv_usec = alloc_wait.tv_usec;
|
346 chip 1.11 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
|
347 mike 1.2 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
348 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
349 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
350 memset(_key, 0x00, 17);
351 if(key != 0)
352 strncpy(_key, key, 16);
|
353 mday 1.21 if(_max_threads > 0 && _max_threads < initial_size)
|
354 mike 1.2 _max_threads = initial_size;
355 if(_min_threads > initial_size)
356 _min_threads = initial_size;
|
357 chip 1.11
|
358 mike 1.2 int i;
359 for(i = 0; i < initial_size; i++)
360 {
361 _link_pool(_init_thread());
362 }
|
363 mday 1.20 _pools.insert_last(this);
|
364 mike 1.2 }
365
|
366 chip 1.11
|
367 mday 1.52 // Note: <<< Fri Oct 17 09:19:03 2003 mdd >>>
368 // the pegasus_yield() calls that preceed each th->join() are to
369 // give a thread on the running list a chance to reach a cancellation
370 // point before the join
|
371 mike 1.2
372 ThreadPool::~ThreadPool(void)
373 {
|
374 kumpf 1.57 PEG_METHOD_ENTER(TRC_THREAD, "Thread::~ThreadPool");
|
375 mday 1.35 try
|
376 mday 1.47 {
|
377 kumpf 1.57 // Set the dying flag so all thread know the destructor has been entered
|
378 mday 1.58 _dying++;
379
|
380 mday 1.52 // remove from the global pools list
|
381 mday 1.35 _pools.remove(this);
|
382 mday 1.52
383 // start with idle threads.
|
384 mday 1.35 Thread *th = 0;
385 th = _pool.remove_first();
|
386 mday 1.52 Semaphore* sleep_sem;
387
|
388 mday 1.35 while(th != 0)
|
389 mike 1.2 {
|
390 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
391 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
392
|
393 mday 1.35 if(sleep_sem == 0)
394 {
395 th->dereference_tsd();
396 }
|
397 kumpf 1.57 else
398 {
399 // Signal to get the thread out of the work loop.
400 sleep_sem->signal();
|
401 mday 1.52
|
402 kumpf 1.57 // Signal to get the thread past the end. See the comment
403 // "wait to be awakend by the thread pool destructor"
404 // Note: the current implementation of Thread for Windows
405 // does not implement "pthread" cancelation points so this
406 // is needed.
407 sleep_sem->signal();
408 th->dereference_tsd();
409 th->cancel();
410 th->join();
411 delete th;
412 }
|
413 mday 1.35 th = _pool.remove_first();
|
414 mike 1.2 }
|
415 kumpf 1.57
|
416 mday 1.58 while(_idle_control.value())
417 pegasus_yield();
418
|
419 mday 1.47 th = _dead.remove_first();
|
420 mday 1.35 while(th != 0)
421 {
|
422 mday 1.52 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
423 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
|
424 mday 1.58
|
425 mday 1.47 if(sleep_sem == 0)
426 {
427 th->dereference_tsd();
428 }
|
429 kumpf 1.57 else
430 {
431 //ATTN-DME-P3-20030322: _dead queue processing in
432 //ThreadPool::~ThreadPool is inconsistent with the
433 //processing in kill_dead_threads. Is this correct?
|
434 mday 1.58
|
435 kumpf 1.57 // signal the thread's sleep semaphore
436 sleep_sem->signal();
437 sleep_sem->signal();
438 th->dereference_tsd();
439 th->cancel();
440 th->join();
441 delete th;
442 }
|
443 mday 1.47 th = _dead.remove_first();
|
444 mday 1.35 }
|
445 mday 1.52
|
446 mday 1.47 {
|
447 mday 1.52 th = _running.remove_first();
448 while(th != 0)
449 {
450 // signal the thread's sleep semaphore
451
452 sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
453 kumpf 1.57 PEGASUS_ASSERT(sleep_sem != 0);
454
|
455 mday 1.52 if(sleep_sem == 0 )
456 {
457 th->dereference_tsd();
458 }
|
459 kumpf 1.57 else
460 {
461 sleep_sem->signal();
462 sleep_sem->signal();
463 th->dereference_tsd();
464 th->cancel();
465 pegasus_yield();
|
466 mday 1.52
|
467 kumpf 1.57 th->join();
468 delete th;
469 }
|
470 mday 1.52 th = _running.remove_first();
|
471 kumpf 1.57 }
472 }
|
473 mike 1.2 }
|
474 mday 1.52
|
475 mday 1.35 catch(...)
|
476 mike 1.2 {
477 }
478 }
479
|
480 chip 1.11 // make this static to the class
|
481 mike 1.2 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
482 {
|
483 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::_loop");
484
|
485 mike 1.2 Thread *myself = (Thread *)parm;
486 if(myself == 0)
|
487 kumpf 1.14 {
|
488 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
489 "ThreadPool::_loop: Thread pointer is null");
|
490 kumpf 1.14 PEG_METHOD_EXIT();
|
491 mike 1.2 throw NullPointer();
|
492 kumpf 1.14 }
|
493 chuck 1.37
494 // l10n
495 // Set myself into thread specific storage
|
496 chuck 1.38 // This will allow code to get its own Thread
|
497 chuck 1.39 Thread::setCurrent(myself);
498
|
499 mike 1.2 ThreadPool *pool = (ThreadPool *)myself->get_parm();
|
500 kumpf 1.14 if(pool == 0 )
501 {
|
502 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
503 "ThreadPool::_loop: ThreadPool pointer is null");
|
504 kumpf 1.14 PEG_METHOD_EXIT();
|
505 mike 1.2 throw NullPointer();
|
506 kumpf 1.14 }
|
507 mday 1.52 if(pool->_dying.value())
508 {
|
509 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
510 "ThreadPool::_loop: ThreadPool is dying(1)");
|
511 mday 1.52 PEG_METHOD_EXIT();
512 return((PEGASUS_THREAD_RETURN)0);
513 }
514
|
515 mike 1.5 Semaphore *sleep_sem = 0;
|
516 mday 1.13 Semaphore *blocking_sem = 0;
517
|
518 mike 1.5 struct timeval *deadlock_timer = 0;
|
519 mday 1.47
|
520 chip 1.11 try
|
521 mike 1.2 {
522 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
523 myself->dereference_tsd();
524 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
|
525 mday 1.22 myself->dereference_tsd();
|
526 mike 1.2 }
|
527 mday 1.52
|
528 mday 1.30 catch(...)
529 {
|
530 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
531 "ThreadPool::_loop: Failure getting sleep_sem or deadlock_timer");
|
532 mday 1.30 PEG_METHOD_EXIT();
|
533 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
534 mday 1.30 }
535
|
536 mike 1.2 if(sleep_sem == 0 || deadlock_timer == 0)
|
537 kumpf 1.14 {
|
538 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
539 "ThreadPool::_loop: sleep_sem or deadlock_timer are null.");
|
540 kumpf 1.14 PEG_METHOD_EXIT();
|
541 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
542 kumpf 1.14 }
|
543 mike 1.2
|
544 mday 1.54 while(1)
|
545 mike 1.2 {
|
546 mday 1.58 if(pool->_dying.value())
547 break;
548
|
549 mday 1.52 try
550 {
551 sleep_sem->wait();
552 }
553 catch(IPCException& )
554 {
|
555 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
556 "ThreadPool::_loop: failure on sleep_sem->wait().");
|
557 mday 1.52 PEG_METHOD_EXIT();
558 return((PEGASUS_THREAD_RETURN)0);
559 }
560
|
561 mike 1.2 // when we awaken we reside on the running queue, not the pool queue
|
562 mday 1.47
|
563 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
564 void *parm = 0;
|
565 mike 1.2
|
566 chip 1.11 try
|
567 mike 1.2 {
568 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
569 myself->reference_tsd("work func");
570 myself->dereference_tsd();
571 parm = myself->reference_tsd("work parm");
572 myself->dereference_tsd();
|
573 mday 1.13 blocking_sem = (Semaphore *)myself->reference_tsd("blocking sem");
574 myself->dereference_tsd();
575
|
576 mike 1.2 }
|
577 mike 1.6 catch(IPCException &)
|
578 mike 1.2 {
|
579 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
580 "ThreadPool::_loop: Failure accessing work func, work parm, or blocking sem.");
|
581 kumpf 1.14 PEG_METHOD_EXIT();
|
582 mday 1.52 return((PEGASUS_THREAD_RETURN)0);
|
583 mike 1.2 }
|
584 mday 1.52
|
585 mike 1.2 if(_work == 0)
|
586 kumpf 1.14 {
|
587 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
588 "ThreadPool::_loop: work func is null.");
|
589 kumpf 1.14 PEG_METHOD_EXIT();
|
590 kumpf 1.57 return((PEGASUS_THREAD_RETURN)0);
|
591 kumpf 1.14 }
|
592 kumpf 1.24
593 if(_work ==
594 (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) &_undertaker)
595 {
|
596 kumpf 1.57 PEG_METHOD_EXIT();
|
597 mday 1.23 _work(parm);
|
598 kumpf 1.24 }
599
|
600 mike 1.2 gettimeofday(deadlock_timer, NULL);
|
601 kumpf 1.57
602 if (pool->_dying.value() == 0)
|
603 mday 1.20 {
|
604 kumpf 1.57 try
605 {
606 _work(parm);
607 }
|
608 kumpf 1.59 catch(Exception & e)
609 {
610 PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
611 String("Exception from _work in ThreadPool::_loop: ") +
612 e.getMessage());
613 PEG_METHOD_EXIT();
614 return((PEGASUS_THREAD_RETURN)0);
615 }
|
616 kumpf 1.57 catch(...)
617 {
618 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
619 "ThreadPool::_loop: execution of _work failed.");
620 PEG_METHOD_EXIT();
621 return((PEGASUS_THREAD_RETURN)0);
622 }
623 }
|
624 chuck 1.37
|
625 chip 1.11 // put myself back onto the available list
626 try
|
627 mike 1.2 {
|
628 mday 1.47 if(pool->_dying.value() == 0)
629 {
630 gettimeofday(deadlock_timer, NULL);
631 if( blocking_sem != 0 )
632 blocking_sem->signal();
633
|
634 s.hills 1.53 // If we are not on _running then ~ThreadPool has removed
635 // us and now "owns" our pointer.
|
636 kumpf 1.57 if ( pool->_running.remove((void *)myself) != 0 )
637 {
638 pool->_pool.insert_first(myself);
639 }
640 else
641 {
|
642 kumpf 1.60 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
643 "ThreadPool::_loop: Failed to remove thread from running queue.");
|
644 kumpf 1.57 PEG_METHOD_EXIT();
|
645 mday 1.54 return((PEGASUS_THREAD_RETURN)0);
|
646 kumpf 1.57 }
|
647 mday 1.47 }
648 else
649 {
650 PEG_METHOD_EXIT();
651 return((PEGASUS_THREAD_RETURN)0);
652 }
|
653 mike 1.2 }
|
654 mday 1.52 catch(...)
|
655 mike 1.2 {
|
656 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
657 "ThreadPool::_loop: Adding thread to idle pool failed.");
|
658 kumpf 1.14 PEG_METHOD_EXIT();
|
659 mday 1.47 return((PEGASUS_THREAD_RETURN)0);
|
660 mike 1.2 }
|
661 mday 1.51
|
662 mike 1.2 }
|
663 s.hills 1.49
664 // TODO: Why is this needed? Why not just continue?
|
665 mike 1.2 // wait to be awakend by the thread pool destructor
|
666 mday 1.50 //sleep_sem->wait();
|
667 s.hills 1.49
|
668 mike 1.2 myself->test_cancel();
|
669 kumpf 1.14
670 PEG_METHOD_EXIT();
|
671 mike 1.2 return((PEGASUS_THREAD_RETURN)0);
672 }
673
|
674 kumpf 1.59 Boolean ThreadPool::allocate_and_awaken(void *parm,
675 PEGASUS_THREAD_RETURN \
676 (PEGASUS_THREAD_CDECL *work)(void *),
677 Semaphore *blocking)
|
678 mike 1.2 throw(IPCException)
679 {
|
680 kumpf 1.14 PEG_METHOD_ENTER(TRC_THREAD, "ThreadPool::allocate_and_awaken");
|
681 kumpf 1.57
682 // Allocate_and_awaken will not run if the _dying flag is set.
683 // Once the lock is acquired, ~ThreadPool will not change
684 // the value of _dying until the lock is released.
|
685 mday 1.47
|
686 kumpf 1.57 try
|
687 mday 1.47 {
|
688 kumpf 1.57 if (_dying.value())
|
689 mday 1.47 {
|
690 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
691 "ThreadPool::allocate_and_awaken: ThreadPool is dying(1).");
|
692 kumpf 1.59 // ATTN: Error result has not yet been defined
693 return true;
|
694 kumpf 1.57 }
695 struct timeval now;
696 struct timeval start;
697 gettimeofday(&start, NULL);
698 Thread *th = 0;
699
|
700 mday 1.47 th = _pool.remove_first();
701
|
702 kumpf 1.59 if (th == 0)
|
703 kumpf 1.57 {
704 // will throw an IPCException&
705 _check_deadlock(&start) ;
|
706 mday 1.12
|
707 kumpf 1.57 if(_max_threads == 0 || _current_threads < _max_threads)
708 {
709 th = _init_thread();
710 }
|
711 kumpf 1.59 }
712
713 if (th == 0)
714 {
|
715 kumpf 1.60 // ATTN-DME-P3-20031103: This trace message should not be
716 // be labeled TRC_DISCARDED_DATA, because it does not
717 // necessarily imply that a failure has occurred. However,
718 // this label is being used temporarily to help isolate
719 // the cause of client timeout problems.
720
721 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
722 "ThreadPool::allocate_and_awaken: Insufficient resources: "
723 " pool = %s, running threads = %d, idle threads = %d, dead threads = %d ",
724 _key, _running.count(), _pool.count(), _dead.count());
|
725 kumpf 1.59 return false;
|
726 mday 1.47 }
|
727 chip 1.11
|
728 mike 1.2 // initialize the thread data with the work function and parameters
|
729 kumpf 1.14 Tracer::trace(TRC_THREAD, Tracer::LEVEL4,
|
730 kumpf 1.57 "Initializing thread with work function and parameters: parm = %p",
|
731 kumpf 1.14 parm);
732
|
733 kumpf 1.15 th->delete_tsd("work func");
|
734 chip 1.11 th->put_tsd("work func", NULL,
|
735 mike 1.2 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
736 (void *)work);
|
737 kumpf 1.15 th->delete_tsd("work parm");
|
738 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
|
739 kumpf 1.15 th->delete_tsd("blocking sem");
|
740 mday 1.13 if(blocking != 0 )
|
741 kumpf 1.57 th->put_tsd("blocking sem", NULL, sizeof(Semaphore *), blocking);
|
742 mday 1.47
|
743 kumpf 1.57 // put the thread on the running list
744 _running.insert_first(th);
|
745 mike 1.2
746 // signal the thread's sleep semaphore to awaken it
|
747 kumpf 1.57 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
748 mday 1.47
|
749 kumpf 1.57 if(sleep_sem == 0)
|
750 mike 1.2 {
|
751 kumpf 1.57 th->dereference_tsd();
752 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
753 "ThreadPool::allocate_and_awaken: thread data is corrupted.");
754 PEG_METHOD_EXIT();
755 throw NullPointer();
|
756 mike 1.2 }
|
757 kumpf 1.57 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "Signal thread to awaken");
758 sleep_sem->signal();
759 th->dereference_tsd();
|
760 mike 1.2 }
|
761 kumpf 1.57 catch (...)
|
762 mday 1.47 {
|
763 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
764 "ThreadPool::allocate_and_awaken: Operation Failed.");
765 PEG_METHOD_EXIT();
|
766 kumpf 1.59 // ATTN: Error result has not yet been defined
767 return true;
|
768 mday 1.47 }
|
769 kumpf 1.14 PEG_METHOD_EXIT();
|
770 kumpf 1.59 return true;
|
771 mike 1.2 }
772
773 // caller is responsible for only calling this routine during slack periods
774 // but should call it at least once per _deadlock_detect with the running q
775 // and at least once per _deallocate_wait for the pool q
776
|
777 mday 1.12 Uint32 ThreadPool::kill_dead_threads(void)
|
778 mike 1.2 throw(IPCException)
779 {
|
780 kumpf 1.57 // Since the kill_dead_threads, ThreadPool or allocate_and_awaken
781 // manipulate the threads on the ThreadPool queues, they should never
782 // be allowed to run at the same time.
783
|
784 mday 1.58 // << Thu Oct 23 14:41:02 2003 mdd >>
785 // not true, the queues are thread safe. they are syncrhonized.
|
786 kumpf 1.57
|
787 mday 1.58 auto_int do_not_destruct(&_idle_control);
788
|
789 kumpf 1.57 try
|
790 mday 1.47 {
|
791 kumpf 1.57 if (_dying.value())
|
792 mday 1.47 {
|
793 kumpf 1.57 return 0;
|
794 mday 1.47 }
|
795 mday 1.58
|
796 kumpf 1.57 struct timeval now;
797 gettimeofday(&now, NULL);
798 Uint32 bodies = 0;
799
800 // first go thread the dead q and clean it up as much as possible
801 try
|
802 mday 1.47 {
|
803 mday 1.58 while(_dying.value() == 0 && _dead.count() > 0)
|
804 kumpf 1.57 {
805 Tracer::trace(TRC_THREAD, Tracer::LEVEL4, "ThreadPool:: removing and joining dead thread");
806 Thread *dead = _dead.remove_first();
|
807 mday 1.58
808 if(dead )
809 {
810 dead->join();
811 delete dead;
812 }
|
813 kumpf 1.57 }
814 }
815 catch(...)
816 {
|
817 mday 1.47 }
818
|
819 mday 1.58 if (_dying.value())
820 {
821 return 0;
822 }
|
823 mday 1.47
|
824 kumpf 1.57 DQueue<Thread> * map[2] =
825 {
826 &_pool, &_running
827 };
|
828 chip 1.11
829
|
830 kumpf 1.57 DQueue<Thread> *q = 0;
831 int i = 0;
832 AtomicInt needed(0);
|
833 mday 1.58 Thread *th = 0;
834 internal_dq idq;
835
|
836 kumpf 1.62 #ifdef PEGASUS_KILL_LONG_RUNNING_THREADS
837 // Defining PEGASUS_KILL_LONG_RUNNING_THREADS causes the thread pool
838 // to kill threads that are on the _running queue longer than the
839 // _deadlock_detect time interval specified for the thread pool.
840 // Cancelling long-running threads has proven to be problematic and
841 // may cause a crash depending on the state of the thread when it is
842 // killed. Use this option with care.
843 for( ; i < 2; i++)
844 #else
|
845 kumpf 1.57 for( ; i < 1; i++)
|
846 kumpf 1.31 #endif
|
847 mday 1.52 {
|
848 kumpf 1.57 q = map[i];
849 if(q->count() > 0 )
850 {
|
851 chip 1.11 try
|
852 mike 1.2 {
|
853 kumpf 1.57 q->try_lock();
|
854 mike 1.2 }
|
855 mday 1.18 catch(...)
|
856 mike 1.2 {
|
857 mday 1.18 return bodies;
|
858 mike 1.2 }
|
859 kumpf 1.57
860 struct timeval dt = { 0, 0 };
861 struct timeval *dtp;
|
862 mday 1.58
|
863 kumpf 1.57 th = q->next(th);
864 while (th != 0 )
865 {
866 try
867 {
868 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
869 }
870 catch(...)
871 {
872 q->unlock();
873 return bodies;
874 }
|
875 chip 1.11
|
876 kumpf 1.57 if(dtp != 0)
877 {
878 memcpy(&dt, dtp, sizeof(struct timeval));
879 }
880 th->dereference_tsd();
881 struct timeval deadlock_timeout;
882 Boolean too_long;
883 if( i == 0)
884 {
885 too_long = check_time(&dt, get_deallocate_wait(&deadlock_timeout));
886 }
887 else
888 {
889 too_long = check_time(&dt, get_deadlock_detect(&deadlock_timeout));
890 }
|
891 mday 1.18
|
892 kumpf 1.57 if( true == too_long)
|
893 mike 1.2 {
|
894 kumpf 1.57 // if we are deallocating from the pool, escape if we are
895 // down to the minimum thread count
896 _current_threads--;
897 if( _current_threads.value() < (Uint32)_min_threads )
898 {
899 if( i == 0)
900 {
901 _current_threads++;
902 th = q->next(th);
903 continue;
904 }
905 else
906 {
907 // we are killing a hung thread and we will drop below the
908 // minimum. create another thread to make up for the one
909 // we are about to kill
910 needed++;
911 }
912 }
|
913 chip 1.11
|
914 kumpf 1.57 th = q->remove_no_lock((void *)th);
|
915 mday 1.58 idq.insert_first((void*)th);
|
916 mike 1.2 }
|
917 kumpf 1.57 th = q->next(th);
|
918 mike 1.2 }
|
919 kumpf 1.57 q->unlock();
920 }
|
921 mday 1.58
922 th = (Thread*)idq.remove_last();
923 while(th != 0)
924 {
925 if( i == 0 )
926 {
927 th->delete_tsd("work func");
928 th->put_tsd("work func", NULL,
929 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
930 (void *)&_undertaker);
931 th->delete_tsd("work parm");
932 th->put_tsd("work parm", NULL, sizeof(void *), th);
933
934 // signal the thread's sleep semaphore to awaken it
935 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
|
936 kumpf 1.61 PEGASUS_ASSERT(sleep_sem != 0);
|
937 mday 1.58
938 bodies++;
939 th->dereference_tsd();
|
940 kumpf 1.59 // Putting thread on _dead queue delays availability to others
941 //_dead.insert_first(th);
|
942 mday 1.58 sleep_sem->signal();
|
943 kumpf 1.59 th->join(); // Note: Clean up the thread here rather than
944 delete th; // leave it sitting unused on the _dead queue
|
945 mday 1.58 th = 0;
946 }
947 else
948 {
949 // deadlocked threads
|
950 kumpf 1.62 struct timeval deadlock_timeout;
951 Tracer::trace(TRC_THREAD, Tracer::LEVEL2,
952 "A thread has run longer than %u seconds and "
953 "will be cancelled.",
954 Uint32(_deadlock_detect.tv_sec));
955 Logger::put_l(Logger::ERROR_LOG, System::CIMSERVER,
956 Logger::SEVERE,
957 "Common.Thread.CANCEL_LONG_RUNNING_THREAD",
958 "A thread has run longer than {0} seconds and "
959 "will be cancelled.",
960 Uint32(_deadlock_detect.tv_sec));
|
961 mday 1.58 th->cancel();
962 delete th;
963 }
964 th = (Thread*)idq.remove_last();
965 }
|
966 mike 1.2 }
|
967 kumpf 1.57
968 while (needed.value() > 0) {
969 _link_pool(_init_thread());
970 needed--;
971 pegasus_sleep(0);
972 }
973 return bodies;
974 }
|
975 mday 1.58 catch (...)
|
976 kumpf 1.57 {
977 }
978 return 0;
|
979 mike 1.2 }
980
|
981 mday 1.12
|
982 mike 1.2 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
983 {
|
984 mday 1.22 // never time out if the interval is zero
985 if(interval && interval->tv_sec == 0 && interval->tv_usec == 0)
986 return false;
987
|
988 mday 1.55 struct timeval now , finish , remaining ;
|
989 mday 1.13 Uint32 usec;
|
990 mday 1.33 pegasus_gettimeofday(&now);
|
991 mday 1.36 /* remove valgrind error */
992 pegasus_gettimeofday(&remaining);
993
|
994 mday 1.13
995 finish.tv_sec = start->tv_sec + interval->tv_sec;
996 usec = start->tv_usec + interval->tv_usec;
997 finish.tv_sec += (usec / 1000000);
998 usec %= 1000000;
999 finish.tv_usec = usec;
1000
1001 if ( timeval_subtract(&remaining, &finish, &now) )
|
1002 mike 1.2 return true;
1003 else
1004 return false;
1005 }
1006
1007 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
1008 {
|
1009 mday 1.30 exit_thread((PEGASUS_THREAD_RETURN)1);
1010 return (PEGASUS_THREAD_RETURN)1;
|
1011 mike 1.2 }
|
1012 mday 1.19
1013
1014 void ThreadPool::_sleep_sem_del(void *p)
1015 {
1016 if(p != 0)
1017 {
1018 delete (Semaphore *)p;
1019 }
1020 }
1021
1022 void ThreadPool::_check_deadlock(struct timeval *start) throw(Deadlock)
1023 {
1024 if (true == check_time(start, &_deadlock_detect))
1025 throw Deadlock(pegasus_thread_self());
1026 return;
1027 }
1028
1029
1030 Boolean ThreadPool::_check_deadlock_no_throw(struct timeval *start)
1031 {
1032 return(check_time(start, &_deadlock_detect));
1033 mday 1.19 }
1034
1035 Boolean ThreadPool::_check_dealloc(struct timeval *start)
1036 {
1037 return(check_time(start, &_deallocate_wait));
1038 }
1039
1040 Thread *ThreadPool::_init_thread(void) throw(IPCException)
1041 {
1042 Thread *th = (Thread *) new Thread(_loop, this, false);
1043 // allocate a sleep semaphore and pass it in the thread context
1044 // initial count is zero, loop function will sleep until
1045 // we signal the semaphore
1046 Semaphore *sleep_sem = (Semaphore *) new Semaphore(0);
1047 th->put_tsd("sleep sem", &_sleep_sem_del, sizeof(Semaphore), (void *)sleep_sem);
1048
1049 struct timeval *dldt = (struct timeval *) ::operator new(sizeof(struct timeval));
|
1050 mday 1.35 pegasus_gettimeofday(dldt);
1051
|
1052 mday 1.19 th->put_tsd("deadlock timer", thread_data::default_delete, sizeof(struct timeval), (void *)dldt);
1053 // thread will enter _loop(void *) and sleep on sleep_sem until we signal it
|
1054 chuck 1.37
|
1055 kumpf 1.59 if (!th->run())
1056 {
1057 delete th;
1058 return 0;
1059 }
|
1060 mday 1.19 _current_threads++;
1061 pegasus_yield();
1062
1063 return th;
1064 }
1065
1066 void ThreadPool::_link_pool(Thread *th) throw(IPCException)
1067 {
1068 if(th == 0)
|
1069 kumpf 1.57 {
1070 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1071 "ThreadPool::_link_pool: Thread pointer is null.");
|
1072 mday 1.19 throw NullPointer();
|
1073 kumpf 1.57 }
|
1074 mday 1.47 try
1075 {
1076 _pool.insert_first(th);
1077 }
1078 catch(...)
1079 {
|
1080 kumpf 1.57 Tracer::trace(TRC_DISCARDED_DATA, Tracer::LEVEL2,
1081 "ThreadPool::_link_pool: _pool.insert_first failed.");
|
1082 mday 1.47 }
|
1083 mday 1.19 }
|
1084 mike 1.2
1085
1086 PEGASUS_NAMESPACE_END
1087
|