(file) Return to AsyncQueue.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/AsyncQueue.h between version 1.2 and 1.2.2.1

version 1.2, 2006/07/11 18:39:28 version 1.2.2.1, 2006/07/27 23:11:51
Line 46 
Line 46 
 #ifndef Pegasus_AsyncQueue_h #ifndef Pegasus_AsyncQueue_h
 #define Pegasus_AsyncQueue_h #define Pegasus_AsyncQueue_h
  
 #include <Pegasus/Common/IPC.h>  
 #include <Pegasus/Common/Linkage.h> #include <Pegasus/Common/Linkage.h>
 #include <Pegasus/Common/List.h> #include <Pegasus/Common/List.h>
   #include <Pegasus/Common/Condition.h>
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
  
Line 59 
Line 59 
 { {
 public: public:
  
     /** Constructor.      /** Constructor (zero means unlimited capacity).
     */     */
     AsyncQueue(Uint32 capacity = 0);      AsyncQueue(size_t capacity = 0);
  
     /** Destructor.     /** Destructor.
     */     */
     virtual ~AsyncQueue();     virtual ~AsyncQueue();
  
     /** Shutdownt the queue.      /** Close the queue so that subsequent enqueue() and dequeue() requests
           result in ListClosed() exceptions.
     */     */
     void shutdown_queue();      void close();
  
     /** Enqueue an element at the back of queue.     /** Enqueue an element at the back of queue.
     */     */
     void enqueue(ElemType *element);     void enqueue(ElemType *element);
  
     /** Enqueue an element at the back of queue (wait for dequeue by another      /** Enqueue an element at the back of queue (wait if the queue is full).
         thread).  
     */     */
     void enqueue_wait(ElemType *element);     void enqueue_wait(ElemType *element);
  
Line 85 
Line 85 
     */     */
     ElemType *dequeue();     ElemType *dequeue();
  
     /** Dequeue an element from the front of the queue (if there is no element      /** Dequeue an element from the front of the queue (wait if the queue is
         on queue, wait until there is).          empty).
     */     */
     ElemType *dequeue_wait();     ElemType *dequeue_wait();
  
Line 96 
Line 96 
  
     /** Return number of element in queue.     /** Return number of element in queue.
     */     */
     Uint32 count() const { return _count.get(); }      Uint32 count() const { return _size.get(); }
  
     /** Return number of element in queue.      /** Get capacity.
     */  
     Uint32 size() const { return _count.get(); }  
   
     /** Return the maximum number of elements permitted on queue at once.  
     */     */
     Uint32 capacity() const { return _capacity.get(); }     Uint32 capacity() const { return _capacity.get(); }
  
     /** Return true if queue has reached its capacity.      /** Return number of element in queue.
     */     */
     Boolean is_full() const { /* never full */ return false; }      Uint32 size() const { return _size.get(); }
  
     /** Return true is queue is empty (has zero elements).     /** Return true is queue is empty (has zero elements).
     */     */
     Boolean is_empty() const { return size() == 0; }      Boolean is_empty() const { return _size.get() == 0; }
   
     /** Return true if the queue has been shutdown (in which case no new  
         elements may be enqueued).  
     */  
     Boolean is_shutdown() const { return _disallow.get() > 0; }  
  
     /** Attempt to lock the queue.      /** Return true if the queue is full.
     */     */
     void try_lock(PEGASUS_THREAD_TYPE myself);      Boolean is_full() const { return _size.get() == _capacity.get(); }
  
     /** Lock the queue.      /** Return true if the queue has been closed (in which case no new
     */          elements may be enqueued).
     void lock(PEGASUS_THREAD_TYPE myself);  
   
     /** Unlock the queue.  
     */     */
     void unlock(void) { _cond.unlock(); }      Boolean is_closed() const { return _closed.get(); }
  
 private: private:
  
     /**      Mutex _mutex;
         @exception  IPCException    Indicates an IPC error occurred.      Condition _not_empty;
     */      Condition _not_full;
     void _insert_prep();  
   
     /** @exception  IPCException    Indicates an IPC error occurred.  
     */  
     void _insert_recover();  
   
     /** @exception  IPCException    Indicates an IPC error occurred.  
     */  
     void _unlink_prep();  
   
     /** @exception  IPCException    Indicates an IPC error occurred.  
     */  
     void _unlink_recover();  
   
     /** @exception  IPCException    Indicates an IPC error occurred.  
     */  
     ElemType *_remove_no_lock(const void *key);  
   
     /** @exception  IPCException    Indicates an IPC error occurred.  
     */  
     ElemType *_remove_no_lock(const ElemType *key);  
   
     static bool _equal_key(const ElemType* elem, const void* key)  
     {  
         return elem->operator==(key);  
     }  
   
     static bool _equal_object(const ElemType* elem, const void* object)  
     {  
         return elem->operator==(*((ElemType*)object));  
     }  
   
     Mutex _cond;  
     Condition _slot;  
     Condition _node;  
     AtomicInt _count;  
     AtomicInt _disallow;  
     AtomicInt _capacity;     AtomicInt _capacity;
       AtomicInt _size;
       AtomicInt _closed;
     typedef List<ElemType,NullLock> Rep;     typedef List<ElemType,NullLock> Rep;
     Rep _rep;     Rep _rep;
 }; };
  
 template<class ElemType> template<class ElemType>
 AsyncQueue<ElemType>::AsyncQueue(Uint32 capacity) :  AsyncQueue<ElemType>::AsyncQueue(size_t capacity) : _capacity(capacity)
     _slot(_cond), _node(_cond), _capacity(0)  
 { {
       if (capacity == 0)
           _capacity.set(0x7FFFFFFF);
 } }
  
 template<class ElemType> template<class ElemType>
 AsyncQueue<ElemType>::~AsyncQueue() AsyncQueue<ElemType>::~AsyncQueue()
 { {
   
 } }
  
 template<class ElemType> template<class ElemType>
 void AsyncQueue<ElemType>::_insert_prep()  void AsyncQueue<ElemType>::close()
 { {
     if(_disallow.get() > 0)      AutoMutex auto_mutex(_mutex);
     {  
         unlock();  
         throw ListClosed();  
     }  
  
     _slot.lock_object(pegasus_thread_self());      if (!is_closed())
     while(true == is_full())  
     {  
         _slot.unlocked_wait(pegasus_thread_self());  
         if(_disallow.get() > 0)  
         {         {
             unlock();          _closed++;
             throw ListClosed();          _not_full.signal();
           _not_empty.signal();
         }         }
     }     }
 }  
   
 template<class ElemType>  
 void AsyncQueue<ElemType>::_insert_recover()  
 {  
     _node.unlocked_signal(pegasus_thread_self());  
     _count++;  
     unlock();  
 }  
  
 template<class ElemType> template<class ElemType>
 void AsyncQueue<ElemType>::_unlink_prep()  void AsyncQueue<ElemType>::enqueue(ElemType *element)
 {  
     if(_disallow.get() > 0)  
     {  
         unlock();  
         throw ListClosed();  
     }  
     _node.lock_object(pegasus_thread_self());  
     while(true == is_empty())  
     {     {
         _node.unlocked_wait(pegasus_thread_self());      if (element)
        if(_disallow.get() > 0)  
        {        {
            unlock();          AutoMutex auto_mutex(_mutex);
            throw ListClosed();  
        }  
     }  
 }  
   
 template<class ElemType>  
 void AsyncQueue<ElemType>::_unlink_recover()  
 {  
     _slot.unlocked_signal(pegasus_thread_self());  
     _count--;  
     unlock();  
 }  
  
 template<class ElemType>          if (is_closed())
 ElemType* AsyncQueue<ElemType>::_remove_no_lock(const void *key)  
 {  
     if(_disallow.get() > 0)  
     {  
         unlock();  
         throw ListClosed();         throw ListClosed();
     }  
   
     if( pegasus_thread_self() != _cond.get_owner())  
         throw Permission(pegasus_thread_self());  
  
     return _rep.remove(_equal_key, key);          if (is_full())
 }              throw ListFull(_capacity.get());
  
 template<class ElemType>          _rep.insert_back(element);
 ElemType *AsyncQueue<ElemType>::_remove_no_lock(const ElemType *key)          _size++;
 {          _not_empty.signal();
     if(_disallow.get() > 0)  
     {  
         unlock();  
         throw ListClosed();  
     }     }
     if( pegasus_thread_self() != _cond.get_owner())  
         throw Permission(pegasus_thread_self());  
   
     return _rep.remove(_equal_object, key);  
 } }
  
 template<class ElemType> template<class ElemType>
 void AsyncQueue<ElemType>::shutdown_queue()  void AsyncQueue<ElemType>::enqueue_wait(ElemType *element)
 { {
     try      if (element)
     {     {
         lock(pegasus_thread_self());          AutoMutex auto_mutex(_mutex);
         _disallow++;  
         _node.disallow();  
         _node.unlocked_signal(pegasus_thread_self());  
         _node.unlocked_signal(pegasus_thread_self());  
         _slot.disallow();  
         _slot.unlocked_signal(pegasus_thread_self());  
         _slot.unlocked_signal(pegasus_thread_self());  
         unlock();  
     }  
     catch(const ListClosed &)  
     {  
         _disallow++;  
     }  
 }  
  
 template<class ElemType>          while (is_full())
 void AsyncQueue<ElemType>::try_lock(PEGASUS_THREAD_TYPE myself)  
 {  
     if(_disallow.get() > 0)  
     {     {
               if (is_closed())
         throw ListClosed();         throw ListClosed();
     }  
  
     _cond.try_lock(myself);              _not_full.wait(_mutex);
 } }
  
 template<class ElemType>          if (is_closed())
 void AsyncQueue<ElemType>::lock(PEGASUS_THREAD_TYPE myself)  
 {  
     if(_disallow.get() > 0)  
     {  
        throw ListClosed();        throw ListClosed();
     }  
     _cond.lock(myself);  
 }  
  
 template<class ElemType>  
 void AsyncQueue<ElemType>::enqueue(ElemType *element)  
 {  
     if(element != 0)  
     {  
         lock(pegasus_thread_self());  
         if(true == is_full())  
         {  
             unlock();  
             throw ListFull(_capacity.get());  
         }  
         _rep.insert_back(element);  
         _insert_recover();  
     }  
 }  
   
 template<class ElemType>  
 void AsyncQueue<ElemType>::enqueue_wait(ElemType *element)  
 {  
     if(element != 0)  
     {  
         _insert_prep();  
         _rep.insert_back(element);         _rep.insert_back(element);
         _insert_recover();          _size++;
           _not_empty.signal();
     }     }
 } }
  
 template<class ElemType> template<class ElemType>
 void AsyncQueue<ElemType>::clear() void AsyncQueue<ElemType>::clear()
 { {
     lock(pegasus_thread_self());      AutoMutex auto_mutex(_mutex);
     _rep.clear();     _rep.clear();
     _count = 0;      _size = 0;
     _slot.unlocked_signal(pegasus_thread_self());      _not_full.signal();
     unlock();  
 } }
  
 template<class ElemType> template<class ElemType>
 ElemType *AsyncQueue<ElemType>::dequeue() ElemType *AsyncQueue<ElemType>::dequeue()
 { {
       AutoMutex auto_mutex(_mutex);
  
     lock(pegasus_thread_self());      if (is_closed())
     ElemType *ret = _rep.remove_front();          throw ListClosed();
     if(ret != 0)  
       ElemType* p = _rep.remove_front();
   
       if (p)
     {     {
         _slot.unlocked_signal(pegasus_thread_self());          _size--;
         _count--;          _not_full.signal();
     }     }
     unlock();  
     return ret;      return p;
 } }
  
 template<class ElemType> template<class ElemType>
 ElemType *AsyncQueue<ElemType>::dequeue_wait() ElemType *AsyncQueue<ElemType>::dequeue_wait()
 { {
     _unlink_prep();      AutoMutex auto_mutex(_mutex);
     ElemType *ret = _rep.remove_front();  
     _unlink_recover();      while (is_empty())
     return ret;      {
           if (is_closed())
               throw ListClosed();
   
           _not_empty.wait(_mutex);
       }
   
       if (is_closed())
           throw ListClosed();
   
       ElemType* p = _rep.remove_front();
       PEGASUS_DEBUG_ASSERT(p != 0);
       _size--;
       _not_full.signal();
   
       return p;
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.2  
changed lines
  Added in v.1.2.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2