(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.7 and 1.8

version 1.7, 2001/12/13 14:54:00 version 1.8, 2001/12/14 20:13:24
Line 59 
Line 59 
  
 Uint32 MessageQueue::_CIMOM_Q_ID = 1; Uint32 MessageQueue::_CIMOM_Q_ID = 1;
  
 MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0)  MessageQueue::MessageQueue(const char * name)
 {          : _mut( ), _count(0), _front(0), _back(0),
     // ATTN-A: thread safety!          _workThread(MessageQueue::workThread, this, false),
    q_table_mut.lock(pegasus_thread_self());          _workSemaphore(0)
   
    memset(_name, 0x00, 26);  
   
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))  
         ;  
     q_table_mut.unlock();  
 }  
   
 MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0)  
 { {
    if(name != NULL)    if(name != NULL)
    {    {
       strncpy(_name, name, 25);       strncpy(_name, name, 25);
       _name[25] = 0x00;       _name[25] = 0x00;
    }    }
   
    else    else
       memset(_name, 0x00,25);       memset(_name, 0x00,25);
  
Line 88 
Line 78 
        ;        ;
     q_table_mut.unlock();     q_table_mut.unlock();
  
           _workThread.run();
 } }
  
   
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
Line 99 
Line 89 
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
     q_table_mut.unlock();     q_table_mut.unlock();
  
           _workThread.cancel();   // cancel thread
           _workSemaphore.signal();// wake thread
           _workThread.join();             // wait for thread to complete
   }
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)
   {
           // get thread from argument
           Thread * thread = (Thread *)arg;
   
           PEGASUS_ASSERT(thread != 0);
   
           // get message queue from thread
           MessageQueue * queue = (MessageQueue *)thread->get_parm();
   
           PEGASUS_ASSERT(queue != 0);
   
           while(true)
           {
                   // wait for work
                   queue->_workSemaphore.wait();
   
                   // stop the thread when the message queue has been destroyed.
                   // ATTN: should check the thread cancel flag that is not yet exposed!
                   if(MessageQueue::lookup(queue->_queueId) == 0)
                   {
                           break;
                   }
   
                   // ensure the queue has a message before dispatching
                   if(queue->_count != 0)
                   {
                           queue->handleEnqueue();
                   }
           }
   
           thread->exit_self(PEGASUS_THREAD_RETURN(0));
   
           return(0);
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException) void MessageQueue::enqueue(Message* message) throw(IPCException)
Line 132 
Line 161 
     _count++;     _count++;
     _mut.unlock();     _mut.unlock();
  
     handleEnqueue();          _workSemaphore.signal();
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException) Message* MessageQueue::dequeue() throw(IPCException)


Legend:
Removed from v.1.7  
changed lines
  Added in v.1.8

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2