(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.119 //%2006////////////////////////////////////////////////////////////////////////
   2 mday  1.1   //
   3 karl  1.88  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mday  1.1   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15             // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18             // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.82  // 
  21 mday  1.1   // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22             // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24             // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27             // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             //%/////////////////////////////////////////////////////////////////////////////
  33             
  34             #include "MessageQueueService.h"
  35 mday  1.22  #include <Pegasus/Common/Tracer.h>
  36 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
  37 mday  1.1   
  38             PEGASUS_NAMESPACE_BEGIN
  39             
  40 mday  1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  41 mike  1.118 AtomicInt MessageQueueService::_service_count(0);
  42 mday  1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  43 mday  1.15  
  44 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
  45 mday  1.53  
  46 mday  1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  47 mday  1.53  
  48 mike  1.120 MessageQueueService::PollingList* MessageQueueService::_polling_list;
  49             Mutex MessageQueueService::_polling_list_mutex;
  50 mday  1.53  
  51 kumpf 1.62  Thread* MessageQueueService::_polling_thread = 0;
  52 mday  1.61  
  53 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
  54 mday  1.69  {
  55                return _thread_pool;
  56             }
  57 kumpf 1.117 
  58 jim.wunderlich 1.110 //
  59 kumpf          1.117 // MAX_THREADS_PER_SVC_QUEUE
  60 jim.wunderlich 1.110 //
  61                      // JR Wunderlich Jun 6, 2005
  62                      //
  63                      
  64 kumpf          1.131 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
  65 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  66 jim.wunderlich 1.110 
  67 kumpf          1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
  68                      # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  69                      #endif
  70                      
  71 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
  72 mday           1.69  
  73 kumpf          1.131 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
  74                          void* parm)
  75 mday           1.53  {
  76 kumpf          1.131     Thread *myself = reinterpret_cast<Thread *>(parm);
  77                          List<MessageQueueService, Mutex> *list =
  78                              reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());
  79                      
  80                          while (_stop_polling.get()  == 0)
  81                          {
  82                              _polling_sem.wait();
  83                      
  84                              if (_stop_polling.get() != 0)
  85                              {
  86                                  break;
  87                              }
  88                      
  89                              // The polling_routine thread must hold the lock on the
  90                              // _polling_list while processing incoming messages.
  91                              // This lock is used to give this thread ownership of
  92                              // services on the _polling_routine list.
  93                      
  94                              // This is necessary to avoid confict with other threads
  95                              // processing the _polling_list
  96                              // (e.g., MessageQueueServer::~MessageQueueService).
  97 kumpf          1.131 
  98                              list->lock();
  99                              MessageQueueService *service = list->front();
 100                              ThreadStatus rtn = PEGASUS_THREAD_OK;
 101                              while (service != NULL)
 102                              {
 103                                  if ((service->_incoming.count() > 0) &&
 104                                      (service->_die.get() == 0) &&
 105                                      (service->_threads.get() < max_threads_per_svc_queue))
 106                                  {
 107                                      // The _threads count is used to track the
 108                                      // number of active threads that have been allocated
 109                                      // to process messages for this service.
 110                      
 111                                      // The _threads count MUST be incremented while
 112                                      // the polling_routine owns the _polling_thread
 113                                      // lock and has ownership of the service object.
 114                      
 115                                      service->_threads++;
 116                                      try
 117                                      {
 118 kumpf          1.131                     rtn = _thread_pool->allocate_and_awaken(
 119                                              service, _req_proc, &_polling_sem);
 120                                      }
 121                                      catch (...)
 122                                      {
 123                                          service->_threads--;
 124                      
 125                                          // allocate_and_awaken should never generate an exception.
 126                                          PEGASUS_ASSERT(0);
 127                                      }
 128                                      // if no more threads available, break from processing loop
 129                                      if (rtn != PEGASUS_THREAD_OK )
 130                                      {
 131                                          service->_threads--;
 132                                          Logger::put(
 133                                              Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 134                                              "Not enough threads to process this request. "
 135                                                  "Skipping.");
 136                      
 137 marek          1.132                     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 138 kumpf          1.131                         "Could not allocate thread for %s.  Queue has %d "
 139                                                  "messages waiting and %d threads servicing."
 140                                                  "Skipping the service for right now. ",
 141                                              service->getQueueName(),
 142                                              service->_incoming.count(),
 143 marek          1.132                         service->_threads.get()));
 144 kumpf          1.131 
 145                                          Threads::yield();
 146                                          service = NULL;
 147                                      }
 148                                  }
 149                                  if (service != NULL)
 150                                  {
 151                                      service = list->next_of(service);
 152                                  }
 153                              }
 154                              list->unlock();
 155                          }
 156                          myself->exit_self( (ThreadReturnType) 1 );
 157                          return 0;
 158 mday           1.53  }
 159                      
 160                      
 161                      Semaphore MessageQueueService::_polling_sem(0);
 162                      AtomicInt MessageQueueService::_stop_polling(0);
 163                      
 164 mday           1.15  
 165 kumpf          1.104 MessageQueueService::MessageQueueService(
 166 kumpf          1.131     const char* name,
 167                          Uint32 queueID,
 168                          Uint32 capabilities,
 169                          Uint32 mask)
 170                          : Base(name, true,  queueID),
 171                            _mask(mask),
 172                            _die(0),
 173                            _threads(0),
 174                            _incoming(),
 175                            _incoming_queue_shutdown(0)
 176                      {
 177                          _capabilities = (capabilities | module_capabilities::async);
 178                      
 179                          _default_op_timeout.tv_sec = 30;
 180                          _default_op_timeout.tv_usec = 100;
 181                      
 182                          max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 183                      
 184                          // if requested thread max is out of range, then set to
 185                          // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 186                      
 187 kumpf          1.131     if ((max_threads_per_svc_queue < 1) ||
 188                              (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 189                          {
 190                              max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 191                          }
 192                      
 193 marek          1.132     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 194                             "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
 195 kumpf          1.131 
 196                          AutoMutex autoMut(_meta_dispatcher_mutex);
 197                      
 198                          if (_meta_dispatcher == 0)
 199                          {
 200                              _stop_polling = 0;
 201                              PEGASUS_ASSERT(_service_count.get() == 0);
 202                              _meta_dispatcher = new cimom();
 203                      
 204                              //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 205                              //   minimum_cnt, maximum_cnt, deallocateWait);
 206                              //
 207                              _thread_pool =
 208                                  new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 209                          }
 210                          _service_count++;
 211                      
 212                          if (false == register_service(name, _capabilities, _mask))
 213                          {
 214                              MessageLoaderParms parms(
 215                                  "Common.MessageQueueService.UNABLE_TO_REGISTER",
 216 kumpf          1.131             "CIM base message queue service is unable to register with the "
 217                                      "CIMOM dispatcher.");
 218                              throw BindFailedException(parms);
 219                          }
 220 kumpf          1.104 
 221 kumpf          1.131     _get_polling_list()->insert_back(this);
 222 mday           1.1   }
 223                      
 224 mday           1.4   
 225 kumpf          1.104 MessageQueueService::~MessageQueueService()
 226 mday           1.1   {
 227 kumpf          1.131     _die = 1;
 228                      
 229                          // The polling_routine locks the _polling_list while
 230                          // processing the incoming messages for services on the
 231                          // list.  Deleting the service from the _polling_list
 232                          // prior to processing, avoids synchronization issues
 233                          // with the _polling_routine.
 234                      
 235                          // ATTN: added to prevent assertion in List in which the list does not
 236                          // contain this element.
 237                      
 238                          if (_get_polling_list()->contains(this))
 239                              _get_polling_list()->remove(this);
 240                      
 241                          // ATTN: The code for closing the _incoming queue
 242                          // is not working correctly. In OpenPegasus 2.5,
 243                          // execution of the following code is very timing
 244                          // dependent. This needs to be fix.
 245                          // See Bug 4079 for details.
 246                          if (_incoming_queue_shutdown.get() == 0)
 247                          {
 248 kumpf          1.131         _shutdown_incoming_queue();
 249                          }
 250 mday           1.76  
 251 kumpf          1.131     // Wait until all threads processing the messages
 252                          // for this service have completed.
 253                      
 254                          while (_threads.get() > 0)
 255 konrad.r       1.109     {
 256 kumpf          1.131         Threads::yield();
 257                          }
 258                      
 259                          {
 260                              AutoMutex autoMut(_meta_dispatcher_mutex);
 261                              _service_count--;
 262                              if (_service_count.get() == 0)
 263                              {
 264                      
 265                                  _stop_polling++;
 266                                  _polling_sem.signal();
 267                                  if (_polling_thread)
 268                                  {
 269                                      _polling_thread->join();
 270                                      delete _polling_thread;
 271                                      _polling_thread = 0;
 272                                  }
 273                                  _meta_dispatcher->_shutdown_routed_queue();
 274                                  delete _meta_dispatcher;
 275                                  _meta_dispatcher = 0;
 276                      
 277 kumpf          1.131             delete _thread_pool;
 278                                  _thread_pool = 0;
 279                              }
 280                          } // mutex unlocks here
 281                          // Clean up in case there are extra stuff on the queue.
 282                          while (_incoming.count())
 283                          {
 284                              try
 285                              {
 286                                  delete _incoming.dequeue();
 287                              }
 288                              catch (const ListClosed&)
 289                              {
 290                                  // If the list is closed, there is nothing we can do.
 291                                  break;
 292                              }
 293 konrad.r       1.109     }
 294 kumpf          1.104 }
 295 mday           1.1   
 296 kumpf          1.104 void MessageQueueService::_shutdown_incoming_queue()
 297 mday           1.7   {
 298 kumpf          1.131     if (_incoming_queue_shutdown.get() > 0)
 299                              return;
 300                      
 301                          AsyncIoctl *msg = new AsyncIoctl(
 302                              0,
 303                              _queueId,
 304                              _queueId,
 305                              true,
 306                              AsyncIoctl::IO_CLOSE,
 307                              0,
 308                              0);
 309                      
 310                          msg->op = get_op();
 311                          msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 312                          msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 313                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 314                          msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 315 kumpf          1.104 
 316 kumpf          1.131     msg->op->_op_dest = this;
 317                          msg->op->_request.reset(msg);
 318                          try
 319                          {
 320                              _incoming.enqueue_wait(msg->op);
 321                              _polling_sem.signal();
 322                          }
 323                          catch (const ListClosed&)
 324                          {
 325 denise.eckstein 1.116         // This means the queue has already been shut-down (happens  when there
 326 kumpf           1.131         // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 327                               // processed.
 328                               delete msg;
 329                           }
 330                           catch (const Permission&)
 331                           {
 332                               delete msg;
 333                           }
 334 mday            1.7   }
 335                       
 336 mday            1.22  
 337                       
 338 kumpf           1.131 void MessageQueueService::enqueue(Message* msg)
 339 mday            1.22  {
 340 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 341 kumpf           1.28  
 342 kumpf           1.131     Base::enqueue(msg);
 343 kumpf           1.28  
 344 kumpf           1.131     PEG_METHOD_EXIT();
 345 mday            1.22  }
 346                       
 347                       
 348 mike            1.124 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 349 kumpf           1.131     void* parm)
 350 kumpf           1.103 {
 351 konrad.r        1.107     MessageQueueService* service =
 352 kumpf           1.131         reinterpret_cast<MessageQueueService*>(parm);
 353 konrad.r        1.107     PEGASUS_ASSERT(service != 0);
 354 kumpf           1.103     try
 355                           {
 356 mike            1.118         if (service->_die.get() != 0)
 357 kumpf           1.103         {
 358 denise.eckstein 1.116             service->_threads--;
 359 kumpf           1.131             return 0;
 360 kumpf           1.103         }
 361                               // pull messages off the incoming queue and dispatch them. then
 362                               // check pending messages that are non-blocking
 363                               AsyncOpNode *operation = 0;
 364                       
 365                               // many operations may have been queued.
 366                               do
 367                               {
 368                                   try
 369                                   {
 370 mike            1.120                 operation = service->_incoming.dequeue();
 371 kumpf           1.103             }
 372 kumpf           1.131             catch (ListClosed&)
 373 kumpf           1.103             {
 374 kumpf           1.104                 // ATTN: This appears to be a common loop exit path.
 375 marek           1.132                 //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 376 kumpf           1.104                 //    "Caught ListClosed exception.  Exiting _req_proc.");
 377 kumpf           1.103                 break;
 378                                   }
 379                       
 380                                   if (operation)
 381                                   {
 382                                      operation->_service_ptr = service;
 383                                      service->_handle_incoming_operation(operation);
 384                                   }
 385                               } while (operation);
 386                           }
 387                           catch (const Exception& e)
 388                           {
 389                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 390                                   String("Caught exception: \"") + e.getMessage() +
 391                                       "\".  Exiting _req_proc.");
 392                           }
 393                           catch (...)
 394                           {
 395 marek           1.132         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 396 kumpf           1.103             "Caught unrecognized exception.  Exiting _req_proc.");
 397                           }
 398 konrad.r        1.107     service->_threads--;
 399 kumpf           1.131     return 0;
 400 mday            1.1   }
 401                       
 402 mday            1.43  
 403 kumpf           1.104 void MessageQueueService::_sendwait_callback(
 404 kumpf           1.131     AsyncOpNode* op,
 405                           MessageQueue* q,
 406 kumpf           1.104     void *parm)
 407 mday            1.33  {
 408 kumpf           1.131     op->_client_sem.signal();
 409 mday            1.33  }
 410                       
 411 mday            1.30  
 412                       // callback function is responsible for cleaning up all resources
 413                       // including op, op->_callback_node, and op->_callback_ptr
 414 kumpf           1.131 void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 415 mday            1.22  {
 416 kumpf           1.131     if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 417                           {
 418                               Message *msg = op->removeRequest();
 419                               if (msg && (msg->getMask() & MessageMask::ha_async))
 420                               {
 421                                   if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
 422                                   {
 423                                       AsyncLegacyOperationStart *wrapper =
 424                                           static_cast<AsyncLegacyOperationStart *>(msg);
 425                                       msg = wrapper->get_action();
 426                                       delete wrapper;
 427                                   }
 428                                   else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 429                                   {
 430                                       AsyncModuleOperationStart *wrapper =
 431                                           static_cast<AsyncModuleOperationStart *>(msg);
 432                                       msg = wrapper->get_action();
 433                                       delete wrapper;
 434                                   }
 435                                   else if (msg->getType() == async_messages::ASYNC_OP_START)
 436                                   {
 437 kumpf           1.131                 AsyncOperationStart *wrapper =
 438                                           static_cast<AsyncOperationStart *>(msg);
 439                                       msg = wrapper->get_action();
 440                                       delete wrapper;
 441                                   }
 442                                   delete msg;
 443                               }
 444 mday            1.39  
 445 kumpf           1.131         msg = op->removeResponse();
 446                               if (msg && (msg->getMask() & MessageMask::ha_async))
 447                               {
 448                                   if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
 449                                   {
 450                                       AsyncLegacyOperationResult *wrapper =
 451                                           static_cast<AsyncLegacyOperationResult *>(msg);
 452                                       msg = wrapper->get_result();
 453                                       delete wrapper;
 454                                   }
 455                                   else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 456                                   {
 457                                       AsyncModuleOperationResult *wrapper =
 458                                           static_cast<AsyncModuleOperationResult *>(msg);
 459                                       msg = wrapper->get_result();
 460                                       delete wrapper;
 461                                   }
 462                               }
 463                               void (*callback)(Message *, void *, void *) = op->__async_callback;
 464                               void *handle = op->_callback_handle;
 465                               void *parm = op->_callback_parameter;
 466 kumpf           1.131         op->release();
 467                               return_op(op);
 468                               callback(msg, handle, parm);
 469                           }
 470                           else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 471                           {
 472                               // note that _callback_node may be different from op
 473                               // op->_callback_response_q is a "this" pointer we can use for
 474                               // static callback methods
 475                               op->_async_callback(
 476                                   op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 477                           }
 478 mday            1.22  }
 479                       
 480 mday            1.1   
 481 kumpf           1.131 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 482 mday            1.1   {
 483 kumpf           1.131     if (operation != 0)
 484                           {
 485 kumpf           1.104 
 486                       // ATTN: optimization
 487 mday            1.22  // << Tue Feb 19 14:10:38 2002 mdd >>
 488 kumpf           1.131         operation->lock();
 489 kumpf           1.104 
 490 kumpf           1.131         Message *rq = operation->_request.get();
 491 kumpf           1.104 
 492 mday            1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 493 kumpf           1.104 // move this to the bottom of the loop when the majority of
 494                       // messages become async messages.
 495 mday            1.31  
 496 kumpf           1.131         // divert legacy messages to handleEnqueue
 497                               if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
 498                               {
 499                                   operation->_request.release();
 500                                   operation->unlock();
 501                                   // delete the op node
 502                                   operation->release();
 503                                   return_op(operation);
 504                       
 505                                   handleEnqueue(rq);
 506                                   return;
 507                               }
 508                       
 509                               if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 510                                    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 511                                   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 512                               {
 513                                   operation->unlock();
 514                                   _handle_async_callback(operation);
 515                               }
 516                               else
 517 kumpf           1.131         {
 518                                   PEGASUS_ASSERT(rq != 0);
 519                                   operation->unlock();
 520                                   _handle_async_request(static_cast<AsyncRequest *>(rq));
 521                               }
 522                           }
 523                           return;
 524 mday            1.1   }
 525                       
 526                       void MessageQueueService::_handle_async_request(AsyncRequest *req)
 527                       {
 528 kumpf           1.131     if (req != 0)
 529                           {
 530                               req->op->processing();
 531                       
 532                               Uint32 type = req->getType();
 533                               if (type == async_messages::HEARTBEAT)
 534                                   handle_heartbeat_request(req);
 535                               else if (type == async_messages::IOCTL)
 536                                   handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 537                               else if (type == async_messages::CIMSERVICE_START)
 538                                   handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 539                               else if (type == async_messages::CIMSERVICE_STOP)
 540                                   handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 541                               else if (type == async_messages::CIMSERVICE_PAUSE)
 542                                   handle_CimServicePause(static_cast<CimServicePause *>(req));
 543                               else if (type == async_messages::CIMSERVICE_RESUME)
 544                                   handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 545                               else if (type == async_messages::ASYNC_OP_START)
 546                                   handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 547                               else
 548                               {
 549 kumpf           1.131             // we don't handle this request message
 550                                   _make_response(req, async_results::CIM_NAK);
 551                               }
 552                           }
 553 mday            1.1   }
 554                       
 555 mday            1.17  
 556                       Boolean MessageQueueService::_enqueueResponse(
 557 kumpf           1.131     Message* request,
 558                           Message* response)
 559 mday            1.17  {
 560 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 561                               "MessageQueueService::_enqueueResponse");
 562 mday            1.25  
 563 kumpf           1.131     if (request->getMask() & MessageMask::ha_async)
 564                           {
 565                               if (response->getMask() & MessageMask::ha_async)
 566                               {
 567                                   _completeAsyncResponse(
 568                                       static_cast<AsyncRequest *>(request),
 569                                       static_cast<AsyncReply *>(response),
 570                                       ASYNC_OPSTATE_COMPLETE, 0);
 571                                   PEG_METHOD_EXIT();
 572                                   return true;
 573                               }
 574                           }
 575 mday            1.63  
 576 kumpf           1.131     if (request->_async != 0)
 577                           {
 578                               Uint32 mask = request->_async->getMask();
 579                               PEGASUS_ASSERT(mask &
 580                                   (MessageMask::ha_async | MessageMask::ha_request));
 581                       
 582                               AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 583                               AsyncOpNode *op = async->op;
 584                               request->_async = 0;
 585                               // the legacy request is going to be deleted by its handler
 586                               // remove it from the op node
 587                       
 588                               static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 589                       
 590                               AsyncLegacyOperationResult *async_result =
 591                                   new AsyncLegacyOperationResult(
 592                                       op,
 593                                       response);
 594                               _completeAsyncResponse(
 595                                   async,
 596                                   async_result,
 597 kumpf           1.131             ASYNC_OPSTATE_COMPLETE,
 598                                   0);
 599                               PEG_METHOD_EXIT();
 600                               return true;
 601                           }
 602 kumpf           1.104 
 603 kumpf           1.131     // ensure that the destination queue is in response->dest
 604                           PEG_METHOD_EXIT();
 605                           return SendForget(response);
 606 mday            1.17  }
 607                       
 608 kumpf           1.131 void MessageQueueService::_make_response(Message* req, Uint32 code)
 609 mday            1.1   {
 610 kumpf           1.131     cimom::_make_response(req, code);
 611 mday            1.1   }
 612                       
 613                       
 614 kumpf           1.104 void MessageQueueService::_completeAsyncResponse(
 615 kumpf           1.131     AsyncRequest* request,
 616                           AsyncReply* reply,
 617 kumpf           1.104     Uint32 state,
 618                           Uint32 flag)
 619 mday            1.5   {
 620 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 621                               "MessageQueueService::_completeAsyncResponse");
 622 kumpf           1.37  
 623 kumpf           1.131     cimom::_completeAsyncResponse(request, reply, state, flag);
 624 kumpf           1.37  
 625 kumpf           1.131     PEG_METHOD_EXIT();
 626 mday            1.5   }
 627                       
 628                       
 629 kumpf           1.104 void MessageQueueService::_complete_op_node(
 630 kumpf           1.131     AsyncOpNode* op,
 631 kumpf           1.104     Uint32 state,
 632                           Uint32 flag,
 633                           Uint32 code)
 634 mday            1.32  {
 635 kumpf           1.131     cimom::_complete_op_node(op, state, flag, code);
 636 mday            1.32  }
 637                       
 638 mday            1.5   
 639 kumpf           1.131 Boolean MessageQueueService::accept_async(AsyncOpNode* op)
 640 mday            1.5   {
 641 kumpf           1.131     if (_incoming_queue_shutdown.get() > 0)
 642                               return false;
 643                           if (_polling_thread == NULL)
 644                           {
 645                               _polling_thread = new Thread(
 646                                   polling_routine,
 647                                   reinterpret_cast<void *>(_get_polling_list()),
 648                                   false);
 649                               ThreadStatus tr = PEGASUS_THREAD_OK;
 650                               while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 651                               {
 652                                   if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 653                                       Threads::yield();
 654                                   else
 655                                       throw Exception(MessageLoaderParms(
 656                                           "Common.MessageQueueService.NOT_ENOUGH_THREAD",
 657                                           "Could not allocate thread for the polling thread."));
 658                               }
 659                           }
 660 kumpf           1.104 // ATTN optimization remove the message checking altogether in the base
 661 mday            1.20  // << Mon Feb 18 14:02:20 2002 mdd >>
 662 kumpf           1.131     op->lock();
 663                           Message *rq = op->_request.get();
 664                           Message *rp = op->_response.get();
 665                           op->unlock();
 666 kumpf           1.104 
 667 kumpf           1.131     if ((rq != 0 && (true == messageOK(rq))) ||
 668                               (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 669                           {
 670                               _incoming.enqueue_wait(op);
 671                               _polling_sem.signal();
 672                               return true;
 673                           }
 674                           return false;
 675 mday            1.5   }
 676                       
 677 kumpf           1.131 Boolean MessageQueueService::messageOK(const Message* msg)
 678 mday            1.5   {
 679 kumpf           1.131     if (_incoming_queue_shutdown.get() > 0)
 680                               return false;
 681                           return true;
 682 mday            1.18  }
 683                       
 684 kumpf           1.131 void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
 685 mday            1.1   {
 686 kumpf           1.131     // default action is to echo a heartbeat response
 687 kumpf           1.104 
 688 kumpf           1.131     AsyncReply *reply = new AsyncReply(
 689                               async_messages::HEARTBEAT,
 690                               0,
 691                               req->op,
 692                               async_results::OK,
 693                               req->resp,
 694                               false);
 695                           _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 696 mday            1.1   }
 697                       
 698                       
 699 kumpf           1.131 void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
 700 kumpf           1.104 {
 701 mday            1.1   }
 702 kumpf           1.104 
 703 kumpf           1.131 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
 704 mday            1.1   {
 705 kumpf           1.131     switch (req->ctl)
 706                           {
 707                               case AsyncIoctl::IO_CLOSE:
 708                               {
 709                                   MessageQueueService *service =
 710                                       static_cast<MessageQueueService *>(req->op->_service_ptr);
 711 kumpf           1.81  
 712                       #ifdef MESSAGEQUEUESERVICE_DEBUG
 713 kumpf           1.131             PEGASUS_STD(cout) << service->getQueueName() <<
 714                                       " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 715 kumpf           1.81  #endif
 716 kumpf           1.104 
 717 kumpf           1.131             // respond to this message. this is fire and forget, so we
 718                                   // don't need to delete anything.
 719                                   // this takes care of two problems that were being found
 720                                   // << Thu Oct  9 10:52:48 2003 mdd >>
 721                                   _make_response(req, async_results::OK);
 722                                   // ensure we do not accept any further messages
 723 kumpf           1.104 
 724 kumpf           1.131             // ensure we don't recurse on IO_CLOSE
 725                                   if (_incoming_queue_shutdown.get() > 0)
 726                                       break;
 727 kumpf           1.104 
 728 kumpf           1.131             // set the closing flag
 729                                   service->_incoming_queue_shutdown = 1;
 730                                   // empty out the queue
 731                                   while (1)
 732 kumpf           1.104             {
 733 kumpf           1.131                 AsyncOpNode *operation;
 734                                       try
 735                                       {
 736                                           operation = service->_incoming.dequeue();
 737                                       }
 738                                       catch (IPCException&)
 739                                       {
 740                                           break;
 741                                       }
 742                                       if (operation)
 743                                       {
 744                                           operation->_service_ptr = service;
 745                                           service->_handle_incoming_operation(operation);
 746                                       }
 747                                       else
 748                                           break;
 749                                   } // message processing loop
 750                       
 751                                   // shutdown the AsyncQueue
 752                                   service->_incoming.close();
 753                                   return;
 754 kumpf           1.131         }
 755 kumpf           1.104 
 756 kumpf           1.131         default:
 757                                   _make_response(req, async_results::CIM_NAK);
 758                           }
 759 mday            1.1   }
 760 mday            1.8   
 761 kumpf           1.131 void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
 762 mday            1.1   {
 763 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 764 kumpf           1.131     PEGASUS_STD(cout) << getQueueName() << "received START" <<
 765                               PEGASUS_STD(endl);
 766 kumpf           1.81  #endif
 767 kumpf           1.104 
 768 kumpf           1.131     // clear the stoped bit and update
 769                           _capabilities &= (~(module_capabilities::stopped));
 770                           _make_response(req, async_results::OK);
 771                           // now tell the meta dispatcher we are stopped
 772                           update_service(_capabilities, _mask);
 773                       }
 774 mday            1.10  
 775 kumpf           1.131 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
 776 mday            1.1   {
 777 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 778 kumpf           1.131     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 779 kumpf           1.81  #endif
 780 kumpf           1.131     // set the stopeed bit and update
 781                           _capabilities |= module_capabilities::stopped;
 782                           _make_response(req, async_results::CIM_STOPPED);
 783                           // now tell the meta dispatcher we are stopped
 784                           update_service(_capabilities, _mask);
 785 mday            1.1   }
 786 kumpf           1.104 
 787 kumpf           1.131 void MessageQueueService::handle_CimServicePause(CimServicePause* req)
 788 mday            1.1   {
 789 kumpf           1.131     // set the paused bit and update
 790                           _capabilities |= module_capabilities::paused;
 791                           update_service(_capabilities, _mask);
 792                           _make_response(req, async_results::CIM_PAUSED);
 793                           // now tell the meta dispatcher we are stopped
 794 mday            1.1   }
 795 kumpf           1.104 
 796 kumpf           1.131 void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
 797 mday            1.1   {
 798 kumpf           1.131     // clear the paused  bit and update
 799                           _capabilities &= (~(module_capabilities::paused));
 800                           update_service(_capabilities, _mask);
 801                           _make_response(req, async_results::OK);
 802                           // now tell the meta dispatcher we are stopped
 803 mday            1.1   }
 804 kumpf           1.104 
 805 kumpf           1.131 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
 806 mday            1.1   {
 807 kumpf           1.131     _make_response(req, async_results::CIM_NAK);
 808 mday            1.1   }
 809                       
 810 kumpf           1.131 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
 811 mday            1.1   {
 812 mday            1.14  }
 813                       
 814 mday            1.10  
 815 kumpf           1.131 void MessageQueueService::handle_AsyncLegacyOperationStart(
 816                           AsyncLegacyOperationStart* req)
 817 mday            1.14  {
 818 kumpf           1.131     // remove the legacy message from the request and enqueue it to its
 819                           // destination
 820                           Uint32 result = async_results::CIM_NAK;
 821 kumpf           1.104 
 822 kumpf           1.131     Message* legacy = req->_act;
 823                           if (legacy != 0)
 824                           {
 825                               MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 826                               if (queue != 0)
 827                               {
 828                                   if (queue->isAsync() == true)
 829                                   {
 830                                       (static_cast<MessageQueueService *>(queue))->handleEnqueue(
 831                                           legacy);
 832                                   }
 833                                   else
 834                                   {
 835                                       // Enqueue the response:
 836                                       queue->enqueue(req->get_action());
 837                                   }
 838 kumpf           1.104 
 839 kumpf           1.131             result = async_results::OK;
 840                               }
 841                           }
 842                           _make_response(req, result);
 843 mday            1.14  }
 844                       
 845 kumpf           1.131 void MessageQueueService::handle_AsyncLegacyOperationResult(
 846                           AsyncLegacyOperationResult* rep)
 847 mday            1.14  {
 848 mday            1.1   }
 849                       
 850 kumpf           1.131 AsyncOpNode* MessageQueueService::get_op()
 851 mday            1.1   {
 852 kumpf           1.131    AsyncOpNode* op = new AsyncOpNode();
 853 kumpf           1.104 
 854 mday            1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 855                          op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 856 kumpf           1.104 
 857 mday            1.1      return op;
 858                       }
 859                       
 860 kumpf           1.131 void MessageQueueService::return_op(AsyncOpNode* op)
 861 mday            1.1   {
 862 kumpf           1.131     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
 863                           delete op;
 864 mday            1.1   }
 865                       
 866 mday            1.18  
 867 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 868 kumpf           1.131     AsyncOpNode* op,
 869 kumpf           1.104     Uint32 destination,
 870 kumpf           1.131     void (*callback)(AsyncOpNode*, MessageQueue*, void*),
 871                           MessageQueue* callback_response_q,
 872                           void* callback_ptr)
 873                       {
 874                           PEGASUS_ASSERT(op != 0 && callback != 0);
 875                       
 876                           // get the queue handle for the destination
 877                       
 878                           op->lock();
 879                           // destination of this message
 880                           op->_op_dest = MessageQueue::lookup(destination);
 881                           op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 882                           op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 883                           // initialize the callback data
 884                           // callback function to be executed by recpt. of response
 885                           op->_async_callback = callback;
 886                           // the op node
 887                           op->_callback_node = op;
 888                           // the queue that will receive the response
 889                           op->_callback_response_q = callback_response_q;
 890                           // user data for callback
 891 kumpf           1.131     op->_callback_ptr = callback_ptr;
 892                           // I am the originator of this request
 893                           op->_callback_request_q = this;
 894                       
 895                           op->unlock();
 896                           if (op->_op_dest == 0)
 897                               return false;
 898 kumpf           1.104 
 899 kumpf           1.131     return  _meta_dispatcher->route_async(op);
 900 mday            1.18  }
 901                       
 902                       
 903 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 904 kumpf           1.131     Message* msg,
 905 kumpf           1.104     Uint32 destination,
 906 kumpf           1.131     void (*callback)(Message* response, void* handle, void* parameter),
 907                           void* handle,
 908                           void* parameter)
 909                       {
 910                           if (msg == NULL)
 911                               return false;
 912                           if (callback == NULL)
 913                               return SendForget(msg);
 914                           AsyncOpNode *op = get_op();
 915                           msg->dest = destination;
 916                           if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 917                           {
 918                               op->release();
 919                               return_op(op);
 920                               return false;
 921                           }
 922                           op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 923                           op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 924                           op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 925                           op->__async_callback = callback;
 926                           op->_callback_node = op;
 927 kumpf           1.131     op->_callback_handle = handle;
 928                           op->_callback_parameter = parameter;
 929                           op->_callback_response_q = this;
 930                       
 931                           if (!(msg->getMask() & MessageMask::ha_async))
 932                           {
 933                               AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 934                                   op,
 935                                   destination,
 936                                   msg,
 937                                   destination);
 938                           }
 939                           else
 940                           {
 941                               op->_request.reset(msg);
 942                               (static_cast<AsyncMessage *>(msg))->op = op;
 943                           }
 944                           return _meta_dispatcher->route_async(op);
 945                       }
 946                       
 947                       
 948 kumpf           1.131 Boolean MessageQueueService::SendForget(Message* msg)
 949                       {
 950                           AsyncOpNode* op = 0;
 951                           Uint32 mask = msg->getMask();
 952                       
 953                           if (mask & MessageMask::ha_async)
 954                           {
 955                               op = (static_cast<AsyncMessage *>(msg))->op;
 956                           }
 957                       
 958                           if (op == 0)
 959                           {
 960                               op = get_op();
 961                               op->_request.reset(msg);
 962                               if (mask & MessageMask::ha_async)
 963                               {
 964                                   (static_cast<AsyncMessage *>(msg))->op = op;
 965                               }
 966                           }
 967                           op->_op_dest = MessageQueue::lookup(msg->dest);
 968                           op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 969 kumpf           1.131     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 970                               | ASYNC_OPFLAGS_SIMPLE_STATUS);
 971                           op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 972                           if (op->_op_dest == 0)
 973                           {
 974                               op->release();
 975                               return_op(op);
 976                               return false;
 977                           }
 978                       
 979                           // now see if the meta dispatcher will take it
 980                           return  _meta_dispatcher->route_async(op);
 981                       }
 982                       
 983                       
 984                       AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
 985                       {
 986                           if (request == 0)
 987                               return 0;
 988                       
 989                           Boolean destroy_op = false;
 990 kumpf           1.131 
 991                           if (request->op == 0)
 992                           {
 993                               request->op = get_op();
 994                               request->op->_request.reset(request);
 995                               destroy_op = true;
 996                           }
 997                       
 998                           request->block = false;
 999                           request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1000                           SendAsync(
1001                               request->op,
1002                               request->dest,
1003                               _sendwait_callback,
1004                               this,
1005                               (void *)0);
1006                       
1007                           request->op->_client_sem.wait();
1008                       
1009                           AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
1010                           rpl->op = 0;
1011 kumpf           1.131 
1012                           if (destroy_op == true)
1013                           {
1014                               request->op->lock();
1015                               request->op->_request.release();
1016                               request->op->_state |= ASYNC_OPSTATE_RELEASED;
1017                               request->op->unlock();
1018                               return_op(request->op);
1019                               request->op = 0;
1020                           }
1021                           return rpl;
1022 mday            1.1   }
1023                       
1024                       
1025 kumpf           1.104 Boolean MessageQueueService::register_service(
1026                           String name,
1027                           Uint32 capabilities,
1028                           Uint32 mask)
1029                       {
1030 kumpf           1.131     RegisterCimService *msg = new RegisterCimService(
1031                               0,
1032                               true,
1033                               name,
1034                               capabilities,
1035                               mask,
1036                               _queueId);
1037                           msg->dest = CIMOM_Q_ID;
1038                       
1039                           Boolean registered = false;
1040                           AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1041                       
1042                           if (reply != 0)
1043                           {
1044                               if (reply->getMask() & MessageMask::ha_async)
1045                               {
1046                                   if (reply->getMask() & MessageMask::ha_reply)
1047 kumpf           1.104             {
1048 kumpf           1.131                 if (reply->result == async_results::OK ||
1049                                           reply->result == async_results::MODULE_ALREADY_REGISTERED)
1050                                       {
1051                                           registered = true;
1052                                       }
1053 kumpf           1.104             }
1054 kumpf           1.131         }
1055 kumpf           1.104 
1056 kumpf           1.131         delete reply;
1057                           }
1058                           delete msg;
1059                           return registered;
1060 mday            1.1   }
1061                       
1062                       Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1063                       {
1064 kumpf           1.131     UpdateCimService *msg = new UpdateCimService(
1065                               0,
1066                               true,
1067                               _queueId,
1068                               _capabilities,
1069                               _mask);
1070                           Boolean registered = false;
1071                       
1072                           AsyncMessage* reply = SendWait(msg);
1073                           if (reply)
1074                           {
1075                               if (reply->getMask() & MessageMask::ha_async)
1076                               {
1077                                   if (reply->getMask() & MessageMask::ha_reply)
1078 kumpf           1.104             {
1079 kumpf           1.131                 if (static_cast<AsyncReply *>(reply)->result ==
1080                                               async_results::OK)
1081                                       {
1082                                           registered = true;
1083                                       }
1084 kumpf           1.104             }
1085 kumpf           1.131         }
1086                               delete reply;
1087                           }
1088                           delete msg;
1089                           return registered;
1090 mday            1.1   }
1091                       
1092                       
1093 kumpf           1.104 Boolean MessageQueueService::deregister_service()
1094 mday            1.1   {
1095 kumpf           1.131     _meta_dispatcher->deregister_module(_queueId);
1096                           return true;
1097 mday            1.1   }
1098                       
1099                       
1100 kumpf           1.104 void MessageQueueService::find_services(
1101                           String name,
1102                           Uint32 capabilities,
1103                           Uint32 mask,
1104 kumpf           1.131     Array<Uint32>* results)
1105 mday            1.1   {
1106 kumpf           1.131     if (results == 0)
1107                           {
1108                               throw NullPointer();
1109                           }
1110                       
1111                           results->clear();
1112                       
1113                           FindServiceQueue *req = new FindServiceQueue(
1114                               0,
1115                               _queueId,
1116                               true,
1117                               name,
1118                               capabilities,
1119                               mask);
1120                       
1121                           req->dest = CIMOM_Q_ID;
1122                       
1123                           AsyncMessage *reply = SendWait(req);
1124                           if (reply)
1125                           {
1126                               if (reply->getMask() & MessageMask::ha_async)
1127 kumpf           1.131         {
1128                                   if (reply->getMask() & MessageMask::ha_reply)
1129 kumpf           1.104             {
1130 kumpf           1.131                 if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1131                                       {
1132                                           if ((static_cast<FindServiceQueueResult*>(reply))->result ==
1133                                                   async_results::OK)
1134                                               *results =
1135                                                   (static_cast<FindServiceQueueResult*>(reply))->qids;
1136                                       }
1137 kumpf           1.104             }
1138 kumpf           1.131         }
1139                               delete reply;
1140                           }
1141                           delete req;
1142                       }
1143                       
1144                       void MessageQueueService::enumerate_service(
1145                           Uint32 queue,
1146                           message_module* result)
1147                       {
1148                           if (result == 0)
1149                           {
1150                               throw NullPointer();
1151                           }
1152                       
1153                           EnumerateService *req = new EnumerateService(
1154                               0,
1155                               _queueId,
1156                               true,
1157                               queue);
1158                       
1159 kumpf           1.131     AsyncMessage* reply = SendWait(req);
1160                       
1161                           if (reply)
1162                           {
1163                               Boolean found = false;
1164                       
1165                               if (reply->getMask() & MessageMask::ha_async)
1166                               {
1167                                   if (reply->getMask() & MessageMask::ha_reply)
1168 kumpf           1.104             {
1169 kumpf           1.131                 if (reply->getType() ==
1170                                               async_messages::ENUMERATE_SERVICE_RESULT)
1171                                       {
1172                                           if ((static_cast<EnumerateServiceResponse*>(reply))->
1173                                                   result == async_results::OK)
1174                                           {
1175                                               if (found == false)
1176                                               {
1177                                                   found = true;
1178                       
1179                                                   result->put_name((static_cast<
1180                                                       EnumerateServiceResponse*>(reply))->name);
1181                                                   result->put_capabilities((static_cast<
1182                                                       EnumerateServiceResponse*>(reply))->
1183                                                           capabilities);
1184                                                   result->put_mask((static_cast<
1185                                                       EnumerateServiceResponse*>(reply))->mask);
1186                                                   result->put_queue((static_cast<
1187                                                       EnumerateServiceResponse*>(reply))->qid);
1188                                               }
1189                                           }
1190 kumpf           1.131                 }
1191 kumpf           1.104             }
1192 kumpf           1.131         }
1193                               delete reply;
1194                           }
1195                           delete req;
1196 mday            1.1   }
1197                       
1198 mike            1.120 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1199                       {
1200                           _polling_list_mutex.lock();
1201                       
1202                           if (!_polling_list)
1203 kumpf           1.131         _polling_list = new PollingList;
1204 mike            1.120 
1205                           _polling_list_mutex.unlock();
1206 kumpf           1.104 
1207 mike            1.120     return _polling_list;
1208 mday            1.1   }
1209                       
1210                       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2