1 karl 1.41 //%2003////////////////////////////////////////////////////////////////////////
|
2 kumpf 1.34 //
|
3 karl 1.41 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mike 1.2 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
|
14 kumpf 1.34 //
|
15 mike 1.2 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Day (mdday@us.ibm.com)
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #ifndef Pegasus_DQueue_h
33 #define Pegasus_DQueue_h
34
35 #include <Pegasus/Common/IPC.h>
|
36 kumpf 1.36 #include <Pegasus/Common/Linkage.h>
|
37 mike 1.2
38 PEGASUS_NAMESPACE_BEGIN
39
|
40 tony 1.38 template<class L> class DQueue : public internal_dq
|
41 mike 1.2 {
|
42 mday 1.29 public:
43 static void *operator new(size_t size);
44 static void operator delete(void *dead, size_t size);
|
45 mday 1.20
|
46 mike 1.2 private:
47 Mutex *_mutex;
|
48 mday 1.18 AtomicInt *_actual_count;
|
49 mday 1.20 DQueue *_dq_next;
|
50 mday 1.23 static DQueue<L> *_headOfFreeList;
|
51 mday 1.20 static const int BLOCK_SIZE;
52 static Mutex _alloc_mut;
|
53 mday 1.18
|
54 mike 1.2 public:
|
55 mday 1.18 typedef internal_dq Base;
|
56 mday 1.23 DQueue(void);
57 DQueue(Boolean head) ;
|
58 mike 1.2
|
59 mday 1.23 virtual ~DQueue() ;
|
60 mike 1.2
|
61 mday 1.23 void lock(void) throw(IPCException);
62 void unlock(void) throw(IPCException);
63 void try_lock() throw(IPCException);
64
65 void insert_first_no_lock(L *element) throw(IPCException);
66 void insert_first(L *element) throw(IPCException) ;
67 void insert_last_no_lock(L *element) throw(IPCException);
68 void insert_last(L *element) throw(IPCException);
69 void empty_list( void ) throw(IPCException) ;
70 L *remove_first ( void ) throw(IPCException) ;
71 L *remove_last ( void ) throw(IPCException) ;
72 L *remove_no_lock(const void *key) throw(IPCException);
73 L *remove_no_lock(const L *key) throw(IPCException);
74 L *remove(const void *key) throw(IPCException);
75 L *remove(const L *key) throw(IPCException);
76 L *reference(const void *key) throw(IPCException);
77 L *reference(const L *key);
78 L *next( const void * ref) throw(IPCException);
79 L *prev( const void *ref) throw(IPCException);
80 Boolean exists(const void *key) throw(IPCException) ;
81 Boolean exists(const L *key) throw(IPCException);
|
82 mday 1.18
83 Uint32 count(void) { return _actual_count->value() ; }
|
84 mday 1.37 Uint32 size(void) { return _actual_count->value() ; }
85
86
|
87 mike 1.2 } ;
88
|
89 mday 1.22
|
90 mday 1.20
|
91 tony 1.38 template<class L> class AsyncDQueue: public internal_dq
|
92 mike 1.2 {
|
93 mday 1.20
|
94 mday 1.29 public:
95 static void * operator new(size_t size);
96 static void operator delete(void *, size_t);
|
97 mday 1.20
|
98 mike 1.2 private: // asyncdqueue
99
100 Mutex *_cond;
101 Condition *_slot;
102 Condition *_node;
103 AtomicInt *_actual_count;
104 AtomicInt *_disallow;
105 AtomicInt * _capacity;
|
106 mday 1.20 AsyncDQueue *_dq_next;
|
107 mike 1.2
|
108 mday 1.20 static AsyncDQueue *_headOfFreeList;
109 static const int BLOCK_SIZE;
110 static Mutex _alloc_mut;
111
|
112 mday 1.23 void _insert_prep(void) throw(IPCException);
113 void _insert_recover(void) throw(IPCException);
114 void _unlink_prep(void) throw(IPCException);
115 void _unlink_recover(void) throw(IPCException);
116 L *_remove_no_lock(const void *key) throw(IPCException);
117 L *_remove_no_lock(const L *key) throw(IPCException);
|
118 mday 1.6
|
119 mike 1.2 public:
120
121 typedef internal_dq Base;
122
|
123 mday 1.23 AsyncDQueue(void) ;
124 AsyncDQueue(Boolean head, Uint32 capacity );
125 virtual ~AsyncDQueue(void);
126 void shutdown_queue(void);
127 Boolean is_full(void);
128 Boolean is_empty(void);
129 Boolean is_shutdown(void);
130 void try_lock(PEGASUS_THREAD_TYPE myself) throw(IPCException);
131 void lock(PEGASUS_THREAD_TYPE myself) throw(IPCException);
132 void unlock(void);
|
133 mday 1.30 void signal_slot(void) throw(IPCException);
134 void signal_node(void) throw(IPCException);
135 Condition *get_node_cond(void);
136 Condition *get_slot_cond(void);
|
137 mday 1.23 void wait_for_node(void) throw(IPCException);
138 void set_capacity(Uint32 capacity) throw(IPCException);
139 Uint32 get_capacity(void);
140 void insert_first(L *element) throw(IPCException);
141 void insert_first_wait(L *element) throw(IPCException);
142 void insert_last(L *element) throw(IPCException);
143 void insert_last_wait(L *element) throw(IPCException);
144 void empty_list(void) throw(IPCException);
145 L *remove_first(void) throw(IPCException);
146 L *remove_first_wait(void) throw(IPCException);
147 L *remove_last(void) throw(IPCException);
148 L *remove_last_wait(void) throw(IPCException);
149 L *remove(const void *key) throw(IPCException);
150 L *remove(const L *key) throw(IPCException);
151 L *remove_no_lock(const void *key) throw(IPCException);
152 L *remove_no_lock(const L *key) throw(IPCException);
153 L *remove_wait(const void *key) throw(IPCException);
154 L *next(const L *ref) throw(IPCException);
155 L *prev(const L *ref) throw(IPCException);
156 L *reference(const void *key) throw(IPCException);
157 L *reference(const L *key) throw(IPCException);
158 mday 1.23 Uint32 count(void) ;
|
159 mday 1.37 Uint32 size(void) ;
160
|
161 mike 1.2 };
162
|
163 keith.petley 1.39 template<class L> DQueue<L> * DQueue<L>::_headOfFreeList = 0;
|
164 mday 1.29 template<class L> const int DQueue<L>::BLOCK_SIZE = 200;
|
165 mday 1.25 template<class L> Mutex DQueue<L>::_alloc_mut;
166
|
167 mday 1.29 template<class L> void *DQueue<L>::operator new(size_t size)
168 {
169 if (size != sizeof(DQueue<L>))
170 return ::operator new(size);
171
172 _alloc_mut.lock(pegasus_thread_self());
173
174 DQueue<L> *p = _headOfFreeList;
175 if(p)
176 _headOfFreeList = p->_dq_next;
177 else
178 {
179 DQueue<L> * newBlock =
180 static_cast<DQueue<L> *>(::operator new(BLOCK_SIZE * sizeof(DQueue<L>)));
181 int i;
182 for( i = 1; i < BLOCK_SIZE - 1; ++i)
183 newBlock[i]._dq_next = &newBlock[i + 1];
184 newBlock[BLOCK_SIZE - 1]._dq_next = 0;
185
186 p = newBlock;
187 _headOfFreeList = &newBlock[1];
188 mday 1.29 }
189 _alloc_mut.unlock();
190
191 return p;
192 }
193
194 template<class L> void DQueue<L>::operator delete(void *dead, size_t size)
195 {
196 if(dead == 0)
197 return;
198 if(size != sizeof(DQueue<L>))
199 {
200 ::operator delete(dead);
201 return;
202 }
203 DQueue<L> *p = static_cast<DQueue<L> *>(dead);
204 _alloc_mut.lock(pegasus_thread_self());
205 p->_dq_next = _headOfFreeList;
206 _headOfFreeList = p;
207 _alloc_mut.unlock();
208 }
|
209 mday 1.25
|
210 keith.petley 1.39 template<class L> AsyncDQueue<L> * AsyncDQueue<L>::_headOfFreeList =0;
|
211 mday 1.25 template<class L> const int AsyncDQueue<L>::BLOCK_SIZE = 20;
212 template<class L> Mutex AsyncDQueue<L>::_alloc_mut;
213
|
214 mday 1.29 template<class L> void * AsyncDQueue<L>::operator new(size_t size)
215 {
216 if (size != sizeof(AsyncDQueue<L>))
217 return ::operator new(size);
218
219 _alloc_mut.lock(pegasus_thread_self());
220
221 AsyncDQueue<L> *p = _headOfFreeList;
222 if(p)
223 _headOfFreeList = p->_dq_next;
224 else
225 {
226 AsyncDQueue<L> * newBlock =
227 static_cast<AsyncDQueue<L> *>(::operator new(BLOCK_SIZE * sizeof(AsyncDQueue<L>)));
228 int i;
229 for( i = 1; i < BLOCK_SIZE - 1; ++i)
230 newBlock[i]._dq_next = &newBlock[i + 1];
231 newBlock[BLOCK_SIZE - 1]._dq_next = 0;
232
233 p = newBlock;
234 _headOfFreeList = &newBlock[1];
235 mday 1.29 }
236 _alloc_mut.unlock();
237
238 return p;
239 }
240
241 template<class L> void AsyncDQueue<L>::operator delete(void *deadObject, size_t size)
242 {
243 if(deadObject == 0)
244 return;
245 if(size != sizeof(AsyncDQueue<L>))
246 {
247 ::operator delete(deadObject);
248 return;
249 }
250 AsyncDQueue<L> *p = static_cast<AsyncDQueue<L> *>(deadObject);
251 _alloc_mut.lock(pegasus_thread_self());
252 p->_dq_next = _headOfFreeList;
253 _headOfFreeList = p;
254 _alloc_mut.unlock();
255 }
|
256 mday 1.24
257 template<class L> DQueue<L>::DQueue(void)
258 : Base(false)
259 {
260 _mutex = 0;
261 _actual_count = 0;
262 }
263
264 template<class L> DQueue<L>::DQueue(Boolean head)
265 : Base(head)
266 {
267 if(head == true)
268 {
269 _mutex = new Mutex();
270 _actual_count = new AtomicInt(0);
271 }
272
273 else
274 {
275 _mutex = 0;
276 _actual_count = 0;
277 mday 1.24 }
278 }
279
280
281 template<class L> DQueue<L>::~DQueue()
282 {
283 if(_mutex != 0) delete _mutex;
284 if (_actual_count != 0) delete _actual_count;
285 }
286
287
288 template<class L> void DQueue<L>::lock(void) throw(IPCException)
289 { _mutex->lock(pegasus_thread_self()); }
290
291 template<class L> void DQueue<L>::unlock(void) throw(IPCException)
292 { _mutex->unlock() ; }
293
294 template<class L> void DQueue<L>::try_lock() throw(IPCException)
295 { _mutex->try_lock(pegasus_thread_self()); }
296
297 template<class L> void DQueue<L>::insert_first_no_lock(L *element) throw(IPCException)
298 mday 1.24 {
299 if( pegasus_thread_self() != _mutex->get_owner())
300 throw Permission(pegasus_thread_self());
301 Base::insert_first(static_cast<void *>(element));
302 (*_actual_count)++;
303
304 return;
305 }
306
307 template<class L> void DQueue<L>::insert_first(L *element) throw(IPCException)
308 {
309 if(element == 0)
310 return;
311 _mutex->lock(pegasus_thread_self());
312 Base::insert_first(static_cast<void *>(element));
313 (*_actual_count)++;
314 _mutex->unlock();
315 }
316
317 template<class L> void DQueue<L>::insert_last_no_lock(L *element) throw(IPCException)
318 {
319 mday 1.24 if( pegasus_thread_self() != _mutex->get_owner())
320 throw Permission(pegasus_thread_self());
321 Base::insert_last(static_cast<void *>(element));
322 (*_actual_count)++;
323 return;
324 }
325
326
327 template<class L> void DQueue<L>::insert_last(L *element) throw(IPCException)
328 {
329 if(element == 0)
330 return;
331 _mutex->lock(pegasus_thread_self());
332 Base::insert_last(static_cast<void *>(element));
333 (*_actual_count)++;
334 _mutex->unlock();
335 }
336
337
338 template<class L> void DQueue<L>::empty_list( void ) throw(IPCException)
339 {
340 mday 1.24 if( Base::count() > 0) {
341 _mutex->lock(pegasus_thread_self());
342 Base::empty_list();
343 (*_actual_count) = 0;
344 _mutex->unlock();
345 }
346 return;
347 }
348
349
350 template<class L> L * DQueue<L>::remove_first ( void ) throw(IPCException)
351 {
352 L *ret = 0;
353
354 if( _actual_count->value() )
355 {
356 _mutex->lock(pegasus_thread_self());
357 ret = static_cast<L *>(Base::remove_first());
358 if( ret != 0 )
359 (*_actual_count)--;
360 _mutex->unlock();
361 mday 1.24 }
362 return ret;
363 }
364
365 template<class L> L *DQueue<L>::remove_last ( void ) throw(IPCException)
366 {
367 L * ret = 0;
368 if( _actual_count->value() )
369 {
370 _mutex->lock(pegasus_thread_self());
371 ret = static_cast<L *>(Base::remove_last());
372 if( ret != 0 )
373 (*_actual_count)--;
374 _mutex->unlock();
375 }
376 return ret;
377 }
378
379 template<class L> L *DQueue<L>::remove_no_lock(const void *key) throw(IPCException)
380 {
381 if(key == 0 )
382 mday 1.24 return 0;
383 if( pegasus_thread_self() != _mutex->get_owner())
384 throw Permission(pegasus_thread_self());
385
386 if (_actual_count->value() )
387 {
388 L *ret = static_cast<L *>(Base::next(0));
389 while( ret != 0 )
390 {
391 if (ret->operator==(key))
392 {
|
393 kumpf 1.31 ret = static_cast<L *>(Base::remove(ret));
|
394 mday 1.24 if( ret != 0 )
395 (*_actual_count)--;
396 return ret;
397 }
398 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
399 }
400 }
401 return 0 ;
402 }
403
404 template<class L> L * DQueue<L>::remove_no_lock(const L *key) throw(IPCException)
405 {
406 if(key == 0 )
407 return 0;
408 if( pegasus_thread_self() != _mutex->get_owner())
409 throw Permission(pegasus_thread_self());
410
411 if (_actual_count->value() )
412 {
413 L *ret = static_cast<L *>(Base::next(0));
414 while( ret != 0 )
415 mday 1.24 {
|
416 kumpf 1.31 if (ret->operator==(*key))
|
417 mday 1.24 {
|
418 kumpf 1.31 ret = static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
|
419 mday 1.24 if( ret != 0 )
420 (*_actual_count)--;
421 return ret;
422 }
423 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
424 }
425 }
426 return 0 ;
427 }
428
429
430 template<class L> L * DQueue<L>::remove(const void *key) throw(IPCException)
431 {
432
433 L *ret = 0;
434
435 if( _actual_count->value() > 0 )
436 {
437 _mutex->lock(pegasus_thread_self());
438 ret = DQueue<L>::remove_no_lock(key);
439 _mutex->unlock( );
440 mday 1.24 }
441 return(ret);
442 }
443
444 template<class L>L *DQueue<L>::remove(const L *key) throw(IPCException)
445 {
446 L *ret = 0;
447
448 if( _actual_count->value() > 0 )
449 {
450 _mutex->lock(pegasus_thread_self());
451 ret = DQueue<L>::remove_no_lock(key);
452 _mutex->unlock();
453 }
454 return(ret);
455 }
456
457 template<class L> L *DQueue<L>::reference(const void *key) throw(IPCException)
458 {
459 if(key == 0)
460 return 0;
461 mday 1.24
462 if( pegasus_thread_self() != _mutex->get_owner())
463 throw Permission(pegasus_thread_self());
464
465 if( _actual_count->value() )
466 {
467 L *ret = static_cast<L *>(Base::next(0));
468 while(ret != 0)
469 {
470 if(ret->operator==(key))
471 return ret;
472 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
473 }
474 }
475 return(0);
476 }
477
478 template<class L> L *DQueue<L>::reference(const L *key)
479 {
480 if(key == 0)
481 return 0;
482 mday 1.24
483 if( pegasus_thread_self() != _mutex->get_owner())
484 throw Permission(pegasus_thread_self());
485
486 if( _actual_count->value() )
487 {
488 L *ret = static_cast<L *>(Base::next(0));
489 while(ret != 0)
490 {
|
491 kumpf 1.32 if(ret->operator==(*key))
|
492 mday 1.24 return ret;
493 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
494 }
495 }
496 return(0);
497 }
498
499 template<class L> L * DQueue<L>::next( const void * ref) throw(IPCException)
500 {
501 if (_mutex->get_owner() != pegasus_thread_self())
502 throw Permission(pegasus_thread_self()) ;
503 return static_cast<L *>(Base::next( ref ));
504 }
505
506 template<class L> L *DQueue<L>::prev( const void *ref) throw(IPCException)
507 {
508 if (_mutex->get_owner() != pegasus_thread_self())
509 throw Permission(pegasus_thread_self());
|
510 keith.petley 1.39 return static_cast<L *>(Base::prev( ref ));
|
511 mday 1.24 }
512
513 template<class L> Boolean DQueue<L>::exists(const void *key) throw(IPCException)
514 {
515 if(key == 0)
516 return false;
517
518 Boolean ret = false;
519 if(_actual_count->value() > 0)
520 {
521 _mutex->lock(pegasus_thread_self());
522 ret = DQueue<L>::reference(key);
523 _mutex->unlock();
524 }
525 return(ret);
526 }
527
528 template<class L> Boolean DQueue<L>::exists(const L *key) throw(IPCException)
529 {
530 if(key == 0)
531 return false;
532 mday 1.24
533 Boolean ret = false;
534 if(_actual_count->value() > 0)
535 {
|
536 mday 1.35 _mutex->lock(pegasus_thread_self());
|
537 mday 1.24 ret = DQueue<L>::reference(key);
538 _mutex->unlock();
539 }
540 return(ret);
541 }
542
543 template<class L> void AsyncDQueue<L>::_insert_prep(void) throw(IPCException)
544 {
545 if(_disallow->value() > 0)
546 {
547 unlock();
548 throw ListClosed();
549 }
550 _slot->lock_object(pegasus_thread_self());
551 while(true == is_full())
552 {
553 _slot->unlocked_wait(pegasus_thread_self());
554 if(_disallow->value() > 0)
555 {
556 unlock();
557 throw ListClosed();
558 mday 1.24 }
559 }
560 }
561
562 template<class L> void AsyncDQueue<L>::_insert_recover(void) throw(IPCException)
563 {
564 _node->unlocked_signal(pegasus_thread_self());
565 (*_actual_count)++;
566 unlock();
567 }
568
569 template<class L> void AsyncDQueue<L>::_unlink_prep(void) throw(IPCException)
570 {
571
572 if(_disallow->value() > 0)
573 {
574 unlock();
575 throw ListClosed();
576 }
577 _node->lock_object(pegasus_thread_self());
578 while(true == is_empty())
579 mday 1.24 {
580 _node->unlocked_wait(pegasus_thread_self());
581 if(_disallow->value() > 0)
582 {
583 unlock();
584 throw ListClosed();
585 }
586 }
587 }
588
589 template<class L> void AsyncDQueue<L>::_unlink_recover(void) throw(IPCException)
590 {
591 _slot->unlocked_signal(pegasus_thread_self());
592 (*_actual_count)--;
593 unlock();
594 }
595
596 template<class L> L * AsyncDQueue<L>::_remove_no_lock(const void *key) throw(IPCException)
597 {
598 if(_disallow->value() > 0)
599 {
600 mday 1.24 unlock();
601 throw ListClosed();
602 }
603 if( pegasus_thread_self() != _cond->get_owner())
604 throw Permission(pegasus_thread_self());
605 L *ret = static_cast<L *>(Base::next(0));
606 while(ret != 0)
607 {
608 if(ret->operator==(key))
609 {
610 return static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
611 }
612
613 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
614 }
615 return 0;
616 }
617
618 template<class L> L *AsyncDQueue<L>::_remove_no_lock(const L *key) throw(IPCException)
619 {
620 if(_disallow->value() > 0)
621 mday 1.24 {
622 unlock();
623 throw ListClosed();
624 }
625 if( pegasus_thread_self() != _cond->get_owner())
626 throw Permission(pegasus_thread_self());
627 L *ret = static_cast<L *>(Base::next(0));
628 while(ret != 0)
629 {
|
630 kumpf 1.32 if(ret->operator==(*key))
|
631 mday 1.24 {
632 return static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
633 }
634
635 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
636 }
637 return 0;
638 }
639
640
641 template<class L> AsyncDQueue<L>::AsyncDQueue(void)
642 : Base(false)
643 {
644 _cond = 0;
645 _slot = 0;
646 _node = 0;
647 _actual_count = 0;
648 _disallow = 0;
649 _capacity = 0;
650 }
651
652 mday 1.24 template<class L> AsyncDQueue<L>::AsyncDQueue(Boolean head, Uint32 capacity )
653 : Base(head)
654 {
655 if(head == true)
656 {
657 _cond = new Mutex();
658 _slot = new Condition(*_cond);
659 _node = new Condition(*_cond);
660 _actual_count = new AtomicInt(0);
661 _disallow = new AtomicInt(0);
662 _capacity = new AtomicInt(capacity);
663 }
664 else
665 {
666 _cond = 0;
667 _slot = 0;
668 _node = 0;
669 _actual_count = 0;
670 _disallow = 0;
671 _capacity = 0;
672 }
673 mday 1.24 }
674
675 template<class L> AsyncDQueue<L>::~AsyncDQueue(void)
676 {
677 delete _cond;
678 delete _slot;
679 delete _node;
680 delete _actual_count;
681 delete _disallow;
682 delete _capacity;
683 }
684
685
686 template<class L> void AsyncDQueue<L>::shutdown_queue(void)
687 {
688 try
689 {
690 lock(pegasus_thread_self());
691 (*_disallow)++;
692 _node->disallow();
693 _node->unlocked_signal(pegasus_thread_self());
694 mday 1.24 _node->unlocked_signal(pegasus_thread_self());
695 _slot->disallow();
696 _slot->unlocked_signal(pegasus_thread_self());
697 _slot->unlocked_signal(pegasus_thread_self());
698 unlock();
699 }
700 catch(ListClosed & )
701 {
702 (*_disallow)++;
703 }
704 }
705
706 template<class L> Boolean AsyncDQueue<L>::is_full(void)
707 {
|
708 mday 1.29 return false;
709
710
|
711 mday 1.24 if( _capacity->value() == 0 )
712 return false;
713
714 if(_actual_count->value() >= _capacity->value())
715 return true;
716 return false;
717 }
718
719 template<class L> Boolean AsyncDQueue<L>::is_empty(void)
720 {
721 if(_actual_count->value() == 0)
722 return true;
723 return false;
724 }
725
726
727 template<class L> Boolean AsyncDQueue<L>::is_shutdown(void)
728 {
729 if( _disallow->value() > 0)
730 return true;
731 return false;
732 mday 1.24 }
733
734 template<class L> void AsyncDQueue<L>::try_lock(PEGASUS_THREAD_TYPE myself) throw(IPCException)
735 {
736 if(_disallow->value() > 0)
737 {
738 throw ListClosed();
739 }
740 _cond->try_lock(myself);
741 }
742
743 template<class L> void AsyncDQueue<L>::lock(PEGASUS_THREAD_TYPE myself) throw(IPCException)
744 {
745 if(_disallow->value() > 0)
746 {
747 throw ListClosed();
748 }
749 _cond->lock(myself);
750 }
751
752 template<class L> void AsyncDQueue<L>::unlock(void)
753 mday 1.24 {
754 _cond->unlock();
755 }
756
|
757 mday 1.30 template<class L> void AsyncDQueue<L>::signal_slot(void) throw(IPCException)
758 {
759 _cond->lock(pegasus_thread_self());
760 _slot->unlocked_signal(pegasus_thread_self());
761 _cond->unlock();
762 }
763
764 template<class L> void AsyncDQueue<L>::signal_node(void) throw(IPCException)
765 {
766 _cond->lock(pegasus_thread_self());
767 _node->unlocked_signal(pegasus_thread_self());
768 _cond->unlock();
769 }
770
771 template<class L> Condition *AsyncDQueue<L>::get_node_cond(void)
772 { return _node ; }
773
774 template<class L> Condition * AsyncDQueue<L>::get_slot_cond(void)
775 { return _slot; }
776
|
777 mday 1.24 template<class L> void AsyncDQueue<L>::wait_for_node(void) throw(IPCException)
778 {
779 _unlink_prep();
780 }
781
782 template<class L> void AsyncDQueue<L>::set_capacity(Uint32 capacity) throw(IPCException)
783 {
784 lock(pegasus_thread_self());
|
785 keith.petley 1.39 *_capacity = capacity;
|
786 mday 1.24 unlock();
787 }
788
789 template<class L> Uint32 AsyncDQueue<L>::get_capacity(void)
790 {
791 return _capacity->value();
792 }
793
794 template<class L> void AsyncDQueue<L>::insert_first(L *element) throw(IPCException)
795 {
796 if(element == 0)
797 return;
798
799 lock(pegasus_thread_self());
800 if(true == is_full())
801 {
802 unlock();
803 throw ListFull(_capacity->value());
804 }
805 Base::insert_first(static_cast<void *>(element));
806 _insert_recover();
807 mday 1.24 }
808
809 template<class L> void AsyncDQueue<L>::insert_first_wait(L *element) throw(IPCException)
810 {
811 if(element == 0)
812 return;
813
814 _insert_prep();
815 Base::insert_first(static_cast<void *>(element));
816 _insert_recover();
817 }
818
819 template<class L> void AsyncDQueue<L>::insert_last(L *element) throw(IPCException)
820 {
821 if(element == 0)
822 return;
823 lock(pegasus_thread_self());
824 if(true == is_full())
825 {
826 unlock();
827 throw ListFull(_capacity->value());
828 mday 1.24 }
829 Base::insert_last(static_cast<void *>(element));
830 _insert_recover();
831 }
832
833 template<class L> void AsyncDQueue<L>::insert_last_wait(L *element) throw(IPCException)
834 {
835 if(element == 0)
836 return;
837 _insert_prep();
838 Base::insert_last(element);
839 _insert_recover();
840 }
841
842 template<class L> void AsyncDQueue<L>::empty_list(void) throw(IPCException)
843 {
844 lock(pegasus_thread_self());
845 Base::empty_list();
846 (*_actual_count) = 0;
847 _slot->unlocked_signal(pegasus_thread_self());
848 unlock();
849 mday 1.24 }
850
851 template<class L> L *AsyncDQueue<L>::remove_first(void) throw(IPCException)
852 {
|
853 mday 1.29
|
854 mday 1.24 lock(pegasus_thread_self());
855 L *ret = static_cast<L *>(Base::remove_first());
856 if(ret != 0)
|
857 mday 1.29 {
858 _slot->unlocked_signal(pegasus_thread_self());
|
859 mday 1.24 (*_actual_count)--;
|
860 mday 1.29 }
|
861 mday 1.24 unlock();
862 return ret;
863 }
864
865 template<class L> L *AsyncDQueue<L>::remove_first_wait(void) throw(IPCException)
866 {
867 _unlink_prep();
868 L *ret = static_cast<L *>(Base::remove_first());
869 _unlink_recover();
870 return ret;
871 }
872
873 template<class L> L *AsyncDQueue<L>::remove_last(void) throw(IPCException)
874 {
875 lock(pegasus_thread_self());
|
876 mday 1.29
|
877 mday 1.24 L *ret = static_cast<L *>(Base::remove_last());
878 if(ret != 0)
|
879 mday 1.29 {
|
880 mday 1.24 (*_actual_count)--;
|
881 mday 1.29 _slot->unlocked_signal(pegasus_thread_self());
882 }
|
883 mday 1.24 unlock();
884 return ret;
885 }
886
887 template<class L> L *AsyncDQueue<L>::remove_last_wait(void) throw(IPCException)
888 {
889 _unlink_prep();
890 L *ret = static_cast<L *>(Base::remove_last());
891 _unlink_recover();
892 return ret;
893 }
894
895 template<class L> L *AsyncDQueue<L>::remove(const void *key) throw(IPCException)
896 {
897 if(key == 0)
898 return 0;
899 lock(pegasus_thread_self());
|
900 mday 1.29
|
901 mday 1.24 L *ret = _remove_no_lock(key);
902 if(ret != 0)
903 {
904 (*_actual_count)--;
905 _slot->unlocked_signal(pegasus_thread_self());
906 }
907 unlock();
908 return ret;
909 }
910
911 template<class L>L *AsyncDQueue<L>::remove(const L *key) throw(IPCException)
912 {
913 if(key == 0)
914 return 0;
915 lock(pegasus_thread_self());
|
916 mday 1.29
|
917 mday 1.24 L *ret = _remove_no_lock(key);
918 if(ret != 0)
919 {
920 (*_actual_count)--;
921 _slot->unlocked_signal(pegasus_thread_self());
922 }
923 unlock();
924 return ret;
925 }
926
927 template<class L> L *AsyncDQueue<L>::remove_no_lock(const void *key) throw(IPCException)
928 {
|
929 mday 1.29 if(_disallow->value() > 0)
930 {
931 unlock();
932 throw ListClosed();
933 }
934
|
935 mday 1.24 if(key == 0)
936 return 0;
937
938 L *ret = 0;
939
940 if(Base::count() > 0 )
941 {
942 ret = _remove_no_lock(key);
943 if(ret != 0)
944 {
945 (*_actual_count)--;
946 _slot->unlocked_signal(pegasus_thread_self());
947 }
948 }
949 return ret;
950 }
951
952
953 template<class L> L *AsyncDQueue<L>::remove_no_lock(const L *key) throw(IPCException)
954 {
|
955 mday 1.29 if(_disallow->value() > 0)
956 {
957 unlock();
958 throw ListClosed();
959 }
960
|
961 mday 1.24 if(key == 0)
962 return 0;
963
964 L *ret = 0;
965
966 if(Base::count() > 0 )
967 {
968 ret = _remove_no_lock(key);
969 if(ret != 0)
970 {
971 (*_actual_count)--;
972 _slot->unlocked_signal(pegasus_thread_self());
973 }
974 }
975 return ret;
976 }
977
978 template<class L> L *AsyncDQueue<L>::remove_wait(const void *key) throw(IPCException)
979 {
|
980 mday 1.29
|
981 mday 1.24 if(key == 0)
982 return 0;
983
984 lock(pegasus_thread_self());
985
986 L *ret = _remove_no_lock(key);
987 while( ret == 0 )
988 {
989 if(_disallow->value() > 0)
990 {
991 unlock();
992 throw ListClosed();
993 }
994 _node->unlocked_wait(pegasus_thread_self());
995 if(_disallow->value() > 0)
996 {
997 unlock();
998 throw ListClosed();
999 }
1000 ret = _remove_no_lock(key);
1001 }
1002 mday 1.24 if(ret != 0)
1003 {
1004 (*_actual_count)--;
1005 _slot->unlocked_signal(pegasus_thread_self());
1006 }
1007 unlock();
1008 return ret;
1009 }
1010
1011 template<class L> L *AsyncDQueue<L>::next(const L *ref) throw(IPCException)
1012 {
1013 if( pegasus_thread_self() != _cond->get_owner())
1014 throw Permission(pegasus_thread_self());
1015
1016 return static_cast<L *>(Base::next( reinterpret_cast<const void *>(ref)));
1017 }
1018
1019 template<class L> L *AsyncDQueue<L>::prev(const L *ref) throw(IPCException)
1020 {
1021 if( pegasus_thread_self() != _cond->get_owner())
1022 throw Permission(pegasus_thread_self());
1023 mday 1.24
1024 return static_cast<L *>(Base::prev( reinterpret_cast<const void *>(ref)));
1025 }
1026
1027 template<class L> L *AsyncDQueue<L>::reference(const void *key) throw(IPCException)
1028 {
|
1029 mday 1.29 if(_disallow->value() > 0)
1030 {
1031 unlock();
1032 throw ListClosed();
1033 }
1034
|
1035 mday 1.24 if( key == 0 )
1036 return 0;
1037
1038 if( pegasus_thread_self() != _cond->get_owner())
1039 throw Permission(pegasus_thread_self());
1040
1041 if(Base::count() > 0 )
1042 {
1043 L *ret = static_cast<L *>(Base::next(0));
1044 while(ret != 0)
1045 {
1046 if(ret->operator==(key))
1047 return ret;
1048 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
1049 }
1050 }
1051 return(0);
1052 }
1053
1054 template<class L> L *AsyncDQueue<L>::reference(const L *key) throw(IPCException)
1055 {
|
1056 mday 1.29 if(_disallow->value() > 0)
1057 {
1058 unlock();
1059 throw ListClosed();
1060 }
1061
|
1062 mday 1.24 if(key == 0)
1063 return 0;
1064
1065 if( pegasus_thread_self() != _cond->get_owner())
1066 throw Permission(pegasus_thread_self());
1067
1068 if(Base::count() > 0 )
1069 {
1070 L *ret = static_cast<L *>(Base::next(0));
1071 while(ret != 0)
1072 {
|
1073 kumpf 1.32 if(ret->operator==(*key))
|
1074 mday 1.24 return ret;
1075 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
1076 }
1077 }
1078 return(0);
1079 }
1080
1081 template<class L> Uint32 AsyncDQueue<L>::count(void) { return _actual_count->value() ; }
|
1082 mday 1.37 template<class L> Uint32 AsyncDQueue<L>::size(void) { return _actual_count->value() ; }
|
1083 mday 1.24
|
1084 mike 1.2
1085 PEGASUS_NAMESPACE_END
1086
1087 #endif /* Pegasus_DQueue_h */
1088
|