(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.28.2.1 and 1.29

version 1.28.2.1, 2002/03/04 11:57:39 version 1.29, 2002/03/06 21:25:06
Line 154 
Line 154 
    Base::enqueue(msg);    Base::enqueue(msg);
  
    PEG_METHOD_EXIT();    PEG_METHOD_EXIT();
   
 //    PEGASUS_ASSERT(msg != 0 );  
   
 //    cout << "inside overriden enqueue" << endl;  
 //    if (!msg)  
 //     {  
 //        Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,  
 //         "MessageQueue::enqueue failure");  
 //        throw NullPointer();  
 //     }  
   
 //     if (getenv("PEGASUS_TRACE"))  
 //     {  
 //        cout << "===== " << getQueueName() << ": ";  
 //        msg->print(cout);  
 //     }  
   
 //    msg->dest = _queueId;  
   
 //    SendForget(msg);  
   
 } }
  
  
Line 225 
Line 204 
 // ATTN: optimization // ATTN: optimization
 // << Tue Feb 19 14:10:38 2002 mdd >> // << Tue Feb 19 14:10:38 2002 mdd >>
       operation->lock();       operation->lock();
       if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) &&  
          (operation->_state & ASYNC_OPSTATE_COMPLETE))  
       {  
          operation->unlock();  
          _handle_async_callback(operation);  
       }  
  
       Message *rq = operation->_request.next(0);       Message *rq = operation->_request.next(0);
       PEGASUS_ASSERT(rq != 0 );  
  
       // divert "plain" legacy messages to handleEnqueue        // divert legacy messages to handleEnqueue
       if ( ! (rq->getMask() & message_mask::ha_async)  )        if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
       {       {
          rq = operation->_request.remove_first() ;          rq = operation->_request.remove_first() ;
          operation->unlock();          operation->unlock();
          // delete the op node          // delete the op node
          delete operation;          delete operation;
  
   
 //      Attn:  change to handleEnqueue(msg) when we have that method in all messagequeueservices  
 //             make handleEnqueue pure virtual !!!  
 //      << Fri Feb 22 13:39:09 2002 mdd >>  
   
          handleEnqueue(rq);          handleEnqueue(rq);
          return;          return;
       }       }
  
         if ( operation->_state & ASYNC_OPFLAGS_CALLBACK &&
              (operation->_state & ASYNC_OPSTATE_COMPLETE))
         {
            operation->unlock();
            _handle_async_callback(operation);
         }
         else
         {
            PEGASUS_ASSERT(rq != 0 );
            // ATTN: optimization
            // << Wed Mar  6 15:00:39 2002 mdd >>
            // put thread and queue into the asyncopnode structure.
            (static_cast<AsyncMessage *>(rq))->_myself = thread;
            (static_cast<AsyncMessage *>(rq))->_service = queue;
       operation->unlock();       operation->unlock();
       static_cast<AsyncMessage *>(rq)->_myself = thread;  
       static_cast<AsyncMessage *>(rq)->_service = queue;  
       _handle_async_request(static_cast<AsyncRequest *>(rq));       _handle_async_request(static_cast<AsyncRequest *>(rq));
    }    }
      }
    return;    return;
   
 } }
  
 void MessageQueueService::_handle_async_request(AsyncRequest *req) void MessageQueueService::_handle_async_request(AsyncRequest *req)
Line 383 
Line 361 
    return true;    return true;
 } }
  
   
   // made pure virtual
   // << Wed Mar  6 15:11:31 2002 mdd >>
 // void MessageQueueService::handleEnqueue(Message *msg) // void MessageQueueService::handleEnqueue(Message *msg)
 // { // {
   
   
 //    if ( msg ) //    if ( msg )
 //       delete msg; //       delete msg;
 // } // }
  
   // made pure virtual
   // << Wed Mar  6 15:11:56 2002 mdd >>
 // void MessageQueueService::handleEnqueue(void) // void MessageQueueService::handleEnqueue(void)
 // { // {
 //     Message *msg = dequeue(); //     Message *msg = dequeue();
Line 568 
Line 548 
 } }
  
  
 void MessageQueueService::ReplyAsync(AsyncOpNode *op,  Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
                 Uint32 destination)                 Uint32 destination)
 { {
    PEGASUS_ASSERT( op->_flags &  ASYNC_OPFLAGS_CALLBACK );     PEGASUS_ASSERT(op != 0 );
   
   
    // get the queue handle for the destination  
    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
    {        return false;
       delete op;  
    }  
   
    op->_response.next(0)->dest = destination;  
  
      op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
      op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
  
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);     return  _meta_dispatcher->route_async(op);
    op->_state |= ASYNC_OPSTATE_COMPLETE;  
   
    if ( false == _meta_dispatcher->route_async(op) )  
       delete op;  
    return;  
 } }
  
  
Line 596 
Line 566 
                                        Uint32 destination,                                        Uint32 destination,
                                        void (*callback)(AsyncOpNode *,                                        void (*callback)(AsyncOpNode *,
                                                         MessageQueue *,                                                         MessageQueue *,
                                                         void *),                                                          void *))
                                        void * parm)  
 { {
    PEGASUS_ASSERT( callback != 0 && op != 0 );     PEGASUS_ASSERT(op != 0 && callback != 0 );
   
    op->_callback_ptr = parm;  
  
    // get the queue handle for the destination    // get the queue handle for the destination
    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))    if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
       return false;       return false;
    op->_request.next(0)->dest = destination;  
  
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
Line 615 
Line 581 
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
 } }
  
 void MessageQueueService::ForwardRequest(AsyncOpNode *op, MessageQueue *dest)  
 {  
   
    Message *msg = op->_request.next(0);  
    if ( msg == 0 )  
    {  
       delete op;  
       return;  
    }  
    msg->_async = 0;  
    msg->dest = dest->getQueueId();  
    if(msg->getMask() & message_mask::ha_async)  
       (static_cast<AsyncMessage *>(msg))->op = 0;  
   
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);  
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
   
    op->_op_dest = dest;  
   
    // now see if the meta dispatcher will take it  
    if( false == _meta_dispatcher->route_async(op))  
       delete op;  
   
    return;  
 }  
   
   
 void MessageQueueService::ForwardResponse(AsyncOpNode *op, MessageQueue *dest)  
 {  
   
    Message *msg = op->_response.next(0);  
    if ( msg == 0 || dest == 0 )  
    {  
       delete op;  
       return;  
    }  
    msg->_async = 0;  
    msg->dest = dest->getQueueId();  
    if(msg->getMask() & message_mask::ha_async)  
       (static_cast<AsyncMessage *>(msg))->op = 0;  
   
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;  
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);  
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;  
   
    op->_op_dest = dest;  
   
    // now see if the meta dispatcher will take it  
    if( false == _meta_dispatcher->route_async(op))  
       delete op;  
   
    return;  
 }  
  
 Boolean MessageQueueService::SendForget(Message *msg) Boolean MessageQueueService::SendForget(Message *msg)
 { {
Line 688 
Line 600 
       op->_request.insert_first(msg);       op->_request.insert_first(msg);
       if (mask & message_mask::ha_async)       if (mask & message_mask::ha_async)
          (static_cast<AsyncMessage *>(msg))->op = op;          (static_cast<AsyncMessage *>(msg))->op = op;
       else  
          msg->_async = 0;  
    }    }
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);


Legend:
Removed from v.1.28.2.1  
changed lines
  Added in v.1.29

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2