1 mday 1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 mday 1.53 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
|
4 mday 1.1 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
|
11 mday 1.53 //
|
12 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 //
23 // Author: Mike Day (mdday@us.ibm.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
29 #include "MessageQueueService.h"
|
30 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
31 mday 1.1
32 PEGASUS_NAMESPACE_BEGIN
33
|
34 mday 1.15
35 cimom *MessageQueueService::_meta_dispatcher = 0;
36 AtomicInt MessageQueueService::_service_count = 0;
37 AtomicInt MessageQueueService::_xid(1);
|
38 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
39 mday 1.15
|
40 mday 1.53 static struct timeval create_time = {0, 100};
41 static struct timeval destroy_time = {1, 0};
42 static struct timeval deadlock_time = {10, 0};
43
44 ThreadPool MessageQueueService::_thread_pool(2, "MessageQueueService", 2, 20,
45 create_time, destroy_time, deadlock_time);
46
47 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
48
49 int MessageQueueService::kill_idle_threads(void)
50 {
51 static struct timeval now, last;
52 gettimeofday(&now, NULL);
53
54 if( now.tv_sec - last.tv_sec > 0 )
55 {
56 gettimeofday(&last, NULL);
57 return _thread_pool.kill_dead_threads();
58 }
59 return 0;
60 }
61 mday 1.53
62 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
63 {
64 Thread *myself = reinterpret_cast<Thread *>(parm);
65
66 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
67
68 while ( _stop_polling.value() == 0 )
69 {
70 _polling_sem.wait();
71 list->lock();
72 MessageQueueService *service = list->next(0);
73 while(service != NULL)
74 {
75 if(service->_incoming.count() > 0 )
76 {
77 _thread_pool.allocate_and_awaken(service, _req_proc);
78
79 // service->_req_proc(service);
80 }
81
82 mday 1.53 service = list->next(service);
83 }
84 list->unlock();
85 }
86 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
87 return(0);
88 }
89
90 Thread MessageQueueService::_polling_thread(polling_routine,
91 reinterpret_cast<void *>(&_polling_list),
92 false);
93
94
95 Semaphore MessageQueueService::_polling_sem(0);
96 AtomicInt MessageQueueService::_stop_polling(0);
97
|
98 mday 1.15
|
99 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
100 Uint32 queueID,
101 Uint32 capabilities,
102 Uint32 mask)
|
103 mday 1.15 : Base(name, true, queueID),
|
104 mday 1.22
|
105 mday 1.1 _mask(mask),
|
106 mday 1.4 _die(0),
|
107 mday 1.41 _incoming(true, 0),
|
108 mday 1.39 _callback(true),
|
109 mday 1.7 _incoming_queue_shutdown(0),
|
110 mday 1.39 _callback_ready(0),
111 _req_thread(_req_proc, this, false),
112 _callback_thread(_callback_proc, this, false)
|
113 mday 1.53
|
114 mday 1.1 {
|
115 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
116
|
117 mday 1.1 _default_op_timeout.tv_sec = 30;
118 _default_op_timeout.tv_usec = 100;
|
119 mday 1.15
120 _meta_dispatcher_mutex.lock(pegasus_thread_self());
121
122 if( _meta_dispatcher == 0 )
123 {
124 PEGASUS_ASSERT( _service_count.value() == 0 );
125 _meta_dispatcher = new cimom();
126 if (_meta_dispatcher == NULL )
127 {
128 _meta_dispatcher_mutex.unlock();
129 throw NullPointer();
130 }
|
131 mday 1.53 _polling_thread.run();
|
132 mday 1.15 }
133 _service_count++;
134
135 if( false == register_service(name, _capabilities, _mask) )
136 {
137 _meta_dispatcher_mutex.unlock();
138 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
139 }
140
|
141 mday 1.53 _polling_list.insert_last(this);
142
|
143 mday 1.15 _meta_dispatcher_mutex.unlock();
|
144 mday 1.39 // _callback_thread.run();
145
|
146 mday 1.53 // _req_thread.run();
|
147 mday 1.1 }
148
|
149 mday 1.4
|
150 mday 1.1 MessageQueueService::~MessageQueueService(void)
151 {
152 _die = 1;
|
153 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
154 mday 1.16 {
|
155 mday 1.32 _shutdown_incoming_queue();
|
156 mday 1.16 }
|
157 mday 1.39 _callback_ready.signal();
|
158 mday 1.53 // _callback_thread.join();
|
159 mday 1.39
|
160 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
161 _service_count--;
162 if (_service_count.value() == 0 )
163 {
|
164 mday 1.53 _stop_polling++;
165 _polling_sem.signal();
166 _polling_thread.join();
|
167 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
168 delete _meta_dispatcher;
|
169 kumpf 1.45 _meta_dispatcher = 0;
|
170 mday 1.53
|
171 mday 1.15 }
172 _meta_dispatcher_mutex.unlock();
|
173 mday 1.53 _polling_list.remove(this);
174 }
|
175 mday 1.1
|
176 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
177 {
178
|
179 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
180 return ;
|
181 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
182 0,
183 _queueId,
184 _queueId,
185 true,
186 AsyncIoctl::IO_CLOSE,
187 0,
188 0);
|
189 mday 1.9
|
190 mday 1.8 msg->op = get_op();
|
191 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
192 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
193 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
194 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
195 mday 1.41
|
196 mday 1.32 msg->op->_op_dest = this;
|
197 mday 1.8 msg->op->_request.insert_first(msg);
198
199 _incoming.insert_last_wait(msg->op);
|
200 mday 1.32
|
201 mday 1.53 // _req_thread.join();
|
202 mday 1.16
|
203 mday 1.7 }
204
|
205 mday 1.22
206
207 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
208 {
|
209 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
210
|
211 mday 1.22 Base::enqueue(msg);
|
212 kumpf 1.28
213 PEG_METHOD_EXIT();
|
214 mday 1.22 }
215
216
|
217 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
218 {
219 Thread *myself = reinterpret_cast<Thread *>(parm);
220 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
221 AsyncOpNode *operation = 0;
222
223 while ( service->_die.value() == 0 )
224 {
225 service->_callback_ready.wait();
226
227 service->_callback.lock();
228 operation = service->_callback.next(0);
229 while( operation != NULL)
230 {
231 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
232 {
233 operation = service->_callback.remove_no_lock(operation);
234 PEGASUS_ASSERT(operation != NULL);
235 operation->_thread_ptr = myself;
236 operation->_service_ptr = service;
237 service->_handle_async_callback(operation);
238 mday 1.39 break;
239 }
240 operation = service->_callback.next(operation);
241 }
242 service->_callback.unlock();
243 }
244 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
245 return(0);
246 }
247
|
248 mday 1.22
|
249 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
250 mday 1.1 {
|
251 mday 1.53 // Thread *myself = reinterpret_cast<Thread *>(parm);
252 // MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
253 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
254 mday 1.5 // pull messages off the incoming queue and dispatch them. then
255 // check pending messages that are non-blocking
|
256 mday 1.7 AsyncOpNode *operation = 0;
257
|
258 mday 1.53 // while ( service->_die.value() == 0 )
259 // {
|
260 mday 1.41 try
261 {
|
262 mday 1.53 operation = service->_incoming.remove_first();
|
263 mday 1.41 }
264 catch(ListClosed & )
265 {
|
266 mday 1.53 // break;
|
267 mday 1.41 }
268 if( operation )
269 {
|
270 mday 1.53 // operation->_thread_ptr = pegasus_thread_self();
|
271 mday 1.41 operation->_service_ptr = service;
272 service->_handle_incoming_operation(operation);
273 }
|
274 mday 1.53 // }
|
275 mday 1.44
|
276 mday 1.53 // myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
|
277 mday 1.5 return(0);
|
278 mday 1.1 }
279
|
280 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
281 {
282 return _callback.count();
283 }
284
285
286
|
287 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
288 MessageQueue *q,
289 void *parm)
290 {
291 op->_client_sem.signal();
292 }
293
|
294 mday 1.30
295 // callback function is responsible for cleaning up all resources
296 // including op, op->_callback_node, and op->_callback_ptr
|
297 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
298 {
|
299 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
300 {
301
302 Message *msg = op->get_request();
303 if( msg && ( msg->getMask() & message_mask::ha_async))
304 {
305 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
306 {
307 AsyncLegacyOperationStart *wrapper =
308 static_cast<AsyncLegacyOperationStart *>(msg);
309 msg = wrapper->get_action();
310 delete wrapper;
311 }
312 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
313 {
314 AsyncModuleOperationStart *wrapper =
315 static_cast<AsyncModuleOperationStart *>(msg);
316 msg = wrapper->get_action();
317 delete wrapper;
318 }
319 else if (msg->getType() == async_messages::ASYNC_OP_START)
320 mday 1.39 {
321 AsyncOperationStart *wrapper =
322 static_cast<AsyncOperationStart *>(msg);
323 msg = wrapper->get_action();
324 delete wrapper;
325 }
326 delete msg;
327 }
328
329 msg = op->get_response();
330 if( msg && ( msg->getMask() & message_mask::ha_async))
331 {
332 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
333 {
334 AsyncLegacyOperationResult *wrapper =
335 static_cast<AsyncLegacyOperationResult *>(msg);
336 msg = wrapper->get_result();
337 delete wrapper;
338 }
339 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
340 {
341 mday 1.39 AsyncModuleOperationResult *wrapper =
342 static_cast<AsyncModuleOperationResult *>(msg);
343 msg = wrapper->get_result();
344 delete wrapper;
345 }
346 }
347 void (*callback)(Message *, void *, void *) = op->__async_callback;
348 void *handle = op->_callback_handle;
349 void *parm = op->_callback_parameter;
350 op->release();
351 return_op(op);
352 callback(msg, handle, parm);
353 }
354 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
355 {
356 // note that _callback_node may be different from op
357 // op->_callback_response_q is a "this" pointer we can use for
358 // static callback methods
359 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
360 }
|
361 mday 1.22 }
362
|
363 mday 1.1
|
364 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
365 // Thread *thread,
366 // MessageQueue *queue)
|
367 mday 1.1 {
|
368 mday 1.5 if ( operation != 0 )
369 {
|
370 mday 1.29
|
371 mday 1.22 // ATTN: optimization
372 // << Tue Feb 19 14:10:38 2002 mdd >>
|
373 mday 1.6 operation->lock();
|
374 mday 1.29
|
375 mday 1.6 Message *rq = operation->_request.next(0);
|
376 mday 1.29
|
377 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
378 // move this to the bottom of the loop when the majority of
379 // messages become async messages.
380
|
381 mday 1.18 // divert legacy messages to handleEnqueue
|
382 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
383 mday 1.18 {
384 rq = operation->_request.remove_first() ;
385 operation->unlock();
386 // delete the op node
|
387 mday 1.39 operation->release();
388 return_op( operation);
|
389 mday 1.24
|
390 mday 1.26 handleEnqueue(rq);
|
391 mday 1.18 return;
392 }
393
|
394 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
395 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
396 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
397 {
|
398 mday 1.49
|
399 mday 1.29 operation->unlock();
400 _handle_async_callback(operation);
401 }
402 else
403 {
404 PEGASUS_ASSERT(rq != 0 );
405 // ATTN: optimization
406 // << Wed Mar 6 15:00:39 2002 mdd >>
407 // put thread and queue into the asyncopnode structure.
|
408 mday 1.34 // (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
409 // (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
410 // done << Tue Mar 12 14:49:07 2002 mdd >>
|
411 mday 1.29 operation->unlock();
412 _handle_async_request(static_cast<AsyncRequest *>(rq));
413 }
|
414 mday 1.5 }
415 return;
|
416 mday 1.1 }
417
418 void MessageQueueService::_handle_async_request(AsyncRequest *req)
419 {
|
420 mday 1.4 if ( req != 0 )
421 {
422 req->op->processing();
|
423 mday 1.1
|
424 mday 1.4 Uint32 type = req->getType();
425 if( type == async_messages::HEARTBEAT )
426 handle_heartbeat_request(req);
427 else if (type == async_messages::IOCTL)
428 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
429 else if (type == async_messages::CIMSERVICE_START)
430 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
431 else if (type == async_messages::CIMSERVICE_STOP)
432 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
433 else if (type == async_messages::CIMSERVICE_PAUSE)
434 handle_CimServicePause(static_cast<CimServicePause *>(req));
435 else if (type == async_messages::CIMSERVICE_RESUME)
436 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
437 else if ( type == async_messages::ASYNC_OP_START)
438 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
439 else
440 {
441 // we don't handle this request message
442 _make_response(req, async_results::CIM_NAK );
443 }
|
444 mday 1.1 }
445 }
446
|
447 mday 1.17
448 Boolean MessageQueueService::_enqueueResponse(
449 Message* request,
450 Message* response)
|
451 mday 1.18
|
452 mday 1.17 {
|
453 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
454 "MessageQueueService::_enqueueResponse");
|
455 mday 1.25
456 if( request->getMask() & message_mask::ha_async)
457 {
458 if (response->getMask() & message_mask::ha_async )
459 {
460 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
461 static_cast<AsyncReply *>(response),
462 ASYNC_OPSTATE_COMPLETE, 0 );
|
463 kumpf 1.37 PEG_METHOD_EXIT();
|
464 mday 1.25 return true;
465 }
466 }
467
|
468 mday 1.17 if(request->_async != 0 )
469 {
470 Uint32 mask = request->_async->getMask();
|
471 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
472
473 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
474 AsyncOpNode *op = async->op;
475 request->_async = 0;
|
476 mday 1.27 // this request is probably going to be deleted !!
477 // remove it from the op node
478 op->_request.remove(request);
|
479 mday 1.18
480 AsyncLegacyOperationResult *async_result =
481 new AsyncLegacyOperationResult(
482 async->getKey(),
483 async->getRouting(),
484 op,
485 response);
486 _completeAsyncResponse(async,
487 async_result,
488 ASYNC_OPSTATE_COMPLETE,
489 0);
|
490 kumpf 1.37 PEG_METHOD_EXIT();
|
491 mday 1.18 return true;
|
492 mday 1.17 }
|
493 mday 1.18
494 // ensure that the destination queue is in response->dest
|
495 kumpf 1.37 PEG_METHOD_EXIT();
|
496 mday 1.24 return SendForget(response);
|
497 mday 1.18
|
498 mday 1.17 }
499
|
500 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
501 mday 1.1 {
|
502 mday 1.19 cimom::_make_response(req, code);
|
503 mday 1.1 }
504
505
|
506 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
507 AsyncReply *reply,
508 Uint32 state,
509 Uint32 flag)
510 {
|
511 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
512 "MessageQueueService::_completeAsyncResponse");
513
|
514 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
515 kumpf 1.37
516 PEG_METHOD_EXIT();
|
517 mday 1.5 }
518
519
|
520 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
521 Uint32 state,
522 Uint32 flag,
523 Uint32 code)
524 {
525 cimom::_complete_op_node(op, state, flag, code);
526 }
527
|
528 mday 1.5
529 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
530 {
|
531 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
532 return false;
533
|
534 mday 1.20 // ATTN optimization remove the message checking altogether in the base
535 // << Mon Feb 18 14:02:20 2002 mdd >>
|
536 mday 1.6 op->lock();
537 Message *rq = op->_request.next(0);
|
538 mday 1.20 Message *rp = op->_response.next(0);
|
539 mday 1.6 op->unlock();
540
|
541 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
542 _die.value() == 0 )
|
543 mday 1.5 {
|
544 mday 1.6 _incoming.insert_last_wait(op);
|
545 mday 1.53 _polling_sem.signal();
|
546 mday 1.5 return true;
547 }
|
548 mday 1.32 // else
549 // {
550 // if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
551 // (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
552 // _die.value() == 0)
553 // {
554 // MessageQueueService::_incoming.insert_last_wait(op);
555 // return true;
556 // }
557 // }
558
|
559 mday 1.5 return false;
560 }
561
562 Boolean MessageQueueService::messageOK(const Message *msg)
563 {
|
564 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
565 return false;
|
566 mday 1.18 return true;
567 }
568
|
569 mday 1.29
570 // made pure virtual
571 // << Wed Mar 6 15:11:31 2002 mdd >>
|
572 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
573 // {
574 // if ( msg )
575 // delete msg;
576 // }
|
577 mday 1.5
|
578 mday 1.29 // made pure virtual
579 // << Wed Mar 6 15:11:56 2002 mdd >>
|
580 mday 1.25 // void MessageQueueService::handleEnqueue(void)
581 // {
582 // Message *msg = dequeue();
583 // handleEnqueue(msg);
584 // }
|
585 mday 1.5
|
586 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
587 {
588 // default action is to echo a heartbeat response
589
590 AsyncReply *reply =
591 new AsyncReply(async_messages::HEARTBEAT,
592 req->getKey(),
593 req->getRouting(),
594 0,
595 req->op,
596 async_results::OK,
597 req->resp,
598 false);
|
599 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
600 mday 1.1 }
601
602
603 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
604 {
605 ;
606 }
607
608 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
609 {
|
610 mday 1.8
611 switch( req->ctl )
612 {
613 case AsyncIoctl::IO_CLOSE:
614 {
615 // save my bearings
|
616 mday 1.53 // Thread *myself = req->op->_thread_ptr;
|
617 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
618 mday 1.8
619 // respond to this message.
620 _make_response(req, async_results::OK);
621 // ensure we do not accept any further messages
622
623 // ensure we don't recurse on IO_CLOSE
624 if( _incoming_queue_shutdown.value() > 0 )
625 break;
626
627 // set the closing flag
628 service->_incoming_queue_shutdown = 1;
629 // empty out the queue
630 while( 1 )
631 {
632 AsyncOpNode *operation;
633 try
634 {
635 operation = service->_incoming.remove_first();
636 }
637 catch(IPCException & )
638 {
639 mday 1.8 break;
640 }
641 if( operation )
642 {
|
643 mday 1.53 // operation->_thread_ptr = myself;
|
644 mday 1.34 operation->_service_ptr = service;
645 service->_handle_incoming_operation(operation);
|
646 mday 1.8 }
647 else
648 break;
649 } // message processing loop
650
651 // shutdown the AsyncDQueue
652 service->_incoming.shutdown_queue();
653 // exit the thread !
|
654 mday 1.53 // myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
|
655 mday 1.8 return;
656 }
657
658 default:
659 _make_response(req, async_results::CIM_NAK);
660 }
|
661 mday 1.1 }
|
662 mday 1.8
|
663 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
664 {
|
665 mday 1.10 // clear the stoped bit and update
|
666 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
667 mday 1.10 _make_response(req, async_results::OK);
668 // now tell the meta dispatcher we are stopped
669 update_service(_capabilities, _mask);
670
|
671 mday 1.1 }
672 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
673 {
|
674 mday 1.10 // set the stopeed bit and update
675 _capabilities |= module_capabilities::stopped;
676 _make_response(req, async_results::CIM_STOPPED);
677 // now tell the meta dispatcher we are stopped
678 update_service(_capabilities, _mask);
679
|
680 mday 1.1 }
681 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
682 {
|
683 mday 1.10 // set the paused bit and update
|
684 mday 1.13 _capabilities |= module_capabilities::paused;
|
685 mday 1.11 update_service(_capabilities, _mask);
|
686 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
687 // now tell the meta dispatcher we are stopped
|
688 mday 1.1 }
689 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
690 {
|
691 mday 1.10 // clear the paused bit and update
|
692 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
693 mday 1.11 update_service(_capabilities, _mask);
|
694 mday 1.10 _make_response(req, async_results::OK);
695 // now tell the meta dispatcher we are stopped
|
696 mday 1.1 }
697
698 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
699 {
700 _make_response(req, async_results::CIM_NAK);
701 }
702
703 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
704 {
|
705 mday 1.14 ;
706 }
707
|
708 mday 1.10
|
709 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
710 {
711 // remove the legacy message from the request and enqueue it to its destination
712 Uint32 result = async_results::CIM_NAK;
713
|
714 mday 1.25 Message *legacy = req->_act;
|
715 mday 1.14 if ( legacy != 0 )
716 {
|
717 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
718 mday 1.14 if( queue != 0 )
719 {
|
720 mday 1.25 if(queue->isAsync() == true )
721 {
722 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
723 }
724 else
725 {
726 // Enqueue the response:
727 queue->enqueue(req->get_action());
728 }
729
|
730 mday 1.14 result = async_results::OK;
731 }
732 }
733 _make_response(req, result);
734 }
735
736 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
737 {
738 ;
|
739 mday 1.1 }
740
741 AsyncOpNode *MessageQueueService::get_op(void)
742 {
|
743 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
744 mday 1.1
|
745 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
746 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
747 mday 1.4
|
748 mday 1.1 return op;
749 }
750
751 void MessageQueueService::return_op(AsyncOpNode *op)
752 {
753 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
754 mday 1.4 delete op;
|
755 mday 1.1 }
756
|
757 mday 1.18
|
758 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
759 Uint32 destination)
760 {
761 PEGASUS_ASSERT(op != 0 );
|
762 mday 1.30 op->lock();
763 op->_op_dest = MessageQueue::lookup(destination);
|
764 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
765 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
766 mday 1.30 op->unlock();
767 if ( op->_op_dest == 0 )
768 return false;
769
|
770 mday 1.29 return _meta_dispatcher->route_async(op);
771 }
772
|
773 mday 1.39
|
774 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
775 Uint32 destination,
|
776 mday 1.18 void (*callback)(AsyncOpNode *,
|
777 mday 1.32 MessageQueue *,
|
778 mday 1.30 void *),
779 MessageQueue *callback_response_q,
780 void *callback_ptr)
|
781 mday 1.20 {
|
782 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
783 mday 1.18
|
784 mday 1.21 // get the queue handle for the destination
785
|
786 mday 1.30 op->lock();
|
787 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
788 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
789 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
790 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
791 mday 1.30 // initialize the callback data
|
792 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
793 op->_callback_node = op; // the op node
794 op->_callback_response_q = callback_response_q; // the queue that will receive the response
795 op->_callback_ptr = callback_ptr; // user data for callback
796 op->_callback_request_q = this; // I am the originator of this request
|
797 mday 1.30
798 op->unlock();
799 if(op->_op_dest == 0)
800 return false;
|
801 mday 1.21
802 return _meta_dispatcher->route_async(op);
|
803 mday 1.18 }
804
805
|
806 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
807 Uint32 destination,
808 void (*callback)(Message *response,
809 void *handle,
810 void *parameter),
811 void *handle,
812 void *parameter)
813 {
814 if(msg == NULL)
815 return false;
816 if(callback == NULL)
817 return SendForget(msg);
818 AsyncOpNode *op = get_op();
|
819 mday 1.43 msg->dest = destination;
|
820 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
821 {
822 op->release();
823 return_op(op);
824 return false;
825 }
826 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
827 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
828 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
829 op->__async_callback = callback;
830 op->_callback_node = op;
831 op->_callback_handle = handle;
832 op->_callback_parameter = parameter;
833 op->_callback_response_q = this;
834
835
836 if( ! (msg->getMask() & message_mask::ha_async) )
837 {
838 AsyncLegacyOperationStart *wrapper =
839 new AsyncLegacyOperationStart(get_next_xid(),
840 op,
841 mday 1.39 destination,
842 msg,
843 destination);
844 }
845 else
846 {
847 op->_request.insert_first(msg);
848 (static_cast<AsyncMessage *>(msg))->op = op;
849 }
850
851 _callback.insert_last(op);
852 return _meta_dispatcher->route_async(op);
853 }
854
855
|
856 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
857 {
858
|
859 mday 1.24
|
860 mday 1.18 AsyncOpNode *op = 0;
|
861 mday 1.22 Uint32 mask = msg->getMask();
862
863 if (mask & message_mask::ha_async)
|
864 mday 1.18 {
865 op = (static_cast<AsyncMessage *>(msg))->op ;
866 }
|
867 mday 1.22
|
868 mday 1.18 if( op == 0 )
|
869 mday 1.20 {
|
870 mday 1.18 op = get_op();
|
871 mday 1.20 op->_request.insert_first(msg);
|
872 mday 1.22 if (mask & message_mask::ha_async)
873 (static_cast<AsyncMessage *>(msg))->op = op;
|
874 mday 1.20 }
|
875 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
876 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
877 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
878 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
879 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
880 mday 1.30 if ( op->_op_dest == 0 )
|
881 mday 1.39 {
882 op->release();
883 return_op(op);
|
884 mday 1.21 return false;
|
885 mday 1.39 }
|
886 mday 1.24
|
887 mday 1.18 // now see if the meta dispatcher will take it
|
888 mday 1.30 return _meta_dispatcher->route_async(op);
|
889 mday 1.18 }
|
890 mday 1.2
|
891 mday 1.1
|
892 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
893 mday 1.1 {
|
894 mday 1.4 if ( request == 0 )
895 return 0 ;
|
896 mday 1.5
897 Boolean destroy_op = false;
898
899 if (request->op == false)
900 {
901 request->op = get_op();
|
902 mday 1.7 request->op->_request.insert_first(request);
|
903 mday 1.5 destroy_op = true;
904 }
|
905 mday 1.4
|
906 mday 1.33 request->block = false;
|
907 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
908 mday 1.33 SendAsync(request->op,
909 request->dest,
|
910 mday 1.36 _sendwait_callback,
|
911 mday 1.33 this,
912 (void *)0);
|
913 mday 1.4
|
914 mday 1.33 request->op->_client_sem.wait();
|
915 mday 1.6 request->op->lock();
916 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
917 rpl->op = 0;
918 request->op->unlock();
919
|
920 mday 1.5 if( destroy_op == true)
921 {
|
922 mday 1.6 request->op->lock();
923 request->op->_request.remove(request);
924 request->op->_state |= ASYNC_OPSTATE_RELEASED;
925 request->op->unlock();
926 return_op(request->op);
|
927 mday 1.7 request->op = 0;
|
928 mday 1.5 }
929 return rpl;
|
930 mday 1.1 }
931
932
933 Boolean MessageQueueService::register_service(String name,
934 Uint32 capabilities,
935 Uint32 mask)
936
937 {
938 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
939 mday 1.5 0,
|
940 mday 1.1 true,
941 name,
942 capabilities,
943 mask,
944 _queueId);
|
945 mday 1.44 msg->dest = CIMOM_Q_ID;
946
|
947 mday 1.1 Boolean registered = false;
|
948 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
949 mday 1.1
|
950 mday 1.2 if ( reply != 0 )
|
951 mday 1.1 {
952 if(reply->getMask() & message_mask:: ha_async)
953 {
954 if(reply->getMask() & message_mask::ha_reply)
955 {
|
956 mday 1.15 if(reply->result == async_results::OK ||
957 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
958 mday 1.1 registered = true;
959 }
960 }
961
|
962 mday 1.7 delete reply;
|
963 mday 1.1 }
|
964 mday 1.5 delete msg;
|
965 mday 1.1 return registered;
966 }
967
968 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
969 {
970
971
972 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
973 mday 1.5 0,
|
974 mday 1.1 true,
975 _queueId,
976 _capabilities,
977 _mask);
978 Boolean registered = false;
|
979 mday 1.2
980 AsyncMessage *reply = SendWait(msg);
981 if (reply)
|
982 mday 1.1 {
983 if(reply->getMask() & message_mask:: ha_async)
984 {
985 if(reply->getMask() & message_mask::ha_reply)
986 {
|
987 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
988 mday 1.1 registered = true;
989 }
990 }
991 delete reply;
992 }
|
993 mday 1.5 delete msg;
|
994 mday 1.1 return registered;
995 }
996
997
998 Boolean MessageQueueService::deregister_service(void)
999 {
|
1000 mday 1.3
|
1001 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1002 return true;
|
1003 mday 1.1 }
1004
1005
1006 void MessageQueueService::find_services(String name,
1007 Uint32 capabilities,
1008 Uint32 mask,
1009 Array<Uint32> *results)
1010 {
1011
1012 if( results == 0 )
1013 throw NullPointer();
|
1014 mday 1.5
|
1015 mday 1.1 results->clear();
1016
1017 FindServiceQueue *req =
1018 new FindServiceQueue(get_next_xid(),
|
1019 mday 1.5 0,
|
1020 mday 1.1 _queueId,
1021 true,
1022 name,
1023 capabilities,
1024 mask);
|
1025 mday 1.44
1026 req->dest = CIMOM_Q_ID;
|
1027 mday 1.1
|
1028 mday 1.2 AsyncMessage *reply = SendWait(req);
1029 if(reply)
|
1030 mday 1.1 {
1031 if( reply->getMask() & message_mask::ha_async)
1032 {
1033 if(reply->getMask() & message_mask::ha_reply)
1034 {
1035 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1036 {
1037 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1038 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1039 }
1040 }
1041 }
1042 delete reply;
1043 }
|
1044 mday 1.5 delete req;
|
1045 mday 1.1 return ;
1046 }
1047
1048 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1049 {
1050 if(result == 0)
1051 throw NullPointer();
1052
1053 EnumerateService *req
1054 = new EnumerateService(get_next_xid(),
|
1055 mday 1.5 0,
|
1056 mday 1.1 _queueId,
1057 true,
1058 queue);
1059
|
1060 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1061 mday 1.1
|
1062 mday 1.2 if (reply)
|
1063 mday 1.1 {
1064 Boolean found = false;
1065
1066 if( reply->getMask() & message_mask::ha_async)
1067 {
1068 if(reply->getMask() & message_mask::ha_reply)
1069 {
1070 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1071 {
1072 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1073 {
1074 if( found == false)
1075 {
1076 found = true;
1077
1078 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1079 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1080 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1081 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1082 }
1083 }
1084 mday 1.1 }
1085 }
1086 }
1087 delete reply;
1088 }
|
1089 mday 1.5 delete req;
1090
|
1091 mday 1.1 return;
1092 }
1093
1094 Uint32 MessageQueueService::get_next_xid(void)
1095 {
1096 _xid++;
1097 return _xid.value();
1098 }
1099
1100 PEGASUS_NAMESPACE_END
|