version 1.2, 2006/07/11 18:39:28
|
version 1.2.2.1, 2006/07/27 23:11:51
|
|
|
#ifndef Pegasus_AsyncQueue_h | #ifndef Pegasus_AsyncQueue_h |
#define Pegasus_AsyncQueue_h | #define Pegasus_AsyncQueue_h |
| |
#include <Pegasus/Common/IPC.h> |
|
#include <Pegasus/Common/Linkage.h> | #include <Pegasus/Common/Linkage.h> |
#include <Pegasus/Common/List.h> | #include <Pegasus/Common/List.h> |
|
#include <Pegasus/Common/Condition.h> |
| |
PEGASUS_NAMESPACE_BEGIN | PEGASUS_NAMESPACE_BEGIN |
| |
|
|
{ | { |
public: | public: |
| |
/** Constructor. |
/** Constructor (zero means unlimited capacity). |
*/ | */ |
AsyncQueue(Uint32 capacity = 0); |
AsyncQueue(size_t capacity = 0); |
| |
/** Destructor. | /** Destructor. |
*/ | */ |
virtual ~AsyncQueue(); | virtual ~AsyncQueue(); |
| |
/** Shutdownt the queue. |
/** Close the queue so that subsequent enqueue() and dequeue() requests |
|
result in ListClosed() exceptions. |
*/ | */ |
void shutdown_queue(); |
void close(); |
| |
/** Enqueue an element at the back of queue. | /** Enqueue an element at the back of queue. |
*/ | */ |
void enqueue(ElemType *element); | void enqueue(ElemType *element); |
| |
/** Enqueue an element at the back of queue (wait for dequeue by another |
/** Enqueue an element at the back of queue (wait if the queue is full). |
thread). |
|
*/ | */ |
void enqueue_wait(ElemType *element); | void enqueue_wait(ElemType *element); |
| |
|
|
*/ | */ |
ElemType *dequeue(); | ElemType *dequeue(); |
| |
/** Dequeue an element from the front of the queue (if there is no element |
/** Dequeue an element from the front of the queue (wait if the queue is |
on queue, wait until there is). |
empty). |
*/ | */ |
ElemType *dequeue_wait(); | ElemType *dequeue_wait(); |
| |
|
|
| |
/** Return number of element in queue. | /** Return number of element in queue. |
*/ | */ |
Uint32 count() const { return _count.get(); } |
Uint32 count() const { return _size.get(); } |
| |
/** Return number of element in queue. |
/** Get capacity. |
*/ |
|
Uint32 size() const { return _count.get(); } |
|
|
|
/** Return the maximum number of elements permitted on queue at once. |
|
*/ | */ |
Uint32 capacity() const { return _capacity.get(); } | Uint32 capacity() const { return _capacity.get(); } |
| |
/** Return true if queue has reached its capacity. |
/** Return number of element in queue. |
*/ | */ |
Boolean is_full() const { /* never full */ return false; } |
Uint32 size() const { return _size.get(); } |
| |
/** Return true is queue is empty (has zero elements). | /** Return true is queue is empty (has zero elements). |
*/ | */ |
Boolean is_empty() const { return size() == 0; } |
Boolean is_empty() const { return _size.get() == 0; } |
|
|
/** Return true if the queue has been shutdown (in which case no new |
|
elements may be enqueued). |
|
*/ |
|
Boolean is_shutdown() const { return _disallow.get() > 0; } |
|
| |
/** Attempt to lock the queue. |
/** Return true if the queue is full. |
*/ | */ |
void try_lock(PEGASUS_THREAD_TYPE myself); |
Boolean is_full() const { return _size.get() == _capacity.get(); } |
| |
/** Lock the queue. |
/** Return true if the queue has been closed (in which case no new |
*/ |
elements may be enqueued). |
void lock(PEGASUS_THREAD_TYPE myself); |
|
|
|
/** Unlock the queue. |
|
*/ | */ |
void unlock(void) { _cond.unlock(); } |
Boolean is_closed() const { return _closed.get(); } |
| |
private: | private: |
| |
/** |
Mutex _mutex; |
@exception IPCException Indicates an IPC error occurred. |
Condition _not_empty; |
*/ |
Condition _not_full; |
void _insert_prep(); |
|
|
|
/** @exception IPCException Indicates an IPC error occurred. |
|
*/ |
|
void _insert_recover(); |
|
|
|
/** @exception IPCException Indicates an IPC error occurred. |
|
*/ |
|
void _unlink_prep(); |
|
|
|
/** @exception IPCException Indicates an IPC error occurred. |
|
*/ |
|
void _unlink_recover(); |
|
|
|
/** @exception IPCException Indicates an IPC error occurred. |
|
*/ |
|
ElemType *_remove_no_lock(const void *key); |
|
|
|
/** @exception IPCException Indicates an IPC error occurred. |
|
*/ |
|
ElemType *_remove_no_lock(const ElemType *key); |
|
|
|
static bool _equal_key(const ElemType* elem, const void* key) |
|
{ |
|
return elem->operator==(key); |
|
} |
|
|
|
static bool _equal_object(const ElemType* elem, const void* object) |
|
{ |
|
return elem->operator==(*((ElemType*)object)); |
|
} |
|
|
|
Mutex _cond; |
|
Condition _slot; |
|
Condition _node; |
|
AtomicInt _count; |
|
AtomicInt _disallow; |
|
AtomicInt _capacity; | AtomicInt _capacity; |
|
AtomicInt _size; |
|
AtomicInt _closed; |
typedef List<ElemType,NullLock> Rep; | typedef List<ElemType,NullLock> Rep; |
Rep _rep; | Rep _rep; |
}; | }; |
| |
template<class ElemType> | template<class ElemType> |
AsyncQueue<ElemType>::AsyncQueue(Uint32 capacity) : |
AsyncQueue<ElemType>::AsyncQueue(size_t capacity) : _capacity(capacity) |
_slot(_cond), _node(_cond), _capacity(0) |
|
{ | { |
|
if (capacity == 0) |
|
_capacity.set(0x7FFFFFFF); |
} | } |
| |
template<class ElemType> | template<class ElemType> |
AsyncQueue<ElemType>::~AsyncQueue() | AsyncQueue<ElemType>::~AsyncQueue() |
{ | { |
|
|
} | } |
| |
template<class ElemType> | template<class ElemType> |
void AsyncQueue<ElemType>::_insert_prep() |
void AsyncQueue<ElemType>::close() |
{ | { |
if(_disallow.get() > 0) |
AutoMutex auto_mutex(_mutex); |
{ |
|
unlock(); |
|
throw ListClosed(); |
|
} |
|
| |
_slot.lock_object(pegasus_thread_self()); |
if (!is_closed()) |
while(true == is_full()) |
|
{ |
|
_slot.unlocked_wait(pegasus_thread_self()); |
|
if(_disallow.get() > 0) |
|
{ | { |
unlock(); |
_closed++; |
throw ListClosed(); |
_not_full.signal(); |
|
_not_empty.signal(); |
} | } |
} | } |
} |
|
|
|
template<class ElemType> |
|
void AsyncQueue<ElemType>::_insert_recover() |
|
{ |
|
_node.unlocked_signal(pegasus_thread_self()); |
|
_count++; |
|
unlock(); |
|
} |
|
| |
template<class ElemType> | template<class ElemType> |
void AsyncQueue<ElemType>::_unlink_prep() |
void AsyncQueue<ElemType>::enqueue(ElemType *element) |
{ |
|
if(_disallow.get() > 0) |
|
{ |
|
unlock(); |
|
throw ListClosed(); |
|
} |
|
_node.lock_object(pegasus_thread_self()); |
|
while(true == is_empty()) |
|
{ | { |
_node.unlocked_wait(pegasus_thread_self()); |
if (element) |
if(_disallow.get() > 0) |
|
{ | { |
unlock(); |
AutoMutex auto_mutex(_mutex); |
throw ListClosed(); |
|
} |
|
} |
|
} |
|
|
|
template<class ElemType> |
|
void AsyncQueue<ElemType>::_unlink_recover() |
|
{ |
|
_slot.unlocked_signal(pegasus_thread_self()); |
|
_count--; |
|
unlock(); |
|
} |
|
| |
template<class ElemType> |
if (is_closed()) |
ElemType* AsyncQueue<ElemType>::_remove_no_lock(const void *key) |
|
{ |
|
if(_disallow.get() > 0) |
|
{ |
|
unlock(); |
|
throw ListClosed(); | throw ListClosed(); |
} |
|
|
|
if( pegasus_thread_self() != _cond.get_owner()) |
|
throw Permission(pegasus_thread_self()); |
|
| |
return _rep.remove(_equal_key, key); |
if (is_full()) |
} |
throw ListFull(_capacity.get()); |
| |
template<class ElemType> |
_rep.insert_back(element); |
ElemType *AsyncQueue<ElemType>::_remove_no_lock(const ElemType *key) |
_size++; |
{ |
_not_empty.signal(); |
if(_disallow.get() > 0) |
|
{ |
|
unlock(); |
|
throw ListClosed(); |
|
} | } |
if( pegasus_thread_self() != _cond.get_owner()) |
|
throw Permission(pegasus_thread_self()); |
|
|
|
return _rep.remove(_equal_object, key); |
|
} | } |
| |
template<class ElemType> | template<class ElemType> |
void AsyncQueue<ElemType>::shutdown_queue() |
void AsyncQueue<ElemType>::enqueue_wait(ElemType *element) |
{ | { |
try |
if (element) |
{ | { |
lock(pegasus_thread_self()); |
AutoMutex auto_mutex(_mutex); |
_disallow++; |
|
_node.disallow(); |
|
_node.unlocked_signal(pegasus_thread_self()); |
|
_node.unlocked_signal(pegasus_thread_self()); |
|
_slot.disallow(); |
|
_slot.unlocked_signal(pegasus_thread_self()); |
|
_slot.unlocked_signal(pegasus_thread_self()); |
|
unlock(); |
|
} |
|
catch(const ListClosed &) |
|
{ |
|
_disallow++; |
|
} |
|
} |
|
| |
template<class ElemType> |
while (is_full()) |
void AsyncQueue<ElemType>::try_lock(PEGASUS_THREAD_TYPE myself) |
|
{ |
|
if(_disallow.get() > 0) |
|
{ | { |
|
if (is_closed()) |
throw ListClosed(); | throw ListClosed(); |
} |
|
| |
_cond.try_lock(myself); |
_not_full.wait(_mutex); |
} | } |
| |
template<class ElemType> |
if (is_closed()) |
void AsyncQueue<ElemType>::lock(PEGASUS_THREAD_TYPE myself) |
|
{ |
|
if(_disallow.get() > 0) |
|
{ |
|
throw ListClosed(); | throw ListClosed(); |
} |
|
_cond.lock(myself); |
|
} |
|
| |
template<class ElemType> |
|
void AsyncQueue<ElemType>::enqueue(ElemType *element) |
|
{ |
|
if(element != 0) |
|
{ |
|
lock(pegasus_thread_self()); |
|
if(true == is_full()) |
|
{ |
|
unlock(); |
|
throw ListFull(_capacity.get()); |
|
} |
|
_rep.insert_back(element); |
|
_insert_recover(); |
|
} |
|
} |
|
|
|
template<class ElemType> |
|
void AsyncQueue<ElemType>::enqueue_wait(ElemType *element) |
|
{ |
|
if(element != 0) |
|
{ |
|
_insert_prep(); |
|
_rep.insert_back(element); | _rep.insert_back(element); |
_insert_recover(); |
_size++; |
|
_not_empty.signal(); |
} | } |
} | } |
| |
template<class ElemType> | template<class ElemType> |
void AsyncQueue<ElemType>::clear() | void AsyncQueue<ElemType>::clear() |
{ | { |
lock(pegasus_thread_self()); |
AutoMutex auto_mutex(_mutex); |
_rep.clear(); | _rep.clear(); |
_count = 0; |
_size = 0; |
_slot.unlocked_signal(pegasus_thread_self()); |
_not_full.signal(); |
unlock(); |
|
} | } |
| |
template<class ElemType> | template<class ElemType> |
ElemType *AsyncQueue<ElemType>::dequeue() | ElemType *AsyncQueue<ElemType>::dequeue() |
{ | { |
|
AutoMutex auto_mutex(_mutex); |
| |
lock(pegasus_thread_self()); |
if (is_closed()) |
ElemType *ret = _rep.remove_front(); |
throw ListClosed(); |
if(ret != 0) |
|
|
ElemType* p = _rep.remove_front(); |
|
|
|
if (p) |
{ | { |
_slot.unlocked_signal(pegasus_thread_self()); |
_size--; |
_count--; |
_not_full.signal(); |
} | } |
unlock(); |
|
return ret; |
return p; |
} | } |
| |
template<class ElemType> | template<class ElemType> |
ElemType *AsyncQueue<ElemType>::dequeue_wait() | ElemType *AsyncQueue<ElemType>::dequeue_wait() |
{ | { |
_unlink_prep(); |
AutoMutex auto_mutex(_mutex); |
ElemType *ret = _rep.remove_front(); |
|
_unlink_recover(); |
while (is_empty()) |
return ret; |
{ |
|
if (is_closed()) |
|
throw ListClosed(); |
|
|
|
_not_empty.wait(_mutex); |
|
} |
|
|
|
if (is_closed()) |
|
throw ListClosed(); |
|
|
|
ElemType* p = _rep.remove_front(); |
|
PEGASUS_DEBUG_ASSERT(p != 0); |
|
_size--; |
|
_not_full.signal(); |
|
|
|
return p; |
} | } |
| |
PEGASUS_NAMESPACE_END | PEGASUS_NAMESPACE_END |