(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.22 and 1.62

version 1.22, 2002/02/20 22:00:51 version 1.62, 2002/07/12 23:10:04
Line 1 
Line 1 
 //%////-*-c++-*-//////////////////////////////////////////////////////////////// //%////-*-c++-*-////////////////////////////////////////////////////////////////
 // //
 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM  // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
   // The Open Group, Tivoli Systems
 // //
 // Permission is hereby granted, free of charge, to any person obtaining a copy // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to // of this software and associated documentation files (the "Software"), to
Line 35 
Line 36 
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
 AtomicInt MessageQueueService::_service_count = 0; AtomicInt MessageQueueService::_service_count = 0;
 AtomicInt MessageQueueService::_xid(1); AtomicInt MessageQueueService::_xid(1);
 Mutex MessageQueueService::_meta_dispatcher_mutex  = Mutex();  Mutex MessageQueueService::_meta_dispatcher_mutex;
   
   static struct timeval create_time = {0, 1};
   static struct timeval destroy_time = {15, 0};
   static struct timeval deadlock_time = {0, 0};
   
   ThreadPool *MessageQueueService::_thread_pool = 0;
   
   DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
   
   Thread* MessageQueueService::_polling_thread = 0;
   
   
   int MessageQueueService::kill_idle_threads(void)
   {
      static struct timeval now, last;
      gettimeofday(&now, NULL);
      int dead_threads = 0;
   
      if( now.tv_sec - last.tv_sec > 0 )
      {
         gettimeofday(&last, NULL);
         try
         {
            dead_threads =  _thread_pool->kill_dead_threads();
         }
         catch(IPCException& )
         {
   
         }
      }
      return dead_threads;
   }
   
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
   {
      Thread *myself = reinterpret_cast<Thread *>(parm);
      DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
      while ( _stop_polling.value()  == 0 )
      {
         _polling_sem.wait();
         if(_stop_polling.value() != 0 )
         {
            break;
         }
   
         list->lock();
         MessageQueueService *service = list->next(0);
         while(service != NULL)
         {
            if(service->_incoming.count() > 0 )
            {
               _thread_pool->allocate_and_awaken(service, _req_proc);
   //          service->_req_proc(service);
            }
            service = list->next(service);
         }
         list->unlock();
      }
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
   }
   
   
   Semaphore MessageQueueService::_polling_sem(0);
   AtomicInt MessageQueueService::_stop_polling(0);
  
  
 MessageQueueService::MessageQueueService(const char *name, MessageQueueService::MessageQueueService(const char *name,
Line 46 
Line 112 
  
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _pending(true),       _incoming(true, 0),
      _incoming(true, 1000),       _callback(true),
      _incoming_queue_shutdown(0),      _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)       _callback_ready(0),
        _req_thread(_req_proc, this, false),
        _callback_thread(_callback_proc, this, false)
   
 { {
   
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
  
    _default_op_timeout.tv_sec = 30;    _default_op_timeout.tv_sec = 30;
Line 65 
Line 135 
       if (_meta_dispatcher == NULL )       if (_meta_dispatcher == NULL )
       {       {
          _meta_dispatcher_mutex.unlock();          _meta_dispatcher_mutex.unlock();
   
          throw NullPointer();          throw NullPointer();
       }       }
         _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
                                       create_time, destroy_time, deadlock_time);
  
         _polling_thread = new Thread(polling_routine,
                                      reinterpret_cast<void *>(&_polling_list),
                                      false);
         _polling_thread->run();
    }    }
    _service_count++;    _service_count++;
  
   
    if( false == register_service(name, _capabilities, _mask) )    if( false == register_service(name, _capabilities, _mask) )
    {    {
       _meta_dispatcher_mutex.unlock();       _meta_dispatcher_mutex.unlock();
       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
    }    }
  
      _polling_list.insert_last(this);
   
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
   //   _callback_thread.run();
  
    _req_thread.run();  //   _req_thread.run();
 } }
  
  
Line 90 
Line 167 
    _die = 1;    _die = 1;
    if (_incoming_queue_shutdown.value() == 0 )    if (_incoming_queue_shutdown.value() == 0 )
    {    {
        _incoming.shutdown_queue();        _shutdown_incoming_queue();
        _req_thread.join();  
    }    }
      _callback_ready.signal();
   //   _callback_thread.join();
  
    _meta_dispatcher_mutex.lock(pegasus_thread_self());    _meta_dispatcher_mutex.lock(pegasus_thread_self());
    _service_count--;    _service_count--;
    if (_service_count.value() == 0 )    if (_service_count.value() == 0 )
    {    {
         _stop_polling++;
         _polling_sem.signal();
         _polling_thread->join();
         delete _polling_thread;
       _meta_dispatcher->_shutdown_routed_queue();       _meta_dispatcher->_shutdown_routed_queue();
       delete _meta_dispatcher;       delete _meta_dispatcher;
         _meta_dispatcher = 0;
   
    }    }
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
      _polling_list.remove(this);
 } }
  
   
   
 void MessageQueueService::_shutdown_incoming_queue(void) void MessageQueueService::_shutdown_incoming_queue(void)
 { {
  
    if (_incoming_queue_shutdown.value() > 0 )    if (_incoming_queue_shutdown.value() > 0 )
       return ;       return ;
   
    AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),    AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
                                     0,                                     0,
                                     _queueId,                                     _queueId,
Line 123 
Line 204 
                                     0);                                     0);
  
    msg->op = get_op();    msg->op = get_op();
    msg->op->_request.insert_first(msg);     msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
      msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
                      | ASYNC_OPFLAGS_SIMPLE_STATUS);
      msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
   
    msg->op->_op_dest = this;    msg->op->_op_dest = this;
      msg->op->_request.insert_first(msg);
  
    _incoming.insert_last_wait(msg->op);    _incoming.insert_last_wait(msg->op);
    msg->op->_client_sem.wait();  
   
    msg->op->lock();  
    AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());  
    reply->op = 0;  
    msg->op->unlock();  
    delete reply;  
  
    msg->op->_request.remove(msg);  //   _req_thread.join();
    msg->op->_state |= ASYNC_OPSTATE_RELEASED;  
    return_op(msg->op);  
   
    msg->op = 0;  
    delete msg;  
    _req_thread.join();  
  
 } }
  
Line 149 
Line 222 
  
 void MessageQueueService::enqueue(Message *msg) throw(IPCException) void MessageQueueService::enqueue(Message *msg) throw(IPCException)
 { {
      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
   
    Base::enqueue(msg);    Base::enqueue(msg);
  
 //    PEGASUS_ASSERT(msg != 0 );     PEG_METHOD_EXIT();
   }
  
 //    cout << "inside overriden enqueue" << endl;  
 //        if (!msg)  
 //     {  
 //        Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,  
 //         "MessageQueue::enqueue failure");  
 //        throw NullPointer();  
 //     }  
  
 //     if (getenv("PEGASUS_TRACE"))  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
 //     {  {
 //        cout << "===== " << getQueueName() << ": ";     Thread *myself = reinterpret_cast<Thread *>(parm);
 //        msg->print(cout);     MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
 //     }     AsyncOpNode *operation = 0;
  
 //    msg->dest = _queueId;     while ( service->_die.value() == 0 )
 //    SendForget(msg);     {
         service->_callback_ready.wait();
  
         service->_callback.lock();
         operation = service->_callback.next(0);
         while( operation != NULL)
         {
            if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
            {
               operation = service->_callback.remove_no_lock(operation);
               PEGASUS_ASSERT(operation != NULL);
               operation->_thread_ptr = myself;
               operation->_service_ptr = service;
               service->_handle_async_callback(operation);
               break;
            }
            operation = service->_callback.next(operation);
         }
         service->_callback.unlock();
      }
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
 } }
   
  
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);     MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
    MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());  
   
    // pull messages off the incoming queue and dispatch them. then    // pull messages off the incoming queue and dispatch them. then
    // check pending messages that are non-blocking    // check pending messages that are non-blocking
    AsyncOpNode *operation = 0;    AsyncOpNode *operation = 0;
  
    while ( service->_die.value() == 0 )     if ( service->_die.value() == 0 )
    {    {
       try       try
       {       {
          operation = service->_incoming.remove_first_wait();              operation = service->_incoming.remove_first();
       }       }
       catch(ListClosed & )       catch(ListClosed & )
       {       {
          break;              operation = 0;
   
               return(0);
       }       }
       if( operation )       if( operation )
       {       {
               operation->_service_ptr = service;
          service->_handle_incoming_operation(operation, myself, service);              service->_handle_incoming_operation(operation);
       }       }
    }    }
  
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  
    return(0);    return(0);
 } }
  
   Uint32 MessageQueueService::get_pending_callback_count(void)
   {
      return _callback.count();
   }
   
   
   
   void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
                                                MessageQueue *q,
                                                void *parm)
   {
      op->_client_sem.signal();
   }
   
   
   // callback function is responsible for cleaning up all resources
   // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode *op) void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 { {
      if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
      {
   
         Message *msg = op->get_request();
         if( msg && ( msg->getMask() & message_mask::ha_async))
         {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
            {
               AsyncLegacyOperationStart *wrapper =
                  static_cast<AsyncLegacyOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
               AsyncModuleOperationStart *wrapper =
                  static_cast<AsyncModuleOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
               AsyncOperationStart *wrapper =
                  static_cast<AsyncOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            delete msg;
         }
   
         msg = op->get_response();
         if( msg && ( msg->getMask() & message_mask::ha_async))
         {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
            {
               AsyncLegacyOperationResult *wrapper =
                  static_cast<AsyncLegacyOperationResult *>(msg);
               msg = wrapper->get_result();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
               AsyncModuleOperationResult *wrapper =
                  static_cast<AsyncModuleOperationResult *>(msg);
               msg = wrapper->get_result();
               delete wrapper;
            }
         }
         void (*callback)(Message *, void *, void *) = op->__async_callback;
         void *handle = op->_callback_handle;
         void *parm = op->_callback_parameter;
         op->release();
    return_op(op);    return_op(op);
         callback(msg, handle, parm);
      }
      else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
      {
         // note that _callback_node may be different from op
         // op->_callback_response_q is a "this" pointer we can use for
         // static callback methods
         op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
      }
 } }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation,  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
                                                      Thread *thread,  //                                                   Thread *thread,
                                                      MessageQueue *queue)  //                                                   MessageQueue *queue)
 { {
    if ( operation != 0 )    if ( operation != 0 )
    {    {
Line 220 
Line 385 
 // ATTN: optimization // ATTN: optimization
 // << Tue Feb 19 14:10:38 2002 mdd >> // << Tue Feb 19 14:10:38 2002 mdd >>
       operation->lock();       operation->lock();
       if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) &&  
          (operation->_state & ASYNC_OPSTATE_COMPLETE))  
       {  
          operation->unlock();  
          _handle_async_callback(operation);  
       }  
  
       Message *rq = operation->_request.next(0);       Message *rq = operation->_request.next(0);
       PEGASUS_ASSERT(rq != 0 );  
   // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
   // move this to the bottom of the loop when the majority of
   // messages become async messages.
  
       // divert legacy messages to handleEnqueue       // divert legacy messages to handleEnqueue
       if ( ! (rq->getMask() & message_mask::ha_async) )        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
       {       {
          rq = operation->_request.remove_first() ;          rq = operation->_request.remove_first() ;
          operation->unlock();          operation->unlock();
          // delete the op node          // delete the op node
          delete operation;           operation->release();
            return_op( operation);
   
          handleEnqueue(rq);          handleEnqueue(rq);
          return;          return;
       }       }
  
         if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
               operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
              (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {
   
            operation->unlock();
            _handle_async_callback(operation);
         }
         else
         {
            PEGASUS_ASSERT(rq != 0 );
            // ATTN: optimization
            // << Wed Mar  6 15:00:39 2002 mdd >>
            // put thread and queue into the asyncopnode structure.
            //  (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
            //   (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
            // done << Tue Mar 12 14:49:07 2002 mdd >>
       operation->unlock();       operation->unlock();
       static_cast<AsyncMessage *>(rq)->_myself = thread;  
       static_cast<AsyncMessage *>(rq)->_service = queue;  
       _handle_async_request(static_cast<AsyncRequest *>(rq));       _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
      }
    return;    return;
   
 } }
  
   
   
   
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
 { {
    if ( req != 0 )    if ( req != 0 )
Line 289 
Line 464 
    Message* response)    Message* response)
  
 { {
      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                       "MessageQueueService::_enqueueResponse");
   
      if( request->getMask() & message_mask::ha_async)
      {
         if (response->getMask() & message_mask::ha_async )
         {
            _completeAsyncResponse(static_cast<AsyncRequest *>(request),
                                   static_cast<AsyncReply *>(response),
                                   ASYNC_OPSTATE_COMPLETE, 0 );
            PEG_METHOD_EXIT();
            return true;
         }
      }
   
    if(request->_async != 0 )    if(request->_async != 0 )
    {    {
       Uint32 mask = request->_async->getMask();       Uint32 mask = request->_async->getMask();
Line 297 
Line 487 
       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);       AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
       AsyncOpNode *op = async->op;       AsyncOpNode *op = async->op;
       request->_async = 0;       request->_async = 0;
         // this request is probably going to be deleted !!
         // remove it from the op node
         op->_request.remove(request);
   
  
       AsyncLegacyOperationResult *async_result =       AsyncLegacyOperationResult *async_result =
          new AsyncLegacyOperationResult(          new AsyncLegacyOperationResult(
Line 308 
Line 502 
                              async_result,                              async_result,
                              ASYNC_OPSTATE_COMPLETE,                              ASYNC_OPSTATE_COMPLETE,
                              0);                              0);
         PEG_METHOD_EXIT();
       return true;       return true;
    }    }
  
    // ensure that the destination queue is in response->dest    // ensure that the destination queue is in response->dest
      PEG_METHOD_EXIT();
    return SendForget(response);    return SendForget(response);
  
 } }
Line 327 
Line 523 
                                                 Uint32 state,                                                 Uint32 state,
                                                 Uint32 flag)                                                 Uint32 flag)
 { {
      PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                       "MessageQueueService::_completeAsyncResponse");
   
    cimom::_completeAsyncResponse(request, reply, state, flag);    cimom::_completeAsyncResponse(request, reply, state, flag);
   
      PEG_METHOD_EXIT();
 } }
  
  
   void MessageQueueService::_complete_op_node(AsyncOpNode *op,
                                               Uint32 state,
                                               Uint32 flag,
                                               Uint32 code)
   {
      cimom::_complete_op_node(op, state, flag, code);
   }
   
  
 Boolean MessageQueueService::accept_async(AsyncOpNode *op) Boolean MessageQueueService::accept_async(AsyncOpNode *op)
 { {
Line 348 
Line 557 
         _die.value() == 0  )         _die.value() == 0  )
    {    {
       _incoming.insert_last_wait(op);       _incoming.insert_last_wait(op);
         _polling_sem.signal();
       return true;       return true;
    }    }
   //    else
   //    {
   //       if(  (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
   //         (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
   //         _die.value() == 0)
   //       {
   //       MessageQueueService::_incoming.insert_last_wait(op);
   //       return true;
   //       }
   //    }
   
    return false;    return false;
 } }
  
Line 358 
Line 579 
    if (_incoming_queue_shutdown.value() > 0 )    if (_incoming_queue_shutdown.value() > 0 )
       return false;       return false;
    return true;    return true;
   
 } }
  
  
 void MessageQueueService::handleEnqueue(Message *msg)  // made pure virtual
 {  // << Wed Mar  6 15:11:31 2002 mdd >>
   // void MessageQueueService::handleEnqueue(Message *msg)
    if ( msg )  // {
       delete msg;  //    if ( msg )
   //       delete msg;
 }  // }
   
   
 void MessageQueueService::handleEnqueue(void)  
 {  
  
     Message *msg = dequeue();  // made pure virtual
     handleEnqueue(msg);  // << Wed Mar  6 15:11:56 2002 mdd >>
 }  // void MessageQueueService::handleEnqueue(void)
   // {
   //     Message *msg = dequeue();
   //     handleEnqueue(msg);
   // }
  
 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req) void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
 { {
Line 408 
Line 628 
       case AsyncIoctl::IO_CLOSE:       case AsyncIoctl::IO_CLOSE:
       {       {
          // save my bearings          // save my bearings
          Thread *myself = req->_myself;  //       Thread *myself = req->op->_thread_ptr;
          MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);           MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
  
          // respond to this message.          // respond to this message.
          _make_response(req, async_results::OK);          _make_response(req, async_results::OK);
Line 435 
Line 655 
             }             }
             if( operation )             if( operation )
             {             {
                service->_handle_incoming_operation(operation, myself, service);  //             operation->_thread_ptr = myself;
                  operation->_service_ptr = service;
                  service->_handle_incoming_operation(operation);
             }             }
             else             else
                break;                break;
Line 444 
Line 666 
          // shutdown the AsyncDQueue          // shutdown the AsyncDQueue
          service->_incoming.shutdown_queue();          service->_incoming.shutdown_queue();
          // exit the thread !          // exit the thread !
          myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );  //       myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
          return;          return;
       }       }
  
Line 504 
Line 726 
    // remove the legacy message from the request and enqueue it to its destination    // remove the legacy message from the request and enqueue it to its destination
    Uint32 result = async_results::CIM_NAK;    Uint32 result = async_results::CIM_NAK;
  
    Message *legacy = req->act;     Message *legacy = req->_act;
    if ( legacy != 0 )    if ( legacy != 0 )
    {    {
       MessageQueue* queue = MessageQueue::lookup(req->legacy_destination);        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
       if( queue != 0 )       if( queue != 0 )
       {       {
            if(queue->isAsync() == true )
            {
               (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
            }
            else
            {
         // Enqueue the response:         // Enqueue the response:
          queue->enqueue(legacy);              queue->enqueue(req->get_action());
            }
   
          result = async_results::OK;          result = async_results::OK;
       }       }
    }    }
Line 540 
Line 770 
 } }
  
  
   Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
                                          Uint32 destination)
   {
      PEGASUS_ASSERT(op != 0 );
      op->lock();
      op->_op_dest = MessageQueue::lookup(destination);
      op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
      op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
      op->unlock();
      if ( op->_op_dest == 0 )
         return false;
   
      return  _meta_dispatcher->route_async(op);
   }
   
   
 Boolean MessageQueueService::SendAsync(AsyncOpNode *op, Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
                                        Uint32 destination,                                        Uint32 destination,
                                        void (*callback)(AsyncOpNode *,                                        void (*callback)(AsyncOpNode *,
                                                         MessageQueue *,                                                         MessageQueue *,
                                                         void *))                                                          void *),
                                          MessageQueue *callback_response_q,
                                          void *callback_ptr)
 { {
    PEGASUS_ASSERT(op != 0 && callback != 0 );    PEGASUS_ASSERT(op != 0 && callback != 0 );
  
    // get the queue handle for the destination    // get the queue handle for the destination
    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))  
       return false;  
  
      op->lock();
      op->_op_dest = MessageQueue::lookup(destination); // destination of this message
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      // initialize the callback data
      op->_async_callback = callback;   // callback function to be executed by recpt. of response
      op->_callback_node = op;          // the op node
      op->_callback_response_q = callback_response_q;  // the queue that will receive the response
      op->_callback_ptr = callback_ptr;   // user data for callback
      op->_callback_request_q = this;     // I am the originator of this request
   
      op->unlock();
      if(op->_op_dest == 0)
         return false;
   
      return  _meta_dispatcher->route_async(op);
   }
   
   
   Boolean MessageQueueService::SendAsync(Message *msg,
                                          Uint32 destination,
                                          void (*callback)(Message *response,
                                                           void *handle,
                                                           void *parameter),
                                          void *handle,
                                          void *parameter)
   {
      if(msg == NULL)
         return false;
      if(callback == NULL)
         return SendForget(msg);
      AsyncOpNode *op = get_op();
      msg->dest = destination;
      if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
      {
         op->release();
         return_op(op);
         return false;
      }
      op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
      op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      op->__async_callback = callback;
      op->_callback_node = op;
      op->_callback_handle = handle;
      op->_callback_parameter = parameter;
      op->_callback_response_q = this;
   
  
      if( ! (msg->getMask() & message_mask::ha_async) )
      {
         AsyncLegacyOperationStart *wrapper =
            new AsyncLegacyOperationStart(get_next_xid(),
                                          op,
                                          destination,
                                          msg,
                                          destination);
      }
      else
      {
         op->_request.insert_first(msg);
         (static_cast<AsyncMessage *>(msg))->op = op;
      }
  
      _callback.insert_last(op);
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
 } }
  
Line 564 
Line 871 
 Boolean MessageQueueService::SendForget(Message *msg) Boolean MessageQueueService::SendForget(Message *msg)
 { {
  
   
    AsyncOpNode *op = 0;    AsyncOpNode *op = 0;
    Uint32 mask = msg->getMask();    Uint32 mask = msg->getMask();
  
Line 579 
Line 887 
       if (mask & message_mask::ha_async)       if (mask & message_mask::ha_async)
          (static_cast<AsyncMessage *>(msg))->op = op;          (static_cast<AsyncMessage *>(msg))->op = op;
    }    }
      op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);     op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
                      | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      if ( op->_op_dest == 0 )
    // get the queue handle for the destination     {
    if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest)))        op->release();
         return_op(op);
       return false;       return false;
      }
  
    // now see if the meta dispatcher will take it    // now see if the meta dispatcher will take it
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
Line 607 
Line 918 
       destroy_op = true;       destroy_op = true;
    }    }
  
    request->block = true;     request->block = false;
    request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;     request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK;     SendAsync(request->op,
                request->dest,
                _sendwait_callback,
                this,
                (void *)0);
  
    // get the queue handle for the destination  
    if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest)))  
       return 0;  
   
   
    // now see if the meta dispatcher will take it  
   
    if (true == _meta_dispatcher->route_async(request->op))  
    {  
       request->op->_client_sem.wait();       request->op->_client_sem.wait();
       PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);  
   
    }  
   
    request->op->lock();    request->op->lock();
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
    rpl->op = 0;    rpl->op = 0;
Line 636 
Line 938 
       request->op->_request.remove(request);       request->op->_request.remove(request);
       request->op->_state |= ASYNC_OPSTATE_RELEASED;       request->op->_state |= ASYNC_OPSTATE_RELEASED;
       request->op->unlock();       request->op->unlock();
   
       return_op(request->op);       return_op(request->op);
       request->op = 0;       request->op = 0;
    }    }
   
    return rpl;    return rpl;
 } }
  
Line 657 
Line 957 
                                                     capabilities,                                                     capabilities,
                                                     mask,                                                     mask,
                                                     _queueId);                                                     _queueId);
      msg->dest = CIMOM_Q_ID;
   
    Boolean registered = false;    Boolean registered = false;
    AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));    AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
  
Line 736 
Line 1038 
                            capabilities,                            capabilities,
                            mask);                            mask);
  
      req->dest = CIMOM_Q_ID;
   
    AsyncMessage *reply = SendWait(req);    AsyncMessage *reply = SendWait(req);
    if(reply)    if(reply)
    {    {


Legend:
Removed from v.1.22  
changed lines
  Added in v.1.62

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2