(file) Return to AsyncQueue.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

File: [Pegasus] / pegasus / src / Pegasus / Common / AsyncQueue.h (download)
Revision: 1.1.2.2, Fri Jun 30 01:52:01 2006 UTC (18 years ago) by mike
Branch: TASK_BUG_5191_QUEUE_CONSOLIDATION_BRANCH
Changes since 1.1.2.1: +2 -1 lines
BUG#: 5191
TITLE: Queue Consolidation

DESCRIPTION: Queue Consolidation Branch Work

//%2006////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; Symantec Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// 
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Mike Day (mdday@us.ibm.com)
//
// Modified By: Amit K Arora, IBM (amita@in.ibm.com) for PEP#101
//              Alagaraja Ramasubramanian (alags_raj@in.ibm.com) for Bug#1090
//              Amit K Arora, IBM (amita@in.ibm.com) for Bug#2322
//              David Dillard, VERITAS Software Corp.
//                  (david.dillard@veritas.com)
//              Amit K Arora, IBM (amita@in.ibm.com) for Bug#2960
//
// Reworked By:
//              Mike Brasher (m.brasher@inovadevelopment.com)
//
//%/////////////////////////////////////////////////////////////////////////////

#ifndef Pegasus_AsyncQueue_h
#define Pegasus_AsyncQueue_h

#include <Pegasus/Common/IPC.h>
#include <Pegasus/Common/Linkage.h>
#include <Pegasus/Common/List.h>

PEGASUS_NAMESPACE_BEGIN

/** AsyncQueue implementation (formerly AsyncDQueue).

    A queue of ElemType pointers kept in a List<ElemType,NullLock>. The
    queue's own mutex (_cond) is taken around each operation, and two
    conditions constructed on that mutex are signalled when an element is
    inserted (_node) and when an element is removed (_slot).
*/
template<class ElemType> 
class AsyncQueue
{
public:

    /** Constructor.
        @param capacity Requested capacity. NOTE(review): the constructor
        definition in this header initializes _capacity to 0 and does not
        use this argument -- confirm that this is intended.
    */
    AsyncQueue(Uint32 capacity = 0);

    /** Destructor.
    */
    virtual ~AsyncQueue();

    /** Shut down the queue. Afterwards is_shutdown() returns true, and
        lock() and try_lock() throw ListClosed.
    */
    void shutdown_queue();

    /** Enqueue an element at the back of queue. A null element is ignored.
        @exception ListClosed if the queue has been shut down.
    */
    void enqueue(ElemType *element);

    /** Enqueue an element at the back of queue, waiting while the queue is
        full. Since is_full() always returns false, this never actually
        waits for space. A null element is ignored.
        @exception ListClosed if the queue has been shut down.
    */
    void enqueue_wait(ElemType *element);

    /** Dequeue an element from the front of the queue. Return null immediately
        if queue is empty.
        @exception ListClosed if the queue has been shut down.
    */
    ElemType *dequeue();

    /** Dequeue an element from the front of the queue (if there is no element
        on queue, wait until there is).
        @exception ListClosed if the queue is shut down before or during
        the wait.
    */
    ElemType *dequeue_wait();

    /** Discard all the elements on the list. The size becomes zero afterwards.
        @exception ListClosed if the queue has been shut down.
    */
    void clear();

    /** Return number of elements in queue (same value as size()).
    */
    Uint32 count() const { return _count.get(); }

    /** Return number of elements in queue (same value as count()).
    */
    Uint32 size() const { return _count.get(); }

    /** Return the maximum number of elements permitted on queue at once.
    */
    Uint32 capacity() const { return _capacity.get(); }

    /** Return true if queue has reached its capacity. This implementation
        always returns false.
    */
    Boolean is_full() const { /* never full */ return false; }

    /** Return true if queue is empty (has zero elements).
    */
    Boolean is_empty() const { return size() == 0; }

    /** Return true if the queue has been shutdown (in which case no new
        elements may be enqueued).
    */
    Boolean is_shutdown() const { return _disallow.get() > 0; }

    /** Attempt to lock the queue.
        @param myself Thread identifier forwarded to the mutex.
        @exception ListClosed if the queue has been shut down.
    */
    void try_lock(PEGASUS_THREAD_TYPE myself);

    /** Lock the queue.
        @param myself Thread identifier forwarded to the mutex.
        @exception ListClosed if the queue has been shut down.
    */
    void lock(PEGASUS_THREAD_TYPE myself);

    /** Unlock the queue.
    */
    void unlock(void) { _cond.unlock(); }

private:

    /** Prepare for an insertion: take the queue lock through _slot and
        wait while the queue is full.
        @exception  ListClosed      The queue has been shut down.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    void _insert_prep();

    /** Finish an insertion: signal _node, increment the element count and
        release the queue lock.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    void _insert_recover();

    /** Prepare for a removal: take the queue lock through _node and wait
        while the queue is empty.
        @exception  ListClosed      The queue has been shut down.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    void _unlink_prep();

    /** Finish a removal: signal _slot, decrement the element count and
        release the queue lock.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    void _unlink_recover();

    /** Remove an element selected by comparing against a key (see
        _equal_key). The calling thread must already own the queue lock.
        @exception  ListClosed      The queue has been shut down.
        @exception  Permission      The caller does not own the queue lock.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    ElemType *_remove_no_lock(const void *key);

    /** Remove an element selected by comparing against another element
        (see _equal_object). The calling thread must already own the queue
        lock.
        @exception  ListClosed      The queue has been shut down.
        @exception  Permission      The caller does not own the queue lock.
        @exception  IPCException    Indicates an IPC error occurred.
    */
    ElemType *_remove_no_lock(const ElemType *key);

    // Predicate handed to the list: compares an element with an opaque key
    // through ElemType::operator==(const void*).
    static bool _equal_key(const ElemType* elem, const void* key)
    {
        return elem->operator==(key);
    }

    // Predicate handed to the list: treats 'object' as an ElemType and
    // compares through ElemType::operator==. Note the cast drops const.
    static bool _equal_object(const ElemType* elem, const void* object)
    {
        return elem->operator==(*((ElemType*)object));
    }

    // Mutex guarding the queue (despite the name). It is declared before
    // _slot and _node because both are constructed from it.
    Mutex _cond;
    // Condition signalled whenever an element is removed or the queue is
    // cleared.
    Condition _slot;
    // Condition signalled whenever an element is inserted.
    Condition _node;
    // Number of elements currently on the queue.
    AtomicInt _count;
    // Greater than zero once the queue has been shut down.
    AtomicInt _disallow;
    // Value reported by capacity().
    AtomicInt _capacity;
    // Underlying list; NullLock presumably means the list does no locking
    // of its own -- TODO confirm against List.h.
    typedef List<ElemType,NullLock> Rep;
    Rep _rep;
};

/** Construct an empty queue.

    Both conditions are bound to the queue mutex. The capacity argument is
    accepted but not stored: _capacity always starts at 0.
*/
template<class ElemType>
AsyncQueue<ElemType>::AsyncQueue(Uint32 capacity)
    : _slot(_cond),
      _node(_cond),
      _capacity(0)
{
    // No further set-up is required.
}

/** Destructor. The body is empty: it does not call shutdown_queue() or
    clear(); the data members are destroyed implicitly.
*/
template<class ElemType>
AsyncQueue<ElemType>::~AsyncQueue()
{
    // Intentionally empty.
}

/** Prepare the queue for an insertion.

    Takes the queue lock through _slot (lock_object presumably acquires the
    mutex the condition was constructed with -- the matching release is the
    unlock() in _insert_recover()) and waits on _slot while the queue is
    full. On normal return the caller holds the queue lock.

    @exception ListClosed if the queue has been shut down, either before
    the lock is taken or while waiting. The lock is not held when this
    exception propagates.
*/
template<class ElemType>
void AsyncQueue<ElemType>::_insert_prep()
{
    // The lock has not been acquired yet, so there is nothing to release
    // here. (Calling unlock() at this point would try to unlock a mutex
    // this thread does not hold.)
    if (is_shutdown())
    {
        throw ListClosed();
    }

    _slot.lock_object(pegasus_thread_self());
    while (is_full())
    {
        _slot.unlocked_wait(pegasus_thread_self());

        // The queue may have been shut down while we were waiting; in that
        // case give the lock back before reporting it.
        if (is_shutdown())
        {
            unlock();
            throw ListClosed();
        }
    }
}

/** Complete an insertion: signal the _node condition, bump the element
    count and release the queue lock. Expects the caller to hold the lock.
*/
template<class ElemType>
void AsyncQueue<ElemType>::_insert_recover()
{
    const PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _node.unlocked_signal(self);
    _count++;
    _cond.unlock();
}

/** Prepare the queue for a removal.

    Takes the queue lock through _node (lock_object presumably acquires the
    mutex the condition was constructed with -- the matching release is the
    unlock() in _unlink_recover()) and waits on _node while the queue is
    empty. On normal return the caller holds the queue lock.

    @exception ListClosed if the queue has been shut down, either before
    the lock is taken or while waiting. The lock is not held when this
    exception propagates.
*/
template<class ElemType>
void AsyncQueue<ElemType>::_unlink_prep()
{
    // The lock has not been acquired yet, so there is nothing to release
    // here. (Calling unlock() at this point would try to unlock a mutex
    // this thread does not hold.)
    if (is_shutdown())
    {
        throw ListClosed();
    }

    _node.lock_object(pegasus_thread_self());
    while (is_empty())
    {
        _node.unlocked_wait(pegasus_thread_self());

        // The queue may have been shut down while we were waiting; in that
        // case give the lock back before reporting it.
        if (is_shutdown())
        {
            unlock();
            throw ListClosed();
        }
    }
}

/** Complete a removal: signal the _slot condition, decrement the element
    count and release the queue lock. Expects the caller to hold the lock.
*/
template<class ElemType>
void AsyncQueue<ElemType>::_unlink_recover()
{
    const PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _slot.unlocked_signal(self);
    _count--;
    _cond.unlock();
}

/** Remove an element from the list by key, without taking the queue lock.

    The calling thread must already own the queue lock. Elements are
    matched with _equal_key, i.e. ElemType::operator==(const void*).

    @param key Opaque key passed to the element comparison.
    @return Whatever List::remove returns for the match (presumably the
    removed element, or null when nothing matches -- TODO confirm).
    @exception Permission if the calling thread does not own the queue lock.
    @exception ListClosed if the queue has been shut down; the queue lock
    is released before this exception is thrown.
*/
template<class ElemType>
ElemType* AsyncQueue<ElemType>::_remove_no_lock(const void *key)
{
    // Check ownership first, so that the unlock() below is only ever
    // applied to a mutex this thread actually holds.
    if (pegasus_thread_self() != _cond.get_owner())
        throw Permission(pegasus_thread_self());

    if (is_shutdown())
    {
        unlock();
        throw ListClosed();
    }

    return _rep.remove(_equal_key, key);
}

/** Remove an element from the list by comparing against another element,
    without taking the queue lock.

    The calling thread must already own the queue lock. Elements are
    matched with _equal_object, i.e. ElemType::operator== applied to *key.

    @param key Element to compare against.
    @return Whatever List::remove returns for the match (presumably the
    removed element, or null when nothing matches -- TODO confirm).
    @exception Permission if the calling thread does not own the queue lock.
    @exception ListClosed if the queue has been shut down; the queue lock
    is released before this exception is thrown.
*/
template<class ElemType>
ElemType *AsyncQueue<ElemType>::_remove_no_lock(const ElemType *key)
{
    // Check ownership first, so that the unlock() below is only ever
    // applied to a mutex this thread actually holds.
    if (pegasus_thread_self() != _cond.get_owner())
        throw Permission(pegasus_thread_self());

    if (is_shutdown())
    {
        unlock();
        throw ListClosed();
    }

    return _rep.remove(_equal_object, key);
}

/** Shut the queue down.

    Under the queue lock: increments the shutdown counter, then disallows
    and signals (twice each) first the _node and then the _slot condition.
    If ListClosed is raised along the way -- lock() does so when the queue
    is already shut down -- the counter is still incremented.

    NOTE(review): the handler does not release the lock, so a ListClosed
    raised after lock() succeeded would leave it held -- confirm whether
    the Condition calls can throw it.
*/
template<class ElemType>
void AsyncQueue<ElemType>::shutdown_queue()
{
    try
    {
        const PEGASUS_THREAD_TYPE self = pegasus_thread_self();

        lock(self);
        _disallow++;

        // Consumers side.
        _node.disallow();
        _node.unlocked_signal(self);
        _node.unlocked_signal(self);

        // Producers side.
        _slot.disallow();
        _slot.unlocked_signal(self);
        _slot.unlocked_signal(self);

        unlock();
    }
    catch (const ListClosed &)
    {
        _disallow++;
    }
}

/** Attempt to take the queue lock on behalf of the given thread.

    @param myself Thread identifier forwarded to the mutex's try_lock().
    @exception ListClosed if the queue has been shut down.
*/
template<class ElemType>
void AsyncQueue<ElemType>::try_lock(PEGASUS_THREAD_TYPE myself)
{
    if (is_shutdown())
        throw ListClosed();

    _cond.try_lock(myself);
}

/** Take the queue lock on behalf of the given thread.

    @param myself Thread identifier forwarded to the mutex's lock().
    @exception ListClosed if the queue has been shut down.
*/
template<class ElemType>
void AsyncQueue<ElemType>::lock(PEGASUS_THREAD_TYPE myself)
{
    if (is_shutdown())
        throw ListClosed();

    _cond.lock(myself);
}

/** Append an element to the back of the queue without waiting for space.

    @param element Element to append; a null pointer is silently ignored.
    @exception ListClosed if the queue has been shut down (raised by lock()).
    @exception ListFull if is_full() reports true; the lock is released
    before the exception is thrown.
*/
template<class ElemType>
void AsyncQueue<ElemType>::enqueue(ElemType *element)
{
    // Nothing to do for a null element.
    if (!element)
        return;

    lock(pegasus_thread_self());

    if (is_full())
    {
        unlock();
        throw ListFull(_capacity.get());
    }

    _rep.insert_back(element);

    // Signals the insertion, updates the count and releases the lock.
    _insert_recover();
}

/** Append an element to the back of the queue, waiting in _insert_prep()
    while the queue is full.

    @param element Element to append; a null pointer is silently ignored.
    @exception ListClosed if the queue has been shut down (raised by
    _insert_prep()).
*/
template<class ElemType>
void AsyncQueue<ElemType>::enqueue_wait(ElemType *element)
{
    // Nothing to do for a null element.
    if (!element)
        return;

    _insert_prep();                 // returns with the queue lock held
    _rep.insert_back(element);
    _insert_recover();              // signals, counts and unlocks
}

/** Empty the queue: clears the underlying list, resets the element count
    to zero and signals the _slot condition, all under the queue lock.

    @exception ListClosed if the queue has been shut down (raised by lock()).
*/
template<class ElemType>
void AsyncQueue<ElemType>::clear()
{
    const PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    lock(self);

    _rep.clear();
    _count = 0;

    // Space has become available.
    _slot.unlocked_signal(self);

    unlock();
}

/** Remove and return the element at the front of the queue without
    waiting.

    @return The element taken from the front of the list, or null when the
    list handed back none.
    @exception ListClosed if the queue has been shut down (raised by lock()).
*/
template<class ElemType>
ElemType *AsyncQueue<ElemType>::dequeue()
{
    lock(pegasus_thread_self());

    ElemType *front = _rep.remove_front();
    if (!front)
    {
        // Nothing was removed: no signal, no count change.
        unlock();
        return front;
    }

    _slot.unlocked_signal(pegasus_thread_self());
    _count--;
    unlock();
    return front;
}

/** Remove and return the element at the front of the queue, waiting in
    _unlink_prep() while the queue is empty.

    @return The element taken from the front of the list.
    @exception ListClosed if the queue has been shut down (raised by
    _unlink_prep()).
*/
template<class ElemType>
ElemType *AsyncQueue<ElemType>::dequeue_wait()
{
    _unlink_prep();                             // returns with the lock held

    ElemType *front = _rep.remove_front();

    _unlink_recover();                          // signals, counts and unlocks
    return front;
}

PEGASUS_NAMESPACE_END

#endif /* Pegasus_AsyncQueue_h */

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2