(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.6.2.5 and 1.54

version 1.6.2.5, 2001/12/06 22:33:25 version 1.54, 2006/11/10 18:14:58
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%2006////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation, The Open Group.
   // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; Symantec Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 20 
Line 29 
 // //
 //============================================================================== //==============================================================================
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  
 //  
 // Modified By:  
 //  
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h>  #include <Pegasus/Common/Tracer.h>
   #include <Pegasus/Common/CimomMessage.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "IDFactory.h"
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
Line 37 
Line 44 
 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> > typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128);  static QueueTable _queueTable(256);
   static Mutex q_table_mut ;
 static Uint32 _GetNextQueueId()  
 {  
    static Mutex _mut;  
   
    static Uint32 _queueId = 2;  
  
    _mut.lock(pegasus_thread_self());  static IDFactory _qidFactory(CIMOM_Q_ID + 1);
  
    // Handle wrap-around!  Uint32 MessageQueue::getNextQueueId()
    if (_queueId == 0)  
       _queueId = MessageQueue::_CIMOM_Q_ID;  
    Uint32 ret = _queueId++;  
    _mut.unlock();  
   
    return ret;  
 }  
   
 Uint32 MessageQueue::_CIMOM_Q_ID = 1;  
   
 MessageQueue::MessageQueue() : _count(0), _front(0), _back(0)  
 { {
     // ATTN-A: thread safety!      return _qidFactory.getID();
   
    memset(_name, 0x00, 16);  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))  
         ;  
 } }
  
 MessageQueue::MessageQueue(char *name) : _count(0), _front(0), _back(0)  void MessageQueue::putQueueId(Uint32 queueId)
 {  
    if(name != NULL)  
    {    {
       strncpy(_name, name, 15);      if (queueId != CIMOM_Q_ID)
       _name[15] = 0x00;          _qidFactory.putID(queueId);
    }  
   
    else  
       memset(_name, 0x00,16);  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))  
        ;  
 } }
  
   MessageQueue::MessageQueue(
 MessageQueue::~MessageQueue()      const char* name,
       Boolean async,
       Uint32 queueId)
      : _queueId(queueId), _capabilities(0), _async(async)
 { {
     // ATTN-A: thread safety!      //
       // Copy the name:
     _queueTable.remove(_queueId);      //
 }  
  
 void MessageQueue::enqueue(Message* message)      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
 {  
     if (!message)  
         throw NullPointer();  
  
     if (getenv("PEGASUS_TRACE"))      if (!name)
     {          name = "";
         cout << "===== " << getQueueName() << ": ";  
         message->print(cout);  
     }  
  
     if (_back)      _name = new char[strlen(name) + 1];
     {      strcpy(_name, name);
         _back->_next = message;  
         message->_prev = _back;  
         message->_next = 0;  
         _back = message;  
     }  
     else  
     {  
         _front = message;  
         _back = message;  
         message->_prev = 0;  
         message->_next = 0;  
     }  
     message->_owner = this;  
     _count++;  
  
     handleEnqueue();      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
 }          "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId);
  
 Message* MessageQueue::dequeue()      //
 {      // Insert into queue table:
     if (_front)      //
     {      AutoMutex autoMut(q_table_mut);
         Message* message = _front;      while (!_queueTable.insert(_queueId, this))
         _front = _front->_next;          ;
         if (_front)  
             _front->_prev = 0;  
   
         if (_back == message)  
             _back = 0;  
   
         message->_next = 0;  
         message->_prev = 0;  
         message->_owner = 0;  
         _count--;  
  
         return message;      PEG_METHOD_EXIT();
     }  
     return 0;  
 } }
  
 void MessageQueue::remove(Message* message)  MessageQueue::~MessageQueue()
 { {
     if (!message)      // ATTN-A: thread safety!
         throw NullPointer();      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
  
     if (message->_owner != this)      {
         throw NoSuchMessageOnQueue();          AutoMutex autoMut(q_table_mut);
           _queueTable.remove(_queueId);
       } // mutex unlocks here
  
     if (message->_next)      // Free the name:
         message->_next->_prev = message->_prev;  
     else  
         _back = message->_prev;  
  
     if (message->_prev)      delete [] _name;
         message->_prev->_next = message->_next;  
     else  
         _front = message->_next;  
  
     message->_prev = 0;      // Return the queue id.
     message->_next = 0;  
     message->_owner = 0;  
     _count--;  
 }  
  
 Message* MessageQueue::findByType(Uint32 type)      putQueueId(_queueId);
 {  
     for (Message* m = front(); m; m = m->getNext())  
     {  
         if (m->getType() == type)  
             return m;  
     }  
  
     return 0;      PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByKey(Uint32 key)  void MessageQueue::enqueue(Message* message)
 {  
     for (Message* m = front(); m; m = m->getNext())  
     {     {
         if (m->getKey() == key)      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
             return m;  
     }  
  
     return 0;      PEGASUS_ASSERT(message != 0);
 }  
  
 void MessageQueue::print(ostream& os) const      PEG_TRACE_STRING(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
 {          String("Queue name: ") + getQueueName());
     for (const Message* m = front(); m; m = m->getNext())      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         m->print(os);          "Message: [%s]",
 }          MessageTypeToString(message->getType()));
  
 Message* MessageQueue::find(Uint32 type, Uint32 key)      _messageList.insert_back(message);
 {  
     for (Message* m = front(); m; m = m->getNext())  
     {  
         if (m->getType() == type && m->getKey() == key)  
             return m;  
     }  
  
     return 0;      handleEnqueue();
       PEG_METHOD_EXIT();
 } }
  
 void MessageQueue::lock()  Message* MessageQueue::dequeue()
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
  
 }      Message* message = _messageList.remove_front();
   
 void MessageQueue::unlock()  
 {  
  
       PEG_METHOD_EXIT();
       return message;
 } }
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    if(_name[0] != 0x00)  
       return _name;       return _name;
    return "unknown";  
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) MessageQueue* MessageQueue::lookup(Uint32 queueId)
 { {
   
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
       AutoMutex autoMut(q_table_mut);
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
       {
         return queue;         return queue;
       }
  
     // Not found!     // Not found!
   
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure queueId = %u", queueId);
   
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) MessageQueue* MessageQueue::lookup(const char *name)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
   
       AutoMutex autoMut(q_table_mut);
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
         if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )          if (!strcmp(((MessageQueue *)i.value())->getQueueName(), name))
             return( (MessageQueue *)i.value());          {
               return (MessageQueue *)i.value();
           }
    }    }
   
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure - name = %s", name);
   
    return 0;    return 0;
 } }
  


Legend:
Removed from v.1.6.2.5  
changed lines
  Added in v.1.54

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2