(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.21 and 1.22

version 1.21, 2002/01/21 21:20:35 version 1.22, 2002/02/11 01:17:41
Line 43 
Line 43 
  
 Uint32 MessageQueue::getNextQueueId() throw(IPCException) Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 { {
    static Uint32 _nextQueueId = 1;     static Uint32 _nextQueueId = 2;
  
    //    //
    // Lock mutex:    // Lock mutex:
Line 56 
Line 56 
    // a queue id:    // a queue id:
  
    if (_nextQueueId == 0)    if (_nextQueueId == 0)
       _nextQueueId = 1;        _nextQueueId = 2;
  
    Uint32 queueId = _nextQueueId++;    Uint32 queueId = _nextQueueId++;
  
Line 73 
Line 73 
     const char* name,     const char* name,
     Boolean async,     Boolean async,
     Uint32 queueId)     Uint32 queueId)
     :      : _queueId(queueId), _count(0), _front(0), _back(0), _async(async)
     _queueId(queueId), _count(0), _front(0), _back(0), _async(async),  
     _workThread(MessageQueue::workThread, this, false), _workSemaphore(0)  
 { {
     //     //
     // Copy the name:     // Copy the name:
Line 104 
Line 102 
     q_table_mut.unlock();     q_table_mut.unlock();
  
  
     if(_async == true)  
        _workThread.run();  
   
    PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");    PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");
 } }
  
Line 124 
Line 119 
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
     q_table_mut.unlock();     q_table_mut.unlock();
  
     if(_async == true)  
     {  
        _workThread.cancel();    // cancel thread  
        _workSemaphore.signal(); // wake thread  
        _workThread.join();      // wait for thread to complete  
     }  
   
     // Free the name:     // Free the name:
  
     delete [] _name;     delete [] _name;
Line 138 
Line 126 
     PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");     PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");
 } }
  
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)  
 {  
   
         PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::workThread()");  
   
         // get thread from argument  
         Thread * thread = (Thread *)arg;  
   
         PEGASUS_ASSERT(thread != 0);  
   
         // get message queue from thread  
         MessageQueue * queue = (MessageQueue *)thread->get_parm();  
   
         PEGASUS_ASSERT(queue != 0);  
   
         while(true)  
         {  
            if(thread->is_cancelled())  
            {  
               break;  
            }  
   
            // wait for work  
            queue->_workSemaphore.wait();  
   
            // stop the thread when the message queue has been destroyed.  
            // ATTN: should check the thread cancel flag that is not yet exposed!  
            if(MessageQueue::lookup(queue->_queueId) == 0)  
            {  
              Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,  
                      "MessageQueue::workThread - ",  
                      "Message queue, %i, for thread no longer exists",  
                      queue->_queueId);  
               break;  
            }  
   
            // ensure the queue has a message before dispatching  
            if(queue->_count != 0)  
            {  
               queue->handleEnqueue();  
            }  
         }  
   
         PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::workThread()");  
   
         thread->exit_self(PEGASUS_THREAD_RETURN(0));  
   
         return(0);  
 }  
   
 void MessageQueue::enqueue(Message* message) throw(IPCException) void MessageQueue::enqueue(Message* message) throw(IPCException)
 { {
  
Line 222 
Line 159 
     }     }
     message->_owner = this;     message->_owner = this;
     _count++;     _count++;
     if( _async == true )  
     {  
        _workSemaphore.signal();  
   
     }  
  
     _mut.unlock();     _mut.unlock();
  
     if(_async == false )  
        handleEnqueue();        handleEnqueue();
  
 } }


Legend:
Removed from v.1.21  
changed lines
  Added in v.1.22

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2