(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.47.12.2 and 1.56

version 1.47.12.2, 2006/07/01 15:07:17 version 1.56, 2007/03/28 19:34:53
Line 29 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090  
 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2076  
 //              David Dillard, VERITAS Software Corp.  
 //                  (david.dillard@veritas.com)  
 //              Aruran, IBM (ashanmug@in.ibm.com) for Bug# 3475  
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h>  
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
   #include <Pegasus/Common/CimomMessage.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
 #include "MessageQueueService.h"  
 #include "IDFactory.h" #include "IDFactory.h"
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
Line 56 
Line 47 
 static QueueTable _queueTable(256); static QueueTable _queueTable(256);
 static Mutex q_table_mut ; static Mutex q_table_mut ;
  
 void MessageQueue::remove_myself(Uint32 qid)  
 {  
     AutoMutex autoMut(q_table_mut);  
     _queueTable.remove(qid);  
 }  
   
 static IDFactory _qidFactory(CIMOM_Q_ID + 1); static IDFactory _qidFactory(CIMOM_Q_ID + 1);
  
 Uint32 MessageQueue::getNextQueueId() Uint32 MessageQueue::getNextQueueId()
Line 79 
Line 64 
     const char* name,     const char* name,
     Boolean async,     Boolean async,
     Uint32 queueId)     Uint32 queueId)
    : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)     : _queueId(queueId), _capabilities(0), _async(async)
 { {
     //     //
     // Copy the name:     // Copy the name:
Line 93 
Line 78 
     _name = new char[strlen(name) + 1];     _name = new char[strlen(name) + 1];
     strcpy(_name, name);     strcpy(_name, name);
  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId);          "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId));
  
     //     //
     // Insert into queue table:     // Insert into queue table:
Line 110 
Line 95 
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);          "MessageQueue::~MessageQueue queueId = %i, name = %s",
           _queueId,
           _name));
  
     {     {
         AutoMutex autoMut(q_table_mut);         AutoMutex autoMut(q_table_mut);
Line 122 
Line 109 
  
     delete [] _name;     delete [] _name;
  
     while(_front)  
     {  
        Message* tmp = _front;  
        _front = _front->_next;  
        delete tmp;  
     }  
   
     // Return the queue id.     // Return the queue id.
  
     putQueueId(_queueId);     putQueueId(_queueId);
Line 140 
Line 120 
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
  
     if (!message)      PEGASUS_ASSERT(message != 0);
     {  
         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,  
                     "MessageQueue::enqueue failure");  
         PEG_METHOD_EXIT();  
         throw NullPointer();  
     }  
  
     PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
                       String("Queue name: ") + getQueueName() ) ;          "Queue name: %s",
     Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,          getQueueName()));
                       Tracer::LEVEL3,  
                       "Message: [%s, %d]",  
                       MessageTypeToString(message->getType()),  
                       message->getKey() );  
   
     {  
     AutoMutex autoMut(_mut);  
     if (_back)  
     {  
         _back->_next = message;  
         message->_prev = _back;  
         message->_next = 0;  
         _back = message;  
     }  
     else  
     {  
         _front = message;  
         _back = message;  
         message->_prev = 0;  
         message->_next = 0;  
     }  
     message->_owner = this;  
   
     _count++;  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,  
                   "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);  
  
     } // mutex unlocks here      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "Message: [%s]",
           MessageTypeToString(message->getType())));
   
       _messageList.insert_back(message);
  
     handleEnqueue();     handleEnqueue();
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
Line 188 
Line 140 
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
  
     AutoMutex autoMut(_mut);      Message* message = _messageList.remove_front();
     if (_front)  
     {  
         Message* message = _front;  
         _front = _front->_next;  
         if (_front)  
             _front->_prev = 0;  
   
         if (_back == message)  
             _back = 0;  
   
         _count--;  
         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,  
             "MessageQueue::dequeue _queueId = %d, _count = %d",  
             _queueId, _count);  
   
         message->_next = 0;  
         message->_prev = 0;  
         message->_owner = 0;  
  
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return message;         return message;
     }     }
  
     PEG_METHOD_EXIT();  
     return 0;  
 }  
   
   
   
 void MessageQueue::remove(Message* message)  
 {  
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");  
   
     if (!message)  
     {  
         PEG_METHOD_EXIT();  
         throw NullPointer();  
     }  
   
     if (message->_owner != this)  
     {  
         PEG_METHOD_EXIT();  
         throw NoSuchMessageOnQueue();  
     }  
   
     {  
     AutoMutex autoMut(_mut);  
   
     if (message->_next)  
         message->_next->_prev = message->_prev;  
     else  
         _back = message->_prev;  
   
     if (message->_prev)  
         message->_prev->_next = message->_next;  
     else  
         _front = message->_next;  
   
     _count--;  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,  
        "MessageQueue::remove _count = %d", _count);  
   
     } // mutex unlocks here  
   
     message->_prev = 0;  
     message->_next = 0;  
     message->_owner = 0;  
   
     PEG_METHOD_EXIT();  
 }  
   
 Message* MessageQueue::findByType(Uint32 type)  
 {  
     AutoMutex autoMut(_mut);  
   
     for (Message* m = front(); m; m = m->getNext())  
     {  
         if (m->getType() == type)  
         {  
             return m;  
         }  
     }  
   
     return 0;  
 }  
   
 Message* MessageQueue::findByKey(Uint32 key)  
 {  
     AutoMutex autoMut(_mut);  
   
     for (Message* m = front(); m; m = m->getNext())  
     {  
        if (m->getKey() == key)  
        {  
           return m;  
        }  
   
     }  
   
     return 0;  
 }  
   
 #ifdef PEGASUS_DEBUG  
 void MessageQueue::print(ostream& os) const  
 {  
     AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);  
   
     for (const Message* m = front(); m; m = m->getNext())  
         m->print(os);  
 }  
 #endif  
   
 Message* MessageQueue::find(Uint32 type, Uint32 key)  
 {  
     AutoMutex autoMut(_mut);  
   
     for (Message* m = front(); m; m = m->getNext())  
     {  
         if (m->getType() == type && m->getKey() == key)  
         {  
             return m;  
         }  
     }  
   
     return 0;  
 }  
   
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
     return _name;     return _name;
Line 334 
Line 164 
  
     // Not found!     // Not found!
  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::lookup failure queueId = %u", queueId);          "MessageQueue::lookup failure queueId = %u", queueId));
  
     return 0;     return 0;
 } }
Line 353 
Line 183 
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
         if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )         if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
         {         {
             return( (MessageQueue *)i.value());              return (MessageQueue *)i.value();
         }         }
     }     }
  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
                     "MessageQueue::lookup failure - name = %s", name);          "MessageQueue::lookup failure - name = %s", name));
  
     return 0;     return 0;
 } }


Legend:
Removed from v.1.47.12.2  
changed lines
  Added in v.1.56

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2