(file) Return to AsyncQueue.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mike  1.2 //%2006////////////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4           // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5           // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6           // IBM Corp.; EMC Corporation, The Open Group.
  7           // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8           // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9           // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10           // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11           // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12           // EMC Corporation; Symantec Corporation; The Open Group.
 13           //
 14           // Permission is hereby granted, free of charge, to any person obtaining a copy
 15           // of this software and associated documentation files (the "Software"), to
 16           // deal in the Software without restriction, including without limitation the
 17           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18           // sell copies of the Software, and to permit persons to whom the Software is
 19           // furnished to do so, subject to the following conditions:
 20           // 
 21           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29           //
 30           //==============================================================================
 31           //
 32           //%/////////////////////////////////////////////////////////////////////////////
 33           
 34           #ifndef Pegasus_AsyncQueue_h
 35           #define Pegasus_AsyncQueue_h
 36           
 37           #include <Pegasus/Common/Linkage.h>
 38           #include <Pegasus/Common/List.h>
 39 mike  1.3 #include <Pegasus/Common/Condition.h>
 40 mike  1.2 
 41           PEGASUS_NAMESPACE_BEGIN
 42           
 43           /** AsyncQueue implementation (formerly AsyncDQueue).
 44           */
 45 kumpf 1.5 template<class ElemType>
 46 mike  1.2 class AsyncQueue
 47           {
 48           public:
 49           
 50 mike  1.3     /** Constructor (zero means unlimited capacity).
 51 mike  1.2     */
 52 mike  1.3     AsyncQueue(size_t capacity = 0);
 53 mike  1.2 
 54               /** Destructor.
 55               */
 56               virtual ~AsyncQueue();
 57           
 58 mike  1.3     /** Close the queue so that subsequent enqueue() and dequeue() requests
 59                   result in ListClosed() exceptions.
 60 mike  1.2     */
 61 mike  1.3     void close();
 62 mike  1.2 
 63               /** Enqueue an element at the back of queue.
 64               */
 65               void enqueue(ElemType *element);
 66           
 67 mike  1.3     /** Enqueue an element at the back of queue (wait if the queue is full).
 68 mike  1.2     */
 69               void enqueue_wait(ElemType *element);
 70           
 71               /** Dequeue an element from the front of the queue. Return null immediately
 72 kumpf 1.5         if queue is empty.
 73 mike  1.2     */
 74               ElemType *dequeue();
 75           
 76 mike  1.3     /** Dequeue an element from the front of the queue (wait if the queue is
 77                   empty).
 78 mike  1.2     */
 79               ElemType *dequeue_wait();
 80           
 81               /** Discard all the elements on the list. The size becomes zero afterwards.
 82               */
 83               void clear();
 84           
 85               /** Return number of element in queue.
 86               */
 87 mike  1.3     Uint32 count() const { return _size.get(); }
 88 mike  1.2 
 89 mike  1.3     /** Get capacity.
 90 mike  1.2     */
 91               Uint32 capacity() const { return _capacity.get(); }
 92           
 93 mike  1.3     /** Return number of element in queue.
 94 mike  1.2     */
 95 mike  1.3     Uint32 size() const { return _size.get(); }
 96 mike  1.2 
 97               /** Return true is queue is empty (has zero elements).
 98               */
 99 mike  1.3     Boolean is_empty() const { return _size.get() == 0; }
100 mike  1.2 
101 mike  1.3     /** Return true if the queue is full.
102 mike  1.2     */
103 mike  1.3     Boolean is_full() const { return _size.get() == _capacity.get(); }
104 mike  1.2 
105 mike  1.3     /** Return true if the queue has been closed (in which case no new
106                   elements may be enqueued).
107 mike  1.2     */
108 mike  1.3     Boolean is_closed() const { return _closed.get(); }
109 mike  1.2 
110           private:
111           
112 mike  1.3     Mutex _mutex;
113               Condition _not_empty;
114               Condition _not_full;
115 mike  1.2     AtomicInt _capacity;
116 mike  1.3     AtomicInt _size;
117               AtomicInt _closed;
118 mike  1.2     typedef List<ElemType,NullLock> Rep;
119               Rep _rep;
120           };
121           
122 kumpf 1.5 template<class ElemType>
123           AsyncQueue<ElemType>::AsyncQueue(size_t capacity) :
124 mike  1.4     _mutex(Mutex::NON_RECURSIVE), _capacity(capacity)
125 mike  1.2 {
126 mike  1.3     if (capacity == 0)
127                   _capacity.set(0x7FFFFFFF);
128 mike  1.2 }
129           
130 kumpf 1.5 template<class ElemType>
131 mike  1.2 AsyncQueue<ElemType>::~AsyncQueue()
132           {
133           }
134           
135 kumpf 1.5 template<class ElemType>
136 mike  1.3 void AsyncQueue<ElemType>::close()
137 mike  1.2 {
138 mike  1.3     AutoMutex auto_mutex(_mutex);
139 mike  1.2 
140 mike  1.3     if (!is_closed())
141 mike  1.2     {
142 mike  1.3         _closed++;
143                   _not_full.signal();
144                   _not_empty.signal();
145 mike  1.2     }
146           }
147           
148 kumpf 1.5 template<class ElemType>
149 mike  1.3 void AsyncQueue<ElemType>::enqueue(ElemType *element)
150 mike  1.2 {
151 mike  1.3     if (element)
152               {
153                   AutoMutex auto_mutex(_mutex);
154           
155                   if (is_closed())
156                       throw ListClosed();
157           
158                   if (is_full())
159                       throw ListFull(_capacity.get());
160 mike  1.2 
161 mike  1.3         _rep.insert_back(element);
162                   _size++;
163                   _not_empty.signal();
164 mike  1.2     }
165           }
166           
167 kumpf 1.5 template<class ElemType>
168 mike  1.3 void AsyncQueue<ElemType>::enqueue_wait(ElemType *element)
169 mike  1.2 {
170 mike  1.3     if (element)
171               {
172                   AutoMutex auto_mutex(_mutex);
173 mike  1.2 
174 mike  1.3         while (is_full())
175                   {
176                       if (is_closed())
177                           throw ListClosed();
178 mike  1.2 
179 mike  1.3             _not_full.wait(_mutex);
180                   }
181 mike  1.2 
182 mike  1.3         if (is_closed())
183                       throw ListClosed();
184 mike  1.2 
185 mike  1.3         _rep.insert_back(element);
186                   _size++;
187                   _not_empty.signal();
188 mike  1.2     }
189           }
190           
191 kumpf 1.5 template<class ElemType>
192 mike  1.3 void AsyncQueue<ElemType>::clear()
193 mike  1.2 {
194 mike  1.3     AutoMutex auto_mutex(_mutex);
195               _rep.clear();
196               _size = 0;
197               _not_full.signal();
198 mike  1.2 }
199           
200 kumpf 1.5 template<class ElemType>
201 mike  1.3 ElemType* AsyncQueue<ElemType>::dequeue()
202 mike  1.2 {
203 mike  1.3     AutoMutex auto_mutex(_mutex);
204           
205               if (is_closed())
206 mike  1.2         throw ListClosed();
207           
208 mike  1.3     ElemType* p = _rep.remove_front();
209 mike  1.2 
210 mike  1.3     if (p)
211 mike  1.2     {
212 mike  1.3         _size--;
213                   _not_full.signal();
214 mike  1.2     }
215 mike  1.3 
216               return p;
217 mike  1.2 }
218           
219 kumpf 1.5 template<class ElemType>
220 mike  1.3 ElemType* AsyncQueue<ElemType>::dequeue_wait()
221 mike  1.2 {
222 mike  1.3     AutoMutex auto_mutex(_mutex);
223           
224               while (is_empty())
225 mike  1.2     {
226 mike  1.3         if (is_closed())
227                       throw ListClosed();
228 mike  1.2 
229 mike  1.3         _not_empty.wait(_mutex);
230 mike  1.2     }
231           
232 mike  1.3     if (is_closed())
233                   throw ListClosed();
234 mike  1.2 
235 mike  1.3     ElemType* p = _rep.remove_front();
236               PEGASUS_DEBUG_ASSERT(p != 0);
237               _size--;
238               _not_full.signal();
239 mike  1.2 
240 mike  1.3     return p;
241 mike  1.2 }
242           
243           PEGASUS_NAMESPACE_END
244           
245           #endif /* Pegasus_AsyncQueue_h */

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2