(file) Return to MessageQueue.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 karl  1.47 //%2006////////////////////////////////////////////////////////////////////////
  2 mike  1.4  //
  3 karl  1.39 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
  4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
  5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
  6 karl  1.36 // IBM Corp.; EMC Corporation, The Open Group.
  7 karl  1.39 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
  8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
  9 karl  1.40 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
 11 karl  1.47 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
 12            // EMC Corporation; Symantec Corporation; The Open Group.
 13 mike  1.4  //
 14            // Permission is hereby granted, free of charge, to any person obtaining a copy
 15 chip  1.8  // of this software and associated documentation files (the "Software"), to
 16            // deal in the Software without restriction, including without limitation the
 17            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 18 mike  1.4  // sell copies of the Software, and to permit persons to whom the Software is
 19            // furnished to do so, subject to the following conditions:
 20 karl  1.47 // 
 21 chip  1.8  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 22 mike  1.4  // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 23            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 24 chip  1.8  // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 25            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 26            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 27 mike  1.4  // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 28            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 29            //
 30            //==============================================================================
 31            //
 32            // Author: Mike Brasher (mbrasher@bmc.com)
 33            //
 34 a.arora 1.38 // Modified By: Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
 35 joyce.j 1.41 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#2076
 36 david.dillard 1.43 //              David Dillard, VERITAS Software Corp.
 37                    //                  (david.dillard@veritas.com)
 38 aruran.ms     1.46 //              Aruran, IBM (ashanmug@in.ibm.com) for Bug# 3475
 39 mike          1.4  //
 40                    //%/////////////////////////////////////////////////////////////////////////////
 41                    
 42 mike          1.5  #include <Pegasus/Common/HashTable.h>
 43 mike          1.7  #include <Pegasus/Common/IPC.h>
 44 kumpf         1.19 #include <Pegasus/Common/Tracer.h>
 45 aruran.ms     1.46 #include "Stack.h"
 46 mike          1.4  #include "MessageQueue.h"
 47 mday          1.24 #include "MessageQueueService.h"
 48 mike          1.4  PEGASUS_USING_STD;
 49                    
 50                    PEGASUS_NAMESPACE_BEGIN
 51                    
 52 mike          1.5  typedef HashTable<Uint32, MessageQueue*, EqualFunc<Uint32>, HashFunc<Uint32> >
 53                        QueueTable;
 54                    
 55 mday          1.33 static QueueTable _queueTable(256);
 56 mday          1.28 static Mutex q_table_mut ;
 57 mike          1.5  
 58 mday          1.35 void MessageQueue::remove_myself(Uint32 qid)
 59                    {
 60 david.dillard 1.43     AutoMutex autoMut(q_table_mut);
 61                        _queueTable.remove(qid);
 62 mday          1.35 }
 63                    
 64 aruran.ms     1.46 static Stack<Uint32> _qid_stack;
 65                    static Uint32 _qid_next = CIMOM_Q_ID + 1;
 66                    static Mutex _qid_mutex;
 67 mday          1.35 
 68 david.dillard 1.43 Uint32 MessageQueue::getNextQueueId()
 69 mike          1.5  {
 70 aruran.ms     1.46     // If _qid_stack is empty, return _qid_next (and then increment _qid_next).
 71                        // Else return the top of the stack.
 72 mike          1.16 
 73 aruran.ms     1.46     AutoMutex autoMutex(_qid_mutex);
 74 chip          1.8  
 75 aruran.ms     1.46     if (_qid_stack.isEmpty())
 76                            return _qid_next++;
 77 chip          1.8  
 78 aruran.ms     1.46     Uint32 queueId = _qid_stack.top();
 79                        _qid_stack.pop();
 80                        return queueId;
 81                    }
 82 chip          1.8  
 83 aruran.ms     1.46 void MessageQueue::putQueueId(Uint32 queueId)
 84                    {
 85                        // Put the queueId on the top of the stack.
 86 mike          1.7  
 87 aruran.ms     1.46     AutoMutex autoMutex(_qid_mutex);
 88                        // Ignore an attempt to return the well-known queue id (CIMOM_Q_ID).
 89                        // This id is reserved for the CIMOM queue.
 90 mday          1.12 
 91 aruran.ms     1.46     if (queueId == CIMOM_Q_ID)
 92                            return;
 93 mday          1.24 
 94 aruran.ms     1.46     _qid_stack.push(queueId);
 95                    }
 96 mday          1.24 
 97 mike          1.16 MessageQueue::MessageQueue(
 98 david.dillard 1.43     const char* name,
 99 mike          1.16     Boolean async,
100                        Uint32 queueId)
101 mday          1.24    : _queueId(queueId), _capabilities(0), _count(0), _front(0), _back(0), _async(async)
102 mike          1.5  {
103 mike          1.16     //
104                        // Copy the name:
105                        //
106                    
107 david.dillard 1.43     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::MessageQueue()");
108 kumpf         1.19 
109 mike          1.16     if (!name)
110 david.dillard 1.43         name = "";
111 mike          1.16 
112                        _name = new char[strlen(name) + 1];
113                        strcpy(_name, name);
114                    
115 kumpf         1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
116 kumpf         1.42         "MessageQueue::MessageQueue  name = %s, queueId = %u", name, queueId);
117 kumpf         1.19 
118 mike          1.16     //
119                        // Insert into queue table:
120                        //
121 a.arora       1.38     AutoMutex autoMut(q_table_mut);
122 david.dillard 1.43     while (!_queueTable.insert(_queueId, this))
123                            ;
124 chip          1.8  
125 david.dillard 1.43     PEG_METHOD_EXIT();
126 mike          1.5  }
127                    
128                    MessageQueue::~MessageQueue()
129                    {
130                        // ATTN-A: thread safety!
131 kumpf         1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::~MessageQueue()");
132                        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
133 kumpf         1.19         "MessageQueue::~MessageQueue queueId = %i, name = %s", _queueId, _name);
134                    
135 a.arora       1.38     {
136 david.dillard 1.43         AutoMutex autoMut(q_table_mut);
137                            _queueTable.remove(_queueId);
138 a.arora       1.38     } // mutex unlocks here
139 david.dillard 1.43 
140 mike          1.16     // Free the name:
141 david.dillard 1.43 
142 mike          1.16     delete [] _name;
143 kumpf         1.19 
144 david.dillard 1.43     while(_front)
145 joyce.j       1.41     {
146                           Message* tmp = _front;
147                           _front = _front->_next;
148                           delete tmp;
149                        }
150 aruran.ms     1.46     
151                        // Return the queue id.
152                    
153                        putQueueId(_queueId);
154 joyce.j       1.41 
155 kumpf         1.25     PEG_METHOD_EXIT();
156 chip          1.8  }
157                    
158 kumpf         1.37 void MessageQueue::enqueue(Message* message)
159 mike          1.4  {
160 kumpf         1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::enqueue()");
161 kumpf         1.19 
162 david.dillard 1.43     if (!message)
163 kumpf         1.19     {
164 david.dillard 1.43         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
165                                        "MessageQueue::enqueue failure");
166                            PEG_METHOD_EXIT();
167                            throw NullPointer();
168 kumpf         1.19     }
169 mike          1.7  
170 david.dillard 1.43     PEG_TRACE_STRING( TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
171 kumpf         1.31                       String("Queue name: ") + getQueueName() ) ;
172 david.dillard 1.43     Tracer::trace   ( TRC_MESSAGEQUEUESERVICE,
173 kumpf         1.31                       Tracer::LEVEL3,
174 david.dillard 1.43                       "Message: [%s, %d]",
175                                          MessageTypeToString(message->getType()),
176 kumpf         1.31                       message->getKey() );
177 david.dillard 1.43 
178 a.arora       1.38     {
179                        AutoMutex autoMut(_mut);
180 mike          1.4      if (_back)
181                        {
182 david.dillard 1.43         _back->_next = message;
183                            message->_prev = _back;
184                            message->_next = 0;
185                            _back = message;
186 mike          1.4      }
187                        else
188                        {
189 david.dillard 1.43         _front = message;
190                            _back = message;
191                            message->_prev = 0;
192                            message->_next = 0;
193 mike          1.4      }
194                        message->_owner = this;
195 david.dillard 1.43 
196 mike          1.4      _count++;
197 kumpf         1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
198 david.dillard 1.43                   "MessageQueue::enqueue _queueId = %d, _count = %d", _queueId, _count);
199                    
200 a.arora       1.38     } // mutex unlocks here
201 david.dillard 1.43 
202 mday          1.22     handleEnqueue();
203 kumpf         1.25     PEG_METHOD_EXIT();
204 mday          1.15 }
205                    
206 david.dillard 1.43 Message* MessageQueue::dequeue()
207 mike          1.4  {
208 kumpf         1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::dequeue()");
209                    
210 a.arora       1.38     AutoMutex autoMut(_mut);
211 mike          1.4      if (_front)
212                        {
213 david.dillard 1.43         Message* message = _front;
214                            _front = _front->_next;
215                            if (_front)
216                                _front->_prev = 0;
217 mike          1.4  
218 david.dillard 1.43         if (_back == message)
219                                _back = 0;
220 kumpf         1.25 
221 david.dillard 1.43         _count--;
222 kumpf         1.25         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
223 david.dillard 1.43             "MessageQueue::dequeue _queueId = %d, _count = %d",
224 kumpf         1.26             _queueId, _count);
225 kumpf         1.25 
226 david.dillard 1.43         message->_next = 0;
227                            message->_prev = 0;
228                            message->_owner = 0;
229 kumpf         1.25 
230                            PEG_METHOD_EXIT();
231 david.dillard 1.43         return message;
232 mike          1.4      }
233 kumpf         1.25 
234                        PEG_METHOD_EXIT();
235 mike          1.4      return 0;
236                    }
237 mday          1.23 
238 mike          1.4  
239 david.dillard 1.43 
240                    void MessageQueue::remove(Message* message)
241 mike          1.4  {
242 kumpf         1.25     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,"MessageQueue::remove()");
243                    
244 mike          1.4      if (!message)
245 kumpf         1.25     {
246                            PEG_METHOD_EXIT();
247 david.dillard 1.43         throw NullPointer();
248 kumpf         1.25     }
249 mike          1.4  
250                        if (message->_owner != this)
251 kumpf         1.25     {
252                            PEG_METHOD_EXIT();
253 david.dillard 1.43         throw NoSuchMessageOnQueue();
254 kumpf         1.25     }
255 mike          1.4  
256 a.arora       1.38     {
257                        AutoMutex autoMut(_mut);
258 chip          1.8  
259 mike          1.4      if (message->_next)
260 david.dillard 1.43         message->_next->_prev = message->_prev;
261 mike          1.4      else
262 david.dillard 1.43         _back = message->_prev;
263 mike          1.4  
264                        if (message->_prev)
265 david.dillard 1.43         message->_prev->_next = message->_next;
266 mike          1.4      else
267 david.dillard 1.43         _front = message->_next;
268 mike          1.4  
269 mike          1.7      _count--;
270 kumpf         1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL4,
271                           "MessageQueue::remove _count = %d", _count);
272                    
273 a.arora       1.38     } // mutex unlocks here
274 chip          1.8  
275 mike          1.4      message->_prev = 0;
276                        message->_next = 0;
277                        message->_owner = 0;
278 kumpf         1.25 
279                        PEG_METHOD_EXIT();
280 mike          1.4  }
281                    
282 david.dillard 1.43 Message* MessageQueue::findByType(Uint32 type)
283 mike          1.4  {
284 david.dillard 1.43     AutoMutex autoMut(_mut);
285 chip          1.8  
286 mike          1.4      for (Message* m = front(); m; m = m->getNext())
287                        {
288 david.dillard 1.43         if (m->getType() == type)
289                            {
290                                return m;
291                            }
292 mike          1.4      }
293 david.dillard 1.43 
294 mike          1.4      return 0;
295                    }
296                    
297 david.dillard 1.43 Message* MessageQueue::findByKey(Uint32 key)
298 mike          1.4  {
299 david.dillard 1.43     AutoMutex autoMut(_mut);
300 chip          1.8  
301 mike          1.4      for (Message* m = front(); m; m = m->getNext())
302                        {
303 mike          1.7         if (m->getKey() == key)
304                           {
305 a.arora       1.38           return m;
306 mike          1.7         }
307 chip          1.8  
308 mike          1.4      }
309 david.dillard 1.43 
310 mike          1.4      return 0;
311                    }
312                    
313 joyce.j       1.44 #ifdef PEGASUS_DEBUG
314 david.dillard 1.43 void MessageQueue::print(ostream& os) const
315 mike          1.4  {
316 david.dillard 1.43     AutoMutex autoMut(const_cast<MessageQueue *>(this)->_mut);
317 chip          1.8  
318 david.dillard 1.43     for (const Message* m = front(); m; m = m->getNext())
319                            m->print(os);
320 mike          1.4  }
321 joyce.j       1.44 #endif
322 mike          1.4  
323 david.dillard 1.43 Message* MessageQueue::find(Uint32 type, Uint32 key)
324 mike          1.4  {
325 david.dillard 1.43     AutoMutex autoMut(_mut);
326 chip          1.8  
327 mike          1.4      for (Message* m = front(); m; m = m->getNext())
328                        {
329 david.dillard 1.43         if (m->getType() == type && m->getKey() == key)
330                            {
331                                return m;
332                            }
333 mike          1.4      }
334                    
335 mike          1.5      return 0;
336                    }
337                    
338 mike          1.7  const char* MessageQueue::getQueueName() const
339 mike          1.5  {
340 david.dillard 1.43     return _name;
341 mike          1.5  }
342                    
343 david.dillard 1.43 MessageQueue* MessageQueue::lookup(Uint32 queueId)
344 mike          1.5  {
345 kumpf         1.19 
346 mike          1.5      MessageQueue* queue = 0;
347 a.arora       1.38     AutoMutex autoMut(q_table_mut);
348 chip          1.8  
349 mike          1.5      if (_queueTable.lookup(queueId, queue))
350 mike          1.7      {
351 david.dillard 1.43         return queue;
352 mike          1.7      }
353 chip          1.8  
354 mike          1.7      // Not found!
355 mike          1.5  
356 kumpf         1.25     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
357 kumpf         1.42         "MessageQueue::lookup failure queueId = %u", queueId);
358 kumpf         1.19 
359 mike          1.4      return 0;
360 mike          1.6  }
361 mike          1.7  
362                    
363 david.dillard 1.43 MessageQueue* MessageQueue::lookup(const char *name)
364 mike          1.7  {
365 kumpf         1.19 
366 david.dillard 1.43     if(name == NULL)
367                            throw NullPointer();
368 chip          1.8  
369 david.dillard 1.43     AutoMutex autoMut(q_table_mut);
370 mike          1.7     for(QueueTable::Iterator i = _queueTable.start(); i; i++)
371                       {
372                            // ATTN: Need to decide how many characters to compare in queue names
373 david.dillard 1.43         if(! strcmp( ((MessageQueue *)i.value())->getQueueName(), name) )
374                            {
375                                return( (MessageQueue *)i.value());
376                            }
377                        }
378 chip          1.8  
379 david.dillard 1.43     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
380                                        "MessageQueue::lookup failure - name = %s", name);
381 kumpf         1.19 
382 david.dillard 1.43     return 0;
383 mike          1.7  }
384                    
385 mike          1.6  
386                    void MessageQueue::handleEnqueue()
387                    {
388                    
389 mike          1.4  }
390                    
391                    PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2