(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

   1 karl  1.91 //%2005////////////////////////////////////////////////////////////////////////
   2 mday  1.1  //
   3 karl  1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
   4            // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
   5            // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
   6 karl  1.82 // IBM Corp.; EMC Corporation, The Open Group.
   7 karl  1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
   8            // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
   9 karl  1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
  10            // EMC Corporation; VERITAS Software Corporation; The Open Group.
  11 mday  1.1  //
  12            // Permission is hereby granted, free of charge, to any person obtaining a copy
  13            // of this software and associated documentation files (the "Software"), to
  14            // deal in the Software without restriction, including without limitation the
  15            // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  16            // sell copies of the Software, and to permit persons to whom the Software is
  17            // furnished to do so, subject to the following conditions:
  18 karl  1.82 // 
  19 mday  1.1  // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
  20            // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
  21            // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
  22            // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
  23            // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
  24            // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
  25            // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  26            // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  27            //
  28            //==============================================================================
  29            //
  30            // Author: Mike Day (mdday@us.ibm.com)
  31            //
  32            // Modified By:
  33 a.arora 1.93 //              Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
  34 joyce.j 1.101 //              Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
  35 jim.wunderlich 1.114 //              Jim Wunderlich (Jim_Wunderlich@prodigy.net)
  36 mday           1.1   //
  37                      //%/////////////////////////////////////////////////////////////////////////////
  38                      
  39 jim.wunderlich 1.111 // #include <iostream.h>
  40 mday           1.1   #include "MessageQueueService.h"
  41 mday           1.22  #include <Pegasus/Common/Tracer.h>
  42 humberto       1.71  #include <Pegasus/Common/MessageLoader.h> //l10n
  43 mday           1.1   
  44                      PEGASUS_NAMESPACE_BEGIN
  45                      
  46 mday           1.15  cimom *MessageQueueService::_meta_dispatcher = 0;
  47 mike           1.118 AtomicInt MessageQueueService::_service_count(0);
  48 mday           1.15  AtomicInt MessageQueueService::_xid(1);
  49 mday           1.38  Mutex MessageQueueService::_meta_dispatcher_mutex;
  50 mday           1.15  
  51 kumpf          1.104 static struct timeval deallocateWait = {300, 0};
  52 mday           1.53  
  53 mday           1.55  ThreadPool *MessageQueueService::_thread_pool = 0;
  54 mday           1.53  
  55                      DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
  56                      
  57 kumpf          1.62  Thread* MessageQueueService::_polling_thread = 0;
  58 mday           1.61  
  59 kumpf          1.104 ThreadPool *MessageQueueService::get_thread_pool()
  60 mday           1.69  {
  61                         return _thread_pool;
  62                      }
  63 kumpf          1.117 
  64 jim.wunderlich 1.110 //
  65 kumpf          1.117 // MAX_THREADS_PER_SVC_QUEUE
  66 jim.wunderlich 1.110 //
  67                      // JR Wunderlich Jun 6, 2005
  68                      //
  69                      
  70                      #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000 
  71 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
  72 jim.wunderlich 1.110 
  73 kumpf          1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
  74                      # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
  75                      #endif
  76                      
  77 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
  78 mday           1.69  
  79 kumpf          1.104 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL
  80                      MessageQueueService::kill_idle_threads(void *parm)
  81 mday           1.53  {
  82 mday           1.69  
  83                         static struct timeval now, last = {0,0};
  84 mday           1.53     gettimeofday(&now, NULL);
  85 mday           1.56     int dead_threads = 0;
  86 kumpf          1.104 
  87                         if (now.tv_sec - last.tv_sec > 120)
  88 mday           1.53     {
  89                            gettimeofday(&last, NULL);
  90 kumpf          1.104       try
  91 mday           1.56        {
  92 kumpf          1.104          dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
  93 mday           1.56        }
  94 mday           1.69        catch(...)
  95 mday           1.56        {
  96                      
  97                            }
  98 mday           1.53     }
  99 jenny.yu       1.92  
 100 w.otsuka       1.94  #ifdef PEGASUS_POINTER_64BIT
 101 jenny.yu       1.92     return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
 102                      #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
 103                         return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
 104                      #else
 105                         return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
 106                      #endif
 107 mday           1.53  }
 108                      
 109                      PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 110                      {
 111                         Thread *myself = reinterpret_cast<Thread *>(parm);
 112                         DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
 113 mike           1.118    while (_stop_polling.get()  == 0)
 114 mday           1.53     {
 115 baggio.br      1.100       _polling_sem.wait();
 116                      
 117 mike           1.118       if (_stop_polling.get() != 0)
 118 mday           1.61        {
 119 kumpf          1.62           break;
 120 mday           1.61        }
 121 kumpf          1.104 
 122 denise.eckstein 1.116       // The polling_routine thread must hold the lock on the
 123                             // _polling_thread list while processing incoming messages.
 124                             // This lock is used to give this thread ownership of
 125                             // services on the _polling_routine list.
 126                       
 127                             // This is necessary to avoid confict with other threads
 128                             // processing the _polling_list
 129                             // (e.g., MessageQueueServer::~MessageQueueService).
 130                       
 131 mday            1.53        list->lock();
 132                             MessageQueueService *service = list->next(0);
 133 denise.eckstein 1.116       ThreadStatus rtn = PEGASUS_THREAD_OK;
 134                             while (service != NULL)
 135                             {
 136                                 if ((service->_incoming.count() > 0) &&
 137 mike            1.118               (service->_die.get() == 0) &&
 138                                     (service->_threads.get() < max_threads_per_svc_queue))
 139 denise.eckstein 1.116           {
 140                                    // The _threads count is used to track the 
 141                                    // number of active threads that have been allocated
 142                                    // to process messages for this service.
 143                       
 144                                    // The _threads count MUST be incremented while
 145                                    // the polling_routine owns the _polling_thread
 146                                    // lock and has ownership of the service object.
 147                       
 148                                    service->_threads++;
 149                                    try
 150                                    {
 151                                        rtn = _thread_pool->allocate_and_awaken(
 152                                             service, _req_proc, &_polling_sem);
 153                                    }
 154                                    catch (...)
 155                                    {
 156                                        service->_threads--;
 157                       
 158                                        // allocate_and_awaken should never generate an exception.
 159                                        PEGASUS_ASSERT(0);
 160 denise.eckstein 1.116              }
 161                                    // if no more threads available, break from processing loop
 162                                    if (rtn != PEGASUS_THREAD_OK )
 163                                    {
 164                                        service->_threads--;
 165                                        Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 166                                           "Not enough threads to process this request. Skipping.");
 167                       
 168                                        Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 169                                           "Could not allocate thread for %s. " \
 170                                           "Queue has %d messages waiting and %d threads servicing." \
 171                                           "Skipping the service for right now. ",
 172                                           service->getQueueName(),
 173                                           service->_incoming.count(),
 174 mike            1.118                     service->_threads.get());
 175 denise.eckstein 1.116 
 176                                        pegasus_yield();
 177                                        service = NULL;
 178                                     } 
 179                                 }
 180                                 if (service != NULL) 
 181                                 {
 182                                    service = list->next(service);
 183                                 }
 184                             }
 185 mday            1.53        list->unlock();
 186 jim.wunderlich  1.110 
 187 mike            1.118       if (_check_idle_flag.get() != 0)
 188 mday            1.69        {
 189 kumpf           1.104          _check_idle_flag = 0;
 190 denise.eckstein 1.116          // try to do idle thread clean up processing when system is not busy
 191                                // if system is busy there may not be a thread available to allocate 
 192                                // so nothing will be done and that is OK. 
 193 kumpf           1.84  
 194 konrad.r        1.113          if ( _thread_pool->allocate_and_awaken(service, kill_idle_threads, &_polling_sem) != PEGASUS_THREAD_OK)
 195 denise.eckstein 1.116          {
 196                                       Logger::put(Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
 197                                               "Not enough threads to kill idle threads. What an irony.");
 198 konrad.r        1.113 
 199 denise.eckstein 1.116                 Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 200                                               "Could not allocate thread to kill idle threads." \
 201                                               "Skipping. ");
 202                                }
 203                       
 204                                
 205 mday            1.69        }
 206 mday            1.53     }
 207                          myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
 208                          return(0);
 209                       }
 210                       
 211                       
 212                       Semaphore MessageQueueService::_polling_sem(0);
 213                       AtomicInt MessageQueueService::_stop_polling(0);
 214 mday            1.69  AtomicInt MessageQueueService::_check_idle_flag(0);
 215 mday            1.53  
 216 mday            1.15  
 217 kumpf           1.104 MessageQueueService::MessageQueueService(
 218                          const char *name,
 219                          Uint32 queueID,
 220                          Uint32 capabilities,
 221                          Uint32 mask)
 222 mday            1.15     : Base(name, true,  queueID),
 223 mday            1.1        _mask(mask),
 224 mday            1.4        _die(0),
 225 denise.eckstein 1.116      _threads(0),
 226 mday            1.41       _incoming(true, 0),
 227 konrad.r        1.106      _incoming_queue_shutdown(0)
 228 kumpf           1.104 {
 229 jim.wunderlich  1.110 
 230 kumpf           1.104    _capabilities = (capabilities | module_capabilities::async);
 231 mday            1.53  
 232 mday            1.1      _default_op_timeout.tv_sec = 30;
 233                          _default_op_timeout.tv_usec = 100;
 234 mday            1.15  
 235 jim.wunderlich  1.110    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
 236                       
 237 kumpf           1.117    // if requested thread max is out of range, then set to
 238                          // MAX_THREADS_PER_SVC_QUEUE_LIMIT
 239 jim.wunderlich  1.110 
 240 kumpf           1.117    if ((max_threads_per_svc_queue < 1) ||
 241                              (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
 242                          {
 243 jim.wunderlich  1.110        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
 244 kumpf           1.117    }
 245 jim.wunderlich  1.110 
 246 kumpf           1.117    Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
 247                             "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);
 248 jim.wunderlich  1.110 
 249 a.arora         1.87     AutoMutex autoMut(_meta_dispatcher_mutex);
 250 kumpf           1.104 
 251                          if (_meta_dispatcher == 0)
 252 mday            1.15     {
 253 joyce.j         1.101       _stop_polling = 0;
 254 mike            1.118       PEGASUS_ASSERT(_service_count.get() == 0);
 255 mday            1.15        _meta_dispatcher = new cimom();
 256 kumpf           1.104       if (_meta_dispatcher == NULL)
 257 mday            1.15        {
 258 a.arora         1.87           throw NullPointer();
 259 mday            1.15        }
 260 jim.wunderlich  1.110       //  _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
 261                             //   minimum_cnt, maximum_cnt, deallocateWait); 
 262                             //
 263 kumpf           1.99        _thread_pool =
 264 kumpf           1.104           new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
 265 mday            1.15     }
 266                          _service_count++;
 267                       
 268 kumpf           1.104    if (false == register_service(name, _capabilities, _mask))
 269 mday            1.15     {
 270 humberto        1.71        //l10n
 271                             //throw BindFailedException("MessageQueueService Base Unable to register with  Meta Dispatcher");
 272                             MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
 273 kumpf           1.104          "MessageQueueService Base Unable to register with  Meta Dispatcher");
 274                       
 275 humberto        1.71        throw BindFailedException(parms);
 276 mday            1.15     }
 277 kumpf           1.104 
 278 mday            1.53     _polling_list.insert_last(this);
 279 kumpf           1.104 
 280 mday            1.1   }
 281                       
 282 mday            1.4   
 283 kumpf           1.104 MessageQueueService::~MessageQueueService()
 284 mday            1.1   {
 285                          _die = 1;
 286 mday            1.76  
 287 denise.eckstein 1.116    // The polling_routine locks the _polling_list while
 288                          // processing the incoming messages for services on the 
 289                          // list.  Deleting the service from the _polling_list
 290                          // prior to processing, avoids synchronization issues
 291                          // with the _polling_routine.
 292                       
 293                          _polling_list.remove(this);
 294                       
 295                          // ATTN: The code for closing the _incoming queue
 296                          // is not working correctly. In OpenPegasus 2.5,
 297                          // execution of the following code is very timing
 298                          // dependent. This needs to be fix.
 299                          // See Bug 4079 for details. 
 300 mike            1.118    if (_incoming_queue_shutdown.get() == 0)
 301 mday            1.76     {
 302 denise.eckstein 1.116        _shutdown_incoming_queue();
 303                          }
 304 konrad.r        1.107 
 305 denise.eckstein 1.116    // Wait until all threads processing the messages
 306                          // for this service have completed.
 307                       
 308 mike            1.118    while (_threads.get() > 0)
 309 denise.eckstein 1.116    {
 310                             pegasus_yield();
 311 mday            1.76     }
 312 kumpf           1.104 
 313 mday            1.15     {
 314 a.arora         1.87       AutoMutex autoMut(_meta_dispatcher_mutex);
 315                            _service_count--;
 316 mike            1.118      if (_service_count.get() == 0)
 317 a.arora         1.87       {
 318 mday            1.76  
 319 mday            1.53        _stop_polling++;
 320                             _polling_sem.signal();
 321 kumpf           1.104       if (_polling_thread) {
 322                                 _polling_thread->join();
 323                                 delete _polling_thread;
 324                                 _polling_thread = 0;
 325                             }
 326 mday            1.15        _meta_dispatcher->_shutdown_routed_queue();
 327                             delete _meta_dispatcher;
 328 kumpf           1.45        _meta_dispatcher = 0;
 329 mday            1.76  
 330 kumpf           1.65        delete _thread_pool;
 331                             _thread_pool = 0;
 332 a.arora         1.87       }
 333                          } // mutex unlocks here
 334 kumpf           1.104    // Clean up in case there are extra stuff on the queue.
 335 konrad.r        1.72    while (_incoming.count())
 336                         {
 337 konrad.r        1.109     try { 
 338                             delete _incoming.remove_first();
 339                           } catch (const ListClosed &e)
 340                           {
 341                             // If the list is closed, there is nothing we can do. 
 342                             break;
 343                           }
 344 konrad.r        1.72    }
 345 kumpf           1.104 }
 346 mday            1.1   
 347 kumpf           1.104 void MessageQueueService::_shutdown_incoming_queue()
 348 mday            1.7   {
 349 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 350 kumpf           1.104       return;
 351                       
 352                          AsyncIoctl *msg = new AsyncIoctl(
 353                             get_next_xid(),
 354                             0,
 355                             _queueId,
 356                             _queueId,
 357                             true,
 358                             AsyncIoctl::IO_CLOSE,
 359                             0,
 360                             0);
 361 mday            1.9   
 362 mday            1.8      msg->op = get_op();
 363 mday            1.41     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
 364 kumpf           1.104    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
 365                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
 366 mday            1.32     msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 367 mday            1.41  
 368 mday            1.32     msg->op->_op_dest = this;
 369 mday            1.8      msg->op->_request.insert_first(msg);
 370 konrad.r        1.108    try {
 371                            _incoming.insert_last_wait(msg->op);
 372                            _polling_sem.signal();
 373                          } catch (const ListClosed &) 
 374                          { 
 375 denise.eckstein 1.116         // This means the queue has already been shut-down (happens  when there
 376 konrad.r        1.108     // are two AsyncIoctrl::IO_CLOSE messages generated and one got first
 377                           // processed.
 378                            delete msg; 
 379                          }
 380 konrad.r        1.112    catch (const Permission &)
 381                          {
 382                            delete msg;
 383                          }
 384 mday            1.7   }
 385                       
 386 mday            1.22  
 387                       
 388 kumpf           1.85  void MessageQueueService::enqueue(Message *msg)
 389 mday            1.22  {
 390 kumpf           1.28     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
 391                       
 392 mday            1.22     Base::enqueue(msg);
 393 kumpf           1.28  
 394                          PEG_METHOD_EXIT();
 395 mday            1.22  }
 396                       
 397                       
 398 kumpf           1.103 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
 399                           void * parm)
 400                       {
 401 konrad.r        1.107     MessageQueueService* service =
 402                                   reinterpret_cast<MessageQueueService*>(parm);
 403                           PEGASUS_ASSERT(service != 0);
 404 kumpf           1.103     try
 405                           {
 406                       
 407 mike            1.118         if (service->_die.get() != 0)
 408 kumpf           1.103         {
 409 denise.eckstein 1.116             service->_threads--;
 410 kumpf           1.103             return (0);
 411                               }
 412                               // pull messages off the incoming queue and dispatch them. then
 413                               // check pending messages that are non-blocking
 414                               AsyncOpNode *operation = 0;
 415                       
 416                               // many operations may have been queued.
 417                               do
 418                               {
 419                                   try
 420                                   {
 421                                       operation = service->_incoming.remove_first();
 422                                   }
 423                                   catch (ListClosed &)
 424                                   {
 425 kumpf           1.104                 // ATTN: This appears to be a common loop exit path.
 426                                       //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 427                                       //    "Caught ListClosed exception.  Exiting _req_proc.");
 428 kumpf           1.103                 break;
 429                                   }
 430                       
 431                                   if (operation)
 432                                   {
 433                                      operation->_service_ptr = service;
 434                                      service->_handle_incoming_operation(operation);
 435                                   }
 436                               } while (operation);
 437                           }
 438                           catch (const Exception& e)
 439                           {
 440                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 441                                   String("Caught exception: \"") + e.getMessage() +
 442                                       "\".  Exiting _req_proc.");
 443                           }
 444                           catch (...)
 445                           {
 446                               PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
 447                                   "Caught unrecognized exception.  Exiting _req_proc.");
 448                           }
 449 konrad.r        1.107     service->_threads--;
 450 kumpf           1.103     return(0);
 451 mday            1.1   }
 452                       
 453 mday            1.43  
 454 kumpf           1.104 void MessageQueueService::_sendwait_callback(
 455                           AsyncOpNode *op,
 456                           MessageQueue *q,
 457                           void *parm)
 458 mday            1.33  {
 459                          op->_client_sem.signal();
 460                       }
 461                       
 462 mday            1.30  
 463                       // callback function is responsible for cleaning up all resources
 464                       // including op, op->_callback_node, and op->_callback_ptr
 465 mday            1.22  void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 466                       {
 467 kumpf           1.104    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
 468 mday            1.39     {
 469                       
 470                             Message *msg = op->get_request();
 471 kumpf           1.104       if (msg && (msg->getMask() & message_mask::ha_async))
 472 mday            1.39        {
 473 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
 474                                {
 475                                   AsyncLegacyOperationStart *wrapper =
 476                                      static_cast<AsyncLegacyOperationStart *>(msg);
 477                                   msg = wrapper->get_action();
 478                                   delete wrapper;
 479                                }
 480                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
 481                                {
 482                                   AsyncModuleOperationStart *wrapper =
 483                                      static_cast<AsyncModuleOperationStart *>(msg);
 484                                   msg = wrapper->get_action();
 485                                   delete wrapper;
 486                                }
 487                                else if (msg->getType() == async_messages::ASYNC_OP_START)
 488                                {
 489                                   AsyncOperationStart *wrapper =
 490                                      static_cast<AsyncOperationStart *>(msg);
 491                                   msg = wrapper->get_action();
 492                                   delete wrapper;
 493                                }
 494 kumpf           1.104          delete msg;
 495 mday            1.39        }
 496                       
 497                             msg = op->get_response();
 498 kumpf           1.104       if (msg && (msg->getMask() & message_mask::ha_async))
 499 mday            1.39        {
 500 kumpf           1.104          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
 501                                {
 502                                   AsyncLegacyOperationResult *wrapper =
 503                                      static_cast<AsyncLegacyOperationResult *>(msg);
 504                                   msg = wrapper->get_result();
 505                                   delete wrapper;
 506                                }
 507                                else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
 508                                {
 509                                   AsyncModuleOperationResult *wrapper =
 510                                      static_cast<AsyncModuleOperationResult *>(msg);
 511                                   msg = wrapper->get_result();
 512                                   delete wrapper;
 513                                }
 514 mday            1.39        }
 515                             void (*callback)(Message *, void *, void *) = op->__async_callback;
 516                             void *handle = op->_callback_handle;
 517                             void *parm = op->_callback_parameter;
 518                             op->release();
 519                             return_op(op);
 520                             callback(msg, handle, parm);
 521                          }
 522 kumpf           1.104    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
 523 mday            1.39     {
 524 kumpf           1.104       // note that _callback_node may be different from op
 525                             // op->_callback_response_q is a "this" pointer we can use for
 526 mday            1.39        // static callback methods
 527                             op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 528                          }
 529 mday            1.22  }
 530                       
 531 mday            1.1   
 532 kumpf           1.104 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
 533 mday            1.1   {
 534 kumpf           1.104    if (operation != 0)
 535 mday            1.5      {
 536 kumpf           1.104 
 537                       // ATTN: optimization
 538 mday            1.22  // << Tue Feb 19 14:10:38 2002 mdd >>
 539 mday            1.6         operation->lock();
 540 kumpf           1.104 
 541 mday            1.6         Message *rq = operation->_request.next(0);
 542 kumpf           1.104 
 543 mday            1.31  // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 544 kumpf           1.104 // move this to the bottom of the loop when the majority of
 545                       // messages become async messages.
 546 mday            1.31  
 547 mday            1.18        // divert legacy messages to handleEnqueue
 548 mday            1.29        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
 549 mday            1.18        {
 550 kumpf           1.104          rq = operation->_request.remove_first() ;
 551                                operation->unlock();
 552                                // delete the op node
 553                                operation->release();
 554                                return_op(operation);
 555 mday            1.24  
 556 kumpf           1.104          handleEnqueue(rq);
 557                                return;
 558 mday            1.18        }
 559                       
 560 kumpf           1.104       if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
 561                                  operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
 562                                 (operation->_state & ASYNC_OPSTATE_COMPLETE))
 563 mday            1.29        {
 564 kumpf           1.104          operation->unlock();
 565                                _handle_async_callback(operation);
 566 mday            1.29        }
 567 kumpf           1.104       else
 568 mday            1.29        {
 569 kumpf           1.104          PEGASUS_ASSERT(rq != 0);
 570                                operation->unlock();
 571                                _handle_async_request(static_cast<AsyncRequest *>(rq));
 572 mday            1.29        }
 573 mday            1.5      }
 574                          return;
 575 mday            1.1   }
 576                       
 577                       void MessageQueueService::_handle_async_request(AsyncRequest *req)
 578                       {
 579 kumpf           1.104    if (req != 0)
 580 mday            1.4      {
 581                             req->op->processing();
 582 kumpf           1.104 
 583 mday            1.4         Uint32 type = req->getType();
 584 kumpf           1.104       if (type == async_messages::HEARTBEAT)
 585                                handle_heartbeat_request(req);
 586 mday            1.4         else if (type == async_messages::IOCTL)
 587 kumpf           1.104          handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
 588 mday            1.4         else if (type == async_messages::CIMSERVICE_START)
 589 kumpf           1.104          handle_CimServiceStart(static_cast<CimServiceStart *>(req));
 590 mday            1.4         else if (type == async_messages::CIMSERVICE_STOP)
 591 kumpf           1.104          handle_CimServiceStop(static_cast<CimServiceStop *>(req));
 592 mday            1.4         else if (type == async_messages::CIMSERVICE_PAUSE)
 593 kumpf           1.104          handle_CimServicePause(static_cast<CimServicePause *>(req));
 594 mday            1.4         else if (type == async_messages::CIMSERVICE_RESUME)
 595 kumpf           1.104          handle_CimServiceResume(static_cast<CimServiceResume *>(req));
 596                             else if (type == async_messages::ASYNC_OP_START)
 597                                handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
 598                             else
 599 mday            1.4         {
 600 kumpf           1.104          // we don't handle this request message
 601                                _make_response(req, async_results::CIM_NAK);
 602 mday            1.4         }
 603 mday            1.1      }
 604                       }
 605                       
 606 mday            1.17  
 607                       Boolean MessageQueueService::_enqueueResponse(
 608 kumpf           1.104    Message* request,
 609 mday            1.17     Message* response)
 610                       {
 611 w.white         1.102 
 612                         STAT_COPYDISPATCHER
 613                       
 614 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 615                                           "MessageQueueService::_enqueueResponse");
 616 mday            1.25  
 617 kumpf           1.104    if (request->getMask() & message_mask::ha_async)
 618 mday            1.25     {
 619 kumpf           1.104       if (response->getMask() & message_mask::ha_async)
 620 mday            1.25        {
 621 kumpf           1.104          _completeAsyncResponse(static_cast<AsyncRequest *>(request),
 622                                                       static_cast<AsyncReply *>(response),
 623                                                       ASYNC_OPSTATE_COMPLETE, 0);
 624 kumpf           1.37           PEG_METHOD_EXIT();
 625 kumpf           1.104          return true;
 626 mday            1.25        }
 627                          }
 628 kumpf           1.104 
 629                          if (request->_async != 0)
 630 mday            1.17     {
 631                             Uint32 mask = request->_async->getMask();
 632 kumpf           1.104       PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request));
 633                       
 634 mday            1.18        AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
 635                             AsyncOpNode *op = async->op;
 636                             request->_async = 0;
 637 mday            1.63        // the legacy request is going to be deleted by its handler
 638 kumpf           1.104       // remove it from the op node
 639 mday            1.63  
 640                             static_cast<AsyncLegacyOperationStart *>(async)->get_action();
 641 kumpf           1.104 
 642                             AsyncLegacyOperationResult *async_result =
 643                                new AsyncLegacyOperationResult(
 644                                   async->getKey(),
 645                                   async->getRouting(),
 646                                   op,
 647                                   response);
 648                             _completeAsyncResponse(
 649                                async,
 650                                async_result,
 651                                ASYNC_OPSTATE_COMPLETE,
 652                                0);
 653 kumpf           1.37        PEG_METHOD_EXIT();
 654 mday            1.18        return true;
 655 mday            1.17     }
 656 kumpf           1.104 
 657 mday            1.18     // ensure that the destination queue is in response->dest
 658 kumpf           1.37     PEG_METHOD_EXIT();
 659 mday            1.24     return SendForget(response);
 660 kumpf           1.104 
 661 mday            1.17  }
 662                       
 663 mday            1.18  void MessageQueueService::_make_response(Message *req, Uint32 code)
 664 mday            1.1   {
 665 mday            1.19     cimom::_make_response(req, code);
 666 mday            1.1   }
 667                       
 668                       
 669 kumpf           1.104 void MessageQueueService::_completeAsyncResponse(
 670                           AsyncRequest *request,
 671                           AsyncReply *reply,
 672                           Uint32 state,
 673                           Uint32 flag)
 674 mday            1.5   {
 675 kumpf           1.37     PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
 676                                           "MessageQueueService::_completeAsyncResponse");
 677                       
 678 mday            1.19     cimom::_completeAsyncResponse(request, reply, state, flag);
 679 kumpf           1.37  
 680                          PEG_METHOD_EXIT();
 681 mday            1.5   }
 682                       
 683                       
 684 kumpf           1.104 void MessageQueueService::_complete_op_node(
 685                           AsyncOpNode *op,
 686                           Uint32 state,
 687                           Uint32 flag,
 688                           Uint32 code)
 689 mday            1.32  {
 690                          cimom::_complete_op_node(op, state, flag, code);
 691                       }
 692                       
 693 mday            1.5   
 694                       Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 695                       {
 696 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 697 mday            1.8         return false;
 698 kumpf           1.104    if (_polling_thread == NULL)
 699                          {
 700                             _polling_thread = new Thread(
 701                                 polling_routine,
 702                                 reinterpret_cast<void *>(&_polling_list),
 703                                 false);
 704 konrad.r        1.113       ThreadStatus tr = PEGASUS_THREAD_OK;
 705                             while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
 706 a.arora         1.93        {
 707 denise.eckstein 1.116         if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
 708 konrad.r        1.113            pegasus_yield();
 709 denise.eckstein 1.116         else
 710                                  throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
 711                                               "Could not allocate thread for the polling thread."));
 712 a.arora         1.93        }
 713 kumpf           1.104    }
 714                       // ATTN optimization remove the message checking altogether in the base
 715 mday            1.20  // << Mon Feb 18 14:02:20 2002 mdd >>
 716 mday            1.6      op->lock();
 717                          Message *rq = op->_request.next(0);
 718 mday            1.20     Message *rp = op->_response.next(0);
 719 mday            1.6      op->unlock();
 720 kumpf           1.104 
 721                          if ((rq != 0 && (true == messageOK(rq))) ||
 722 mike            1.118        (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
 723 mday            1.5      {
 724 mday            1.6         _incoming.insert_last_wait(op);
 725 mday            1.53        _polling_sem.signal();
 726 mday            1.5         return true;
 727                          }
 728                          return false;
 729                       }
 730                       
 731                       Boolean MessageQueueService::messageOK(const Message *msg)
 732                       {
 733 mike            1.118    if (_incoming_queue_shutdown.get() > 0)
 734 mday            1.8         return false;
 735 mday            1.18     return true;
 736                       }
 737                       
 738 mday            1.1   void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 739                       {
 740 kumpf           1.104    // default action is to echo a heartbeat response
 741                       
 742                          AsyncReply *reply = new AsyncReply(
 743                             async_messages::HEARTBEAT,
 744                             req->getKey(),
 745                             req->getRouting(),
 746                             0,
 747                             req->op,
 748                             async_results::OK,
 749                             req->resp,
 750                             false);
 751                          _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
 752 mday            1.1   }
 753                       
 754                       
 755                       void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
 756 kumpf           1.104 {
 757 mday            1.1   }
 758 kumpf           1.104 
 759 mday            1.1   void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
 760                       {
 761 kumpf           1.104    switch (req->ctl)
 762 mday            1.8      {
 763                             case AsyncIoctl::IO_CLOSE:
 764                             {
 765 kumpf           1.104          MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
 766 kumpf           1.81  
 767                       #ifdef MESSAGEQUEUESERVICE_DEBUG
 768 kumpf           1.104          PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
 769 kumpf           1.81  #endif
 770 kumpf           1.104 
 771                                // respond to this message. this is fire and forget, so we don't need to delete anything.
 772                                // this takes care of two problems that were being found
 773                                // << Thu Oct  9 10:52:48 2003 mdd >>
 774                                 _make_response(req, async_results::OK);
 775                                // ensure we do not accept any further messages
 776                       
 777                                // ensure we don't recurse on IO_CLOSE
 778 mike            1.118          if (_incoming_queue_shutdown.get() > 0)
 779 kumpf           1.104             break;
 780                       
 781                                // set the closing flag
 782                                service->_incoming_queue_shutdown = 1;
 783                                // empty out the queue
 784                                while (1)
 785                                {
 786                                   AsyncOpNode *operation;
 787                                   try
 788                                   {
 789                                      operation = service->_incoming.remove_first();
 790                                   }
 791                                   catch(IPCException &)
 792                                   {
 793                                      break;
 794                                   }
 795                                   if (operation)
 796                                   {
 797                                      operation->_service_ptr = service;
 798                                      service->_handle_incoming_operation(operation);
 799                                   }
 800 kumpf           1.104             else
 801                                      break;
 802                                } // message processing loop
 803                       
 804                                // shutdown the AsyncDQueue
 805                                service->_incoming.shutdown_queue();
 806                                return;
 807 mday            1.8         }
 808                       
 809                             default:
 810 kumpf           1.104          _make_response(req, async_results::CIM_NAK);
 811 mday            1.8      }
 812 mday            1.1   }
 813 mday            1.8   
 814 mday            1.1   void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
 815                       {
 816 mday            1.79  
 817 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 818 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
 819 kumpf           1.81  #endif
 820 kumpf           1.104 
 821 mday            1.10     // clear the stoped bit and update
 822 mday            1.13     _capabilities &= (~(module_capabilities::stopped));
 823 mday            1.10     _make_response(req, async_results::OK);
 824 kumpf           1.104    // now tell the meta dispatcher we are stopped
 825 mday            1.10     update_service(_capabilities, _mask);
 826                       
 827 mday            1.1   }
 828                       void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
 829                       {
 830 kumpf           1.81  #ifdef MESSAGEQUEUESERVICE_DEBUG
 831 mday            1.79     PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
 832 kumpf           1.81  #endif
 833 mday            1.10     // set the stopeed bit and update
 834                          _capabilities |= module_capabilities::stopped;
 835                          _make_response(req, async_results::CIM_STOPPED);
 836 kumpf           1.104    // now tell the meta dispatcher we are stopped
 837 mday            1.10     update_service(_capabilities, _mask);
 838 mday            1.1   }
 839 kumpf           1.104 
 840 mday            1.1   void MessageQueueService::handle_CimServicePause(CimServicePause *req)
 841                       {
 842 mday            1.10     // set the paused bit and update
 843 mday            1.13     _capabilities |= module_capabilities::paused;
 844 mday            1.11     update_service(_capabilities, _mask);
 845 mday            1.10     _make_response(req, async_results::CIM_PAUSED);
 846 kumpf           1.104    // now tell the meta dispatcher we are stopped
 847 mday            1.1   }
 848 kumpf           1.104 
 849 mday            1.1   void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
 850                       {
 851 mday            1.10     // clear the paused  bit and update
 852 mday            1.13     _capabilities &= (~(module_capabilities::paused));
 853 mday            1.11     update_service(_capabilities, _mask);
 854 mday            1.10     _make_response(req, async_results::OK);
 855 kumpf           1.104    // now tell the meta dispatcher we are stopped
 856 mday            1.1   }
 857 kumpf           1.104 
 858 mday            1.1   void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
 859                       {
 860                          _make_response(req, async_results::CIM_NAK);
 861                       }
 862                       
 863                       void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
 864                       {
 865 mday            1.14     ;
 866                       }
 867                       
 868 mday            1.10  
 869 mday            1.14  void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
 870                       {
 871                          // remove the legacy message from the request and enqueue it to its destination
 872                          Uint32 result = async_results::CIM_NAK;
 873 kumpf           1.104 
 874 mday            1.25     Message *legacy = req->_act;
 875 kumpf           1.104    if (legacy != 0)
 876 mday            1.14     {
 877 mday            1.25        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
 878 kumpf           1.104       if (queue != 0)
 879 mday            1.14        {
 880 kumpf           1.104          if (queue->isAsync() == true)
 881                                {
 882                                   (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
 883                                }
 884                                else
 885                                {
 886                                   // Enqueue the response:
 887                                   queue->enqueue(req->get_action());
 888                                }
 889                       
 890                                result = async_results::OK;
 891 mday            1.14        }
 892                          }
 893                          _make_response(req, result);
 894                       }
 895                       
 896                       void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
 897                       {
 898                          ;
 899 mday            1.1   }
 900                       
 901 kumpf           1.104 AsyncOpNode *MessageQueueService::get_op()
 902 mday            1.1   {
 903 mday            1.4      AsyncOpNode *op = new AsyncOpNode();
 904 kumpf           1.104 
 905 mday            1.9      op->_state = ASYNC_OPSTATE_UNKNOWN;
 906                          op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
 907 kumpf           1.104 
 908 mday            1.1      return op;
 909                       }
 910                       
 911                       void MessageQueueService::return_op(AsyncOpNode *op)
 912                       {
 913 kumpf           1.104    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
 914 mday            1.4      delete op;
 915 mday            1.1   }
 916                       
 917 mday            1.18  
 918 kumpf           1.104 Boolean MessageQueueService::ForwardOp(
 919                           AsyncOpNode *op,
 920                           Uint32 destination)
 921 mday            1.29  {
 922 kumpf           1.104    PEGASUS_ASSERT(op != 0);
 923 mday            1.30     op->lock();
 924                          op->_op_dest = MessageQueue::lookup(destination);
 925 mday            1.29     op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
 926                          op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
 927 mday            1.30     op->unlock();
 928 kumpf           1.104    if (op->_op_dest == 0)
 929 mday            1.30        return false;
 930 kumpf           1.104 
 931 mday            1.29     return  _meta_dispatcher->route_async(op);
 932                       }
 933                       
 934 kumpf           1.104 
 935                       Boolean MessageQueueService::SendAsync(
 936                           AsyncOpNode *op,
 937                           Uint32 destination,
 938                           void (*callback)(AsyncOpNode *, MessageQueue *, void *),
 939                           MessageQueue *callback_response_q,
 940                           void *callback_ptr)
 941                       {
 942                          PEGASUS_ASSERT(op != 0 && callback != 0);
 943                       
 944 mday            1.21     // get the queue handle for the destination
 945                       
 946 mday            1.30     op->lock();
 947 mday            1.32     op->_op_dest = MessageQueue::lookup(destination); // destination of this message
 948 mday            1.22     op->_flags |= ASYNC_OPFLAGS_CALLBACK;
 949                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 950 mday            1.30     // initialize the callback data
 951 mday            1.32     op->_async_callback = callback;   // callback function to be executed by recpt. of response
 952                          op->_callback_node = op;          // the op node
 953                          op->_callback_response_q = callback_response_q;  // the queue that will receive the response
 954                          op->_callback_ptr = callback_ptr;   // user data for callback
 955                          op->_callback_request_q = this;     // I am the originator of this request
 956 kumpf           1.104 
 957 mday            1.30     op->unlock();
 958 kumpf           1.104    if (op->_op_dest == 0)
 959 mday            1.30        return false;
 960 kumpf           1.104 
 961 mday            1.21     return  _meta_dispatcher->route_async(op);
 962 mday            1.18  }
 963                       
 964                       
 965 kumpf           1.104 Boolean MessageQueueService::SendAsync(
 966                           Message *msg,
 967                           Uint32 destination,
 968                           void (*callback)(Message *response, void *handle, void *parameter),
 969                           void *handle,
 970                           void *parameter)
 971 mday            1.39  {
 972 kumpf           1.104    if (msg == NULL)
 973 mday            1.39        return false;
 974 kumpf           1.104    if (callback == NULL)
 975 mday            1.39        return SendForget(msg);
 976                          AsyncOpNode *op = get_op();
 977 mday            1.43     msg->dest = destination;
 978 kumpf           1.104    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
 979 mday            1.39     {
 980                             op->release();
 981                             return_op(op);
 982                             return false;
 983                          }
 984                          op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
 985                          op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
 986                          op->_state &= ~ASYNC_OPSTATE_COMPLETE;
 987                          op->__async_callback = callback;
 988                          op->_callback_node = op;
 989                          op->_callback_handle = handle;
 990                          op->_callback_parameter = parameter;
 991                          op->_callback_response_q = this;
 992                       
 993 kumpf           1.104    if (!(msg->getMask() & message_mask::ha_async))
 994 mday            1.39     {
 995 kumpf           1.104       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
 996                                get_next_xid(),
 997                                op,
 998                                destination,
 999                                msg,
1000                                destination);
1001 mday            1.39     }
1002 kumpf           1.104    else
1003 mday            1.39     {
1004                             op->_request.insert_first(msg);
1005                             (static_cast<AsyncMessage *>(msg))->op = op;
1006                          }
1007                          return _meta_dispatcher->route_async(op);
1008                       }
1009                       
1010                       
1011 mday            1.18  Boolean MessageQueueService::SendForget(Message *msg)
1012                       {
1013                          AsyncOpNode *op = 0;
1014 mday            1.22     Uint32 mask = msg->getMask();
1015 kumpf           1.104 
1016 mday            1.22     if (mask & message_mask::ha_async)
1017 mday            1.18     {
1018                             op = (static_cast<AsyncMessage *>(msg))->op ;
1019                          }
1020 mday            1.22  
1021 kumpf           1.104    if (op == 0)
1022 mday            1.20     {
1023 mday            1.18        op = get_op();
1024 mday            1.20        op->_request.insert_first(msg);
1025 mday            1.22        if (mask & message_mask::ha_async)
1026 kumpf           1.104       {
1027                                (static_cast<AsyncMessage *>(msg))->op = op;
1028                             }
1029 mday            1.20     }
1030 mday            1.30     op->_op_dest = MessageQueue::lookup(msg->dest);
1031 mday            1.22     op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
1032 kumpf           1.104    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
1033                              | ASYNC_OPFLAGS_SIMPLE_STATUS);
1034 mday            1.22     op->_state &= ~ASYNC_OPSTATE_COMPLETE;
1035 kumpf           1.104    if (op->_op_dest == 0)
1036 mday            1.39     {
1037                             op->release();
1038                             return_op(op);
1039 mday            1.21        return false;
1040 mday            1.39     }
1041 kumpf           1.104 
1042 mday            1.18     // now see if the meta dispatcher will take it
1043 mday            1.30     return  _meta_dispatcher->route_async(op);
1044 mday            1.18  }
1045 mday            1.2   
1046 mday            1.1   
1047 mday            1.4   AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
1048 mday            1.1   {
1049 kumpf           1.104    if (request == 0)
1050 mday            1.4         return 0 ;
1051 mday            1.5   
1052                          Boolean destroy_op = false;
1053 kumpf           1.104 
1054 s.hills         1.80     if (request->op == 0)
1055 mday            1.5      {
1056                             request->op = get_op();
1057 mday            1.7         request->op->_request.insert_first(request);
1058 mday            1.5         destroy_op = true;
1059                          }
1060 kumpf           1.104 
1061 mday            1.33     request->block = false;
1062 mday            1.35     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
1063 kumpf           1.104    SendAsync(
1064                             request->op,
1065                             request->dest,
1066                             _sendwait_callback,
1067                             this,
1068                             (void *)0);
1069                       
1070 a.dunfey        1.118.2.1    request->op->_client_sem.wait();
1071                           
1072 baggio.br       1.100     
1073 mday            1.6          request->op->lock();
1074                              AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
1075                              rpl->op = 0;
1076                              request->op->unlock();
1077 kumpf           1.104     
1078                              if (destroy_op == true)
1079 mday            1.5          {
1080 mday            1.6             request->op->lock();
1081                                 request->op->_request.remove(request);
1082                                 request->op->_state |= ASYNC_OPSTATE_RELEASED;
1083                                 request->op->unlock();
1084                                 return_op(request->op);
1085 mday            1.7             request->op = 0;
1086 mday            1.5          }
1087                              return rpl;
1088 mday            1.1       }
1089                           
1090                           
1091 kumpf           1.104     Boolean MessageQueueService::register_service(
1092                               String name,
1093                               Uint32 capabilities,
1094                               Uint32 mask)
1095                           {
1096                              RegisterCimService *msg = new RegisterCimService(
1097                                 get_next_xid(),
1098                                 0,
1099                                 true,
1100                                 name,
1101                                 capabilities,
1102                                 mask,
1103                                 _queueId);
1104                              msg->dest = CIMOM_Q_ID;
1105 mday            1.1       
1106                              Boolean registered = false;
1107 kumpf           1.104        AsyncReply *reply = static_cast<AsyncReply *>(SendWait(msg));
1108                           
1109                              if (reply != 0)
1110                              {
1111                                 if (reply->getMask() & message_mask::ha_async)
1112                                 {
1113                                    if (reply->getMask() & message_mask::ha_reply)
1114                                    {
1115                                       if (reply->result == async_results::OK ||
1116                                           reply->result == async_results::MODULE_ALREADY_REGISTERED)
1117                                       {
1118                                           registered = true;
1119                                       }
1120                                    }
1121 mday            1.1             }
1122 kumpf           1.104     
1123                                 delete reply;
1124 mday            1.1          }
1125 mday            1.5          delete msg;
1126 mday            1.1          return registered;
1127                           }
1128                           
1129                           Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1130                           {
1131 kumpf           1.104        UpdateCimService *msg = new UpdateCimService(
1132                                 get_next_xid(),
1133                                 0,
1134                                 true,
1135                                 _queueId,
1136                                 _capabilities,
1137                                 _mask);
1138 mday            1.1          Boolean registered = false;
1139 mday            1.2       
1140                              AsyncMessage *reply = SendWait(msg);
1141                              if (reply)
1142 mday            1.1          {
1143 kumpf           1.104           if (reply->getMask() & message_mask::ha_async)
1144 mday            1.1             {
1145 kumpf           1.104              if (reply->getMask() & message_mask::ha_reply)
1146                                    {
1147                                       if (static_cast<AsyncReply *>(reply)->result == async_results::OK)
1148                                       {
1149                                          registered = true;
1150                                       }
1151                                    }
1152 mday            1.1             }
1153                                 delete reply;
1154                              }
1155 mday            1.5          delete msg;
1156 mday            1.1          return registered;
1157                           }
1158                           
1159                           
1160 kumpf           1.104     Boolean MessageQueueService::deregister_service()
1161 mday            1.1       {
1162 mday            1.3       
1163 mday            1.5          _meta_dispatcher->deregister_module(_queueId);
1164                              return true;
1165 mday            1.1       }
1166                           
1167                           
1168 kumpf           1.104     void MessageQueueService::find_services(
1169                               String name,
1170                               Uint32 capabilities,
1171                               Uint32 mask,
1172                               Array<Uint32> *results)
1173 mday            1.1       {
1174 kumpf           1.104        if (results == 0)
1175                              {
1176 mday            1.1             throw NullPointer();
1177 kumpf           1.104        }
1178                           
1179 mday            1.1          results->clear();
1180 kumpf           1.104     
1181                              FindServiceQueue *req = new FindServiceQueue(
1182                                 get_next_xid(),
1183                                 0,
1184                                 _queueId,
1185                                 true,
1186                                 name,
1187                                 capabilities,
1188                                 mask);
1189                           
1190 mday            1.44         req->dest = CIMOM_Q_ID;
1191 kumpf           1.104     
1192                              AsyncMessage *reply = SendWait(req);
1193                              if (reply)
1194                              {
1195                                 if (reply->getMask() & message_mask::ha_async)
1196                                 {
1197                                    if (reply->getMask() & message_mask::ha_reply)
1198                                    {
1199                                       if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1200                                       {
1201                                          if ((static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK)
1202                                             *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1203                                       }
1204                                    }
1205 mday            1.1             }
1206                                 delete reply;
1207                              }
1208 mday            1.5          delete req;
1209 mday            1.1          return ;
1210                           }
1211                           
1212                           void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1213                           {
1214 kumpf           1.104        if (result == 0)
1215                              {
1216 mday            1.1             throw NullPointer();
1217 kumpf           1.104        }
1218                           
1219                              EnumerateService *req = new EnumerateService(
1220                                 get_next_xid(),
1221                                 0,
1222                                 _queueId,
1223                                 true,
1224                                 queue);
1225                           
1226 mday            1.2          AsyncMessage *reply = SendWait(req);
1227 kumpf           1.104     
1228 mday            1.2          if (reply)
1229 mday            1.1          {
1230                                 Boolean found = false;
1231 kumpf           1.104     
1232                                 if (reply->getMask() & message_mask::ha_async)
1233 mday            1.1             {
1234 kumpf           1.104              if (reply->getMask() & message_mask::ha_reply)
1235                                    {
1236                                       if (reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1237                                       {
1238                                          if ((static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK)
1239                                          {
1240                                             if (found == false)
1241                                             {
1242                                                found = true;
1243                           
1244                                                result->put_name((static_cast<EnumerateServiceResponse *>(reply))->name);
1245                                                result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1246                                                result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1247                                                result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1248                                             }
1249                                          }
1250                                       }
1251                                    }
1252 mday            1.1             }
1253                                 delete reply;
1254                              }
1255 mday            1.5          delete req;
1256 kumpf           1.104     
1257 mday            1.1          return;
1258                           }
1259                           
1260 kumpf           1.104     Uint32 MessageQueueService::get_next_xid()
1261 mday            1.1       {
1262 mday            1.78         static Mutex _monitor;
1263                              Uint32 value;
1264 kumpf           1.104        AutoMutex autoMut(_monitor);
1265 mday            1.1          _xid++;
1266 mike            1.118        value =  _xid.get();
1267 mday            1.78         return value;
1268 kumpf           1.104     
1269 mday            1.1       }
1270                           
1271                           PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2