(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.14 and 1.26.2.1

version 1.14, 2001/12/20 02:20:07 version 1.26.2.1, 2002/03/04 11:57:39
Line 28 
Line 28 
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
   #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "MessageQueueService.h"
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 40 
Line 41 
 static QueueTable _queueTable(128); static QueueTable _queueTable(128);
 static Mutex q_table_mut = Mutex(); static Mutex q_table_mut = Mutex();
  
 static Uint32 _GetNextQueueId() throw(IPCException)  Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 { {
      static Uint32 _nextQueueId = 2;
  
    static Uint32 _queueId = 2;     //
    static Mutex _id_mut = Mutex();     // Lock mutex:
      //
  
      static Mutex _id_mut = Mutex();
    _id_mut.lock(pegasus_thread_self());    _id_mut.lock(pegasus_thread_self());
  
    // Handle wrap-around!     // Assign next queue id. Handle wrap around and never assign zero as
    if (_queueId == 0)     // a queue id:
       _queueId = MessageQueue::_CIMOM_Q_ID;  
    Uint32 ret = _queueId++;     if (_nextQueueId == 0)
         _nextQueueId = 2;
   
      Uint32 queueId = _nextQueueId++;
   
      //
      // Unlock mutex:
      //
   
    _id_mut.unlock();    _id_mut.unlock();
  
    return ret;     return queueId;
 } }
  
 Uint32 MessageQueue::_CIMOM_Q_ID = 1;  
  
 MessageQueue::MessageQueue(const char * name, Boolean async)  
         : _mut( ), _count(0), _front(0), _back(0),  
           _async(async),  
           _workThread(MessageQueue::workThread, this, false),  
           _workSemaphore(0)  
  
   MessageQueue::MessageQueue(
 {      const char* name,
    if(name != NULL)      Boolean async,
       Uint32 queueId)
      : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
    {    {
       strncpy(_name, name, 25);      //
       _name[25] = 0x00;      // Copy the name:
    }      //
    else  
       memset(_name, 0x00,25);  
  
     q_table_mut.lock(pegasus_thread_self());     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))      if (!name)
        ;          name = "";
     q_table_mut.unlock();  
  
       _name = new char[strlen(name) + 1];
       strcpy(_name, name);
  
     if(_async == true)      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
     {          "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
        _workThread.run();  
     }  
  
 }      //
       // Insert into queue table:
       //
  
 MessageQueue::~MessageQueue()  
 {  
     // ATTN-A: thread safety!  
     q_table_mut.lock(pegasus_thread_self());     q_table_mut.lock(pegasus_thread_self());
  
     _queueTable.remove(_queueId);      while (!_queueTable.insert(_queueId, this))
          ;
   
     q_table_mut.unlock();     q_table_mut.unlock();
  
     if(_async == true)  
     {  
        _workThread.cancel();    // cancel thread  
        _workSemaphore.signal();// wake thread  
        _workThread.join();     // wait for thread to complete  
     }  
  
      PEG_METHOD_EXIT();
 } }
  
   MessageQueue::~MessageQueue()
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)  
 { {
         // get thread from argument      // ATTN-A: thread safety!
         Thread * thread = (Thread *)arg;  
   
         PEGASUS_ASSERT(thread != 0);  
   
         // get message queue from thread  
         MessageQueue * queue = (MessageQueue *)thread->get_parm();  
  
         PEGASUS_ASSERT(queue != 0);      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
  
         while(true)      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         {          "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
            if(thread->is_cancelled())  
            {  
               break;  
            }  
  
            // wait for work      q_table_mut.lock(pegasus_thread_self());
            queue->_workSemaphore.wait();  
  
            // stop the thread when the message queue has been destroyed.      _queueTable.remove(_queueId);
            // ATTN: should check the thread cancel flag that is not yet exposed!      q_table_mut.unlock();
            if(MessageQueue::lookup(queue->_queueId) == 0)  
            {  
               break;  
            }  
  
            // ensure the queue has a message before dispatching      // Free the name:
            if(queue->_count != 0)  
            {  
               queue->handleEnqueue();  
            }  
         }  
  
         thread->exit_self(PEGASUS_THREAD_RETURN(0));      delete [] _name;
  
         return(0);      PEG_METHOD_EXIT();
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException) void MessageQueue::enqueue(Message* message) throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
   
     if (!message)     if (!message)
       {
          Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::enqueue failure");
          PEG_METHOD_EXIT();
        throw NullPointer();        throw NullPointer();
       }
  
     if (getenv("PEGASUS_TRACE"))     if (getenv("PEGASUS_TRACE"))
     {     {
Line 159 
Line 146 
        message->print(cout);        message->print(cout);
     }     }
  
   
     _mut.lock(pegasus_thread_self());     _mut.lock(pegasus_thread_self());
     if (_back)     if (_back)
     {     {
Line 175 
Line 163 
        message->_next = 0;        message->_next = 0;
     }     }
     message->_owner = this;     message->_owner = this;
   
     _count++;     _count++;
     if( _async == true )      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
     {         "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
        _workSemaphore.signal();  
     }  
  
     _mut.unlock();     _mut.unlock();
  
     if(_async == false )  
        handleEnqueue();        handleEnqueue();
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException) Message* MessageQueue::dequeue() throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
   
    _mut.lock(pegasus_thread_self());    _mut.lock(pegasus_thread_self());
     if (_front)     if (_front)
     {     {
Line 199 
Line 188 
  
         if (_back == message)         if (_back == message)
             _back = 0;             _back = 0;
   
         _count--;         _count--;
           Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "MessageQueue::dequeue _queueId = %d, _count = %d",
               _queueId, _count);
   
         _mut.unlock();         _mut.unlock();
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
   
           PEG_METHOD_EXIT();
         return message;         return message;
     }     }
     _mut.unlock();     _mut.unlock();
   
       PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
   ;
   
  
 void MessageQueue::remove(Message* message) throw(IPCException) void MessageQueue::remove(Message* message) throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
   
     if (!message)     if (!message)
       {
           PEG_METHOD_EXIT();
         throw NullPointer();         throw NullPointer();
       }
  
     if (message->_owner != this)     if (message->_owner != this)
       {
           PEG_METHOD_EXIT();
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
       }
  
     _mut.lock(pegasus_thread_self());     _mut.lock(pegasus_thread_self());
  
Line 231 
Line 239 
         _front = message->_next;         _front = message->_next;
  
     _count--;     _count--;
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
          "MessageQueue::remove _count = %d", _count);
   
     _mut.unlock();     _mut.unlock();
  
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
     message->_owner = 0;     message->_owner = 0;
   
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByType(Uint32 type) throw(IPCException) Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
Line 329 
Line 342 
  
     q_table_mut.unlock();     q_table_mut.unlock();
  
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure queueId = %i", queueId);
   
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException) MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
    q_table_mut.lock(pegasus_thread_self());    q_table_mut.lock(pegasus_thread_self());
Line 351 
Line 368 
    }    }
    q_table_mut.unlock();    q_table_mut.unlock();
  
      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure - name = %s", name);
   
    return 0;    return 0;
 } }
  


Legend:
Removed from v.1.14  
changed lines
  Added in v.1.26.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2