(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.119.12.1 and 1.123.2.1

version 1.119.12.1, 2006/06/29 17:33:12 version 1.123.2.1, 2006/07/27 23:11:51
Line 47 
Line 47 
  
 cimom *MessageQueueService::_meta_dispatcher = 0; cimom *MessageQueueService::_meta_dispatcher = 0;
 AtomicInt MessageQueueService::_service_count(0); AtomicInt MessageQueueService::_service_count(0);
 AtomicInt MessageQueueService::_xid(1);  
 Mutex MessageQueueService::_meta_dispatcher_mutex; Mutex MessageQueueService::_meta_dispatcher_mutex;
  
 static struct timeval deallocateWait = {300, 0}; static struct timeval deallocateWait = {300, 0};
  
 ThreadPool *MessageQueueService::_thread_pool = 0; ThreadPool *MessageQueueService::_thread_pool = 0;
  
 List<MessageQueueService, RMutex> MessageQueueService::_polling_list;  MessageQueueService::PollingList* MessageQueueService::_polling_list;
   Mutex MessageQueueService::_polling_list_mutex;
  
 Thread* MessageQueueService::_polling_thread = 0; Thread* MessageQueueService::_polling_thread = 0;
  
Line 78 
Line 78 
  
 Uint32 max_threads_per_svc_queue; Uint32 max_threads_per_svc_queue;
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL  ThreadReturnType PEGASUS_THREAD_CDECL
 MessageQueueService::kill_idle_threads(void *parm) MessageQueueService::kill_idle_threads(void *parm)
 { {
  
Line 100 
Line 100 
    }    }
  
 #ifdef PEGASUS_POINTER_64BIT #ifdef PEGASUS_POINTER_64BIT
    return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;     return (ThreadReturnType)(Uint64)dead_threads;
 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
    return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;     return (ThreadReturnType)(unsigned long)dead_threads;
 #else #else
    return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;     return (ThreadReturnType)(Uint32)dead_threads;
 #endif #endif
 } }
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)  ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
 { {
    Thread *myself = reinterpret_cast<Thread *>(parm);    Thread *myself = reinterpret_cast<Thread *>(parm);
    List<MessageQueueService, RMutex> *list =     List<MessageQueueService, Mutex> *list =
        reinterpret_cast<List<MessageQueueService, RMutex>*>(myself->get_parm());         reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());
  
    while (_stop_polling.get()  == 0)    while (_stop_polling.get()  == 0)
    {    {
Line 177 
Line 177 
                     service->_incoming.count(),                     service->_incoming.count(),
                     service->_threads.get());                     service->_threads.get());
  
                  pegasus_yield();                   Threads::yield();
                  service = NULL;                  service = NULL;
               }               }
           }           }
Line 208 
Line 208 
  
       }       }
    }    }
    myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );     myself->exit_self( (ThreadReturnType) 1 );
    return(0);    return(0);
 } }
  
Line 227 
Line 227 
      _mask(mask),      _mask(mask),
      _die(0),      _die(0),
      _threads(0),      _threads(0),
      _incoming(0),       _incoming(),
      _incoming_queue_shutdown(0)      _incoming_queue_shutdown(0)
 { {
  
Line 279 
Line 279 
       throw BindFailedException(parms);       throw BindFailedException(parms);
    }    }
  
    _polling_list.insert_back(this);     _get_polling_list()->insert_back(this);
  
 } }
  
Line 296 
Line 296 
  
    // ATTN: added to prevent assertion in List in which the list does not    // ATTN: added to prevent assertion in List in which the list does not
    // contain this element.    // contain this element.
    _polling_list.remove(this);  
      if (_get_polling_list()->contains(this))
          _get_polling_list()->remove(this);
  
    // ATTN: The code for closing the _incoming queue    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,    // is not working correctly. In OpenPegasus 2.5,
Line 313 
Line 315 
  
    while (_threads.get() > 0)    while (_threads.get() > 0)
    {    {
       pegasus_yield();        Threads::yield();
    }    }
  
    {    {
Line 356 
Line 358 
       return;       return;
  
    AsyncIoctl *msg = new AsyncIoctl(    AsyncIoctl *msg = new AsyncIoctl(
       get_next_xid(),  
       0,       0,
       _queueId,       _queueId,
       _queueId,       _queueId,
Line 372 
Line 373 
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
  
    msg->op->_op_dest = this;    msg->op->_op_dest = this;
    msg->op->_request.insert_front(msg);     msg->op->_request.reset(msg);
    try {    try {
      _incoming.enqueue_wait(msg->op);      _incoming.enqueue_wait(msg->op);
      _polling_sem.signal();      _polling_sem.signal();
Line 401 
Line 402 
 } }
  
  
 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(  ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
     void * parm)     void * parm)
 { {
     MessageQueueService* service =     MessageQueueService* service =
Line 473 
Line 474 
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {    {
  
       Message *msg = op->get_request();        Message *msg = op->removeRequest();
       if (msg && (msg->getMask() & message_mask::ha_async))       if (msg && (msg->getMask() & message_mask::ha_async))
       {       {
          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
Line 500 
Line 501 
          delete msg;          delete msg;
       }       }
  
       msg = op->get_response();        msg = op->removeResponse();
       if (msg && (msg->getMask() & message_mask::ha_async))       if (msg && (msg->getMask() & message_mask::ha_async))
       {       {
          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)          if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
Line 544 
Line 545 
 // << Tue Feb 19 14:10:38 2002 mdd >> // << Tue Feb 19 14:10:38 2002 mdd >>
       operation->lock();       operation->lock();
  
       Message *rq = operation->_request.front();        Message *rq = operation->_request.get();
  
 // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>> // optimization <<< Thu Mar  7 21:04:05 2002 mdd >>>
 // move this to the bottom of the loop when the majority of // move this to the bottom of the loop when the majority of
Line 553 
Line 554 
       // divert legacy messages to handleEnqueue       // divert legacy messages to handleEnqueue
       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))       if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
       {       {
          rq = operation->_request.remove_front() ;           operation->_request.release();
          operation->unlock();          operation->unlock();
          // delete the op node          // delete the op node
          operation->release();          operation->release();
Line 647 
Line 648 
  
       AsyncLegacyOperationResult *async_result =       AsyncLegacyOperationResult *async_result =
          new AsyncLegacyOperationResult(          new AsyncLegacyOperationResult(
             async->getKey(),  
             async->getRouting(),  
             op,             op,
             response);             response);
       _completeAsyncResponse(       _completeAsyncResponse(
Line 705 
Line 704 
    {    {
       _polling_thread = new Thread(       _polling_thread = new Thread(
           polling_routine,           polling_routine,
           reinterpret_cast<void *>(&_polling_list),            reinterpret_cast<void *>(_get_polling_list()),
           false);           false);
       ThreadStatus tr = PEGASUS_THREAD_OK;       ThreadStatus tr = PEGASUS_THREAD_OK;
       while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)       while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
       {       {
         if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)         if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
            pegasus_yield();             Threads::yield();
         else         else
            throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",            throw Exception(MessageLoaderParms("Common.MessageQueueService.NOT_ENOUGH_THREAD",
                         "Could not allocate thread for the polling thread."));                         "Could not allocate thread for the polling thread."));
Line 720 
Line 719 
 // ATTN optimization remove the message checking altogether in the base // ATTN optimization remove the message checking altogether in the base
 // << Mon Feb 18 14:02:20 2002 mdd >> // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();    op->lock();
    Message *rq = op->_request.front();     Message *rq = op->_request.get();
    Message *rp = op->_response.front();     Message *rp = op->_response.get();
    op->unlock();    op->unlock();
  
    if ((rq != 0 && (true == messageOK(rq))) ||    if ((rq != 0 && (true == messageOK(rq))) ||
Line 747 
Line 746 
  
    AsyncReply *reply = new AsyncReply(    AsyncReply *reply = new AsyncReply(
       async_messages::HEARTBEAT,       async_messages::HEARTBEAT,
       req->getKey(),  
       req->getRouting(),  
       0,       0,
       req->op,       req->op,
       async_results::OK,       async_results::OK,
Line 808 
Line 805 
          } // message processing loop          } // message processing loop
  
          // shutdown the AsyncQueue          // shutdown the AsyncQueue
          service->_incoming.shutdown_queue();           service->_incoming.close();
          return;          return;
       }       }
  
Line 916 
Line 913 
  
 void MessageQueueService::return_op(AsyncOpNode *op) void MessageQueueService::return_op(AsyncOpNode *op)
 { {
    PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);     PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
    delete op;    delete op;
 } }
  
Line 999 
Line 996 
    if (!(msg->getMask() & message_mask::ha_async))    if (!(msg->getMask() & message_mask::ha_async))
    {    {
       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(       AsyncLegacyOperationStart *wrapper = new AsyncLegacyOperationStart(
          get_next_xid(),  
          op,          op,
          destination,          destination,
          msg,          msg,
Line 1007 
Line 1003 
    }    }
    else    else
    {    {
       op->_request.insert_front(msg);        op->_request.reset(msg);
       (static_cast<AsyncMessage *>(msg))->op = op;       (static_cast<AsyncMessage *>(msg))->op = op;
    }    }
    return _meta_dispatcher->route_async(op);    return _meta_dispatcher->route_async(op);
Line 1027 
Line 1023 
    if (op == 0)    if (op == 0)
    {    {
       op = get_op();       op = get_op();
       op->_request.insert_front(msg);        op->_request.reset(msg);
       if (mask & message_mask::ha_async)       if (mask & message_mask::ha_async)
       {       {
          (static_cast<AsyncMessage *>(msg))->op = op;          (static_cast<AsyncMessage *>(msg))->op = op;
Line 1060 
Line 1056 
    if (request->op == 0)    if (request->op == 0)
    {    {
       request->op = get_op();       request->op = get_op();
       request->op->_request.insert_front(request);        request->op->_request.reset(request);
       destroy_op = true;       destroy_op = true;
    }    }
  
Line 1075 
Line 1071 
  
    request->op->_client_sem.wait();    request->op->_client_sem.wait();
  
    request->op->lock();     AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_front());  
    rpl->op = 0;    rpl->op = 0;
    request->op->unlock();  
  
    if (destroy_op == true)    if (destroy_op == true)
    {    {
       request->op->lock();       request->op->lock();
       request->op->_request.remove(request);        request->op->_request.release();
       request->op->_state |= ASYNC_OPSTATE_RELEASED;       request->op->_state |= ASYNC_OPSTATE_RELEASED;
       request->op->unlock();       request->op->unlock();
       return_op(request->op);       return_op(request->op);
Line 1099 
Line 1093 
     Uint32 mask)     Uint32 mask)
 { {
    RegisterCimService *msg = new RegisterCimService(    RegisterCimService *msg = new RegisterCimService(
       get_next_xid(),  
       0,       0,
       true,       true,
       name,       name,
Line 1134 
Line 1127 
 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask) Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
 { {
    UpdateCimService *msg = new UpdateCimService(    UpdateCimService *msg = new UpdateCimService(
       get_next_xid(),  
       0,       0,
       true,       true,
       _queueId,       _queueId,
Line 1184 
Line 1176 
    results->clear();    results->clear();
  
    FindServiceQueue *req = new FindServiceQueue(    FindServiceQueue *req = new FindServiceQueue(
       get_next_xid(),  
       0,       0,
       _queueId,       _queueId,
       true,       true,
Line 1222 
Line 1213 
    }    }
  
    EnumerateService *req = new EnumerateService(    EnumerateService *req = new EnumerateService(
       get_next_xid(),  
       0,       0,
       _queueId,       _queueId,
       true,       true,
Line 1262 
Line 1252 
    return;    return;
 } }
  
 Uint32 MessageQueueService::get_next_xid()  MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
 { {
    static Mutex _monitor;      _polling_list_mutex.lock();
    Uint32 value;  
    AutoMutex autoMut(_monitor);      if (!_polling_list)
    _xid++;          _polling_list = new PollingList;
    value =  _xid.get();  
    return value;      _polling_list_mutex.unlock();
  
       return _polling_list;
 } }
  
 PEGASUS_NAMESPACE_END PEGASUS_NAMESPACE_END


Legend:
Removed from v.1.119.12.1  
changed lines
  Added in v.1.123.2.1

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2