(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

Diff for /pegasus/src/Pegasus/Common/MessageQueueService.cpp between version 1.131 and 1.140

version 1.131, 2006/11/10 18:14:58 version 1.140, 2008/08/19 17:20:21
Line 129 
Line 129 
                 if (rtn != PEGASUS_THREAD_OK )                 if (rtn != PEGASUS_THREAD_OK )
                 {                 {
                     service->_threads--;                     service->_threads--;
                     Logger::put(                      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
                         Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,  
                         "Not enough threads to process this request. "  
                             "Skipping.");  
   
                     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,  
                         "Could not allocate thread for %s.  Queue has %d "                         "Could not allocate thread for %s.  Queue has %d "
                             "messages waiting and %d threads servicing."                             "messages waiting and %d threads servicing."
                             "Skipping the service for right now. ",                             "Skipping the service for right now. ",
                         service->getQueueName(),                         service->getQueueName(),
                         service->_incoming.count(),                         service->_incoming.count(),
                         service->_threads.get());                          service->_threads.get()));
  
                     Threads::yield();                     Threads::yield();
                     service = NULL;                     service = NULL;
Line 153 
Line 148 
         }         }
         list->unlock();         list->unlock();
     }     }
     myself->exit_self( (ThreadReturnType) 1 );      return ThreadReturnType(0);
     return 0;  
 } }
  
  
Line 190 
Line 184 
         max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;         max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
     }     }
  
     Tracer::trace(TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,      PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue);         "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
  
     AutoMutex autoMut(_meta_dispatcher_mutex);     AutoMutex autoMut(_meta_dispatcher_mutex);
  
Line 215 
Line 209 
             "Common.MessageQueueService.UNABLE_TO_REGISTER",             "Common.MessageQueueService.UNABLE_TO_REGISTER",
             "CIM base message queue service is unable to register with the "             "CIM base message queue service is unable to register with the "
                 "CIMOM dispatcher.");                 "CIMOM dispatcher.");
         throw BindFailedException(parms);          throw Exception(parms);
     }     }
  
     _get_polling_list()->insert_back(this);     _get_polling_list()->insert_back(this);
Line 317 
Line 311 
     msg->op->_request.reset(msg);     msg->op->_request.reset(msg);
     try     try
     {     {
         _incoming.enqueue_wait(msg->op);          _incoming.enqueue(msg->op);
         _polling_sem.signal();         _polling_sem.signal();
     }     }
     catch (const ListClosed&)     catch (const ListClosed&)
Line 372 
Line 366 
             catch (ListClosed&)             catch (ListClosed&)
             {             {
                 // ATTN: This appears to be a common loop exit path.                 // ATTN: This appears to be a common loop exit path.
                 //PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,                  //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                 //    "Caught ListClosed exception.  Exiting _req_proc.");                 //    "Caught ListClosed exception.  Exiting _req_proc.");
                 break;                 break;
             }             }
Line 386 
Line 380 
     }     }
     catch (const Exception& e)     catch (const Exception& e)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             String("Caught exception: \"") + e.getMessage() +             String("Caught exception: \"") + e.getMessage() +
                 "\".  Exiting _req_proc.");                 "\".  Exiting _req_proc.");
     }     }
     catch (...)     catch (...)
     {     {
         PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,          PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
             "Caught unrecognized exception.  Exiting _req_proc.");             "Caught unrecognized exception.  Exiting _req_proc.");
     }     }
     service->_threads--;     service->_threads--;
Line 418 
Line 412 
         Message *msg = op->removeRequest();         Message *msg = op->removeRequest();
         if (msg && (msg->getMask() & MessageMask::ha_async))         if (msg && (msg->getMask() & MessageMask::ha_async))
         {         {
             if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)              if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_START)
             {             {
                 AsyncLegacyOperationStart *wrapper =                 AsyncLegacyOperationStart *wrapper =
                     static_cast<AsyncLegacyOperationStart *>(msg);                     static_cast<AsyncLegacyOperationStart *>(msg);
                 msg = wrapper->get_action();                 msg = wrapper->get_action();
                 delete wrapper;                 delete wrapper;
             }             }
             else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)              else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_START)
             {             {
                 AsyncModuleOperationStart *wrapper =                 AsyncModuleOperationStart *wrapper =
                     static_cast<AsyncModuleOperationStart *>(msg);                     static_cast<AsyncModuleOperationStart *>(msg);
                 msg = wrapper->get_action();                 msg = wrapper->get_action();
                 delete wrapper;                 delete wrapper;
             }             }
             else if (msg->getType() == async_messages::ASYNC_OP_START)              else if (msg->getType() == ASYNC_ASYNC_OP_START)
             {             {
                 AsyncOperationStart *wrapper =                 AsyncOperationStart *wrapper =
                     static_cast<AsyncOperationStart *>(msg);                     static_cast<AsyncOperationStart *>(msg);
Line 445 
Line 439 
         msg = op->removeResponse();         msg = op->removeResponse();
         if (msg && (msg->getMask() & MessageMask::ha_async))         if (msg && (msg->getMask() & MessageMask::ha_async))
         {         {
             if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)              if (msg->getType() == ASYNC_ASYNC_LEGACY_OP_RESULT)
             {             {
                 AsyncLegacyOperationResult *wrapper =                 AsyncLegacyOperationResult *wrapper =
                     static_cast<AsyncLegacyOperationResult *>(msg);                     static_cast<AsyncLegacyOperationResult *>(msg);
                 msg = wrapper->get_result();                 msg = wrapper->get_result();
                 delete wrapper;                 delete wrapper;
             }             }
             else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)              else if (msg->getType() == ASYNC_ASYNC_MODULE_OP_RESULT)
             {             {
                 AsyncModuleOperationResult *wrapper =                 AsyncModuleOperationResult *wrapper =
                     static_cast<AsyncModuleOperationResult *>(msg);                     static_cast<AsyncModuleOperationResult *>(msg);
Line 529 
Line 523 
     {     {
         req->op->processing();         req->op->processing();
  
         Uint32 type = req->getType();          MessageType type = req->getType();
         if (type == async_messages::HEARTBEAT)          if (type == ASYNC_HEARTBEAT)
             handle_heartbeat_request(req);             handle_heartbeat_request(req);
         else if (type == async_messages::IOCTL)          else if (type == ASYNC_IOCTL)
             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));             handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
         else if (type == async_messages::CIMSERVICE_START)          else if (type == ASYNC_CIMSERVICE_START)
             handle_CimServiceStart(static_cast<CimServiceStart *>(req));             handle_CimServiceStart(static_cast<CimServiceStart *>(req));
         else if (type == async_messages::CIMSERVICE_STOP)          else if (type == ASYNC_CIMSERVICE_STOP)
             handle_CimServiceStop(static_cast<CimServiceStop *>(req));             handle_CimServiceStop(static_cast<CimServiceStop *>(req));
         else if (type == async_messages::CIMSERVICE_PAUSE)          else if (type == ASYNC_CIMSERVICE_PAUSE)
             handle_CimServicePause(static_cast<CimServicePause *>(req));             handle_CimServicePause(static_cast<CimServicePause *>(req));
         else if (type == async_messages::CIMSERVICE_RESUME)          else if (type == ASYNC_CIMSERVICE_RESUME)
             handle_CimServiceResume(static_cast<CimServiceResume *>(req));             handle_CimServiceResume(static_cast<CimServiceResume *>(req));
         else if (type == async_messages::ASYNC_OP_START)          else if (type == ASYNC_ASYNC_OP_START)
             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));             handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
         else         else
         {         {
Line 573 
Line 567 
         }         }
     }     }
  
     if (request->_async != 0)      AsyncRequest* asyncRequest =
           static_cast<AsyncRequest*>(request->get_async());
   
       if (asyncRequest != 0)
     {     {
         Uint32 mask = request->_async->getMask();          PEGASUS_ASSERT(asyncRequest->getMask() &
         PEGASUS_ASSERT(mask &  
             (MessageMask::ha_async | MessageMask::ha_request));             (MessageMask::ha_async | MessageMask::ha_request));
  
         AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);          AsyncOpNode* op = asyncRequest->op;
         AsyncOpNode *op = async->op;  
         request->_async = 0;  
         // the legacy request is going to be deleted by its handler         // the legacy request is going to be deleted by its handler
         // remove it from the op node         // remove it from the op node
  
         static_cast<AsyncLegacyOperationStart *>(async)->get_action();          static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();
  
         AsyncLegacyOperationResult *async_result =         AsyncLegacyOperationResult *async_result =
             new AsyncLegacyOperationResult(             new AsyncLegacyOperationResult(
                 op,                 op,
                 response);                 response);
         _completeAsyncResponse(         _completeAsyncResponse(
             async,              asyncRequest,
             async_result,             async_result,
             ASYNC_OPSTATE_COMPLETE,             ASYNC_OPSTATE_COMPLETE,
             0);             0);
Line 667 
Line 662 
     if ((rq != 0 && (true == messageOK(rq))) ||     if ((rq != 0 && (true == messageOK(rq))) ||
         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)         (rp != 0 && (true == messageOK(rp))) && _die.get() == 0)
     {     {
         _incoming.enqueue_wait(op);          _incoming.enqueue(op);
         _polling_sem.signal();         _polling_sem.signal();
         return true;         return true;
     }     }
Line 686 
Line 681 
     // default action is to echo a heartbeat response     // default action is to echo a heartbeat response
  
     AsyncReply *reply = new AsyncReply(     AsyncReply *reply = new AsyncReply(
         async_messages::HEARTBEAT,          ASYNC_HEARTBEAT,
         0,         0,
         req->op,         req->op,
         async_results::OK,         async_results::OK,
Line 1127 
Line 1122 
         {         {
             if (reply->getMask() & MessageMask::ha_reply)             if (reply->getMask() & MessageMask::ha_reply)
             {             {
                 if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)                  if (reply->getType() == ASYNC_FIND_SERVICE_Q_RESULT)
                 {                 {
                     if ((static_cast<FindServiceQueueResult*>(reply))->result ==                     if ((static_cast<FindServiceQueueResult*>(reply))->result ==
                             async_results::OK)                             async_results::OK)
Line 1166 
Line 1161 
         {         {
             if (reply->getMask() & MessageMask::ha_reply)             if (reply->getMask() & MessageMask::ha_reply)
             {             {
                 if (reply->getType() ==                  if (reply->getType() == ASYNC_ENUMERATE_SERVICE_RESULT)
                         async_messages::ENUMERATE_SERVICE_RESULT)  
                 {                 {
                     if ((static_cast<EnumerateServiceResponse*>(reply))->                     if ((static_cast<EnumerateServiceResponse*>(reply))->
                             result == async_results::OK)                             result == async_results::OK)


Legend:
Removed from v.1.131  
changed lines
  Added in v.1.140

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2