1 karl 1.82 //%2003////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.82 // Copyright (c) 2000, 2001, 2002 BMC Software, Hewlett-Packard Development
4 // Company, L. P., IBM Corp., The Open Group, Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L. P.;
6 // IBM Corp.; EMC Corporation, The Open Group.
|
7 mday 1.1 //
8 // Permission is hereby granted, free of charge, to any person obtaining a copy
9 // of this software and associated documentation files (the "Software"), to
10 // deal in the Software without restriction, including without limitation the
11 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
12 // sell copies of the Software, and to permit persons to whom the Software is
13 // furnished to do so, subject to the following conditions:
|
14 karl 1.82 //
|
15 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
16 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
17 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
18 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
19 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
20 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
22 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
23 //
24 //==============================================================================
25 //
26 // Author: Mike Day (mdday@us.ibm.com)
27 //
28 // Modified By:
29 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "MessageQueueService.h"
|
33 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
34 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
35 mday 1.1
36 PEGASUS_NAMESPACE_BEGIN
37
|
38 mday 1.15
39 cimom *MessageQueueService::_meta_dispatcher = 0;
40 AtomicInt MessageQueueService::_service_count = 0;
41 AtomicInt MessageQueueService::_xid(1);
|
42 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
43 mday 1.15
|
44 mday 1.58 static struct timeval create_time = {0, 1};
|
45 mday 1.70 static struct timeval destroy_time = {300, 0};
|
46 mday 1.59 static struct timeval deadlock_time = {0, 0};
|
47 mday 1.53
|
48 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
49 mday 1.53
50 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
51
|
52 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
53 mday 1.61
|
54 mday 1.69 ThreadPool *MessageQueueService::get_thread_pool(void)
55 {
56 return _thread_pool;
57 }
58
59 void unload_idle_providers(void);
|
60 mday 1.61
|
61 mday 1.69 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::kill_idle_threads(void *parm)
|
62 mday 1.53 {
|
63 mday 1.69
64 static struct timeval now, last = {0,0};
|
65 mday 1.53 gettimeofday(&now, NULL);
|
66 mday 1.56 int dead_threads = 0;
|
67 mday 1.53
|
68 mday 1.70 if( now.tv_sec - last.tv_sec > 120 )
|
69 mday 1.53 {
70 gettimeofday(&last, NULL);
|
71 mday 1.56 try
72 {
|
73 mday 1.69 dead_threads = MessageQueueService::_thread_pool->kill_dead_threads();
|
74 mday 1.56 }
|
75 mday 1.69 catch(...)
|
76 mday 1.56 {
77
78 }
|
79 mday 1.53 }
|
80 mday 1.69 exit_thread((PEGASUS_THREAD_RETURN)dead_threads);
81 return (PEGASUS_THREAD_RETURN)dead_threads;
|
82 mday 1.53 }
83
|
84 mday 1.68
|
85 mday 1.76 void MessageQueueService::force_shutdown(Boolean destroy_flag)
|
86 mday 1.68 {
|
87 mday 1.79 return;
88
|
89 kumpf 1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
90 humberto 1.71 //l10n
91 MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
|
92 humberto 1.74 "Forcing shutdown of CIMOM Message Router");
|
93 humberto 1.71 PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
|
94 kumpf 1.75 #endif
|
95 mday 1.76
|
96 kumpf 1.75
|
97 mday 1.68 MessageQueueService *svc;
|
98 konrad.r 1.73 int counter = 0;
|
99 mday 1.68 _polling_list.lock();
100 svc = _polling_list.next(0);
|
101 humberto 1.71
|
102 mday 1.68 while(svc != 0)
103 {
|
104 kumpf 1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
105 humberto 1.71 //l10n - reuse same MessageLoaderParms to avoid multiple creates
106 parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
107 parms.default_msg = "Stopping $0";
108 parms.arg0 = svc->getQueueName();
109 PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
|
110 kumpf 1.75 #endif
|
111 humberto 1.71
|
112 mday 1.68 _polling_sem.signal();
113 svc->_shutdown_incoming_queue();
|
114 konrad.r 1.73 counter++;
|
115 mday 1.68 _polling_sem.signal();
116 svc = _polling_list.next(svc);
117 }
118 _polling_list.unlock();
|
119 konrad.r 1.73
120 _polling_sem.signal();
121
|
122 mday 1.76 MessageQueueService::_stop_polling = 1;
123
124 if(destroy_flag == true)
125 {
126
127 svc = _polling_list.remove_last();
128 while(svc)
129 {
130 delete svc;
131 svc = _polling_list.remove_last();
132 }
133
|
134 konrad.r 1.73 }
|
135 mday 1.68 }
136
137
|
138 mday 1.53 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
139 {
140 Thread *myself = reinterpret_cast<Thread *>(parm);
141 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
142 while ( _stop_polling.value() == 0 )
143 {
|
144 mday 1.69 _polling_sem.wait();
|
145 mday 1.61 if(_stop_polling.value() != 0 )
146 {
|
147 kumpf 1.62 break;
|
148 mday 1.61 }
149
|
150 mday 1.53 list->lock();
151 MessageQueueService *service = list->next(0);
152 while(service != NULL)
153 {
154 if(service->_incoming.count() > 0 )
155 {
|
156 mday 1.55 _thread_pool->allocate_and_awaken(service, _req_proc);
|
157 mday 1.53 }
158 service = list->next(service);
159 }
160 list->unlock();
|
161 mday 1.69 if(_check_idle_flag.value() != 0 )
162 {
163 _check_idle_flag = 0;
164 Thread th(kill_idle_threads, 0, true);
165 th.run();
166 }
|
167 mday 1.53 }
168 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
169 return(0);
170 }
171
172
173 Semaphore MessageQueueService::_polling_sem(0);
174 AtomicInt MessageQueueService::_stop_polling(0);
|
175 mday 1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
|
176 mday 1.53
|
177 mday 1.15
|
178 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
179 Uint32 queueID,
180 Uint32 capabilities,
|
181 mday 1.82.6.1 Uint32 mask,
182 int threads)
|
183 mday 1.15 : Base(name, true, queueID),
|
184 mday 1.22
|
185 mday 1.1 _mask(mask),
|
186 mday 1.4 _die(0),
|
187 mday 1.41 _incoming(true, 0),
|
188 mday 1.39 _callback(true),
|
189 mday 1.7 _incoming_queue_shutdown(0),
|
190 mday 1.39 _callback_ready(0),
191 _req_thread(_req_proc, this, false),
192 _callback_thread(_callback_proc, this, false)
|
193 mday 1.53
|
194 mday 1.1 {
|
195 mday 1.60
|
196 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
197
|
198 mday 1.1 _default_op_timeout.tv_sec = 30;
199 _default_op_timeout.tv_usec = 100;
|
200 mday 1.15
201 _meta_dispatcher_mutex.lock(pegasus_thread_self());
202
203 if( _meta_dispatcher == 0 )
204 {
205 PEGASUS_ASSERT( _service_count.value() == 0 );
206 _meta_dispatcher = new cimom();
207 if (_meta_dispatcher == NULL )
208 {
209 _meta_dispatcher_mutex.unlock();
210 throw NullPointer();
211 }
|
212 mday 1.82.6.1 _thread_pool = new ThreadPool(0, "MessageQueueService", 0, threads,
|
213 mday 1.55 create_time, destroy_time, deadlock_time);
|
214 mday 1.61
|
215 kumpf 1.62 _polling_thread = new Thread(polling_routine,
216 reinterpret_cast<void *>(&_polling_list),
217 false);
218 _polling_thread->run();
|
219 mday 1.15 }
220 _service_count++;
221
222 if( false == register_service(name, _capabilities, _mask) )
223 {
224 _meta_dispatcher_mutex.unlock();
|
225 humberto 1.71 //l10n
226 //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
227 MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
|
228 humberto 1.74 "MessageQueueService Base Unable to register with Meta Dispatcher");
|
229 humberto 1.71
230 throw BindFailedException(parms);
|
231 mday 1.15 }
232
|
233 mday 1.53 _polling_list.insert_last(this);
234
|
235 mday 1.15 _meta_dispatcher_mutex.unlock();
|
236 mday 1.39 // _callback_thread.run();
237
|
238 mday 1.53 // _req_thread.run();
|
239 mday 1.1 }
240
|
241 mday 1.4
|
242 mday 1.1 MessageQueueService::~MessageQueueService(void)
243 {
244 _die = 1;
|
245 mday 1.76
246 if (_incoming_queue_shutdown.value() == 0 )
247 {
248 _shutdown_incoming_queue();
249 }
|
250 mday 1.39 _callback_ready.signal();
251
|
252 mday 1.15 _meta_dispatcher_mutex.lock(pegasus_thread_self());
253 _service_count--;
254 if (_service_count.value() == 0 )
255 {
|
256 mday 1.76
|
257 mday 1.53 _stop_polling++;
258 _polling_sem.signal();
|
259 kumpf 1.62 _polling_thread->join();
260 delete _polling_thread;
|
261 kumpf 1.65 _polling_thread = 0;
|
262 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
263 delete _meta_dispatcher;
|
264 kumpf 1.45 _meta_dispatcher = 0;
|
265 mday 1.76
|
266 kumpf 1.65 delete _thread_pool;
267 _thread_pool = 0;
|
268 mday 1.15 }
269 _meta_dispatcher_mutex.unlock();
|
270 mday 1.53 _polling_list.remove(this);
|
271 konrad.r 1.72 // Clean up in case there are extra stuff on the queue.
272 while (_incoming.count())
273 {
274 delete _incoming.remove_first();
275 }
|
276 mday 1.53 }
|
277 mday 1.1
|
278 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
279 {
280
|
281 mday 1.76
|
282 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
283 return ;
|
284 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
285 0,
286 _queueId,
287 _queueId,
288 true,
289 AsyncIoctl::IO_CLOSE,
290 0,
291 0);
|
292 mday 1.9
|
293 mday 1.8 msg->op = get_op();
|
294 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
295 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
296 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
297 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
298 mday 1.41
|
299 mday 1.32 msg->op->_op_dest = this;
|
300 mday 1.8 msg->op->_request.insert_first(msg);
301
302 _incoming.insert_last_wait(msg->op);
|
303 mday 1.32
|
304 mday 1.7 }
305
|
306 mday 1.22
307
308 void MessageQueueService::enqueue(Message *msg) throw(IPCException)
309 {
|
310 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
311
|
312 mday 1.22 Base::enqueue(msg);
|
313 kumpf 1.28
314 PEG_METHOD_EXIT();
|
315 mday 1.22 }
316
317
|
318 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
319 {
320 Thread *myself = reinterpret_cast<Thread *>(parm);
321 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
322 AsyncOpNode *operation = 0;
323
324 while ( service->_die.value() == 0 )
325 {
326 service->_callback_ready.wait();
327
328 service->_callback.lock();
329 operation = service->_callback.next(0);
330 while( operation != NULL)
331 {
332 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
333 {
334 operation = service->_callback.remove_no_lock(operation);
335 PEGASUS_ASSERT(operation != NULL);
336 operation->_thread_ptr = myself;
337 operation->_service_ptr = service;
338 service->_handle_async_callback(operation);
339 mday 1.39 break;
340 }
341 operation = service->_callback.next(operation);
342 }
343 service->_callback.unlock();
344 }
345 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
346 return(0);
347 }
348
|
349 mday 1.22
|
350 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
351 mday 1.1 {
|
352 mday 1.53 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
353 mday 1.5 // pull messages off the incoming queue and dispatch them. then
354 // check pending messages that are non-blocking
|
355 mday 1.7 AsyncOpNode *operation = 0;
356
|
357 mday 1.56 if ( service->_die.value() == 0 )
358 {
|
359 mday 1.41 try
360 {
|
361 mday 1.53 operation = service->_incoming.remove_first();
|
362 mday 1.41 }
363 catch(ListClosed & )
364 {
|
365 mday 1.56 operation = 0;
366
367 return(0);
|
368 mday 1.41 }
369 if( operation )
370 {
371 operation->_service_ptr = service;
372 service->_handle_incoming_operation(operation);
373 }
|
374 mday 1.56 }
|
375 mday 1.44
|
376 mday 1.5 return(0);
|
377 mday 1.1 }
378
|
379 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
380 {
381 return _callback.count();
382 }
383
384
385
|
386 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
387 MessageQueue *q,
388 void *parm)
389 {
390 op->_client_sem.signal();
391 }
392
|
393 mday 1.30
394 // callback function is responsible for cleaning up all resources
395 // including op, op->_callback_node, and op->_callback_ptr
|
396 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
397 {
|
398 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
399 {
400
401 Message *msg = op->get_request();
402 if( msg && ( msg->getMask() & message_mask::ha_async))
403 {
404 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
405 {
406 AsyncLegacyOperationStart *wrapper =
407 static_cast<AsyncLegacyOperationStart *>(msg);
408 msg = wrapper->get_action();
409 delete wrapper;
410 }
411 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
412 {
413 AsyncModuleOperationStart *wrapper =
414 static_cast<AsyncModuleOperationStart *>(msg);
415 msg = wrapper->get_action();
416 delete wrapper;
417 }
418 else if (msg->getType() == async_messages::ASYNC_OP_START)
419 mday 1.39 {
420 AsyncOperationStart *wrapper =
421 static_cast<AsyncOperationStart *>(msg);
422 msg = wrapper->get_action();
423 delete wrapper;
424 }
425 delete msg;
426 }
427
428 msg = op->get_response();
429 if( msg && ( msg->getMask() & message_mask::ha_async))
430 {
431 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
432 {
433 AsyncLegacyOperationResult *wrapper =
434 static_cast<AsyncLegacyOperationResult *>(msg);
435 msg = wrapper->get_result();
436 delete wrapper;
437 }
438 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
439 {
440 mday 1.39 AsyncModuleOperationResult *wrapper =
441 static_cast<AsyncModuleOperationResult *>(msg);
442 msg = wrapper->get_result();
443 delete wrapper;
444 }
445 }
446 void (*callback)(Message *, void *, void *) = op->__async_callback;
447 void *handle = op->_callback_handle;
448 void *parm = op->_callback_parameter;
449 op->release();
450 return_op(op);
451 callback(msg, handle, parm);
452 }
453 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
454 {
455 // note that _callback_node may be different from op
456 // op->_callback_response_q is a "this" pointer we can use for
457 // static callback methods
458 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
459 }
|
460 mday 1.22 }
461
|
462 mday 1.1
|
463 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
464 // Thread *thread,
465 // MessageQueue *queue)
|
466 mday 1.1 {
|
467 mday 1.5 if ( operation != 0 )
468 {
|
469 mday 1.29
|
470 mday 1.22 // ATTN: optimization
471 // << Tue Feb 19 14:10:38 2002 mdd >>
|
472 mday 1.6 operation->lock();
|
473 mday 1.29
|
474 mday 1.6 Message *rq = operation->_request.next(0);
|
475 mday 1.29
|
476 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
477 // move this to the bottom of the loop when the majority of
478 // messages become async messages.
479
|
480 mday 1.18 // divert legacy messages to handleEnqueue
|
481 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
482 mday 1.18 {
483 rq = operation->_request.remove_first() ;
484 operation->unlock();
485 // delete the op node
|
486 mday 1.39 operation->release();
487 return_op( operation);
|
488 mday 1.24
|
489 mday 1.26 handleEnqueue(rq);
|
490 mday 1.18 return;
491 }
492
|
493 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
494 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
495 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
496 {
|
497 mday 1.49
|
498 mday 1.29 operation->unlock();
499 _handle_async_callback(operation);
500 }
501 else
502 {
503 PEGASUS_ASSERT(rq != 0 );
504 operation->unlock();
505 _handle_async_request(static_cast<AsyncRequest *>(rq));
506 }
|
507 mday 1.5 }
508 return;
|
509 mday 1.1 }
510
511 void MessageQueueService::_handle_async_request(AsyncRequest *req)
512 {
|
513 mday 1.4 if ( req != 0 )
514 {
515 req->op->processing();
|
516 mday 1.1
|
517 mday 1.4 Uint32 type = req->getType();
518 if( type == async_messages::HEARTBEAT )
519 handle_heartbeat_request(req);
520 else if (type == async_messages::IOCTL)
521 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
522 else if (type == async_messages::CIMSERVICE_START)
523 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
524 else if (type == async_messages::CIMSERVICE_STOP)
525 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
526 else if (type == async_messages::CIMSERVICE_PAUSE)
527 handle_CimServicePause(static_cast<CimServicePause *>(req));
528 else if (type == async_messages::CIMSERVICE_RESUME)
529 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
530 else if ( type == async_messages::ASYNC_OP_START)
531 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
532 else
533 {
534 // we don't handle this request message
535 _make_response(req, async_results::CIM_NAK );
536 }
|
537 mday 1.1 }
538 }
539
|
540 mday 1.17
541 Boolean MessageQueueService::_enqueueResponse(
542 Message* request,
543 Message* response)
|
544 mday 1.18
|
545 mday 1.17 {
|
546 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
547 "MessageQueueService::_enqueueResponse");
|
548 mday 1.25
549 if( request->getMask() & message_mask::ha_async)
550 {
551 if (response->getMask() & message_mask::ha_async )
552 {
553 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
554 static_cast<AsyncReply *>(response),
555 ASYNC_OPSTATE_COMPLETE, 0 );
|
556 kumpf 1.37 PEG_METHOD_EXIT();
|
557 mday 1.25 return true;
558 }
559 }
560
|
561 mday 1.17 if(request->_async != 0 )
562 {
563 Uint32 mask = request->_async->getMask();
|
564 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
565
566 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
567 AsyncOpNode *op = async->op;
568 request->_async = 0;
|
569 mday 1.63 // the legacy request is going to be deleted by its handler
|
570 mday 1.27 // remove it from the op node
|
571 mday 1.63
572 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
573 mday 1.18
|
574 mday 1.63
|
575 mday 1.18 AsyncLegacyOperationResult *async_result =
576 new AsyncLegacyOperationResult(
577 async->getKey(),
578 async->getRouting(),
579 op,
580 response);
581 _completeAsyncResponse(async,
582 async_result,
583 ASYNC_OPSTATE_COMPLETE,
584 0);
|
585 kumpf 1.37 PEG_METHOD_EXIT();
|
586 mday 1.18 return true;
|
587 mday 1.17 }
|
588 mday 1.18
589 // ensure that the destination queue is in response->dest
|
590 kumpf 1.37 PEG_METHOD_EXIT();
|
591 mday 1.24 return SendForget(response);
|
592 mday 1.18
|
593 mday 1.17 }
594
|
595 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
596 mday 1.1 {
|
597 mday 1.19 cimom::_make_response(req, code);
|
598 mday 1.1 }
599
600
|
601 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
602 AsyncReply *reply,
603 Uint32 state,
604 Uint32 flag)
605 {
|
606 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
607 "MessageQueueService::_completeAsyncResponse");
608
|
609 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
610 kumpf 1.37
611 PEG_METHOD_EXIT();
|
612 mday 1.5 }
613
614
|
615 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
616 Uint32 state,
617 Uint32 flag,
618 Uint32 code)
619 {
620 cimom::_complete_op_node(op, state, flag, code);
621 }
622
|
623 mday 1.5
624 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
625 {
|
626 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
627 return false;
628
|
629 mday 1.20 // ATTN optimization remove the message checking altogether in the base
630 // << Mon Feb 18 14:02:20 2002 mdd >>
|
631 mday 1.6 op->lock();
632 Message *rq = op->_request.next(0);
|
633 mday 1.20 Message *rp = op->_response.next(0);
|
634 mday 1.6 op->unlock();
635
|
636 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
637 _die.value() == 0 )
|
638 mday 1.5 {
|
639 mday 1.6 _incoming.insert_last_wait(op);
|
640 mday 1.53 _polling_sem.signal();
|
641 mday 1.5 return true;
642 }
643 return false;
644 }
645
646 Boolean MessageQueueService::messageOK(const Message *msg)
647 {
|
648 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
649 return false;
|
650 mday 1.18 return true;
651 }
652
|
653 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
654 {
655 // default action is to echo a heartbeat response
656
657 AsyncReply *reply =
658 new AsyncReply(async_messages::HEARTBEAT,
659 req->getKey(),
660 req->getRouting(),
661 0,
662 req->op,
663 async_results::OK,
664 req->resp,
665 false);
|
666 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
667 mday 1.1 }
668
669
670 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
671 {
672 ;
673 }
674
675 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
676 {
|
677 mday 1.8
678 switch( req->ctl )
679 {
680 case AsyncIoctl::IO_CLOSE:
681 {
|
682 mday 1.76
|
683 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
684 kumpf 1.81
685 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
686 mday 1.79 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
|
687 kumpf 1.81 #endif
|
688 mday 1.8
|
689 mday 1.76 // respond to this message. this is fire and forget, so we don't need to delete anything.
690 // this takes care of two problems that were being found
691 // << Thu Oct 9 10:52:48 2003 mdd >>
692 _make_response(req, async_results::OK);
|
693 mday 1.8 // ensure we do not accept any further messages
694
695 // ensure we don't recurse on IO_CLOSE
696 if( _incoming_queue_shutdown.value() > 0 )
697 break;
698
699 // set the closing flag
700 service->_incoming_queue_shutdown = 1;
701 // empty out the queue
702 while( 1 )
703 {
704 AsyncOpNode *operation;
705 try
706 {
707 operation = service->_incoming.remove_first();
708 }
709 catch(IPCException & )
710 {
711 break;
712 }
713 if( operation )
714 mday 1.8 {
|
715 mday 1.34 operation->_service_ptr = service;
716 service->_handle_incoming_operation(operation);
|
717 mday 1.8 }
718 else
719 break;
720 } // message processing loop
|
721 mday 1.76
|
722 mday 1.8 // shutdown the AsyncDQueue
723 service->_incoming.shutdown_queue();
724 return;
725 }
726
727 default:
728 _make_response(req, async_results::CIM_NAK);
729 }
|
730 mday 1.1 }
|
731 mday 1.8
|
732 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
733 {
|
734 mday 1.79
|
735 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
736 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
737 kumpf 1.81 #endif
|
738 mday 1.79
|
739 mday 1.10 // clear the stoped bit and update
|
740 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
741 mday 1.10 _make_response(req, async_results::OK);
742 // now tell the meta dispatcher we are stopped
743 update_service(_capabilities, _mask);
744
|
745 mday 1.1 }
746 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
747 {
|
748 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
749 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
750 kumpf 1.81 #endif
|
751 mday 1.10 // set the stopeed bit and update
752 _capabilities |= module_capabilities::stopped;
753 _make_response(req, async_results::CIM_STOPPED);
754 // now tell the meta dispatcher we are stopped
755 update_service(_capabilities, _mask);
756
|
757 mday 1.1 }
758 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
759 {
|
760 mday 1.10 // set the paused bit and update
|
761 mday 1.13 _capabilities |= module_capabilities::paused;
|
762 mday 1.11 update_service(_capabilities, _mask);
|
763 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
764 // now tell the meta dispatcher we are stopped
|
765 mday 1.1 }
766 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
767 {
|
768 mday 1.10 // clear the paused bit and update
|
769 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
770 mday 1.11 update_service(_capabilities, _mask);
|
771 mday 1.10 _make_response(req, async_results::OK);
772 // now tell the meta dispatcher we are stopped
|
773 mday 1.1 }
774
775 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
776 {
777 _make_response(req, async_results::CIM_NAK);
778 }
779
780 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
781 {
|
782 mday 1.14 ;
783 }
784
|
785 mday 1.10
|
786 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
787 {
788 // remove the legacy message from the request and enqueue it to its destination
789 Uint32 result = async_results::CIM_NAK;
790
|
791 mday 1.25 Message *legacy = req->_act;
|
792 mday 1.14 if ( legacy != 0 )
793 {
|
794 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
795 mday 1.14 if( queue != 0 )
796 {
|
797 mday 1.25 if(queue->isAsync() == true )
798 {
799 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
800 }
801 else
802 {
803 // Enqueue the response:
804 queue->enqueue(req->get_action());
805 }
806
|
807 mday 1.14 result = async_results::OK;
808 }
809 }
810 _make_response(req, result);
811 }
812
813 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
814 {
815 ;
|
816 mday 1.1 }
817
818 AsyncOpNode *MessageQueueService::get_op(void)
819 {
|
820 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
821 mday 1.1
|
822 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
823 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
824 mday 1.4
|
825 mday 1.1 return op;
826 }
827
828 void MessageQueueService::return_op(AsyncOpNode *op)
829 {
830 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
831 mday 1.4 delete op;
|
832 mday 1.1 }
833
|
834 mday 1.18
|
835 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
836 Uint32 destination)
837 {
838 PEGASUS_ASSERT(op != 0 );
|
839 mday 1.30 op->lock();
840 op->_op_dest = MessageQueue::lookup(destination);
|
841 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
842 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
843 mday 1.30 op->unlock();
844 if ( op->_op_dest == 0 )
845 return false;
846
|
847 mday 1.29 return _meta_dispatcher->route_async(op);
848 }
849
|
850 mday 1.39
|
851 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
852 Uint32 destination,
|
853 mday 1.18 void (*callback)(AsyncOpNode *,
|
854 mday 1.32 MessageQueue *,
|
855 mday 1.30 void *),
856 MessageQueue *callback_response_q,
857 void *callback_ptr)
|
858 mday 1.20 {
|
859 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
860 mday 1.18
|
861 mday 1.21 // get the queue handle for the destination
862
|
863 mday 1.30 op->lock();
|
864 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
865 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
866 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
867 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
868 mday 1.30 // initialize the callback data
|
869 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
870 op->_callback_node = op; // the op node
871 op->_callback_response_q = callback_response_q; // the queue that will receive the response
872 op->_callback_ptr = callback_ptr; // user data for callback
873 op->_callback_request_q = this; // I am the originator of this request
|
874 mday 1.30
875 op->unlock();
876 if(op->_op_dest == 0)
877 return false;
|
878 mday 1.21
879 return _meta_dispatcher->route_async(op);
|
880 mday 1.18 }
881
882
|
883 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
884 Uint32 destination,
885 void (*callback)(Message *response,
886 void *handle,
887 void *parameter),
888 void *handle,
889 void *parameter)
890 {
891 if(msg == NULL)
892 return false;
893 if(callback == NULL)
894 return SendForget(msg);
895 AsyncOpNode *op = get_op();
|
896 mday 1.43 msg->dest = destination;
|
897 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
898 {
899 op->release();
900 return_op(op);
901 return false;
902 }
903 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
904 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
905 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
906 op->__async_callback = callback;
907 op->_callback_node = op;
908 op->_callback_handle = handle;
909 op->_callback_parameter = parameter;
910 op->_callback_response_q = this;
911
912
913 if( ! (msg->getMask() & message_mask::ha_async) )
914 {
915 AsyncLegacyOperationStart *wrapper =
916 new AsyncLegacyOperationStart(get_next_xid(),
917 op,
918 mday 1.39 destination,
919 msg,
920 destination);
921 }
922 else
923 {
924 op->_request.insert_first(msg);
925 (static_cast<AsyncMessage *>(msg))->op = op;
926 }
927
928 _callback.insert_last(op);
929 return _meta_dispatcher->route_async(op);
930 }
931
932
|
933 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
934 {
935
|
936 mday 1.24
|
937 mday 1.18 AsyncOpNode *op = 0;
|
938 mday 1.22 Uint32 mask = msg->getMask();
939
940 if (mask & message_mask::ha_async)
|
941 mday 1.18 {
942 op = (static_cast<AsyncMessage *>(msg))->op ;
943 }
|
944 mday 1.22
|
945 mday 1.18 if( op == 0 )
|
946 mday 1.20 {
|
947 mday 1.18 op = get_op();
|
948 mday 1.20 op->_request.insert_first(msg);
|
949 mday 1.22 if (mask & message_mask::ha_async)
950 (static_cast<AsyncMessage *>(msg))->op = op;
|
951 mday 1.20 }
|
952 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
953 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
954 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
955 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
956 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
957 mday 1.30 if ( op->_op_dest == 0 )
|
958 mday 1.39 {
959 op->release();
960 return_op(op);
|
961 mday 1.21 return false;
|
962 mday 1.39 }
|
963 mday 1.24
|
964 mday 1.18 // now see if the meta dispatcher will take it
|
965 mday 1.30 return _meta_dispatcher->route_async(op);
|
966 mday 1.18 }
|
967 mday 1.2
|
968 mday 1.1
|
969 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
970 mday 1.1 {
|
971 mday 1.4 if ( request == 0 )
972 return 0 ;
|
973 mday 1.5
974 Boolean destroy_op = false;
975
|
976 s.hills 1.80 if (request->op == 0)
|
977 mday 1.5 {
978 request->op = get_op();
|
979 mday 1.7 request->op->_request.insert_first(request);
|
980 mday 1.5 destroy_op = true;
981 }
|
982 mday 1.4
|
983 mday 1.33 request->block = false;
|
984 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
985 mday 1.33 SendAsync(request->op,
986 request->dest,
|
987 mday 1.36 _sendwait_callback,
|
988 mday 1.33 this,
989 (void *)0);
|
990 mday 1.4
|
991 mday 1.33 request->op->_client_sem.wait();
|
992 mday 1.6 request->op->lock();
993 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
994 rpl->op = 0;
995 request->op->unlock();
996
|
997 mday 1.5 if( destroy_op == true)
998 {
|
999 mday 1.6 request->op->lock();
1000 request->op->_request.remove(request);
1001 request->op->_state |= ASYNC_OPSTATE_RELEASED;
1002 request->op->unlock();
1003 return_op(request->op);
|
1004 mday 1.7 request->op = 0;
|
1005 mday 1.5 }
1006 return rpl;
|
1007 mday 1.1 }
1008
1009
1010 Boolean MessageQueueService::register_service(String name,
1011 Uint32 capabilities,
1012 Uint32 mask)
1013
1014 {
1015 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
1016 mday 1.5 0,
|
1017 mday 1.1 true,
1018 name,
1019 capabilities,
1020 mask,
1021 _queueId);
|
1022 mday 1.44 msg->dest = CIMOM_Q_ID;
1023
|
1024 mday 1.1 Boolean registered = false;
|
1025 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
1026 mday 1.1
|
1027 mday 1.2 if ( reply != 0 )
|
1028 mday 1.1 {
1029 if(reply->getMask() & message_mask:: ha_async)
1030 {
1031 if(reply->getMask() & message_mask::ha_reply)
1032 {
|
1033 mday 1.15 if(reply->result == async_results::OK ||
1034 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
1035 mday 1.1 registered = true;
1036 }
1037 }
1038
|
1039 mday 1.7 delete reply;
|
1040 mday 1.1 }
|
1041 mday 1.5 delete msg;
|
1042 mday 1.1 return registered;
1043 }
1044
1045 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1046 {
1047
1048
1049 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
1050 mday 1.5 0,
|
1051 mday 1.1 true,
1052 _queueId,
1053 _capabilities,
1054 _mask);
1055 Boolean registered = false;
|
1056 mday 1.2
1057 AsyncMessage *reply = SendWait(msg);
1058 if (reply)
|
1059 mday 1.1 {
1060 if(reply->getMask() & message_mask:: ha_async)
1061 {
1062 if(reply->getMask() & message_mask::ha_reply)
1063 {
|
1064 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
1065 mday 1.1 registered = true;
1066 }
1067 }
1068 delete reply;
1069 }
|
1070 mday 1.5 delete msg;
|
1071 mday 1.1 return registered;
1072 }
1073
1074
1075 Boolean MessageQueueService::deregister_service(void)
1076 {
|
1077 mday 1.3
|
1078 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1079 return true;
|
1080 mday 1.1 }
1081
1082
1083 void MessageQueueService::find_services(String name,
1084 Uint32 capabilities,
1085 Uint32 mask,
1086 Array<Uint32> *results)
1087 {
1088
1089 if( results == 0 )
1090 throw NullPointer();
|
1091 mday 1.5
|
1092 mday 1.1 results->clear();
1093
1094 FindServiceQueue *req =
1095 new FindServiceQueue(get_next_xid(),
|
1096 mday 1.5 0,
|
1097 mday 1.1 _queueId,
1098 true,
1099 name,
1100 capabilities,
1101 mask);
|
1102 mday 1.44
1103 req->dest = CIMOM_Q_ID;
|
1104 mday 1.1
|
1105 mday 1.2 AsyncMessage *reply = SendWait(req);
1106 if(reply)
|
1107 mday 1.1 {
1108 if( reply->getMask() & message_mask::ha_async)
1109 {
1110 if(reply->getMask() & message_mask::ha_reply)
1111 {
1112 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1113 {
1114 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1115 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1116 }
1117 }
1118 }
1119 delete reply;
1120 }
|
1121 mday 1.5 delete req;
|
1122 mday 1.1 return ;
1123 }
1124
1125 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1126 {
1127 if(result == 0)
1128 throw NullPointer();
1129
1130 EnumerateService *req
1131 = new EnumerateService(get_next_xid(),
|
1132 mday 1.5 0,
|
1133 mday 1.1 _queueId,
1134 true,
1135 queue);
1136
|
1137 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1138 mday 1.1
|
1139 mday 1.2 if (reply)
|
1140 mday 1.1 {
1141 Boolean found = false;
1142
1143 if( reply->getMask() & message_mask::ha_async)
1144 {
1145 if(reply->getMask() & message_mask::ha_reply)
1146 {
1147 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1148 {
1149 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1150 {
1151 if( found == false)
1152 {
1153 found = true;
1154
1155 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1156 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1157 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1158 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1159 }
1160 }
1161 mday 1.1 }
1162 }
1163 }
1164 delete reply;
1165 }
|
1166 mday 1.5 delete req;
1167
|
1168 mday 1.1 return;
1169 }
1170
1171 Uint32 MessageQueueService::get_next_xid(void)
1172 {
|
1173 mday 1.78 static Mutex _monitor;
1174 Uint32 value;
1175 _monitor.lock(pegasus_thread_self());
1176
|
1177 mday 1.1 _xid++;
|
1178 mday 1.78 value = _xid.value();
1179 _monitor.unlock();
1180 return value;
1181
|
1182 mday 1.1 }
1183
1184 PEGASUS_NAMESPACE_END
|