1 mday 1.53 //%////-*-c++-*-////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 kumpf 1.54 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Company, IBM,
4 // The Open Group, Tivoli Systems
|
5 mday 1.1 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
|
12 mday 1.53 //
|
13 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
14 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
15 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
16 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
17 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
18 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
19 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
21 //
22 //==============================================================================
23 //
24 // Author: Mike Day (mdday@us.ibm.com)
25 //
26 // Modified By:
27 //
28 //%/////////////////////////////////////////////////////////////////////////////
29
30 #include "MessageQueueService.h"
|
31 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
32 mday 1.1
33 PEGASUS_NAMESPACE_BEGIN
34
|
35 mday 1.15
36 cimom *MessageQueueService::_meta_dispatcher = 0;
37 AtomicInt MessageQueueService::_service_count = 0;
38 AtomicInt MessageQueueService::_xid(1);
|
39 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
40 mday 1.15
|
41 mday 1.58 static struct timeval create_time = {0, 1};
|
42 mday 1.64.2.2 static struct timeval destroy_time = {15, 0};
|
43 mday 1.59 static struct timeval deadlock_time = {0, 0};
|
44 mday 1.53
|
45 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
46 mday 1.53
47 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
48
|
49 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
50 mday 1.61
51
|
52 mday 1.64.2.2 int MessageQueueService::kill_idle_threads(void)
|
53 mday 1.53 {
|
54 mday 1.64.2.2 static struct timeval now, last;
|
55 mday 1.53 gettimeofday(&now, NULL);
|
56 mday 1.56 int dead_threads = 0;
|
57 mday 1.53
|
58 mday 1.64.2.2 if( now.tv_sec - last.tv_sec > 0 )
|
59 mday 1.53 {
60 gettimeofday(&last, NULL);
|
61 mday 1.56 try
62 {
|
63 mday 1.64.2.2 dead_threads = _thread_pool->kill_dead_threads();
|
64 mday 1.56 }
|
65 mday 1.64.2.2 catch(IPCException& )
|
66 mday 1.56 {
67
68 }
|
69 mday 1.53 }
|
70 mday 1.64.2.2 return dead_threads;
|
71 mday 1.53 }
72
73 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
74 {
75 Thread *myself = reinterpret_cast<Thread *>(parm);
76 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
77 while ( _stop_polling.value() == 0 )
78 {
79 _polling_sem.wait();
|
80 mday 1.61 if(_stop_polling.value() != 0 )
81 {
|
82 kumpf 1.62 break;
|
83 mday 1.61 }
|
84 mday 1.64.2.2
|
85 mday 1.53 list->lock();
86 MessageQueueService *service = list->next(0);
87 while(service != NULL)
88 {
89 if(service->_incoming.count() > 0 )
90 {
|
91 mday 1.55 _thread_pool->allocate_and_awaken(service, _req_proc);
|
92 mday 1.64.2.2 // service->_req_proc(service);
|
93 mday 1.53 }
94 service = list->next(service);
95 }
96 list->unlock();
97 }
98 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
99 return(0);
100 }
101
102
103 Semaphore MessageQueueService::_polling_sem(0);
104 AtomicInt MessageQueueService::_stop_polling(0);
105
|
106 mday 1.15
|
107 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
108 Uint32 queueID,
109 Uint32 capabilities,
110 Uint32 mask)
|
111 mday 1.15 : Base(name, true, queueID),
|
112 mday 1.22
|
113 mday 1.1 _mask(mask),
|
114 mday 1.4 _die(0),
|
115 mday 1.41 _incoming(true, 0),
|
116 mday 1.39 _callback(true),
|
117 mday 1.7 _incoming_queue_shutdown(0),
|
118 mday 1.39 _callback_ready(0),
119 _req_thread(_req_proc, this, false),
120 _callback_thread(_callback_proc, this, false)
|
121 mday 1.64.2.2
|
122 mday 1.1 {
|
123 mday 1.60
|
124 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
125
|
126 mday 1.1 _default_op_timeout.tv_sec = 30;
127 _default_op_timeout.tv_usec = 100;
|
128 mday 1.15
129 _meta_dispatcher_mutex.lock(pegasus_thread_self());
130
131 if( _meta_dispatcher == 0 )
132 {
133 PEGASUS_ASSERT( _service_count.value() == 0 );
134 _meta_dispatcher = new cimom();
135 if (_meta_dispatcher == NULL )
136 {
137 _meta_dispatcher_mutex.unlock();
138 throw NullPointer();
139 }
|
140 mday 1.58 _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
|
141 mday 1.55 create_time, destroy_time, deadlock_time);
|
142 mday 1.61
|
143 kumpf 1.62 _polling_thread = new Thread(polling_routine,
144 reinterpret_cast<void *>(&_polling_list),
145 false);
146 _polling_thread->run();
|
147 mday 1.15 }
148 _service_count++;
149
150 if( false == register_service(name, _capabilities, _mask) )
151 {
152 _meta_dispatcher_mutex.unlock();
|
153 mday 1.64.2.2 throw BindFailed("MessageQueueService Base Unable to register with Meta Dispatcher");
|
154 mday 1.15 }
155
|
156 mday 1.53 _polling_list.insert_last(this);
157
|
158 mday 1.15 _meta_dispatcher_mutex.unlock();
|
159 mday 1.39 // _callback_thread.run();
160
|
161 mday 1.53 // _req_thread.run();
|
162 mday 1.1 }
163
|
164 mday 1.4
|
165 mday 1.1 MessageQueueService::~MessageQueueService(void)
166 {
167 _die = 1;
|
168 mday 1.7 if (_incoming_queue_shutdown.value() == 0 )
|
169 mday 1.16 {
|
170 mday 1.32 _shutdown_incoming_queue();
|
171 mday 1.16 }
|
172 mday 1.39 _callback_ready.signal();
|
173 mday 1.53 // _callback_thread.join();
|
174 mday 1.39
|
175 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
176 _service_count--;
177 if (_service_count.value() == 0 )
178 {
|
179 mday 1.53 _stop_polling++;
180 _polling_sem.signal();
|
181 kumpf 1.62 _polling_thread->join();
182 delete _polling_thread;
|
183 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
184 delete _meta_dispatcher;
|
185 kumpf 1.45 _meta_dispatcher = 0;
|
186 mday 1.64.2.2
|
187 mday 1.15 }
188 _meta_dispatcher_mutex.unlock();
|
189 mday 1.53 _polling_list.remove(this);
190 }
|
191 mday 1.1
|
192 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
193 {
194
|
195 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
196 return ;
|
197 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
198 0,
199 _queueId,
200 _queueId,
201 true,
202 AsyncIoctl::IO_CLOSE,
203 0,
204 0);
|
205 mday 1.9
|
206 mday 1.8 msg->op = get_op();
|
207 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
208 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
209 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
210 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
211 mday 1.41
|
212 mday 1.32 msg->op->_op_dest = this;
|
213 mday 1.8 msg->op->_request.insert_first(msg);
214
215 _incoming.insert_last_wait(msg->op);
|
216 mday 1.32
|
217 mday 1.53 // _req_thread.join();
|
218 mday 1.16
|
219 mday 1.7 }
220
|
221 mday 1.22
222
223 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
224 {
|
225 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
226
|
227 mday 1.22 Base::enqueue(msg);
|
228 kumpf 1.28
229 PEG_METHOD_EXIT();
|
230 mday 1.22 }
231
232
|
233 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
234 {
235 Thread *myself = reinterpret_cast<Thread *>(parm);
236 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
237 AsyncOpNode *operation = 0;
238
239 while ( service->_die.value() == 0 )
240 {
241 service->_callback_ready.wait();
242
243 service->_callback.lock();
244 operation = service->_callback.next(0);
245 while( operation != NULL)
246 {
247 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
248 {
249 operation = service->_callback.remove_no_lock(operation);
250 PEGASUS_ASSERT(operation != NULL);
251 operation->_thread_ptr = myself;
252 operation->_service_ptr = service;
253 service->_handle_async_callback(operation);
254 mday 1.39 break;
255 }
256 operation = service->_callback.next(operation);
257 }
258 service->_callback.unlock();
259 }
260 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
261 return(0);
262 }
263
|
264 mday 1.22
|
265 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
266 mday 1.1 {
|
267 mday 1.53 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
268 mday 1.5 // pull messages off the incoming queue and dispatch them. then
269 // check pending messages that are non-blocking
|
270 mday 1.7 AsyncOpNode *operation = 0;
271
|
272 mday 1.56 if ( service->_die.value() == 0 )
273 {
|
274 mday 1.41 try
275 {
|
276 mday 1.53 operation = service->_incoming.remove_first();
|
277 mday 1.41 }
278 catch(ListClosed & )
279 {
|
280 mday 1.56 operation = 0;
281
282 return(0);
|
283 mday 1.41 }
284 if( operation )
285 {
286 operation->_service_ptr = service;
287 service->_handle_incoming_operation(operation);
288 }
|
289 mday 1.56 }
|
290 mday 1.44
|
291 mday 1.5 return(0);
|
292 mday 1.1 }
293
|
294 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
295 {
296 return _callback.count();
297 }
298
299
300
|
301 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
302 MessageQueue *q,
303 void *parm)
304 {
305 op->_client_sem.signal();
306 }
307
|
308 mday 1.30
309 // callback function is responsible for cleaning up all resources
310 // including op, op->_callback_node, and op->_callback_ptr
|
311 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
312 {
|
313 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
314 {
315
316 Message *msg = op->get_request();
317 if( msg && ( msg->getMask() & message_mask::ha_async))
318 {
319 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
320 {
321 AsyncLegacyOperationStart *wrapper =
322 static_cast<AsyncLegacyOperationStart *>(msg);
323 msg = wrapper->get_action();
324 delete wrapper;
325 }
326 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
327 {
328 AsyncModuleOperationStart *wrapper =
329 static_cast<AsyncModuleOperationStart *>(msg);
330 msg = wrapper->get_action();
331 delete wrapper;
332 }
333 else if (msg->getType() == async_messages::ASYNC_OP_START)
334 mday 1.39 {
335 AsyncOperationStart *wrapper =
336 static_cast<AsyncOperationStart *>(msg);
337 msg = wrapper->get_action();
338 delete wrapper;
339 }
340 delete msg;
341 }
342
343 msg = op->get_response();
344 if( msg && ( msg->getMask() & message_mask::ha_async))
345 {
346 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
347 {
348 AsyncLegacyOperationResult *wrapper =
349 static_cast<AsyncLegacyOperationResult *>(msg);
350 msg = wrapper->get_result();
351 delete wrapper;
352 }
353 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
354 {
355 mday 1.39 AsyncModuleOperationResult *wrapper =
356 static_cast<AsyncModuleOperationResult *>(msg);
357 msg = wrapper->get_result();
358 delete wrapper;
359 }
360 }
361 void (*callback)(Message *, void *, void *) = op->__async_callback;
362 void *handle = op->_callback_handle;
363 void *parm = op->_callback_parameter;
364 op->release();
365 return_op(op);
366 callback(msg, handle, parm);
367 }
368 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
369 {
370 // note that _callback_node may be different from op
371 // op->_callback_response_q is a "this" pointer we can use for
372 // static callback methods
373 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
374 }
|
375 mday 1.22 }
376
|
377 mday 1.1
|
378 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
379 // Thread *thread,
380 // MessageQueue *queue)
|
381 mday 1.1 {
|
382 mday 1.5 if ( operation != 0 )
383 {
|
384 mday 1.29
|
385 mday 1.22 // ATTN: optimization
386 // << Tue Feb 19 14:10:38 2002 mdd >>
|
387 mday 1.6 operation->lock();
|
388 mday 1.29
|
389 mday 1.6 Message *rq = operation->_request.next(0);
|
390 mday 1.29
|
391 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
392 // move this to the bottom of the loop when the majority of
393 // messages become async messages.
394
|
395 mday 1.18 // divert legacy messages to handleEnqueue
|
396 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
397 mday 1.18 {
398 rq = operation->_request.remove_first() ;
399 operation->unlock();
400 // delete the op node
|
401 mday 1.39 operation->release();
402 return_op( operation);
|
403 mday 1.24
|
404 mday 1.26 handleEnqueue(rq);
|
405 mday 1.18 return;
406 }
407
|
408 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
409 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
410 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
411 {
|
412 mday 1.49
|
413 mday 1.29 operation->unlock();
414 _handle_async_callback(operation);
415 }
416 else
417 {
418 PEGASUS_ASSERT(rq != 0 );
419 // ATTN: optimization
420 // << Wed Mar 6 15:00:39 2002 mdd >>
421 // put thread and queue into the asyncopnode structure.
|
422 mday 1.34 // (static_cast<AsyncMessage *>(rq))->_myself = operation->_thread_ptr;
423 // (static_cast<AsyncMessage *>(rq))->_service = operation->_service_ptr;
424 // done << Tue Mar 12 14:49:07 2002 mdd >>
|
425 mday 1.29 operation->unlock();
426 _handle_async_request(static_cast<AsyncRequest *>(rq));
427 }
|
428 mday 1.5 }
429 return;
|
430 mday 1.1 }
431
432 void MessageQueueService::_handle_async_request(AsyncRequest *req)
433 {
|
434 mday 1.4 if ( req != 0 )
435 {
436 req->op->processing();
|
437 mday 1.1
|
438 mday 1.4 Uint32 type = req->getType();
439 if( type == async_messages::HEARTBEAT )
440 handle_heartbeat_request(req);
441 else if (type == async_messages::IOCTL)
442 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
443 else if (type == async_messages::CIMSERVICE_START)
444 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
445 else if (type == async_messages::CIMSERVICE_STOP)
446 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
447 else if (type == async_messages::CIMSERVICE_PAUSE)
448 handle_CimServicePause(static_cast<CimServicePause *>(req));
449 else if (type == async_messages::CIMSERVICE_RESUME)
450 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
451 else if ( type == async_messages::ASYNC_OP_START)
452 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
453 else
454 {
455 // we don't handle this request message
456 _make_response(req, async_results::CIM_NAK );
457 }
|
458 mday 1.1 }
459 }
460
|
461 mday 1.17
462 Boolean MessageQueueService::_enqueueResponse(
463 Message* request,
464 Message* response)
|
465 mday 1.18
|
466 mday 1.17 {
|
467 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
468 "MessageQueueService::_enqueueResponse");
|
469 mday 1.25
470 if( request->getMask() & message_mask::ha_async)
471 {
472 if (response->getMask() & message_mask::ha_async )
473 {
474 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
475 static_cast<AsyncReply *>(response),
476 ASYNC_OPSTATE_COMPLETE, 0 );
|
477 kumpf 1.37 PEG_METHOD_EXIT();
|
478 mday 1.25 return true;
479 }
480 }
481
|
482 mday 1.17 if(request->_async != 0 )
483 {
484 Uint32 mask = request->_async->getMask();
|
485 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
486
487 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
488 AsyncOpNode *op = async->op;
489 request->_async = 0;
|
490 mday 1.63 // the legacy request is going to be deleted by its handler
|
491 mday 1.27 // remove it from the op node
|
492 mday 1.63
493 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
494 mday 1.18
|
495 mday 1.63
|
496 mday 1.18 AsyncLegacyOperationResult *async_result =
497 new AsyncLegacyOperationResult(
498 async->getKey(),
499 async->getRouting(),
500 op,
501 response);
502 _completeAsyncResponse(async,
503 async_result,
504 ASYNC_OPSTATE_COMPLETE,
505 0);
|
506 kumpf 1.37 PEG_METHOD_EXIT();
|
507 mday 1.18 return true;
|
508 mday 1.17 }
|
509 mday 1.18
510 // ensure that the destination queue is in response->dest
|
511 kumpf 1.37 PEG_METHOD_EXIT();
|
512 mday 1.24 return SendForget(response);
|
513 mday 1.18
|
514 mday 1.17 }
515
|
516 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
517 mday 1.1 {
|
518 mday 1.19 cimom::_make_response(req, code);
|
519 mday 1.1 }
520
521
|
522 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
523 AsyncReply *reply,
524 Uint32 state,
525 Uint32 flag)
526 {
|
527 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
528 "MessageQueueService::_completeAsyncResponse");
529
|
530 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
531 kumpf 1.37
532 PEG_METHOD_EXIT();
|
533 mday 1.5 }
534
535
|
536 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
537 Uint32 state,
538 Uint32 flag,
539 Uint32 code)
540 {
541 cimom::_complete_op_node(op, state, flag, code);
542 }
543
|
544 mday 1.5
545 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
546 {
|
547 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
548 return false;
549
|
550 mday 1.20 // ATTN optimization remove the message checking altogether in the base
551 // << Mon Feb 18 14:02:20 2002 mdd >>
|
552 mday 1.64.2.2 op->lock();
|
553 mday 1.6 Message *rq = op->_request.next(0);
|
554 mday 1.20 Message *rp = op->_response.next(0);
|
555 mday 1.64.2.2 op->unlock();
|
556 mday 1.6
|
557 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
558 _die.value() == 0 )
|
559 mday 1.5 {
|
560 mday 1.6 _incoming.insert_last_wait(op);
|
561 mday 1.53 _polling_sem.signal();
|
562 mday 1.5 return true;
563 }
|
564 mday 1.32 // else
565 // {
566 // if( (rq != 0 && (true == MessageQueueService::messageOK(rq))) ||
567 // (rp != 0 && ( true == MessageQueueService::messageOK(rp) )) &&
568 // _die.value() == 0)
569 // {
570 // MessageQueueService::_incoming.insert_last_wait(op);
571 // return true;
572 // }
573 // }
574
|
575 mday 1.5 return false;
576 }
577
578 Boolean MessageQueueService::messageOK(const Message *msg)
579 {
|
580 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
581 return false;
|
582 mday 1.18 return true;
583 }
584
|
585 mday 1.29
586 // made pure virtual
587 // << Wed Mar 6 15:11:31 2002 mdd >>
|
588 mday 1.25 // void MessageQueueService::handleEnqueue(Message *msg)
589 // {
590 // if ( msg )
591 // delete msg;
592 // }
|
593 mday 1.5
|
594 mday 1.29 // made pure virtual
595 // << Wed Mar 6 15:11:56 2002 mdd >>
|
596 mday 1.25 // void MessageQueueService::handleEnqueue(void)
597 // {
598 // Message *msg = dequeue();
599 // handleEnqueue(msg);
600 // }
|
601 mday 1.5
|
602 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
603 {
604 // default action is to echo a heartbeat response
605
606 AsyncReply *reply =
607 new AsyncReply(async_messages::HEARTBEAT,
608 req->getKey(),
609 req->getRouting(),
610 0,
611 req->op,
612 async_results::OK,
613 req->resp,
614 false);
|
615 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
616 mday 1.1 }
617
618
619 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
620 {
621 ;
622 }
623
624 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
625 {
|
626 mday 1.8
627 switch( req->ctl )
628 {
629 case AsyncIoctl::IO_CLOSE:
630 {
631 // save my bearings
|
632 mday 1.53 // Thread *myself = req->op->_thread_ptr;
|
633 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
634 mday 1.8
635 // respond to this message.
636 _make_response(req, async_results::OK);
637 // ensure we do not accept any further messages
638
639 // ensure we don't recurse on IO_CLOSE
640 if( _incoming_queue_shutdown.value() > 0 )
641 break;
642
643 // set the closing flag
644 service->_incoming_queue_shutdown = 1;
645 // empty out the queue
646 while( 1 )
647 {
648 AsyncOpNode *operation;
649 try
650 {
651 operation = service->_incoming.remove_first();
652 }
653 catch(IPCException & )
654 {
655 mday 1.8 break;
656 }
657 if( operation )
658 {
|
659 mday 1.53 // operation->_thread_ptr = myself;
|
660 mday 1.34 operation->_service_ptr = service;
661 service->_handle_incoming_operation(operation);
|
662 mday 1.8 }
663 else
664 break;
665 } // message processing loop
666
667 // shutdown the AsyncDQueue
668 service->_incoming.shutdown_queue();
669 // exit the thread !
|
670 mday 1.53 // myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
|
671 mday 1.8 return;
672 }
673
674 default:
675 _make_response(req, async_results::CIM_NAK);
676 }
|
677 mday 1.1 }
|
678 mday 1.8
|
679 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
680 {
|
681 mday 1.10 // clear the stoped bit and update
|
682 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
683 mday 1.10 _make_response(req, async_results::OK);
684 // now tell the meta dispatcher we are stopped
685 update_service(_capabilities, _mask);
686
|
687 mday 1.1 }
688 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
689 {
|
690 mday 1.10 // set the stopeed bit and update
691 _capabilities |= module_capabilities::stopped;
692 _make_response(req, async_results::CIM_STOPPED);
693 // now tell the meta dispatcher we are stopped
694 update_service(_capabilities, _mask);
695
|
696 mday 1.1 }
697 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
698 {
|
699 mday 1.10 // set the paused bit and update
|
700 mday 1.13 _capabilities |= module_capabilities::paused;
|
701 mday 1.11 update_service(_capabilities, _mask);
|
702 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
703 // now tell the meta dispatcher we are stopped
|
704 mday 1.1 }
705 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
706 {
|
707 mday 1.10 // clear the paused bit and update
|
708 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
709 mday 1.11 update_service(_capabilities, _mask);
|
710 mday 1.10 _make_response(req, async_results::OK);
711 // now tell the meta dispatcher we are stopped
|
712 mday 1.1 }
713
714 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
715 {
716 _make_response(req, async_results::CIM_NAK);
717 }
718
719 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
720 {
|
721 mday 1.14 ;
722 }
723
|
724 mday 1.10
|
725 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
726 {
727 // remove the legacy message from the request and enqueue it to its destination
728 Uint32 result = async_results::CIM_NAK;
729
|
730 mday 1.25 Message *legacy = req->_act;
|
731 mday 1.14 if ( legacy != 0 )
732 {
|
733 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
734 mday 1.14 if( queue != 0 )
735 {
|
736 mday 1.25 if(queue->isAsync() == true )
737 {
738 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
739 }
740 else
741 {
742 // Enqueue the response:
743 queue->enqueue(req->get_action());
744 }
745
|
746 mday 1.14 result = async_results::OK;
747 }
748 }
749 _make_response(req, result);
750 }
751
752 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
753 {
754 ;
|
755 mday 1.1 }
756
757 AsyncOpNode *MessageQueueService::get_op(void)
758 {
|
759 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
760 mday 1.1
|
761 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
762 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
763 mday 1.4
|
764 mday 1.1 return op;
765 }
766
767 void MessageQueueService::return_op(AsyncOpNode *op)
768 {
769 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
770 mday 1.4 delete op;
|
771 mday 1.1 }
772
|
773 mday 1.18
|
774 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
775 Uint32 destination)
776 {
777 PEGASUS_ASSERT(op != 0 );
|
778 mday 1.30 op->lock();
779 op->_op_dest = MessageQueue::lookup(destination);
|
780 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
781 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
782 mday 1.30 op->unlock();
783 if ( op->_op_dest == 0 )
784 return false;
785
|
786 mday 1.29 return _meta_dispatcher->route_async(op);
787 }
788
|
789 mday 1.39
|
790 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
791 Uint32 destination,
|
792 mday 1.18 void (*callback)(AsyncOpNode *,
|
793 mday 1.32 MessageQueue *,
|
794 mday 1.30 void *),
795 MessageQueue *callback_response_q,
796 void *callback_ptr)
|
797 mday 1.20 {
|
798 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
799 mday 1.18
|
800 mday 1.21 // get the queue handle for the destination
801
|
802 mday 1.30 op->lock();
|
803 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
804 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
805 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
806 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
807 mday 1.30 // initialize the callback data
|
808 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
809 op->_callback_node = op; // the op node
810 op->_callback_response_q = callback_response_q; // the queue that will receive the response
811 op->_callback_ptr = callback_ptr; // user data for callback
812 op->_callback_request_q = this; // I am the originator of this request
|
813 mday 1.30
814 op->unlock();
815 if(op->_op_dest == 0)
816 return false;
|
817 mday 1.21
818 return _meta_dispatcher->route_async(op);
|
819 mday 1.18 }
820
821
|
822 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
823 Uint32 destination,
824 void (*callback)(Message *response,
825 void *handle,
826 void *parameter),
827 void *handle,
828 void *parameter)
829 {
830 if(msg == NULL)
831 return false;
832 if(callback == NULL)
833 return SendForget(msg);
834 AsyncOpNode *op = get_op();
|
835 mday 1.43 msg->dest = destination;
|
836 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
837 {
838 op->release();
839 return_op(op);
840 return false;
841 }
842 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
843 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
844 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
845 op->__async_callback = callback;
846 op->_callback_node = op;
847 op->_callback_handle = handle;
848 op->_callback_parameter = parameter;
849 op->_callback_response_q = this;
850
851
852 if( ! (msg->getMask() & message_mask::ha_async) )
853 {
854 AsyncLegacyOperationStart *wrapper =
855 new AsyncLegacyOperationStart(get_next_xid(),
856 op,
857 mday 1.39 destination,
858 msg,
859 destination);
860 }
861 else
862 {
863 op->_request.insert_first(msg);
864 (static_cast<AsyncMessage *>(msg))->op = op;
865 }
866
867 _callback.insert_last(op);
868 return _meta_dispatcher->route_async(op);
869 }
870
871
|
872 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
873 {
874
|
875 mday 1.24
|
876 mday 1.18 AsyncOpNode *op = 0;
|
877 mday 1.22 Uint32 mask = msg->getMask();
878
879 if (mask & message_mask::ha_async)
|
880 mday 1.18 {
881 op = (static_cast<AsyncMessage *>(msg))->op ;
882 }
|
883 mday 1.22
|
884 mday 1.18 if( op == 0 )
|
885 mday 1.20 {
|
886 mday 1.18 op = get_op();
|
887 mday 1.20 op->_request.insert_first(msg);
|
888 mday 1.22 if (mask & message_mask::ha_async)
889 (static_cast<AsyncMessage *>(msg))->op = op;
|
890 mday 1.20 }
|
891 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
892 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
893 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
894 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
895 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
896 mday 1.30 if ( op->_op_dest == 0 )
|
897 mday 1.39 {
898 op->release();
899 return_op(op);
|
900 mday 1.21 return false;
|
901 mday 1.39 }
|
902 mday 1.24
|
903 mday 1.18 // now see if the meta dispatcher will take it
|
904 mday 1.30 return _meta_dispatcher->route_async(op);
|
905 mday 1.18 }
|
906 mday 1.2
|
907 mday 1.1
|
908 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
909 mday 1.1 {
|
910 mday 1.4 if ( request == 0 )
911 return 0 ;
|
912 mday 1.5
913 Boolean destroy_op = false;
914
915 if (request->op == false)
916 {
917 request->op = get_op();
|
918 mday 1.7 request->op->_request.insert_first(request);
|
919 mday 1.5 destroy_op = true;
920 }
|
921 mday 1.4
|
922 mday 1.33 request->block = false;
|
923 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
924 mday 1.33 SendAsync(request->op,
925 request->dest,
|
926 mday 1.36 _sendwait_callback,
|
927 mday 1.33 this,
928 (void *)0);
|
929 mday 1.4
|
930 mday 1.33 request->op->_client_sem.wait();
|
931 mday 1.6 request->op->lock();
932 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
933 rpl->op = 0;
934 request->op->unlock();
935
|
936 mday 1.5 if( destroy_op == true)
937 {
|
938 mday 1.6 request->op->lock();
939 request->op->_request.remove(request);
940 request->op->_state |= ASYNC_OPSTATE_RELEASED;
941 request->op->unlock();
942 return_op(request->op);
|
943 mday 1.7 request->op = 0;
|
944 mday 1.5 }
945 return rpl;
|
946 mday 1.1 }
947
948
949 Boolean MessageQueueService::register_service(String name,
950 Uint32 capabilities,
951 Uint32 mask)
952
953 {
954 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
955 mday 1.5 0,
|
956 mday 1.1 true,
957 name,
958 capabilities,
959 mask,
960 _queueId);
|
961 mday 1.44 msg->dest = CIMOM_Q_ID;
962
|
963 mday 1.1 Boolean registered = false;
|
964 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
965 mday 1.1
|
966 mday 1.2 if ( reply != 0 )
|
967 mday 1.1 {
968 if(reply->getMask() & message_mask:: ha_async)
969 {
970 if(reply->getMask() & message_mask::ha_reply)
971 {
|
972 mday 1.15 if(reply->result == async_results::OK ||
973 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
974 mday 1.1 registered = true;
975 }
976 }
977
|
978 mday 1.7 delete reply;
|
979 mday 1.1 }
|
980 mday 1.5 delete msg;
|
981 mday 1.1 return registered;
982 }
983
984 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
985 {
986
987
988 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
989 mday 1.5 0,
|
990 mday 1.1 true,
991 _queueId,
992 _capabilities,
993 _mask);
994 Boolean registered = false;
|
995 mday 1.2
996 AsyncMessage *reply = SendWait(msg);
997 if (reply)
|
998 mday 1.1 {
999 if(reply->getMask() & message_mask:: ha_async)
1000 {
1001 if(reply->getMask() & message_mask::ha_reply)
1002 {
|
1003 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
1004 mday 1.1 registered = true;
1005 }
1006 }
1007 delete reply;
1008 }
|
1009 mday 1.5 delete msg;
|
1010 mday 1.1 return registered;
1011 }
1012
1013
1014 Boolean MessageQueueService::deregister_service(void)
1015 {
|
1016 mday 1.3
|
1017 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1018 return true;
|
1019 mday 1.1 }
1020
1021
1022 void MessageQueueService::find_services(String name,
1023 Uint32 capabilities,
1024 Uint32 mask,
1025 Array<Uint32> *results)
1026 {
1027
1028 if( results == 0 )
1029 throw NullPointer();
|
1030 mday 1.5
|
1031 mday 1.1 results->clear();
1032
1033 FindServiceQueue *req =
1034 new FindServiceQueue(get_next_xid(),
|
1035 mday 1.5 0,
|
1036 mday 1.1 _queueId,
1037 true,
1038 name,
1039 capabilities,
1040 mask);
|
1041 mday 1.44
1042 req->dest = CIMOM_Q_ID;
|
1043 mday 1.1
|
1044 mday 1.2 AsyncMessage *reply = SendWait(req);
1045 if(reply)
|
1046 mday 1.1 {
1047 if( reply->getMask() & message_mask::ha_async)
1048 {
1049 if(reply->getMask() & message_mask::ha_reply)
1050 {
1051 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1052 {
1053 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1054 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1055 }
1056 }
1057 }
1058 delete reply;
1059 }
|
1060 mday 1.5 delete req;
|
1061 mday 1.1 return ;
1062 }
1063
1064 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1065 {
1066 if(result == 0)
1067 throw NullPointer();
1068
1069 EnumerateService *req
1070 = new EnumerateService(get_next_xid(),
|
1071 mday 1.5 0,
|
1072 mday 1.1 _queueId,
1073 true,
1074 queue);
1075
|
1076 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1077 mday 1.1
|
1078 mday 1.2 if (reply)
|
1079 mday 1.1 {
1080 Boolean found = false;
1081
1082 if( reply->getMask() & message_mask::ha_async)
1083 {
1084 if(reply->getMask() & message_mask::ha_reply)
1085 {
1086 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1087 {
1088 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1089 {
1090 if( found == false)
1091 {
1092 found = true;
1093
1094 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1095 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1096 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1097 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1098 }
1099 }
1100 mday 1.1 }
1101 }
1102 }
1103 delete reply;
1104 }
|
1105 mday 1.5 delete req;
1106
|
1107 mday 1.1 return;
1108 }
1109
1110 Uint32 MessageQueueService::get_next_xid(void)
1111 {
1112 _xid++;
1113 return _xid.value();
1114 }
1115
1116 PEGASUS_NAMESPACE_END
|