(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.119 //%2006////////////////////////////////////////////////////////////////////////
   2 mday  1.1   //
   3 karl  1.88  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mday  1.1   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15             // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18             // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.82  // 
  21 mday  1.1   // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22             // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24             // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27             // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             //%/////////////////////////////////////////////////////////////////////////////
  33             
  34             #include "MessageQueueService.h"
  35 mday  1.22  #include <Pegasus/Common/Tracer.h>
  36 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
  37 mday  1.1   
  38             PEGASUS_NAMESPACE_BEGIN
  39             
  40 mday  1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  41 mike  1.118 AtomicInt MessageQueueService::_service_count(0);
  42 mday  1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  43 mday  1.15  
  44 karl  1.133.2.3 static struct timeval deallocateWait = {30, 0};
  45                 
  46                 // set max threads in a single thread pool. Since there is only
  47                 // a single threadpool, this sets the maximum threads that can
  48                 // be used minus permanent threads and provider added threads.
  49                 const Uint32 maximumThreadsInPool = 20;
  50 mday  1.53      
  51 mday  1.55      ThreadPool *MessageQueueService::_thread_pool = 0;
  52 mday  1.53      
  53 mike  1.120     MessageQueueService::PollingList* MessageQueueService::_polling_list;
  54                 Mutex MessageQueueService::_polling_list_mutex;
  55 mday  1.53      
  56 kumpf 1.62      Thread* MessageQueueService::_polling_thread = 0;
  57 mday  1.61      
  58 kumpf 1.104     ThreadPool *MessageQueueService::get_thread_pool()
  59 mday  1.69      {
  60                    return _thread_pool;
  61                 }
  62 kumpf 1.117     
  63 jim.wunderlich 1.110     //
  64 kumpf          1.117     // MAX_THREADS_PER_SVC_QUEUE
  65 jim.wunderlich 1.110     //
  66                          // JR Wunderlich Jun 6, 2005
  67                          //
  68                          
  69 kumpf          1.131     #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
  70 jim.wunderlich 1.114     #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  71 jim.wunderlich 1.110     
  72 kumpf          1.117     #ifndef MAX_THREADS_PER_SVC_QUEUE
  73                          # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  74                          #endif
  75                          
  76 jim.wunderlich 1.110     Uint32 max_threads_per_svc_queue;
  77 mday           1.69      
  78 kumpf          1.131     ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
  79                              void* parm)
  80 mday           1.53      {
  81 kumpf          1.131         Thread *myself = reinterpret_cast<Thread *>(parm);
  82                              List<MessageQueueService, Mutex> *list =
  83                                  reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());
  84                          
  85                              while (_stop_polling.get()  == 0)
  86                              {
  87                                  _polling_sem.wait();
  88                          
  89                                  if (_stop_polling.get() != 0)
  90                                  {
  91                                      break;
  92                                  }
  93                          
  94                                  // The polling_routine thread must hold the lock on the
  95                                  // _polling_list while processing incoming messages.
  96                                  // This lock is used to give this thread ownership of
  97                                  // services on the _polling_routine list.
  98                          
  99                                  // This is necessary to avoid confict with other threads
 100                                  // processing the _polling_list
 101                                  // (e.g., MessageQueueServer::~MessageQueueService).
 102 kumpf          1.131     
 103                                  list->lock();
 104                                  MessageQueueService *service = list->front();
 105                                  ThreadStatus rtn = PEGASUS_THREAD_OK;
 106                                  while (service != NULL)
 107                                  {
 108                                      if ((service->_incoming.count() > 0) &&
 109                                          (service->_die.get() == 0) &&
 110                                          (service->_threads.get() < max_threads_per_svc_queue))
 111                                      {
 112                                          // The _threads count is used to track the
 113                                          // number of active threads that have been allocated
 114                                          // to process messages for this service.
 115                          
 116                                          // The _threads count MUST be incremented while
 117                                          // the polling_routine owns the _polling_thread
 118                                          // lock and has ownership of the service object.
 119                          
 120                                          service->_threads++;
 121                                          try
 122                                          {
 123 kumpf          1.131                         rtn = _thread_pool->allocate_and_awaken(
 124                                                  service, _req_proc, &_polling_sem);
 125                                          }
 126                                          catch (...)
 127                                          {
 128                                              service->_threads--;
 129                          
 130                                              // allocate_and_awaken should never generate an exception.
 131                                              PEGASUS_ASSERT(0);
 132                                          }
 133                                          // if no more threads available, break from processing loop
 134                                          if (rtn != PEGASUS_THREAD_OK )
 135                                          {
 136                                              service->_threads--;
 137                                              Logger::put(
 138                                                  Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 139                                                  "Not enough threads to process this request. "
 140                                                      "Skipping.");
 141                          
 142 marek          1.132                         PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 143 kumpf          1.131                             "Could not allocate thread for %s.  Queue has %d "
 144                                                      "messages waiting and %d threads servicing."
 145                                                      "Skipping the service for right now. ",
 146                                                  service->getQueueName(),
 147                                                  service->_incoming.count(),
 148 marek          1.132                             service->_threads.get()));
 149 kumpf          1.131     
 150                                              Threads::yield();
 151                                              service = NULL;
 152                                          }
 153                                      }
 154                                      if (service != NULL)
 155                                      {
 156                                          service = list->next_of(service);
 157                                      }
 158                                  }
 159                                  list->unlock();
 160                              }
 161                              myself->exit_self( (ThreadReturnType) 1 );
 162                              return 0;
 163 mday           1.53      }
 164                          
 165                          
 166                          Semaphore MessageQueueService::_polling_sem(0);
 167                          AtomicInt MessageQueueService::_stop_polling(0);
 168                          
 169 mday           1.15      
 170 kumpf          1.104     MessageQueueService::MessageQueueService(
 171 kumpf          1.131         const char* name,
 172                              Uint32 queueID,
 173                              Uint32 capabilities,
 174                              Uint32 mask)
 175                              : Base(name, true,  queueID),
 176                                _mask(mask),
 177                                _die(0),
 178                                _threads(0),
 179                                _incoming(),
 180                                _incoming_queue_shutdown(0)
 181                          {
 182                              _capabilities = (capabilities | module_capabilities::async);
 183                          
 184                              _default_op_timeout.tv_sec = 30;
 185                              _default_op_timeout.tv_usec = 100;
 186                          
 187                              max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 188                          
 189                              // if requested thread max is out of range, then set to
 190                              // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 191                          
 192 kumpf          1.131         if ((max_threads_per_svc_queue < 1) ||
 193                                  (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 194                              {
 195                                  max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 196                              }
 197                          
 198 marek          1.132         PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 199                                 "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
 200 kumpf          1.131     
 201                              AutoMutex autoMut(_meta_dispatcher_mutex);
 202                          
 203                              if (_meta_dispatcher == 0)
 204                              {
 205                                  _stop_polling = 0;
 206                                  PEGASUS_ASSERT(_service_count.get() == 0);
 207                                  _meta_dispatcher = new cimom();
 208                          
 209                                  //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 210                                  //   minimum_cnt, maximum_cnt, deallocateWait);
 211                                  //
 212 karl           1.133.2.3 
 213                                  _thread_pool = new ThreadPool(0, "MessageQueueService", 0,
 214                                                         maximumThreadsInPool, deallocateWait);
 215 kumpf          1.131         }
 216                              _service_count++;
 217                          
 218                              if (false == register_service(name, _capabilities, _mask))
 219                              {
 220                                  MessageLoaderParms parms(
 221                                      "Common.MessageQueueService.UNABLE_TO_REGISTER",
 222                                      "CIM base message queue service is unable to register with the "
 223                                          "CIMOM dispatcher.");
 224                                  throw BindFailedException(parms);
 225                              }
 226 kumpf          1.104     
 227 kumpf          1.131         _get_polling_list()->insert_back(this);
 228 mday           1.1       }
 229                          
 230 mday           1.4       
 231 kumpf          1.104     MessageQueueService::~MessageQueueService()
 232 mday           1.1       {
 233 kumpf          1.131         _die = 1;
 234                          
 235                              // The polling_routine locks the _polling_list while
 236                              // processing the incoming messages for services on the
 237                              // list.  Deleting the service from the _polling_list
 238                              // prior to processing, avoids synchronization issues
 239                              // with the _polling_routine.
 240                          
 241                              // ATTN: added to prevent assertion in List in which the list does not
 242                              // contain this element.
 243                          
 244                              if (_get_polling_list()->contains(this))
 245                                  _get_polling_list()->remove(this);
 246                          
 247                              // ATTN: The code for closing the _incoming queue
 248                              // is not working correctly. In OpenPegasus 2.5,
 249                              // execution of the following code is very timing
 250                              // dependent. This needs to be fix.
 251                              // See Bug 4079 for details.
 252                              if (_incoming_queue_shutdown.get() == 0)
 253                              {
 254 kumpf          1.131             _shutdown_incoming_queue();
 255                              }
 256 mday           1.76      
 257 kumpf          1.131         // Wait until all threads processing the messages
 258                              // for this service have completed.
 259                          
 260                              while (_threads.get() > 0)
 261 konrad.r       1.109         {
 262 kumpf          1.131             Threads::yield();
 263                              }
 264                          
 265                              {
 266                                  AutoMutex autoMut(_meta_dispatcher_mutex);
 267                                  _service_count--;
 268                                  if (_service_count.get() == 0)
 269                                  {
 270                          
 271                                      _stop_polling++;
 272                                      _polling_sem.signal();
 273                                      if (_polling_thread)
 274                                      {
 275                                          _polling_thread->join();
 276                                          delete _polling_thread;
 277                                          _polling_thread = 0;
 278                                      }
 279                                      _meta_dispatcher->_shutdown_routed_queue();
 280                                      delete _meta_dispatcher;
 281                                      _meta_dispatcher = 0;
 282                          
 283 kumpf          1.131                 delete _thread_pool;
 284                                      _thread_pool = 0;
 285                                  }
 286                              } // mutex unlocks here
 287                              // Clean up in case there are extra stuff on the queue.
 288                              while (_incoming.count())
 289                              {
 290                                  try
 291                                  {
 292                                      delete _incoming.dequeue();
 293                                  }
 294                                  catch (const ListClosed&)
 295                                  {
 296                                      // If the list is closed, there is nothing we can do.
 297                                      break;
 298                                  }
 299 konrad.r       1.109         }
 300 kumpf          1.104     }
 301 mday           1.1       
 302 kumpf          1.104     void MessageQueueService::_shutdown_incoming_queue()
 303 mday           1.7       {
 304 kumpf          1.131         if (_incoming_queue_shutdown.get() > 0)
 305                                  return;
 306                          
 307                              AsyncIoctl *msg = new AsyncIoctl(
 308                                  0,
 309                                  _queueId,
 310                                  _queueId,
 311                                  true,
 312                                  AsyncIoctl::IO_CLOSE,
 313                                  0,
 314                                  0);
 315                          
 316                              msg->op = get_op();
 317                              msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 318                              msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 319                                  | ASYNC_OPFLAGS_SIMPLE_STATUS);
 320                              msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 321 kumpf          1.104     
 322 kumpf          1.131         msg->op->_op_dest = this;
 323                              msg->op->_request.reset(msg);
 324                              try
 325                              {
 326                                  _incoming.enqueue_wait(msg->op);
 327                                  _polling_sem.signal();
 328                              }
 329                              catch (const ListClosed&)
 330                              {
 331 denise.eckstein 1.116             // This means the queue has already been shut-down (happens  when there
 332 kumpf           1.131             // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 333                                   // processed.
 334                                   delete msg;
 335                               }
 336                               catch (const Permission&)
 337                               {
 338                                   delete msg;
 339                               }
 340 mday            1.7       }
 341                           
 342 mday            1.22      
 343                           
 344 kumpf           1.131     void MessageQueueService::enqueue(Message* msg)
 345 mday            1.22      {
 346 kumpf           1.131         PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 347 kumpf           1.28      
 348 kumpf           1.131         Base::enqueue(msg);
 349 kumpf           1.28      
 350 kumpf           1.131         PEG_METHOD_EXIT();
 351 mday            1.22      }
 352                           
 353                           
 354 mike            1.124     ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 355 kumpf           1.131         void* parm)
 356 kumpf           1.103     {
 357 konrad.r        1.107         MessageQueueService* service =
 358 kumpf           1.131             reinterpret_cast<MessageQueueService*>(parm);
 359 konrad.r        1.107         PEGASUS_ASSERT(service != 0);
 360 kumpf           1.103         try
 361                               {
 362 mike            1.118             if (service->_die.get() != 0)
 363 kumpf           1.103             {
 364 denise.eckstein 1.116                 service->_threads--;
 365 kumpf           1.131                 return 0;
 366 kumpf           1.103             }
 367                                   // pull messages off the incoming queue and dispatch them. then
 368                                   // check pending messages that are non-blocking
 369                                   AsyncOpNode *operation = 0;
 370                           
 371                                   // many operations may have been queued.
 372                                   do
 373                                   {
 374                                       try
 375                                       {
 376 mike            1.120                     operation = service->_incoming.dequeue();
 377 kumpf           1.103                 }
 378 kumpf           1.131                 catch (ListClosed&)
 379 kumpf           1.103                 {
 380 kumpf           1.104                     // ATTN: This appears to be a common loop exit path.
 381 marek           1.132                     //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 382 kumpf           1.104                     //    "Caught ListClosed exception.  Exiting _req_proc.");
 383 kumpf           1.103                     break;
 384                                       }
 385                           
 386                                       if (operation)
 387                                       {
 388                                          operation->_service_ptr = service;
 389                                          service->_handle_incoming_operation(operation);
 390                                       }
 391                                   } while (operation);
 392                               }
 393                               catch (const Exception& e)
 394                               {
 395                                   PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 396                                       String("Caught exception: \"") + e.getMessage() +
 397                                           "\".  Exiting _req_proc.");
 398                               }
 399                               catch (...)
 400                               {
 401 marek           1.132             PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 402 kumpf           1.103                 "Caught unrecognized exception.  Exiting _req_proc.");
 403                               }
 404 konrad.r        1.107         service->_threads--;
 405 kumpf           1.131         return 0;
 406 mday            1.1       }
 407                           
 408 mday            1.43      
 409 kumpf           1.104     void MessageQueueService::_sendwait_callback(
 410 kumpf           1.131         AsyncOpNode* op,
 411                               MessageQueue* q,
 412 kumpf           1.104         void *parm)
 413 mday            1.33      {
 414 kumpf           1.131         op->_client_sem.signal();
 415 mday            1.33      }
 416                           
 417 mday            1.30      
 418                           // callback function is responsible for cleaning up all resources
 419                           // including op, op->_callback_node, and op->_callback_ptr
 420 kumpf           1.131     void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 421 mday            1.22      {
 422 kumpf           1.131         if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 423                               {
 424                                   Message *msg = op->removeRequest();
 425                                   if (msg && (msg->getMask() & MessageMask::ha_async))
 426                                   {
 427 kumpf           1.133                 if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)
 428 kumpf           1.131                 {
 429                                           AsyncLegacyOperationStart *wrapper =
 430                                               static_cast<AsyncLegacyOperationStart *>(msg);
 431                                           msg = wrapper->get_action();
 432                                           delete wrapper;
 433                                       }
 434 kumpf           1.133                 else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)
 435 kumpf           1.131                 {
 436                                           AsyncModuleOperationStart *wrapper =
 437                                               static_cast<AsyncModuleOperationStart *>(msg);
 438                                           msg = wrapper->get_action();
 439                                           delete wrapper;
 440                                       }
 441 kumpf           1.133                 else if (msg->getType() == ASYNC_ASYNC_OP_START)
 442 kumpf           1.131                 {
 443                                           AsyncOperationStart *wrapper =
 444                                               static_cast<AsyncOperationStart *>(msg);
 445                                           msg = wrapper->get_action();
 446                                           delete wrapper;
 447                                       }
 448                                       delete msg;
 449                                   }
 450 mday            1.39      
 451 kumpf           1.131             msg = op->removeResponse();
 452                                   if (msg && (msg->getMask() & MessageMask::ha_async))
 453                                   {
 454 kumpf           1.133                 if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)
 455 kumpf           1.131                 {
 456                                           AsyncLegacyOperationResult *wrapper =
 457                                               static_cast<AsyncLegacyOperationResult *>(msg);
 458                                           msg = wrapper->get_result();
 459                                           delete wrapper;
 460                                       }
 461 kumpf           1.133                 else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)
 462 kumpf           1.131                 {
 463                                           AsyncModuleOperationResult *wrapper =
 464                                               static_cast<AsyncModuleOperationResult *>(msg);
 465                                           msg = wrapper->get_result();
 466                                           delete wrapper;
 467                                       }
 468                                   }
 469                                   void (*callback)(Message *, void *, void *) = op->__async_callback;
 470                                   void *handle = op->_callback_handle;
 471                                   void *parm = op->_callback_parameter;
 472                                   op->release();
 473                                   return_op(op);
 474                                   callback(msg, handle, parm);
 475                               }
 476                               else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 477                               {
 478                                   // note that _callback_node may be different from op
 479                                   // op->_callback_response_q is a "this" pointer we can use for
 480                                   // static callback methods
 481                                   op->_async_callback(
 482                                       op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 483 kumpf           1.131         }
 484 mday            1.22      }
 485                           
 486 mday            1.1       
 487 kumpf           1.131     void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 488 mday            1.1       {
 489 kumpf           1.131         if (operation != 0)
 490                               {
 491 kumpf           1.104     
 492                           // ATTN: optimization
 493 mday            1.22      // << Tue Feb 19 14:10:38 2002 mdd >>
 494 kumpf           1.131             operation->lock();
 495 kumpf           1.104     
 496 kumpf           1.131             Message *rq = operation->_request.get();
 497 kumpf           1.104     
 498 mday            1.31      // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 499 kumpf           1.104     // move this to the bottom of the loop when the majority of
 500                           // messages become async messages.
 501 mday            1.31      
 502 kumpf           1.131             // divert legacy messages to handleEnqueue
 503                                   if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
 504                                   {
 505                                       operation->_request.release();
 506                                       operation->unlock();
 507                                       // delete the op node
 508                                       operation->release();
 509                                       return_op(operation);
 510                           
 511                                       handleEnqueue(rq);
 512                                       return;
 513                                   }
 514                           
 515                                   if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 516                                        operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 517                                       (operation->_state & ASYNC_OPSTATE_COMPLETE))
 518                                   {
 519                                       operation->unlock();
 520                                       _handle_async_callback(operation);
 521                                   }
 522                                   else
 523 kumpf           1.131             {
 524                                       PEGASUS_ASSERT(rq != 0);
 525                                       operation->unlock();
 526                                       _handle_async_request(static_cast<AsyncRequest *>(rq));
 527                                   }
 528                               }
 529                               return;
 530 mday            1.1       }
 531                           
 532                           void MessageQueueService::_handle_async_request(AsyncRequest *req)
 533                           {
 534 kumpf           1.131         if (req != 0)
 535                               {
 536                                   req->op->processing();
 537                           
 538 kumpf           1.133             MessageType type = req->getType();
 539                                   if (type == ASYNC_HEARTBEAT)
 540 kumpf           1.131                 handle_heartbeat_request(req);
 541 kumpf           1.133             else if (type == ASYNC_IOCTL)
 542 kumpf           1.131                 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 543 kumpf           1.133             else if (type == ASYNC_CIMSERVICE_START)
 544 kumpf           1.131                 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 545 kumpf           1.133             else if (type == ASYNC_CIMSERVICE_STOP)
 546 kumpf           1.131                 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 547 kumpf           1.133             else if (type == ASYNC_CIMSERVICE_PAUSE)
 548 kumpf           1.131                 handle_CimServicePause(static_cast<CimServicePause *>(req));
 549 kumpf           1.133             else if (type == ASYNC_CIMSERVICE_RESUME)
 550 kumpf           1.131                 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 551 kumpf           1.133             else if (type == ASYNC_ASYNC_OP_START)
 552 kumpf           1.131                 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 553                                   else
 554                                   {
 555                                       // we don't handle this request message
 556                                       _make_response(req, async_results::CIM_NAK);
 557                                   }
 558                               }
 559 mday            1.1       }
 560                           
 561 mday            1.17      
 562                           Boolean MessageQueueService::_enqueueResponse(
 563 kumpf           1.131         Message* request,
 564                               Message* response)
 565 mday            1.17      {
 566 kumpf           1.131         PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 567                                   "MessageQueueService::_enqueueResponse");
 568 mday            1.25      
 569 kumpf           1.131         if (request->getMask() & MessageMask::ha_async)
 570                               {
 571                                   if (response->getMask() & MessageMask::ha_async)
 572                                   {
 573                                       _completeAsyncResponse(
 574                                           static_cast<AsyncRequest *>(request),
 575                                           static_cast<AsyncReply *>(response),
 576                                           ASYNC_OPSTATE_COMPLETE, 0);
 577                                       PEG_METHOD_EXIT();
 578                                       return true;
 579                                   }
 580                               }
 581 mday            1.63      
 582 kumpf           1.131         if (request->_async != 0)
 583                               {
 584                                   Uint32 mask = request->_async->getMask();
 585                                   PEGASUS_ASSERT(mask &
 586                                       (MessageMask::ha_async | MessageMask::ha_request));
 587                           
 588                                   AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 589                                   AsyncOpNode *op = async->op;
 590                                   request->_async = 0;
 591                                   // the legacy request is going to be deleted by its handler
 592                                   // remove it from the op node
 593                           
 594                                   static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 595                           
 596                                   AsyncLegacyOperationResult *async_result =
 597                                       new AsyncLegacyOperationResult(
 598                                           op,
 599                                           response);
 600                                   _completeAsyncResponse(
 601                                       async,
 602                                       async_result,
 603 kumpf           1.131                 ASYNC_OPSTATE_COMPLETE,
 604                                       0);
 605                                   PEG_METHOD_EXIT();
 606                                   return true;
 607                               }
 608 kumpf           1.104     
 609 kumpf           1.131         // ensure that the destination queue is in response->dest
 610                               PEG_METHOD_EXIT();
 611                               return SendForget(response);
 612 mday            1.17      }
 613                           
 614 kumpf           1.131     void MessageQueueService::_make_response(Message* req, Uint32 code)
 615 mday            1.1       {
 616 kumpf           1.131         cimom::_make_response(req, code);
 617 mday            1.1       }
 618                           
 619                           
 620 kumpf           1.104     void MessageQueueService::_completeAsyncResponse(
 621 kumpf           1.131         AsyncRequest* request,
 622                               AsyncReply* reply,
 623 kumpf           1.104         Uint32 state,
 624                               Uint32 flag)
 625 mday            1.5       {
 626 kumpf           1.131         PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 627                                   "MessageQueueService::_completeAsyncResponse");
 628 kumpf           1.37      
 629 kumpf           1.131         cimom::_completeAsyncResponse(request, reply, state, flag);
 630 kumpf           1.37      
 631 kumpf           1.131         PEG_METHOD_EXIT();
 632 mday            1.5       }
 633                           
 634                           
 635 kumpf           1.104     void MessageQueueService::_complete_op_node(
 636 kumpf           1.131         AsyncOpNode* op,
 637 kumpf           1.104         Uint32 state,
 638                               Uint32 flag,
 639                               Uint32 code)
 640 mday            1.32      {
 641 kumpf           1.131         cimom::_complete_op_node(op, state, flag, code);
 642 mday            1.32      }
 643                           
 644 mday            1.5       
 645 kumpf           1.131     Boolean MessageQueueService::accept_async(AsyncOpNode* op)
 646 mday            1.5       {
 647 kumpf           1.131         if (_incoming_queue_shutdown.get() > 0)
 648                                   return false;
 649                               if (_polling_thread == NULL)
 650                               {
 651                                   _polling_thread = new Thread(
 652                                       polling_routine,
 653                                       reinterpret_cast<void *>(_get_polling_list()),
 654                                       false);
 655                                   ThreadStatus tr = PEGASUS_THREAD_OK;
 656                                   while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 657                                   {
 658                                       if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 659                                           Threads::yield();
 660                                       else
 661                                           throw Exception(MessageLoaderParms(
 662                                               "Common.MessageQueueService.NOT_ENOUGH_THREAD",
 663                                               "Could not allocate thread for the polling thread."));
 664                                   }
 665                               }
 666 kumpf           1.104     // ATTN optimization remove the message checking altogether in the base
 667 mday            1.20      // << Mon Feb 18 14:02:20 2002 mdd >>
 668 kumpf           1.131         op->lock();
 669                               Message *rq = op->_request.get();
 670                               Message *rp = op->_response.get();
 671                               op->unlock();
 672 kumpf           1.104     
 673 kumpf           1.131         if ((rq != 0 && (true == messageOK(rq))) ||
 674                                   (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 675                               {
 676                                   _incoming.enqueue_wait(op);
 677                                   _polling_sem.signal();
 678                                   return true;
 679                               }
 680                               return false;
 681 mday            1.5       }
 682                           
 683 kumpf           1.131     Boolean MessageQueueService::messageOK(const Message* msg)
 684 mday            1.5       {
 685 kumpf           1.131         if (_incoming_queue_shutdown.get() > 0)
 686                                   return false;
 687                               return true;
 688 mday            1.18      }
 689                           
 690 kumpf           1.131     void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
 691 mday            1.1       {
 692 kumpf           1.131         // default action is to echo a heartbeat response
 693 kumpf           1.104     
 694 kumpf           1.131         AsyncReply *reply = new AsyncReply(
 695 kumpf           1.133             ASYNC_HEARTBEAT,
 696 kumpf           1.131             0,
 697                                   req->op,
 698                                   async_results::OK,
 699                                   req->resp,
 700                                   false);
 701                               _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 702 mday            1.1       }
 703                           
 704                           
 705 kumpf           1.131     void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
 706 kumpf           1.104     {
 707 mday            1.1       }
 708 kumpf           1.104     
 709 kumpf           1.131     void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
 710 mday            1.1       {
 711 kumpf           1.131         switch (req->ctl)
 712                               {
 713                                   case AsyncIoctl::IO_CLOSE:
 714                                   {
 715                                       MessageQueueService *service =
 716                                           static_cast<MessageQueueService *>(req->op->_service_ptr);
 717 kumpf           1.81      
 718                           #ifdef MESSAGEQUEUESERVICE_DEBUG
 719 kumpf           1.131                 PEGASUS_STD(cout) << service->getQueueName() <<
 720                                           " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 721 kumpf           1.81      #endif
 722 kumpf           1.104     
 723 kumpf           1.131                 // respond to this message. this is fire and forget, so we
 724                                       // don't need to delete anything.
 725                                       // this takes care of two problems that were being found
 726                                       // << Thu Oct  9 10:52:48 2003 mdd >>
 727                                       _make_response(req, async_results::OK);
 728                                       // ensure we do not accept any further messages
 729 kumpf           1.104     
 730 kumpf           1.131                 // ensure we don't recurse on IO_CLOSE
 731                                       if (_incoming_queue_shutdown.get() > 0)
 732                                           break;
 733 kumpf           1.104     
 734 kumpf           1.131                 // set the closing flag
 735                                       service->_incoming_queue_shutdown = 1;
 736                                       // empty out the queue
 737                                       while (1)
 738 kumpf           1.104                 {
 739 kumpf           1.131                     AsyncOpNode *operation;
 740                                           try
 741                                           {
 742                                               operation = service->_incoming.dequeue();
 743                                           }
 744                                           catch (IPCException&)
 745                                           {
 746                                               break;
 747                                           }
 748                                           if (operation)
 749                                           {
 750                                               operation->_service_ptr = service;
 751                                               service->_handle_incoming_operation(operation);
 752                                           }
 753                                           else
 754                                               break;
 755                                       } // message processing loop
 756                           
 757                                       // shutdown the AsyncQueue
 758                                       service->_incoming.close();
 759                                       return;
 760 kumpf           1.131             }
 761 kumpf           1.104     
 762 kumpf           1.131             default:
 763                                       _make_response(req, async_results::CIM_NAK);
 764                               }
 765 mday            1.1       }
 766 mday            1.8       
 767 kumpf           1.131     void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
 768 mday            1.1       {
 769 kumpf           1.81      #ifdef MESSAGEQUEUESERVICE_DEBUG
 770 kumpf           1.131         PEGASUS_STD(cout) << getQueueName() << "received START" <<
 771                                   PEGASUS_STD(endl);
 772 kumpf           1.81      #endif
 773 kumpf           1.104     
 774 kumpf           1.131         // clear the stoped bit and update
 775                               _capabilities &= (~(module_capabilities::stopped));
 776                               _make_response(req, async_results::OK);
 777                               // now tell the meta dispatcher we are stopped
 778                               update_service(_capabilities, _mask);
 779                           }
 780 mday            1.10      
 781 kumpf           1.131     void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
 782 mday            1.1       {
 783 kumpf           1.81      #ifdef MESSAGEQUEUESERVICE_DEBUG
 784 kumpf           1.131         PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 785 kumpf           1.81      #endif
 786 kumpf           1.131         // set the stopeed bit and update
 787                               _capabilities |= module_capabilities::stopped;
 788                               _make_response(req, async_results::CIM_STOPPED);
 789                               // now tell the meta dispatcher we are stopped
 790                               update_service(_capabilities, _mask);
 791 mday            1.1       }
 792 kumpf           1.104     
 793 kumpf           1.131     void MessageQueueService::handle_CimServicePause(CimServicePause* req)
 794 mday            1.1       {
 795 kumpf           1.131         // set the paused bit and update
 796                               _capabilities |= module_capabilities::paused;
 797                               update_service(_capabilities, _mask);
 798                               _make_response(req, async_results::CIM_PAUSED);
 799                               // now tell the meta dispatcher we are stopped
 800 mday            1.1       }
 801 kumpf           1.104     
 802 kumpf           1.131     void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
 803 mday            1.1       {
 804 kumpf           1.131         // clear the paused  bit and update
 805                               _capabilities &= (~(module_capabilities::paused));
 806                               update_service(_capabilities, _mask);
 807                               _make_response(req, async_results::OK);
 808                               // now tell the meta dispatcher we are stopped
 809 mday            1.1       }
 810 kumpf           1.104     
 811 kumpf           1.131     void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
 812 mday            1.1       {
 813 kumpf           1.131         _make_response(req, async_results::CIM_NAK);
 814 mday            1.1       }
 815                           
 816 kumpf           1.131     void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
 817 mday            1.1       {
 818 mday            1.14      }
 819                           
 820 mday            1.10      
 821 kumpf           1.131     void MessageQueueService::handle_AsyncLegacyOperationStart(
 822                               AsyncLegacyOperationStart* req)
 823 mday            1.14      {
 824 kumpf           1.131         // remove the legacy message from the request and enqueue it to its
 825                               // destination
 826                               Uint32 result = async_results::CIM_NAK;
 827 kumpf           1.104     
 828 kumpf           1.131         Message* legacy = req->_act;
 829                               if (legacy != 0)
 830                               {
 831                                   MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 832                                   if (queue != 0)
 833                                   {
 834                                       if (queue->isAsync() == true)
 835                                       {
 836                                           (static_cast<MessageQueueService *>(queue))->handleEnqueue(
 837                                               legacy);
 838                                       }
 839                                       else
 840                                       {
 841                                           // Enqueue the response:
 842                                           queue->enqueue(req->get_action());
 843                                       }
 844 kumpf           1.104     
 845 kumpf           1.131                 result = async_results::OK;
 846                                   }
 847                               }
 848                               _make_response(req, result);
 849 mday            1.14      }
 850                           
 851 kumpf           1.131     void MessageQueueService::handle_AsyncLegacyOperationResult(
 852                               AsyncLegacyOperationResult* rep)
 853 mday            1.14      {
 854 mday            1.1       }
 855                           
 856 kumpf           1.131     AsyncOpNode* MessageQueueService::get_op()
 857 mday            1.1       {
 858 kumpf           1.131        AsyncOpNode* op = new AsyncOpNode();
 859 kumpf           1.104     
 860 mday            1.9          op->_state = ASYNC_OPSTATE_UNKNOWN;
 861                              op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 862 kumpf           1.104     
 863 mday            1.1          return op;
 864                           }
 865                           
 866 kumpf           1.131     void MessageQueueService::return_op(AsyncOpNode* op)
 867 mday            1.1       {
 868 kumpf           1.131         PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
 869                               delete op;
 870 mday            1.1       }
 871                           
 872 mday            1.18      
 873 kumpf           1.104     Boolean MessageQueueService::SendAsync(
 874 kumpf           1.131         AsyncOpNode* op,
 875 kumpf           1.104         Uint32 destination,
 876 kumpf           1.131         void (*callback)(AsyncOpNode*, MessageQueue*, void*),
 877                               MessageQueue* callback_response_q,
 878                               void* callback_ptr)
 879                           {
 880                               PEGASUS_ASSERT(op != 0 && callback != 0);
 881                           
 882                               // get the queue handle for the destination
 883                           
 884                               op->lock();
 885                               // destination of this message
 886                               op->_op_dest = MessageQueue::lookup(destination);
 887                               op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 888                               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 889                               // initialize the callback data
 890                               // callback function to be executed by recpt. of response
 891                               op->_async_callback = callback;
 892                               // the op node
 893                               op->_callback_node = op;
 894                               // the queue that will receive the response
 895                               op->_callback_response_q = callback_response_q;
 896                               // user data for callback
 897 kumpf           1.131         op->_callback_ptr = callback_ptr;
 898                               // I am the originator of this request
 899                               op->_callback_request_q = this;
 900                           
 901                               op->unlock();
 902                               if (op->_op_dest == 0)
 903                                   return false;
 904 kumpf           1.104     
 905 kumpf           1.131         return  _meta_dispatcher->route_async(op);
 906 mday            1.18      }
 907                           
 908                           
 909 kumpf           1.104     Boolean MessageQueueService::SendAsync(
 910 kumpf           1.131         Message* msg,
 911 kumpf           1.104         Uint32 destination,
 912 kumpf           1.131         void (*callback)(Message* response, void* handle, void* parameter),
 913                               void* handle,
 914                               void* parameter)
 915                           {
 916                               if (msg == NULL)
 917                                   return false;
 918                               if (callback == NULL)
 919                                   return SendForget(msg);
 920                               AsyncOpNode *op = get_op();
 921                               msg->dest = destination;
 922                               if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 923                               {
 924                                   op->release();
 925                                   return_op(op);
 926                                   return false;
 927                               }
 928                               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 929                               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 930                               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 931                               op->__async_callback = callback;
 932                               op->_callback_node = op;
 933 kumpf           1.131         op->_callback_handle = handle;
 934                               op->_callback_parameter = parameter;
 935                               op->_callback_response_q = this;
 936                           
 937                               if (!(msg->getMask() & MessageMask::ha_async))
 938                               {
 939                                   AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 940                                       op,
 941                                       destination,
 942                                       msg,
 943                                       destination);
 944                               }
 945                               else
 946                               {
 947                                   op->_request.reset(msg);
 948                                   (static_cast<AsyncMessage *>(msg))->op = op;
 949                               }
 950                               return _meta_dispatcher->route_async(op);
 951                           }
 952                           
 953                           
 954 kumpf           1.131     Boolean MessageQueueService::SendForget(Message* msg)
 955                           {
 956                               AsyncOpNode* op = 0;
 957                               Uint32 mask = msg->getMask();
 958                           
 959                               if (mask & MessageMask::ha_async)
 960                               {
 961                                   op = (static_cast<AsyncMessage *>(msg))->op;
 962                               }
 963                           
 964                               if (op == 0)
 965                               {
 966                                   op = get_op();
 967                                   op->_request.reset(msg);
 968                                   if (mask & MessageMask::ha_async)
 969                                   {
 970                                       (static_cast<AsyncMessage *>(msg))->op = op;
 971                                   }
 972                               }
 973                               op->_op_dest = MessageQueue::lookup(msg->dest);
 974                               op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 975 kumpf           1.131         op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 976                                   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 977                               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 978                               if (op->_op_dest == 0)
 979                               {
 980                                   op->release();
 981                                   return_op(op);
 982                                   return false;
 983                               }
 984                           
 985                               // now see if the meta dispatcher will take it
 986                               return  _meta_dispatcher->route_async(op);
 987                           }
 988                           
 989                           
 990                           AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
 991                           {
 992                               if (request == 0)
 993                                   return 0;
 994                           
 995                               Boolean destroy_op = false;
 996 kumpf           1.131     
 997                               if (request->op == 0)
 998                               {
 999                                   request->op = get_op();
1000                                   request->op->_request.reset(request);
1001                                   destroy_op = true;
1002                               }
1003                           
1004                               request->block = false;
1005                               request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1006                               SendAsync(
1007                                   request->op,
1008                                   request->dest,
1009                                   _sendwait_callback,
1010                                   this,
1011                                   (void *)0);
1012                           
1013                               request->op->_client_sem.wait();
1014                           
1015                               AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
1016                               rpl->op = 0;
1017 kumpf           1.131     
1018                               if (destroy_op == true)
1019                               {
1020                                   request->op->lock();
1021                                   request->op->_request.release();
1022                                   request->op->_state |= ASYNC_OPSTATE_RELEASED;
1023                                   request->op->unlock();
1024                                   return_op(request->op);
1025                                   request->op = 0;
1026                               }
1027                               return rpl;
1028 mday            1.1       }
1029                           
1030                           
1031 kumpf           1.104     Boolean MessageQueueService::register_service(
1032                               String name,
1033                               Uint32 capabilities,
1034                               Uint32 mask)
1035                           {
1036 kumpf           1.131         RegisterCimService *msg = new RegisterCimService(
1037                                   0,
1038                                   true,
1039                                   name,
1040                                   capabilities,
1041                                   mask,
1042                                   _queueId);
1043                               msg->dest = CIMOM_Q_ID;
1044                           
1045                               Boolean registered = false;
1046                               AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1047                           
1048                               if (reply != 0)
1049                               {
1050                                   if (reply->getMask() & MessageMask::ha_async)
1051                                   {
1052                                       if (reply->getMask() & MessageMask::ha_reply)
1053 kumpf           1.104                 {
1054 kumpf           1.131                     if (reply->result == async_results::OK ||
1055                                               reply->result == async_results::MODULE_ALREADY_REGISTERED)
1056                                           {
1057                                               registered = true;
1058                                           }
1059 kumpf           1.104                 }
1060 kumpf           1.131             }
1061 kumpf           1.104     
1062 kumpf           1.131             delete reply;
1063                               }
1064                               delete msg;
1065                               return registered;
1066 mday            1.1       }
1067                           
1068                           Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1069                           {
1070 kumpf           1.131         UpdateCimService *msg = new UpdateCimService(
1071                                   0,
1072                                   true,
1073                                   _queueId,
1074                                   _capabilities,
1075                                   _mask);
1076                               Boolean registered = false;
1077                           
1078                               AsyncMessage* reply = SendWait(msg);
1079                               if (reply)
1080                               {
1081                                   if (reply->getMask() & MessageMask::ha_async)
1082                                   {
1083                                       if (reply->getMask() & MessageMask::ha_reply)
1084 kumpf           1.104                 {
1085 kumpf           1.131                     if (static_cast<AsyncReply *>(reply)->result ==
1086                                                   async_results::OK)
1087                                           {
1088                                               registered = true;
1089                                           }
1090 kumpf           1.104                 }
1091 kumpf           1.131             }
1092                                   delete reply;
1093                               }
1094                               delete msg;
1095                               return registered;
1096 mday            1.1       }
1097                           
1098                           
1099 kumpf           1.104     Boolean MessageQueueService::deregister_service()
1100 mday            1.1       {
1101 kumpf           1.131         _meta_dispatcher->deregister_module(_queueId);
1102                               return true;
1103 mday            1.1       }
1104                           
1105                           
1106 kumpf           1.104     void MessageQueueService::find_services(
1107                               String name,
1108                               Uint32 capabilities,
1109                               Uint32 mask,
1110 kumpf           1.131         Array<Uint32>* results)
1111 mday            1.1       {
1112 kumpf           1.131         if (results == 0)
1113                               {
1114                                   throw NullPointer();
1115                               }
1116                           
1117                               results->clear();
1118                           
1119                               FindServiceQueue *req = new FindServiceQueue(
1120                                   0,
1121                                   _queueId,
1122                                   true,
1123                                   name,
1124                                   capabilities,
1125                                   mask);
1126                           
1127                               req->dest = CIMOM_Q_ID;
1128                           
1129                               AsyncMessage *reply = SendWait(req);
1130                               if (reply)
1131                               {
1132                                   if (reply->getMask() & MessageMask::ha_async)
1133 kumpf           1.131             {
1134                                       if (reply->getMask() & MessageMask::ha_reply)
1135 kumpf           1.104                 {
1136 kumpf           1.133                     if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)
1137 kumpf           1.131                     {
1138                                               if ((static_cast<FindServiceQueueResult*>(reply))->result ==
1139                                                       async_results::OK)
1140                                                   *results =
1141                                                       (static_cast<FindServiceQueueResult*>(reply))->qids;
1142                                           }
1143 kumpf           1.104                 }
1144 kumpf           1.131             }
1145                                   delete reply;
1146                               }
1147                               delete req;
1148                           }
1149                           
1150                           void MessageQueueService::enumerate_service(
1151                               Uint32 queue,
1152                               message_module* result)
1153                           {
1154                               if (result == 0)
1155                               {
1156                                   throw NullPointer();
1157                               }
1158                           
1159                               EnumerateService *req = new EnumerateService(
1160                                   0,
1161                                   _queueId,
1162                                   true,
1163                                   queue);
1164                           
1165 kumpf           1.131         AsyncMessage* reply = SendWait(req);
1166                           
1167                               if (reply)
1168                               {
1169                                   Boolean found = false;
1170                           
1171                                   if (reply->getMask() & MessageMask::ha_async)
1172                                   {
1173                                       if (reply->getMask() & MessageMask::ha_reply)
1174 kumpf           1.104                 {
1175 kumpf           1.133                     if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)
1176 kumpf           1.131                     {
1177                                               if ((static_cast<EnumerateServiceResponse*>(reply))->
1178                                                       result == async_results::OK)
1179                                               {
1180                                                   if (found == false)
1181                                                   {
1182                                                       found = true;
1183                           
1184                                                       result->put_name((static_cast<
1185                                                           EnumerateServiceResponse*>(reply))->name);
1186                                                       result->put_capabilities((static_cast<
1187                                                           EnumerateServiceResponse*>(reply))->
1188                                                               capabilities);
1189                                                       result->put_mask((static_cast<
1190                                                           EnumerateServiceResponse*>(reply))->mask);
1191                                                       result->put_queue((static_cast<
1192                                                           EnumerateServiceResponse*>(reply))->qid);
1193                                                   }
1194                                               }
1195                                           }
1196 kumpf           1.104                 }
1197 kumpf           1.131             }
1198                                   delete reply;
1199                               }
1200                               delete req;
1201 mday            1.1       }
1202                           
1203 mike            1.120     MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1204                           {
1205                               _polling_list_mutex.lock();
1206                           
1207                               if (!_polling_list)
1208 kumpf           1.131             _polling_list = new PollingList;
1209 mike            1.120     
1210                               _polling_list_mutex.unlock();
1211 kumpf           1.104     
1212 mike            1.120         return _polling_list;
1213 mday            1.1       }
1214                           
1215                           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2