(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.32.4.1 and 1.49.2.1

version 1.32.4.1, 2002/10/25 20:49:43 version 1.49.2.1, 2006/07/27 23:11:51
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
 // The Open Group, Tivoli Systems  // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation, The Open Group.
   // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 23 
Line 31 
 // //
 // Author: Mike Brasher (mbrasher@bmc.com) // Author: Mike Brasher (mbrasher@bmc.com)
 // //
 // Modified By:  // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
   //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2076
   //              David Dillard, VERITAS Software Corp.
   //                  (david.dillard@veritas.com)
   //              Aruran, IBM (ashanmug@in.ibm.com) for Bug# 3475
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h>  
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
 #include "MessageQueueService.h" #include "MessageQueueService.h"
   #include "IDFactory.h"
   
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 39 
Line 52 
 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> > typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128);  static QueueTable _queueTable(256);
 static Mutex q_table_mut ; static Mutex q_table_mut ;
  
 Uint32 MessageQueue::getNextQueueId() throw(IPCException)  void MessageQueue::remove_myself(Uint32 qid)
 { {
    static Uint32 _nextQueueId = 2;      AutoMutex autoMut(q_table_mut);
       _queueTable.remove(qid);
    //  }
    // Lock mutex:  
    //  
   
    static Mutex _id_mut ;  
    _id_mut.lock(pegasus_thread_self());  
   
    // Assign next queue id. Handle wrap around and never assign zero as  
    // a queue id:  
   
    if (_nextQueueId == 0)  
       _nextQueueId = 2;  
   
    Uint32 queueId = _nextQueueId++;  
   
    //  
    // Unlock mutex:  
    //  
  
    _id_mut.unlock();  static IDFactory _qidFactory(CIMOM_Q_ID + 1);
  
    return queueId;  Uint32 MessageQueue::getNextQueueId()
   {
       return _qidFactory.getID();
 } }
  
   void MessageQueue::putQueueId(Uint32 queueId)
   {
       if (queueId != CIMOM_Q_ID)
           _qidFactory.putID(queueId);
   }
  
 MessageQueue::MessageQueue( MessageQueue::MessageQueue(
     const char* name,     const char* name,
Line 91 
Line 93 
     strcpy(_name, name);     strcpy(_name, name);
  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);          "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId);
  
     //     //
     // Insert into queue table:     // Insert into queue table:
     //     //
       AutoMutex autoMut(q_table_mut);
     q_table_mut.lock(pegasus_thread_self());      while (!_queueTable.insert(_queueId, this))
           ;
     if ( ! _queueTable.insert(_queueId, this))  
        abort();  
   
     q_table_mut.unlock();  
   
  
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
 } }
Line 111 
Line 108 
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
   
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
   
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
  
     q_table_mut.lock(pegasus_thread_self());      {
           AutoMutex autoMut(q_table_mut);
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
     q_table_mut.unlock();      } // mutex unlocks here
  
     // Free the name:     // Free the name:
  
     delete [] _name;     delete [] _name;
  
       while(_front)
       {
          Message* tmp = _front;
          _front = _front->_next;
          delete tmp;
       }
   
       // Return the queue id.
   
       putQueueId(_queueId);
   
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException)  void MessageQueue::enqueue(Message* message)
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
  
Line 145 
Line 151 
                       String("Queue name: ") + getQueueName() ) ;                       String("Queue name: ") + getQueueName() ) ;
     Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,     Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,
                       Tracer::LEVEL3,                       Tracer::LEVEL3,
                       "Message: [%s, %d]",                        "Message: [%s]",
                       MessageTypeToString(message->getType()),                        MessageTypeToString(message->getType()));
                       message->getKey() );  
  
     _mut.lock(pegasus_thread_self());      {
       AutoMutex autoMut(_mut);
     if (_back)     if (_back)
     {     {
        _back->_next = message;        _back->_next = message;
Line 170 
Line 176 
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
                   "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);                   "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
  
     _mut.unlock();      } // mutex unlocks here
  
     handleEnqueue();     handleEnqueue();
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException)  Message* MessageQueue::dequeue()
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
  
    _mut.lock(pegasus_thread_self());      AutoMutex autoMut(_mut);
     if (_front)     if (_front)
     {     {
         Message* message = _front;         Message* message = _front;
Line 196 
Line 202 
             "MessageQueue::dequeue _queueId = %d, _count = %d",             "MessageQueue::dequeue _queueId = %d, _count = %d",
             _queueId, _count);             _queueId, _count);
  
         _mut.unlock();  
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
Line 204 
Line 209 
         PEG_METHOD_EXIT();         PEG_METHOD_EXIT();
         return message;         return message;
     }     }
     _mut.unlock();  
  
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
 ;  
  
  
 void MessageQueue::remove(Message* message) throw(IPCException)  
   void MessageQueue::remove(Message* message)
 { {
     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
  
Line 228 
Line 232 
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
     }     }
  
     _mut.lock(pegasus_thread_self());      {
       AutoMutex autoMut(_mut);
  
     if (message->_next)     if (message->_next)
         message->_next->_prev = message->_prev;         message->_next->_prev = message->_prev;
Line 244 
Line 249 
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
        "MessageQueue::remove _count = %d", _count);        "MessageQueue::remove _count = %d", _count);
  
     _mut.unlock();      } // mutex unlocks here
  
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
Line 253 
Line 258 
     PEG_METHOD_EXIT();     PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)  Message* MessageQueue::findByType(Uint32 type)
 { {
    _mut.lock(pegasus_thread_self());      AutoMutex autoMut(_mut);
  
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
        if (m->getType() == type)        if (m->getType() == type)
        {        {
           _mut.unlock();  
           return m;           return m;
        }        }
     }     }
     _mut.unlock();  
     return 0;  
 }  
  
 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)  
 {  
    _mut.lock(pegasus_thread_self());  
   
     for (Message* m = front(); m; m = m->getNext())  
     {  
        if (m->getKey() == key)  
        {  
           _mut.unlock();  
           return m;  
        }  
   
     }  
     _mut.unlock();  
     return 0;     return 0;
 } }
  
 void MessageQueue::print(ostream& os) const throw(IPCException)  #ifdef PEGASUS_DEBUG
   void MessageQueue::print(ostream& os) const
 { {
    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());      AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);
  
    for (const Message* m = front(); m; m = m->getNext())    for (const Message* m = front(); m; m = m->getNext())
         m->print(os);         m->print(os);
    const_cast<MessageQueue *>(this)->_mut.unlock();  
 }  
   
 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)  
 {  
    _mut.lock(pegasus_thread_self());  
   
     for (Message* m = front(); m; m = m->getNext())  
     {  
        if (m->getType() == type && m->getKey() == key)  
        {  
           _mut.unlock();  
           return m;  
        }  
     }  
     _mut.unlock();  
   
     return 0;  
 }  
   
 void MessageQueue::lock() throw(IPCException)  
 {  
    _mut.lock(pegasus_thread_self());  
 }  
   
 void MessageQueue::unlock()  
 {  
    _mut.unlock();  
 } }
   #endif
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    return _name;    return _name;
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)  MessageQueue* MessageQueue::lookup(Uint32 queueId)
 { {
  
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
     q_table_mut.lock(pegasus_thread_self());      AutoMutex autoMut(q_table_mut);
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
     {     {
        q_table_mut.unlock();  
        return queue;        return queue;
     }     }
  
     // Not found!     // Not found!
  
     q_table_mut.unlock();  
   
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::lookup failure queueId = %i", queueId);          "MessageQueue::lookup failure queueId = %u", queueId);
  
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)  MessageQueue* MessageQueue::lookup(const char *name)
 { {
  
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
    q_table_mut.lock(pegasus_thread_self());  
  
       AutoMutex autoMut(q_table_mut);
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
       if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )       if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
       {       {
          q_table_mut.unlock();  
          return( (MessageQueue *)i.value());          return( (MessageQueue *)i.value());
       }       }
   
    }    }
    q_table_mut.unlock();  
  
    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::lookup failure - name = %s", name);         "MessageQueue::lookup failure - name = %s", name);


Legend:
Removed from v.1.32.4.1  
changed lines
  Added in v.1.49.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2