(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.6.2.7 and 1.24

version 1.6.2.7, 2001/12/10 18:29:38 version 1.24, 2002/02/20 22:00:51
Line 28 
Line 28 
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
   #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "MessageQueueService.h"
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 40 
Line 41 
 static QueueTable _queueTable(128); static QueueTable _queueTable(128);
 static Mutex q_table_mut = Mutex(); static Mutex q_table_mut = Mutex();
  
 static Uint32 _GetNextQueueId() throw(IPCException)  Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 { {
      static Uint32 _nextQueueId = 2;
  
    static Uint32 _queueId = 2;     //
      // Lock mutex:
      //
  
    q_table_mut.lock(pegasus_thread_self());     static Mutex _id_mut = Mutex();
      _id_mut.lock(pegasus_thread_self());
  
    // Handle wrap-around!     // Assign next queue id. Handle wrap around and never assign zero as
    if (_queueId == 0)     // a queue id:
       _queueId = MessageQueue::_CIMOM_Q_ID;  
    Uint32 ret = _queueId++;  
    q_table_mut.unlock();  
  
    return ret;     if (_nextQueueId == 0)
 }        _nextQueueId = 2;
  
 Uint32 MessageQueue::_CIMOM_Q_ID = 1;     Uint32 queueId = _nextQueueId++;
  
 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)     //
 {     // Unlock mutex:
     // ATTN-A: thread safety!     //
  
    memset(_name, 0x00, 26);     _id_mut.unlock();
    q_table_mut.lock(pegasus_thread_self());  
  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))     return queueId;
         ;  
     q_table_mut.unlock();  
 } }
  
 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)  
 {  
    if(name != NULL)  MessageQueue::MessageQueue(
       const char* name,
       Boolean async,
       Uint32 queueId)
      : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
    {    {
       strncpy(_name, name, 25);      //
       _name[25] = 0x00;      // Copy the name:
    }      //
  
    else     PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
       memset(_name, 0x00,25);  
       if (!name)
           name = "";
   
       _name = new char[strlen(name) + 1];
       strcpy(_name, name);
   
       Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
           "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
   
       //
       // Insert into queue table:
       //
  
    q_table_mut.lock(pegasus_thread_self());    q_table_mut.lock(pegasus_thread_self());
  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))      while (!_queueTable.insert(_queueId, this))
        ;        ;
   
     q_table_mut.unlock();     q_table_mut.unlock();
  
 }  
  
      PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
   }
  
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
   
       PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
   
       Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
           "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
   
    q_table_mut.lock(pegasus_thread_self());    q_table_mut.lock(pegasus_thread_self());
  
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
     q_table_mut.unlock();     q_table_mut.unlock();
  
       // Free the name:
   
       delete [] _name;
   
       PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException) void MessageQueue::enqueue(Message* message) throw(IPCException)
 { {
   
     if (!message)     if (!message)
       {
          Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
           "MessageQueue::enqueue failure");
         throw NullPointer();         throw NullPointer();
       }
     _mut.lock(pegasus_thread_self());  
  
     if (getenv("PEGASUS_TRACE"))     if (getenv("PEGASUS_TRACE"))
     {     {
Line 113 
Line 144 
         message->print(cout);         message->print(cout);
     }     }
  
   
       _mut.lock(pegasus_thread_self());
     if (_back)     if (_back)
     {     {
         _back->_next = message;         _back->_next = message;
Line 129 
Line 162 
     }     }
     message->_owner = this;     message->_owner = this;
     _count++;     _count++;
   
     _mut.unlock();     _mut.unlock();
  
     handleEnqueue();     handleEnqueue();
   
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException) Message* MessageQueue::dequeue() throw(IPCException)
Line 157 
Line 192 
     return 0;     return 0;
 } }
  
   
   
 void MessageQueue::remove(Message* message) throw(IPCException) void MessageQueue::remove(Message* message) throw(IPCException)
 { {
     if (!message)     if (!message)
Line 263 
Line 300 
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException) MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
 { {
   
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
     q_table_mut.lock(pegasus_thread_self());     q_table_mut.lock(pegasus_thread_self());
  
Line 276 
Line 314 
  
     q_table_mut.unlock();     q_table_mut.unlock();
  
       Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
           "MessageQueue::lookup failure queueId = %i", queueId);
   
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException) MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
    q_table_mut.lock(pegasus_thread_self());    q_table_mut.lock(pegasus_thread_self());
Line 298 
Line 340 
    }    }
    q_table_mut.unlock();    q_table_mut.unlock();
  
      Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
           "MessageQueue::lookup failure - name = %s", name);
   
    return 0;    return 0;
 } }
  


Legend:
Removed from v.1.6.2.7  
changed lines
  Added in v.1.24

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2