(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.119 //%2006////////////////////////////////////////////////////////////////////////
   2 mday  1.1   //
   3 karl  1.88  // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4             // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5             // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82  // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88  // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8             // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91  // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10             // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 karl  1.119 // Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  12             // EMC Corporation; Symantec Corporation; The Open Group.
  13 mday  1.1   //
  14             // Permission is hereby granted, free of charge, to any person obtaining a copy
  15             // of this software and associated documentation files (the "Software"), to
  16             // deal in the Software without restriction, including without limitation the
  17             // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  18             // sell copies of the Software, and to permit persons to whom the Software is
  19             // furnished to do so, subject to the following conditions:
  20 karl  1.82  // 
  21 mday  1.1   // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  22             // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  23             // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  24             // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  25             // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  26             // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  27             // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  28             // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  29             //
  30             //==============================================================================
  31             //
  32             // Author: Mike Day (mdday@us.ibm.com)
  33             //
  34             // Modified By:
  35 a.arora 1.93  //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
  36 joyce.j 1.101 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
  37 jim.wunderlich 1.114 //              Jim Wunderlich (Jim_Wunderlich@prodigy.net)
  38 mday           1.1   //
  39                      //%/////////////////////////////////////////////////////////////////////////////
  40                      
  41 jim.wunderlich 1.111 // #include <iostream.h>
  42 mday           1.1   #include "MessageQueueService.h"
  43 mday           1.22  #include <Pegasus/Common/Tracer.h>
  44 humberto       1.71  #include <Pegasus/Common/MessageLoader.h> //l10n
  45 mday           1.1   
  46                      PEGASUS_NAMESPACE_BEGIN
  47                      
  48 mday           1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  49 mike           1.118 AtomicInt MessageQueueService::_service_count(0);
  50 kumpf          1.119.12.2 Mutex MessageQueueService::_xidMutex;
  51                           Uint32 MessageQueueService::_xid = 1;
  52 mday           1.38       Mutex MessageQueueService::_meta_dispatcher_mutex;
  53 mday           1.15       
  54 kumpf          1.104      static struct timeval deallocateWait = {300, 0};
  55 mday           1.53       
  56 mday           1.55       ThreadPool *MessageQueueService::_thread_pool = 0;
  57 mday           1.53       
  58 mike           1.119.12.3 List<MessageQueueService, RecursiveMutex> MessageQueueService::_polling_list;
  59 mday           1.53       
  60 kumpf          1.62       Thread* MessageQueueService::_polling_thread = 0;
  61 mday           1.61       
  62 kumpf          1.104      ThreadPool *MessageQueueService::get_thread_pool()
  63 mday           1.69       {
  64                              return _thread_pool;
  65                           }
  66 kumpf          1.117      
  67 jim.wunderlich 1.110      //
  68 kumpf          1.117      // MAX_THREADS_PER_SVC_QUEUE
  69 jim.wunderlich 1.110      //
  70                           // JR Wunderlich Jun 6, 2005
  71                           //
  72                           
  73                           #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 
  74 jim.wunderlich 1.114      #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  75 jim.wunderlich 1.110      
  76 kumpf          1.117      #ifndef MAX_THREADS_PER_SVC_QUEUE
  77                           # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  78                           #endif
  79                           
  80 jim.wunderlich 1.110      Uint32 max_threads_per_svc_queue;
  81 mday           1.69       
  82 kumpf          1.104      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
  83                           MessageQueueService::kill_idle_threads(void *parm)
  84 mday           1.53       {
  85 mday           1.69       
  86                              static struct timeval now, last = {0,0};
  87 mday           1.53          gettimeofday(&now, NULL);
  88 mday           1.56          int dead_threads = 0;
  89 kumpf          1.104      
  90                              if (now.tv_sec - last.tv_sec > 120)
  91 mday           1.53          {
  92                                 gettimeofday(&last, NULL);
  93 kumpf          1.104            try
  94 mday           1.56             {
  95 kumpf          1.104               dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
  96 mday           1.56             }
  97 mday           1.69             catch(...)
  98 mday           1.56             {
  99                           
 100                                 }
 101 mday           1.53          }
 102 jenny.yu       1.92       
 103 w.otsuka       1.94       #ifdef PEGASUS_POINTER_64BIT
 104 jenny.yu       1.92          return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
 105                           #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
 106                              return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
 107                           #else
 108                              return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
 109                           #endif
 110 mday           1.53       }
 111                           
 112                           PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 113                           {
 114                              Thread *myself = reinterpret_cast<Thread *>(parm);
 115 mike           1.119.12.3    List<MessageQueueService, RecursiveMutex> *list = 
 116                                  reinterpret_cast<List<MessageQueueService, RecursiveMutex>*>(myself->get_parm());
 117 mike           1.119.12.1 
 118 mike           1.118         while (_stop_polling.get()  == 0)
 119 mday           1.53          {
 120 baggio.br      1.100            _polling_sem.wait();
 121                           
 122 mike           1.118            if (_stop_polling.get() != 0)
 123 mday           1.61             {
 124 kumpf          1.62                break;
 125 mday           1.61             }
 126 kumpf          1.104      
 127 denise.eckstein 1.116            // The polling_routine thread must hold the lock on the
 128                                  // _polling_thread list while processing incoming messages.
 129                                  // This lock is used to give this thread ownership of
 130                                  // services on the _polling_routine list.
 131                            
 132                                  // This is necessary to avoid confict with other threads
 133                                  // processing the _polling_list
 134                                  // (e.g., MessageQueueServer::~MessageQueueService).
 135                            
 136 mday            1.53             list->lock();
 137 mike            1.119.12.1       MessageQueueService *service = list->front();
 138 denise.eckstein 1.116            ThreadStatus rtn = PEGASUS_THREAD_OK;
 139                                  while (service != NULL)
 140                                  {
 141                                      if ((service->_incoming.count() > 0) &&
 142 mike            1.118                    (service->_die.get() == 0) &&
 143                                          (service->_threads.get() < max_threads_per_svc_queue))
 144 denise.eckstein 1.116                {
 145                                         // The _threads count is used to track the 
 146                                         // number of active threads that have been allocated
 147                                         // to process messages for this service.
 148                            
 149                                         // The _threads count MUST be incremented while
 150                                         // the polling_routine owns the _polling_thread
 151                                         // lock and has ownership of the service object.
 152                            
 153                                         service->_threads++;
 154                                         try
 155                                         {
 156                                             rtn = _thread_pool->allocate_and_awaken(
 157                                                  service, _req_proc, &_polling_sem);
 158                                         }
 159                                         catch (...)
 160                                         {
 161                                             service->_threads--;
 162                            
 163                                             // allocate_and_awaken should never generate an exception.
 164                                             PEGASUS_ASSERT(0);
 165 denise.eckstein 1.116                   }
 166                                         // if no more threads available, break from processing loop
 167                                         if (rtn != PEGASUS_THREAD_OK )
 168                                         {
 169                                             service->_threads--;
 170                                             Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 171                                                "Not enough threads to process this request. Skipping.");
 172                            
 173                                             Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 174                                                "Could not allocate thread for %s. " \
 175                                                "Queue has %d messages waiting and %d threads servicing." \
 176                                                "Skipping the service for right now. ",
 177                                                service->getQueueName(),
 178                                                service->_incoming.count(),
 179 mike            1.118                          service->_threads.get());
 180 denise.eckstein 1.116      
 181                                             pegasus_yield();
 182                                             service = NULL;
 183                                          } 
 184                                      }
 185                                      if (service != NULL) 
 186                                      {
 187 mike            1.119.12.1              service = list->next_of(service);
 188 denise.eckstein 1.116                }
 189                                  }
 190 mday            1.53             list->unlock();
 191 jim.wunderlich  1.110      
 192 mike            1.118            if (_check_idle_flag.get() != 0)
 193 mday            1.69             {
 194 kumpf           1.104               _check_idle_flag = 0;
 195 denise.eckstein 1.116               // try to do idle thread clean up processing when system is not busy
 196                                     // if system is busy there may not be a thread available to allocate 
 197                                     // so nothing will be done and that is OK. 
 198 kumpf           1.84       
 199 konrad.r        1.113               if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
 200 denise.eckstein 1.116               {
 201                                            Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 202                                                    "Not enough threads to kill idle threads. What an irony.");
 203 konrad.r        1.113      
 204 denise.eckstein 1.116                      Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 205                                                    "Could not allocate thread to kill idle threads." \
 206                                                    "Skipping. ");
 207                                     }
 208                            
 209                                     
 210 mday            1.69             }
 211 mday            1.53          }
 212                               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 213                               return(0);
 214                            }
 215                            
 216                            
 217                            Semaphore MessageQueueService::_polling_sem(0);
 218                            AtomicInt MessageQueueService::_stop_polling(0);
 219 mday            1.69       AtomicInt MessageQueueService::_check_idle_flag(0);
 220 mday            1.53       
 221 mday            1.15       
 222 kumpf           1.104      MessageQueueService::MessageQueueService(
 223                               const char *name,
 224                               Uint32 queueID,
 225                               Uint32 capabilities,
 226                               Uint32 mask)
 227 mday            1.15          : Base(name, true,  queueID),
 228 mday            1.1             _mask(mask),
 229 mday            1.4             _die(0),
 230 denise.eckstein 1.116           _threads(0),
 231 mike            1.119.12.1      _incoming(0),
 232 konrad.r        1.106           _incoming_queue_shutdown(0)
 233 kumpf           1.104      {
 234 jim.wunderlich  1.110      
 235 kumpf           1.104         _capabilities = (capabilities | module_capabilities::async);
 236 mday            1.53       
 237 mday            1.1           _default_op_timeout.tv_sec = 30;
 238                               _default_op_timeout.tv_usec = 100;
 239 mday            1.15       
 240 jim.wunderlich  1.110         max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 241                            
 242 kumpf           1.117         // if requested thread max is out of range, then set to
 243                               // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 244 jim.wunderlich  1.110      
 245 kumpf           1.117         if ((max_threads_per_svc_queue < 1) ||
 246                                   (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 247                               {
 248 jim.wunderlich  1.110             max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 249 kumpf           1.117         }
 250 jim.wunderlich  1.110      
 251 kumpf           1.117         Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 252                                  "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);
 253 jim.wunderlich  1.110      
 254 a.arora         1.87          AutoMutex autoMut(_meta_dispatcher_mutex);
 255 kumpf           1.104      
 256                               if (_meta_dispatcher == 0)
 257 mday            1.15          {
 258 joyce.j         1.101            _stop_polling = 0;
 259 mike            1.118            PEGASUS_ASSERT(_service_count.get() == 0);
 260 mday            1.15             _meta_dispatcher = new cimom();
 261 kumpf           1.104            if (_meta_dispatcher == NULL)
 262 mday            1.15             {
 263 a.arora         1.87                throw NullPointer();
 264 mday            1.15             }
 265 jim.wunderlich  1.110            //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 266                                  //   minimum_cnt, maximum_cnt, deallocateWait); 
 267                                  //
 268 kumpf           1.99             _thread_pool =
 269 kumpf           1.104                new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 270 mday            1.15          }
 271                               _service_count++;
 272                            
 273 kumpf           1.104         if (false == register_service(name, _capabilities, _mask))
 274 mday            1.15          {
 275 humberto        1.71             //l10n
 276                                  //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 277                                  MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 278 kumpf           1.104               "MessageQueueService Base Unable to register with  Meta Dispatcher");
 279                            
 280 humberto        1.71             throw BindFailedException(parms);
 281 mday            1.15          }
 282 kumpf           1.104      
 283 mike            1.119.12.1    _polling_list.insert_back(this);
 284 kumpf           1.104      
 285 mday            1.1        }
 286                            
 287 mday            1.4        
 288 kumpf           1.104      MessageQueueService::~MessageQueueService()
 289 mday            1.1        {
 290                               _die = 1;
 291 mday            1.76       
 292 denise.eckstein 1.116         // The polling_routine locks the _polling_list while
 293                               // processing the incoming messages for services on the 
 294                               // list.  Deleting the service from the _polling_list
 295                               // prior to processing, avoids synchronization issues
 296                               // with the _polling_routine.
 297                            
 298 mike            1.119.12.1    // ATTN: added to prevent assertion in List in which the list does not
 299                               // contain this element.
 300 mike            1.119.12.4 
 301                               if (_polling_list.contains(this))
 302                                   _polling_list.remove(this);
 303 denise.eckstein 1.116      
 304                               // ATTN: The code for closing the _incoming queue
 305                               // is not working correctly. In OpenPegasus 2.5,
 306                               // execution of the following code is very timing
 307                               // dependent. This needs to be fix.
 308                               // See Bug 4079 for details. 
 309 mike            1.118         if (_incoming_queue_shutdown.get() == 0)
 310 mday            1.76          {
 311 denise.eckstein 1.116             _shutdown_incoming_queue();
 312                               }
 313 konrad.r        1.107      
 314 denise.eckstein 1.116         // Wait until all threads processing the messages
 315                               // for this service have completed.
 316                            
 317 mike            1.118         while (_threads.get() > 0)
 318 denise.eckstein 1.116         {
 319                                  pegasus_yield();
 320 mday            1.76          }
 321 kumpf           1.104      
 322 mday            1.15          {
 323 a.arora         1.87            AutoMutex autoMut(_meta_dispatcher_mutex);
 324                                 _service_count--;
 325 mike            1.118           if (_service_count.get() == 0)
 326 a.arora         1.87            {
 327 mday            1.76       
 328 mday            1.53             _stop_polling++;
 329                                  _polling_sem.signal();
 330 kumpf           1.104            if (_polling_thread) {
 331                                      _polling_thread->join();
 332                                      delete _polling_thread;
 333                                      _polling_thread = 0;
 334                                  }
 335 mday            1.15             _meta_dispatcher->_shutdown_routed_queue();
 336                                  delete _meta_dispatcher;
 337 kumpf           1.45             _meta_dispatcher = 0;
 338 mday            1.76       
 339 kumpf           1.65             delete _thread_pool;
 340                                  _thread_pool = 0;
 341 a.arora         1.87            }
 342                               } // mutex unlocks here
 343 kumpf           1.104         // Clean up in case there are extra stuff on the queue.
 344 konrad.r        1.72         while (_incoming.count())
 345                              {
 346 konrad.r        1.109          try { 
 347 mike            1.119.12.1       delete _incoming.dequeue();
 348 konrad.r        1.109          } catch (const ListClosed &e)
 349                                {
 350                                  // If the list is closed, there is nothing we can do. 
 351                                  break;
 352                                }
 353 konrad.r        1.72         }
 354 kumpf           1.104      }
 355 mday            1.1        
 356 kumpf           1.104      void MessageQueueService::_shutdown_incoming_queue()
 357 mday            1.7        {
 358 mike            1.118         if (_incoming_queue_shutdown.get() > 0)
 359 kumpf           1.104            return;
 360                            
 361                               AsyncIoctl *msg = new AsyncIoctl(
 362                                  get_next_xid(),
 363                                  0,
 364                                  _queueId,
 365                                  _queueId,
 366                                  true,
 367                                  AsyncIoctl::IO_CLOSE,
 368                                  0,
 369                                  0);
 370 mday            1.9        
 371 mday            1.8           msg->op = get_op();
 372 mday            1.41          msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 373 kumpf           1.104         msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 374                                   | ASYNC_OPFLAGS_SIMPLE_STATUS);
 375 mday            1.32          msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 376 mday            1.41       
 377 mday            1.32          msg->op->_op_dest = this;
 378 mike            1.119.12.1    msg->op->_request.insert_front(msg);
 379 konrad.r        1.108         try {
 380 mike            1.119.12.1      _incoming.enqueue_wait(msg->op);
 381 konrad.r        1.108           _polling_sem.signal();
 382                               } catch (const ListClosed &) 
 383                               { 
 384 denise.eckstein 1.116              // This means the queue has already been shut-down (happens  when there
 385 konrad.r        1.108          // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 386                                // processed.
 387                                 delete msg; 
 388                               }
 389 konrad.r        1.112         catch (const Permission &)
 390                               {
 391                                 delete msg;
 392                               }
 393 mday            1.7        }
 394                            
 395 mday            1.22       
 396                            
 397 kumpf           1.85       void MessageQueueService::enqueue(Message *msg)
 398 mday            1.22       {
 399 kumpf           1.28          PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 400                            
 401 mday            1.22          Base::enqueue(msg);
 402 kumpf           1.28       
 403                               PEG_METHOD_EXIT();
 404 mday            1.22       }
 405                            
 406                            
 407 kumpf           1.103      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 408                                void * parm)
 409                            {
 410 konrad.r        1.107          MessageQueueService* service =
 411                                        reinterpret_cast<MessageQueueService*>(parm);
 412                                PEGASUS_ASSERT(service != 0);
 413 kumpf           1.103          try
 414                                {
 415                            
 416 mike            1.118              if (service->_die.get() != 0)
 417 kumpf           1.103              {
 418 denise.eckstein 1.116                  service->_threads--;
 419 kumpf           1.103                  return (0);
 420                                    }
 421                                    // pull messages off the incoming queue and dispatch them. then
 422                                    // check pending messages that are non-blocking
 423                                    AsyncOpNode *operation = 0;
 424                            
 425                                    // many operations may have been queued.
 426                                    do
 427                                    {
 428                                        try
 429                                        {
 430 mike            1.119.12.1                 operation = service->_incoming.dequeue();
 431 kumpf           1.103                  }
 432                                        catch (ListClosed &)
 433                                        {
 434 kumpf           1.104                      // ATTN: This appears to be a common loop exit path.
 435                                            //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 436                                            //    "Caught ListClosed exception.  Exiting _req_proc.");
 437 kumpf           1.103                      break;
 438                                        }
 439                            
 440                                        if (operation)
 441                                        {
 442                                           operation->_service_ptr = service;
 443                                           service->_handle_incoming_operation(operation);
 444                                        }
 445                                    } while (operation);
 446                                }
 447                                catch (const Exception& e)
 448                                {
 449                                    PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 450                                        String("Caught exception: \"") + e.getMessage() +
 451                                            "\".  Exiting _req_proc.");
 452                                }
 453                                catch (...)
 454                                {
 455                                    PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 456                                        "Caught unrecognized exception.  Exiting _req_proc.");
 457                                }
 458 konrad.r        1.107          service->_threads--;
 459 kumpf           1.103          return(0);
 460 mday            1.1        }
 461                            
 462 mday            1.43       
 463 kumpf           1.104      void MessageQueueService::_sendwait_callback(
 464                                AsyncOpNode *op,
 465                                MessageQueue *q,
 466                                void *parm)
 467 mday            1.33       {
 468                               op->_client_sem.signal();
 469                            }
 470                            
 471 mday            1.30       
 472                            // callback function is responsible for cleaning up all resources
 473                            // including op, op->_callback_node, and op->_callback_ptr
 474 mday            1.22       void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 475                            {
 476 kumpf           1.104         if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 477 mday            1.39          {
 478                            
 479                                  Message *msg = op->get_request();
 480 kumpf           1.104            if (msg && (msg->getMask() & message_mask::ha_async))
 481 mday            1.39             {
 482 kumpf           1.104               if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
 483                                     {
 484                                        AsyncLegacyOperationStart *wrapper =
 485                                           static_cast<AsyncLegacyOperationStart *>(msg);
 486                                        msg = wrapper->get_action();
 487                                        delete wrapper;
 488                                     }
 489                                     else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 490                                     {
 491                                        AsyncModuleOperationStart *wrapper =
 492                                           static_cast<AsyncModuleOperationStart *>(msg);
 493                                        msg = wrapper->get_action();
 494                                        delete wrapper;
 495                                     }
 496                                     else if (msg->getType() == async_messages::ASYNC_OP_START)
 497                                     {
 498                                        AsyncOperationStart *wrapper =
 499                                           static_cast<AsyncOperationStart *>(msg);
 500                                        msg = wrapper->get_action();
 501                                        delete wrapper;
 502                                     }
 503 kumpf           1.104               delete msg;
 504 mday            1.39             }
 505                            
 506                                  msg = op->get_response();
 507 kumpf           1.104            if (msg && (msg->getMask() & message_mask::ha_async))
 508 mday            1.39             {
 509 kumpf           1.104               if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
 510                                     {
 511                                        AsyncLegacyOperationResult *wrapper =
 512                                           static_cast<AsyncLegacyOperationResult *>(msg);
 513                                        msg = wrapper->get_result();
 514                                        delete wrapper;
 515                                     }
 516                                     else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 517                                     {
 518                                        AsyncModuleOperationResult *wrapper =
 519                                           static_cast<AsyncModuleOperationResult *>(msg);
 520                                        msg = wrapper->get_result();
 521                                        delete wrapper;
 522                                     }
 523 mday            1.39             }
 524                                  void (*callback)(Message *, void *, void *) = op->__async_callback;
 525                                  void *handle = op->_callback_handle;
 526                                  void *parm = op->_callback_parameter;
 527                                  op->release();
 528                                  return_op(op);
 529                                  callback(msg, handle, parm);
 530                               }
 531 kumpf           1.104         else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 532 mday            1.39          {
 533 kumpf           1.104            // note that _callback_node may be different from op
 534                                  // op->_callback_response_q is a "this" pointer we can use for
 535 mday            1.39             // static callback methods
 536                                  op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 537                               }
 538 mday            1.22       }
 539                            
 540 mday            1.1        
 541 kumpf           1.104      void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 542 mday            1.1        {
 543 kumpf           1.104         if (operation != 0)
 544 mday            1.5           {
 545 kumpf           1.104      
 546                            // ATTN: optimization
 547 mday            1.22       // << Tue Feb 19 14:10:38 2002 mdd >>
 548 mday            1.6              operation->lock();
 549 kumpf           1.104      
 550 mike            1.119.12.1       Message *rq = operation->_request.front();
 551 kumpf           1.104      
 552 mday            1.31       // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 553 kumpf           1.104      // move this to the bottom of the loop when the majority of
 554                            // messages become async messages.
 555 mday            1.31       
 556 mday            1.18             // divert legacy messages to handleEnqueue
 557 mday            1.29             if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 558 mday            1.18             {
 559 mike            1.119.12.1          rq = operation->_request.remove_front() ;
 560 kumpf           1.104               operation->unlock();
 561                                     // delete the op node
 562                                     operation->release();
 563                                     return_op(operation);
 564 mday            1.24       
 565 kumpf           1.104               handleEnqueue(rq);
 566                                     return;
 567 mday            1.18             }
 568                            
 569 kumpf           1.104            if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 570                                       operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 571                                      (operation->_state & ASYNC_OPSTATE_COMPLETE))
 572 mday            1.29             {
 573 kumpf           1.104               operation->unlock();
 574                                     _handle_async_callback(operation);
 575 mday            1.29             }
 576 kumpf           1.104            else
 577 mday            1.29             {
 578 kumpf           1.104               PEGASUS_ASSERT(rq != 0);
 579                                     operation->unlock();
 580                                     _handle_async_request(static_cast<AsyncRequest *>(rq));
 581 mday            1.29             }
 582 mday            1.5           }
 583                               return;
 584 mday            1.1        }
 585                            
 586                            void MessageQueueService::_handle_async_request(AsyncRequest *req)
 587                            {
 588 kumpf           1.104         if (req != 0)
 589 mday            1.4           {
 590                                  req->op->processing();
 591 kumpf           1.104      
 592 mday            1.4              Uint32 type = req->getType();
 593 kumpf           1.104            if (type == async_messages::HEARTBEAT)
 594                                     handle_heartbeat_request(req);
 595 mday            1.4              else if (type == async_messages::IOCTL)
 596 kumpf           1.104               handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 597 mday            1.4              else if (type == async_messages::CIMSERVICE_START)
 598 kumpf           1.104               handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 599 mday            1.4              else if (type == async_messages::CIMSERVICE_STOP)
 600 kumpf           1.104               handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 601 mday            1.4              else if (type == async_messages::CIMSERVICE_PAUSE)
 602 kumpf           1.104               handle_CimServicePause(static_cast<CimServicePause *>(req));
 603 mday            1.4              else if (type == async_messages::CIMSERVICE_RESUME)
 604 kumpf           1.104               handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 605                                  else if (type == async_messages::ASYNC_OP_START)
 606                                     handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 607                                  else
 608 mday            1.4              {
 609 kumpf           1.104               // we don't handle this request message
 610                                     _make_response(req, async_results::CIM_NAK);
 611 mday            1.4              }
 612 mday            1.1           }
 613                            }
 614                            
 615 mday            1.17       
 616                            Boolean MessageQueueService::_enqueueResponse(
 617 kumpf           1.104         Message* request,
 618 mday            1.17          Message* response)
 619                            {
 620 w.white         1.102      
 621                              STAT_COPYDISPATCHER
 622                            
 623 kumpf           1.37          PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 624                                                "MessageQueueService::_enqueueResponse");
 625 mday            1.25       
 626 kumpf           1.104         if (request->getMask() & message_mask::ha_async)
 627 mday            1.25          {
 628 kumpf           1.104            if (response->getMask() & message_mask::ha_async)
 629 mday            1.25             {
 630 kumpf           1.104               _completeAsyncResponse(static_cast<AsyncRequest *>(request),
 631                                                            static_cast<AsyncReply *>(response),
 632                                                            ASYNC_OPSTATE_COMPLETE, 0);
 633 kumpf           1.37                PEG_METHOD_EXIT();
 634 kumpf           1.104               return true;
 635 mday            1.25             }
 636                               }
 637 kumpf           1.104      
 638                               if (request->_async != 0)
 639 mday            1.17          {
 640                                  Uint32 mask = request->_async->getMask();
 641 kumpf           1.104            PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));
 642                            
 643 mday            1.18             AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 644                                  AsyncOpNode *op = async->op;
 645                                  request->_async = 0;
 646 mday            1.63             // the legacy request is going to be deleted by its handler
 647 kumpf           1.104            // remove it from the op node
 648 mday            1.63       
 649                                  static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 650 kumpf           1.104      
 651                                  AsyncLegacyOperationResult *async_result =
 652                                     new AsyncLegacyOperationResult(
 653                                        async->getKey(),
 654                                        async->getRouting(),
 655                                        op,
 656                                        response);
 657                                  _completeAsyncResponse(
 658                                     async,
 659                                     async_result,
 660                                     ASYNC_OPSTATE_COMPLETE,
 661                                     0);
 662 kumpf           1.37             PEG_METHOD_EXIT();
 663 mday            1.18             return true;
 664 mday            1.17          }
 665 kumpf           1.104      
 666 mday            1.18          // ensure that the destination queue is in response->dest
 667 kumpf           1.37          PEG_METHOD_EXIT();
 668 mday            1.24          return SendForget(response);
 669 kumpf           1.104      
 670 mday            1.17       }
 671                            
 672 mday            1.18       void MessageQueueService::_make_response(Message *req, Uint32 code)
 673 mday            1.1        {
 674 mday            1.19          cimom::_make_response(req, code);
 675 mday            1.1        }
 676                            
 677                            
 678 kumpf           1.104      void MessageQueueService::_completeAsyncResponse(
 679                                AsyncRequest *request,
 680                                AsyncReply *reply,
 681                                Uint32 state,
 682                                Uint32 flag)
 683 mday            1.5        {
 684 kumpf           1.37          PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 685                                                "MessageQueueService::_completeAsyncResponse");
 686                            
 687 mday            1.19          cimom::_completeAsyncResponse(request, reply, state, flag);
 688 kumpf           1.37       
 689                               PEG_METHOD_EXIT();
 690 mday            1.5        }
 691                            
 692                            
 693 kumpf           1.104      void MessageQueueService::_complete_op_node(
 694                                AsyncOpNode *op,
 695                                Uint32 state,
 696                                Uint32 flag,
 697                                Uint32 code)
 698 mday            1.32       {
 699                               cimom::_complete_op_node(op, state, flag, code);
 700                            }
 701                            
 702 mday            1.5        
 703                            Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 704                            {
 705 mike            1.118         if (_incoming_queue_shutdown.get() > 0)
 706 mday            1.8              return false;
 707 kumpf           1.104         if (_polling_thread == NULL)
 708                               {
 709                                  _polling_thread = new Thread(
 710                                      polling_routine,
 711                                      reinterpret_cast<void *>(&_polling_list),
 712                                      false);
 713 konrad.r        1.113            ThreadStatus tr = PEGASUS_THREAD_OK;
 714                                  while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 715 a.arora         1.93             {
 716 denise.eckstein 1.116              if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 717 konrad.r        1.113                 pegasus_yield();
 718 denise.eckstein 1.116              else
 719                                       throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
 720                                                    "Could not allocate thread for the polling thread."));
 721 a.arora         1.93             }
 722 kumpf           1.104         }
 723                            // ATTN optimization remove the message checking altogether in the base
 724 mday            1.20       // << Mon Feb 18 14:02:20 2002 mdd >>
 725 mday            1.6           op->lock();
 726 mike            1.119.12.1    Message *rq = op->_request.front();
 727                               Message *rp = op->_response.front();
 728 mday            1.6           op->unlock();
 729 kumpf           1.104      
 730                               if ((rq != 0 && (true == messageOK(rq))) ||
 731 mike            1.118             (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 732 mday            1.5           {
 733 mike            1.119.12.1       _incoming.enqueue_wait(op);
 734 mday            1.53             _polling_sem.signal();
 735 mday            1.5              return true;
 736                               }
 737                               return false;
 738                            }
 739                            
 740                            Boolean MessageQueueService::messageOK(const Message *msg)
 741                            {
 742 mike            1.118         if (_incoming_queue_shutdown.get() > 0)
 743 mday            1.8              return false;
 744 mday            1.18          return true;
 745                            }
 746                            
 747 mday            1.1        void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 748                            {
 749 kumpf           1.104         // default action is to echo a heartbeat response
 750                            
 751                               AsyncReply *reply = new AsyncReply(
 752                                  async_messages::HEARTBEAT,
 753                                  req->getKey(),
 754                                  req->getRouting(),
 755                                  0,
 756                                  req->op,
 757                                  async_results::OK,
 758                                  req->resp,
 759                                  false);
 760                               _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 761 mday            1.1        }
 762                            
 763                            
 764                            void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 765 kumpf           1.104      {
 766 mday            1.1        }
 767 kumpf           1.104      
 768 mday            1.1        void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 769                            {
 770 kumpf           1.104         switch (req->ctl)
 771 mday            1.8           {
 772                                  case AsyncIoctl::IO_CLOSE:
 773                                  {
 774 kumpf           1.104               MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 775 kumpf           1.81       
 776                            #ifdef MESSAGEQUEUESERVICE_DEBUG
 777 kumpf           1.104               PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 778 kumpf           1.81       #endif
 779 kumpf           1.104      
 780                                     // respond to this message. this is fire and forget, so we don't need to delete anything.
 781                                     // this takes care of two problems that were being found
 782                                     // << Thu Oct  9 10:52:48 2003 mdd >>
 783                                      _make_response(req, async_results::OK);
 784                                     // ensure we do not accept any further messages
 785                            
 786                                     // ensure we don't recurse on IO_CLOSE
 787 mike            1.118               if (_incoming_queue_shutdown.get() > 0)
 788 kumpf           1.104                  break;
 789                            
 790                                     // set the closing flag
 791                                     service->_incoming_queue_shutdown = 1;
 792                                     // empty out the queue
 793                                     while (1)
 794                                     {
 795                                        AsyncOpNode *operation;
 796                                        try
 797                                        {
 798 mike            1.119.12.1                operation = service->_incoming.dequeue();
 799 kumpf           1.104                  }
 800                                        catch(IPCException &)
 801                                        {
 802                                           break;
 803                                        }
 804                                        if (operation)
 805                                        {
 806                                           operation->_service_ptr = service;
 807                                           service->_handle_incoming_operation(operation);
 808                                        }
 809                                        else
 810                                           break;
 811                                     } // message processing loop
 812                            
 813 mike            1.119.12.1          // shutdown the AsyncQueue
 814 kumpf           1.104               service->_incoming.shutdown_queue();
 815                                     return;
 816 mday            1.8              }
 817                            
 818                                  default:
 819 kumpf           1.104               _make_response(req, async_results::CIM_NAK);
 820 mday            1.8           }
 821 mday            1.1        }
 822 mday            1.8        
 823 mday            1.1        void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 824                            {
 825 mday            1.79       
 826 kumpf           1.81       #ifdef MESSAGEQUEUESERVICE_DEBUG
 827 mday            1.79          PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 828 kumpf           1.81       #endif
 829 kumpf           1.104      
 830 mday            1.10          // clear the stoped bit and update
 831 mday            1.13          _capabilities &= (~(module_capabilities::stopped));
 832 mday            1.10          _make_response(req, async_results::OK);
 833 kumpf           1.104         // now tell the meta dispatcher we are stopped
 834 mday            1.10          update_service(_capabilities, _mask);
 835                            
 836 mday            1.1        }
 837                            void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 838                            {
 839 kumpf           1.81       #ifdef MESSAGEQUEUESERVICE_DEBUG
 840 mday            1.79          PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 841 kumpf           1.81       #endif
 842 mday            1.10          // set the stopeed bit and update
 843                               _capabilities |= module_capabilities::stopped;
 844                               _make_response(req, async_results::CIM_STOPPED);
 845 kumpf           1.104         // now tell the meta dispatcher we are stopped
 846 mday            1.10          update_service(_capabilities, _mask);
 847 mday            1.1        }
 848 kumpf           1.104      
 849 mday            1.1        void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 850                            {
 851 mday            1.10          // set the paused bit and update
 852 mday            1.13          _capabilities |= module_capabilities::paused;
 853 mday            1.11          update_service(_capabilities, _mask);
 854 mday            1.10          _make_response(req, async_results::CIM_PAUSED);
 855 kumpf           1.104         // now tell the meta dispatcher we are stopped
 856 mday            1.1        }
 857 kumpf           1.104      
 858 mday            1.1        void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 859                            {
 860 mday            1.10          // clear the paused  bit and update
 861 mday            1.13          _capabilities &= (~(module_capabilities::paused));
 862 mday            1.11          update_service(_capabilities, _mask);
 863 mday            1.10          _make_response(req, async_results::OK);
 864 kumpf           1.104         // now tell the meta dispatcher we are stopped
 865 mday            1.1        }
 866 kumpf           1.104      
 867 mday            1.1        void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 868                            {
 869                               _make_response(req, async_results::CIM_NAK);
 870                            }
 871                            
 872                            void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 873                            {
 874 mday            1.14          ;
 875                            }
 876                            
 877 mday            1.10       
 878 mday            1.14       void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 879                            {
 880                               // remove the legacy message from the request and enqueue it to its destination
 881                               Uint32 result = async_results::CIM_NAK;
 882 kumpf           1.104      
 883 mday            1.25          Message *legacy = req->_act;
 884 kumpf           1.104         if (legacy != 0)
 885 mday            1.14          {
 886 mday            1.25             MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 887 kumpf           1.104            if (queue != 0)
 888 mday            1.14             {
 889 kumpf           1.104               if (queue->isAsync() == true)
 890                                     {
 891                                        (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 892                                     }
 893                                     else
 894                                     {
 895                                        // Enqueue the response:
 896                                        queue->enqueue(req->get_action());
 897                                     }
 898                            
 899                                     result = async_results::OK;
 900 mday            1.14             }
 901                               }
 902                               _make_response(req, result);
 903                            }
 904                            
 905                            void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 906                            {
 907                               ;
 908 mday            1.1        }
 909                            
 910 kumpf           1.104      AsyncOpNode *MessageQueueService::get_op()
 911 mday            1.1        {
 912 mday            1.4           AsyncOpNode *op = new AsyncOpNode();
 913 kumpf           1.104      
 914 mday            1.9           op->_state = ASYNC_OPSTATE_UNKNOWN;
 915                               op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 916 kumpf           1.104      
 917 mday            1.1           return op;
 918                            }
 919                            
 920                            void MessageQueueService::return_op(AsyncOpNode *op)
 921                            {
 922 kumpf           1.104         PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
 923 mday            1.4           delete op;
 924 mday            1.1        }
 925                            
 926 mday            1.18       
 927 kumpf           1.104      Boolean MessageQueueService::ForwardOp(
 928                                AsyncOpNode *op,
 929                                Uint32 destination)
 930 mday            1.29       {
 931 kumpf           1.104         PEGASUS_ASSERT(op != 0);
 932 mday            1.30          op->lock();
 933                               op->_op_dest = MessageQueue::lookup(destination);
 934 mday            1.29          op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 935                               op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 936 mday            1.30          op->unlock();
 937 kumpf           1.104         if (op->_op_dest == 0)
 938 mday            1.30             return false;
 939 kumpf           1.104      
 940 mday            1.29          return  _meta_dispatcher->route_async(op);
 941                            }
 942                            
 943 kumpf           1.104      
 944                            Boolean MessageQueueService::SendAsync(
 945                                AsyncOpNode *op,
 946                                Uint32 destination,
 947                                void (*callback)(AsyncOpNode *, MessageQueue *, void *),
 948                                MessageQueue *callback_response_q,
 949                                void *callback_ptr)
 950                            {
 951                               PEGASUS_ASSERT(op != 0 && callback != 0);
 952                            
 953 mday            1.21          // get the queue handle for the destination
 954                            
 955 mday            1.30          op->lock();
 956 mday            1.32          op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 957 mday            1.22          op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 958                               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 959 mday            1.30          // initialize the callback data
 960 mday            1.32          op->_async_callback = callback;   // callback function to be executed by recpt. of response
 961                               op->_callback_node = op;          // the op node
 962                               op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 963                               op->_callback_ptr = callback_ptr;   // user data for callback
 964                               op->_callback_request_q = this;     // I am the originator of this request
 965 kumpf           1.104      
 966 mday            1.30          op->unlock();
 967 kumpf           1.104         if (op->_op_dest == 0)
 968 mday            1.30             return false;
 969 kumpf           1.104      
 970 mday            1.21          return  _meta_dispatcher->route_async(op);
 971 mday            1.18       }
 972                            
 973                            
 974 kumpf           1.104      Boolean MessageQueueService::SendAsync(
 975                                Message *msg,
 976                                Uint32 destination,
 977                                void (*callback)(Message *response, void *handle, void *parameter),
 978                                void *handle,
 979                                void *parameter)
 980 mday            1.39       {
 981 kumpf           1.104         if (msg == NULL)
 982 mday            1.39             return false;
 983 kumpf           1.104         if (callback == NULL)
 984 mday            1.39             return SendForget(msg);
 985                               AsyncOpNode *op = get_op();
 986 mday            1.43          msg->dest = destination;
 987 kumpf           1.104         if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 988 mday            1.39          {
 989                                  op->release();
 990                                  return_op(op);
 991                                  return false;
 992                               }
 993                               op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 994                               op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 995                               op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 996                               op->__async_callback = callback;
 997                               op->_callback_node = op;
 998                               op->_callback_handle = handle;
 999                               op->_callback_parameter = parameter;
1000                               op->_callback_response_q = this;
1001                            
1002 kumpf           1.104         if (!(msg->getMask() & message_mask::ha_async))
1003 mday            1.39          {
1004 kumpf           1.104            AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
1005                                     get_next_xid(),
1006                                     op,
1007                                     destination,
1008                                     msg,
1009                                     destination);
1010 mday            1.39          }
1011 kumpf           1.104         else
1012 mday            1.39          {
1013 mike            1.119.12.1       op->_request.insert_front(msg);
1014 mday            1.39             (static_cast<AsyncMessage *>(msg))->op = op;
1015                               }
1016                               return _meta_dispatcher->route_async(op);
1017                            }
1018                            
1019                            
1020 mday            1.18       Boolean MessageQueueService::SendForget(Message *msg)
1021                            {
1022                               AsyncOpNode *op = 0;
1023 mday            1.22          Uint32 mask = msg->getMask();
1024 kumpf           1.104      
1025 mday            1.22          if (mask & message_mask::ha_async)
1026 mday            1.18          {
1027                                  op = (static_cast<AsyncMessage *>(msg))->op ;
1028                               }
1029 mday            1.22       
1030 kumpf           1.104         if (op == 0)
1031 mday            1.20          {
1032 mday            1.18             op = get_op();
1033 mike            1.119.12.1       op->_request.insert_front(msg);
1034 mday            1.22             if (mask & message_mask::ha_async)
1035 kumpf           1.104            {
1036                                     (static_cast<AsyncMessage *>(msg))->op = op;
1037                                  }
1038 mday            1.20          }
1039 mday            1.30          op->_op_dest = MessageQueue::lookup(msg->dest);
1040 mday            1.22          op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
1041 kumpf           1.104         op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
1042                                   | ASYNC_OPFLAGS_SIMPLE_STATUS);
1043 mday            1.22          op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1044 kumpf           1.104         if (op->_op_dest == 0)
1045 mday            1.39          {
1046                                  op->release();
1047                                  return_op(op);
1048 mday            1.21             return false;
1049 mday            1.39          }
1050 kumpf           1.104      
1051 mday            1.18          // now see if the meta dispatcher will take it
1052 mday            1.30          return  _meta_dispatcher->route_async(op);
1053 mday            1.18       }
1054 mday            1.2        
1055 mday            1.1        
1056 mday            1.4        AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
1057 mday            1.1        {
1058 kumpf           1.104         if (request == 0)
1059 mday            1.4              return 0 ;
1060 mday            1.5        
1061                               Boolean destroy_op = false;
1062 kumpf           1.104      
1063 s.hills         1.80          if (request->op == 0)
1064 mday            1.5           {
1065                                  request->op = get_op();
1066 mike            1.119.12.1       request->op->_request.insert_front(request);
1067 mday            1.5              destroy_op = true;
1068                               }
1069 kumpf           1.104      
1070 mday            1.33          request->block = false;
1071 mday            1.35          request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1072 kumpf           1.104         SendAsync(
1073                                  request->op,
1074                                  request->dest,
1075                                  _sendwait_callback,
1076                                  this,
1077                                  (void *)0);
1078                            
1079 baggio.br       1.100         request->op->_client_sem.wait();
1080                            
1081 mday            1.6           request->op->lock();
1082 mike            1.119.12.1    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_front());
1083 mday            1.6           rpl->op = 0;
1084                               request->op->unlock();
1085 kumpf           1.104      
1086                               if (destroy_op == true)
1087 mday            1.5           {
1088 mday            1.6              request->op->lock();
1089                                  request->op->_request.remove(request);
1090                                  request->op->_state |= ASYNC_OPSTATE_RELEASED;
1091                                  request->op->unlock();
1092                                  return_op(request->op);
1093 mday            1.7              request->op = 0;
1094 mday            1.5           }
1095                               return rpl;
1096 mday            1.1        }
1097                            
1098                            
1099 kumpf           1.104      Boolean MessageQueueService::register_service(
1100                                String name,
1101                                Uint32 capabilities,
1102                                Uint32 mask)
1103                            {
1104                               RegisterCimService *msg = new RegisterCimService(
1105                                  get_next_xid(),
1106                                  0,
1107                                  true,
1108                                  name,
1109                                  capabilities,
1110                                  mask,
1111                                  _queueId);
1112                               msg->dest = CIMOM_Q_ID;
1113 mday            1.1        
1114                               Boolean registered = false;
1115 kumpf           1.104         AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1116                            
1117                               if (reply != 0)
1118                               {
1119                                  if (reply->getMask() & message_mask::ha_async)
1120                                  {
1121                                     if (reply->getMask() & message_mask::ha_reply)
1122                                     {
1123                                        if (reply->result == async_results::OK ||
1124                                            reply->result == async_results::MODULE_ALREADY_REGISTERED)
1125                                        {
1126                                            registered = true;
1127                                        }
1128                                     }
1129 mday            1.1              }
1130 kumpf           1.104      
1131                                  delete reply;
1132 mday            1.1           }
1133 mday            1.5           delete msg;
1134 mday            1.1           return registered;
1135                            }
1136                            
1137                            Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1138                            {
1139 kumpf           1.104         UpdateCimService *msg = new UpdateCimService(
1140                                  get_next_xid(),
1141                                  0,
1142                                  true,
1143                                  _queueId,
1144                                  _capabilities,
1145                                  _mask);
1146 mday            1.1           Boolean registered = false;
1147 mday            1.2        
1148                               AsyncMessage *reply = SendWait(msg);
1149                               if (reply)
1150 mday            1.1           {
1151 kumpf           1.104            if (reply->getMask() & message_mask::ha_async)
1152 mday            1.1              {
1153 kumpf           1.104               if (reply->getMask() & message_mask::ha_reply)
1154                                     {
1155                                        if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1156                                        {
1157                                           registered = true;
1158                                        }
1159                                     }
1160 mday            1.1              }
1161                                  delete reply;
1162                               }
1163 mday            1.5           delete msg;
1164 mday            1.1           return registered;
1165                            }
1166                            
1167                            
1168 kumpf           1.104      Boolean MessageQueueService::deregister_service()
1169 mday            1.1        {
1170 mday            1.3        
1171 mday            1.5           _meta_dispatcher->deregister_module(_queueId);
1172                               return true;
1173 mday            1.1        }
1174                            
1175                            
1176 kumpf           1.104      void MessageQueueService::find_services(
1177                                String name,
1178                                Uint32 capabilities,
1179                                Uint32 mask,
1180                                Array<Uint32> *results)
1181 mday            1.1        {
1182 kumpf           1.104         if (results == 0)
1183                               {
1184 mday            1.1              throw NullPointer();
1185 kumpf           1.104         }
1186                            
1187 mday            1.1           results->clear();
1188 kumpf           1.104      
1189                               FindServiceQueue *req = new FindServiceQueue(
1190                                  get_next_xid(),
1191                                  0,
1192                                  _queueId,
1193                                  true,
1194                                  name,
1195                                  capabilities,
1196                                  mask);
1197                            
1198 mday            1.44          req->dest = CIMOM_Q_ID;
1199 kumpf           1.104      
1200                               AsyncMessage *reply = SendWait(req);
1201                               if (reply)
1202                               {
1203                                  if (reply->getMask() & message_mask::ha_async)
1204                                  {
1205                                     if (reply->getMask() & message_mask::ha_reply)
1206                                     {
1207                                        if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1208                                        {
1209                                           if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1210                                              *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1211                                        }
1212                                     }
1213 mday            1.1              }
1214                                  delete reply;
1215                               }
1216 mday            1.5           delete req;
1217 mday            1.1           return ;
1218                            }
1219                            
1220                            void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1221                            {
1222 kumpf           1.104         if (result == 0)
1223                               {
1224 mday            1.1              throw NullPointer();
1225 kumpf           1.104         }
1226                            
1227                               EnumerateService *req = new EnumerateService(
1228                                  get_next_xid(),
1229                                  0,
1230                                  _queueId,
1231                                  true,
1232                                  queue);
1233                            
1234 mday            1.2           AsyncMessage *reply = SendWait(req);
1235 kumpf           1.104      
1236 mday            1.2           if (reply)
1237 mday            1.1           {
1238                                  Boolean found = false;
1239 kumpf           1.104      
1240                                  if (reply->getMask() & message_mask::ha_async)
1241 mday            1.1              {
1242 kumpf           1.104               if (reply->getMask() & message_mask::ha_reply)
1243                                     {
1244                                        if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1245                                        {
1246                                           if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1247                                           {
1248                                              if (found == false)
1249                                              {
1250                                                 found = true;
1251                            
1252                                                 result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1253                                                 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1254                                                 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1255                                                 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1256                                              }
1257                                           }
1258                                        }
1259                                     }
1260 mday            1.1              }
1261                                  delete reply;
1262                               }
1263 mday            1.5           delete req;
1264 kumpf           1.104      
1265 mday            1.1           return;
1266                            }
1267                            
1268 kumpf           1.104      Uint32 MessageQueueService::get_next_xid()
1269 mday            1.1        {
1270 kumpf           1.119.12.2    AutoMutex autoMut(_xidMutex);
1271                               return ++_xid;
1272 mday            1.1        }
1273                            
1274                            PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2