version 1.7, 2001/12/13 14:54:00
|
version 1.8, 2001/12/14 20:13:24
|
|
|
| |
Uint32 MessageQueue::_CIMOM_Q_ID = 1; | Uint32 MessageQueue::_CIMOM_Q_ID = 1; |
| |
MessageQueue::MessageQueue() : _mut( ), _count(0), _front(0), _back(0) |
MessageQueue::MessageQueue(const char * name) |
{ |
: _mut( ), _count(0), _front(0), _back(0), |
// ATTN-A: thread safety! |
_workThread(MessageQueue::workThread, this, false), |
q_table_mut.lock(pegasus_thread_self()); |
_workSemaphore(0) |
|
|
memset(_name, 0x00, 26); |
|
|
|
while (!_queueTable.insert(_queueId = _GetNextQueueId(), this)) |
|
; |
|
q_table_mut.unlock(); |
|
} |
|
|
|
MessageQueue::MessageQueue(char *name) : _mut( ), _count(0), _front(0), _back(0) |
|
{ | { |
if(name != NULL) | if(name != NULL) |
{ | { |
strncpy(_name, name, 25); | strncpy(_name, name, 25); |
_name[25] = 0x00; | _name[25] = 0x00; |
} | } |
|
|
else | else |
memset(_name, 0x00,25); | memset(_name, 0x00,25); |
| |
|
|
; | ; |
q_table_mut.unlock(); | q_table_mut.unlock(); |
| |
|
_workThread.run(); |
} | } |
| |
|
|
MessageQueue::~MessageQueue() | MessageQueue::~MessageQueue() |
{ | { |
// ATTN-A: thread safety! | // ATTN-A: thread safety! |
|
|
_queueTable.remove(_queueId); | _queueTable.remove(_queueId); |
q_table_mut.unlock(); | q_table_mut.unlock(); |
| |
|
_workThread.cancel(); // cancel thread |
|
_workSemaphore.signal();// wake thread |
|
_workThread.join(); // wait for thread to complete |
|
} |
|
|
|
PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg) |
|
{ |
|
// get thread from argument |
|
Thread * thread = (Thread *)arg; |
|
|
|
PEGASUS_ASSERT(thread != 0); |
|
|
|
// get message queue from thread |
|
MessageQueue * queue = (MessageQueue *)thread->get_parm(); |
|
|
|
PEGASUS_ASSERT(queue != 0); |
|
|
|
while(true) |
|
{ |
|
// wait for work |
|
queue->_workSemaphore.wait(); |
|
|
|
// stop the thread when the message queue has been destroyed. |
|
// ATTN: should check the thread cancel flag that is not yet exposed! |
|
if(MessageQueue::lookup(queue->_queueId) == 0) |
|
{ |
|
break; |
|
} |
|
|
|
// ensure the queue has a message before dispatching |
|
if(queue->_count != 0) |
|
{ |
|
queue->handleEnqueue(); |
|
} |
|
} |
|
|
|
thread->exit_self(PEGASUS_THREAD_RETURN(0)); |
|
|
|
return(0); |
} | } |
| |
void MessageQueue::enqueue(Message* message) throw(IPCException) | void MessageQueue::enqueue(Message* message) throw(IPCException) |
|
|
_count++; | _count++; |
_mut.unlock(); | _mut.unlock(); |
| |
handleEnqueue(); |
_workSemaphore.signal(); |
} | } |
| |
Message* MessageQueue::dequeue() throw(IPCException) | Message* MessageQueue::dequeue() throw(IPCException) |