(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.119 //%2006////////////////////////////////////////////////////////////////////////
   2 mday  1.1   //
   3 karl  1.88  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mday  1.1   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15             // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18             // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.82  // 
  21 mday  1.1   // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22             // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24             // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27             // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             //%/////////////////////////////////////////////////////////////////////////////
  33             
  34             #include "MessageQueueService.h"
  35 mday  1.22  #include <Pegasus/Common/Tracer.h>
  36 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
  37 mday  1.1   
  38             PEGASUS_NAMESPACE_BEGIN
  39             
  40 mday  1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  41 mike  1.118 AtomicInt MessageQueueService::_service_count(0);
  42 mday  1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  43 mday  1.15  
  44 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
  45 mday  1.53  
  46 mday  1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  47 mday  1.53  
  48 mike  1.120 MessageQueueService::PollingList* MessageQueueService::_polling_list;
  49             Mutex MessageQueueService::_polling_list_mutex;
  50 mday  1.53  
  51 kumpf 1.62  Thread* MessageQueueService::_polling_thread = 0;
  52 mday  1.61  
  53 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
  54 mday  1.69  {
  55                return _thread_pool;
  56             }
  57 kumpf 1.117 
  58 jim.wunderlich 1.110 //
  59 kumpf          1.117 // MAX_THREADS_PER_SVC_QUEUE
  60 jim.wunderlich 1.110 //
  61                      // JR Wunderlich Jun 6, 2005
  62                      //
  63                      
  64 kumpf          1.131 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
  65 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  66 jim.wunderlich 1.110 
  67 kumpf          1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
  68                      # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  69                      #endif
  70                      
  71 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
  72 mday           1.69  
  73 kumpf          1.131 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
  74                          void* parm)
  75 mday           1.53  {
  76 kumpf          1.131     Thread *myself = reinterpret_cast<Thread *>(parm);
  77                          List<MessageQueueService, Mutex> *list =
  78                              reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());
  79                      
  80                          while (_stop_polling.get()  == 0)
  81                          {
  82                              _polling_sem.wait();
  83                      
  84                              if (_stop_polling.get() != 0)
  85                              {
  86                                  break;
  87                              }
  88                      
  89                              // The polling_routine thread must hold the lock on the
  90                              // _polling_list while processing incoming messages.
  91                              // This lock is used to give this thread ownership of
  92                              // services on the _polling_routine list.
  93                      
  94                              // This is necessary to avoid confict with other threads
  95                              // processing the _polling_list
  96                              // (e.g., MessageQueueServer::~MessageQueueService).
  97 kumpf          1.131 
  98                              list->lock();
  99                              MessageQueueService *service = list->front();
 100                              ThreadStatus rtn = PEGASUS_THREAD_OK;
 101                              while (service != NULL)
 102                              {
 103                                  if ((service->_incoming.count() > 0) &&
 104                                      (service->_die.get() == 0) &&
 105                                      (service->_threads.get() < max_threads_per_svc_queue))
 106                                  {
 107                                      // The _threads count is used to track the
 108                                      // number of active threads that have been allocated
 109                                      // to process messages for this service.
 110                      
 111                                      // The _threads count MUST be incremented while
 112                                      // the polling_routine owns the _polling_thread
 113                                      // lock and has ownership of the service object.
 114                      
 115                                      service->_threads++;
 116                                      try
 117                                      {
 118 kumpf          1.131                     rtn = _thread_pool->allocate_and_awaken(
 119                                              service, _req_proc, &_polling_sem);
 120                                      }
 121                                      catch (...)
 122                                      {
 123                                          service->_threads--;
 124                      
 125                                          // allocate_and_awaken should never generate an exception.
 126                                          PEGASUS_ASSERT(0);
 127                                      }
 128                                      // if no more threads available, break from processing loop
 129                                      if (rtn != PEGASUS_THREAD_OK )
 130                                      {
 131                                          service->_threads--;
 132 marek          1.137                     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
 133 kumpf          1.131                         "Could not allocate thread for %s.  Queue has %d "
 134                                                  "messages waiting and %d threads servicing."
 135                                                  "Skipping the service for right now. ",
 136                                              service->getQueueName(),
 137                                              service->_incoming.count(),
 138 marek          1.132                         service->_threads.get()));
 139 kumpf          1.131 
 140                                          Threads::yield();
 141                                          service = NULL;
 142                                      }
 143                                  }
 144                                  if (service != NULL)
 145                                  {
 146                                      service = list->next_of(service);
 147                                  }
 148                              }
 149                              list->unlock();
 150                          }
 151 kumpf          1.135     return ThreadReturnType(0);
 152 mday           1.53  }
 153                      
 154                      
 155                      Semaphore MessageQueueService::_polling_sem(0);
 156                      AtomicInt MessageQueueService::_stop_polling(0);
 157                      
 158 mday           1.15  
 159 kumpf          1.104 MessageQueueService::MessageQueueService(
 160 kumpf          1.131     const char* name,
 161                          Uint32 queueID,
 162                          Uint32 capabilities,
 163                          Uint32 mask)
 164                          : Base(name, true,  queueID),
 165                            _mask(mask),
 166                            _die(0),
 167                            _threads(0),
 168                            _incoming(),
 169                            _incoming_queue_shutdown(0)
 170                      {
 171                          _capabilities = (capabilities | module_capabilities::async);
 172                      
 173                          _default_op_timeout.tv_sec = 30;
 174                          _default_op_timeout.tv_usec = 100;
 175                      
 176                          max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 177                      
 178                          // if requested thread max is out of range, then set to
 179                          // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 180                      
 181 kumpf          1.131     if ((max_threads_per_svc_queue < 1) ||
 182                              (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 183                          {
 184                              max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 185                          }
 186                      
 187 marek          1.138     PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
 188 marek          1.132        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
 189 kumpf          1.131 
 190                          AutoMutex autoMut(_meta_dispatcher_mutex);
 191                      
 192                          if (_meta_dispatcher == 0)
 193                          {
 194                              _stop_polling = 0;
 195                              PEGASUS_ASSERT(_service_count.get() == 0);
 196                              _meta_dispatcher = new cimom();
 197                      
 198                              //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 199                              //   minimum_cnt, maximum_cnt, deallocateWait);
 200                              //
 201                              _thread_pool =
 202                                  new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 203                          }
 204                          _service_count++;
 205                      
 206                          if (false == register_service(name, _capabilities, _mask))
 207                          {
 208                              MessageLoaderParms parms(
 209                                  "Common.MessageQueueService.UNABLE_TO_REGISTER",
 210 kumpf          1.131             "CIM base message queue service is unable to register with the "
 211                                      "CIMOM dispatcher.");
 212 kumpf          1.139         throw Exception(parms);
 213 kumpf          1.131     }
 214 kumpf          1.104 
 215 kumpf          1.131     _get_polling_list()->insert_back(this);
 216 mday           1.1   }
 217                      
 218 mday           1.4   
 219 kumpf          1.104 MessageQueueService::~MessageQueueService()
 220 mday           1.1   {
 221 kumpf          1.131     _die = 1;
 222                      
 223                          // The polling_routine locks the _polling_list while
 224                          // processing the incoming messages for services on the
 225                          // list.  Deleting the service from the _polling_list
 226                          // prior to processing, avoids synchronization issues
 227                          // with the _polling_routine.
 228                      
 229                          // ATTN: added to prevent assertion in List in which the list does not
 230                          // contain this element.
 231                      
 232                          if (_get_polling_list()->contains(this))
 233                              _get_polling_list()->remove(this);
 234                      
 235                          // ATTN: The code for closing the _incoming queue
 236                          // is not working correctly. In OpenPegasus 2.5,
 237                          // execution of the following code is very timing
 238                          // dependent. This needs to be fix.
 239                          // See Bug 4079 for details.
 240                          if (_incoming_queue_shutdown.get() == 0)
 241                          {
 242 kumpf          1.131         _shutdown_incoming_queue();
 243                          }
 244 mday           1.76  
 245 kumpf          1.131     // Wait until all threads processing the messages
 246                          // for this service have completed.
 247                      
 248                          while (_threads.get() > 0)
 249 konrad.r       1.109     {
 250 kumpf          1.131         Threads::yield();
 251                          }
 252                      
 253                          {
 254                              AutoMutex autoMut(_meta_dispatcher_mutex);
 255                              _service_count--;
 256                              if (_service_count.get() == 0)
 257                              {
 258                      
 259                                  _stop_polling++;
 260                                  _polling_sem.signal();
 261                                  if (_polling_thread)
 262                                  {
 263                                      _polling_thread->join();
 264                                      delete _polling_thread;
 265                                      _polling_thread = 0;
 266                                  }
 267                                  _meta_dispatcher->_shutdown_routed_queue();
 268                                  delete _meta_dispatcher;
 269                                  _meta_dispatcher = 0;
 270                      
 271 kumpf          1.131             delete _thread_pool;
 272                                  _thread_pool = 0;
 273                              }
 274                          } // mutex unlocks here
 275                          // Clean up in case there are extra stuff on the queue.
 276                          while (_incoming.count())
 277                          {
 278                              try
 279                              {
 280                                  delete _incoming.dequeue();
 281                              }
 282                              catch (const ListClosed&)
 283                              {
 284                                  // If the list is closed, there is nothing we can do.
 285                                  break;
 286                              }
 287 konrad.r       1.109     }
 288 kumpf          1.104 }
 289 mday           1.1   
 290 kumpf          1.104 void MessageQueueService::_shutdown_incoming_queue()
 291 mday           1.7   {
 292 kumpf          1.131     if (_incoming_queue_shutdown.get() > 0)
 293                              return;
 294                      
 295                          AsyncIoctl *msg = new AsyncIoctl(
 296                              0,
 297                              _queueId,
 298                              _queueId,
 299                              true,
 300                              AsyncIoctl::IO_CLOSE,
 301                              0,
 302                              0);
 303                      
 304                          msg->op = get_op();
 305                          msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 306                          msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 307                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 308                          msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 309 kumpf          1.104 
 310 kumpf          1.131     msg->op->_op_dest = this;
 311                          msg->op->_request.reset(msg);
 312                          try
 313                          {
 314 kumpf          1.140         _incoming.enqueue(msg->op);
 315 kumpf          1.131         _polling_sem.signal();
 316                          }
 317                          catch (const ListClosed&)
 318                          {
 319 denise.eckstein 1.116         // This means the queue has already been shut-down (happens  when there
 320 kumpf           1.131         // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 321                               // processed.
 322                               delete msg;
 323                           }
 324                           catch (const Permission&)
 325                           {
 326                               delete msg;
 327                           }
 328 mday            1.7   }
 329                       
 330 mday            1.22  
 331                       
 332 kumpf           1.131 void MessageQueueService::enqueue(Message* msg)
 333 mday            1.22  {
 334 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 335 kumpf           1.28  
 336 kumpf           1.131     Base::enqueue(msg);
 337 kumpf           1.28  
 338 kumpf           1.131     PEG_METHOD_EXIT();
 339 mday            1.22  }
 340                       
 341                       
 342 mike            1.124 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 343 kumpf           1.131     void* parm)
 344 kumpf           1.103 {
 345 konrad.r        1.107     MessageQueueService* service =
 346 kumpf           1.131         reinterpret_cast<MessageQueueService*>(parm);
 347 konrad.r        1.107     PEGASUS_ASSERT(service != 0);
 348 kumpf           1.103     try
 349                           {
 350 mike            1.118         if (service->_die.get() != 0)
 351 kumpf           1.103         {
 352 denise.eckstein 1.116             service->_threads--;
 353 kumpf           1.131             return 0;
 354 kumpf           1.103         }
 355                               // pull messages off the incoming queue and dispatch them. then
 356                               // check pending messages that are non-blocking
 357                               AsyncOpNode *operation = 0;
 358                       
 359                               // many operations may have been queued.
 360                               do
 361                               {
 362                                   try
 363                                   {
 364 mike            1.120                 operation = service->_incoming.dequeue();
 365 kumpf           1.103             }
 366 kumpf           1.131             catch (ListClosed&)
 367 kumpf           1.103             {
 368 kumpf           1.104                 // ATTN: This appears to be a common loop exit path.
 369 marek           1.132                 //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 370 kumpf           1.104                 //    "Caught ListClosed exception.  Exiting _req_proc.");
 371 kumpf           1.103                 break;
 372                                   }
 373                       
 374                                   if (operation)
 375                                   {
 376                                      operation->_service_ptr = service;
 377                                      service->_handle_incoming_operation(operation);
 378                                   }
 379                               } while (operation);
 380                           }
 381                           catch (const Exception& e)
 382                           {
 383 marek           1.138         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
 384 kumpf           1.103             String("Caught exception: \"") + e.getMessage() +
 385                                       "\".  Exiting _req_proc.");
 386                           }
 387                           catch (...)
 388                           {
 389 marek           1.138         PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
 390 kumpf           1.103             "Caught unrecognized exception.  Exiting _req_proc.");
 391                           }
 392 konrad.r        1.107     service->_threads--;
 393 kumpf           1.131     return 0;
 394 mday            1.1   }
 395                       
 396 mday            1.43  
 397 kumpf           1.104 void MessageQueueService::_sendwait_callback(
 398 kumpf           1.131     AsyncOpNode* op,
 399                           MessageQueue* q,
 400 kumpf           1.104     void *parm)
 401 mday            1.33  {
 402 kumpf           1.131     op->_client_sem.signal();
 403 mday            1.33  }
 404                       
 405 mday            1.30  
 406                       // callback function is responsible for cleaning up all resources
 407                       // including op, op->_callback_node, and op->_callback_ptr
 408 kumpf           1.131 void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
 409 mday            1.22  {
 410 kumpf           1.131     if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 411                           {
 412                               Message *msg = op->removeRequest();
 413                               if (msg && (msg->getMask() & MessageMask::ha_async))
 414                               {
 415 kumpf           1.133             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)
 416 kumpf           1.131             {
 417                                       AsyncLegacyOperationStart *wrapper =
 418                                           static_cast<AsyncLegacyOperationStart *>(msg);
 419                                       msg = wrapper->get_action();
 420                                       delete wrapper;
 421                                   }
 422 kumpf           1.133             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)
 423 kumpf           1.131             {
 424                                       AsyncModuleOperationStart *wrapper =
 425                                           static_cast<AsyncModuleOperationStart *>(msg);
 426                                       msg = wrapper->get_action();
 427                                       delete wrapper;
 428                                   }
 429 kumpf           1.133             else if (msg->getType() == ASYNC_ASYNC_OP_START)
 430 kumpf           1.131             {
 431                                       AsyncOperationStart *wrapper =
 432                                           static_cast<AsyncOperationStart *>(msg);
 433                                       msg = wrapper->get_action();
 434                                       delete wrapper;
 435                                   }
 436                                   delete msg;
 437                               }
 438 mday            1.39  
 439 kumpf           1.131         msg = op->removeResponse();
 440                               if (msg && (msg->getMask() & MessageMask::ha_async))
 441                               {
 442 kumpf           1.133             if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)
 443 kumpf           1.131             {
 444                                       AsyncLegacyOperationResult *wrapper =
 445                                           static_cast<AsyncLegacyOperationResult *>(msg);
 446                                       msg = wrapper->get_result();
 447                                       delete wrapper;
 448                                   }
 449 kumpf           1.133             else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)
 450 kumpf           1.131             {
 451                                       AsyncModuleOperationResult *wrapper =
 452                                           static_cast<AsyncModuleOperationResult *>(msg);
 453                                       msg = wrapper->get_result();
 454                                       delete wrapper;
 455                                   }
 456                               }
 457                               void (*callback)(Message *, void *, void *) = op->__async_callback;
 458                               void *handle = op->_callback_handle;
 459                               void *parm = op->_callback_parameter;
 460                               op->release();
 461                               return_op(op);
 462                               callback(msg, handle, parm);
 463                           }
 464                           else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 465                           {
 466                               // note that _callback_node may be different from op
 467                               // op->_callback_response_q is a "this" pointer we can use for
 468                               // static callback methods
 469                               op->_async_callback(
 470                                   op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 471 kumpf           1.131     }
 472 mday            1.22  }
 473                       
 474 mday            1.1   
 475 kumpf           1.131 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
 476 mday            1.1   {
 477 kumpf           1.131     if (operation != 0)
 478                           {
 479 kumpf           1.104 
 480                       // ATTN: optimization
 481 mday            1.22  // << Tue Feb 19 14:10:38 2002 mdd >>
 482 kumpf           1.131         operation->lock();
 483 kumpf           1.104 
 484 kumpf           1.131         Message *rq = operation->_request.get();
 485 kumpf           1.104 
 486 mday            1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 487 kumpf           1.104 // move this to the bottom of the loop when the majority of
 488                       // messages become async messages.
 489 mday            1.31  
 490 kumpf           1.131         // divert legacy messages to handleEnqueue
 491                               if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
 492                               {
 493                                   operation->_request.release();
 494                                   operation->unlock();
 495                                   // delete the op node
 496                                   operation->release();
 497                                   return_op(operation);
 498                       
 499                                   handleEnqueue(rq);
 500                                   return;
 501                               }
 502                       
 503                               if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 504                                    operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 505                                   (operation->_state & ASYNC_OPSTATE_COMPLETE))
 506                               {
 507                                   operation->unlock();
 508                                   _handle_async_callback(operation);
 509                               }
 510                               else
 511 kumpf           1.131         {
 512                                   PEGASUS_ASSERT(rq != 0);
 513                                   operation->unlock();
 514                                   _handle_async_request(static_cast<AsyncRequest *>(rq));
 515                               }
 516                           }
 517                           return;
 518 mday            1.1   }
 519                       
 520                       void MessageQueueService::_handle_async_request(AsyncRequest *req)
 521                       {
 522 kumpf           1.131     if (req != 0)
 523                           {
 524                               req->op->processing();
 525                       
 526 kumpf           1.133         MessageType type = req->getType();
 527                               if (type == ASYNC_HEARTBEAT)
 528 kumpf           1.131             handle_heartbeat_request(req);
 529 kumpf           1.133         else if (type == ASYNC_IOCTL)
 530 kumpf           1.131             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 531 kumpf           1.133         else if (type == ASYNC_CIMSERVICE_START)
 532 kumpf           1.131             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 533 kumpf           1.133         else if (type == ASYNC_CIMSERVICE_STOP)
 534 kumpf           1.131             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 535 kumpf           1.133         else if (type == ASYNC_CIMSERVICE_PAUSE)
 536 kumpf           1.131             handle_CimServicePause(static_cast<CimServicePause *>(req));
 537 kumpf           1.133         else if (type == ASYNC_CIMSERVICE_RESUME)
 538 kumpf           1.131             handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 539 kumpf           1.133         else if (type == ASYNC_ASYNC_OP_START)
 540 kumpf           1.131             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 541                               else
 542                               {
 543                                   // we don't handle this request message
 544                                   _make_response(req, async_results::CIM_NAK);
 545                               }
 546                           }
 547 mday            1.1   }
 548                       
 549 mday            1.17  
 550                       Boolean MessageQueueService::_enqueueResponse(
 551 kumpf           1.131     Message* request,
 552                           Message* response)
 553 mday            1.17  {
 554 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 555                               "MessageQueueService::_enqueueResponse");
 556 mday            1.25  
 557 kumpf           1.131     if (request->getMask() & MessageMask::ha_async)
 558                           {
 559                               if (response->getMask() & MessageMask::ha_async)
 560                               {
 561                                   _completeAsyncResponse(
 562                                       static_cast<AsyncRequest *>(request),
 563                                       static_cast<AsyncReply *>(response),
 564                                       ASYNC_OPSTATE_COMPLETE, 0);
 565                                   PEG_METHOD_EXIT();
 566                                   return true;
 567                               }
 568                           }
 569 mday            1.63  
 570 kumpf           1.134     AsyncRequest* asyncRequest =
 571                               static_cast<AsyncRequest*>(request->get_async());
 572                       
 573                           if (asyncRequest != 0)
 574 kumpf           1.131     {
 575 kumpf           1.134         PEGASUS_ASSERT(asyncRequest->getMask() &
 576 kumpf           1.131             (MessageMask::ha_async | MessageMask::ha_request));
 577                       
 578 kumpf           1.134         AsyncOpNode* op = asyncRequest->op;
 579                       
 580 kumpf           1.131         // the legacy request is going to be deleted by its handler
 581                               // remove it from the op node
 582                       
 583 kumpf           1.134         static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();
 584 kumpf           1.131 
 585                               AsyncLegacyOperationResult *async_result =
 586                                   new AsyncLegacyOperationResult(
 587                                       op,
 588                                       response);
 589                               _completeAsyncResponse(
 590 kumpf           1.134             asyncRequest,
 591 kumpf           1.131             async_result,
 592                                   ASYNC_OPSTATE_COMPLETE,
 593                                   0);
 594                               PEG_METHOD_EXIT();
 595                               return true;
 596                           }
 597 kumpf           1.104 
 598 kumpf           1.131     // ensure that the destination queue is in response->dest
 599                           PEG_METHOD_EXIT();
 600                           return SendForget(response);
 601 mday            1.17  }
 602                       
 603 kumpf           1.131 void MessageQueueService::_make_response(Message* req, Uint32 code)
 604 mday            1.1   {
 605 kumpf           1.131     cimom::_make_response(req, code);
 606 mday            1.1   }
 607                       
 608                       
 609 kumpf           1.104 void MessageQueueService::_completeAsyncResponse(
 610 kumpf           1.131     AsyncRequest* request,
 611                           AsyncReply* reply,
 612 kumpf           1.104     Uint32 state,
 613                           Uint32 flag)
 614 mday            1.5   {
 615 kumpf           1.131     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 616                               "MessageQueueService::_completeAsyncResponse");
 617 kumpf           1.37  
 618 kumpf           1.131     cimom::_completeAsyncResponse(request, reply, state, flag);
 619 kumpf           1.37  
 620 kumpf           1.131     PEG_METHOD_EXIT();
 621 mday            1.5   }
 622                       
 623                       
 624 kumpf           1.104 void MessageQueueService::_complete_op_node(
 625 kumpf           1.131     AsyncOpNode* op,
 626 kumpf           1.104     Uint32 state,
 627                           Uint32 flag,
 628                           Uint32 code)
 629 mday            1.32  {
 630 kumpf           1.131     cimom::_complete_op_node(op, state, flag, code);
 631 mday            1.32  }
 632                       
 633 mday            1.5   
 634 kumpf           1.131 Boolean MessageQueueService::accept_async(AsyncOpNode* op)
 635 mday            1.5   {
 636 kumpf           1.131     if (_incoming_queue_shutdown.get() > 0)
 637                               return false;
 638                           if (_polling_thread == NULL)
 639                           {
 640                               _polling_thread = new Thread(
 641                                   polling_routine,
 642                                   reinterpret_cast<void *>(_get_polling_list()),
 643                                   false);
 644                               ThreadStatus tr = PEGASUS_THREAD_OK;
 645                               while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 646                               {
 647                                   if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 648                                       Threads::yield();
 649                                   else
 650                                       throw Exception(MessageLoaderParms(
 651                                           "Common.MessageQueueService.NOT_ENOUGH_THREAD",
 652                                           "Could not allocate thread for the polling thread."));
 653                               }
 654                           }
 655 kumpf           1.104 // ATTN optimization remove the message checking altogether in the base
 656 mday            1.20  // << Mon Feb 18 14:02:20 2002 mdd >>
 657 kumpf           1.131     op->lock();
 658                           Message *rq = op->_request.get();
 659                           Message *rp = op->_response.get();
 660                           op->unlock();
 661 kumpf           1.104 
 662 kumpf           1.131     if ((rq != 0 && (true == messageOK(rq))) ||
 663                               (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 664                           {
 665 kumpf           1.140         _incoming.enqueue(op);
 666 kumpf           1.131         _polling_sem.signal();
 667                               return true;
 668                           }
 669                           return false;
 670 mday            1.5   }
 671                       
 672 kumpf           1.131 Boolean MessageQueueService::messageOK(const Message* msg)
 673 mday            1.5   {
 674 kumpf           1.131     if (_incoming_queue_shutdown.get() > 0)
 675                               return false;
 676                           return true;
 677 mday            1.18  }
 678                       
 679 kumpf           1.131 void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
 680 mday            1.1   {
 681 kumpf           1.131     // default action is to echo a heartbeat response
 682 kumpf           1.104 
 683 kumpf           1.131     AsyncReply *reply = new AsyncReply(
 684 kumpf           1.133         ASYNC_HEARTBEAT,
 685 kumpf           1.131         0,
 686                               req->op,
 687                               async_results::OK,
 688                               req->resp,
 689                               false);
 690                           _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 691 mday            1.1   }
 692                       
 693                       
 694 kumpf           1.131 void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
 695 kumpf           1.104 {
 696 mday            1.1   }
 697 kumpf           1.104 
 698 kumpf           1.131 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
 699 mday            1.1   {
 700 kumpf           1.131     switch (req->ctl)
 701                           {
 702                               case AsyncIoctl::IO_CLOSE:
 703                               {
 704                                   MessageQueueService *service =
 705                                       static_cast<MessageQueueService *>(req->op->_service_ptr);
 706 kumpf           1.81  
 707                       #ifdef MESSAGEQUEUESERVICE_DEBUG
 708 kumpf           1.131             PEGASUS_STD(cout) << service->getQueueName() <<
 709                                       " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 710 kumpf           1.81  #endif
 711 kumpf           1.104 
 712 kumpf           1.131             // respond to this message. this is fire and forget, so we
 713                                   // don't need to delete anything.
 714                                   // this takes care of two problems that were being found
 715                                   // << Thu Oct  9 10:52:48 2003 mdd >>
 716                                   _make_response(req, async_results::OK);
 717                                   // ensure we do not accept any further messages
 718 kumpf           1.104 
 719 kumpf           1.131             // ensure we don't recurse on IO_CLOSE
 720                                   if (_incoming_queue_shutdown.get() > 0)
 721                                       break;
 722 kumpf           1.104 
 723 kumpf           1.131             // set the closing flag
 724                                   service->_incoming_queue_shutdown = 1;
 725                                   // empty out the queue
 726                                   while (1)
 727 kumpf           1.104             {
 728 kumpf           1.131                 AsyncOpNode *operation;
 729                                       try
 730                                       {
 731                                           operation = service->_incoming.dequeue();
 732                                       }
 733                                       catch (IPCException&)
 734                                       {
 735                                           break;
 736                                       }
 737                                       if (operation)
 738                                       {
 739                                           operation->_service_ptr = service;
 740                                           service->_handle_incoming_operation(operation);
 741                                       }
 742                                       else
 743                                           break;
 744                                   } // message processing loop
 745                       
 746                                   // shutdown the AsyncQueue
 747                                   service->_incoming.close();
 748                                   return;
 749 kumpf           1.131         }
 750 kumpf           1.104 
 751 kumpf           1.131         default:
 752                                   _make_response(req, async_results::CIM_NAK);
 753                           }
 754 mday            1.1   }
 755 mday            1.8   
 756 kumpf           1.131 void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
 757 mday            1.1   {
 758 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 759 kumpf           1.131     PEGASUS_STD(cout) << getQueueName() << "received START" <<
 760                               PEGASUS_STD(endl);
 761 kumpf           1.81  #endif
 762 kumpf           1.104 
 763 kumpf           1.131     // clear the stoped bit and update
 764                           _capabilities &= (~(module_capabilities::stopped));
 765                           _make_response(req, async_results::OK);
 766                           // now tell the meta dispatcher we are stopped
 767                           update_service(_capabilities, _mask);
 768                       }
 769 mday            1.10  
 770 kumpf           1.131 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
 771 mday            1.1   {
 772 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 773 kumpf           1.131     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 774 kumpf           1.81  #endif
 775 kumpf           1.131     // set the stopeed bit and update
 776                           _capabilities |= module_capabilities::stopped;
 777                           _make_response(req, async_results::CIM_STOPPED);
 778                           // now tell the meta dispatcher we are stopped
 779                           update_service(_capabilities, _mask);
 780 mday            1.1   }
 781 kumpf           1.104 
 782 kumpf           1.131 void MessageQueueService::handle_CimServicePause(CimServicePause* req)
 783 mday            1.1   {
 784 kumpf           1.131     // set the paused bit and update
 785                           _capabilities |= module_capabilities::paused;
 786                           update_service(_capabilities, _mask);
 787                           _make_response(req, async_results::CIM_PAUSED);
 788                           // now tell the meta dispatcher we are stopped
 789 mday            1.1   }
 790 kumpf           1.104 
 791 kumpf           1.131 void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
 792 mday            1.1   {
 793 kumpf           1.131     // clear the paused  bit and update
 794                           _capabilities &= (~(module_capabilities::paused));
 795                           update_service(_capabilities, _mask);
 796                           _make_response(req, async_results::OK);
 797                           // now tell the meta dispatcher we are stopped
 798 mday            1.1   }
 799 kumpf           1.104 
 800 kumpf           1.131 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
 801 mday            1.1   {
 802 kumpf           1.131     _make_response(req, async_results::CIM_NAK);
 803 mday            1.1   }
 804                       
 805 kumpf           1.131 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
 806 mday            1.1   {
 807 mday            1.14  }
 808                       
 809 mday            1.10  
 810 kumpf           1.131 void MessageQueueService::handle_AsyncLegacyOperationStart(
 811                           AsyncLegacyOperationStart* req)
 812 mday            1.14  {
 813 kumpf           1.131     // remove the legacy message from the request and enqueue it to its
 814                           // destination
 815                           Uint32 result = async_results::CIM_NAK;
 816 kumpf           1.104 
 817 kumpf           1.131     Message* legacy = req->_act;
 818                           if (legacy != 0)
 819                           {
 820                               MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 821                               if (queue != 0)
 822                               {
 823                                   if (queue->isAsync() == true)
 824                                   {
 825                                       (static_cast<MessageQueueService *>(queue))->handleEnqueue(
 826                                           legacy);
 827                                   }
 828                                   else
 829                                   {
 830                                       // Enqueue the response:
 831                                       queue->enqueue(req->get_action());
 832                                   }
 833 kumpf           1.104 
 834 kumpf           1.131             result = async_results::OK;
 835                               }
 836                           }
 837                           _make_response(req, result);
 838 mday            1.14  }
 839                       
 840 kumpf           1.131 void MessageQueueService::handle_AsyncLegacyOperationResult(
 841                           AsyncLegacyOperationResult* rep)
 842 mday            1.14  {
 843 mday            1.1   }
 844                       
 845 kumpf           1.131 AsyncOpNode* MessageQueueService::get_op()
 846 mday            1.1   {
 847 kumpf           1.131    AsyncOpNode* op = new AsyncOpNode();
 848 kumpf           1.104 
 849 mday            1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 850                          op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 851 kumpf           1.104 
 852 mday            1.1      return op;
 853                       }
 854                       
 855 kumpf           1.131 void MessageQueueService::return_op(AsyncOpNode* op)
 856 mday            1.1   {
 857 kumpf           1.131     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
 858                           delete op;
 859 mday            1.1   }
 860                       
 861 mday            1.18  
 862 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 863 kumpf           1.131     AsyncOpNode* op,
 864 kumpf           1.104     Uint32 destination,
 865 kumpf           1.131     void (*callback)(AsyncOpNode*, MessageQueue*, void*),
 866                           MessageQueue* callback_response_q,
 867                           void* callback_ptr)
 868                       {
 869                           PEGASUS_ASSERT(op != 0 && callback != 0);
 870                       
 871                           // get the queue handle for the destination
 872                       
 873                           op->lock();
 874                           // destination of this message
 875                           op->_op_dest = MessageQueue::lookup(destination);
 876                           op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 877                           op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 878                           // initialize the callback data
 879                           // callback function to be executed by recpt. of response
 880                           op->_async_callback = callback;
 881                           // the op node
 882                           op->_callback_node = op;
 883                           // the queue that will receive the response
 884                           op->_callback_response_q = callback_response_q;
 885                           // user data for callback
 886 kumpf           1.131     op->_callback_ptr = callback_ptr;
 887                           // I am the originator of this request
 888                           op->_callback_request_q = this;
 889                       
 890                           op->unlock();
 891                           if (op->_op_dest == 0)
 892                               return false;
 893 kumpf           1.104 
 894 kumpf           1.131     return  _meta_dispatcher->route_async(op);
 895 mday            1.18  }
 896                       
 897                       
 898 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 899 kumpf           1.131     Message* msg,
 900 kumpf           1.104     Uint32 destination,
 901 kumpf           1.131     void (*callback)(Message* response, void* handle, void* parameter),
 902                           void* handle,
 903                           void* parameter)
 904                       {
 905                           if (msg == NULL)
 906                               return false;
 907                           if (callback == NULL)
 908                               return SendForget(msg);
 909                           AsyncOpNode *op = get_op();
 910                           msg->dest = destination;
 911                           if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 912                           {
 913                               op->release();
 914                               return_op(op);
 915                               return false;
 916                           }
 917                           op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 918                           op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 919                           op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 920                           op->__async_callback = callback;
 921                           op->_callback_node = op;
 922 kumpf           1.131     op->_callback_handle = handle;
 923                           op->_callback_parameter = parameter;
 924                           op->_callback_response_q = this;
 925                       
 926                           if (!(msg->getMask() & MessageMask::ha_async))
 927                           {
 928                               AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 929                                   op,
 930                                   destination,
 931                                   msg,
 932                                   destination);
 933                           }
 934                           else
 935                           {
 936                               op->_request.reset(msg);
 937                               (static_cast<AsyncMessage *>(msg))->op = op;
 938                           }
 939                           return _meta_dispatcher->route_async(op);
 940                       }
 941                       
 942                       
 943 kumpf           1.131 Boolean MessageQueueService::SendForget(Message* msg)
 944                       {
 945                           AsyncOpNode* op = 0;
 946                           Uint32 mask = msg->getMask();
 947                       
 948                           if (mask & MessageMask::ha_async)
 949                           {
 950                               op = (static_cast<AsyncMessage *>(msg))->op;
 951                           }
 952                       
 953                           if (op == 0)
 954                           {
 955                               op = get_op();
 956                               op->_request.reset(msg);
 957                               if (mask & MessageMask::ha_async)
 958                               {
 959                                   (static_cast<AsyncMessage *>(msg))->op = op;
 960                               }
 961                           }
 962                           op->_op_dest = MessageQueue::lookup(msg->dest);
 963                           op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 964 kumpf           1.131     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 965                               | ASYNC_OPFLAGS_SIMPLE_STATUS);
 966                           op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 967                           if (op->_op_dest == 0)
 968                           {
 969                               op->release();
 970                               return_op(op);
 971                               return false;
 972                           }
 973                       
 974                           // now see if the meta dispatcher will take it
 975                           return  _meta_dispatcher->route_async(op);
 976                       }
 977                       
 978                       
 979                       AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
 980                       {
 981                           if (request == 0)
 982                               return 0;
 983                       
 984                           Boolean destroy_op = false;
 985 kumpf           1.131 
 986                           if (request->op == 0)
 987                           {
 988                               request->op = get_op();
 989                               request->op->_request.reset(request);
 990                               destroy_op = true;
 991                           }
 992                       
 993                           request->block = false;
 994                           request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
 995                           SendAsync(
 996                               request->op,
 997                               request->dest,
 998                               _sendwait_callback,
 999                               this,
1000                               (void *)0);
1001                       
1002                           request->op->_client_sem.wait();
1003                       
1004                           AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
1005                           rpl->op = 0;
1006 kumpf           1.131 
1007                           if (destroy_op == true)
1008                           {
1009                               request->op->lock();
1010                               request->op->_request.release();
1011                               request->op->_state |= ASYNC_OPSTATE_RELEASED;
1012                               request->op->unlock();
1013                               return_op(request->op);
1014                               request->op = 0;
1015                           }
1016                           return rpl;
1017 mday            1.1   }
1018                       
1019                       
1020 kumpf           1.104 Boolean MessageQueueService::register_service(
1021                           String name,
1022                           Uint32 capabilities,
1023                           Uint32 mask)
1024                       {
1025 kumpf           1.131     RegisterCimService *msg = new RegisterCimService(
1026                               0,
1027                               true,
1028                               name,
1029                               capabilities,
1030                               mask,
1031                               _queueId);
1032                           msg->dest = CIMOM_Q_ID;
1033                       
1034                           Boolean registered = false;
1035                           AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1036                       
1037                           if (reply != 0)
1038                           {
1039                               if (reply->getMask() & MessageMask::ha_async)
1040                               {
1041                                   if (reply->getMask() & MessageMask::ha_reply)
1042 kumpf           1.104             {
1043 kumpf           1.131                 if (reply->result == async_results::OK ||
1044                                           reply->result == async_results::MODULE_ALREADY_REGISTERED)
1045                                       {
1046                                           registered = true;
1047                                       }
1048 kumpf           1.104             }
1049 kumpf           1.131         }
1050 kumpf           1.104 
1051 kumpf           1.131         delete reply;
1052                           }
1053                           delete msg;
1054                           return registered;
1055 mday            1.1   }
1056                       
1057                       Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1058                       {
1059 kumpf           1.131     UpdateCimService *msg = new UpdateCimService(
1060                               0,
1061                               true,
1062                               _queueId,
1063                               _capabilities,
1064                               _mask);
1065                           Boolean registered = false;
1066                       
1067                           AsyncMessage* reply = SendWait(msg);
1068                           if (reply)
1069                           {
1070                               if (reply->getMask() & MessageMask::ha_async)
1071                               {
1072                                   if (reply->getMask() & MessageMask::ha_reply)
1073 kumpf           1.104             {
1074 kumpf           1.131                 if (static_cast<AsyncReply *>(reply)->result ==
1075                                               async_results::OK)
1076                                       {
1077                                           registered = true;
1078                                       }
1079 kumpf           1.104             }
1080 kumpf           1.131         }
1081                               delete reply;
1082                           }
1083                           delete msg;
1084                           return registered;
1085 mday            1.1   }
1086                       
1087                       
1088 kumpf           1.104 Boolean MessageQueueService::deregister_service()
1089 mday            1.1   {
1090 kumpf           1.131     _meta_dispatcher->deregister_module(_queueId);
1091                           return true;
1092 mday            1.1   }
1093                       
1094                       
1095 kumpf           1.104 void MessageQueueService::find_services(
1096                           String name,
1097                           Uint32 capabilities,
1098                           Uint32 mask,
1099 kumpf           1.131     Array<Uint32>* results)
1100 mday            1.1   {
1101 kumpf           1.131     if (results == 0)
1102                           {
1103                               throw NullPointer();
1104                           }
1105                       
1106                           results->clear();
1107                       
1108                           FindServiceQueue *req = new FindServiceQueue(
1109                               0,
1110                               _queueId,
1111                               true,
1112                               name,
1113                               capabilities,
1114                               mask);
1115                       
1116                           req->dest = CIMOM_Q_ID;
1117                       
1118                           AsyncMessage *reply = SendWait(req);
1119                           if (reply)
1120                           {
1121                               if (reply->getMask() & MessageMask::ha_async)
1122 kumpf           1.131         {
1123                                   if (reply->getMask() & MessageMask::ha_reply)
1124 kumpf           1.104             {
1125 kumpf           1.133                 if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)
1126 kumpf           1.131                 {
1127                                           if ((static_cast<FindServiceQueueResult*>(reply))->result ==
1128                                                   async_results::OK)
1129                                               *results =
1130                                                   (static_cast<FindServiceQueueResult*>(reply))->qids;
1131                                       }
1132 kumpf           1.104             }
1133 kumpf           1.131         }
1134                               delete reply;
1135                           }
1136                           delete req;
1137                       }
1138                       
1139                       void MessageQueueService::enumerate_service(
1140                           Uint32 queue,
1141                           message_module* result)
1142                       {
1143                           if (result == 0)
1144                           {
1145                               throw NullPointer();
1146                           }
1147                       
1148                           EnumerateService *req = new EnumerateService(
1149                               0,
1150                               _queueId,
1151                               true,
1152                               queue);
1153                       
1154 kumpf           1.131     AsyncMessage* reply = SendWait(req);
1155                       
1156                           if (reply)
1157                           {
1158                               Boolean found = false;
1159                       
1160                               if (reply->getMask() & MessageMask::ha_async)
1161                               {
1162                                   if (reply->getMask() & MessageMask::ha_reply)
1163 kumpf           1.104             {
1164 kumpf           1.133                 if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)
1165 kumpf           1.131                 {
1166                                           if ((static_cast<EnumerateServiceResponse*>(reply))->
1167                                                   result == async_results::OK)
1168                                           {
1169                                               if (found == false)
1170                                               {
1171                                                   found = true;
1172                       
1173                                                   result->put_name((static_cast<
1174                                                       EnumerateServiceResponse*>(reply))->name);
1175                                                   result->put_capabilities((static_cast<
1176                                                       EnumerateServiceResponse*>(reply))->
1177                                                           capabilities);
1178                                                   result->put_mask((static_cast<
1179                                                       EnumerateServiceResponse*>(reply))->mask);
1180                                                   result->put_queue((static_cast<
1181                                                       EnumerateServiceResponse*>(reply))->qid);
1182                                               }
1183                                           }
1184                                       }
1185 kumpf           1.104             }
1186 kumpf           1.131         }
1187                               delete reply;
1188                           }
1189                           delete req;
1190 mday            1.1   }
1191                       
1192 mike            1.120 MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
1193                       {
1194                           _polling_list_mutex.lock();
1195                       
1196                           if (!_polling_list)
1197 kumpf           1.131         _polling_list = new PollingList;
1198 mike            1.120 
1199                           _polling_list_mutex.unlock();
1200 kumpf           1.104 
1201 mike            1.120     return _polling_list;
1202 mday            1.1   }
1203                       
1204                       PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2