(file) Return to DQueue.h CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.41 //%2003////////////////////////////////////////////////////////////////////////
   2 kumpf 1.34 //
   3 karl  1.41 // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   4            // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   6            // IBM Corp.; EMC Corporation, The Open Group.
   7 mike  1.2  //
   8            // Permission is hereby granted, free of charge, to any person obtaining a copy
   9            // of this software and associated documentation files (the "Software"), to
  10            // deal in the Software without restriction, including without limitation the
  11            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  12            // sell copies of the Software, and to permit persons to whom the Software is
  13            // furnished to do so, subject to the following conditions:
  14 kumpf 1.34 // 
  15 mike  1.2  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  16            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  17            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  18            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  19            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  20            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  21            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  22            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  23            //
  24            //==============================================================================
  25            //
  26            // Author: Mike Day (mdday@us.ibm.com)
  27            //
  28            // Modified By:
  29            //
  30            //%/////////////////////////////////////////////////////////////////////////////
  31            
  32            #ifndef Pegasus_DQueue_h
  33            #define Pegasus_DQueue_h
  34            
  35            #include <Pegasus/Common/IPC.h>
  36 kumpf 1.36 #include <Pegasus/Common/Linkage.h>
  37 mike  1.2  
  38            PEGASUS_NAMESPACE_BEGIN
  39            
  40 tony  1.38 template<class L> class DQueue : public internal_dq
  41 mike  1.2  {
  42 mday  1.29    public:
  43                   static void *operator new(size_t size);
  44                   static void operator delete(void *dead, size_t size);
  45 mday  1.20             
  46 mike  1.2     private: 
  47                  Mutex *_mutex;
  48 mday  1.18       AtomicInt *_actual_count;
  49 mday  1.20       DQueue *_dq_next;
  50 mday  1.23       static DQueue<L> *_headOfFreeList;
  51 mday  1.20       static const int BLOCK_SIZE;
  52                  static Mutex _alloc_mut;
  53 mday  1.18       
  54 mike  1.2     public:
  55 mday  1.18       typedef internal_dq Base;
  56 mday  1.23       DQueue(void);
  57                  DQueue(Boolean head) ;
  58 mike  1.2  
  59 mday  1.23       virtual ~DQueue() ;
  60 mike  1.2  
  61 mday  1.23       void lock(void) throw(IPCException);
  62                  void unlock(void) throw(IPCException);
  63                  void try_lock() throw(IPCException);
  64            
  65                  void insert_first_no_lock(L *element) throw(IPCException);
  66                  void insert_first(L *element) throw(IPCException) ;
  67                  void insert_last_no_lock(L *element) throw(IPCException);
  68                  void insert_last(L *element) throw(IPCException);
  69                  void empty_list( void ) throw(IPCException) ;
  70                  L *remove_first ( void ) throw(IPCException) ;
  71                  L *remove_last ( void ) throw(IPCException) ;
  72                  L *remove_no_lock(const void *key) throw(IPCException);
  73                  L *remove_no_lock(const L *key) throw(IPCException);
  74                  L *remove(const void *key) throw(IPCException);
  75                  L *remove(const L *key) throw(IPCException);
  76                  L *reference(const void *key) throw(IPCException);
  77                  L *reference(const L *key);
  78                  L *next( const void * ref) throw(IPCException);
  79                  L *prev( const void *ref) throw(IPCException);
  80                  Boolean exists(const void *key) throw(IPCException) ;
  81                  Boolean exists(const L *key) throw(IPCException);
  82 mday  1.18       
  83                  Uint32 count(void) { return _actual_count->value() ; }
  84 mday  1.37       Uint32 size(void) { return _actual_count->value() ; }
  85            	
  86            	  
  87 mike  1.2  } ;
  88            
  89 mday  1.22  
  90 mday  1.20 
  91 tony  1.38 template<class L> class AsyncDQueue: public internal_dq
  92 mike  1.2  {
  93 mday  1.20 
  94 mday  1.29    public:
  95                  static void * operator new(size_t size);
  96                  static void operator delete(void *, size_t);
  97 mday  1.20       
  98 mike  1.2     private: // asyncdqueue
  99            
 100                  Mutex *_cond;
 101                  Condition *_slot;
 102                  Condition *_node;
 103                  AtomicInt *_actual_count;
 104                  AtomicInt *_disallow;
 105                  AtomicInt * _capacity;
 106 mday  1.20       AsyncDQueue *_dq_next;
 107 mike  1.2  
 108 mday  1.20       static AsyncDQueue *_headOfFreeList;
 109                  static const int BLOCK_SIZE;
 110                  static Mutex _alloc_mut;
 111                  
 112 mday  1.23       void _insert_prep(void) throw(IPCException);
 113                  void _insert_recover(void) throw(IPCException);
 114                  void _unlink_prep(void) throw(IPCException);
 115                  void _unlink_recover(void) throw(IPCException);
 116                  L *_remove_no_lock(const void *key) throw(IPCException);
 117                  L *_remove_no_lock(const L *key) throw(IPCException);
 118 mday  1.6  
 119 mike  1.2     public:
 120            
 121                  typedef internal_dq Base;
 122                  
 123 mday  1.23       AsyncDQueue(void) ;
 124                  AsyncDQueue(Boolean head, Uint32 capacity );
 125                  virtual ~AsyncDQueue(void);
 126                  void shutdown_queue(void);
 127                  Boolean is_full(void);
 128                  Boolean is_empty(void);
 129                  Boolean is_shutdown(void);
 130                  void try_lock(PEGASUS_THREAD_TYPE myself) throw(IPCException);
 131                  void lock(PEGASUS_THREAD_TYPE myself) throw(IPCException);
 132                  void unlock(void);
 133 mday  1.30       void signal_slot(void) throw(IPCException);
 134                  void signal_node(void) throw(IPCException);
 135                  Condition *get_node_cond(void);
 136                  Condition *get_slot_cond(void);
 137 mday  1.23       void wait_for_node(void) throw(IPCException);
 138                  void set_capacity(Uint32 capacity) throw(IPCException);
 139                  Uint32 get_capacity(void);
 140                  void insert_first(L *element) throw(IPCException);
 141                  void insert_first_wait(L *element) throw(IPCException);
 142                  void insert_last(L *element) throw(IPCException);
 143                  void insert_last_wait(L *element) throw(IPCException);
 144                  void empty_list(void) throw(IPCException);
 145                  L *remove_first(void) throw(IPCException);
 146                  L *remove_first_wait(void) throw(IPCException);
 147                  L *remove_last(void) throw(IPCException);
 148                  L *remove_last_wait(void) throw(IPCException);
 149                  L *remove(const void *key) throw(IPCException);
 150                  L *remove(const L *key) throw(IPCException);
 151                  L *remove_no_lock(const void *key) throw(IPCException);
 152                  L *remove_no_lock(const L *key) throw(IPCException);
 153                  L *remove_wait(const void *key) throw(IPCException);
 154                  L *next(const L *ref) throw(IPCException);
 155                  L *prev(const L *ref) throw(IPCException);
 156                  L *reference(const void *key) throw(IPCException);
 157                  L *reference(const L *key) throw(IPCException);
 158 mday  1.23       Uint32 count(void) ;
 159 mday  1.37       Uint32 size(void) ;
 160                  
 161 mike  1.2  };
 162                  
 163 keith.petley 1.39 template<class L> DQueue<L> * DQueue<L>::_headOfFreeList = 0;
 164 mday         1.29 template<class L> const int DQueue<L>::BLOCK_SIZE = 200;
 165 mday         1.25 template<class L> Mutex DQueue<L>::_alloc_mut;
 166                   
 167 mday         1.29 template<class L> void *DQueue<L>::operator new(size_t size)
 168                   {
 169                      if (size != sizeof(DQueue<L>))
 170                         return ::operator new(size);
 171                      
 172                      _alloc_mut.lock(pegasus_thread_self());
 173                      
 174                      DQueue<L> *p = _headOfFreeList;
 175                      if(p)
 176                         _headOfFreeList = p->_dq_next;
 177                      else
 178                      {
 179                         DQueue<L> * newBlock = 
 180                   	 static_cast<DQueue<L> *>(::operator new(BLOCK_SIZE * sizeof(DQueue<L>)));
 181                         int i;
 182                         for( i = 1; i < BLOCK_SIZE - 1; ++i)
 183                   	 newBlock[i]._dq_next = &newBlock[i + 1];
 184                         newBlock[BLOCK_SIZE - 1]._dq_next = 0;
 185                         
 186                         p = newBlock;
 187                         _headOfFreeList = &newBlock[1];
 188 mday         1.29    }
 189                      _alloc_mut.unlock();
 190                      
 191                      return p;
 192                   }
 193                   
 194                   template<class L> void DQueue<L>::operator delete(void *dead, size_t size)
 195                   {
 196                      if(dead == 0)
 197                         return;
 198                      if(size != sizeof(DQueue<L>))
 199                      {
 200                         ::operator delete(dead);
 201                         return;
 202                      }
 203                      DQueue<L> *p = static_cast<DQueue<L> *>(dead);
 204                      _alloc_mut.lock(pegasus_thread_self());
 205                      p->_dq_next = _headOfFreeList;
 206                      _headOfFreeList = p;
 207                      _alloc_mut.unlock();
 208                   }
 209 mday         1.25 
 210 keith.petley 1.39 template<class L> AsyncDQueue<L> * AsyncDQueue<L>::_headOfFreeList =0;
 211 mday         1.25 template<class L> const int AsyncDQueue<L>::BLOCK_SIZE = 20;
 212                   template<class L> Mutex AsyncDQueue<L>::_alloc_mut;
 213                   
 214 mday         1.29 template<class L> void * AsyncDQueue<L>::operator new(size_t size)
 215                   {
 216                      if (size != sizeof(AsyncDQueue<L>))
 217                         return ::operator new(size);
 218                      
 219                      _alloc_mut.lock(pegasus_thread_self());
 220                      
 221                      AsyncDQueue<L> *p = _headOfFreeList;
 222                      if(p)
 223                          _headOfFreeList = p->_dq_next;
 224                      else
 225                      {
 226                         AsyncDQueue<L> * newBlock = 
 227                   	 static_cast<AsyncDQueue<L> *>(::operator new(BLOCK_SIZE * sizeof(AsyncDQueue<L>)));
 228                         int i;
 229                         for( i = 1; i < BLOCK_SIZE - 1; ++i)
 230                   	 newBlock[i]._dq_next = &newBlock[i + 1];
 231                         newBlock[BLOCK_SIZE - 1]._dq_next = 0;
 232                         
 233                         p = newBlock;
 234                         _headOfFreeList = &newBlock[1];
 235 mday         1.29    }
 236                      _alloc_mut.unlock();
 237                      
 238                      return p;
 239                   }
 240                   
 241                   template<class L> void AsyncDQueue<L>::operator delete(void *deadObject, size_t size)
 242                   {
 243                      if(deadObject == 0)
 244                         return;
 245                      if(size != sizeof(AsyncDQueue<L>))
 246                      {
 247                         ::operator delete(deadObject);
 248                         return;
 249                      }
 250                      AsyncDQueue<L> *p = static_cast<AsyncDQueue<L> *>(deadObject);
 251                      _alloc_mut.lock(pegasus_thread_self());
 252                      p->_dq_next = _headOfFreeList;
 253                      _headOfFreeList = p;
 254                      _alloc_mut.unlock();
 255                   }
 256 mday         1.24 
 257                   template<class L> DQueue<L>::DQueue(void) 
 258                      : Base(false)
 259                   { 
 260                      _mutex = 0;
 261                      _actual_count = 0;
 262                   }
 263                   
 264                   template<class L> DQueue<L>::DQueue(Boolean head) 
 265                      :  Base(head)
 266                   {
 267                      if(head == true)
 268                      {
 269                         _mutex = new Mutex();
 270                         _actual_count = new AtomicInt(0);
 271                      }
 272                      
 273                      else
 274                      {
 275                         _mutex = 0;
 276                         _actual_count = 0;
 277 mday         1.24    }
 278                   }
 279                   
 280                   
 281                   template<class L> DQueue<L>::~DQueue() 
 282                   { 
 283                      if(_mutex != 0) delete _mutex; 
 284                      if (_actual_count != 0) delete _actual_count;
 285                   }
 286                   
 287                   
 288                   template<class L> void DQueue<L>::lock(void) throw(IPCException) 
 289                   { _mutex->lock(pegasus_thread_self()); }
 290                   
 291                   template<class L> void DQueue<L>::unlock(void) throw(IPCException) 
 292                   { _mutex->unlock() ; }
 293                   
 294                   template<class L> void DQueue<L>::try_lock() throw(IPCException) 
 295                   {  _mutex->try_lock(pegasus_thread_self()); }
 296                   
 297                   template<class L> void DQueue<L>::insert_first_no_lock(L *element) throw(IPCException)
 298 mday         1.24 {
 299                      if( pegasus_thread_self() != _mutex->get_owner())
 300                         throw Permission(pegasus_thread_self());
 301                      Base::insert_first(static_cast<void *>(element));
 302                      (*_actual_count)++;
 303                      
 304                      return;
 305                   }
 306                   
 307                   template<class L> void DQueue<L>::insert_first(L *element) throw(IPCException) 
 308                   {
 309                      if(element == 0)
 310                         return;
 311                      _mutex->lock(pegasus_thread_self());
 312                      Base::insert_first(static_cast<void *>(element));
 313                      (*_actual_count)++;
 314                      _mutex->unlock();
 315                   }
 316                   
 317                   template<class L> void DQueue<L>::insert_last_no_lock(L *element) throw(IPCException)
 318                   {
 319 mday         1.24    if( pegasus_thread_self() != _mutex->get_owner())
 320                         throw Permission(pegasus_thread_self());
 321                      Base::insert_last(static_cast<void *>(element));
 322                      (*_actual_count)++;
 323                      return;
 324                   }
 325                   
 326                   
 327                   template<class L> void DQueue<L>::insert_last(L *element) throw(IPCException)
 328                   {
 329                      if(element == 0)
 330                         return;
 331                      _mutex->lock(pegasus_thread_self());
 332                      Base::insert_last(static_cast<void *>(element));
 333                      (*_actual_count)++;
 334                      _mutex->unlock();
 335                   }
 336                   
 337                   
 338                   template<class L> void DQueue<L>::empty_list( void ) throw(IPCException) 
 339                   {
 340 mday         1.24    if( Base::count() > 0) {
 341                         _mutex->lock(pegasus_thread_self()); 
 342                         Base::empty_list();
 343                         (*_actual_count) = 0;
 344                         _mutex->unlock();
 345                      }
 346                      return;
 347                   }
 348                   
 349                   
 350                   template<class L> L * DQueue<L>::remove_first ( void ) throw(IPCException) 
 351                   { 
 352                      L *ret = 0;
 353                      
 354                      if( _actual_count->value() )
 355                      {
 356                         _mutex->lock(pegasus_thread_self());
 357                         ret = static_cast<L *>(Base::remove_first());
 358                         if( ret != 0 )
 359                   	 (*_actual_count)--;
 360                         _mutex->unlock();
 361 mday         1.24    }
 362                      return ret;
 363                   }
 364                   
 365                   template<class L> L *DQueue<L>::remove_last ( void ) throw(IPCException) 
 366                   { 
 367                      L * ret = 0;
 368                      if( _actual_count->value() )
 369                      {
 370                         _mutex->lock(pegasus_thread_self());
 371                         ret = static_cast<L *>(Base::remove_last());
 372                         if( ret != 0 )
 373                   	 (*_actual_count)--;
 374                         _mutex->unlock();
 375                      }
 376                      return ret;
 377                   }
 378                   
 379                   template<class L> L *DQueue<L>::remove_no_lock(const void *key) throw(IPCException)
 380                   {
 381                      if(key == 0 )
 382 mday         1.24       return 0;
 383                      if( pegasus_thread_self() != _mutex->get_owner())
 384                         throw Permission(pegasus_thread_self());
 385                      
 386                      if (_actual_count->value() )
 387                      {
 388                         L *ret = static_cast<L *>(Base::next(0));
 389                         while( ret != 0 )
 390                         {
 391                   	 if (ret->operator==(key))
 392                   	 {
 393 kumpf        1.31 	    ret = static_cast<L *>(Base::remove(ret));
 394 mday         1.24 	    if( ret != 0 )
 395                   	       (*_actual_count)--;
 396                   		  return ret;
 397                   	 }
 398                   	       ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 399                         }
 400                      }
 401                      return 0 ;
 402                   }
 403                   
 404                   template<class L> L * DQueue<L>::remove_no_lock(const L *key) throw(IPCException)
 405                   {
 406                      if(key == 0 )
 407                         return 0;
 408                      if( pegasus_thread_self() != _mutex->get_owner())
 409                         throw Permission(pegasus_thread_self());
 410                      
 411                      if (_actual_count->value() )
 412                      {
 413                         L *ret = static_cast<L *>(Base::next(0));
 414                         while( ret != 0 )
 415 mday         1.24       {
 416 kumpf        1.31 	 if (ret->operator==(*key))
 417 mday         1.24 	 {
 418 kumpf        1.31 	    ret = static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
 419 mday         1.24 	    if( ret != 0 )
 420                   	       (*_actual_count)--;
 421                   	    return ret;
 422                   	 }
 423                   	 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 424                         }
 425                      }
 426                      return 0 ;
 427                   }
 428                   
 429                   
 430                   template<class L> L * DQueue<L>::remove(const void *key) throw(IPCException)
 431                   {
 432                      
 433                      L *ret = 0;
 434                      
 435                      if( _actual_count->value() > 0 ) 
 436                      {
 437                         _mutex->lock(pegasus_thread_self());
 438                         ret = DQueue<L>::remove_no_lock(key);
 439                         _mutex->unlock( );
 440 mday         1.24    }
 441                      return(ret);
 442                   }
 443                   
 444                   template<class L>L *DQueue<L>::remove(const L *key) throw(IPCException)
 445                   {
 446                      L *ret = 0;
 447                      
 448                      if( _actual_count->value() > 0 ) 
 449                      {
 450                         _mutex->lock(pegasus_thread_self());
 451                         ret = DQueue<L>::remove_no_lock(key);
 452                         _mutex->unlock();
 453                      }
 454                      return(ret);
 455                   }
 456                   
 457                   template<class L> L *DQueue<L>::reference(const void *key) throw(IPCException)
 458                   {
 459                      if(key == 0)
 460                         return 0;
 461 mday         1.24    
 462                      if( pegasus_thread_self() != _mutex->get_owner())
 463                         throw Permission(pegasus_thread_self());
 464                      
 465                      if( _actual_count->value() ) 
 466                      {
 467                         L *ret = static_cast<L *>(Base::next(0));
 468                         while(ret != 0)
 469                         {
 470                   	 if(ret->operator==(key))
 471                   	    return ret;
 472                   	 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 473                         }
 474                      }
 475                      return(0);
 476                   }
 477                   
 478                   template<class L> L *DQueue<L>::reference(const L *key)
 479                   {
 480                      if(key == 0)
 481                         return 0;
 482 mday         1.24 	 
 483                      if( pegasus_thread_self() != _mutex->get_owner())
 484                         throw Permission(pegasus_thread_self());
 485                   	 
 486                      if( _actual_count->value() ) 
 487                      {
 488                         L *ret = static_cast<L *>(Base::next(0));
 489                         while(ret != 0)
 490                         {
 491 kumpf        1.32 	 if(ret->operator==(*key))
 492 mday         1.24 	    return ret;
 493                   	 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 494                         }
 495                      }
 496                   	 return(0);
 497                   }
 498                   
 499                   template<class L> L * DQueue<L>::next( const void * ref) throw(IPCException)
 500                   {
 501                      if (_mutex->get_owner() != pegasus_thread_self())
 502                         throw Permission(pegasus_thread_self()) ;
 503                      return static_cast<L *>(Base::next( ref ));
 504                   }
 505                         
 506                   template<class L> L *DQueue<L>::prev( const void *ref) throw(IPCException)
 507                   {
 508                      if (_mutex->get_owner() != pegasus_thread_self())
 509                         throw Permission(pegasus_thread_self());
 510 keith.petley 1.39    return  static_cast<L *>(Base::prev( ref ));
 511 mday         1.24 }
 512                   
 513                   template<class L> Boolean DQueue<L>::exists(const void *key) throw(IPCException) 
 514                   {
 515                      if(key == 0)
 516                         return false;
 517                      
 518                      Boolean ret = false;
 519                      if(_actual_count->value() > 0)
 520                      {
 521                         _mutex->lock(pegasus_thread_self());
 522                         ret = DQueue<L>::reference(key);
 523                         _mutex->unlock();
 524                      }
 525                      return(ret);
 526                   }
 527                   
 528                   template<class L> Boolean DQueue<L>::exists(const L *key) throw(IPCException)
 529                   {
 530                      if(key == 0)
 531                         return false;
 532 mday         1.24    
 533                      Boolean ret = false;
 534                      if(_actual_count->value() > 0)
 535                      {
 536 mday         1.35       _mutex->lock(pegasus_thread_self());
 537 mday         1.24       ret = DQueue<L>::reference(key);
 538                         _mutex->unlock();
 539                      }
 540                      return(ret);
 541                   }
 542                   
 543                   template<class L> void AsyncDQueue<L>::_insert_prep(void) throw(IPCException)
 544                   {
 545                      if(_disallow->value() > 0)
 546                      {
 547                         unlock();
 548                         throw ListClosed();
 549                      }	    
 550                      _slot->lock_object(pegasus_thread_self());
 551                      while(true == is_full())
 552                      {
 553                         _slot->unlocked_wait(pegasus_thread_self());
 554                         if(_disallow->value() > 0)
 555                         {
 556                   	 unlock();
 557                   	 throw ListClosed();
 558 mday         1.24       }	    
 559                      }
 560                   }
 561                   
 562                   template<class L> void AsyncDQueue<L>::_insert_recover(void) throw(IPCException)
 563                   {
 564                      _node->unlocked_signal(pegasus_thread_self());
 565                      (*_actual_count)++;
 566                      unlock();
 567                   }
 568                   
 569                   template<class L> void AsyncDQueue<L>::_unlink_prep(void) throw(IPCException)
 570                   {
 571                      
 572                      if(_disallow->value() > 0)
 573                      {
 574                         unlock();
 575                         throw ListClosed();
 576                      }	    
 577                      _node->lock_object(pegasus_thread_self());
 578                      while(true == is_empty())
 579 mday         1.24    {
 580                         _node->unlocked_wait(pegasus_thread_self());
 581                         if(_disallow->value() > 0)
 582                         {
 583                   	 unlock();
 584                   	 throw ListClosed();
 585                         }
 586                      }
 587                   }
 588                   
 589                   template<class L> void AsyncDQueue<L>::_unlink_recover(void) throw(IPCException)
 590                   {
 591                      _slot->unlocked_signal(pegasus_thread_self());
 592                      (*_actual_count)--;
 593                      unlock();
 594                   }
 595                   
 596                   template<class L> L * AsyncDQueue<L>::_remove_no_lock(const void *key) throw(IPCException)
 597                   {
 598                      if(_disallow->value() > 0)
 599                      {
 600 mday         1.24       unlock();	    
 601                         throw ListClosed();
 602                      }
 603                      if( pegasus_thread_self() != _cond->get_owner())
 604                         throw Permission(pegasus_thread_self());
 605                      L *ret = static_cast<L *>(Base::next(0));
 606                      while(ret != 0)
 607                      {
 608                         if(ret->operator==(key))
 609                         {
 610                   	 return static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
 611                         }
 612                         
 613                         ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 614                      }
 615                      return 0;
 616                   }
 617                   
 618                   template<class L> L *AsyncDQueue<L>::_remove_no_lock(const L *key) throw(IPCException)
 619                   {
 620                      if(_disallow->value() > 0)
 621 mday         1.24    {
 622                         unlock();	    
 623                         throw ListClosed();
 624                      }
 625                      if( pegasus_thread_self() != _cond->get_owner())
 626                         throw Permission(pegasus_thread_self());
 627                      L *ret = static_cast<L *>(Base::next(0));
 628                      while(ret != 0)
 629                      {
 630 kumpf        1.32       if(ret->operator==(*key))
 631 mday         1.24       {
 632                   	 return static_cast<L *>(Base::remove(static_cast<const void *>(ret)));
 633                         }
 634                         
 635                         ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
 636                      }
 637                      return 0;
 638                   }
 639                   
 640                   
 641                   template<class L> AsyncDQueue<L>::AsyncDQueue(void) 
 642                      : Base(false)
 643                   {
 644                      _cond = 0;
 645                      _slot = 0;
 646                      _node = 0;
 647                      _actual_count = 0;
 648                      _disallow = 0;
 649                      _capacity = 0;
 650                   }
 651                   
 652 mday         1.24 template<class L> AsyncDQueue<L>::AsyncDQueue(Boolean head, Uint32 capacity )
 653                      : Base(head)
 654                   {
 655                      if(head == true)
 656                      {
 657                         _cond = new Mutex();
 658                         _slot = new Condition(*_cond);
 659                         _node = new Condition(*_cond);
 660                         _actual_count = new AtomicInt(0);
 661                         _disallow = new AtomicInt(0);
 662                         _capacity = new AtomicInt(capacity);
 663                      }
 664                      else
 665                      {
 666                         _cond = 0;
 667                         _slot = 0;
 668                         _node = 0;
 669                         _actual_count = 0;
 670                         _disallow = 0;
 671                         _capacity = 0;
 672                      }
 673 mday         1.24 }
 674                   
 675                   template<class L> AsyncDQueue<L>::~AsyncDQueue(void)
 676                   {
 677                      delete _cond;
 678                      delete _slot;
 679                      delete _node;
 680                      delete _actual_count;
 681                      delete _disallow;
 682                      delete _capacity;
 683                   }
 684                   
 685                   
 686                   template<class L> void AsyncDQueue<L>::shutdown_queue(void)
 687                   {
 688                      try 
 689                      {
 690                         lock(pegasus_thread_self());
 691                         (*_disallow)++;
 692                         _node->disallow();
 693                         _node->unlocked_signal(pegasus_thread_self());
 694 mday         1.24       _node->unlocked_signal(pegasus_thread_self());
 695                         _slot->disallow();
 696                         _slot->unlocked_signal(pegasus_thread_self());  
 697                         _slot->unlocked_signal(pegasus_thread_self());  
 698                         unlock();
 699                      }
 700                      catch(ListClosed & )
 701                      {
 702                         (*_disallow)++;
 703                      }
 704                   }
 705                   
 706                   template<class L> Boolean AsyncDQueue<L>::is_full(void)
 707                   {
 708 mday         1.29    return false;
 709                      
 710                   
 711 mday         1.24    if( _capacity->value() == 0 )
 712                         return false;
 713                      
 714                      if(_actual_count->value() >= _capacity->value())
 715                         return true;
 716                      return false;
 717                   }
 718                   
 719                   template<class L> Boolean AsyncDQueue<L>::is_empty(void)
 720                   {
 721                      if(_actual_count->value() == 0)
 722                         return true;
 723                      return false;
 724                   }
 725                   
 726                   
 727                   template<class L> Boolean AsyncDQueue<L>::is_shutdown(void)
 728                   {
 729                      if( _disallow->value() > 0)
 730                         return true;
 731                      return false;
 732 mday         1.24 }
 733                   
 734                   template<class L> void AsyncDQueue<L>::try_lock(PEGASUS_THREAD_TYPE myself) throw(IPCException)
 735                   {
 736                      if(_disallow->value() > 0)
 737                      {
 738                         throw ListClosed();
 739                      }
 740                      _cond->try_lock(myself);
 741                   }
 742                   
 743                   template<class L> void AsyncDQueue<L>::lock(PEGASUS_THREAD_TYPE myself) throw(IPCException)
 744                   {
 745                      if(_disallow->value() > 0)
 746                      {
 747                         throw ListClosed();
 748                      }
 749                      _cond->lock(myself);
 750                   }
 751                   
 752                   template<class L> void AsyncDQueue<L>::unlock(void)
 753 mday         1.24 {
 754                      _cond->unlock();
 755                   }
 756                   
 757 mday         1.30 template<class L> void AsyncDQueue<L>::signal_slot(void) throw(IPCException)
 758                   { 
 759                      _cond->lock(pegasus_thread_self()); 
 760                      _slot->unlocked_signal(pegasus_thread_self());
 761                      _cond->unlock();
 762                   }
 763                   
 764                   template<class L> void AsyncDQueue<L>::signal_node(void) throw(IPCException)
 765                   {
 766                      _cond->lock(pegasus_thread_self()); 
 767                      _node->unlocked_signal(pegasus_thread_self());
 768                      _cond->unlock();
 769                   }
 770                   
 771                   template<class L> Condition *AsyncDQueue<L>::get_node_cond(void) 
 772                   { return _node ; }
 773                      
 774                   template<class L> Condition * AsyncDQueue<L>::get_slot_cond(void)
 775                   { return _slot; }
 776                         
 777 mday         1.24 template<class L> void AsyncDQueue<L>::wait_for_node(void) throw(IPCException)
 778                   {
 779                      _unlink_prep();
 780                   }
 781                   
 782                   template<class L> void AsyncDQueue<L>::set_capacity(Uint32 capacity) throw(IPCException)
 783                   {
 784                      lock(pegasus_thread_self());
 785 keith.petley 1.39    *_capacity = capacity;
 786 mday         1.24    unlock();
 787                   }
 788                   
 789                   template<class L> Uint32 AsyncDQueue<L>::get_capacity(void)
 790                   {
 791                      return _capacity->value();
 792                   }
 793                   
 794                   template<class L> void AsyncDQueue<L>::insert_first(L *element) throw(IPCException)
 795                   {
 796                      if(element == 0)
 797                         return;
 798                   	 
 799                      lock(pegasus_thread_self());
 800                      if(true == is_full())
 801                      {
 802                         unlock();
 803                         throw ListFull(_capacity->value());
 804                      }
 805                      Base::insert_first(static_cast<void *>(element));
 806                      _insert_recover();
 807 mday         1.24 }
 808                   
 809                   template<class L> void AsyncDQueue<L>::insert_first_wait(L *element) throw(IPCException)
 810                   {
 811                      if(element == 0)
 812                         return;
 813                      
 814                      _insert_prep();
 815                      Base::insert_first(static_cast<void *>(element));
 816                      _insert_recover();
 817                   }
 818                   
 819                   template<class L> void AsyncDQueue<L>::insert_last(L *element) throw(IPCException)
 820                   {
 821                      if(element == 0)
 822                         return;
 823                      lock(pegasus_thread_self());
 824                      if(true == is_full())
 825                      {
 826                         unlock();
 827                         throw ListFull(_capacity->value());
 828 mday         1.24    }
 829                      Base::insert_last(static_cast<void *>(element));
 830                      _insert_recover();
 831                   }
 832                   
 833                   template<class L> void AsyncDQueue<L>::insert_last_wait(L *element) throw(IPCException)
 834                   {
 835                      if(element == 0)
 836                         return;
 837                      _insert_prep();
 838                      Base::insert_last(element);
 839                      _insert_recover();
 840                   }
 841                   
 842                   template<class L> void AsyncDQueue<L>::empty_list(void) throw(IPCException)
 843                   {
 844                      lock(pegasus_thread_self());
 845                      Base::empty_list();
 846                      (*_actual_count) = 0; 
 847                      _slot->unlocked_signal(pegasus_thread_self());
 848                      unlock();
 849 mday         1.24 }
 850                   
 851                   template<class L> L *AsyncDQueue<L>::remove_first(void) throw(IPCException)
 852                   {
 853 mday         1.29 
 854 mday         1.24    lock(pegasus_thread_self());
 855                      L *ret = static_cast<L *>(Base::remove_first());
 856                      if(ret != 0)
 857 mday         1.29    {
 858                         _slot->unlocked_signal(pegasus_thread_self());
 859 mday         1.24       (*_actual_count)--;
 860 mday         1.29    }
 861 mday         1.24    unlock();
 862                      return ret;
 863                   }
 864                   
 865                   template<class L> L *AsyncDQueue<L>::remove_first_wait(void) throw(IPCException)
 866                   {
 867                      _unlink_prep();
 868                      L *ret = static_cast<L *>(Base::remove_first());
 869                      _unlink_recover();
 870                      return ret;
 871                   }
 872                   
 873                   template<class L> L *AsyncDQueue<L>::remove_last(void) throw(IPCException)
 874                   {
 875                      lock(pegasus_thread_self());
 876 mday         1.29 
 877 mday         1.24    L *ret = static_cast<L *>(Base::remove_last());
 878                      if(ret != 0)
 879 mday         1.29    {
 880 mday         1.24       (*_actual_count)--;
 881 mday         1.29       _slot->unlocked_signal(pegasus_thread_self());
 882                      }
 883 mday         1.24    unlock();
 884                      return ret;
 885                   }
 886                   
 887                   template<class L> L *AsyncDQueue<L>::remove_last_wait(void) throw(IPCException)
 888                   {
 889                      _unlink_prep();
 890                      L *ret = static_cast<L *>(Base::remove_last());
 891                      _unlink_recover();
 892                      return ret;
 893                   }
 894                   
 895                   template<class L> L *AsyncDQueue<L>::remove(const void *key) throw(IPCException)
 896                   {
 897                      if(key == 0)
 898                         return 0;
 899                      lock(pegasus_thread_self());
 900 mday         1.29 
 901 mday         1.24    L *ret = _remove_no_lock(key);
 902                      if(ret != 0)
 903                      {
 904                         (*_actual_count)--;
 905                         _slot->unlocked_signal(pegasus_thread_self());
 906                      }
 907                      unlock();
 908                      return ret;
 909                   }
 910                   
 911                   template<class L>L *AsyncDQueue<L>::remove(const L *key) throw(IPCException)
 912                   {
 913                      if(key == 0)
 914                         return 0;
 915                      lock(pegasus_thread_self());
 916 mday         1.29 
 917 mday         1.24    L *ret = _remove_no_lock(key);
 918                      if(ret != 0)
 919                      {
 920                         (*_actual_count)--;
 921                         _slot->unlocked_signal(pegasus_thread_self());
 922                      }
 923                      unlock();
 924                      return ret;
 925                   }
 926                   
 927                   template<class L> L *AsyncDQueue<L>::remove_no_lock(const void *key) throw(IPCException)
 928                   {
 929 mday         1.29    if(_disallow->value() > 0)
 930                      {
 931                         unlock();
 932                         throw ListClosed();
 933                      }	    
 934                   
 935 mday         1.24    if(key == 0)
 936                         return 0;
 937                      
 938                      L *ret = 0;
 939                      
 940                      if(Base::count() > 0 ) 
 941                      {
 942                         ret = _remove_no_lock(key);
 943                         if(ret != 0)
 944                         {
 945                   	 (*_actual_count)--;
 946                   	 _slot->unlocked_signal(pegasus_thread_self());
 947                         }
 948                      }
 949                      return ret;
 950                   }
 951                   
 952                   
 953                   template<class L> L *AsyncDQueue<L>::remove_no_lock(const L *key) throw(IPCException)
 954                   {
 955 mday         1.29    if(_disallow->value() > 0)
 956                      {
 957                         unlock();
 958                         throw ListClosed();
 959                      }	    
 960                   
 961 mday         1.24    if(key == 0)
 962                         return 0;
 963                      
 964                      L *ret = 0;
 965                      
 966                      if(Base::count() > 0 ) 
 967                      {
 968                         ret = _remove_no_lock(key);
 969                         if(ret != 0)
 970                         {
 971                   	 (*_actual_count)--;
 972                   	 _slot->unlocked_signal(pegasus_thread_self());
 973                         }
 974                      }
 975                      return ret;
 976                   }
 977                   
 978                   template<class L> L *AsyncDQueue<L>::remove_wait(const void *key) throw(IPCException)
 979                   {
 980 mday         1.29 
 981 mday         1.24    if(key == 0)
 982                         return 0;
 983                      
 984                      lock(pegasus_thread_self());
 985                      
 986                      L *ret = _remove_no_lock(key);
 987                      while( ret == 0 )
 988                      {
 989                         if(_disallow->value() > 0)
 990                         {
 991                   	 unlock();	    
 992                   	 throw ListClosed();
 993                         }
 994                         _node->unlocked_wait(pegasus_thread_self());
 995                         if(_disallow->value() > 0)
 996                         {
 997                   	 unlock();	    
 998                   	 throw ListClosed();
 999                         }
1000                         ret = _remove_no_lock(key);
1001                      }
1002 mday         1.24    if(ret != 0)
1003                      {
1004                         (*_actual_count)--;
1005                         _slot->unlocked_signal(pegasus_thread_self());
1006                      }
1007                      unlock();
1008                      return ret;
1009                   }
1010                   
1011                   template<class L> L *AsyncDQueue<L>::next(const L *ref) throw(IPCException)
1012                   {
1013                      if( pegasus_thread_self() != _cond->get_owner())
1014                         throw Permission(pegasus_thread_self());
1015                      
1016                      return static_cast<L *>(Base::next( reinterpret_cast<const void *>(ref)));
1017                   }
1018                   
1019                   template<class L> L *AsyncDQueue<L>::prev(const L *ref) throw(IPCException)
1020                   {
1021                      if( pegasus_thread_self() != _cond->get_owner())
1022                         throw Permission(pegasus_thread_self());
1023 mday         1.24    
1024                      return static_cast<L *>(Base::prev( reinterpret_cast<const void *>(ref)));
1025                   }
1026                   
1027                   template<class L> L *AsyncDQueue<L>::reference(const void *key) throw(IPCException)
1028                   {
1029 mday         1.29    if(_disallow->value() > 0)
1030                      {
1031                         unlock();
1032                         throw ListClosed();
1033                      }	    
1034                   
1035 mday         1.24    if( key == 0 )
1036                         return 0;
1037                      
1038                      if( pegasus_thread_self() != _cond->get_owner())
1039                         throw Permission(pegasus_thread_self());
1040                      
1041                      if(Base::count() > 0 ) 
1042                      {
1043                         L *ret = static_cast<L *>(Base::next(0));
1044                         while(ret != 0)
1045                         {
1046                   	 if(ret->operator==(key))
1047                   	    return ret;
1048                   	       ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
1049                         }
1050                      }
1051                      return(0);
1052                   }
1053                   
1054                   template<class L> L *AsyncDQueue<L>::reference(const L *key) throw(IPCException)
1055                   {
1056 mday         1.29    if(_disallow->value() > 0)
1057                      {
1058                         unlock();
1059                         throw ListClosed();
1060                      }	    
1061                   
1062 mday         1.24    if(key == 0)
1063                         return 0;
1064                      
1065                      if( pegasus_thread_self() != _cond->get_owner())
1066                         throw Permission(pegasus_thread_self());
1067                      
1068                      if(Base::count() > 0 ) 
1069                      {
1070                         L *ret = static_cast<L *>(Base::next(0));
1071                         while(ret != 0)
1072                         {
1073 kumpf        1.32 	 if(ret->operator==(*key))
1074 mday         1.24 	    return ret;
1075                   	 ret = static_cast<L *>(Base::next(static_cast<const void *>(ret)));
1076                         }
1077                      }
1078                      return(0);
1079                   }
1080                   
1081                   template<class L> Uint32 AsyncDQueue<L>::count(void) { return _actual_count->value() ; }
1082 mday         1.37 template<class L> Uint32 AsyncDQueue<L>::size(void) { return _actual_count->value() ; }
1083 mday         1.24 
1084 mike         1.2  
1085                   PEGASUS_NAMESPACE_END
1086                   
1087                   #endif /* Pegasus_DQueue_h */
1088                   

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2