(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.38 and 1.39

version 1.38, 2002/04/11 13:15:59 version 1.39, 2002/04/23 17:17:22
Line 46 
Line 46 
  
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _pending(true),  
      _incoming(true, 1000),      _incoming(true, 1000),
        _callback(true),
      _incoming_queue_shutdown(0),      _incoming_queue_shutdown(0),
      _req_thread(_req_proc, this, false)       _callback_ready(0),
        _req_thread(_req_proc, this, false),
        _callback_thread(_callback_proc, this, false)
 { {
    _capabilities = (capabilities | module_capabilities::async);    _capabilities = (capabilities | module_capabilities::async);
  
Line 80 
Line 82 
    }    }
  
    _meta_dispatcher_mutex.unlock();    _meta_dispatcher_mutex.unlock();
   //   _callback_thread.run();
  
    _req_thread.run();    _req_thread.run();
 } }
Line 94 
Line 97 
        _req_thread.join();        _req_thread.join();
    }    }
  
      _callback_ready.signal();
      _callback_thread.join();
   
    _meta_dispatcher_mutex.lock(pegasus_thread_self());    _meta_dispatcher_mutex.lock(pegasus_thread_self());
    _service_count--;    _service_count--;
    if (_service_count.value() == 0 )    if (_service_count.value() == 0 )
Line 161 
Line 167 
 } }
  
  
   PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
   {
      Thread *myself = reinterpret_cast<Thread *>(parm);
      MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
      AsyncOpNode *operation = 0;
   
      while ( service->_die.value() == 0 )
      {
         service->_callback_ready.wait();
   
         service->_callback.lock();
         operation = service->_callback.next(0);
         while( operation != NULL)
         {
            if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
            {
               operation = service->_callback.remove_no_lock(operation);
               PEGASUS_ASSERT(operation != NULL);
               operation->_thread_ptr = myself;
               operation->_service_ptr = service;
               service->_handle_async_callback(operation);
               break;
            }
            operation = service->_callback.next(operation);
         }
         service->_callback.unlock();
      }
      myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
      return(0);
   }
   
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm) PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
 { {
Line 181 
Line 218 
       {       {
          break;          break;
       }       }
         catch(Deadlock & )
         {
            cout << "Deadlock Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
         catch(AlreadyLocked & )
         {
            cout << "Already Locked Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(TimeOut & )
         {
            cout << "TimeOut Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(Permission & )
         {
            cout << "Permission Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(WaitFailed & )
         {
            cout << "WaitFailed Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(TooManyReaders & )
         {
            cout << "TooManyReaders Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(ListFull & )
         {
            cout << "ListFull Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(ListClosed & )
         {
            cout << "ListClosed Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(ModuleClosed & )
         {
            cout << "ModuleClosed Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
         catch(IPCException & )
         {
   //       << Tue Apr 23 10:21:17 2002 mdd >>
            cout << "IPC Exception in MessageQueueService::_req_proc" << endl;
            abort();
         }
   
       if( operation )       if( operation )
       {       {
          operation->_thread_ptr = myself;          operation->_thread_ptr = myself;
Line 205 
Line 302 
 // including op, op->_callback_node, and op->_callback_ptr // including op, op->_callback_node, and op->_callback_ptr
 void MessageQueueService::_handle_async_callback(AsyncOpNode *op) void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
 { {
      if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
      {
   
         Message *msg = op->get_request();
         if( msg && ( msg->getMask() & message_mask::ha_async))
         {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
            {
               AsyncLegacyOperationStart *wrapper =
                  static_cast<AsyncLegacyOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
               AsyncModuleOperationStart *wrapper =
                  static_cast<AsyncModuleOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
               AsyncModuleOperationStart *wrapper =
                  static_cast<AsyncModuleOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
               AsyncOperationStart *wrapper =
                  static_cast<AsyncOperationStart *>(msg);
               msg = wrapper->get_action();
               delete wrapper;
            }
            delete msg;
         }
   
         msg = op->get_response();
         if( msg && ( msg->getMask() & message_mask::ha_async))
         {
            if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
            {
               AsyncLegacyOperationResult *wrapper =
                  static_cast<AsyncLegacyOperationResult *>(msg);
               msg = wrapper->get_result();
               delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
               AsyncModuleOperationResult *wrapper =
                  static_cast<AsyncModuleOperationResult *>(msg);
               msg = wrapper->get_result();
               delete wrapper;
            }
         }
         void (*callback)(Message *, void *, void *) = op->__async_callback;
         void *handle = op->_callback_handle;
         void *parm = op->_callback_parameter;
         op->release();
         return_op(op);
         callback(msg, handle, parm);
      }
      else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
      {
    // note that _callback_node may be different from op    // note that _callback_node may be different from op
    // op->_callback_response_q is a "this" pointer we can use for    // op->_callback_response_q is a "this" pointer we can use for
    // static callback methods    // static callback methods
    op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);    op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
 } }
   }
  
  
 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation) void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
Line 235 
Line 397 
          rq = operation->_request.remove_first() ;          rq = operation->_request.remove_first() ;
          operation->unlock();          operation->unlock();
          // delete the op node          // delete the op node
          delete operation;           operation->release();
            return_op( operation);
  
          handleEnqueue(rq);          handleEnqueue(rq);
          return;          return;
       }       }
  
       if ( operation->_flags & ASYNC_OPFLAGS_CALLBACK &&        if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
               operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))            (operation->_state & ASYNC_OPSTATE_COMPLETE))
       {       {
          operation->unlock();          operation->unlock();
Line 650 
Line 814 
 } }
  
  
   Boolean MessageQueueService::SendAsync(Message *msg,
                                          Uint32 destination,
                                          void (*callback)(Message *response,
                                                           void *handle,
                                                           void *parameter),
                                          void *handle,
                                          void *parameter)
   {
      if(msg == NULL)
         return false;
      if(callback == NULL)
         return SendForget(msg);
      AsyncOpNode *op = get_op();
      if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
      {
         op->release();
         return_op(op);
         return false;
      }
      op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
      op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
      op->_state &= ~ASYNC_OPSTATE_COMPLETE;
      op->__async_callback = callback;
      op->_callback_node = op;
      op->_callback_handle = handle;
      op->_callback_parameter = parameter;
      op->_callback_response_q = this;
   
   
      if( ! (msg->getMask() & message_mask::ha_async) )
      {
         AsyncLegacyOperationStart *wrapper =
            new AsyncLegacyOperationStart(get_next_xid(),
                                          op,
                                          destination,
                                          msg,
                                          destination);
         msg = static_cast<Message *>(wrapper);
      }
      else
      {
         op->_request.insert_first(msg);
         (static_cast<AsyncMessage *>(msg))->op = op;
      }
   
      op->_callback_notify = &_callback_ready;
      _callback.insert_last(op);
   
      return _meta_dispatcher->route_async(op);
   }
   
   
 Boolean MessageQueueService::SendForget(Message *msg) Boolean MessageQueueService::SendForget(Message *msg)
 { {
  
Line 669 
Line 885 
       if (mask & message_mask::ha_async)       if (mask & message_mask::ha_async)
          (static_cast<AsyncMessage *>(msg))->op = op;          (static_cast<AsyncMessage *>(msg))->op = op;
    }    }
    op->lock();  
    op->_op_dest = MessageQueue::lookup(msg->dest);    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->unlock();  
    if ( op->_op_dest == 0 )    if ( op->_op_dest == 0 )
      {
         op->release();
         return_op(op);
       return false;       return false;
      }
  
    // now see if the meta dispatcher will take it    // now see if the meta dispatcher will take it
    return  _meta_dispatcher->route_async(op);    return  _meta_dispatcher->route_async(op);
Line 720 
Line 938 
       return_op(request->op);       return_op(request->op);
       request->op = 0;       request->op = 0;
    }    }
   
    return rpl;    return rpl;
 } }
  


Legend:
Removed from v.1.38  
changed lines
  Added in v.1.39

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2