(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.119 //%2006////////////////////////////////////////////////////////////////////////
   2 mday  1.1   //
   3 karl  1.88  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mday  1.1   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15             // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18             // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.82  // 
  21 mday  1.1   // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22             // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24             // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27             // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             //%/////////////////////////////////////////////////////////////////////////////
  33             
  34             #include "MessageQueueService.h"
  35 mday  1.22  #include <Pegasus/Common/Tracer.h>
  36 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
  37 mday  1.1   
  38             PEGASUS_NAMESPACE_BEGIN
  39             
  40 mday  1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  41 mike  1.118 AtomicInt MessageQueueService::_service_count(0);
  42 mday  1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  43 mday  1.15  
  44 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
  45 mday  1.53  
  46 mday  1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  47 mday  1.53  
  48 mike  1.120 MessageQueueService::PollingList* MessageQueueService::_polling_list;
  49             Mutex MessageQueueService::_polling_list_mutex;
  50 mday  1.53  
  51 kumpf 1.62  Thread* MessageQueueService::_polling_thread = 0;
  52 mday  1.61  
  53 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
  54 mday  1.69  {
  55                return _thread_pool;
  56             }
  57 kumpf 1.117 
  58 jim.wunderlich 1.110 //
  59 kumpf          1.117 // MAX_THREADS_PER_SVC_QUEUE
  60 jim.wunderlich 1.110 //
  61                      // JR Wunderlich Jun 6, 2005
  62                      //
  63                      
  64                      #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 
  65 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  66 jim.wunderlich 1.110 
  67 kumpf          1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
  68                      # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  69                      #endif
  70                      
  71 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
  72 mday           1.69  
  73 mike           1.124 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
  74 mday           1.53  {
  75                         Thread *myself = reinterpret_cast<Thread *>(parm);
  76 mike           1.124    List<MessageQueueService, Mutex> *list = 
  77                             reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());
  78 mike           1.120 
  79 mike           1.118    while (_stop_polling.get()  == 0)
  80 mday           1.53     {
  81 baggio.br      1.100       _polling_sem.wait();
  82                      
  83 mike           1.118       if (_stop_polling.get() != 0)
  84 mday           1.61        {
  85 kumpf          1.62           break;
  86 mday           1.61        }
  87 kumpf          1.104 
  88 denise.eckstein 1.116       // The polling_routine thread must hold the lock on the
  89 kumpf           1.130       // _polling_list while processing incoming messages.
  90 denise.eckstein 1.116       // This lock is used to give this thread ownership of
  91                             // services on the _polling_routine list.
  92                       
  93                             // This is necessary to avoid confict with other threads
  94                             // processing the _polling_list
  95                             // (e.g., MessageQueueServer::~MessageQueueService).
  96                       
  97 mday            1.53        list->lock();
  98 mike            1.120       MessageQueueService *service = list->front();
  99 denise.eckstein 1.116       ThreadStatus rtn = PEGASUS_THREAD_OK;
 100                             while (service != NULL)
 101                             {
 102                                 if ((service->_incoming.count() > 0) &&
 103 mike            1.118               (service->_die.get() == 0) &&
 104                                     (service->_threads.get() < max_threads_per_svc_queue))
 105 denise.eckstein 1.116           {
 106                                    // The _threads count is used to track the 
 107                                    // number of active threads that have been allocated
 108                                    // to process messages for this service.
 109                       
 110                                    // The _threads count MUST be incremented while
 111                                    // the polling_routine owns the _polling_thread
 112                                    // lock and has ownership of the service object.
 113                       
 114                                    service->_threads++;
 115                                    try
 116                                    {
 117                                        rtn = _thread_pool->allocate_and_awaken(
 118                                             service, _req_proc, &_polling_sem);
 119                                    }
 120                                    catch (...)
 121                                    {
 122                                        service->_threads--;
 123                       
 124                                        // allocate_and_awaken should never generate an exception.
 125                                        PEGASUS_ASSERT(0);
 126 denise.eckstein 1.116              }
 127                                    // if no more threads available, break from processing loop
 128                                    if (rtn != PEGASUS_THREAD_OK )
 129                                    {
 130                                        service->_threads--;
 131                                        Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 132                                           "Not enough threads to process this request. Skipping.");
 133                       
 134                                        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 135                                           "Could not allocate thread for %s. " \
 136                                           "Queue has %d messages waiting and %d threads servicing." \
 137                                           "Skipping the service for right now. ",
 138                                           service->getQueueName(),
 139                                           service->_incoming.count(),
 140 mike            1.118                     service->_threads.get());
 141 denise.eckstein 1.116 
 142 mike            1.124                  Threads::yield();
 143 denise.eckstein 1.116                  service = NULL;
 144                                     } 
 145                                 }
 146                                 if (service != NULL) 
 147                                 {
 148 mike            1.120              service = list->next_of(service);
 149 denise.eckstein 1.116           }
 150                             }
 151 mday            1.53        list->unlock();
 152                          }
 153 mike            1.124    myself->exit_self( (ThreadReturnType) 1 );
 154 mday            1.53     return(0);
 155                       }
 156                       
 157                       
 158                       Semaphore MessageQueueService::_polling_sem(0);
 159                       AtomicInt MessageQueueService::_stop_polling(0);
 160                       
 161 mday            1.15  
 162 kumpf           1.104 MessageQueueService::MessageQueueService(
 163                          const char *name,
 164                          Uint32 queueID,
 165                          Uint32 capabilities,
 166                          Uint32 mask)
 167 mday            1.15     : Base(name, true,  queueID),
 168 mday            1.1        _mask(mask),
 169 mday            1.4        _die(0),
 170 denise.eckstein 1.116      _threads(0),
 171 mike            1.124      _incoming(),
 172 konrad.r        1.106      _incoming_queue_shutdown(0)
 173 kumpf           1.104 {
 174 jim.wunderlich  1.110 
 175 kumpf           1.104    _capabilities = (capabilities | module_capabilities::async);
 176 mday            1.53  
 177 mday            1.1      _default_op_timeout.tv_sec = 30;
 178                          _default_op_timeout.tv_usec = 100;
 179 mday            1.15  
 180 jim.wunderlich  1.110    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 181                       
 182 kumpf           1.117    // if requested thread max is out of range, then set to
 183                          // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 184 jim.wunderlich  1.110 
 185 kumpf           1.117    if ((max_threads_per_svc_queue < 1) ||
 186                              (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 187                          {
 188 jim.wunderlich  1.110        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 189 kumpf           1.117    }
 190 jim.wunderlich  1.110 
 191 kumpf           1.117    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 192                             "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);
 193 jim.wunderlich  1.110 
 194 a.arora         1.87     AutoMutex autoMut(_meta_dispatcher_mutex);
 195 kumpf           1.104 
 196                          if (_meta_dispatcher == 0)
 197 mday            1.15     {
 198 joyce.j         1.101       _stop_polling = 0;
 199 mike            1.118       PEGASUS_ASSERT(_service_count.get() == 0);
 200 mday            1.15        _meta_dispatcher = new cimom();
 201 kumpf           1.130 
 202 jim.wunderlich  1.110       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 203                             //   minimum_cnt, maximum_cnt, deallocateWait); 
 204                             //
 205 kumpf           1.99        _thread_pool =
 206 kumpf           1.104           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 207 mday            1.15     }
 208                          _service_count++;
 209                       
 210 kumpf           1.104    if (false == register_service(name, _capabilities, _mask))
 211 mday            1.15     {
 212 humberto        1.71        MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 213 kumpf           1.130          "CIM base message queue service is unable to register with the CIMOM "
 214                                    "dispatcher.");
 215 humberto        1.71        throw BindFailedException(parms);
 216 mday            1.15     }
 217 kumpf           1.104 
 218 mike            1.120    _get_polling_list()->insert_back(this);
 219 mday            1.1   }
 220                       
 221 mday            1.4   
 222 kumpf           1.104 MessageQueueService::~MessageQueueService()
 223 mday            1.1   {
 224                          _die = 1;
 225 mday            1.76  
 226 denise.eckstein 1.116    // The polling_routine locks the _polling_list while
 227                          // processing the incoming messages for services on the 
 228                          // list.  Deleting the service from the _polling_list
 229                          // prior to processing, avoids synchronization issues
 230                          // with the _polling_routine.
 231                       
 232 mike            1.120    // ATTN: added to prevent assertion in List in which the list does not
 233                          // contain this element.
 234                       
 235                          if (_get_polling_list()->contains(this))
 236                              _get_polling_list()->remove(this);
 237 denise.eckstein 1.116 
 238                          // ATTN: The code for closing the _incoming queue
 239                          // is not working correctly. In OpenPegasus 2.5,
 240                          // execution of the following code is very timing
 241                          // dependent. This needs to be fix.
 242                          // See Bug 4079 for details. 
 243 mike            1.118    if (_incoming_queue_shutdown.get() == 0)
 244 mday            1.76     {
 245 denise.eckstein 1.116        _shutdown_incoming_queue();
 246                          }
 247 konrad.r        1.107 
 248 denise.eckstein 1.116    // Wait until all threads processing the messages
 249                          // for this service have completed.
 250                       
 251 mike            1.118    while (_threads.get() > 0)
 252 denise.eckstein 1.116    {
 253 mike            1.124       Threads::yield();
 254 mday            1.76     }
 255 kumpf           1.104 
 256 mday            1.15     {
 257 a.arora         1.87       AutoMutex autoMut(_meta_dispatcher_mutex);
 258                            _service_count--;
 259 mike            1.118      if (_service_count.get() == 0)
 260 a.arora         1.87       {
 261 mday            1.76  
 262 mday            1.53        _stop_polling++;
 263                             _polling_sem.signal();
 264 kumpf           1.104       if (_polling_thread) {
 265                                 _polling_thread->join();
 266                                 delete _polling_thread;
 267                                 _polling_thread = 0;
 268                             }
 269 mday            1.15        _meta_dispatcher->_shutdown_routed_queue();
 270                             delete _meta_dispatcher;
 271 kumpf           1.45        _meta_dispatcher = 0;
 272 mday            1.76  
 273 kumpf           1.65        delete _thread_pool;
 274                             _thread_pool = 0;
 275 a.arora         1.87       }
 276                          } // mutex unlocks here
 277 kumpf           1.104    // Clean up in case there are extra stuff on the queue.
 278 konrad.r        1.72    while (_incoming.count())
 279                         {
 280 konrad.r        1.109     try { 
 281 mike            1.120       delete _incoming.dequeue();
 282 kumpf           1.125     } catch (const ListClosed&)
 283 konrad.r        1.109     {
 284                             // If the list is closed, there is nothing we can do. 
 285                             break;
 286                           }
 287 konrad.r        1.72    }
 288 kumpf           1.104 }
 289 mday            1.1   
 290 kumpf           1.104 void MessageQueueService::_shutdown_incoming_queue()
 291 mday            1.7   {
 292 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 293 kumpf           1.104       return;
 294                       
 295                          AsyncIoctl *msg = new AsyncIoctl(
 296                             0,
 297                             _queueId,
 298                             _queueId,
 299                             true,
 300                             AsyncIoctl::IO_CLOSE,
 301                             0,
 302                             0);
 303 mday            1.9   
 304 mday            1.8      msg->op = get_op();
 305 mday            1.41     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 306 kumpf           1.104    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 307                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 308 mday            1.32     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 309 mday            1.41  
 310 mday            1.32     msg->op->_op_dest = this;
 311 kumpf           1.123    msg->op->_request.reset(msg);
 312 konrad.r        1.108    try {
 313 mike            1.120      _incoming.enqueue_wait(msg->op);
 314 konrad.r        1.108      _polling_sem.signal();
 315                          } catch (const ListClosed &) 
 316                          { 
 317 denise.eckstein 1.116         // This means the queue has already been shut-down (happens  when there
 318 konrad.r        1.108     // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 319                           // processed.
 320                            delete msg; 
 321                          }
 322 konrad.r        1.112    catch (const Permission &)
 323                          {
 324                            delete msg;
 325                          }
 326 mday            1.7   }
 327                       
 328 mday            1.22  
 329                       
 330 kumpf           1.85  void MessageQueueService::enqueue(Message *msg)
 331 mday            1.22  {
 332 kumpf           1.28     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 333                       
 334 mday            1.22     Base::enqueue(msg);
 335 kumpf           1.28  
 336                          PEG_METHOD_EXIT();
 337 mday            1.22  }
 338                       
 339                       
 340 mike            1.124 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 341 kumpf           1.103     void * parm)
 342                       {
 343 konrad.r        1.107     MessageQueueService* service =
 344                                   reinterpret_cast<MessageQueueService*>(parm);
 345                           PEGASUS_ASSERT(service != 0);
 346 kumpf           1.103     try
 347                           {
 348                       
 349 mike            1.118         if (service->_die.get() != 0)
 350 kumpf           1.103         {
 351 denise.eckstein 1.116             service->_threads--;
 352 kumpf           1.103             return (0);
 353                               }
 354                               // pull messages off the incoming queue and dispatch them. then
 355                               // check pending messages that are non-blocking
 356                               AsyncOpNode *operation = 0;
 357                       
 358                               // many operations may have been queued.
 359                               do
 360                               {
 361                                   try
 362                                   {
 363 mike            1.120                 operation = service->_incoming.dequeue();
 364 kumpf           1.103             }
 365                                   catch (ListClosed &)
 366                                   {
 367 kumpf           1.104                 // ATTN: This appears to be a common loop exit path.
 368                                       //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 369                                       //    "Caught ListClosed exception.  Exiting _req_proc.");
 370 kumpf           1.103                 break;
 371                                   }
 372                       
 373                                   if (operation)
 374                                   {
 375                                      operation->_service_ptr = service;
 376                                      service->_handle_incoming_operation(operation);
 377                                   }
 378                               } while (operation);
 379                           }
 380                           catch (const Exception& e)
 381                           {
 382                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 383                                   String("Caught exception: \"") + e.getMessage() +
 384                                       "\".  Exiting _req_proc.");
 385                           }
 386                           catch (...)
 387                           {
 388                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 389                                   "Caught unrecognized exception.  Exiting _req_proc.");
 390                           }
 391 konrad.r        1.107     service->_threads--;
 392 kumpf           1.103     return(0);
 393 mday            1.1   }
 394                       
 395 mday            1.43  
 396 kumpf           1.104 void MessageQueueService::_sendwait_callback(
 397                           AsyncOpNode *op,
 398                           MessageQueue *q,
 399                           void *parm)
 400 mday            1.33  {
 401                          op->_client_sem.signal();
 402                       }
 403                       
 404 mday            1.30  
 405                       // callback function is responsible for cleaning up all resources
 406                       // including op, op->_callback_node, and op->_callback_ptr
 407 mday            1.22  void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 408                       {
 409 kumpf           1.104    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 410 mday            1.39     {
 411                       
 412 kumpf           1.123       Message *msg = op->removeRequest();
 413 kumpf           1.128       if (msg && (msg->getMask() & MessageMask::ha_async))
 414 mday            1.39        {
 415 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
 416                                {
 417                                   AsyncLegacyOperationStart *wrapper =
 418                                      static_cast<AsyncLegacyOperationStart *>(msg);
 419                                   msg = wrapper->get_action();
 420                                   delete wrapper;
 421                                }
 422                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 423                                {
 424                                   AsyncModuleOperationStart *wrapper =
 425                                      static_cast<AsyncModuleOperationStart *>(msg);
 426                                   msg = wrapper->get_action();
 427                                   delete wrapper;
 428                                }
 429                                else if (msg->getType() == async_messages::ASYNC_OP_START)
 430                                {
 431                                   AsyncOperationStart *wrapper =
 432                                      static_cast<AsyncOperationStart *>(msg);
 433                                   msg = wrapper->get_action();
 434                                   delete wrapper;
 435                                }
 436 kumpf           1.104          delete msg;
 437 mday            1.39        }
 438                       
 439 kumpf           1.123       msg = op->removeResponse();
 440 kumpf           1.128       if (msg && (msg->getMask() & MessageMask::ha_async))
 441 mday            1.39        {
 442 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
 443                                {
 444                                   AsyncLegacyOperationResult *wrapper =
 445                                      static_cast<AsyncLegacyOperationResult *>(msg);
 446                                   msg = wrapper->get_result();
 447                                   delete wrapper;
 448                                }
 449                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 450                                {
 451                                   AsyncModuleOperationResult *wrapper =
 452                                      static_cast<AsyncModuleOperationResult *>(msg);
 453                                   msg = wrapper->get_result();
 454                                   delete wrapper;
 455                                }
 456 mday            1.39        }
 457                             void (*callback)(Message *, void *, void *) = op->__async_callback;
 458                             void *handle = op->_callback_handle;
 459                             void *parm = op->_callback_parameter;
 460                             op->release();
 461                             return_op(op);
 462                             callback(msg, handle, parm);
 463                          }
 464 kumpf           1.104    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 465 mday            1.39     {
 466 kumpf           1.104       // note that _callback_node may be different from op
 467                             // op->_callback_response_q is a "this" pointer we can use for
 468 mday            1.39        // static callback methods
 469                             op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 470                          }
 471 mday            1.22  }
 472                       
 473 mday            1.1   
 474 kumpf           1.104 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 475 mday            1.1   {
 476 kumpf           1.104    if (operation != 0)
 477 mday            1.5      {
 478 kumpf           1.104 
 479                       // ATTN: optimization
 480 mday            1.22  // << Tue Feb 19 14:10:38 2002 mdd >>
 481 mday            1.6         operation->lock();
 482 kumpf           1.104 
 483 kumpf           1.123       Message *rq = operation->_request.get();
 484 kumpf           1.104 
 485 mday            1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 486 kumpf           1.104 // move this to the bottom of the loop when the majority of
 487                       // messages become async messages.
 488 mday            1.31  
 489 mday            1.18        // divert legacy messages to handleEnqueue
 490 kumpf           1.128       if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
 491 mday            1.18        {
 492 kumpf           1.123          operation->_request.release();
 493 kumpf           1.104          operation->unlock();
 494                                // delete the op node
 495                                operation->release();
 496                                return_op(operation);
 497 mday            1.24  
 498 kumpf           1.104          handleEnqueue(rq);
 499                                return;
 500 mday            1.18        }
 501                       
 502 kumpf           1.104       if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 503                                  operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 504                                 (operation->_state & ASYNC_OPSTATE_COMPLETE))
 505 mday            1.29        {
 506 kumpf           1.104          operation->unlock();
 507                                _handle_async_callback(operation);
 508 mday            1.29        }
 509 kumpf           1.104       else
 510 mday            1.29        {
 511 kumpf           1.104          PEGASUS_ASSERT(rq != 0);
 512                                operation->unlock();
 513                                _handle_async_request(static_cast<AsyncRequest *>(rq));
 514 mday            1.29        }
 515 mday            1.5      }
 516                          return;
 517 mday            1.1   }
 518                       
 519                       void MessageQueueService::_handle_async_request(AsyncRequest *req)
 520                       {
 521 kumpf           1.104    if (req != 0)
 522 mday            1.4      {
 523                             req->op->processing();
 524 kumpf           1.104 
 525 mday            1.4         Uint32 type = req->getType();
 526 kumpf           1.104       if (type == async_messages::HEARTBEAT)
 527                                handle_heartbeat_request(req);
 528 mday            1.4         else if (type == async_messages::IOCTL)
 529 kumpf           1.104          handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 530 mday            1.4         else if (type == async_messages::CIMSERVICE_START)
 531 kumpf           1.104          handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 532 mday            1.4         else if (type == async_messages::CIMSERVICE_STOP)
 533 kumpf           1.104          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 534 mday            1.4         else if (type == async_messages::CIMSERVICE_PAUSE)
 535 kumpf           1.104          handle_CimServicePause(static_cast<CimServicePause *>(req));
 536 mday            1.4         else if (type == async_messages::CIMSERVICE_RESUME)
 537 kumpf           1.104          handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 538                             else if (type == async_messages::ASYNC_OP_START)
 539                                handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 540                             else
 541 mday            1.4         {
 542 kumpf           1.104          // we don't handle this request message
 543                                _make_response(req, async_results::CIM_NAK);
 544 mday            1.4         }
 545 mday            1.1      }
 546                       }
 547                       
 548 mday            1.17  
 549                       Boolean MessageQueueService::_enqueueResponse(
 550 kumpf           1.104    Message* request,
 551 mday            1.17     Message* response)
 552                       {
 553 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 554                                           "MessageQueueService::_enqueueResponse");
 555 mday            1.25  
 556 kumpf           1.128    if (request->getMask() & MessageMask::ha_async)
 557 mday            1.25     {
 558 kumpf           1.128       if (response->getMask() & MessageMask::ha_async)
 559 mday            1.25        {
 560 kumpf           1.104          _completeAsyncResponse(static_cast<AsyncRequest *>(request),
 561                                                       static_cast<AsyncReply *>(response),
 562                                                       ASYNC_OPSTATE_COMPLETE, 0);
 563 kumpf           1.37           PEG_METHOD_EXIT();
 564 kumpf           1.104          return true;
 565 mday            1.25        }
 566                          }
 567 kumpf           1.104 
 568                          if (request->_async != 0)
 569 mday            1.17     {
 570                             Uint32 mask = request->_async->getMask();
 571 kumpf           1.128       PEGASUS_ASSERT(mask & (MessageMask::ha_async | MessageMask::ha_request));
 572 kumpf           1.104 
 573 mday            1.18        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 574                             AsyncOpNode *op = async->op;
 575                             request->_async = 0;
 576 mday            1.63        // the legacy request is going to be deleted by its handler
 577 kumpf           1.104       // remove it from the op node
 578 mday            1.63  
 579                             static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 580 kumpf           1.104 
 581                             AsyncLegacyOperationResult *async_result =
 582                                new AsyncLegacyOperationResult(
 583                                   op,
 584                                   response);
 585                             _completeAsyncResponse(
 586                                async,
 587                                async_result,
 588                                ASYNC_OPSTATE_COMPLETE,
 589                                0);
 590 kumpf           1.37        PEG_METHOD_EXIT();
 591 mday            1.18        return true;
 592 mday            1.17     }
 593 kumpf           1.104 
 594 mday            1.18     // ensure that the destination queue is in response->dest
 595 kumpf           1.37     PEG_METHOD_EXIT();
 596 mday            1.24     return SendForget(response);
 597 kumpf           1.104 
 598 mday            1.17  }
 599                       
 600 mday            1.18  void MessageQueueService::_make_response(Message *req, Uint32 code)
 601 mday            1.1   {
 602 mday            1.19     cimom::_make_response(req, code);
 603 mday            1.1   }
 604                       
 605                       
 606 kumpf           1.104 void MessageQueueService::_completeAsyncResponse(
 607                           AsyncRequest *request,
 608                           AsyncReply *reply,
 609                           Uint32 state,
 610                           Uint32 flag)
 611 mday            1.5   {
 612 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 613                                           "MessageQueueService::_completeAsyncResponse");
 614                       
 615 mday            1.19     cimom::_completeAsyncResponse(request, reply, state, flag);
 616 kumpf           1.37  
 617                          PEG_METHOD_EXIT();
 618 mday            1.5   }
 619                       
 620                       
 621 kumpf           1.104 void MessageQueueService::_complete_op_node(
 622                           AsyncOpNode *op,
 623                           Uint32 state,
 624                           Uint32 flag,
 625                           Uint32 code)
 626 mday            1.32  {
 627                          cimom::_complete_op_node(op, state, flag, code);
 628                       }
 629                       
 630 mday            1.5   
 631                       Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 632                       {
 633 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 634 mday            1.8         return false;
 635 kumpf           1.104    if (_polling_thread == NULL)
 636                          {
 637                             _polling_thread = new Thread(
 638                                 polling_routine,
 639 mike            1.120           reinterpret_cast<void *>(_get_polling_list()),
 640 kumpf           1.104           false);
 641 konrad.r        1.113       ThreadStatus tr = PEGASUS_THREAD_OK;
 642                             while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 643 a.arora         1.93        {
 644 denise.eckstein 1.116         if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 645 mike            1.124            Threads::yield();
 646 denise.eckstein 1.116         else
 647                                  throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
 648                                               "Could not allocate thread for the polling thread."));
 649 a.arora         1.93        }
 650 kumpf           1.104    }
 651                       // ATTN optimization remove the message checking altogether in the base
 652 mday            1.20  // << Mon Feb 18 14:02:20 2002 mdd >>
 653 mday            1.6      op->lock();
 654 kumpf           1.123    Message *rq = op->_request.get();
 655                          Message *rp = op->_response.get();
 656 mday            1.6      op->unlock();
 657 kumpf           1.104 
 658                          if ((rq != 0 && (true == messageOK(rq))) ||
 659 mike            1.118        (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 660 mday            1.5      {
 661 mike            1.120       _incoming.enqueue_wait(op);
 662 mday            1.53        _polling_sem.signal();
 663 mday            1.5         return true;
 664                          }
 665                          return false;
 666                       }
 667                       
 668                       Boolean MessageQueueService::messageOK(const Message *msg)
 669                       {
 670 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 671 mday            1.8         return false;
 672 mday            1.18     return true;
 673                       }
 674                       
 675 mday            1.1   void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 676                       {
 677 kumpf           1.104    // default action is to echo a heartbeat response
 678                       
 679                          AsyncReply *reply = new AsyncReply(
 680                             async_messages::HEARTBEAT,
 681                             0,
 682                             req->op,
 683                             async_results::OK,
 684                             req->resp,
 685                             false);
 686                          _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 687 mday            1.1   }
 688                       
 689                       
 690                       void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 691 kumpf           1.104 {
 692 mday            1.1   }
 693 kumpf           1.104 
 694 mday            1.1   void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 695                       {
 696 kumpf           1.104    switch (req->ctl)
 697 mday            1.8      {
 698                             case AsyncIoctl::IO_CLOSE:
 699                             {
 700 kumpf           1.104          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 701 kumpf           1.81  
 702                       #ifdef MESSAGEQUEUESERVICE_DEBUG
 703 kumpf           1.104          PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 704 kumpf           1.81  #endif
 705 kumpf           1.104 
 706                                // respond to this message. this is fire and forget, so we don't need to delete anything.
 707                                // this takes care of two problems that were being found
 708                                // << Thu Oct  9 10:52:48 2003 mdd >>
 709                                 _make_response(req, async_results::OK);
 710                                // ensure we do not accept any further messages
 711                       
 712                                // ensure we don't recurse on IO_CLOSE
 713 mike            1.118          if (_incoming_queue_shutdown.get() > 0)
 714 kumpf           1.104             break;
 715                       
 716                                // set the closing flag
 717                                service->_incoming_queue_shutdown = 1;
 718                                // empty out the queue
 719                                while (1)
 720                                {
 721                                   AsyncOpNode *operation;
 722                                   try
 723                                   {
 724 mike            1.120                operation = service->_incoming.dequeue();
 725 kumpf           1.104             }
 726                                   catch(IPCException &)
 727                                   {
 728                                      break;
 729                                   }
 730                                   if (operation)
 731                                   {
 732                                      operation->_service_ptr = service;
 733                                      service->_handle_incoming_operation(operation);
 734                                   }
 735                                   else
 736                                      break;
 737                                } // message processing loop
 738                       
 739 mike            1.120          // shutdown the AsyncQueue
 740 mike            1.124          service->_incoming.close();
 741 kumpf           1.104          return;
 742 mday            1.8         }
 743                       
 744                             default:
 745 kumpf           1.104          _make_response(req, async_results::CIM_NAK);
 746 mday            1.8      }
 747 mday            1.1   }
 748 mday            1.8   
 749 mday            1.1   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 750                       {
 751 mday            1.79  
 752 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 753 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 754 kumpf           1.81  #endif
 755 kumpf           1.104 
 756 mday            1.10     // clear the stoped bit and update
 757 mday            1.13     _capabilities &= (~(module_capabilities::stopped));
 758 mday            1.10     _make_response(req, async_results::OK);
 759 kumpf           1.104    // now tell the meta dispatcher we are stopped
 760 mday            1.10     update_service(_capabilities, _mask);
 761                       
 762 mday            1.1   }
 763                       void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 764                       {
 765 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 766 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 767 kumpf           1.81  #endif
 768 mday            1.10     // set the stopeed bit and update
 769                          _capabilities |= module_capabilities::stopped;
 770                          _make_response(req, async_results::CIM_STOPPED);
 771 kumpf           1.104    // now tell the meta dispatcher we are stopped
 772 mday            1.10     update_service(_capabilities, _mask);
 773 mday            1.1   }
 774 kumpf           1.104 
 775 mday            1.1   void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 776                       {
 777 mday            1.10     // set the paused bit and update
 778 mday            1.13     _capabilities |= module_capabilities::paused;
 779 mday            1.11     update_service(_capabilities, _mask);
 780 mday            1.10     _make_response(req, async_results::CIM_PAUSED);
 781 kumpf           1.104    // now tell the meta dispatcher we are stopped
 782 mday            1.1   }
 783 kumpf           1.104 
 784 mday            1.1   void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 785                       {
 786 mday            1.10     // clear the paused  bit and update
 787 mday            1.13     _capabilities &= (~(module_capabilities::paused));
 788 mday            1.11     update_service(_capabilities, _mask);
 789 mday            1.10     _make_response(req, async_results::OK);
 790 kumpf           1.104    // now tell the meta dispatcher we are stopped
 791 mday            1.1   }
 792 kumpf           1.104 
 793 mday            1.1   void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 794                       {
 795                          _make_response(req, async_results::CIM_NAK);
 796                       }
 797                       
 798                       void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 799                       {
 800 mday            1.14     ;
 801                       }
 802                       
 803 mday            1.10  
 804 mday            1.14  void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 805                       {
 806                          // remove the legacy message from the request and enqueue it to its destination
 807                          Uint32 result = async_results::CIM_NAK;
 808 kumpf           1.104 
 809 mday            1.25     Message *legacy = req->_act;
 810 kumpf           1.104    if (legacy != 0)
 811 mday            1.14     {
 812 mday            1.25        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 813 kumpf           1.104       if (queue != 0)
 814 mday            1.14        {
 815 kumpf           1.104          if (queue->isAsync() == true)
 816                                {
 817                                   (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 818                                }
 819                                else
 820                                {
 821                                   // Enqueue the response:
 822                                   queue->enqueue(req->get_action());
 823                                }
 824                       
 825                                result = async_results::OK;
 826 mday            1.14        }
 827                          }
 828                          _make_response(req, result);
 829                       }
 830                       
 831                       void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 832                       {
 833                          ;
 834 mday            1.1   }
 835                       
 836 kumpf           1.104 AsyncOpNode *MessageQueueService::get_op()
 837 mday            1.1   {
 838 mday            1.4      AsyncOpNode *op = new AsyncOpNode();
 839 kumpf           1.104 
 840 mday            1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 841                          op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 842 kumpf           1.104 
 843 mday            1.1      return op;
 844                       }
 845                       
 846                       void MessageQueueService::return_op(AsyncOpNode *op)
 847                       {
 848 kumpf           1.123    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
 849 mday            1.4      delete op;
 850 mday            1.1   }
 851                       
 852 mday            1.18  
 853 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 854                           AsyncOpNode *op,
 855                           Uint32 destination,
 856                           void (*callback)(AsyncOpNode *, MessageQueue *, void *),
 857                           MessageQueue *callback_response_q,
 858                           void *callback_ptr)
 859                       {
 860                          PEGASUS_ASSERT(op != 0 && callback != 0);
 861                       
 862 mday            1.21     // get the queue handle for the destination
 863                       
 864 mday            1.30     op->lock();
 865 mday            1.32     op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 866 mday            1.22     op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 867                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 868 mday            1.30     // initialize the callback data
 869 mday            1.32     op->_async_callback = callback;   // callback function to be executed by recpt. of response
 870                          op->_callback_node = op;          // the op node
 871                          op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 872                          op->_callback_ptr = callback_ptr;   // user data for callback
 873                          op->_callback_request_q = this;     // I am the originator of this request
 874 kumpf           1.104 
 875 mday            1.30     op->unlock();
 876 kumpf           1.104    if (op->_op_dest == 0)
 877 mday            1.30        return false;
 878 kumpf           1.104 
 879 mday            1.21     return  _meta_dispatcher->route_async(op);
 880 mday            1.18  }
 881                       
 882                       
 883 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 884                           Message *msg,
 885                           Uint32 destination,
 886                           void (*callback)(Message *response, void *handle, void *parameter),
 887                           void *handle,
 888                           void *parameter)
 889 mday            1.39  {
 890 kumpf           1.104    if (msg == NULL)
 891 mday            1.39        return false;
 892 kumpf           1.104    if (callback == NULL)
 893 mday            1.39        return SendForget(msg);
 894                          AsyncOpNode *op = get_op();
 895 mday            1.43     msg->dest = destination;
 896 kumpf           1.104    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 897 mday            1.39     {
 898                             op->release();
 899                             return_op(op);
 900                             return false;
 901                          }
 902                          op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 903                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 904                          op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 905                          op->__async_callback = callback;
 906                          op->_callback_node = op;
 907                          op->_callback_handle = handle;
 908                          op->_callback_parameter = parameter;
 909                          op->_callback_response_q = this;
 910                       
 911 kumpf           1.128    if (!(msg->getMask() & MessageMask::ha_async))
 912 mday            1.39     {
 913 kumpf           1.104       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 914                                op,
 915                                destination,
 916                                msg,
 917                                destination);
 918 mday            1.39     }
 919 kumpf           1.104    else
 920 mday            1.39     {
 921 kumpf           1.123       op->_request.reset(msg);
 922 mday            1.39        (static_cast<AsyncMessage *>(msg))->op = op;
 923                          }
 924                          return _meta_dispatcher->route_async(op);
 925                       }
 926                       
 927                       
 928 mday            1.18  Boolean MessageQueueService::SendForget(Message *msg)
 929                       {
 930                          AsyncOpNode *op = 0;
 931 mday            1.22     Uint32 mask = msg->getMask();
 932 kumpf           1.104 
 933 kumpf           1.128    if (mask & MessageMask::ha_async)
 934 mday            1.18     {
 935                             op = (static_cast<AsyncMessage *>(msg))->op ;
 936                          }
 937 mday            1.22  
 938 kumpf           1.104    if (op == 0)
 939 mday            1.20     {
 940 mday            1.18        op = get_op();
 941 kumpf           1.123       op->_request.reset(msg);
 942 kumpf           1.128       if (mask & MessageMask::ha_async)
 943 kumpf           1.104       {
 944                                (static_cast<AsyncMessage *>(msg))->op = op;
 945                             }
 946 mday            1.20     }
 947 mday            1.30     op->_op_dest = MessageQueue::lookup(msg->dest);
 948 mday            1.22     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 949 kumpf           1.104    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 950                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 951 mday            1.22     op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 952 kumpf           1.104    if (op->_op_dest == 0)
 953 mday            1.39     {
 954                             op->release();
 955                             return_op(op);
 956 mday            1.21        return false;
 957 mday            1.39     }
 958 kumpf           1.104 
 959 mday            1.18     // now see if the meta dispatcher will take it
 960 mday            1.30     return  _meta_dispatcher->route_async(op);
 961 mday            1.18  }
 962 mday            1.2   
 963 mday            1.1   
 964 mday            1.4   AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
 965 mday            1.1   {
 966 kumpf           1.104    if (request == 0)
 967 mday            1.4         return 0 ;
 968 mday            1.5   
 969                          Boolean destroy_op = false;
 970 kumpf           1.104 
 971 s.hills         1.80     if (request->op == 0)
 972 mday            1.5      {
 973                             request->op = get_op();
 974 kumpf           1.123       request->op->_request.reset(request);
 975 mday            1.5         destroy_op = true;
 976                          }
 977 kumpf           1.104 
 978 mday            1.33     request->block = false;
 979 mday            1.35     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 980 kumpf           1.104    SendAsync(
 981                             request->op,
 982                             request->dest,
 983                             _sendwait_callback,
 984                             this,
 985                             (void *)0);
 986                       
 987 kumpf           1.130    request->op->_client_sem.wait();
 988 baggio.br       1.100 
 989 kumpf           1.123    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
 990 mday            1.6      rpl->op = 0;
 991 kumpf           1.104 
 992                          if (destroy_op == true)
 993 mday            1.5      {
 994 mday            1.6         request->op->lock();
 995 kumpf           1.123       request->op->_request.release();
 996 mday            1.6         request->op->_state |= ASYNC_OPSTATE_RELEASED;
 997                             request->op->unlock();
 998                             return_op(request->op);
 999 mday            1.7         request->op = 0;
1000 mday            1.5      }
1001                          return rpl;
1002 mday            1.1   }
1003                       
1004                       
1005 kumpf           1.104 Boolean MessageQueueService::register_service(
1006                           String name,
1007                           Uint32 capabilities,
1008                           Uint32 mask)
1009                       {
1010                          RegisterCimService *msg = new RegisterCimService(
1011                             0,
1012                             true,
1013                             name,
1014                             capabilities,
1015                             mask,
1016                             _queueId);
1017                          msg->dest = CIMOM_Q_ID;
1018 mday            1.1   
1019                          Boolean registered = false;
1020 kumpf           1.104    AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1021                       
1022                          if (reply != 0)
1023                          {
1024 kumpf           1.128       if (reply->getMask() & MessageMask::ha_async)
1025 kumpf           1.104       {
1026 kumpf           1.128          if (reply->getMask() & MessageMask::ha_reply)
1027 kumpf           1.104          {
1028                                   if (reply->result == async_results::OK ||
1029                                       reply->result == async_results::MODULE_ALREADY_REGISTERED)
1030                                   {
1031                                       registered = true;
1032                                   }
1033                                }
1034 mday            1.1         }
1035 kumpf           1.104 
1036                             delete reply;
1037 mday            1.1      }
1038 mday            1.5      delete msg;
1039 mday            1.1      return registered;
1040                       }
1041                       
1042                       Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1043                       {
1044 kumpf           1.104    UpdateCimService *msg = new UpdateCimService(
1045                             0,
1046                             true,
1047                             _queueId,
1048                             _capabilities,
1049                             _mask);
1050 mday            1.1      Boolean registered = false;
1051 mday            1.2   
1052                          AsyncMessage *reply = SendWait(msg);
1053                          if (reply)
1054 mday            1.1      {
1055 kumpf           1.128       if (reply->getMask() & MessageMask::ha_async)
1056 mday            1.1         {
1057 kumpf           1.128          if (reply->getMask() & MessageMask::ha_reply)
1058 kumpf           1.104          {
1059                                   if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1060                                   {
1061                                      registered = true;
1062                                   }
1063                                }
1064 mday            1.1         }
1065                             delete reply;
1066                          }
1067 mday            1.5      delete msg;
1068 mday            1.1      return registered;
1069                       }
1070                       
1071                       
1072 kumpf           1.104 Boolean MessageQueueService::deregister_service()
1073 mday            1.1   {
1074 mday            1.3   
1075 mday            1.5      _meta_dispatcher->deregister_module(_queueId);
1076                          return true;
1077 mday            1.1   }
1078                       
1079                       
1080 kumpf           1.104 void MessageQueueService::find_services(
1081                           String name,
1082                           Uint32 capabilities,
1083                           Uint32 mask,
1084                           Array<Uint32> *results)
1085 mday            1.1   {
1086 kumpf           1.104    if (results == 0)
1087                          {
1088 mday            1.1         throw NullPointer();
1089 kumpf           1.104    }
1090                       
1091 mday            1.1      results->clear();
1092 kumpf           1.104 
1093                          FindServiceQueue *req = new FindServiceQueue(
1094                             0,
1095                             _queueId,
1096                             true,
1097                             name,
1098                             capabilities,
1099                             mask);
1100                       
1101 mday            1.44     req->dest = CIMOM_Q_ID;
1102 kumpf           1.104 
1103                          AsyncMessage *reply = SendWait(req);
1104                          if (reply)
1105                          {
1106 kumpf           1.128       if (reply->getMask() & MessageMask::ha_async)
1107 kumpf           1.104       {
1108 kumpf           1.128          if (reply->getMask() & MessageMask::ha_reply)
1109 kumpf           1.104          {
1110                                   if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1111                                   {
1112                                      if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1113                                         *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1114                                   }
1115                                }
1116 mday            1.1         }
1117                             delete reply;
1118                          }
1119 mday            1.5      delete req;
1120 mday            1.1      return ;
1121                       }
1122                       
1123                       void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1124                       {
1125 kumpf           1.104    if (result == 0)
1126                          {
1127 mday            1.1         throw NullPointer();
1128 kumpf           1.104    }
1129                       
1130                          EnumerateService *req = new EnumerateService(
1131                             0,
1132                             _queueId,
1133                             true,
1134                             queue);
1135                       
1136 mday            1.2      AsyncMessage *reply = SendWait(req);
1137 kumpf           1.104 
1138 mday            1.2      if (reply)
1139 mday            1.1      {
1140                             Boolean found = false;
1141 kumpf           1.104 
1142 kumpf           1.128       if (reply->getMask() & MessageMask::ha_async)
1143 mday            1.1         {
1144 kumpf           1.128          if (reply->getMask() & MessageMask::ha_reply)
1145 kumpf           1.104          {
1146                                   if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1147                                   {
1148                                      if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1149                                      {
1150                                         if (found == false)
1151                                         {
1152                                            found = true;
1153                       
1154                                            result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1155                                            result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1156                                            result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1157                                            result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1158                                         }
1159                                      }
1160                                   }
1161                                }
1162 mday            1.1         }
1163                             delete reply;
1164                          }
1165 mday            1.5      delete req;
1166 kumpf           1.104 
1167 mday            1.1      return;
1168                       }
1169                       
1170 mike            1.120 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1171                       {
1172                           _polling_list_mutex.lock();
1173                       
1174                           if (!_polling_list)
1175                       	_polling_list = new PollingList;
1176                       
1177                           _polling_list_mutex.unlock();
1178 kumpf           1.104 
1179 mike            1.120     return _polling_list;
1180 mday            1.1   }
1181                       
1182                       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2