(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.11 and 1.43

version 1.11, 2001/12/17 20:30:52 version 1.43, 2005/05/04 12:44:03
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%2005////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation, The Open Group.
   // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
   // EMC Corporation; VERITAS Software Corporation; The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 22 
Line 29 
 // //
 // Author: Mike Brasher (mbrasher@bmc.com) // Author: Mike Brasher (mbrasher@bmc.com)
 // //
 // Modified By:  // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
   //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2076
   //              David Dillard, VERITAS Software Corp.
   //                  (david.dillard@veritas.com)
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
   #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "MessageQueueService.h"
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 37 
Line 48 
 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> > typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128);  static QueueTable _queueTable(256);
 static Mutex q_table_mut = Mutex();  static Mutex q_table_mut ;
  
 static Uint32 _GetNextQueueId() throw(IPCException)  void MessageQueue::remove_myself(Uint32 qid)
 { {
       AutoMutex autoMut(q_table_mut);
       _queueTable.remove(qid);
   }
  
    static Uint32 _queueId = 2;  
    static Mutex _id_mut = Mutex();  
   
    _id_mut.lock(pegasus_thread_self());  
  
    // Handle wrap-around!  Uint32 MessageQueue::getNextQueueId()
    if (_queueId == 0)  {
       _queueId = MessageQueue::_CIMOM_Q_ID;      static Uint32 _nextQueueId = 2;
    Uint32 ret = _queueId++;  
    _id_mut.unlock();  
  
    return ret;      //
 }      // Lock mutex:
       //
       static Mutex _id_mut ;
       AutoMutex autoMut(_id_mut);
  
 Uint32 MessageQueue::_CIMOM_Q_ID = 1;      Uint32 queueId;
  
 MessageQueue::MessageQueue(const char * name)      // Assign the next queue ID that is not already in use
         : _mut( ), _count(0), _front(0), _back(0),      do
         _workThread(MessageQueue::workThread, this, false),  
         _workSemaphore(0)  
 { {
    if(name != NULL)          // Handle wrap around and never assign zero or one as a queue id:
           if (_nextQueueId == 0)
    {    {
       strncpy(_name, name, 25);              _nextQueueId = 2;
       _name[25] = 0x00;  
    }    }
    else  
       memset(_name, 0x00,25);  
   
     q_table_mut.lock(pegasus_thread_self());  
  
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))          queueId = _nextQueueId++;
        ;      } while (lookup(queueId) != 0);
     q_table_mut.unlock();  
  
         _workThread.run();      return queueId;
 } }
  
 MessageQueue::~MessageQueue()  
   
   MessageQueue::MessageQueue(
       const char* name,
       Boolean async,
       Uint32 queueId)
      : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
 { {
     // ATTN-A: thread safety!      //
     q_table_mut.lock(pegasus_thread_self());      // Copy the name:
       //
  
     _queueTable.remove(_queueId);      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
     q_table_mut.unlock();  
  
         _workThread.cancel();   // cancel thread      if (!name)
         _workSemaphore.signal();// wake thread          name = "";
         _workThread.join();             // wait for thread to complete  
 }  
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)      _name = new char[strlen(name) + 1];
 {      strcpy(_name, name);
         // get thread from argument  
         Thread * thread = (Thread *)arg;  
  
         PEGASUS_ASSERT(thread != 0);      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId);
  
         // get message queue from thread      //
         MessageQueue * queue = (MessageQueue *)thread->get_parm();      // Insert into queue table:
       //
       AutoMutex autoMut(q_table_mut);
       while (!_queueTable.insert(_queueId, this))
           ;
  
         PEGASUS_ASSERT(queue != 0);      PEG_METHOD_EXIT();
   }
  
         while(true)  MessageQueue::~MessageQueue()
         {  
                 if(thread->is_cancelled())  
                 {                 {
                         break;      // ATTN-A: thread safety!
                 }      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
  
                 thread->sleep(1);      {
                 continue;          AutoMutex autoMut(q_table_mut);
           _queueTable.remove(_queueId);
       } // mutex unlocks here
  
                 // wait for work      // Free the name:
                 queue->_workSemaphore.wait();  
  
                 // stop the thread when the message queue has been destroyed.      delete [] _name;
                 // ATTN: should check the thread cancel flag that is not yet exposed!  
                 if(MessageQueue::lookup(queue->_queueId) == 0)  
                 {  
                         break;  
                 }  
  
                 // ensure the queue has a message before dispatching      while(_front)
                 if(queue->_count != 0)  
                 {                 {
                         queue->handleEnqueue();         Message* tmp = _front;
                 }         _front = _front->_next;
          delete tmp;
         }         }
  
         thread->exit_self(PEGASUS_THREAD_RETURN(0));      PEG_METHOD_EXIT();
   
         return(0);  
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException)  void MessageQueue::enqueue(Message* message)
 { {
     if (!message)      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
        throw NullPointer();  
  
     if (getenv("PEGASUS_TRACE"))      if (!message)
     {     {
        cout << "===== " << getQueueName() << ": ";          Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
        message->print(cout);                      "MessageQueue::enqueue failure");
           PEG_METHOD_EXIT();
           throw NullPointer();
     }     }
  
     _mut.lock(pegasus_thread_self());      PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
                         String("Queue name: ") + getQueueName() ) ;
       Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,
                         Tracer::LEVEL3,
                         "Message: [%s, %d]",
                         MessageTypeToString(message->getType()),
                         message->getKey() );
   
       {
       AutoMutex autoMut(_mut);
     if (_back)     if (_back)
     {     {
        _back->_next = message;        _back->_next = message;
Line 165 
Line 181 
        message->_next = 0;        message->_next = 0;
     }     }
     message->_owner = this;     message->_owner = this;
   
     _count++;     _count++;
     _mut.unlock();      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
                     "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
  
 //    _workSemaphore.signal();      } // mutex unlocks here
  
     handleEnqueue();     handleEnqueue();
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException)  Message* MessageQueue::dequeue()
 { {
    _mut.lock(pegasus_thread_self());      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
   
       AutoMutex autoMut(_mut);
     if (_front)     if (_front)
     {     {
         Message* message = _front;         Message* message = _front;
Line 185 
Line 206 
  
         if (_back == message)         if (_back == message)
             _back = 0;             _back = 0;
   
         _count--;         _count--;
         _mut.unlock();          Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "MessageQueue::dequeue _queueId = %d, _count = %d",
               _queueId, _count);
   
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
   
           PEG_METHOD_EXIT();
         return message;         return message;
     }     }
     _mut.unlock();  
       PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
  
 void MessageQueue::remove(Message* message) throw(IPCException)  
   
   void MessageQueue::remove(Message* message)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
   
     if (!message)     if (!message)
       {
           PEG_METHOD_EXIT();
         throw NullPointer();         throw NullPointer();
       }
  
     if (message->_owner != this)     if (message->_owner != this)
       {
           PEG_METHOD_EXIT();
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
       }
  
     _mut.lock(pegasus_thread_self());      {
       AutoMutex autoMut(_mut);
  
     if (message->_next)     if (message->_next)
         message->_next->_prev = message->_prev;         message->_next->_prev = message->_prev;
Line 217 
Line 256 
         _front = message->_next;         _front = message->_next;
  
     _count--;     _count--;
     _mut.unlock();      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
          "MessageQueue::remove _count = %d", _count);
   
       } // mutex unlocks here
  
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
     message->_owner = 0;     message->_owner = 0;
   
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)  Message* MessageQueue::findByType(Uint32 type)
 { {
    _mut.lock(pegasus_thread_self());      AutoMutex autoMut(_mut);
  
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
        if (m->getType() == type)        if (m->getType() == type)
        {        {
           _mut.unlock();  
           return m;           return m;
        }        }
     }     }
     _mut.unlock();  
     return 0;     return 0;
 } }
  
 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)  Message* MessageQueue::findByKey(Uint32 key)
 { {
    _mut.lock(pegasus_thread_self());      AutoMutex autoMut(_mut);
  
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
        if (m->getKey() == key)        if (m->getKey() == key)
        {        {
           _mut.unlock();  
           return m;           return m;
        }        }
  
     }     }
     _mut.unlock();  
     return 0;     return 0;
 } }
  
 void MessageQueue::print(ostream& os) const throw(IPCException)  void MessageQueue::print(ostream& os) const
 { {
    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());      AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);
  
    for (const Message* m = front(); m; m = m->getNext())    for (const Message* m = front(); m; m = m->getNext())
         m->print(os);         m->print(os);
    const_cast<MessageQueue *>(this)->_mut.unlock();  
 } }
  
 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)  Message* MessageQueue::find(Uint32 type, Uint32 key)
 { {
    _mut.lock(pegasus_thread_self());      AutoMutex autoMut(_mut);
  
     for (Message* m = front(); m; m = m->getNext())     for (Message* m = front(); m; m = m->getNext())
     {     {
        if (m->getType() == type && m->getKey() == key)        if (m->getType() == type && m->getKey() == key)
        {        {
           _mut.unlock();  
           return m;           return m;
        }        }
     }     }
     _mut.unlock();  
  
     return 0;     return 0;
 } }
  
 void MessageQueue::lock() throw(IPCException)  void MessageQueue::lock()
 { {
    _mut.lock(pegasus_thread_self());    _mut.lock(pegasus_thread_self());
 } }
Line 295 
Line 334 
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    if(_name[0] != 0x00)  
       return _name;       return _name;
    return "unknown";  
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)  MessageQueue* MessageQueue::lookup(Uint32 queueId)
 { {
   
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
     q_table_mut.lock(pegasus_thread_self());      AutoMutex autoMut(q_table_mut);
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
     {     {
        q_table_mut.unlock();  
        return queue;        return queue;
     }     }
  
     // Not found!     // Not found!
  
     q_table_mut.unlock();      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
           "MessageQueue::lookup failure queueId = %u", queueId);
  
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)  MessageQueue* MessageQueue::lookup(const char *name)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
    q_table_mut.lock(pegasus_thread_self());  
  
       AutoMutex autoMut(q_table_mut);
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
       if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )          if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
       {       {
          q_table_mut.unlock();  
          return( (MessageQueue *)i.value());          return( (MessageQueue *)i.value());
       }       }
   
    }    }
    q_table_mut.unlock();  
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
                       "MessageQueue::lookup failure - name = %s", name);
  
    return 0;    return 0;
 } }


Legend:
Removed from v.1.11  
changed lines
  Added in v.1.43

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2