1 mday 1.1 //%////-*-c++-*-////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mday 1.1 //
23 // Author: Mike Day (mdday@us.ibm.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
|
30 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
31 mday 1.1
32 PEGASUS_NAMESPACE_BEGIN
33
|
34 mday 1.15
35 cimom *MessageQueueService::_meta_dispatcher = 0;
36 AtomicInt MessageQueueService::_service_count = 0;
37 AtomicInt MessageQueueService::_xid(1);
|
38 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
39 mday 1.15
40
|
41 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
42 Uint32 queueID,
43 Uint32 capabilities,
44 Uint32 mask)
|
45 mday 1.15 : Base(name, true, queueID),
|
46 mday 1.22
|
47 mday 1.1 _mask(mask),
|
48 mday 1.4 _die(0),
|
49 mday 1.41 _incoming(true, 0),
|
50 mday 1.39 _callback(true),
|
51 mday 1.7 _incoming_queue_shutdown(0),
|
52 mday 1.39 _callback_ready(0),
53 _req_thread(_req_proc, this, false),
54 _callback_thread(_callback_proc, this, false)
|
55 mday 1.1 {
|
56 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
57
|
58 mday 1.1 _default_op_timeout.tv_sec = 30;
59 _default_op_timeout.tv_usec = 100;
|
60 mday 1.15
61 _meta_dispatcher_mutex.lock(pegasus_thread_self());
62
63 if( _meta_dispatcher == 0 )
64 {
65 PEGASUS_ASSERT( _service_count.value() == 0 );
66 _meta_dispatcher = new cimom();
67 if (_meta_dispatcher == NULL )
68 {
69 _meta_dispatcher_mutex.unlock();
70
71 throw NullPointer();
72 }
73
74 }
75 _service_count++;
76
77
78 if( false == register_service(name, _capabilities, _mask) )
79 {
80 _meta_dispatcher_mutex.unlock();
81 mday 1.15 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
82 }
83
84 _meta_dispatcher_mutex.unlock();
|
85 mday 1.39 // _callback_thread.run();
86
|
87 mday 1.4 _req_thread.run();
|
88 mday 1.1 }
89
|
90 mday 1.4
|
91 mday 1.1 MessageQueueService::~MessageQueueService(void)
92 {
93 _die = 1;
|
94 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
95 mday 1.16 {
|
96 mday 1.32 _shutdown_incoming_queue();
|
97 mday 1.16 _req_thread.join();
98 }
|
99 mday 1.39 _callback_ready.signal();
100 _callback_thread.join();
101
|
102 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
103 _service_count--;
104 if (_service_count.value() == 0 )
105 {
106 _meta_dispatcher->_shutdown_routed_queue();
107 delete _meta_dispatcher;
108 }
109 _meta_dispatcher_mutex.unlock();
110
|
111 mday 1.1 }
112
|
113 mday 1.15
|
114 mday 1.1
|
115 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
116 {
117
|
118 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
119 return ;
|
120 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
121 0,
122 _queueId,
123 _queueId,
124 true,
125 AsyncIoctl::IO_CLOSE,
126 0,
127 0);
|
128 mday 1.9
|
129 mday 1.8 msg->op = get_op();
|
130 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
131 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
132 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
133 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
134 mday 1.41
|
135 mday 1.32 msg->op->_op_dest = this;
|
136 mday 1.8 msg->op->_request.insert_first(msg);
137
138 _incoming.insert_last_wait(msg->op);
|
139 mday 1.32
|
140 mday 1.16 _req_thread.join();
141
|
142 mday 1.7 }
143
|
144 mday 1.22
145
146 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
147 {
|
148 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
149
|
150 mday 1.22 Base::enqueue(msg);
|
151 kumpf 1.28
152 PEG_METHOD_EXIT();
|
153 mday 1.22 }
154
155
|
156 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
157 {
158 Thread *myself = reinterpret_cast<Thread *>(parm);
159 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
160 AsyncOpNode *operation = 0;
161
162 while ( service->_die.value() == 0 )
163 {
164 service->_callback_ready.wait();
165
166 service->_callback.lock();
167 operation = service->_callback.next(0);
168 while( operation != NULL)
169 {
170 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
171 {
172 operation = service->_callback.remove_no_lock(operation);
173 PEGASUS_ASSERT(operation != NULL);
174 operation->_thread_ptr = myself;
175 operation->_service_ptr = service;
176 service->_handle_async_callback(operation);
177 mday 1.39 break;
178 }
179 operation = service->_callback.next(operation);
180 }
181 service->_callback.unlock();
182 }
183 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
184 return(0);
185 }
186
|
187 mday 1.22
|
188 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
189 mday 1.1 {
|
190 mday 1.5 Thread *myself = reinterpret_cast<Thread *>(parm);
191 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
192
193 // pull messages off the incoming queue and dispatch them. then
194 // check pending messages that are non-blocking
|
195 mday 1.7 AsyncOpNode *operation = 0;
196
|
197 mday 1.6 while ( service->_die.value() == 0 )
|
198 mday 1.1 {
|
199 mday 1.41 if(service->_incoming.count() > 0 )
|
200 mday 1.7 {
|
201 mday 1.41 try
202 {
203 operation = service->_incoming.remove_first();
204 }
205 catch(ListClosed & )
206 {
207 break;
208 }
209 if( operation )
210 {
211 operation->_thread_ptr = myself;
212 operation->_service_ptr = service;
213 service->_handle_incoming_operation(operation);
214 }
|
215 mday 1.7 }
|
216 mday 1.41 if( service->_callback.count() > 0 )
|
217 mday 1.7 {
|
218 mday 1.41 service->_callback.lock();
219 operation = service->_callback.next(0);
220 while( operation != NULL )
221 {
222 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
223 {
224 operation = service->_callback.remove_no_lock(operation);
225 service->_callback.unlock();
226 PEGASUS_ASSERT(operation != NULL);
227 operation->_thread_ptr = myself;
228 operation->_service_ptr = service;
229 service->_handle_async_callback(operation);
230 service->_callback.lock();
231 operation = service->_callback.next(operation);
232 continue;
233 }
234 pegasus_yield();
235 operation = service->_callback.next(operation);
236 }
237 service->_callback.unlock();
|
238 mday 1.7 }
|
239 mday 1.41 if( (service->_incoming.count() == 0) && (service->_callback.count() == 0 ) )
|
240 mday 1.42 myself->sleep(1);
|
241 mday 1.41 else
|
242 mday 1.42 myself->thread_switch();
243 }
244
|
245 mday 1.5 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
246 return(0);
|
247 mday 1.1 }
248
|
249 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
250 MessageQueue *q,
251 void *parm)
252 {
253 op->_client_sem.signal();
254 }
255
|
256 mday 1.30
257 // callback function is responsible for cleaning up all resources
258 // including op, op->_callback_node, and op->_callback_ptr
|
259 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
260 {
|
261 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
262 {
263
264 Message *msg = op->get_request();
265 if( msg && ( msg->getMask() & message_mask::ha_async))
266 {
267 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
268 {
269 AsyncLegacyOperationStart *wrapper =
270 static_cast<AsyncLegacyOperationStart *>(msg);
271 msg = wrapper->get_action();
272 delete wrapper;
273 }
274 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
275 {
276 AsyncModuleOperationStart *wrapper =
277 static_cast<AsyncModuleOperationStart *>(msg);
278 msg = wrapper->get_action();
279 delete wrapper;
280 }
281 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
282 mday 1.39 {
283 AsyncModuleOperationStart *wrapper =
284 static_cast<AsyncModuleOperationStart *>(msg);
285 msg = wrapper->get_action();
286 delete wrapper;
287 }
288 else if (msg->getType() == async_messages::ASYNC_OP_START)
289 {
290 AsyncOperationStart *wrapper =
291 static_cast<AsyncOperationStart *>(msg);
292 msg = wrapper->get_action();
293 delete wrapper;
294 }
295 delete msg;
296 }
297
298 msg = op->get_response();
299 if( msg && ( msg->getMask() & message_mask::ha_async))
300 {
301 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
302 {
303 mday 1.39 AsyncLegacyOperationResult *wrapper =
304 static_cast<AsyncLegacyOperationResult *>(msg);
305 msg = wrapper->get_result();
306 delete wrapper;
307 }
308 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
309 {
310 AsyncModuleOperationResult *wrapper =
311 static_cast<AsyncModuleOperationResult *>(msg);
312 msg = wrapper->get_result();
313 delete wrapper;
314 }
315 }
316 void (*callback)(Message *, void *, void *) = op->__async_callback;
317 void *handle = op->_callback_handle;
318 void *parm = op->_callback_parameter;
319 op->release();
320 return_op(op);
321 callback(msg, handle, parm);
322 }
323 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
324 mday 1.39 {
325 // note that _callback_node may be different from op
326 // op->_callback_response_q is a "this" pointer we can use for
327 // static callback methods
328 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
329 }
|
330 mday 1.22 }
331
|
332 mday 1.1
|
333 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
334 // Thread *thread,
335 // MessageQueue *queue)
|
336 mday 1.1 {
|
337 mday 1.5 if ( operation != 0 )
338 {
|
339 mday 1.29
|
340 mday 1.22 // ATTN: optimization
341 // << Tue Feb 19 14:10:38 2002 mdd >>
|
342 mday 1.6 operation->lock();
|
343 mday 1.29
|
344 mday 1.6 Message *rq = operation->_request.next(0);
|
345 mday 1.29
|
346 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
347 // move this to the bottom of the loop when the majority of
348 // messages become async messages.
349
|
350 mday 1.18 // divert legacy messages to handleEnqueue
|
351 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
352 mday 1.18 {
353 rq = operation->_request.remove_first() ;
354 operation->unlock();
355 // delete the op node
|
356 mday 1.39 operation->release();
357 return_op( operation);
|
358 mday 1.24
|
359 mday 1.26 handleEnqueue(rq);
|
360 mday 1.18 return;
361 }
362
|
363 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
364 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
365 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
366 {
367 operation->unlock();
368 _handle_async_callback(operation);
369 }
370 else
371 {
372 PEGASUS_ASSERT(rq != 0 );
373 // ATTN: optimization
374 // << Wed Mar 6 15:00:39 2002 mdd >>
375 // put thread and queue into the asyncopnode structure.
|
376 mday 1.34 // (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
377 // (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
378 // done << Tue Mar 12 14:49:07 2002 mdd >>
|
379 mday 1.29 operation->unlock();
380 _handle_async_request(static_cast<AsyncRequest *>(rq));
381 }
|
382 mday 1.5 }
383 return;
|
384 mday 1.1 }
385
386 void MessageQueueService::_handle_async_request(AsyncRequest *req)
387 {
|
388 mday 1.4 if ( req != 0 )
389 {
390 req->op->processing();
|
391 mday 1.1
|
392 mday 1.4 Uint32 type = req->getType();
393 if( type == async_messages::HEARTBEAT )
394 handle_heartbeat_request(req);
395 else if (type == async_messages::IOCTL)
396 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
397 else if (type == async_messages::CIMSERVICE_START)
398 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
399 else if (type == async_messages::CIMSERVICE_STOP)
400 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
401 else if (type == async_messages::CIMSERVICE_PAUSE)
402 handle_CimServicePause(static_cast<CimServicePause *>(req));
403 else if (type == async_messages::CIMSERVICE_RESUME)
404 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
405 else if ( type == async_messages::ASYNC_OP_START)
406 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
407 else
408 {
409 // we don't handle this request message
410 _make_response(req, async_results::CIM_NAK );
411 }
|
412 mday 1.1 }
413 }
414
|
415 mday 1.17
416 Boolean MessageQueueService::_enqueueResponse(
417 Message* request,
418 Message* response)
|
419 mday 1.18
|
420 mday 1.17 {
|
421 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
422 "MessageQueueService::_enqueueResponse");
|
423 mday 1.25
424 if( request->getMask() & message_mask::ha_async)
425 {
426 if (response->getMask() & message_mask::ha_async )
427 {
428 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
429 static_cast<AsyncReply *>(response),
430 ASYNC_OPSTATE_COMPLETE, 0 );
|
431 kumpf 1.37 PEG_METHOD_EXIT();
|
432 mday 1.25 return true;
433 }
434 }
435
|
436 mday 1.17 if(request->_async != 0 )
437 {
438 Uint32 mask = request->_async->getMask();
|
439 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
440
441 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
442 AsyncOpNode *op = async->op;
443 request->_async = 0;
|
444 mday 1.27 // this request is probably going to be deleted !!
445 // remove it from the op node
446 op->_request.remove(request);
|
447 mday 1.18
448 AsyncLegacyOperationResult *async_result =
449 new AsyncLegacyOperationResult(
450 async->getKey(),
451 async->getRouting(),
452 op,
453 response);
454 _completeAsyncResponse(async,
455 async_result,
456 ASYNC_OPSTATE_COMPLETE,
457 0);
|
458 kumpf 1.37 PEG_METHOD_EXIT();
|
459 mday 1.18 return true;
|
460 mday 1.17 }
|
461 mday 1.18
462 // ensure that the destination queue is in response->dest
|
463 kumpf 1.37 PEG_METHOD_EXIT();
|
464 mday 1.24 return SendForget(response);
|
465 mday 1.18
|
466 mday 1.17 }
467
|
468 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
469 mday 1.1 {
|
470 mday 1.19 cimom::_make_response(req, code);
|
471 mday 1.1 }
472
473
|
474 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
475 AsyncReply *reply,
476 Uint32 state,
477 Uint32 flag)
478 {
|
479 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
480 "MessageQueueService::_completeAsyncResponse");
481
|
482 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
483 kumpf 1.37
484 PEG_METHOD_EXIT();
|
485 mday 1.5 }
486
487
|
488 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
489 Uint32 state,
490 Uint32 flag,
491 Uint32 code)
492 {
493 cimom::_complete_op_node(op, state, flag, code);
494 }
495
|
496 mday 1.5
497 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
498 {
|
499 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
500 return false;
501
|
502 mday 1.20 // ATTN optimization remove the message checking altogether in the base
503 // << Mon Feb 18 14:02:20 2002 mdd >>
|
504 mday 1.6 op->lock();
505 Message *rq = op->_request.next(0);
|
506 mday 1.20 Message *rp = op->_response.next(0);
|
507 mday 1.6 op->unlock();
508
|
509 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
510 _die.value() == 0 )
|
511 mday 1.5 {
|
512 mday 1.6 _incoming.insert_last_wait(op);
|
513 mday 1.5 return true;
514 }
|
515 mday 1.32 // else
516 // {
517 // if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
518 // (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
519 // _die.value() == 0)
520 // {
521 // MessageQueueService::_incoming.insert_last_wait(op);
522 // return true;
523 // }
524 // }
525
|
526 mday 1.5 return false;
527 }
528
529 Boolean MessageQueueService::messageOK(const Message *msg)
530 {
|
531 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
532 return false;
|
533 mday 1.18 return true;
534 }
535
|
536 mday 1.29
537 // made pure virtual
538 // << Wed Mar 6 15:11:31 2002 mdd >>
|
539 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
540 // {
541 // if ( msg )
542 // delete msg;
543 // }
|
544 mday 1.5
|
545 mday 1.29 // made pure virtual
546 // << Wed Mar 6 15:11:56 2002 mdd >>
|
547 mday 1.25 // void MessageQueueService::handleEnqueue(void)
548 // {
549 // Message *msg = dequeue();
550 // handleEnqueue(msg);
551 // }
|
552 mday 1.5
|
553 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
554 {
555 // default action is to echo a heartbeat response
556
557 AsyncReply *reply =
558 new AsyncReply(async_messages::HEARTBEAT,
559 req->getKey(),
560 req->getRouting(),
561 0,
562 req->op,
563 async_results::OK,
564 req->resp,
565 false);
|
566 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
567 mday 1.1 }
568
569
570 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
571 {
572 ;
573 }
574
575 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
576 {
|
577 mday 1.8
578 switch( req->ctl )
579 {
580 case AsyncIoctl::IO_CLOSE:
581 {
582 // save my bearings
|
583 mday 1.34 Thread *myself = req->op->_thread_ptr;
584 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
585 mday 1.8
586 // respond to this message.
587 _make_response(req, async_results::OK);
588 // ensure we do not accept any further messages
589
590 // ensure we don't recurse on IO_CLOSE
591 if( _incoming_queue_shutdown.value() > 0 )
592 break;
593
594 // set the closing flag
595 service->_incoming_queue_shutdown = 1;
596 // empty out the queue
597 while( 1 )
598 {
599 AsyncOpNode *operation;
600 try
601 {
602 operation = service->_incoming.remove_first();
603 }
604 catch(IPCException & )
605 {
606 mday 1.8 break;
607 }
608 if( operation )
609 {
|
610 mday 1.34 operation->_thread_ptr = myself;
611 operation->_service_ptr = service;
612 service->_handle_incoming_operation(operation);
|
613 mday 1.8 }
614 else
615 break;
616 } // message processing loop
617
618 // shutdown the AsyncDQueue
619 service->_incoming.shutdown_queue();
620 // exit the thread !
621 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
622 return;
623 }
624
625 default:
626 _make_response(req, async_results::CIM_NAK);
627 }
|
628 mday 1.1 }
|
629 mday 1.8
|
630 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
631 {
|
632 mday 1.10 // clear the stoped bit and update
|
633 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
634 mday 1.10 _make_response(req, async_results::OK);
635 // now tell the meta dispatcher we are stopped
636 update_service(_capabilities, _mask);
637
|
638 mday 1.1 }
639 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
640 {
|
641 mday 1.10 // set the stopeed bit and update
642 _capabilities |= module_capabilities::stopped;
643 _make_response(req, async_results::CIM_STOPPED);
644 // now tell the meta dispatcher we are stopped
645 update_service(_capabilities, _mask);
646
|
647 mday 1.1 }
648 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
649 {
|
650 mday 1.10 // set the paused bit and update
|
651 mday 1.13 _capabilities |= module_capabilities::paused;
|
652 mday 1.11 update_service(_capabilities, _mask);
|
653 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
654 // now tell the meta dispatcher we are stopped
|
655 mday 1.1 }
656 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
657 {
|
658 mday 1.10 // clear the paused bit and update
|
659 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
660 mday 1.11 update_service(_capabilities, _mask);
|
661 mday 1.10 _make_response(req, async_results::OK);
662 // now tell the meta dispatcher we are stopped
|
663 mday 1.1 }
664
665 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
666 {
667 _make_response(req, async_results::CIM_NAK);
668 }
669
670 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
671 {
|
672 mday 1.14 ;
673 }
674
|
675 mday 1.10
|
676 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
677 {
678 // remove the legacy message from the request and enqueue it to its destination
679 Uint32 result = async_results::CIM_NAK;
680
|
681 mday 1.25 Message *legacy = req->_act;
|
682 mday 1.14 if ( legacy != 0 )
683 {
|
684 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
685 mday 1.14 if( queue != 0 )
686 {
|
687 mday 1.25 if(queue->isAsync() == true )
688 {
689 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
690 }
691 else
692 {
693 // Enqueue the response:
694 queue->enqueue(req->get_action());
695 }
696
|
697 mday 1.14 result = async_results::OK;
698 }
699 }
700 _make_response(req, result);
701 }
702
703 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
704 {
705 ;
|
706 mday 1.1 }
707
708 AsyncOpNode *MessageQueueService::get_op(void)
709 {
|
710 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
711 mday 1.1
|
712 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
713 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
714 mday 1.4
|
715 mday 1.1 return op;
716 }
717
718 void MessageQueueService::return_op(AsyncOpNode *op)
719 {
720 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
721 mday 1.4 delete op;
|
722 mday 1.1 }
723
|
724 mday 1.18
|
725 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
726 Uint32 destination)
727 {
728 PEGASUS_ASSERT(op != 0 );
|
729 mday 1.30 op->lock();
730 op->_op_dest = MessageQueue::lookup(destination);
|
731 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
732 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
733 mday 1.30 op->unlock();
734 if ( op->_op_dest == 0 )
735 return false;
736
|
737 mday 1.29 return _meta_dispatcher->route_async(op);
738 }
739
|
740 mday 1.39
|
741 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
742 Uint32 destination,
|
743 mday 1.18 void (*callback)(AsyncOpNode *,
|
744 mday 1.32 MessageQueue *,
|
745 mday 1.30 void *),
746 MessageQueue *callback_response_q,
747 void *callback_ptr)
|
748 mday 1.20 {
|
749 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
750 mday 1.18
|
751 mday 1.21 // get the queue handle for the destination
752
|
753 mday 1.30 op->lock();
|
754 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
755 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
756 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
757 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
758 mday 1.30 // initialize the callback data
|
759 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
760 op->_callback_node = op; // the op node
761 op->_callback_response_q = callback_response_q; // the queue that will receive the response
762 op->_callback_ptr = callback_ptr; // user data for callback
763 op->_callback_request_q = this; // I am the originator of this request
|
764 mday 1.30
765 op->unlock();
766 if(op->_op_dest == 0)
767 return false;
|
768 mday 1.21
769 return _meta_dispatcher->route_async(op);
|
770 mday 1.18 }
771
772
|
773 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
774 Uint32 destination,
775 void (*callback)(Message *response,
776 void *handle,
777 void *parameter),
778 void *handle,
779 void *parameter)
780 {
781 if(msg == NULL)
782 return false;
783 if(callback == NULL)
784 return SendForget(msg);
785 AsyncOpNode *op = get_op();
786 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
787 {
788 op->release();
789 return_op(op);
790 return false;
791 }
792 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
793 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
794 mday 1.39 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
795 op->__async_callback = callback;
796 op->_callback_node = op;
797 op->_callback_handle = handle;
798 op->_callback_parameter = parameter;
799 op->_callback_response_q = this;
800
801
802 if( ! (msg->getMask() & message_mask::ha_async) )
803 {
804 AsyncLegacyOperationStart *wrapper =
805 new AsyncLegacyOperationStart(get_next_xid(),
806 op,
807 destination,
808 msg,
809 destination);
810 msg = static_cast<Message *>(wrapper);
811 }
812 else
813 {
814 op->_request.insert_first(msg);
815 mday 1.39 (static_cast<AsyncMessage *>(msg))->op = op;
816 }
817
|
818 mday 1.42 op->_callback_notify = _incoming.get_node_cond();
|
819 mday 1.39 _callback.insert_last(op);
820 return _meta_dispatcher->route_async(op);
821 }
822
823
|
824 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
825 {
826
|
827 mday 1.24
|
828 mday 1.18 AsyncOpNode *op = 0;
|
829 mday 1.22 Uint32 mask = msg->getMask();
830
831 if (mask & message_mask::ha_async)
|
832 mday 1.18 {
833 op = (static_cast<AsyncMessage *>(msg))->op ;
834 }
|
835 mday 1.22
|
836 mday 1.18 if( op == 0 )
|
837 mday 1.20 {
|
838 mday 1.18 op = get_op();
|
839 mday 1.20 op->_request.insert_first(msg);
|
840 mday 1.22 if (mask & message_mask::ha_async)
841 (static_cast<AsyncMessage *>(msg))->op = op;
|
842 mday 1.20 }
|
843 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
844 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
845 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
846 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
847 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
848 mday 1.30 if ( op->_op_dest == 0 )
|
849 mday 1.39 {
850 op->release();
851 return_op(op);
|
852 mday 1.21 return false;
|
853 mday 1.39 }
|
854 mday 1.24
|
855 mday 1.18 // now see if the meta dispatcher will take it
|
856 mday 1.30 return _meta_dispatcher->route_async(op);
|
857 mday 1.18 }
|
858 mday 1.2
|
859 mday 1.1
|
860 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
861 mday 1.1 {
|
862 mday 1.4 if ( request == 0 )
863 return 0 ;
|
864 mday 1.5
865 Boolean destroy_op = false;
866
867 if (request->op == false)
868 {
869 request->op = get_op();
|
870 mday 1.7 request->op->_request.insert_first(request);
|
871 mday 1.5 destroy_op = true;
872 }
|
873 mday 1.4
|
874 mday 1.33 request->block = false;
|
875 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
876 mday 1.33 SendAsync(request->op,
877 request->dest,
|
878 mday 1.36 _sendwait_callback,
|
879 mday 1.33 this,
880 (void *)0);
|
881 mday 1.4
|
882 mday 1.33 request->op->_client_sem.wait();
|
883 mday 1.6 request->op->lock();
884 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
885 rpl->op = 0;
886 request->op->unlock();
887
|
888 mday 1.5 if( destroy_op == true)
889 {
|
890 mday 1.6 request->op->lock();
891 request->op->_request.remove(request);
892 request->op->_state |= ASYNC_OPSTATE_RELEASED;
893 request->op->unlock();
894 return_op(request->op);
|
895 mday 1.7 request->op = 0;
|
896 mday 1.5 }
897 return rpl;
|
898 mday 1.1 }
899
900
901 Boolean MessageQueueService::register_service(String name,
902 Uint32 capabilities,
903 Uint32 mask)
904
905 {
906 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
907 mday 1.5 0,
|
908 mday 1.1 true,
909 name,
910 capabilities,
911 mask,
912 _queueId);
913 Boolean registered = false;
|
914 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
915 mday 1.1
|
916 mday 1.2 if ( reply != 0 )
|
917 mday 1.1 {
918 if(reply->getMask() & message_mask:: ha_async)
919 {
920 if(reply->getMask() & message_mask::ha_reply)
921 {
|
922 mday 1.15 if(reply->result == async_results::OK ||
923 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
924 mday 1.1 registered = true;
925 }
926 }
927
|
928 mday 1.7 delete reply;
|
929 mday 1.1 }
|
930 mday 1.5 delete msg;
|
931 mday 1.1 return registered;
932 }
933
934 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
935 {
936
937
938 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
939 mday 1.5 0,
|
940 mday 1.1 true,
941 _queueId,
942 _capabilities,
943 _mask);
944 Boolean registered = false;
|
945 mday 1.2
946 AsyncMessage *reply = SendWait(msg);
947 if (reply)
|
948 mday 1.1 {
949 if(reply->getMask() & message_mask:: ha_async)
950 {
951 if(reply->getMask() & message_mask::ha_reply)
952 {
|
953 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
954 mday 1.1 registered = true;
955 }
956 }
957 delete reply;
958 }
|
959 mday 1.5 delete msg;
|
960 mday 1.1 return registered;
961 }
962
963
964 Boolean MessageQueueService::deregister_service(void)
965 {
|
966 mday 1.3
|
967 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
968 return true;
|
969 mday 1.1 }
970
971
972 void MessageQueueService::find_services(String name,
973 Uint32 capabilities,
974 Uint32 mask,
975 Array<Uint32> *results)
976 {
977
978 if( results == 0 )
979 throw NullPointer();
|
980 mday 1.5
|
981 mday 1.1 results->clear();
982
983 FindServiceQueue *req =
984 new FindServiceQueue(get_next_xid(),
|
985 mday 1.5 0,
|
986 mday 1.1 _queueId,
987 true,
988 name,
989 capabilities,
990 mask);
991
|
992 mday 1.2 AsyncMessage *reply = SendWait(req);
993 if(reply)
|
994 mday 1.1 {
995 if( reply->getMask() & message_mask::ha_async)
996 {
997 if(reply->getMask() & message_mask::ha_reply)
998 {
999 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1000 {
1001 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1002 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1003 }
1004 }
1005 }
1006 delete reply;
1007 }
|
1008 mday 1.5 delete req;
|
1009 mday 1.1 return ;
1010 }
1011
1012 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1013 {
1014 if(result == 0)
1015 throw NullPointer();
1016
1017 EnumerateService *req
1018 = new EnumerateService(get_next_xid(),
|
1019 mday 1.5 0,
|
1020 mday 1.1 _queueId,
1021 true,
1022 queue);
1023
|
1024 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1025 mday 1.1
|
1026 mday 1.2 if (reply)
|
1027 mday 1.1 {
1028 Boolean found = false;
1029
1030 if( reply->getMask() & message_mask::ha_async)
1031 {
1032 if(reply->getMask() & message_mask::ha_reply)
1033 {
1034 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1035 {
1036 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1037 {
1038 if( found == false)
1039 {
1040 found = true;
1041
1042 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1043 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1044 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1045 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1046 }
1047 }
1048 mday 1.1 }
1049 }
1050 }
1051 delete reply;
1052 }
|
1053 mday 1.5 delete req;
1054
|
1055 mday 1.1 return;
1056 }
1057
1058 Uint32 MessageQueueService::get_next_xid(void)
1059 {
1060 _xid++;
1061 return _xid.value();
1062 }
1063
1064 PEGASUS_NAMESPACE_END
|