(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueue.cpp between version 1.21 and 1.37

version 1.21, 2002/01/21 21:20:35 version 1.37, 2003/11/07 19:18:46
Line 1 
Line 1 
 //%/////////////////////////////////////////////////////////////////////////////  //%2003////////////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002  BMC Software, Hewlett-Packard Development
   // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
   // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
   // IBM Corp.; EMC Corporation, The Open Group.
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 30 
Line 33 
 #include <Pegasus/Common/IPC.h> #include <Pegasus/Common/IPC.h>
 #include <Pegasus/Common/Tracer.h> #include <Pegasus/Common/Tracer.h>
 #include "MessageQueue.h" #include "MessageQueue.h"
   #include "MessageQueueService.h"
 PEGASUS_USING_STD; PEGASUS_USING_STD;
  
 PEGASUS_NAMESPACE_BEGIN PEGASUS_NAMESPACE_BEGIN
Line 38 
Line 41 
 typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> > typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
     QueueTable;     QueueTable;
  
 static QueueTable _queueTable(128);  static QueueTable _queueTable(256);
 static Mutex q_table_mut = Mutex();  static Mutex q_table_mut ;
   
   void MessageQueue::remove_myself(Uint32 qid)
   {
      q_table_mut.lock(pegasus_thread_self());
   
      _queueTable.remove(qid);
      q_table_mut.unlock();
   }
   
  
 Uint32 MessageQueue::getNextQueueId() throw(IPCException) Uint32 MessageQueue::getNextQueueId() throw(IPCException)
 { {
    static Uint32 _nextQueueId = 1;     static Uint32 _nextQueueId = 2;
  
    //    //
    // Lock mutex:    // Lock mutex:
    //    //
  
    static Mutex _id_mut = Mutex();     static Mutex _id_mut ;
    _id_mut.lock(pegasus_thread_self());    _id_mut.lock(pegasus_thread_self());
  
    // Assign next queue id. Handle wrap around and never assign zero as     Uint32 queueId;
    // a queue id:  
  
      // Assign the next queue ID that is not already in use
      do
      {
         // Handle wrap around and never assign zero or one as a queue id:
    if (_nextQueueId == 0)    if (_nextQueueId == 0)
       _nextQueueId = 1;        {
            _nextQueueId = 2;
         }
  
    Uint32 queueId = _nextQueueId++;        queueId = _nextQueueId++;
      } while (lookup(queueId) != 0);
  
    //    //
    // Unlock mutex:    // Unlock mutex:
Line 69 
Line 87 
    return queueId;    return queueId;
 } }
  
   
   
 MessageQueue::MessageQueue( MessageQueue::MessageQueue(
     const char* name,     const char* name,
     Boolean async,     Boolean async,
     Uint32 queueId)     Uint32 queueId)
     :     : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
     _queueId(queueId), _count(0), _front(0), _back(0), _async(async),  
     _workThread(MessageQueue::workThread, this, false), _workSemaphore(0)  
 { {
     //     //
     // Copy the name:     // Copy the name:
     //     //
  
    PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::MessageQueue()");     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
  
     if (!name)     if (!name)
         name = "";         name = "";
Line 89 
Line 107 
     _name = new char[strlen(name) + 1];     _name = new char[strlen(name) + 1];
     strcpy(_name, name);     strcpy(_name, name);
  
     Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);         "MessageQueue::MessageQueue  name = %s, queueId = %i", name, queueId);
  
     //     //
Line 104 
Line 122 
     q_table_mut.unlock();     q_table_mut.unlock();
  
  
     if(_async == true)     PEG_METHOD_EXIT();
        _workThread.run();  
   
    PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::MessageQueue()");  
 } }
  
 MessageQueue::~MessageQueue() MessageQueue::~MessageQueue()
 { {
     // ATTN-A: thread safety!     // ATTN-A: thread safety!
  
     PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
  
     Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
  
     q_table_mut.lock(pegasus_thread_self());     q_table_mut.lock(pegasus_thread_self());
Line 124 
Line 139 
     _queueTable.remove(_queueId);     _queueTable.remove(_queueId);
     q_table_mut.unlock();     q_table_mut.unlock();
  
     if(_async == true)  
     {  
        _workThread.cancel();    // cancel thread  
        _workSemaphore.signal(); // wake thread  
        _workThread.join();      // wait for thread to complete  
     }  
   
     // Free the name:     // Free the name:
  
     delete [] _name;     delete [] _name;
  
     PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::~MessageQueue()");      PEG_METHOD_EXIT();
 }  
   
   
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueue::workThread(void * arg)  
 {  
   
         PEG_FUNC_ENTER(TRC_DISPATCHER,"MessageQueue::workThread()");  
   
         // get thread from argument  
         Thread * thread = (Thread *)arg;  
   
         PEGASUS_ASSERT(thread != 0);  
   
         // get message queue from thread  
         MessageQueue * queue = (MessageQueue *)thread->get_parm();  
   
         PEGASUS_ASSERT(queue != 0);  
   
         while(true)  
         {  
            if(thread->is_cancelled())  
            {  
               break;  
            }  
   
            // wait for work  
            queue->_workSemaphore.wait();  
   
            // stop the thread when the message queue has been destroyed.  
            // ATTN: should check the thread cancel flag that is not yet exposed!  
            if(MessageQueue::lookup(queue->_queueId) == 0)  
            {  
              Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,  
                      "MessageQueue::workThread - ",  
                      "Message queue, %i, for thread no longer exists",  
                      queue->_queueId);  
               break;  
            }  
   
            // ensure the queue has a message before dispatching  
            if(queue->_count != 0)  
            {  
               queue->handleEnqueue();  
            }  
         }  
   
         PEG_FUNC_EXIT(TRC_DISPATCHER,"MessageQueue::workThread()");  
   
         thread->exit_self(PEGASUS_THREAD_RETURN(0));  
   
         return(0);  
 } }
  
 void MessageQueue::enqueue(Message* message) throw(IPCException)  void MessageQueue::enqueue(Message* message)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
  
     if (!message)     if (!message)
     {     {
        Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::enqueue failure");         "MessageQueue::enqueue failure");
          PEG_METHOD_EXIT();
        throw NullPointer();        throw NullPointer();
     }     }
  
     if (getenv("PEGASUS_TRACE"))      PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
     {                        String("Queue name: ") + getQueueName() ) ;
        cout << "===== " << getQueueName() << ": ";      Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,
        message->print(cout);                        Tracer::LEVEL3,
     }                        "Message: [%s, %d]",
                         MessageTypeToString(message->getType()),
                         message->getKey() );
  
     _mut.lock(pegasus_thread_self());     _mut.lock(pegasus_thread_self());
     if (_back)     if (_back)
Line 221 
Line 182 
        message->_next = 0;        message->_next = 0;
     }     }
     message->_owner = this;     message->_owner = this;
     _count++;  
     if( _async == true )  
     {  
        _workSemaphore.signal();  
  
     }      _count++;
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
                     "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
  
     _mut.unlock();     _mut.unlock();
  
     if(_async == false )  
        handleEnqueue();        handleEnqueue();
       PEG_METHOD_EXIT();
 } }
  
   
 Message* MessageQueue::dequeue() throw(IPCException) Message* MessageQueue::dequeue() throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
   
    _mut.lock(pegasus_thread_self());    _mut.lock(pegasus_thread_self());
     if (_front)     if (_front)
     {     {
Line 248 
Line 207 
  
         if (_back == message)         if (_back == message)
             _back = 0;             _back = 0;
   
         _count--;         _count--;
           Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
               "MessageQueue::dequeue _queueId = %d, _count = %d",
               _queueId, _count);
   
         _mut.unlock();         _mut.unlock();
         message->_next = 0;         message->_next = 0;
         message->_prev = 0;         message->_prev = 0;
         message->_owner = 0;         message->_owner = 0;
   
           PEG_METHOD_EXIT();
         return message;         return message;
     }     }
     _mut.unlock();     _mut.unlock();
   
       PEG_METHOD_EXIT();
     return 0;     return 0;
 } }
   ;
   
  
 void MessageQueue::remove(Message* message) throw(IPCException) void MessageQueue::remove(Message* message) throw(IPCException)
 { {
       PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
   
     if (!message)     if (!message)
       {
           PEG_METHOD_EXIT();
         throw NullPointer();         throw NullPointer();
       }
  
     if (message->_owner != this)     if (message->_owner != this)
       {
           PEG_METHOD_EXIT();
         throw NoSuchMessageOnQueue();         throw NoSuchMessageOnQueue();
       }
  
     _mut.lock(pegasus_thread_self());     _mut.lock(pegasus_thread_self());
  
Line 280 
Line 258 
         _front = message->_next;         _front = message->_next;
  
     _count--;     _count--;
       Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
          "MessageQueue::remove _count = %d", _count);
   
     _mut.unlock();     _mut.unlock();
  
     message->_prev = 0;     message->_prev = 0;
     message->_next = 0;     message->_next = 0;
     message->_owner = 0;     message->_owner = 0;
   
       PEG_METHOD_EXIT();
 } }
  
 Message* MessageQueue::findByType(Uint32 type) throw(IPCException) Message* MessageQueue::findByType(Uint32 type) throw(IPCException)
Line 358 
Line 341 
  
 const char* MessageQueue::getQueueName() const const char* MessageQueue::getQueueName() const
 { {
    if(_name[0] != 0x00)  
       return _name;       return _name;
    return "unknown";  
 } }
  
 MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException) MessageQueue* MessageQueue::lookup(Uint32 queueId) throw(IPCException)
Line 379 
Line 360 
  
     q_table_mut.unlock();     q_table_mut.unlock();
  
     Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::lookup failure queueId = %i", queueId);         "MessageQueue::lookup failure queueId = %i", queueId);
  
     return 0;     return 0;
Line 396 
Line 377 
    for(QueueTable::Iterator i = _queueTable.start(); i; i++)    for(QueueTable::Iterator i = _queueTable.start(); i; i++)
    {    {
         // ATTN: Need to decide how many characters to compare in queue names         // ATTN: Need to decide how many characters to compare in queue names
       if(! strncmp( ((MessageQueue *)i.value())->getQueueName(), name, 25) )        if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
       {       {
          q_table_mut.unlock();          q_table_mut.unlock();
          return( (MessageQueue *)i.value());          return( (MessageQueue *)i.value());
Line 405 
Line 386 
    }    }
    q_table_mut.unlock();    q_table_mut.unlock();
  
    Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
         "MessageQueue::lookup failure - name = %s", name);         "MessageQueue::lookup failure - name = %s", name);
  
    return 0;    return 0;


Legend:
Removed from v.1.21  
changed lines
  Added in v.1.37

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2