(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.10 and 1.62

version 1.10, 2001/12/17 14:33:01 version 1.62, 2009/04/30 07:25:12
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%LICENSE////////////////////////////////////////////////////////////////
 //  
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  
 //  
 // Permission is hereby granted, free of charge, to any person obtaining a copy  
 // of this software and associated documentation files (the "Software"), to  
 // deal in the Software without restriction, including without limitation the  
 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or  
 // sell copies of the Software, and to permit persons to whom the Software is  
 // furnished to do so, subject to the following conditions:  
 //  
 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN  
 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED  
 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT  
 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR  
 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT  
 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN  
 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION  
 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.  
 // //
 //==============================================================================  // Licensed to The Open Group (TOG) under one or more contributor license
   // agreements.  Refer to the OpenPegasusNOTICE.txt file distributed with
   // this work for additional information regarding copyright ownership.
   // Each contributor licenses this file to you under the OpenPegasus Open
   // Source License; you may not use this file except in compliance with the
   // License.
   //
   // Permission is hereby granted, free of charge, to any person obtaining a
   // copy of this software and associated documentation files (the "Software"),
   // to deal in the Software without restriction, including without limitation
   // the rights to use, copy, modify, merge, publish, distribute, sublicense,
   // and/or sell copies of the Software, and to permit persons to whom the
   // Software is furnished to do so, subject to the following conditions:
   //
   // The above copyright notice and this permission notice shall be included
   // in all copies or substantial portions of the Software.
   //
   // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
   // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
   // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
   // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
   // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
   // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
   // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 // //
 // Author: Mike Brasher (mbrasher@bmc.com)  //////////////////////////////////////////////////////////////////////////
 //  
 // Modified By:  
 // //
 //%///////////////////////////////////////////////////////////////////////////// //%/////////////////////////////////////////////////////////////////////////////
  
 #include <Pegasus/Common/HashTable.h> #include <Pegasus/Common/HashTable.h>
 #include <Pegasus/Common/IPC.h>  #include <Pegasus/Common/Tracer.h>
   #include <Pegasus/Common/CimomMessage.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "IDFactory.h"
  
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
Line 37 
Line 42 
 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> > typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128);  static QueueTable _queueTable(256);
 static Mutex q_table_mut = Mutex();  static Mutex q_table_mut ;
   
 static Uint32 _GetNextQueueId() throw(IPCException)  
 {  
  
    static Uint32 _queueId = 2;  static IDFactory _qidFactory;
    static Mutex _id_mut = Mutex();  
  
    _id_mut.lock(pegasus_thread_self());  Uint32 MessageQueue::getNextQueueId()
   
    // Handle wrap-around!  
    if (_queueId == 0)  
       _queueId = MessageQueue::_CIMOM_Q_ID;  
    Uint32 ret = _queueId++;  
    _id_mut.unlock();  
   
    return ret;  
 }  
   
 Uint32 MessageQueue::_CIMOM_Q_ID = 1;  
   
 MessageQueue::MessageQueue(const char * name)  
         : _mut( ), _count(0), _front(0), _back(0),  
         _workThread(MessageQueue::workThread, this, false),  
         _workSemaphore(0)  
 { {
    if(name != NULL)      return _qidFactory.getID();
    {  
       strncpy(_name, name, 25);  
       _name[25] = 0x00;  
    }    }
    else  
       memset(_name, 0x00,25);  
   
     q_table_mut.lock(pegasus_thread_self());  
   
     while (!_queueTable.insert(_queueId = _GetNextQueueId(), this))  
        ;  
     q_table_mut.unlock();  
  
         _workThread.run();  void MessageQueue::putQueueId(Uint32 queueId)
 }  
   
 MessageQueue::~MessageQueue()  
 { {
     // ATTN-A: thread safety!      _qidFactory.putID(queueId);
     q_table_mut.lock(pegasus_thread_self());  
   
     _queueTable.remove(_queueId);  
     q_table_mut.unlock();  
   
         _workThread.cancel();   // cancel thread  
         _workSemaphore.signal();// wake thread  
         _workThread.join();             // wait for thread to complete  
 } }
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)  MessageQueue::MessageQueue(const char* name)
 {     : _queueId(getNextQueueId())
         // get thread from argument  
         Thread * thread = (Thread *)arg;  
   
         PEGASUS_ASSERT(thread != 0);  
   
         // get message queue from thread  
         MessageQueue * queue = (MessageQueue *)thread->get_parm();  
   
         PEGASUS_ASSERT(queue != 0);  
   
         while(true)  
         {  
            thread->sleep(1);  
            continue;  
   
                 // wait for work  
                 queue->_workSemaphore.wait();  
   
                 // stop the thread when the message queue has been destroyed.  
                 // ATTN: should check the thread cancel flag that is not yet exposed!  
                 if(MessageQueue::lookup(queue->_queueId) == 0)  
                 {  
                         break;  
                 }  
   
                 // ensure the queue has a message before dispatching  
                 if(queue->_count != 0)  
                 {                 {
                         queue->handleEnqueue();      //
                 }      // Copy the name:
       //
         }  
   
         thread->exit_self(PEGASUS_THREAD_RETURN(0));  
   
         return(0);  
 }  
  
 void MessageQueue::enqueue(Message* message) throw(IPCException)      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
 {  
     if (!message)  
        throw NullPointer();  
  
       if (!name)
           name = "";
  
       _name = new char[strlen(name) + 1];
       strcpy(_name, name);
  
     if (getenv("PEGASUS_TRACE"))      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
     {          "MessageQueue::MessageQueue  name = %s, queueId = %u", name, _queueId));
        cout << "===== " << getQueueName() << ": ";  
        message->print(cout);  
     }  
  
     _mut.lock(pegasus_thread_self());      //
     if (_back)      // Insert into queue table:
     {      //
        _back->_next = message;      AutoMutex autoMut(q_table_mut);
        message->_prev = _back;      while (!_queueTable.insert(_queueId, this))
        message->_next = 0;          ;
        _back = message;  
     }  
     else  
     {  
        _front = message;  
        _back = message;  
        message->_prev = 0;  
        message->_next = 0;  
     }  
     message->_owner = this;  
     _count++;  
     _mut.unlock();  
   
 //    _workSemaphore.signal();  
   
     handleEnqueue();  
  
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::dequeue() throw(IPCException)  MessageQueue::~MessageQueue()
 {  
    _mut.lock(pegasus_thread_self());  
     if (_front)  
     {     {
         Message* message = _front;      // ATTN-A: thread safety!
         _front = _front->_next;      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
         if (_front)      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
             _front->_prev = 0;          "MessageQueue::~MessageQueue queueId = %i, name = %s",
           _queueId,
         if (_back == message)          _name));
             _back = 0;  
         _count--;  
         _mut.unlock();  
         message->_next = 0;  
         message->_prev = 0;  
         message->_owner = 0;  
         return message;  
     }  
     _mut.unlock();  
     return 0;  
 }  
  
 void MessageQueue::remove(Message* message) throw(IPCException)  
 { {
     if (!message)          AutoMutex autoMut(q_table_mut);
         throw NullPointer();          _queueTable.remove(_queueId);
       } // mutex unlocks here
     if (message->_owner != this)  
         throw NoSuchMessageOnQueue();  
   
     _mut.lock(pegasus_thread_self());  
   
     if (message->_next)  
         message->_next->_prev = message->_prev;  
     else  
         _back = message->_prev;  
  
     if (message->_prev)      // Free the name:
         message->_prev->_next = message->_next;  
     else  
         _front = message->_next;  
  
     _count--;      delete [] _name;
     _mut.unlock();  
  
     message->_prev = 0;      // Return the queue id.
     message->_next = 0;  
     message->_owner = 0;  
 }  
  
 Message* MessageQueue::findByType(Uint32 type) throw(IPCException)      putQueueId(_queueId);
 {  
    _mut.lock(pegasus_thread_self());  
  
     for (Message* m = front(); m; m = m->getNext())      PEG_METHOD_EXIT();
     {  
        if (m->getType() == type)  
        {  
           _mut.unlock();  
           return m;  
        }  
     }  
     _mut.unlock();  
     return 0;  
 } }
  
 Message* MessageQueue::findByKey(Uint32 key) throw(IPCException)  void MessageQueue::enqueue(Message* message)
 { {
    _mut.lock(pegasus_thread_self());      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
  
     for (Message* m = front(); m; m = m->getNext())      PEGASUS_ASSERT(message != 0);
     {  
        if (m->getKey() == key)  
        {  
           _mut.unlock();  
           return m;  
        }  
  
     }      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
     _mut.unlock();          "Queue name: [%s], Message: [%s]",
     return 0;          getQueueName(),
 }          MessageTypeToString(message->getType())));
  
 void MessageQueue::print(ostream& os) const throw(IPCException)      _messageList.insert_back(message);
 {  
    const_cast<MessageQueue *>(this)->_mut.lock(pegasus_thread_self());  
  
    for (const Message* m = front(); m; m = m->getNext())      handleEnqueue();
         m->print(os);      PEG_METHOD_EXIT();
    const_cast<MessageQueue *>(this)->_mut.unlock();  
 } }
  
 Message* MessageQueue::find(Uint32 type, Uint32 key) throw(IPCException)  Message* MessageQueue::dequeue()
 { {
    _mut.lock(pegasus_thread_self());      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
  
     for (Message* m = front(); m; m = m->getNext())      Message* message = _messageList.remove_front();
     {  
        if (m->getType() == type && m->getKey() == key)  
        {  
           _mut.unlock();  
           return m;  
        }  
     }  
     _mut.unlock();  
  
     return 0;      PEG_METHOD_EXIT();
 }      return message;
   
 void MessageQueue::lock() throw(IPCException)  
 {  
    _mut.lock(pegasus_thread_self());  
 }  
   
 void MessageQueue::unlock()  
 {  
    _mut.unlock();  
 } }
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    if(_name[0] != 0x00)  
       return _name;       return _name;
    return "unknown";  
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)  MessageQueue* MessageQueue::lookup(Uint32 queueId)
 { {
   
     MessageQueue* queue = 0;     MessageQueue* queue = 0;
     q_table_mut.lock(pegasus_thread_self());      AutoMutex autoMut(q_table_mut);
  
     if (_queueTable.lookup(queueId, queue))     if (_queueTable.lookup(queueId, queue))
     {     {
        q_table_mut.unlock();  
        return queue;        return queue;
     }     }
  
     // Not found!     // Not found!
  
     q_table_mut.unlock();      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
           "MessageQueue::lookup failure queueId = %u", queueId));
  
     return 0;     return 0;
 } }
  
  
 MessageQueue* MessageQueue::lookup(const char *name) throw(IPCException)  MessageQueue* MessageQueue::lookup(const char *name)
 { {
   
    if(name == NULL)    if(name == NULL)
       throw NullPointer();       throw NullPointer();
    q_table_mut.lock(pegasus_thread_self());  
  
       AutoMutex autoMut(q_table_mut);
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
       if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )          if (!strcmp(((MessageQueue *)i.value())->getQueueName(), name))
       {       {
          q_table_mut.unlock();              return (MessageQueue *)i.value();
          return( (MessageQueue *)i.value());  
       }       }
   
    }    }
    q_table_mut.unlock();  
       PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
           "MessageQueue::lookup failure - name = %s", name));
  
    return 0;    return 0;
 } }


Legend:
Removed from v.1.10  
changed lines
  Added in v.1.62

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2