(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.6.2.3 and 1.32.4.1

version 1.6.2.3, 2001/10/17 16:02:45 version 1.32.4.1, 2002/10/25 20:49:43
Line 1 
Line 1 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   // The Open Group, Tivoli Systems
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 27 
Line 28 
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
   #include <Pegasus/Common/IPC.h>
   #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "MessageQueueService.h"
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 37 
Line 40 
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128); static QueueTable _queueTable(128);
   static Mutex q_table_mut ;
  
 static Uint32 _GetNextQueueId()  Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 { {
     static Uint32 _queueId = 1;     static Uint32 _nextQueueId = 2;
  
     // Handle wrap-around!     //
      // Lock mutex:
      //
  
     if (_queueId == 0)     static Mutex _id_mut ;
         _queueId++;     _id_mut.lock(pegasus_thread_self());
  
     return _queueId++;     // Assign next queue id. Handle wrap around and never assign zero as
 }     // a queue id:
  
 MessageQueue::MessageQueue() : _count(0), _front(0), _back(0)     if (_nextQueueId == 0)
 {        _nextQueueId = 2;
     // ATTN-A: thread safety!  
  
    memset(_name, 0x00, 16);     Uint32 queueId = _nextQueueId++;
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))  
         ;     //
      // Unlock mutex:
      //
   
      _id_mut.unlock();
   
      return queueId;
 } }
  
 MessageQueue::MessageQueue(char *name) : _count(0), _front(0), _back(0)  
 {  
    if(name != NULL)  MessageQueue::MessageQueue(
       const char* name,
       Boolean async,
       Uint32 queueId)
      : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
    {    {
       strncpy(_name, name, 15);      //
       _name[15] = 0x00;      // Copy the name:
    }      //
  
    else     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
       memset(_name, 0x00,16);  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))      if (!name)
        ;          name = "";
 }  
       _name = new char[strlen(name) + 1];
       strcpy(_name, name);
  
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
   
       //
       // Insert into queue table:
       //
   
       q_table_mut.lock(pegasus_thread_self());
   
       if ( ! _queueTable.insert(_queueId, this))
          abort();
   
       q_table_mut.unlock();
   
   
      PEG_METHOD_EXIT();
   }
  
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
  
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
   
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
   
       q_table_mut.lock(pegasus_thread_self());
   
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
       q_table_mut.unlock();
   
       // Free the name:
   
       delete [] _name;
   
       PEG_METHOD_EXIT();
 } }
  
 void MessageQueue::enqueue(Message* message)  void MessageQueue::enqueue(Message* message) throw(IPCException)
 { {
     if (!message)      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
         throw NullPointer();  
  
     if (getenv("PEGASUS_TRACE"))      if (!message)
     {     {
         cout << "===== " << getQueueName() << ": ";         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         message->print(cout);          "MessageQueue::enqueue failure");
          PEG_METHOD_EXIT();
          throw NullPointer();
     }     }
  
       PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
                         String("Queue name: ") + getQueueName() ) ;
       Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,
                         Tracer::LEVEL3,
                         "Message: [%s, %d]",
                         MessageTypeToString(message->getType()),
                         message->getKey() );
   
       _mut.lock(pegasus_thread_self());
     if (_back)     if (_back)
     {     {
         _back->_next = message;         _back->_next = message;
Line 107 
Line 165 
         message->_next = 0;         message->_next = 0;
     }     }
     message->_owner = this;     message->_owner = this;
   
     _count++;     _count++;
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
                     "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
   
       _mut.unlock();
  
     handleEnqueue();     handleEnqueue();
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::dequeue()  Message* MessageQueue::dequeue() throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
   
      _mut.lock(pegasus_thread_self());
     if (_front)     if (_front)
     {     {
         Message* message = _front;         Message* message = _front;
Line 124 
Line 191 
         if (_back == message)         if (_back == message)
             _back = 0;             _back = 0;
  
           _count--;
           Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "MessageQueue::dequeue _queueId = %d, _count = %d",
               _queueId, _count);
   
           _mut.unlock();
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
         _count--;  
  
           PEG_METHOD_EXIT();
         return message;         return message;
     }     }
       _mut.unlock();
   
       PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
   ;
  
 void MessageQueue::remove(Message* message)  
   void MessageQueue::remove(Message* message) throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
   
     if (!message)     if (!message)
       {
           PEG_METHOD_EXIT();
         throw NullPointer();         throw NullPointer();
       }
  
     if (message->_owner != this)     if (message->_owner != this)
       {
           PEG_METHOD_EXIT();
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
       }
   
       _mut.lock(pegasus_thread_self());
  
     if (message->_next)     if (message->_next)
         message->_next->_prev = message->_prev;         message->_next->_prev = message->_prev;
Line 152 
Line 240 
     else     else
         _front = message->_next;         _front = message->_next;
  
       _count--;
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
          "MessageQueue::remove _count = %d", _count);
   
       _mut.unlock();
   
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
     message->_owner = 0;     message->_owner = 0;
     _count--;  
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByType(Uint32 type)  Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getType() == type)         if (m->getType() == type)
          {
             _mut.unlock();
             return m;             return m;
     }     }
       }
       _mut.unlock();
     return 0;     return 0;
 } }
  
 Message* MessageQueue::findByKey(Uint32 key)  Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getKey() == key)         if (m->getKey() == key)
          {
             _mut.unlock();
             return m;             return m;
     }     }
  
       }
       _mut.unlock();
     return 0;     return 0;
 } }
  
 void MessageQueue::print(ostream& os) const  void MessageQueue::print(ostream& os) const throw(IPCException)
 { {
      const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());
   
     for (const Message* m = front(); m; m = m->getNext())     for (const Message* m = front(); m; m = m->getNext())
         m->print(os);         m->print(os);
      const_cast<MessageQueue *>(this)->_mut.unlock();
 } }
  
 Message* MessageQueue::find(Uint32 type, Uint32 key)  Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
   
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
         if (m->getType() == type && m->getKey() == key)         if (m->getType() == type && m->getKey() == key)
          {
             _mut.unlock();
             return m;             return m;
     }     }
       }
       _mut.unlock();
  
     return 0;     return 0;
 } }
  
 void MessageQueue::lock()  void MessageQueue::lock() throw(IPCException)
 { {
      _mut.lock(pegasus_thread_self());
 } }
  
 void MessageQueue::unlock() void MessageQueue::unlock()
 { {
      _mut.unlock();
 } }
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    if(_name[0] != 0x00)  
       return _name;       return _name;
    return "unknown";  
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId)  MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
 { {
   
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
       q_table_mut.lock(pegasus_thread_self());
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
       {
          q_table_mut.unlock();
         return queue;         return queue;
       }
  
     // Not found!     // Not found!
   
       q_table_mut.unlock();
   
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure queueId = %i", queueId);
   
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name)  MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
      q_table_mut.lock(pegasus_thread_self());
   
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
       if(! strncmp( ((MessageQueue *)i.value())->_name, name, 16) )          // ATTN: Need to decide how many characters to compare in queue names
         if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
         {
            q_table_mut.unlock();
          return( (MessageQueue *)i.value());          return( (MessageQueue *)i.value());
    }    }
   
      }
      q_table_mut.unlock();
   
      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure - name = %s", name);
   
    return 0;    return 0;
 } }
  


Legend:
Removed from v.1.6.2.3  
changed lines
  Added in v.1.32.4.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2