(file) Return to MessageQueueService.cpp CVS log (file) (dir) Up to [Pegasus] / pegasus / src / Pegasus / Common

  1 mday  1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
  2           //
  3           // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
  4           //
  5           // Permission is hereby granted, free of charge, to any person obtaining a copy
  6           // of this software and associated documentation files (the "Software"), to
  7           // deal in the Software without restriction, including without limitation the
  8           // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  9           // sell copies of the Software, and to permit persons to whom the Software is
 10           // furnished to do so, subject to the following conditions:
 11           //
 12           // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
 13           // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
 14           // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
 15           // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 16           // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
 17           // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 18           // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
 19           // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
 20           //
 21           //==============================================================================
 22 mday  1.1 //
 23           // Author: Mike Day (mdday@us.ibm.com)
 24           //
 25           // Modified By:
 26           //
 27           //%/////////////////////////////////////////////////////////////////////////////
 28           
 29           #include "MessageQueueService.h"
 30 mday  1.22 #include <Pegasus/Common/Tracer.h>
 31 mday  1.1  
 32            PEGASUS_NAMESPACE_BEGIN
 33            
 34 mday  1.15  
 35            cimom *MessageQueueService::_meta_dispatcher = 0;
 36            AtomicInt MessageQueueService::_service_count = 0;
 37            AtomicInt MessageQueueService::_xid(1);
 38            Mutex MessageQueueService::_meta_dispatcher_mutex  = Mutex();
 39            
 40            
 41 mday  1.1  MessageQueueService::MessageQueueService(const char *name, 
 42            					 Uint32 queueID, 
 43            					 Uint32 capabilities, 
 44            					 Uint32 mask) 
 45 mday  1.15    : Base(name, true,  queueID),
 46 mday  1.22      
 47 mday  1.1       _mask(mask),
 48 mday  1.4       _die(0),
 49 mday  1.5       _pending(true), 
 50 mday  1.6       _incoming(true, 1000),
 51 mday  1.7       _incoming_queue_shutdown(0),
 52 mday  1.5       _req_thread(_req_proc, this, false)
 53 mday  1.1  { 
 54 mday  1.22    _capabilities = (capabilities | module_capabilities::async);
 55               
 56 mday  1.1     _default_op_timeout.tv_sec = 30;
 57               _default_op_timeout.tv_usec = 100;
 58 mday  1.15 
 59               _meta_dispatcher_mutex.lock(pegasus_thread_self());
 60               
 61               if( _meta_dispatcher == 0 )
 62               {
 63                  PEGASUS_ASSERT( _service_count.value() == 0 );
 64                  _meta_dispatcher = new cimom();
 65                  if (_meta_dispatcher == NULL )
 66                  {
 67            	 _meta_dispatcher_mutex.unlock();
 68            	 
 69            	 throw NullPointer();
 70                  }
 71                  
 72               }
 73               _service_count++;
 74            
 75            
 76               if( false == register_service(name, _capabilities, _mask) )
 77               {
 78                  _meta_dispatcher_mutex.unlock();
 79 mday  1.15       throw BindFailed("MessageQueueService Base Unable to register with  Meta Dispatcher");
 80               }
 81               
 82               _meta_dispatcher_mutex.unlock();
 83            
 84 mday  1.4     _req_thread.run();
 85 mday  1.1  }
 86            
 87 mday  1.4  
 88 mday  1.1  MessageQueueService::~MessageQueueService(void)
 89            {
 90               _die = 1;
 91 mday  1.7     if (_incoming_queue_shutdown.value() == 0 )
 92 mday  1.16    {
 93 mday  1.7         _incoming.shutdown_queue();
 94 mday  1.16        _req_thread.join();
 95               }
 96               
 97 mday  1.15    _meta_dispatcher_mutex.lock(pegasus_thread_self());
 98               _service_count--;
 99               if (_service_count.value() == 0 )
100               {
101                  _meta_dispatcher->_shutdown_routed_queue();
102                  delete _meta_dispatcher;
103               }
104               _meta_dispatcher_mutex.unlock();
105               
106 mday  1.1  }
107            
108 mday  1.15 
109 mday  1.1  
110 mday  1.7  void MessageQueueService::_shutdown_incoming_queue(void)
111            {
112               
113 mday  1.9     if (_incoming_queue_shutdown.value() > 0 )
114                  return ;
115               
116 mday  1.8     AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
117            				    0, 
118            				    _queueId, 
119            				    _queueId, 
120            				    true, 
121            				    AsyncIoctl::IO_CLOSE, 
122            				    0, 
123            				    0);
124 mday  1.9  
125 mday  1.8     msg->op = get_op();
126               msg->op->_request.insert_first(msg);
127 mday  1.21    msg->op->_op_dest = this;
128 mday  1.8     
129               _incoming.insert_last_wait(msg->op);
130               msg->op->_client_sem.wait();
131               
132               msg->op->lock();
133               AsyncReply * reply = static_cast<AsyncReply *>(msg->op->_response.remove_first());
134               reply->op = 0;
135               msg->op->unlock();
136 mday  1.9     delete reply; 
137 mday  1.8        
138               msg->op->_request.remove(msg);
139               msg->op->_state |= ASYNC_OPSTATE_RELEASED;
140               return_op(msg->op);
141 mday  1.9  
142 mday  1.8     msg->op = 0;
143               delete msg;
144 mday  1.16    _req_thread.join();
145               
146 mday  1.7  }
147            
148 mday  1.22 
149            
150            void MessageQueueService::enqueue(Message *msg) throw(IPCException)
151            {
152 kumpf 1.28    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
153            
154 mday  1.22    Base::enqueue(msg);
155 kumpf 1.28 
156               PEG_METHOD_EXIT();
157 mday  1.22    
158            //    PEGASUS_ASSERT(msg != 0 );
159               
160            //    cout << "inside overriden enqueue" << endl;
161 mday  1.26 //    if (!msg) 
162 mday  1.22 //     {
163            //        Tracer::trace(TRC_DISPATCHER, Tracer::LEVEL3,
164            //         "MessageQueue::enqueue failure");
165            //        throw NullPointer();
166            //     }
167            
168            //     if (getenv("PEGASUS_TRACE"))
169            //     {
170            //        cout << "===== " << getQueueName() << ": ";
171            //        msg->print(cout);
172            //     }
173            
174            //    msg->dest = _queueId;
175 mday  1.26    
176 mday  1.22 //    SendForget(msg);
177               
178            }
179            
180            
181            
182 mday  1.5  PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
183 mday  1.1  {
184 mday  1.5     Thread *myself = reinterpret_cast<Thread *>(parm);
185               MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
186            
187               // pull messages off the incoming queue and dispatch them. then 
188               // check pending messages that are non-blocking
189 mday  1.7     AsyncOpNode *operation = 0;
190               
191 mday  1.6     while ( service->_die.value() == 0 ) 
192 mday  1.1     {
193 mday  1.7        try 
194                  {
195 mday  1.8  	 operation = service->_incoming.remove_first_wait();
196 mday  1.7        }
197                  catch(ListClosed & )
198                  {
199 mday  1.6  	 break;
200 mday  1.7        }
201                  if( operation )
202 mday  1.8        {
203            	 
204            	 service->_handle_incoming_operation(operation, myself, service);
205                  }
206 mday  1.1     }
207 mday  1.5     
208               myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
209               return(0);
210 mday  1.1  }
211            
212 mday  1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
213            {
214               return_op(op);
215            }
216            
217 mday  1.1  
218 mday  1.8  void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation, 
219            						     Thread *thread, 
220            						     MessageQueue *queue)
221 mday  1.1  {
222 mday  1.5     if ( operation != 0 )
223               {
224 mday  1.22 
225            // ATTN: optimization 
226            // << Tue Feb 19 14:10:38 2002 mdd >>
227 mday  1.6        operation->lock();
228 mday  1.22       if ((operation->_state & ASYNC_OPFLAGS_CALLBACK) &&  
229            	 (operation->_state & ASYNC_OPSTATE_COMPLETE))
230                  {
231            	 operation->unlock();
232            	 _handle_async_callback(operation);
233                  }
234                  
235 mday  1.6        Message *rq = operation->_request.next(0);
236 mday  1.18       PEGASUS_ASSERT(rq != 0 );
237 mday  1.28.2.1       
238                      // divert "plain" legacy messages to handleEnqueue
239                      if ( ! (rq->getMask() & message_mask::ha_async)  )
240 mday  1.18           {
241                	 rq = operation->_request.remove_first() ;
242                	 operation->unlock();
243                	 // delete the op node 
244                	 delete operation;
245 mday  1.24     
246 mday  1.28.2.1 
247 mday  1.24     //	Attn:  change to handleEnqueue(msg) when we have that method in all messagequeueservices
248                //             make handleEnqueue pure virtual !!!
249                //      << Fri Feb 22 13:39:09 2002 mdd >>
250                
251 mday  1.26     	 handleEnqueue(rq);
252 mday  1.18     	 return;
253                      }
254                
255 mday  1.6            operation->unlock();
256 mday  1.8            static_cast<AsyncMessage *>(rq)->_myself = thread;
257                      static_cast<AsyncMessage *>(rq)->_service = queue;
258 mday  1.5            _handle_async_request(static_cast<AsyncRequest *>(rq));
259                   }
260 mday  1.1         
261 mday  1.5         return;
262 mday  1.1         
263                }
264                
265                void MessageQueueService::_handle_async_request(AsyncRequest *req)
266                {
267 mday  1.4         if ( req != 0 )
268                   {
269                      req->op->processing();
270 mday  1.1         
271 mday  1.4            Uint32 type = req->getType();
272                      if( type == async_messages::HEARTBEAT )
273                	 handle_heartbeat_request(req);
274                      else if (type == async_messages::IOCTL)
275                	 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
276                      else if (type == async_messages::CIMSERVICE_START)
277                	 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
278                      else if (type == async_messages::CIMSERVICE_STOP)
279                	 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
280                      else if (type == async_messages::CIMSERVICE_PAUSE)
281                	 handle_CimServicePause(static_cast<CimServicePause *>(req));
282                      else if (type == async_messages::CIMSERVICE_RESUME)
283                	 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
284                      else if ( type == async_messages::ASYNC_OP_START)
285                	 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
286                      else 
287                      {
288                	 // we don't handle this request message 
289                	 _make_response(req, async_results::CIM_NAK );
290                      }
291 mday  1.1         }
292                }
293                
294 mday  1.17     
295                Boolean MessageQueueService::_enqueueResponse(
296                   Message* request, 
297                   Message* response)
298 mday  1.18        
299 mday  1.17     {
300 mday  1.25     
301                   if( request->getMask() & message_mask::ha_async)
302                   {
303                      if (response->getMask() & message_mask::ha_async )
304                      {
305                	 _completeAsyncResponse(static_cast<AsyncRequest *>(request), 
306                				static_cast<AsyncReply *>(response), 
307                				ASYNC_OPSTATE_COMPLETE, 0 );
308                	 return true;
309                      }
310                   }
311                   
312 mday  1.17        if(request->_async != 0 )
313                   {
314                      Uint32 mask = request->_async->getMask();
315 mday  1.18           PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
316                      
317                      AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
318                      AsyncOpNode *op = async->op;
319                      request->_async = 0;
320 mday  1.27           // this request is probably going to be deleted !!
321                      // remove it from the op node 
322                      op->_request.remove(request);
323 mday  1.18           
324                      AsyncLegacyOperationResult *async_result = 
325                	 new AsyncLegacyOperationResult( 
326                	    async->getKey(),
327                	    async->getRouting(),
328                	    op,
329                	    response);
330                      _completeAsyncResponse(async,
331                			     async_result,
332                			     ASYNC_OPSTATE_COMPLETE, 
333                			     0);
334                      return true;
335 mday  1.17        }
336 mday  1.18        
337                   // ensure that the destination queue is in response->dest
338 mday  1.24        return SendForget(response);
339 mday  1.18        
340 mday  1.17     }
341                
342 mday  1.18     void MessageQueueService::_make_response(Message *req, Uint32 code)
343 mday  1.1      {
344 mday  1.19        cimom::_make_response(req, code);
345 mday  1.1      }
346                
347                
348 mday  1.5      void MessageQueueService::_completeAsyncResponse(AsyncRequest *request, 
349                						AsyncReply *reply, 
350                						Uint32 state, 
351                						Uint32 flag)
352                {
353 mday  1.19        cimom::_completeAsyncResponse(request, reply, state, flag);
354 mday  1.5      }
355                
356                
357                
358                Boolean MessageQueueService::accept_async(AsyncOpNode *op)
359                {
360 mday  1.8         if (_incoming_queue_shutdown.value() > 0 )
361                      return false;
362                   
363 mday  1.20     // ATTN optimization remove the message checking altogether in the base 
364                // << Mon Feb 18 14:02:20 2002 mdd >>
365 mday  1.6         op->lock();
366                   Message *rq = op->_request.next(0);
367 mday  1.20        Message *rp = op->_response.next(0);
368 mday  1.6         op->unlock();
369                   
370 mday  1.22        if(  (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&  
371                	_die.value() == 0  )
372 mday  1.5         {
373 mday  1.6            _incoming.insert_last_wait(op);
374 mday  1.5            return true;
375                   }
376                   return false;
377                }
378                
379                Boolean MessageQueueService::messageOK(const Message *msg)
380                {
381 mday  1.8         if (_incoming_queue_shutdown.value() > 0 )
382                      return false;
383 mday  1.18        return true;
384                }
385                
386 mday  1.25     // void MessageQueueService::handleEnqueue(Message *msg)
387                // {
388 mday  1.22        
389 mday  1.24        
390 mday  1.25     //    if ( msg )
391                //       delete msg;
392                // }
393 mday  1.5      
394                
395 mday  1.25     // void MessageQueueService::handleEnqueue(void)
396                // {
397                //     Message *msg = dequeue();
398                //     handleEnqueue(msg);
399                // }
400 mday  1.5      
401 mday  1.1      void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
402                {
403                   // default action is to echo a heartbeat response 
404                   
405                   AsyncReply *reply = 
406                      new AsyncReply(async_messages::HEARTBEAT,
407                		     req->getKey(),
408                		     req->getRouting(),
409                		     0,
410                		     req->op, 
411                		     async_results::OK, 
412                		     req->resp,
413                		     false);
414 mday  1.4         _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
415 mday  1.1      }
416                
417                
418                void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
419                { 
420                   ;
421                }
422                      
423                void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
424                {
425 mday  1.8         
426                   switch( req->ctl )
427                   {
428                      case AsyncIoctl::IO_CLOSE:
429                      {
430                	 // save my bearings 
431                	 Thread *myself = req->_myself;
432                	 MessageQueueService *service = static_cast<MessageQueueService *>(req->_service);
433                	 
434                	 // respond to this message.
435                	 _make_response(req, async_results::OK);
436                	 // ensure we do not accept any further messages
437                
438                	 // ensure we don't recurse on IO_CLOSE
439                	 if( _incoming_queue_shutdown.value() > 0 )
440                	    break;
441                	 
442                	 // set the closing flag 
443                	 service->_incoming_queue_shutdown = 1;
444                	 // empty out the queue
445                	 while( 1 )
446 mday  1.8      	 {
447                	    AsyncOpNode *operation;
448                	    try 
449                	    {
450                	       operation = service->_incoming.remove_first();
451                	    }
452                	    catch(IPCException & )
453                	    {
454                	       break;
455                	    }
456                	    if( operation )
457                	    {
458                	       service->_handle_incoming_operation(operation, myself, service);
459                	    }
460                	    else
461                	       break;
462                	 } // message processing loop
463                
464                	 // shutdown the AsyncDQueue
465                	 service->_incoming.shutdown_queue();
466                	 // exit the thread ! 
467 mday  1.8      	 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
468                	 return;
469                      }
470                
471                      default:
472                	 _make_response(req, async_results::CIM_NAK);
473                   }
474 mday  1.1      }
475 mday  1.8      
476 mday  1.1      void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
477                {
478 mday  1.10        // clear the stoped bit and update
479 mday  1.13        _capabilities &= (~(module_capabilities::stopped));
480 mday  1.10        _make_response(req, async_results::OK);
481                   // now tell the meta dispatcher we are stopped 
482                   update_service(_capabilities, _mask);
483                
484 mday  1.1      }
485                void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
486                {
487 mday  1.10        // set the stopeed bit and update
488                   _capabilities |= module_capabilities::stopped;
489                   _make_response(req, async_results::CIM_STOPPED);
490                   // now tell the meta dispatcher we are stopped 
491                   update_service(_capabilities, _mask);
492                   
493 mday  1.1      }
494                void MessageQueueService::handle_CimServicePause(CimServicePause *req)
495                {
496 mday  1.10        // set the paused bit and update
497 mday  1.13        _capabilities |= module_capabilities::paused;
498 mday  1.11        update_service(_capabilities, _mask);
499 mday  1.10        _make_response(req, async_results::CIM_PAUSED);
500                   // now tell the meta dispatcher we are stopped 
501 mday  1.1      }
502                void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
503                {
504 mday  1.10        // clear the paused  bit and update
505 mday  1.13        _capabilities &= (~(module_capabilities::paused));
506 mday  1.11        update_service(_capabilities, _mask);
507 mday  1.10        _make_response(req, async_results::OK);
508                   // now tell the meta dispatcher we are stopped 
509 mday  1.1      }
510                      
511                void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
512                {
513                   _make_response(req, async_results::CIM_NAK);
514                }
515                
516                void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
517                {
518 mday  1.14        ;
519                }
520                
521 mday  1.10     
522 mday  1.14     void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
523                {
524                   // remove the legacy message from the request and enqueue it to its destination
525                   Uint32 result = async_results::CIM_NAK;
526                   
527 mday  1.25        Message *legacy = req->_act;
528 mday  1.14        if ( legacy != 0 )
529                   {
530 mday  1.25           MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
531 mday  1.14           if( queue != 0 )
532                      {
533 mday  1.25     	 if(queue->isAsync() == true )
534                	 {
535                	    (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
536                	 }
537                	 else 
538                	 {
539                	    // Enqueue the response:
540                	    queue->enqueue(req->get_action());
541                	 }
542                	 
543 mday  1.14     	 result = async_results::OK;
544                      }
545                   }
546                   _make_response(req, result);
547                }
548                
549                void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
550                {
551                   ;
552 mday  1.1      }
553                
554                AsyncOpNode *MessageQueueService::get_op(void)
555                {
556 mday  1.4         AsyncOpNode *op = new AsyncOpNode();
557 mday  1.1         
558 mday  1.9         op->_state = ASYNC_OPSTATE_UNKNOWN;
559                   op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
560 mday  1.4         
561 mday  1.1         return op;
562                }
563                
564                void MessageQueueService::return_op(AsyncOpNode *op)
565                {
566                   PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
567 mday  1.4         delete op;
568 mday  1.1      }
569                
570 mday  1.18     
571 mday  1.28.2.1 void MessageQueueService::ReplyAsync(AsyncOpNode *op, 
572                		Uint32 destination)
573                {
574                   PEGASUS_ASSERT( op->_flags &  ASYNC_OPFLAGS_CALLBACK );
575                   
576                
577                   // get the queue handle for the destination
578                   if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
579                   {
580                      delete op;
581                   }
582                   
583                   op->_response.next(0)->dest = destination;
584                   
585                
586                   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
587                   op->_state |= ASYNC_OPSTATE_COMPLETE;
588                   
589                   if ( false == _meta_dispatcher->route_async(op) )
590                      delete op;
591                   return;
592 mday  1.28.2.1 }
593                
594                
595 mday  1.21     Boolean MessageQueueService::SendAsync(AsyncOpNode *op, 
596                				       Uint32 destination,
597 mday  1.18     				       void (*callback)(AsyncOpNode *, 
598 mday  1.21     							MessageQueue *, 
599 mday  1.28.2.1 							void *), 
600                				       void * parm)
601 mday  1.20     { 
602 mday  1.28.2.1    PEGASUS_ASSERT( callback != 0 && op != 0 );
603                      
604                   op->_callback_ptr = parm;
605 mday  1.18        
606 mday  1.21        // get the queue handle for the destination
607                   if ( 0 == (op->_op_dest = MessageQueue::lookup(destination)))
608                      return false;
609 mday  1.28.2.1    op->_request.next(0)->dest = destination;
610                   
611 mday  1.22        op->_flags |= ASYNC_OPFLAGS_CALLBACK;
612                   op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
613                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
614 mday  1.21        
615                   return  _meta_dispatcher->route_async(op);
616 mday  1.18     }
617                
618 mday  1.28.2.1 void MessageQueueService::ForwardRequest(AsyncOpNode *op, MessageQueue *dest)
619                {
620                
621                   Message *msg = op->_request.next(0);
622                   if ( msg == 0 )
623                   {
624                      delete op;
625                      return;
626                   }
627                   msg->_async = 0;
628                   msg->dest = dest->getQueueId();
629                   if(msg->getMask() & message_mask::ha_async)
630                      (static_cast<AsyncMessage *>(msg))->op = 0;
631                   
632                   op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
633                   op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
634                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
635                   
636                   op->_op_dest = dest;
637                   
638                   // now see if the meta dispatcher will take it
639 mday  1.28.2.1    if( false == _meta_dispatcher->route_async(op))
640                      delete op;
641                   
642                   return;
643                }
644                
645                
646                void MessageQueueService::ForwardResponse(AsyncOpNode *op, MessageQueue *dest)
647                {
648                
649                   Message *msg = op->_response.next(0);
650                   if ( msg == 0 || dest == 0 )
651                   {
652                      delete op;
653                      return;
654                   }
655                   msg->_async = 0;
656                   msg->dest = dest->getQueueId();
657                   if(msg->getMask() & message_mask::ha_async)
658                      (static_cast<AsyncMessage *>(msg))->op = 0;
659                   
660 mday  1.28.2.1    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
661                   op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
662                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
663                   
664                   op->_op_dest = dest;
665                   
666                   // now see if the meta dispatcher will take it
667                   if( false == _meta_dispatcher->route_async(op))
668                      delete op;
669                   
670                   return;
671                }
672 mday  1.18     
673                Boolean MessageQueueService::SendForget(Message *msg)
674                {
675                
676 mday  1.24        
677 mday  1.18        AsyncOpNode *op = 0;
678 mday  1.22        Uint32 mask = msg->getMask();
679                   
680                   if (mask & message_mask::ha_async)
681 mday  1.18        {
682                      op = (static_cast<AsyncMessage *>(msg))->op ;
683                   }
684 mday  1.22     
685 mday  1.18        if( op == 0 )
686 mday  1.20        {
687 mday  1.18           op = get_op();
688 mday  1.20           op->_request.insert_first(msg);
689 mday  1.22           if (mask & message_mask::ha_async)
690                	 (static_cast<AsyncMessage *>(msg))->op = op;
691 mday  1.28.2.1       else 
692                	 msg->_async = 0;
693 mday  1.20        }
694 mday  1.22        op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
695                   op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SIMPLE_STATUS);
696                   op->_state &= ~ASYNC_OPSTATE_COMPLETE;
697 mday  1.18        
698 mday  1.21        // get the queue handle for the destination
699                   if ( 0 == (op->_op_dest = MessageQueue::lookup(msg->dest)))
700 mday  1.24        {
701 mday  1.21           return false;
702 mday  1.24        }
703                   
704 mday  1.18        // now see if the meta dispatcher will take it
705 mday  1.24        Boolean return_code = _meta_dispatcher->route_async(op);
706                   return  return_code;
707                   
708                
709 mday  1.18     }
710 mday  1.2      
711 mday  1.1      
712 mday  1.4      AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
713 mday  1.1      {
714 mday  1.4         if ( request == 0 )
715                      return 0 ;
716 mday  1.5      
717                   Boolean destroy_op = false;
718                   
719                   if (request->op == false)
720                   {
721                      request->op = get_op();
722 mday  1.7            request->op->_request.insert_first(request);
723 mday  1.5            destroy_op = true;
724                   }
725 mday  1.4         
726                   request->block = true;
727 mday  1.22        request->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
728                   request->op->_flags &= ~ASYNC_OPFLAGS_CALLBACK; 
729 mday  1.1         
730 mday  1.21        // get the queue handle for the destination
731                   if ( 0 == (request->op->_op_dest = MessageQueue::lookup(request->dest)))
732                      return 0;
733                   
734 mday  1.2         
735 mday  1.4         // now see if the meta dispatcher will take it
736 mday  1.2      
737 mday  1.4         if (true == _meta_dispatcher->route_async(request->op))
738 mday  1.1         {
739 mday  1.4            request->op->_client_sem.wait();
740 mday  1.6            PEGASUS_ASSERT(request->op->_state & ASYNC_OPSTATE_COMPLETE);
741                      
742 mday  1.1         }
743 mday  1.4         
744 mday  1.6         request->op->lock();
745                   AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
746                   rpl->op = 0;
747                   request->op->unlock();
748                   
749 mday  1.5         if( destroy_op == true)
750                   {
751 mday  1.6            request->op->lock();
752                      request->op->_request.remove(request);
753                      request->op->_state |= ASYNC_OPSTATE_RELEASED;
754                      request->op->unlock();
755                      
756                      return_op(request->op);
757 mday  1.7            request->op = 0;
758 mday  1.5         }
759                   
760                   return rpl;
761 mday  1.1      }
762                
763                
764                Boolean MessageQueueService::register_service(String name, 
765                					      Uint32 capabilities, 
766                					      Uint32 mask)
767                
768                {
769                   RegisterCimService *msg = new RegisterCimService(get_next_xid(),
770 mday  1.5      						    0, 
771 mday  1.1      						    true, 
772                						    name, 
773                						    capabilities, 
774                						    mask,
775                						    _queueId);
776                   Boolean registered = false;
777 mday  1.7         AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
778 mday  1.1         
779 mday  1.2         if ( reply != 0 )
780 mday  1.1         {
781                      if(reply->getMask() & message_mask:: ha_async)
782                      {
783                	 if(reply->getMask() & message_mask::ha_reply)
784                	 {
785 mday  1.15     	    if(reply->result == async_results::OK || 
786                	       reply->result == async_results::MODULE_ALREADY_REGISTERED )
787 mday  1.1      	       registered = true;
788                	 }
789                      }
790                      
791 mday  1.7            delete reply; 
792 mday  1.1         }
793 mday  1.5         delete msg;
794 mday  1.1         return registered;
795                }
796                
797                Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
798                {
799                   
800                   
801                   UpdateCimService *msg = new UpdateCimService(get_next_xid(), 
802 mday  1.5      						0, 
803 mday  1.1      						true, 
804                						_queueId,
805                						_capabilities, 
806                						_mask);
807                   Boolean registered = false;
808 mday  1.2      
809                   AsyncMessage *reply = SendWait(msg);
810                   if (reply)
811 mday  1.1         {
812                      if(reply->getMask() & message_mask:: ha_async)
813                      {
814                	 if(reply->getMask() & message_mask::ha_reply)
815                	 {
816 mday  1.2      	    if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
817 mday  1.1      	       registered = true;
818                	 }
819                      }
820                      delete reply;
821                   }
822 mday  1.5         delete msg;
823 mday  1.1         return registered;
824                }
825                
826                
827                Boolean MessageQueueService::deregister_service(void)
828                {
829 mday  1.3      
830 mday  1.5         _meta_dispatcher->deregister_module(_queueId);
831                   return true;
832 mday  1.1      }
833                
834                
835                void MessageQueueService::find_services(String name, 
836                					Uint32 capabilities, 
837                					Uint32 mask, 
838                					Array<Uint32> *results)
839                {
840                   
841                   if( results == 0 )
842                      throw NullPointer();
843 mday  1.5          
844 mday  1.1         results->clear();
845                   
846                   FindServiceQueue *req = 
847                      new FindServiceQueue(get_next_xid(), 
848 mday  1.5      			   0, 
849 mday  1.1      			   _queueId, 
850                			   true, 
851                			   name, 
852                			   capabilities, 
853                			   mask);
854                   
855 mday  1.2         AsyncMessage *reply = SendWait(req); 
856                   if(reply)
857 mday  1.1         {
858                      if( reply->getMask() & message_mask::ha_async)
859                      {
860                	 if(reply->getMask() & message_mask::ha_reply)
861                	 {
862                	    if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
863                	    {
864                	       if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
865                		  *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
866                	    }
867                	 }
868                      }
869                      delete reply;
870                   }
871 mday  1.5         delete req;
872 mday  1.1         return ;
873                }
874                
875                void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
876                {
877                   if(result == 0)
878                      throw NullPointer();
879                   
880                   EnumerateService *req 
881                      = new EnumerateService(get_next_xid(),
882 mday  1.5      			     0, 
883 mday  1.1      			     _queueId, 
884                			     true, 
885                			     queue);
886                   
887 mday  1.2         AsyncMessage *reply = SendWait(req);
888 mday  1.1         
889 mday  1.2         if (reply)
890 mday  1.1         {
891                      Boolean found = false;
892                      
893                      if( reply->getMask() & message_mask::ha_async)
894                      {
895                	 if(reply->getMask() & message_mask::ha_reply)
896                	 {
897                	    if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
898                	    {
899                	       if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
900                	       {
901                		  if( found == false)
902                		  {
903                		     found = true;
904                		     
905                		     result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
906                		     result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
907                		     result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
908                		     result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
909                		  }
910                	       }
911 mday  1.1      	    }
912                	 }
913                      }
914                      delete reply;
915                   }
916 mday  1.5         delete req;
917                   
918 mday  1.1         return;
919                }
920                
921                Uint32 MessageQueueService::get_next_xid(void)
922                {
923                   _xid++;
924                   return _xid.value();
925                }
926                
927                PEGASUS_NAMESPACE_END

No CVS admin address has been configured
Powered by
ViewCVS 0.9.2