1 mike 1.2 //%2006////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
7 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
9 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
11 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
12 // EMC Corporation; Symantec Corporation; The Open Group.
13 //
14 // Permission is hereby granted, free of charge, to any person obtaining a copy
15 // of this software and associated documentation files (the "Software"), to
16 // deal in the Software without restriction, including without limitation the
17 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
18 // sell copies of the Software, and to permit persons to whom the Software is
19 // furnished to do so, subject to the following conditions:
20 //
21 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
22 mike 1.2 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
23 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
24 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
25 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
26 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
27 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
28 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
29 //
30 //==============================================================================
31 //
32 //%/////////////////////////////////////////////////////////////////////////////
33
34 #ifndef Pegasus_AsyncQueue_h
35 #define Pegasus_AsyncQueue_h
36
37 #include <Pegasus/Common/Linkage.h>
38 #include <Pegasus/Common/List.h>
|
39 mike 1.3 #include <Pegasus/Common/Condition.h>
|
40 mike 1.2
41 PEGASUS_NAMESPACE_BEGIN
42
43 /** AsyncQueue implementation (formerly AsyncDQueue).
44 */
|
45 kumpf 1.5 template<class ElemType>
|
46 mike 1.2 class AsyncQueue
47 {
48 public:
49
|
50 mike 1.3 /** Constructor (zero means unlimited capacity).
|
51 mike 1.2 */
|
52 mike 1.3 AsyncQueue(size_t capacity = 0);
|
53 mike 1.2
54 /** Destructor.
55 */
56 virtual ~AsyncQueue();
57
|
58 mike 1.3 /** Close the queue so that subsequent enqueue() and dequeue() requests
59 result in ListClosed() exceptions.
|
60 mike 1.2 */
|
61 mike 1.3 void close();
|
62 mike 1.2
63 /** Enqueue an element at the back of queue.
64 */
65 void enqueue(ElemType *element);
66
|
67 mike 1.3 /** Enqueue an element at the back of queue (wait if the queue is full).
|
68 mike 1.2 */
69 void enqueue_wait(ElemType *element);
70
71 /** Dequeue an element from the front of the queue. Return null immediately
|
72 kumpf 1.5 if queue is empty.
|
73 mike 1.2 */
74 ElemType *dequeue();
75
|
76 mike 1.3 /** Dequeue an element from the front of the queue (wait if the queue is
77 empty).
|
78 mike 1.2 */
79 ElemType *dequeue_wait();
80
81 /** Discard all the elements on the list. The size becomes zero afterwards.
82 */
83 void clear();
84
85 /** Return number of element in queue.
86 */
|
87 mike 1.3 Uint32 count() const { return _size.get(); }
|
88 mike 1.2
|
89 mike 1.3 /** Get capacity.
|
90 mike 1.2 */
91 Uint32 capacity() const { return _capacity.get(); }
92
|
93 mike 1.3 /** Return number of element in queue.
|
94 mike 1.2 */
|
95 mike 1.3 Uint32 size() const { return _size.get(); }
|
96 mike 1.2
97 /** Return true is queue is empty (has zero elements).
98 */
|
99 mike 1.3 Boolean is_empty() const { return _size.get() == 0; }
|
100 mike 1.2
|
101 mike 1.3 /** Return true if the queue is full.
|
102 mike 1.2 */
|
103 mike 1.3 Boolean is_full() const { return _size.get() == _capacity.get(); }
|
104 mike 1.2
|
105 mike 1.3 /** Return true if the queue has been closed (in which case no new
106 elements may be enqueued).
|
107 mike 1.2 */
|
108 mike 1.3 Boolean is_closed() const { return _closed.get(); }
|
109 mike 1.2
110 private:
111
|
112 mike 1.3 Mutex _mutex;
113 Condition _not_empty;
114 Condition _not_full;
|
115 mike 1.2 AtomicInt _capacity;
|
116 mike 1.3 AtomicInt _size;
117 AtomicInt _closed;
|
118 mike 1.2 typedef List<ElemType,NullLock> Rep;
119 Rep _rep;
120 };
121
|
122 kumpf 1.5 template<class ElemType>
123 AsyncQueue<ElemType>::AsyncQueue(size_t capacity) :
|
124 mike 1.4 _mutex(Mutex::NON_RECURSIVE), _capacity(capacity)
|
125 mike 1.2 {
|
126 mike 1.3 if (capacity == 0)
127 _capacity.set(0x7FFFFFFF);
|
128 mike 1.2 }
129
|
130 kumpf 1.5 template<class ElemType>
|
131 mike 1.2 AsyncQueue<ElemType>::~AsyncQueue()
132 {
133 }
134
|
135 kumpf 1.5 template<class ElemType>
|
136 mike 1.3 void AsyncQueue<ElemType>::close()
|
137 mike 1.2 {
|
138 mike 1.3 AutoMutex auto_mutex(_mutex);
|
139 mike 1.2
|
140 mike 1.3 if (!is_closed())
|
141 mike 1.2 {
|
142 mike 1.3 _closed++;
143 _not_full.signal();
144 _not_empty.signal();
|
145 mike 1.2 }
146 }
147
|
148 kumpf 1.5 template<class ElemType>
|
149 mike 1.3 void AsyncQueue<ElemType>::enqueue(ElemType *element)
|
150 mike 1.2 {
|
151 mike 1.3 if (element)
152 {
153 AutoMutex auto_mutex(_mutex);
154
155 if (is_closed())
156 throw ListClosed();
157
158 if (is_full())
159 throw ListFull(_capacity.get());
|
160 mike 1.2
|
161 mike 1.3 _rep.insert_back(element);
162 _size++;
163 _not_empty.signal();
|
164 mike 1.2 }
165 }
166
|
167 kumpf 1.5 template<class ElemType>
|
168 mike 1.3 void AsyncQueue<ElemType>::enqueue_wait(ElemType *element)
|
169 mike 1.2 {
|
170 mike 1.3 if (element)
171 {
172 AutoMutex auto_mutex(_mutex);
|
173 mike 1.2
|
174 mike 1.3 while (is_full())
175 {
176 if (is_closed())
177 throw ListClosed();
|
178 mike 1.2
|
179 mike 1.3 _not_full.wait(_mutex);
180 }
|
181 mike 1.2
|
182 mike 1.3 if (is_closed())
183 throw ListClosed();
|
184 mike 1.2
|
185 mike 1.3 _rep.insert_back(element);
186 _size++;
187 _not_empty.signal();
|
188 mike 1.2 }
189 }
190
|
191 kumpf 1.5 template<class ElemType>
|
192 mike 1.3 void AsyncQueue<ElemType>::clear()
|
193 mike 1.2 {
|
194 mike 1.3 AutoMutex auto_mutex(_mutex);
195 _rep.clear();
196 _size = 0;
197 _not_full.signal();
|
198 mike 1.2 }
199
|
200 kumpf 1.5 template<class ElemType>
|
201 mike 1.3 ElemType* AsyncQueue<ElemType>::dequeue()
|
202 mike 1.2 {
|
203 mike 1.3 AutoMutex auto_mutex(_mutex);
204
205 if (is_closed())
|
206 mike 1.2 throw ListClosed();
207
|
208 mike 1.3 ElemType* p = _rep.remove_front();
|
209 mike 1.2
|
210 mike 1.3 if (p)
|
211 mike 1.2 {
|
212 mike 1.3 _size--;
213 _not_full.signal();
|
214 mike 1.2 }
|
215 mike 1.3
216 return p;
|
217 mike 1.2 }
218
|
219 kumpf 1.5 template<class ElemType>
|
220 mike 1.3 ElemType* AsyncQueue<ElemType>::dequeue_wait()
|
221 mike 1.2 {
|
222 mike 1.3 AutoMutex auto_mutex(_mutex);
223
224 while (is_empty())
|
225 mike 1.2 {
|
226 mike 1.3 if (is_closed())
227 throw ListClosed();
|
228 mike 1.2
|
229 mike 1.3 _not_empty.wait(_mutex);
|
230 mike 1.2 }
231
|
232 mike 1.3 if (is_closed())
233 throw ListClosed();
|
234 mike 1.2
|
235 mike 1.3 ElemType* p = _rep.remove_front();
236 PEGASUS_DEBUG_ASSERT(p != 0);
237 _size--;
238 _not_full.signal();
|
239 mike 1.2
|
240 mike 1.3 return p;
|
241 mike 1.2 }
242
243 PEGASUS_NAMESPACE_END
244
245 #endif /* Pegasus_AsyncQueue_h */
|