1 mday 1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 kumpf 1.54 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mday 1.1 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 mday 1.53 //
|
13 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include "MessageQueueService.h"
|
31 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
32 mday 1.1
33 PEGASUS_NAMESPACE_BEGIN
34
|
35 mday 1.15
36 cimom *MessageQueueService::_meta_dispatcher = 0;
37 AtomicInt MessageQueueService::_service_count = 0;
38 AtomicInt MessageQueueService::_xid(1);
|
39 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
40 mday 1.15
|
41 mday 1.58 static struct timeval create_time = {0, 1};
|
42 mday 1.55 static struct timeval destroy_time = {5, 0};
|
43 mday 1.56 static struct timeval deadlock_time = {1000, 0};
|
44 mday 1.53
|
45 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
46 mday 1.53
47 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
48
49 int MessageQueueService::kill_idle_threads(void)
50 {
51 static struct timeval now, last;
52 gettimeofday(&now, NULL);
|
53 mday 1.56 int dead_threads = 0;
|
54 mday 1.53
55 if( now.tv_sec - last.tv_sec > 0 )
56 {
57 gettimeofday(&last, NULL);
|
58 mday 1.56 try
59 {
60 dead_threads = _thread_pool->kill_dead_threads();
61 }
62 catch(IPCException& )
63 {
64
65 }
|
66 mday 1.53 }
|
67 mday 1.56 return dead_threads;
|
68 mday 1.53 }
69
70 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
71 {
72 Thread *myself = reinterpret_cast<Thread *>(parm);
73
74 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
75 while ( _stop_polling.value() == 0 )
76 {
77 _polling_sem.wait();
78 list->lock();
79 MessageQueueService *service = list->next(0);
80 while(service != NULL)
81 {
82 if(service->_incoming.count() > 0 )
83 {
|
84 mday 1.55 _thread_pool->allocate_and_awaken(service, _req_proc);
|
85 mday 1.53 // service->_req_proc(service);
86 }
87 service = list->next(service);
88 }
89 list->unlock();
90 }
91 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
92 return(0);
93 }
94
95 Thread MessageQueueService::_polling_thread(polling_routine,
96 reinterpret_cast<void *>(&_polling_list),
97 false);
98
99
100 Semaphore MessageQueueService::_polling_sem(0);
101 AtomicInt MessageQueueService::_stop_polling(0);
102
|
103 mday 1.15
|
104 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
105 Uint32 queueID,
106 Uint32 capabilities,
107 Uint32 mask)
|
108 mday 1.15 : Base(name, true, queueID),
|
109 mday 1.22
|
110 mday 1.1 _mask(mask),
|
111 mday 1.4 _die(0),
|
112 mday 1.41 _incoming(true, 0),
|
113 mday 1.39 _callback(true),
|
114 mday 1.7 _incoming_queue_shutdown(0),
|
115 mday 1.39 _callback_ready(0),
116 _req_thread(_req_proc, this, false),
117 _callback_thread(_callback_proc, this, false)
|
118 mday 1.53
|
119 mday 1.1 {
|
120 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
121
|
122 mday 1.1 _default_op_timeout.tv_sec = 30;
123 _default_op_timeout.tv_usec = 100;
|
124 mday 1.15
125 _meta_dispatcher_mutex.lock(pegasus_thread_self());
126
127 if( _meta_dispatcher == 0 )
128 {
129 PEGASUS_ASSERT( _service_count.value() == 0 );
130 _meta_dispatcher = new cimom();
131 if (_meta_dispatcher == NULL )
132 {
133 _meta_dispatcher_mutex.unlock();
134 throw NullPointer();
135 }
|
136 mday 1.58 _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
|
137 mday 1.55 create_time, destroy_time, deadlock_time);
|
138 mday 1.53 _polling_thread.run();
|
139 mday 1.15 }
140 _service_count++;
141
142 if( false == register_service(name, _capabilities, _mask) )
143 {
144 _meta_dispatcher_mutex.unlock();
145 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
146 }
147
|
148 mday 1.53 _polling_list.insert_last(this);
149
|
150 mday 1.15 _meta_dispatcher_mutex.unlock();
|
151 mday 1.39 // _callback_thread.run();
152
|
153 mday 1.53 // _req_thread.run();
|
154 mday 1.1 }
155
|
156 mday 1.4
|
157 mday 1.1 MessageQueueService::~MessageQueueService(void)
158 {
159 _die = 1;
|
160 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
161 mday 1.16 {
|
162 mday 1.32 _shutdown_incoming_queue();
|
163 mday 1.16 }
|
164 mday 1.39 _callback_ready.signal();
|
165 mday 1.53 // _callback_thread.join();
|
166 mday 1.39
|
167 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
168 _service_count--;
169 if (_service_count.value() == 0 )
170 {
|
171 mday 1.53 _stop_polling++;
172 _polling_sem.signal();
173 _polling_thread.join();
|
174 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
175 delete _meta_dispatcher;
|
176 kumpf 1.45 _meta_dispatcher = 0;
|
177 mday 1.53
|
178 mday 1.15 }
179 _meta_dispatcher_mutex.unlock();
|
180 mday 1.53 _polling_list.remove(this);
181 }
|
182 mday 1.1
|
183 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
184 {
185
|
186 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
187 return ;
|
188 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
189 0,
190 _queueId,
191 _queueId,
192 true,
193 AsyncIoctl::IO_CLOSE,
194 0,
195 0);
|
196 mday 1.9
|
197 mday 1.8 msg->op = get_op();
|
198 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
199 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
200 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
201 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
202 mday 1.41
|
203 mday 1.32 msg->op->_op_dest = this;
|
204 mday 1.8 msg->op->_request.insert_first(msg);
205
206 _incoming.insert_last_wait(msg->op);
|
207 mday 1.32
|
208 mday 1.53 // _req_thread.join();
|
209 mday 1.16
|
210 mday 1.7 }
211
|
212 mday 1.22
213
214 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
215 {
|
216 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
217
|
218 mday 1.22 Base::enqueue(msg);
|
219 kumpf 1.28
220 PEG_METHOD_EXIT();
|
221 mday 1.22 }
222
223
|
224 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
225 {
226 Thread *myself = reinterpret_cast<Thread *>(parm);
227 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
228 AsyncOpNode *operation = 0;
229
230 while ( service->_die.value() == 0 )
231 {
232 service->_callback_ready.wait();
233
234 service->_callback.lock();
235 operation = service->_callback.next(0);
236 while( operation != NULL)
237 {
238 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
239 {
240 operation = service->_callback.remove_no_lock(operation);
241 PEGASUS_ASSERT(operation != NULL);
242 operation->_thread_ptr = myself;
243 operation->_service_ptr = service;
244 service->_handle_async_callback(operation);
245 mday 1.39 break;
246 }
247 operation = service->_callback.next(operation);
248 }
249 service->_callback.unlock();
250 }
251 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
252 return(0);
253 }
254
|
255 mday 1.22
|
256 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
257 mday 1.1 {
|
258 mday 1.53 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
259 mday 1.5 // pull messages off the incoming queue and dispatch them. then
260 // check pending messages that are non-blocking
|
261 mday 1.7 AsyncOpNode *operation = 0;
262
|
263 mday 1.56 if ( service->_die.value() == 0 )
264 {
|
265 mday 1.41 try
266 {
|
267 mday 1.53 operation = service->_incoming.remove_first();
|
268 mday 1.41 }
269 catch(ListClosed & )
270 {
|
271 mday 1.56 operation = 0;
272
273 return(0);
|
274 mday 1.41 }
275 if( operation )
276 {
277 operation->_service_ptr = service;
278 service->_handle_incoming_operation(operation);
279 }
|
280 mday 1.56 }
|
281 mday 1.44
|
282 mday 1.5 return(0);
|
283 mday 1.1 }
284
|
285 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
286 {
287 return _callback.count();
288 }
289
290
291
|
292 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
293 MessageQueue *q,
294 void *parm)
295 {
296 op->_client_sem.signal();
297 }
298
|
299 mday 1.30
300 // callback function is responsible for cleaning up all resources
301 // including op, op->_callback_node, and op->_callback_ptr
|
302 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
303 {
|
304 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
305 {
306
307 Message *msg = op->get_request();
308 if( msg && ( msg->getMask() & message_mask::ha_async))
309 {
310 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
311 {
312 AsyncLegacyOperationStart *wrapper =
313 static_cast<AsyncLegacyOperationStart *>(msg);
314 msg = wrapper->get_action();
315 delete wrapper;
316 }
317 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
318 {
319 AsyncModuleOperationStart *wrapper =
320 static_cast<AsyncModuleOperationStart *>(msg);
321 msg = wrapper->get_action();
322 delete wrapper;
323 }
324 else if (msg->getType() == async_messages::ASYNC_OP_START)
325 mday 1.39 {
326 AsyncOperationStart *wrapper =
327 static_cast<AsyncOperationStart *>(msg);
328 msg = wrapper->get_action();
329 delete wrapper;
330 }
331 delete msg;
332 }
333
334 msg = op->get_response();
335 if( msg && ( msg->getMask() & message_mask::ha_async))
336 {
337 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
338 {
339 AsyncLegacyOperationResult *wrapper =
340 static_cast<AsyncLegacyOperationResult *>(msg);
341 msg = wrapper->get_result();
342 delete wrapper;
343 }
344 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
345 {
346 mday 1.39 AsyncModuleOperationResult *wrapper =
347 static_cast<AsyncModuleOperationResult *>(msg);
348 msg = wrapper->get_result();
349 delete wrapper;
350 }
351 }
352 void (*callback)(Message *, void *, void *) = op->__async_callback;
353 void *handle = op->_callback_handle;
354 void *parm = op->_callback_parameter;
355 op->release();
356 return_op(op);
357 callback(msg, handle, parm);
358 }
359 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
360 {
361 // note that _callback_node may be different from op
362 // op->_callback_response_q is a "this" pointer we can use for
363 // static callback methods
364 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
365 }
|
366 mday 1.22 }
367
|
368 mday 1.1
|
369 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
370 // Thread *thread,
371 // MessageQueue *queue)
|
372 mday 1.1 {
|
373 mday 1.5 if ( operation != 0 )
374 {
|
375 mday 1.29
|
376 mday 1.22 // ATTN: optimization
377 // << Tue Feb 19 14:10:38 2002 mdd >>
|
378 mday 1.6 operation->lock();
|
379 mday 1.29
|
380 mday 1.6 Message *rq = operation->_request.next(0);
|
381 mday 1.29
|
382 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
383 // move this to the bottom of the loop when the majority of
384 // messages become async messages.
385
|
386 mday 1.18 // divert legacy messages to handleEnqueue
|
387 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
388 mday 1.18 {
389 rq = operation->_request.remove_first() ;
390 operation->unlock();
391 // delete the op node
|
392 mday 1.39 operation->release();
393 return_op( operation);
|
394 mday 1.24
|
395 mday 1.26 handleEnqueue(rq);
|
396 mday 1.18 return;
397 }
398
|
399 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
400 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
401 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
402 {
|
403 mday 1.49
|
404 mday 1.29 operation->unlock();
405 _handle_async_callback(operation);
406 }
407 else
408 {
409 PEGASUS_ASSERT(rq != 0 );
410 // ATTN: optimization
411 // << Wed Mar 6 15:00:39 2002 mdd >>
412 // put thread and queue into the asyncopnode structure.
|
413 mday 1.34 // (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
414 // (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
415 // done << Tue Mar 12 14:49:07 2002 mdd >>
|
416 mday 1.29 operation->unlock();
417 _handle_async_request(static_cast<AsyncRequest *>(rq));
418 }
|
419 mday 1.5 }
420 return;
|
421 mday 1.1 }
422
423 void MessageQueueService::_handle_async_request(AsyncRequest *req)
424 {
|
425 mday 1.4 if ( req != 0 )
426 {
427 req->op->processing();
|
428 mday 1.1
|
429 mday 1.4 Uint32 type = req->getType();
430 if( type == async_messages::HEARTBEAT )
431 handle_heartbeat_request(req);
432 else if (type == async_messages::IOCTL)
433 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
434 else if (type == async_messages::CIMSERVICE_START)
435 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
436 else if (type == async_messages::CIMSERVICE_STOP)
437 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
438 else if (type == async_messages::CIMSERVICE_PAUSE)
439 handle_CimServicePause(static_cast<CimServicePause *>(req));
440 else if (type == async_messages::CIMSERVICE_RESUME)
441 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
442 else if ( type == async_messages::ASYNC_OP_START)
443 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
444 else
445 {
446 // we don't handle this request message
447 _make_response(req, async_results::CIM_NAK );
448 }
|
449 mday 1.1 }
450 }
451
|
452 mday 1.17
453 Boolean MessageQueueService::_enqueueResponse(
454 Message* request,
455 Message* response)
|
456 mday 1.18
|
457 mday 1.17 {
|
458 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
459 "MessageQueueService::_enqueueResponse");
|
460 mday 1.25
461 if( request->getMask() & message_mask::ha_async)
462 {
463 if (response->getMask() & message_mask::ha_async )
464 {
465 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
466 static_cast<AsyncReply *>(response),
467 ASYNC_OPSTATE_COMPLETE, 0 );
|
468 kumpf 1.37 PEG_METHOD_EXIT();
|
469 mday 1.25 return true;
470 }
471 }
472
|
473 mday 1.17 if(request->_async != 0 )
474 {
475 Uint32 mask = request->_async->getMask();
|
476 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
477
478 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
479 AsyncOpNode *op = async->op;
480 request->_async = 0;
|
481 mday 1.27 // this request is probably going to be deleted !!
482 // remove it from the op node
483 op->_request.remove(request);
|
484 mday 1.57
|
485 mday 1.18
486 AsyncLegacyOperationResult *async_result =
487 new AsyncLegacyOperationResult(
488 async->getKey(),
489 async->getRouting(),
490 op,
491 response);
492 _completeAsyncResponse(async,
493 async_result,
494 ASYNC_OPSTATE_COMPLETE,
495 0);
|
496 kumpf 1.37 PEG_METHOD_EXIT();
|
497 mday 1.18 return true;
|
498 mday 1.17 }
|
499 mday 1.18
500 // ensure that the destination queue is in response->dest
|
501 kumpf 1.37 PEG_METHOD_EXIT();
|
502 mday 1.24 return SendForget(response);
|
503 mday 1.18
|
504 mday 1.17 }
505
|
506 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
507 mday 1.1 {
|
508 mday 1.19 cimom::_make_response(req, code);
|
509 mday 1.1 }
510
511
|
512 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
513 AsyncReply *reply,
514 Uint32 state,
515 Uint32 flag)
516 {
|
517 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
518 "MessageQueueService::_completeAsyncResponse");
519
|
520 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
521 kumpf 1.37
522 PEG_METHOD_EXIT();
|
523 mday 1.5 }
524
525
|
526 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
527 Uint32 state,
528 Uint32 flag,
529 Uint32 code)
530 {
531 cimom::_complete_op_node(op, state, flag, code);
532 }
533
|
534 mday 1.5
535 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
536 {
|
537 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
538 return false;
539
|
540 mday 1.20 // ATTN optimization remove the message checking altogether in the base
541 // << Mon Feb 18 14:02:20 2002 mdd >>
|
542 mday 1.6 op->lock();
543 Message *rq = op->_request.next(0);
|
544 mday 1.20 Message *rp = op->_response.next(0);
|
545 mday 1.6 op->unlock();
546
|
547 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
548 _die.value() == 0 )
|
549 mday 1.5 {
|
550 mday 1.6 _incoming.insert_last_wait(op);
|
551 mday 1.53 _polling_sem.signal();
|
552 mday 1.5 return true;
553 }
|
554 mday 1.32 // else
555 // {
556 // if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
557 // (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
558 // _die.value() == 0)
559 // {
560 // MessageQueueService::_incoming.insert_last_wait(op);
561 // return true;
562 // }
563 // }
564
|
565 mday 1.5 return false;
566 }
567
568 Boolean MessageQueueService::messageOK(const Message *msg)
569 {
|
570 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
571 return false;
|
572 mday 1.18 return true;
573 }
574
|
575 mday 1.29
576 // made pure virtual
577 // << Wed Mar 6 15:11:31 2002 mdd >>
|
578 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
579 // {
580 // if ( msg )
581 // delete msg;
582 // }
|
583 mday 1.5
|
584 mday 1.29 // made pure virtual
585 // << Wed Mar 6 15:11:56 2002 mdd >>
|
586 mday 1.25 // void MessageQueueService::handleEnqueue(void)
587 // {
588 // Message *msg = dequeue();
589 // handleEnqueue(msg);
590 // }
|
591 mday 1.5
|
592 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
593 {
594 // default action is to echo a heartbeat response
595
596 AsyncReply *reply =
597 new AsyncReply(async_messages::HEARTBEAT,
598 req->getKey(),
599 req->getRouting(),
600 0,
601 req->op,
602 async_results::OK,
603 req->resp,
604 false);
|
605 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
606 mday 1.1 }
607
608
609 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
610 {
611 ;
612 }
613
614 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
615 {
|
616 mday 1.8
617 switch( req->ctl )
618 {
619 case AsyncIoctl::IO_CLOSE:
620 {
621 // save my bearings
|
622 mday 1.53 // Thread *myself = req->op->_thread_ptr;
|
623 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
624 mday 1.8
625 // respond to this message.
626 _make_response(req, async_results::OK);
627 // ensure we do not accept any further messages
628
629 // ensure we don't recurse on IO_CLOSE
630 if( _incoming_queue_shutdown.value() > 0 )
631 break;
632
633 // set the closing flag
634 service->_incoming_queue_shutdown = 1;
635 // empty out the queue
636 while( 1 )
637 {
638 AsyncOpNode *operation;
639 try
640 {
641 operation = service->_incoming.remove_first();
642 }
643 catch(IPCException & )
644 {
645 mday 1.8 break;
646 }
647 if( operation )
648 {
|
649 mday 1.53 // operation->_thread_ptr = myself;
|
650 mday 1.34 operation->_service_ptr = service;
651 service->_handle_incoming_operation(operation);
|
652 mday 1.8 }
653 else
654 break;
655 } // message processing loop
656
657 // shutdown the AsyncDQueue
658 service->_incoming.shutdown_queue();
659 // exit the thread !
|
660 mday 1.53 // myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
|
661 mday 1.8 return;
662 }
663
664 default:
665 _make_response(req, async_results::CIM_NAK);
666 }
|
667 mday 1.1 }
|
668 mday 1.8
|
669 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
670 {
|
671 mday 1.10 // clear the stoped bit and update
|
672 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
673 mday 1.10 _make_response(req, async_results::OK);
674 // now tell the meta dispatcher we are stopped
675 update_service(_capabilities, _mask);
676
|
677 mday 1.1 }
678 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
679 {
|
680 mday 1.10 // set the stopeed bit and update
681 _capabilities |= module_capabilities::stopped;
682 _make_response(req, async_results::CIM_STOPPED);
683 // now tell the meta dispatcher we are stopped
684 update_service(_capabilities, _mask);
685
|
686 mday 1.1 }
687 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
688 {
|
689 mday 1.10 // set the paused bit and update
|
690 mday 1.13 _capabilities |= module_capabilities::paused;
|
691 mday 1.11 update_service(_capabilities, _mask);
|
692 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
693 // now tell the meta dispatcher we are stopped
|
694 mday 1.1 }
695 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
696 {
|
697 mday 1.10 // clear the paused bit and update
|
698 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
699 mday 1.11 update_service(_capabilities, _mask);
|
700 mday 1.10 _make_response(req, async_results::OK);
701 // now tell the meta dispatcher we are stopped
|
702 mday 1.1 }
703
704 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
705 {
706 _make_response(req, async_results::CIM_NAK);
707 }
708
709 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
710 {
|
711 mday 1.14 ;
712 }
713
|
714 mday 1.10
|
715 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
716 {
717 // remove the legacy message from the request and enqueue it to its destination
718 Uint32 result = async_results::CIM_NAK;
719
|
720 mday 1.25 Message *legacy = req->_act;
|
721 mday 1.14 if ( legacy != 0 )
722 {
|
723 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
724 mday 1.14 if( queue != 0 )
725 {
|
726 mday 1.25 if(queue->isAsync() == true )
727 {
728 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
729 }
730 else
731 {
732 // Enqueue the response:
733 queue->enqueue(req->get_action());
734 }
735
|
736 mday 1.14 result = async_results::OK;
737 }
738 }
739 _make_response(req, result);
740 }
741
742 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
743 {
744 ;
|
745 mday 1.1 }
746
747 AsyncOpNode *MessageQueueService::get_op(void)
748 {
|
749 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
750 mday 1.1
|
751 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
752 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
753 mday 1.4
|
754 mday 1.1 return op;
755 }
756
757 void MessageQueueService::return_op(AsyncOpNode *op)
758 {
759 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
760 mday 1.4 delete op;
|
761 mday 1.1 }
762
|
763 mday 1.18
|
764 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
765 Uint32 destination)
766 {
767 PEGASUS_ASSERT(op != 0 );
|
768 mday 1.30 op->lock();
769 op->_op_dest = MessageQueue::lookup(destination);
|
770 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
771 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
772 mday 1.30 op->unlock();
773 if ( op->_op_dest == 0 )
774 return false;
775
|
776 mday 1.29 return _meta_dispatcher->route_async(op);
777 }
778
|
779 mday 1.39
|
780 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
781 Uint32 destination,
|
782 mday 1.18 void (*callback)(AsyncOpNode *,
|
783 mday 1.32 MessageQueue *,
|
784 mday 1.30 void *),
785 MessageQueue *callback_response_q,
786 void *callback_ptr)
|
787 mday 1.20 {
|
788 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
789 mday 1.18
|
790 mday 1.21 // get the queue handle for the destination
791
|
792 mday 1.30 op->lock();
|
793 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
794 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
795 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
796 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
797 mday 1.30 // initialize the callback data
|
798 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
799 op->_callback_node = op; // the op node
800 op->_callback_response_q = callback_response_q; // the queue that will receive the response
801 op->_callback_ptr = callback_ptr; // user data for callback
802 op->_callback_request_q = this; // I am the originator of this request
|
803 mday 1.30
804 op->unlock();
805 if(op->_op_dest == 0)
806 return false;
|
807 mday 1.21
808 return _meta_dispatcher->route_async(op);
|
809 mday 1.18 }
810
811
|
812 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
813 Uint32 destination,
814 void (*callback)(Message *response,
815 void *handle,
816 void *parameter),
817 void *handle,
818 void *parameter)
819 {
820 if(msg == NULL)
821 return false;
822 if(callback == NULL)
823 return SendForget(msg);
824 AsyncOpNode *op = get_op();
|
825 mday 1.43 msg->dest = destination;
|
826 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
827 {
828 op->release();
829 return_op(op);
830 return false;
831 }
832 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
833 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
834 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
835 op->__async_callback = callback;
836 op->_callback_node = op;
837 op->_callback_handle = handle;
838 op->_callback_parameter = parameter;
839 op->_callback_response_q = this;
840
841
842 if( ! (msg->getMask() & message_mask::ha_async) )
843 {
844 AsyncLegacyOperationStart *wrapper =
845 new AsyncLegacyOperationStart(get_next_xid(),
846 op,
847 mday 1.39 destination,
848 msg,
849 destination);
850 }
851 else
852 {
853 op->_request.insert_first(msg);
854 (static_cast<AsyncMessage *>(msg))->op = op;
855 }
856
857 _callback.insert_last(op);
858 return _meta_dispatcher->route_async(op);
859 }
860
861
|
862 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
863 {
864
|
865 mday 1.24
|
866 mday 1.18 AsyncOpNode *op = 0;
|
867 mday 1.22 Uint32 mask = msg->getMask();
868
869 if (mask & message_mask::ha_async)
|
870 mday 1.18 {
871 op = (static_cast<AsyncMessage *>(msg))->op ;
872 }
|
873 mday 1.22
|
874 mday 1.18 if( op == 0 )
|
875 mday 1.20 {
|
876 mday 1.18 op = get_op();
|
877 mday 1.20 op->_request.insert_first(msg);
|
878 mday 1.22 if (mask & message_mask::ha_async)
879 (static_cast<AsyncMessage *>(msg))->op = op;
|
880 mday 1.20 }
|
881 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
882 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
883 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
884 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
885 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
886 mday 1.30 if ( op->_op_dest == 0 )
|
887 mday 1.39 {
888 op->release();
889 return_op(op);
|
890 mday 1.21 return false;
|
891 mday 1.39 }
|
892 mday 1.24
|
893 mday 1.18 // now see if the meta dispatcher will take it
|
894 mday 1.30 return _meta_dispatcher->route_async(op);
|
895 mday 1.18 }
|
896 mday 1.2
|
897 mday 1.1
|
898 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
899 mday 1.1 {
|
900 mday 1.4 if ( request == 0 )
901 return 0 ;
|
902 mday 1.5
903 Boolean destroy_op = false;
904
905 if (request->op == false)
906 {
907 request->op = get_op();
|
908 mday 1.7 request->op->_request.insert_first(request);
|
909 mday 1.5 destroy_op = true;
910 }
|
911 mday 1.4
|
912 mday 1.33 request->block = false;
|
913 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
914 mday 1.33 SendAsync(request->op,
915 request->dest,
|
916 mday 1.36 _sendwait_callback,
|
917 mday 1.33 this,
918 (void *)0);
|
919 mday 1.4
|
920 mday 1.33 request->op->_client_sem.wait();
|
921 mday 1.6 request->op->lock();
922 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
923 rpl->op = 0;
924 request->op->unlock();
925
|
926 mday 1.5 if( destroy_op == true)
927 {
|
928 mday 1.6 request->op->lock();
929 request->op->_request.remove(request);
930 request->op->_state |= ASYNC_OPSTATE_RELEASED;
931 request->op->unlock();
932 return_op(request->op);
|
933 mday 1.7 request->op = 0;
|
934 mday 1.5 }
935 return rpl;
|
936 mday 1.1 }
937
938
939 Boolean MessageQueueService::register_service(String name,
940 Uint32 capabilities,
941 Uint32 mask)
942
943 {
944 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
945 mday 1.5 0,
|
946 mday 1.1 true,
947 name,
948 capabilities,
949 mask,
950 _queueId);
|
951 mday 1.44 msg->dest = CIMOM_Q_ID;
952
|
953 mday 1.1 Boolean registered = false;
|
954 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
955 mday 1.1
|
956 mday 1.2 if ( reply != 0 )
|
957 mday 1.1 {
958 if(reply->getMask() & message_mask:: ha_async)
959 {
960 if(reply->getMask() & message_mask::ha_reply)
961 {
|
962 mday 1.15 if(reply->result == async_results::OK ||
963 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
964 mday 1.1 registered = true;
965 }
966 }
967
|
968 mday 1.7 delete reply;
|
969 mday 1.1 }
|
970 mday 1.5 delete msg;
|
971 mday 1.1 return registered;
972 }
973
974 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
975 {
976
977
978 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
979 mday 1.5 0,
|
980 mday 1.1 true,
981 _queueId,
982 _capabilities,
983 _mask);
984 Boolean registered = false;
|
985 mday 1.2
986 AsyncMessage *reply = SendWait(msg);
987 if (reply)
|
988 mday 1.1 {
989 if(reply->getMask() & message_mask:: ha_async)
990 {
991 if(reply->getMask() & message_mask::ha_reply)
992 {
|
993 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
994 mday 1.1 registered = true;
995 }
996 }
997 delete reply;
998 }
|
999 mday 1.5 delete msg;
|
1000 mday 1.1 return registered;
1001 }
1002
1003
1004 Boolean MessageQueueService::deregister_service(void)
1005 {
|
1006 mday 1.3
|
1007 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1008 return true;
|
1009 mday 1.1 }
1010
1011
1012 void MessageQueueService::find_services(String name,
1013 Uint32 capabilities,
1014 Uint32 mask,
1015 Array<Uint32> *results)
1016 {
1017
1018 if( results == 0 )
1019 throw NullPointer();
|
1020 mday 1.5
|
1021 mday 1.1 results->clear();
1022
1023 FindServiceQueue *req =
1024 new FindServiceQueue(get_next_xid(),
|
1025 mday 1.5 0,
|
1026 mday 1.1 _queueId,
1027 true,
1028 name,
1029 capabilities,
1030 mask);
|
1031 mday 1.44
1032 req->dest = CIMOM_Q_ID;
|
1033 mday 1.1
|
1034 mday 1.2 AsyncMessage *reply = SendWait(req);
1035 if(reply)
|
1036 mday 1.1 {
1037 if( reply->getMask() & message_mask::ha_async)
1038 {
1039 if(reply->getMask() & message_mask::ha_reply)
1040 {
1041 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1042 {
1043 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1044 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1045 }
1046 }
1047 }
1048 delete reply;
1049 }
|
1050 mday 1.5 delete req;
|
1051 mday 1.1 return ;
1052 }
1053
1054 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1055 {
1056 if(result == 0)
1057 throw NullPointer();
1058
1059 EnumerateService *req
1060 = new EnumerateService(get_next_xid(),
|
1061 mday 1.5 0,
|
1062 mday 1.1 _queueId,
1063 true,
1064 queue);
1065
|
1066 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1067 mday 1.1
|
1068 mday 1.2 if (reply)
|
1069 mday 1.1 {
1070 Boolean found = false;
1071
1072 if( reply->getMask() & message_mask::ha_async)
1073 {
1074 if(reply->getMask() & message_mask::ha_reply)
1075 {
1076 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1077 {
1078 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1079 {
1080 if( found == false)
1081 {
1082 found = true;
1083
1084 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1085 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1086 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1087 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1088 }
1089 }
1090 mday 1.1 }
1091 }
1092 }
1093 delete reply;
1094 }
|
1095 mday 1.5 delete req;
1096
|
1097 mday 1.1 return;
1098 }
1099
1100 Uint32 MessageQueueService::get_next_xid(void)
1101 {
1102 _xid++;
1103 return _xid.value();
1104 }
1105
1106 PEGASUS_NAMESPACE_END
|