(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.6 and 1.6.2.8

version 1.6, 2001/07/12 06:21:55 version 1.6.2.8, 2001/12/10 18:34:06
Line 27 
Line 27 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
   #include <Pegasus/Common/IPC.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
Line 37 
Line 38 
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128); static QueueTable _queueTable(128);
   static Mutex q_table_mut = Mutex();
  
 static Uint32 _GetNextQueueId()  static Uint32 _GetNextQueueId() throw(IPCException)
 { {
     static Uint32 _queueId = 1;  
  
     // Handle wrap-around!     static Uint32 _queueId = 2;
      static Mutex _id_mut = Mutex();
   
      _id_mut.lock(pegasus_thread_self());
  
      // Handle wrap-around!
     if (_queueId == 0)     if (_queueId == 0)
         _queueId++;        _queueId = MessageQueue::_CIMOM_Q_ID;
      Uint32 ret = _queueId++;
      _id_mut.unlock();
  
     return _queueId++;     return ret;
 } }
  
 MessageQueue::MessageQueue() : _count(0), _front(0), _back(0)  Uint32 MessageQueue::_CIMOM_Q_ID = 1;
   
   MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
      q_table_mut.lock(pegasus_thread_self());
   
      memset(_name, 0x00, 26);
  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
         ;         ;
       q_table_mut.unlock();
   }
   
   MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)
   {
      if(name != NULL)
      {
         strncpy(_name, name, 25);
         _name[25] = 0x00;
 } }
  
      else
         memset(_name, 0x00,25);
   
      q_table_mut.lock(pegasus_thread_self());
   
       while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))
          ;
       q_table_mut.unlock();
   
   }
   
   
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
      q_table_mut.lock(pegasus_thread_self());
  
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
       q_table_mut.unlock();
   
 } }
  
 void MessageQueue::enqueue(Message* message)  void MessageQueue::enqueue(Message* message) throw(IPCException)
 { {
     if (!message)     if (!message)
         throw NullPointer();         throw NullPointer();
  
       _mut.lock(pegasus_thread_self());
   
       if (getenv("PEGASUS_TRACE"))
       {
           cout << "===== " << getQueueName() << ": ";
           message->print(cout);
       }
   
     if (_back)     if (_back)
     {     {
         _back->_next = message;         _back->_next = message;
Line 86 
Line 130 
     }     }
     message->_owner = this;     message->_owner = this;
     _count++;     _count++;
       _mut.unlock();
  
     handleEnqueue();     handleEnqueue();
 } }
  
 Message* MessageQueue::dequeue()  Message* MessageQueue::dequeue() throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
     if (_front)     if (_front)
     {     {
         Message* message = _front;         Message* message = _front;
Line 101 
Line 147 
  
         if (_back == message)         if (_back == message)
             _back = 0;             _back = 0;
           _count--;
           _mut.unlock();
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
         _count--;  
         return message;         return message;
     }     }
       _mut.unlock();
     return 0;     return 0;
 } }
  
 void MessageQueue::remove(Message* message)  void MessageQueue::remove(Message* message) throw(IPCException)
 { {
     if (!message)     if (!message)
         throw NullPointer();         throw NullPointer();
Line 119 
Line 166 
     if (message->_owner != this)     if (message->_owner != this)
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
  
       _mut.lock(pegasus_thread_self());
   
     if (message->_next)     if (message->_next)
         message->_next->_prev = message->_prev;         message->_next->_prev = message->_prev;
     else     else
Line 129 
Line 178 
     else     else
         _front = message->_next;         _front = message->_next;
  
       _count--;
       _mut.unlock();
   
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
     message->_owner = 0;     message->_owner = 0;
     _count--;  
 } }
  
 Message* MessageQueue::findByType(Uint32 type)  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getType() == type)         if (m->getType() == type)
          {
             _mut.unlock();
             return m;             return m;
     }     }
       }
       _mut.unlock();
     return 0;     return 0;
 } }
  
 Message* MessageQueue::findByKey(Uint32 key)  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getKey() == key)         if (m->getKey() == key)
          {
             _mut.unlock();
             return m;             return m;
     }     }
  
       }
       _mut.unlock();
     return 0;     return 0;
 } }
  
 void MessageQueue::print(ostream& os) const  void MessageQueue::print(ostream& os) const throw(IPCException)
 { {
      const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
   
     for (const Message* m = front(); m; m = m->getNext())     for (const Message* m = front(); m; m = m->getNext())
         m->print(os);         m->print(os);
      const_cast<MessageQueue *>(this)->_mut.unlock();
 } }
  
 Message* MessageQueue::find(Uint32 type, Uint32 key)  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getType() == type && m->getKey() == key)         if (m->getType() == type && m->getKey() == key)
          {
             _mut.unlock();
             return m;             return m;
     }     }
       }
       _mut.unlock();
  
     return 0;     return 0;
 } }
  
 void MessageQueue::lock()  void MessageQueue::lock() throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
 } }
  
 void MessageQueue::unlock() void MessageQueue::unlock()
 { {
      _mut.unlock();
   }
  
   const char* MessageQueue::getQueueName() const
   {
      if(_name[0] != 0x00)
         return _name;
      return "unknown";
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId)  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
 { {
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
       q_table_mut.lock(pegasus_thread_self());
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
       {
          q_table_mut.unlock();
         return queue;         return queue;
       }
  
     // Not found!     // Not found!
   
       q_table_mut.unlock();
   
     return 0;     return 0;
 } }
  
   
   MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
   {
      if(name == NULL)
         throw NullPointer();
      q_table_mut.lock(pegasus_thread_self());
   
      for(QueueTable::Iterator i = _queueTable.start(); i; i++)
      {
           // ATTN: Need to decide how many characters to compare in queue names
         if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )
         {
            q_table_mut.unlock();
            return( (MessageQueue *)i.value());
         }
   
      }
      q_table_mut.unlock();
   
      return 0;
   }
   
   
 void MessageQueue::handleEnqueue() void MessageQueue::handleEnqueue()
 { {
  


Legend:
Removed from v.1.6  
changed lines
  Added in v.1.6.2.8

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2