1 mike 1.2 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM,
4 // Compaq Computer Corporation
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 mike 1.2 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By: Rudy Schuet (rudy.schuet@compaq.com) 11/12/01
27 // added nsk platform support
28 //
29 //%/////////////////////////////////////////////////////////////////////////////
30
31 #include "Thread.h"
32 #include <Pegasus/Common/IPC.h>
33
34 #if defined(PEGASUS_OS_TYPE_WINDOWS)
35 # include "ThreadWindows.cpp"
36 #elif defined(PEGASUS_OS_TYPE_UNIX)
37 # include "ThreadUnix.cpp"
38 #elif defined(PEGASUS_OS_TYPE_NSK)
39 # include "ThreadNsk.cpp"
40 #else
41 # error "Unsupported platform"
42 #endif
43 mike 1.2
44 PEGASUS_NAMESPACE_BEGIN
45
46 void thread_data::default_delete(void * data)
47 {
48 if( data != NULL)
49 ::operator delete(data);
50 }
51
52 Boolean Thread::_signals_blocked = false;
53
54 // for non-native implementations
55 #ifndef PEGASUS_THREAD_CLEANUP_NATIVE
56 void Thread::cleanup_push( void (*routine)(void *), void *parm) throw(IPCException)
57 {
58 cleanup_handler *cu = new cleanup_handler(routine, parm);
59 try
60 {
61 _cleanup.insert_first(cu);
62 }
|
63 mike 1.6 catch(IPCException&)
|
64 mike 1.2 {
65 delete cu;
66 throw;
67 }
68 return;
69 }
70
71 void Thread::cleanup_pop(Boolean execute) throw(IPCException)
72 {
73 cleanup_handler *cu ;
74 try
75 {
76 cu = _cleanup.remove_first() ;
77 }
|
78 mike 1.6 catch(IPCException&)
|
79 mike 1.2 {
80 PEGASUS_ASSERT(0);
81 }
82 if(execute == true)
83 cu->execute();
84 delete cu;
85 }
86
87 #endif
88
89
90 //thread_data *Thread::put_tsd(Sint8 *key, void (*delete_func)(void *), Uint32 size, void *value) throw(IPCException)
91
92
93 #ifndef PEGASUS_THREAD_EXIT_NATIVE
94 void Thread::exit_self(PEGASUS_THREAD_RETURN exit_code)
95 {
96 // execute the cleanup stack and then return
97 while( _cleanup.count() )
98 {
99 try
100 mike 1.2 {
101 cleanup_pop(true);
102 }
|
103 mike 1.6 catch(IPCException&)
|
104 mike 1.2 {
105 PEGASUS_ASSERT(0);
106 break;
107 }
108 }
109 _exit_code = exit_code;
110 exit_thread(exit_code);
|
111 mday 1.4 _handle.thid = 0;
|
112 mike 1.2 }
113
114
115 #endif
116
117 ThreadPool::ThreadPool(Sint16 initial_size,
118 Sint8 *key,
119 Sint16 min,
120 Sint16 max,
121 struct timeval & alloc_wait,
122 struct timeval & dealloc_wait,
123 struct timeval & deadlock_detect)
124 : _max_threads(max), _min_threads(min),
125 _current_threads(0), _waiters(initial_size),
126 _pool_sem(0), _pool(true), _running(true),
127 _dead(true), _dying(0)
128 {
129 _allocate_wait.tv_sec = alloc_wait.tv_sec;
130 _allocate_wait.tv_usec = alloc_wait.tv_usec;
131 _deallocate_wait.tv_sec = dealloc_wait.tv_sec;
132 _deallocate_wait.tv_usec = dealloc_wait.tv_usec;
133 mike 1.2 _deadlock_detect.tv_sec = deadlock_detect.tv_sec;
134 _deadlock_detect.tv_usec = deadlock_detect.tv_usec;
135 memset(_key, 0x00, 17);
136 if(key != 0)
137 strncpy(_key, key, 16);
138 if(_max_threads < initial_size)
139 _max_threads = initial_size;
140 if(_min_threads > initial_size)
141 _min_threads = initial_size;
142
143 int i;
144 for(i = 0; i < initial_size; i++)
145 {
146 _link_pool(_init_thread());
147 }
148 }
149
150
151
152 ThreadPool::~ThreadPool(void)
153 {
154 mike 1.2 _dying++;
155 Thread *th = _pool.remove_first();
156 while(th != 0)
157 {
158 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
159
160 if(sleep_sem == 0)
161 {
162 th->dereference_tsd();
163 throw NullPointer();
164 }
165
166 sleep_sem->signal();
167 sleep_sem->signal();
168 th->dereference_tsd();
169 // signal the thread's sleep semaphore
170 th->cancel();
171 th->join();
172 th->empty_tsd();
173 delete th;
174 th = _pool.remove_first();
175 mike 1.2 }
176
177 th = _running.remove_first();
178 while(th != 0)
179 {
180 // signal the thread's sleep semaphore
181 th->cancel();
182 th->join();
183 th->empty_tsd();
184 delete th;
185 th = _running.remove_first();
186 }
187
188 th = _dead.remove_first();
189 while(th != 0)
190 {
191 // signal the thread's sleep semaphore
192 th->cancel();
193 th->join();
194 th->empty_tsd();
195 delete th;
196 mike 1.2 th = _dead.remove_first();
197 }
198 }
199
200 // make this static to the class
201 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL ThreadPool::_loop(void *parm)
202 {
203 Thread *myself = (Thread *)parm;
204 if(myself == 0)
205 throw NullPointer();
206 ThreadPool *pool = (ThreadPool *)myself->get_parm();
207 if(pool == 0 )
208 throw NullPointer();
|
209 mike 1.5 Semaphore *sleep_sem = 0;
210 struct timeval *deadlock_timer = 0;
|
211 mike 1.2
212 try
213 {
214 sleep_sem = (Semaphore *)myself->reference_tsd("sleep sem");
215 myself->dereference_tsd();
216 deadlock_timer = (struct timeval *)myself->reference_tsd("deadlock timer");
217 myself->dereference_tsd();
218 }
|
219 mike 1.6 catch(IPCException &)
|
220 mike 1.2 {
221 myself->exit_self(0);
222 }
223 if(sleep_sem == 0 || deadlock_timer == 0)
224 throw NullPointer();
225
226 while(pool->_dying < 1)
227 {
228 sleep_sem->wait();
229 // when we awaken we reside on the running queue, not the pool queue
230 if(pool->_dying > 0)
231 break;
232
233
|
234 mike 1.5 PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *_work)(void *) = 0;
235 void *parm = 0;
|
236 mike 1.2
237 try
238 {
239 _work = (PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)) \
240 myself->reference_tsd("work func");
241 myself->dereference_tsd();
242 parm = myself->reference_tsd("work parm");
243 myself->dereference_tsd();
244 }
|
245 mike 1.6 catch(IPCException &)
|
246 mike 1.2 {
247 myself->exit_self(0);
248 }
249
250 if(_work == 0)
251 throw NullPointer();
252 gettimeofday(deadlock_timer, NULL);
253 _work(parm);
254
255 // put myself back onto the available list
256 try
257 {
258 pool->_running.remove((void *)myself);
259 pool->_link_pool(myself);
260 }
|
261 mike 1.6 catch(IPCException &)
|
262 mike 1.2 {
263 myself->exit_self(0);
264 }
265 }
266 // wait to be awakend by the thread pool destructor
267 sleep_sem->wait();
268 myself->test_cancel();
269 myself->exit_self(0);
270 return((PEGASUS_THREAD_RETURN)0);
271 }
272
273
274 void ThreadPool::allocate_and_awaken(void *parm,
275 PEGASUS_THREAD_RETURN \
276 (PEGASUS_THREAD_CDECL *work)(void *))
277 throw(IPCException)
278 {
279 struct timeval start;
280 gettimeofday(&start, NULL);
281
282 Thread *th = _pool.remove_first();
283 mike 1.2
284 while (th == 0 && _dying < 1)
285 {
286 try // we couldn't get a free thread from the pool
287 {
288 // wait for the right interval and try again
289 while(th == 0 && _dying < 1)
290 {
291 _check_deadlock(&start);
292 Uint32 interval = _allocate_wait.tv_sec * 1000;
293 if(_allocate_wait.tv_usec > 0)
294 interval += (_deallocate_wait.tv_usec / 1000);
295 // will throw a timeout if no thread comes free
296 _pool_sem.time_wait(interval);
297 th = _pool.remove_first();
298 }
299 }
|
300 mike 1.6 catch(TimeOut &)
|
301 mike 1.2 {
302 if(_current_threads < _max_threads)
303 {
304 th = _init_thread();
305 break;
306 }
307 }
308 // will throw a Deadlock Exception before falling out of the loop
309
310 _check_deadlock(&start);
311
312 } // while th == null
313
314 if(_dying < 1)
315 {
316 // initialize the thread data with the work function and parameters
317 th->remove_tsd("work func");
318 th->put_tsd("work func", NULL,
319 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
320 (void *)work);
321 th->remove_tsd("work parm");
322 mike 1.2 th->put_tsd("work parm", NULL, sizeof(void *), parm);
323
324 // put the thread on the running list
325 _running.insert_first(th);
326
327 // signal the thread's sleep semaphore to awaken it
328 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
329
330 if(sleep_sem == 0)
331 {
332 th->dereference_tsd();
333 throw NullPointer();
334 }
335
336 sleep_sem->signal();
337 th->dereference_tsd();
338 }
339 else
340 _pool.insert_first(th);
341 }
342
// The caller is responsible for invoking this routine only during slack
// periods. It should nevertheless be called at least once per _deadlock_detect
// interval for the running queue and at least once per _deallocate_wait
// interval for the pool queue.
346
347 void ThreadPool::kill_dead_threads(void)
348 throw(IPCException)
349 {
350 struct timeval now;
351 gettimeofday(&now, NULL);
352
353
354 // first go thread the dead q and clean it up as much as possible
355 while(_dead.count() > 0)
356 {
357 Thread *dead = _dead.remove_first();
358 if(dead == 0)
359 throw NullPointer();
360 if(dead->_handle.thid != 0)
361 {
362 dead->detach();
363 destroy_thread(dead->_handle.thid, 0);
364 mike 1.2 dead->_handle.thid = 0;
365 while(dead->_cleanup.count() )
366 {
367 // this may throw a permission exception,
368 // which I will remove from the code prior to stabilizing
369 dead->cleanup_pop(true);
370 }
371 }
372 delete dead;
373 }
374
375 DQueue<Thread> * map[2] =
376 {
377 &_pool, &_running
378 };
379
380
381 DQueue<Thread> *q = 0;
382 int i = 0;
383 AtomicInt needed(0);
384
385 mike 1.2 for( q = map[i] ; i < 2; i++, q = map[i])
386 {
387 if(q->count() > 0 )
388 {
389 try
390 {
391 q->try_lock();
392 }
|
393 mike 1.6 catch(AlreadyLocked &)
|
394 mike 1.2 {
395 q++;
396 continue;
397 }
398
399 struct timeval dt = { 0, 0 };
400 struct timeval *dtp;
401 Thread *th = 0;
402 th = q->next(th);
403 while (th != 0 )
404 {
405 try
406 {
407 dtp = (struct timeval *)th->try_reference_tsd("deadlock timer");
408 }
|
409 mike 1.6 catch(AlreadyLocked &)
|
410 mike 1.2 {
411 th = q->next(th);
412 continue;
413 }
414
415 if(dtp != 0)
416 {
417 memcpy(&dt, dtp, sizeof(struct timeval));
418
419 }
420 th->dereference_tsd();
421 struct timeval deadlock_timeout;
422 if( true == check_time(&dt, get_deadlock_detect(&deadlock_timeout) ))
423 {
424 // if we are deallocating from the pool, escape if we are
425 // down to the minimum thread count
426 if( _current_threads.value() <= (Uint32)_min_threads )
427 {
428 if( i == 1)
429 {
430 th = q->next(th);
431 mike 1.2 continue;
432 }
433 else
434 {
435 // we are killing a hung thread and we will drop below the
436 // minimum. create another thread to make up for the one
437 // we are about to kill
438 needed++;
439 }
440 }
441
442 th = q->remove_no_lock((void *)th);
443
444 if(th != 0)
445 {
446 th->remove_tsd("work func");
447 th->put_tsd("work func", NULL,
448 sizeof( PEGASUS_THREAD_RETURN (PEGASUS_THREAD_CDECL *)(void *)),
449 (void *)&_undertaker);
450 th->remove_tsd("work parm");
451 th->put_tsd("work parm", NULL, sizeof(void *), th);
452 mike 1.2
453 // signal the thread's sleep semaphore to awaken it
454 Semaphore *sleep_sem = (Semaphore *)th->reference_tsd("sleep sem");
455
456 if(sleep_sem == 0)
457 {
458 th->dereference_tsd();
459 throw NullPointer();
460 }
461 // put the thread on the dead list
462 _dead.insert_first(th);
463 sleep_sem->signal();
464 th->dereference_tsd();
465 th = 0;
466 }
467 }
468 th = q->next(th);
469 }
470 q->unlock();
471 while (needed.value() > 0)
472 {
473 mike 1.2 _link_pool(_init_thread());
474 needed--;
475 }
476 }
477 }
478
479
480 return;
481 }
482
483 Boolean ThreadPool::check_time(struct timeval *start, struct timeval *interval)
484 {
485 struct timeval now;
486 gettimeofday(&now, NULL);
487 if( (now.tv_sec - start->tv_sec) > interval->tv_sec ||
488 (((now.tv_sec - start->tv_sec) == interval->tv_sec) &&
489 ((now.tv_usec - start->tv_usec) >= interval->tv_usec ) ) )
490 return true;
491 else
492 return false;
493 }
494 mike 1.2
495
496 PEGASUS_THREAD_RETURN ThreadPool::_undertaker( void *parm )
497 {
498 Thread *myself = reinterpret_cast<Thread *>(parm);
499 if(myself != 0)
500 {
501 myself->detach();
502 myself->_handle.thid = 0;
503 myself->cancel();
504 myself->test_cancel();
505 myself->exit_self(0);
506 }
507 return((PEGASUS_THREAD_RETURN)0);
508 }
509
510
511 PEGASUS_NAMESPACE_END
512
|