(file) Return to IPC.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

File: [Pegasus] / pegasus / src / Pegasus / Common / Attic / IPC.cpp (download)
Revision: 1.19, Mon Oct 28 19:56:06 2002 UTC (21 years, 8 months ago) by kumpf
Branch: MAIN
CVS Tags: pep_88, mday-merge-start, mday-merge-pegasus/src/Pegasus/Server, mday-merge-pegasus/src/Pegasus/Common, VERSION_2_1_RELEASE_HEAD, VERSION_2_1_RELEASE_BRANCH, VERSION_2_1_RELEASE, VERSION_2_1_1_RELEASE, RELEASE_2_3_0-msg-freeze, RELEASE_2_2_1-snapshot, RELEASE_2_2_0_0-release, RELEASE_2_2_0-root, RELEASE_2_2_0-branch, RELEASE_2_2-root, PEGASUS_FC_VERSION_2_2, LOCAL_ASSOCPROV-ROOT, LOCAL_ASSOCPROV-BRANCH
Changes since 1.18: +9 -0 lines
HP-RK Implement the AtomicInt::DecAndTestIfZero() method for the generic implementation of AtomicInt.

//%/////////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
// The Open Group, Tivoli Systems
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// 
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
// Author: Markus Mueller (sedgewick_de@yahoo.de)
//
// Modified By: Mike Day (mdday@us.ibm.com)
//
//%/////////////////////////////////////////////////////////////////////////////

#include "IPC.h"

#if defined(PEGASUS_OS_TYPE_WINDOWS)
# include "IPCWindows.cpp"
#elif defined(PEGASUS_OS_TYPE_UNIX)
# include "IPCUnix.cpp" 
#else
# error "Unsupported platform"
#endif

#include "InternalException.h"

PEGASUS_NAMESPACE_BEGIN

// Computes *result = *x - *y.
//
// NOTE: *y is adjusted in place (whole seconds are moved between its
// tv_sec and tv_usec fields) so that the microsecond subtraction does
// not need a borrow; callers must not rely on *y being unchanged.
//
// Returns 1 when x->tv_sec is smaller than the adjusted y->tv_sec
// (i.e. the difference is negative), 0 otherwise.
int timeval_subtract (struct timeval *result, 
		      struct timeval *x, 
		      struct timeval *y)
{
   const int usec_per_sec = 1000000;

   // x has fewer microseconds than y: move enough whole seconds out of
   // y's microsecond field so the subtraction below cannot go negative.
   if (y->tv_usec > x->tv_usec)
   {
      int borrow = (y->tv_usec - x->tv_usec) / usec_per_sec + 1;
      y->tv_sec += borrow;
      y->tv_usec -= usec_per_sec * borrow;
   }

   // The microsecond gap spans more than a second: fold the surplus
   // whole seconds back into y's microsecond field.
   if (x->tv_usec - y->tv_usec > usec_per_sec)
   {
      int surplus = (x->tv_usec - y->tv_usec) / usec_per_sec;
      y->tv_sec -= surplus;
      y->tv_usec += usec_per_sec * surplus;
   }

   // Field-wise difference against the adjusted y.
   result->tv_usec = x->tv_usec - y->tv_usec;
   result->tv_sec = x->tv_sec - y->tv_sec;

   return (x->tv_sec < y->tv_sec) ? 1 : 0;
}



//-----------------------------------------------------------------
/// Generic Implementation of read/write semaphore class
//-----------------------------------------------------------------
#ifndef PEGASUS_READWRITE_NATIVE 


// If I get cancelled, I MUST ensure:
// 1) I do not hold the internal mutex
// 2) I do not hold the write lock
// 3) I am not using a reader slot

void extricate_read_write(void *parm)
{
   ReadWriteSem *rws = (ReadWriteSem *)parm;
   PEGASUS_THREAD_TYPE myself = pegasus_thread_self();
   
   if(rws->_rwlock._wlock.get_owner() == myself)
      rws->_rwlock._wlock.unlock();
   else
      if(rws->_readers > 0)
	 rws->_rwlock._rlock.signal();
   if (rws->_rwlock._internal_lock.get_owner() == myself)
      rws->_rwlock._internal_lock.unlock();
}
 

// A new semaphore has no readers and no writers; the underlying lock
// handle is default-constructed.
ReadWriteSem::ReadWriteSem(void)
   : _readers(0),
     _writers(0),
     _rwlock()
{
}

// Takes the internal mutex so nobody else can start a wait, then
// yields until both the reader and the writer counts have dropped to
// zero before letting the object go away.
ReadWriteSem::~ReadWriteSem(void)
{
   try 
   {
      _rwlock._internal_lock.lock(pegasus_thread_self());
   }
   catch(Deadlock&) 
   {
      // Not an error here: it means this thread already owns the
      // internal mutex, which is exactly the state we are after.
   }
   catch(IPCException&) 
   {
      // any other locking failure is unexpected
      PEGASUS_ASSERT(0); 
   }

   // let the current holders drain
   while(_readers > 0 || _writers > 0) 
   {
      pegasus_yield();
   }

   _rwlock._internal_lock.unlock();
}

//-----------------------------------------------------------------
// if milliseconds == -1, wait indefinately
// if milliseconds == 0, fast wait 
//-----------------------------------------------------------------
void ReadWriteSem::timed_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller, int milliseconds) 
   throw(TimeOut, Deadlock, Permission, WaitFailed, TooManyReaders)
{

//-----------------------------------------------------------------
// Lock this object to maintain integrity while we decide 
// exactly what to do next.
//-----------------------------------------------------------------
   IPCException * caught = NULL;
   
   { // cleanup stack frame 
      native_cleanup_push(extricate_read_write, this);
      try
      {
	 if(milliseconds == 0)
	    _rwlock._internal_lock.try_lock(pegasus_thread_self());
	 else if(milliseconds == -1)
	    _rwlock._internal_lock.lock(pegasus_thread_self()); 
	 else
	    _rwlock._internal_lock.timed_lock(milliseconds, pegasus_thread_self());
      }
      catch(IPCException& e)
      {
	 caught = &e;
	 goto throw_from_here;
      }
      
      if(mode == PEG_SEM_WRITE)
      {
//-----------------------------------------------------------------
// Write Lock Step 1: lock the object and allow all the readers to exit
//-----------------------------------------------------------------


	 if(milliseconds == 0) // fast wait
	 {
	    if(_readers > 0)
	    {
	       _rwlock._internal_lock.unlock();
	       caught = new WaitFailed(pegasus_thread_self());
	       goto throw_from_here;
	    }
	 }
	 else if(milliseconds == -1) // infinite wait
	 {
	    while(_readers > 0 )
	       pegasus_yield();
	 }
	 else // timed wait 
	 {
	    struct timeval start, now;
	    gettimeofday(&start, NULL);
	    start.tv_usec += (1000 * milliseconds);
	    while(_readers > 0)
	    {
	       gettimeofday(&now, NULL);
	       if((now.tv_usec > start.tv_usec) || now.tv_sec > start.tv_sec )
	       {
		  _rwlock._internal_lock.unlock();
		  caught = new TimeOut(pegasus_thread_self());
		  goto throw_from_here;
	       }
	       pegasus_yield();
	    }
	 }
//-----------------------------------------------------------------
// Write Lock Step 2: Obtain the Write Mutex
//  Although there are no readers, there may be a writer
//-----------------------------------------------------------------
	 if(milliseconds == 0) // fast wait
	 {
	    try 
	    {
	       _rwlock._wlock.try_lock(pegasus_thread_self());
	    }
	    catch(IPCException e) 
	    {
	       _rwlock._internal_lock.unlock();
	       caught = &e;
	       goto throw_from_here;
	    }
	 }
	 else if(milliseconds == -1) // infinite wait
	 {
	    try 
	    {
	       _rwlock._wlock.lock(pegasus_thread_self());
	    }
	    catch (IPCException& e) 
	    {
	       _rwlock._internal_lock.unlock();
	       caught = &e;
	       goto throw_from_here;
	    }
	 }
	 else // timed wait
	 {
	    try
	    {
	       _rwlock._wlock.timed_lock(milliseconds, pegasus_thread_self());
	    }
	    catch(IPCException& e)
	    {
	       _rwlock._internal_lock.unlock();
	       caught = &e;
	       goto throw_from_here;
	    }
	 }
      
//-----------------------------------------------------------------      
// Write Lock Step 3: set the writer count to one, unlock the object
//   There are no readers and we are the only writer !
//-----------------------------------------------------------------      
	 _writers = 1;
	 // set the owner
	 _rwlock._owner = pegasus_thread_self();
	 // unlock the object
	 _rwlock._internal_lock.unlock();
      } // PEG_SEM_WRITE
      else
      {
//-----------------------------------------------------------------
// Read Lock Step 1: Wait for the existing writer (if any) to clear
//-----------------------------------------------------------------
	 if(milliseconds == 0) // fast wait
	 {
	    if(_writers > 0)
	    {
	       _rwlock._internal_lock.unlock();
	       caught = new WaitFailed(pegasus_thread_self());
	       goto throw_from_here;
	    }
	 }
	 else if(milliseconds == -1) // infinite wait
	 {
	    while(_writers > 0)
	       pegasus_yield(); 
	 }
 	 else // timed wait
	 {
	    struct timeval start, now;
	    gettimeofday(&start, NULL);
	    start.tv_usec += (milliseconds * 1000);
	    
	    while(_writers > 0)
	    {
	       gettimeofday(&now, NULL);
	       if((now.tv_usec > start.tv_usec) || (now.tv_sec > start.tv_sec))
	       {
		  _rwlock._internal_lock.unlock();
		  caught = new TimeOut(pegasus_thread_self());
		  goto throw_from_here;
	       }
	       pegasus_yield();
	       pegasus_gettimeofday(&now);
	    }
	 }
      
//-----------------------------------------------------------------
// Read Lock Step 2: wait for a reader slot to open up, then return
//  At this point there are no writers, but there may be too many
//  readers.
//-----------------------------------------------------------------
	 if(milliseconds == 0) // fast wait
	 {
	    try 
	    {
	       _rwlock._rlock.try_wait();  
	    }
	    catch(IPCException& e) 
	    {
	       // the wait failed, there must be too many readers already. 
	       // unlock the object
	      e = e;
	       _rwlock._internal_lock.unlock();
	       caught = new TooManyReaders(pegasus_thread_self());
	    }
	 }
	 else if(milliseconds == -1) // infinite wait
	 {
	    try 
	    {
	       _rwlock._rlock.wait(); 
	    }
	    catch(IPCException& e)
	    {
	       _rwlock._internal_lock.unlock();
	       caught = &e;
	       goto throw_from_here;
	    }
	 }      
	 else // timed wait
	 {
	    try 
	    {
	       _rwlock._rlock.time_wait(milliseconds);
	    }
	    catch(IPCException& e)
	    {
	       _rwlock._internal_lock.unlock();
	       caught = &e;
	       goto throw_from_here;
	    }
	 }
      
//-----------------------------------------------------------------      
// Read Lock Step 3: increment the number of readers, unlock the object, 
// return
//-----------------------------------------------------------------
	 _readers++;
	 _rwlock._internal_lock.unlock();
      }
  throw_from_here:
      native_cleanup_pop(0);
   } // cleanup stack frame 
   if(caught != NULL)
      throw(*caught);
   return;
}

// Blocking acquire: forwards to timed_wait() with the "wait
// indefinitely" timeout value.
void ReadWriteSem::wait(Uint32 mode, PEGASUS_THREAD_TYPE caller) 
   throw(Deadlock, Permission, WaitFailed, TooManyReaders)
{
   const int wait_forever = -1;
   timed_wait(mode, caller, wait_forever);
}

// Non-blocking acquire: forwards to timed_wait() with the "fast wait"
// timeout value, so the call fails instead of waiting.
void ReadWriteSem::try_wait(Uint32 mode, PEGASUS_THREAD_TYPE caller) 
   throw(Deadlock, Permission, WaitFailed, TooManyReaders)
{
   const int do_not_wait = 0;
   timed_wait(mode, caller, do_not_wait);
}


// Releases the semaphore.
//
// With mode == PEG_SEM_WRITE and a writer recorded, the writer count
// is cleared and the write mutex released.  In every other case one
// reader is released, provided the reader count is non-zero.  The
// caller argument is not examined and nothing in this body throws.
//
// NOTE(review): a PEG_SEM_WRITE unlock issued while no writer is
// recorded falls through to the reader branch and releases a reader
// slot - confirm that this is intended.
void ReadWriteSem::unlock(Uint32 mode, PEGASUS_THREAD_TYPE caller) 
   throw(Permission)
{
   if(mode == PEG_SEM_WRITE && _writers.value() != 0 )
   {
      _writers = 0;
      _rwlock._wlock.unlock();
      return;
   }

   if(_readers.value() == 0)
      return;   // nothing is held

   _readers--;
   _rwlock._rlock.signal();
}

// Current value of the reader counter.
int ReadWriteSem::read_count()
{
   int active_readers = _readers.value();
   return active_readers;
}

// Current value of the writer counter.
int ReadWriteSem::write_count()
{
   int active_writers = _writers.value();
   return active_writers;
}

#endif // ! PEGASUS_READWRITE_NATIVE
//-----------------------------------------------------------------
// END of generic read/write semaphore implementation
//-----------------------------------------------------------------



//-----------------------------------------------------------------
/// Generic Implementation of Atomic Integer class
//-----------------------------------------------------------------

#ifndef PEGASUS_ATOMIC_INT_NATIVE

// Default construction: the counter starts at zero.
AtomicInt::AtomicInt()
{
    _rep._value = 0;
}

// Construction from an explicit starting value.
AtomicInt::AtomicInt(Uint32 initial)
{
    _rep._value = initial;
}

// Nothing is released explicitly here.
AtomicInt::~AtomicInt()
{
}

// Copy construction.  The source is read through value(), which
// holds the source's mutex for the duration of the read, instead of
// touching its representation directly while another thread may be
// updating it.  This object's own mutex is not needed: nobody else
// can see the object until construction has finished.
AtomicInt::AtomicInt(const AtomicInt& original)
{
    _rep._value = original.value();
} 

// Assigns the current value of another AtomicInt to this one and
// returns *this.
AtomicInt& AtomicInt::operator=(const AtomicInt& original )
{
    // To avoid deadlocks, only ever hold one mutex at a time: first
    // fetch the original's value through value(), which locks and
    // unlocks the original's mutex, and only then lock this object's
    // mutex.  The same pattern is used by the other members that
    // take an AtomicInt argument.
    Uint32 temp = original.value();
    _rep._mutex.lock(pegasus_thread_self());
    _rep._value = temp;
    _rep._mutex.unlock();
    return *this;
}

// Stores val as the new value while holding this object's mutex and
// returns *this.
AtomicInt& AtomicInt::operator=(Uint32 val)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    _rep._value = val;
    _rep._mutex.unlock();

    return *this;
}

// Returns a snapshot of the counter taken while this object's mutex
// is held.
Uint32 AtomicInt::value(void) const
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    Uint32 snapshot = _rep._value;
    _rep._mutex.unlock();

    return snapshot;
}

// Prefix increment: adds one to the counter under the mutex.
// Returns nothing.
void AtomicInt::operator++(void)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    ++_rep._value;
    _rep._mutex.unlock();
}

// Postfix increment: adds one to the counter under the mutex.
// Returns nothing, so it is interchangeable with the prefix form.
void AtomicInt::operator++(int)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    ++_rep._value;
    _rep._mutex.unlock();
}

// Prefix decrement: subtracts one from the counter under the mutex.
// Returns nothing.
void AtomicInt::operator--(void)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    --_rep._value;
    _rep._mutex.unlock();
}

// Postfix decrement: subtracts one from the counter under the mutex.
// Returns nothing, so it is interchangeable with the prefix form.
void AtomicInt::operator--(int)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    --_rep._value;
    _rep._mutex.unlock();
}

// Returns this counter's value plus val's value.  Neither operand is
// modified.
Uint32 AtomicInt::operator+(const AtomicInt& val)
{
    // Never acquire a mutex while holding a mutex: read val through
    // value(), which locks and releases val's own mutex, before
    // taking this object's mutex.
    Uint32 retval = val.value();
    _rep._mutex.lock(pegasus_thread_self());
    retval += _rep._value ;
    _rep._mutex.unlock();
    return retval;
}

// Returns the counter's value plus val; the counter itself is not
// modified.
Uint32 AtomicInt::operator+(Uint32 val)
{
    Uint32 sum = val;

    _rep._mutex.lock(pegasus_thread_self());
    sum += _rep._value;
    _rep._mutex.unlock();

    return sum;
}

// Returns this counter's value minus val's value.  Neither operand
// is modified.
Uint32 AtomicInt::operator-(const AtomicInt& val)
{
    // Never acquire a mutex while holding a mutex: read val through
    // value(), which locks and releases val's own mutex, before
    // taking this object's mutex.
    Uint32 subtrahend = val.value();
    _rep._mutex.lock(pegasus_thread_self());
    // this - val (the operands must be subtracted, not added)
    Uint32 retval = _rep._value - subtrahend;
    _rep._mutex.unlock();
    return retval;
}

// Returns the counter's value minus val; the counter itself is not
// modified.
Uint32 AtomicInt::operator-(Uint32 val)
{
    _rep._mutex.lock(pegasus_thread_self());
    Uint32 difference = _rep._value;
    difference -= val;
    _rep._mutex.unlock();

    return difference;
}


// Adds val's current value to this counter and returns *this.
AtomicInt& AtomicInt::operator+=(const AtomicInt& val)
{
    // Never acquire a mutex while holding a mutex: read val through
    // value(), which locks and releases val's own mutex, before
    // taking this object's mutex.
    Uint32 temp = val.value();
    _rep._mutex.lock(pegasus_thread_self());
    _rep._value += temp;
    _rep._mutex.unlock();
    return *this;
}
// Adds val to the counter under the mutex and returns *this.
AtomicInt& AtomicInt::operator+=(Uint32 val)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    // only this object's mutex is involved
    _rep._mutex.lock(self);
    _rep._value = _rep._value + val;
    _rep._mutex.unlock();

    return *this;
}

// Subtracts val's current value from this counter and returns *this.
AtomicInt& AtomicInt::operator-=(const AtomicInt& val)
{
    // Never acquire a mutex while holding a mutex: read val through
    // value(), which locks and releases val's own mutex, before
    // taking this object's mutex.
    Uint32 temp = val.value();
    _rep._mutex.lock(pegasus_thread_self());
    _rep._value -= temp;
    _rep._mutex.unlock();
    return *this;
}

// Subtracts val from the counter under the mutex and returns *this.
AtomicInt& AtomicInt::operator-=(Uint32 val)
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    // only this object's mutex is involved
    _rep._mutex.lock(self);
    _rep._value = _rep._value - val;
    _rep._mutex.unlock();

    return *this;
}

// Decrements the counter and reports whether it reached zero.  The
// decrement and the test happen under a single hold of the mutex, so
// the result reflects the value this call produced.
Boolean AtomicInt::DecAndTestIfZero()
{
    PEGASUS_THREAD_TYPE self = pegasus_thread_self();

    _rep._mutex.lock(self);
    --_rep._value;
    Boolean reached_zero = (_rep._value == 0);
    _rep._mutex.unlock();

    return reached_zero;
}

#endif // ! PEGASUS_ATOMIC_INT_NATIVE
//-----------------------------------------------------------------
// END of generic atomic integer implementation
//-----------------------------------------------------------------



//-----------------------------------------------------------------
// Generic implementation of conditional semaphore object 
//-----------------------------------------------------------------
#ifndef PEGASUS_CONDITIONAL_NATIVE


// may be entered by many different threads concurrently
// ensure that, upon exit, I OWN the mutex 
// and I DO NOT own the critical section

void extricate_condition(void *parm)
{
   Condition *c = (Condition *)parm;

   // if I own the critical section, release it
   if(pegasus_thread_self() == c->_condition._spin.get_owner())
      c->_condition._spin.unlock();
   // if I DO NOT own the mutex, obtain it
   if(pegasus_thread_self() != c->_cond_mutex->get_owner())
      c->_cond_mutex->lock(pegasus_thread_self());
}

// Creates a condition that allocates its own mutex; the mutex is
// flagged for deletion by the destructor.
Condition::Condition(void)
   : _disallow(0),
     _condition()
{ 
   _destroy_mut = true;
   _cond_mutex = new Mutex();
} 

// Creates a condition on top of a mutex supplied by the caller.  Only
// the address of the mutex is kept (constness is cast away), and the
// destructor will not delete it.
Condition::Condition(const Mutex & mutex)
   : _disallow(0),
     _condition()
{
   _destroy_mut = false;
   _cond_mutex = const_cast<Mutex *>(&mutex);
}


// Shuts the condition down: new waiters are refused, every waiter
// still on the list is taken off and signalled, and the mutex is
// deleted if this object allocated it.
Condition::~Condition(void)
{
   // from here on the locking and waiting entry points throw ListClosed
   _disallow++;

   _condition._spin.lock(pegasus_thread_self());
   cond_waiter *straggler =
      static_cast<cond_waiter *>(_condition._waiters.remove_last());
   while(straggler != NULL)
   {
      straggler->signalled.signal();
      straggler =
	 static_cast<cond_waiter *>(_condition._waiters.remove_last());
   }
   _condition._spin.unlock();

   // yield while the waiter list is non-empty
   while( _condition._waiters.count())
   {
      pegasus_yield();
   }

   if(_destroy_mut == true)
   {
      delete _cond_mutex;
   }
}

// Signals the waiters on behalf of caller: takes the condition mutex,
// delegates to unlocked_signal(), and releases the mutex again whether
// or not the signalling threw.
void Condition::signal(PEGASUS_THREAD_TYPE caller)
   throw(IPCException)
{
   _cond_mutex->lock(caller);

   try
   {
      unlocked_signal(caller);
   }
   catch(...)
   {
      // do not leave the mutex held on the error path
      _cond_mutex->unlock();
      throw;
   }

   _cond_mutex->unlock();
}

// Signals every waiter currently on the list.  The caller must
// already own the condition mutex; otherwise Permission is thrown.
void Condition::unlocked_signal(PEGASUS_THREAD_TYPE caller)
   throw(IPCException)
{
   // the conditional lock has to belong to the caller
   if(_cond_mutex->_mutex.owner != caller)
      throw(Permission((PEGASUS_THREAD_TYPE)caller));

   // walk the waiter list under the internal spin lock
   _condition._spin.lock(caller);
   if (_condition._waiters.count() > 0) 
   {
      for(cond_waiter *current =
	     static_cast<cond_waiter *>(_condition._waiters.next(0));
	  current != 0;
	  current =
	     static_cast<cond_waiter *>(_condition._waiters.next(current)))
      {
	 current->signalled.signal();
      }
   }
   _condition._spin.unlock();
}

// Locks the condition mutex for caller.  Throws ListClosed once the
// condition has stopped accepting new users.
void Condition::lock_object(PEGASUS_THREAD_TYPE caller) 
   throw(IPCException)
{
   const bool closed = (_disallow.value() > 0);
   if(closed)
   {
      throw ListClosed();
   }
   _cond_mutex->lock(caller);
}

// Attempts to lock the condition mutex for caller through try_lock().
// Throws ListClosed once the condition has stopped accepting new
// users.
void Condition::try_lock_object(PEGASUS_THREAD_TYPE caller)
   throw(IPCException)
{
   const bool closed = (_disallow.value() > 0);
   if(closed)
   {
      throw ListClosed();
   }
   _cond_mutex->try_lock(caller);
}

// Locks the condition mutex for caller through timed_lock(), passing
// milliseconds as the limit.  Throws ListClosed once the condition
// has stopped accepting new users.
void Condition::wait_lock_object(PEGASUS_THREAD_TYPE caller, int milliseconds)
   throw(IPCException)
{
   const bool closed = (_disallow.value() > 0);
   if(closed)
   {
      throw ListClosed();
   }
   _cond_mutex->timed_lock(milliseconds, caller);
}

void Condition::unlock_object(void)
{
   _cond_mutex->unlock();
}

// Waits on the condition without a time limit: forwards to
// unlocked_timed_wait() with the "wait indefinitely" value.
void Condition::unlocked_wait(PEGASUS_THREAD_TYPE caller) 
   throw(IPCException)
{
   const int wait_forever = -1;
   unlocked_timed_wait(wait_forever, caller);
}

void Condition::unlocked_timed_wait(int milliseconds, PEGASUS_THREAD_TYPE caller) 
   throw(IPCException)
{
   if(_disallow.value() > 0)
   {
      _cond_mutex->unlock();
      throw ListClosed();
   }
   // enforce that the caller owns the conditional lock
   if(_cond_mutex->_mutex.owner != caller)
      throw Permission((PEGASUS_THREAD_TYPE)caller);
   cond_waiter *waiter = new cond_waiter(caller, milliseconds);
   {
      native_cleanup_push(extricate_condition, this);
      // lock the internal list
      _condition._spin.lock(caller);
      _condition._waiters.insert_first(waiter);
      // unlock the condition mutex 
      _cond_mutex->unlock();
      _condition._spin.unlock();
      if(milliseconds == -1)
	 waiter->signalled.wait();
      else
	 try 
	 {
	    waiter->signalled.time_wait(milliseconds);
	 }
	 catch(TimeOut &)
	 {
	    _cond_mutex->lock(caller);
	    throw;
	 }
      
      _condition._spin.lock(caller);  
      _condition._waiters.remove(waiter);
      _condition._spin.unlock();
      delete waiter;
      _cond_mutex->lock(caller);
      native_cleanup_pop(0);
   }
   return;
}




#endif // PEGASUS_CONDITIONAL_NATIVE
//-----------------------------------------------------------------
// END of generic implementation of conditional semaphore object 
//-----------------------------------------------------------------

PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2