1 karl 1.88 //%2004////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 mday 1.1 //
10 // Permission is hereby granted, free of charge, to any person obtaining a copy
11 // of this software and associated documentation files (the "Software"), to
12 // deal in the Software without restriction, including without limitation the
13 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
14 // sell copies of the Software, and to permit persons to whom the Software is
15 // furnished to do so, subject to the following conditions:
|
16 karl 1.82 //
|
17 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
18 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
19 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
20 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
21 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
22 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
23 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
24 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
25 //
26 //==============================================================================
27 //
28 // Author: Mike Day (mdday@us.ibm.com)
29 //
30 // Modified By:
|
31 a.arora 1.87 // Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090
|
32 mday 1.1 //
33 //%/////////////////////////////////////////////////////////////////////////////
34
35 #include "MessageQueueService.h"
|
36 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
37 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
38 mday 1.1
39 PEGASUS_NAMESPACE_BEGIN
40
|
41 mday 1.15
// Shared (process-wide) infrastructure for all MessageQueueService
// instances.  Created by the first service constructed and torn down
// by the last destructor, under _meta_dispatcher_mutex.
cimom *MessageQueueService::_meta_dispatcher = 0;
AtomicInt MessageQueueService::_service_count = 0;
AtomicInt MessageQueueService::_xid(1);
Mutex MessageQueueService::_meta_dispatcher_mutex;

// ThreadPool tuning values: near-immediate thread creation, destroy
// idle threads after 300s, deadlock detection disabled (0,0).
static struct timeval create_time = {0, 1};
static struct timeval destroy_time = {300, 0};
static struct timeval deadlock_time = {0, 0};

// Worker pool shared by every service for dispatching incoming ops.
ThreadPool *MessageQueueService::_thread_pool = 0;

// All live services; walked by polling_routine to find pending work.
DQueue<MessageQueueService> MessageQueueService::_polling_list(true);

// The single thread running polling_routine.
Thread* MessageQueueService::_polling_thread = 0;
|
56 mday 1.61
|
// Accessor for the ThreadPool shared by all MessageQueueService
// instances (0 until the first service is constructed).
ThreadPool *MessageQueueService::get_thread_pool(void)
{
   return _thread_pool;
}
61
62 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::kill_idle_threads(void *parm)
|
63 mday 1.53 {
|
64 mday 1.69
65 static struct timeval now, last = {0,0};
|
66 mday 1.53 gettimeofday(&now, NULL);
|
67 mday 1.56 int dead_threads = 0;
|
68 mday 1.53
|
69 mday 1.70 if( now.tv_sec - last.tv_sec > 120 )
|
70 mday 1.53 {
71 gettimeofday(&last, NULL);
|
72 mday 1.56 try
73 {
|
74 mday 1.69 dead_threads = MessageQueueService::_thread_pool->kill_dead_threads();
|
75 mday 1.56 }
|
76 mday 1.69 catch(...)
|
77 mday 1.56 {
78
79 }
|
80 mday 1.53 }
|
81 mday 1.69 return (PEGASUS_THREAD_RETURN)dead_threads;
|
82 mday 1.53 }
83
|
84 mday 1.68
|
85 mday 1.76 void MessageQueueService::force_shutdown(Boolean destroy_flag)
|
86 mday 1.68 {
|
87 mday 1.79 return;
88
|
89 kumpf 1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
90 humberto 1.71 //l10n
91 MessageLoaderParms parms("Common.MessageQueueService.FORCING_SHUTDOWN",
|
92 humberto 1.74 "Forcing shutdown of CIMOM Message Router");
|
93 humberto 1.71 PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
|
94 kumpf 1.75 #endif
|
95 mday 1.76
|
96 kumpf 1.75
|
97 mday 1.68 MessageQueueService *svc;
|
98 konrad.r 1.73 int counter = 0;
|
99 mday 1.68 _polling_list.lock();
100 svc = _polling_list.next(0);
|
101 humberto 1.71
|
102 mday 1.68 while(svc != 0)
103 {
|
104 kumpf 1.75 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
105 humberto 1.71 //l10n - reuse same MessageLoaderParms to avoid multiple creates
106 parms.msg_id = "Common.MessageQueueService.STOPPING_SERVICE";
107 parms.default_msg = "Stopping $0";
108 parms.arg0 = svc->getQueueName();
109 PEGASUS_STD(cout) << MessageLoader::getMessage(parms) << PEGASUS_STD(endl);
|
110 kumpf 1.75 #endif
|
111 humberto 1.71
|
112 mday 1.68 _polling_sem.signal();
113 svc->_shutdown_incoming_queue();
|
114 konrad.r 1.73 counter++;
|
115 mday 1.68 _polling_sem.signal();
116 svc = _polling_list.next(svc);
117 }
118 _polling_list.unlock();
|
119 konrad.r 1.73
120 _polling_sem.signal();
121
|
122 mday 1.76 MessageQueueService::_stop_polling = 1;
123
124 if(destroy_flag == true)
125 {
126
127 svc = _polling_list.remove_last();
128 while(svc)
129 {
130 delete svc;
131 svc = _polling_list.remove_last();
132 }
133
|
134 konrad.r 1.73 }
|
135 mday 1.68 }
136
137
|
138 mday 1.53 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
139 {
140 Thread *myself = reinterpret_cast<Thread *>(parm);
141 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
142 while ( _stop_polling.value() == 0 )
143 {
|
144 mday 1.69 _polling_sem.wait();
|
145 mday 1.61 if(_stop_polling.value() != 0 )
146 {
|
147 kumpf 1.62 break;
|
148 mday 1.61 }
149
|
150 mday 1.53 list->lock();
151 MessageQueueService *service = list->next(0);
152 while(service != NULL)
153 {
154 if(service->_incoming.count() > 0 )
155 {
|
156 mday 1.55 _thread_pool->allocate_and_awaken(service, _req_proc);
|
157 mday 1.53 }
158 service = list->next(service);
159 }
160 list->unlock();
|
161 mday 1.69 if(_check_idle_flag.value() != 0 )
162 {
163 _check_idle_flag = 0;
|
164 kumpf 1.84
165 // If there are insufficent resources to run
166 // kill_idle_threads, then just return.
167 _thread_pool->allocate_and_awaken(service, kill_idle_threads);
|
168 mday 1.69 }
|
169 mday 1.53 }
170 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
171 return(0);
172 }
173
174
// Wakes polling_routine whenever a service enqueues work or shuts down.
Semaphore MessageQueueService::_polling_sem(0);
// Set non-zero to make polling_routine exit on its next wakeup.
AtomicInt MessageQueueService::_stop_polling(0);
// Set non-zero to request an idle-thread sweep on the next polling pass.
AtomicInt MessageQueueService::_check_idle_flag(0);
|
178 mday 1.53
|
179 mday 1.15
|
180 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
181 Uint32 queueID,
182 Uint32 capabilities,
183 Uint32 mask)
|
184 mday 1.15 : Base(name, true, queueID),
|
185 mday 1.22
|
186 mday 1.1 _mask(mask),
|
187 mday 1.4 _die(0),
|
188 mday 1.41 _incoming(true, 0),
|
189 mday 1.39 _callback(true),
|
190 mday 1.7 _incoming_queue_shutdown(0),
|
191 mday 1.39 _callback_ready(0),
192 _req_thread(_req_proc, this, false),
193 _callback_thread(_callback_proc, this, false)
|
194 mday 1.53
|
195 mday 1.1 {
|
196 mday 1.60
|
197 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
198
|
199 mday 1.1 _default_op_timeout.tv_sec = 30;
200 _default_op_timeout.tv_usec = 100;
|
201 mday 1.15
|
202 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
|
203 mday 1.15
204 if( _meta_dispatcher == 0 )
205 {
206 PEGASUS_ASSERT( _service_count.value() == 0 );
207 _meta_dispatcher = new cimom();
208 if (_meta_dispatcher == NULL )
209 {
|
210 a.arora 1.87 throw NullPointer();
|
211 mday 1.15 }
|
212 mday 1.58 _thread_pool = new ThreadPool(0, "MessageQueueService", 0, 0,
|
213 mday 1.55 create_time, destroy_time, deadlock_time);
|
214 mday 1.61
|
215 kumpf 1.62 _polling_thread = new Thread(polling_routine,
216 reinterpret_cast<void *>(&_polling_list),
217 false);
|
218 kumpf 1.83 while (!_polling_thread->run())
219 {
220 pegasus_yield();
221 }
|
222 mday 1.15 }
223 _service_count++;
224
225 if( false == register_service(name, _capabilities, _mask) )
226 {
|
227 humberto 1.71 //l10n
228 //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
229 MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
|
230 humberto 1.74 "MessageQueueService Base Unable to register with Meta Dispatcher");
|
231 humberto 1.71
232 throw BindFailedException(parms);
|
233 mday 1.15 }
234
|
235 mday 1.53 _polling_list.insert_last(this);
236
|
237 a.arora 1.87 // _meta_dispatcher_mutex.unlock(); //Bug#1090
|
238 mday 1.39 // _callback_thread.run();
239
|
240 mday 1.53 // _req_thread.run();
|
241 mday 1.1 }
242
|
243 mday 1.4
|
244 mday 1.1 MessageQueueService::~MessageQueueService(void)
245 {
246 _die = 1;
|
247 mday 1.76
248 if (_incoming_queue_shutdown.value() == 0 )
249 {
250 _shutdown_incoming_queue();
251 }
|
252 mday 1.39 _callback_ready.signal();
253
|
254 mday 1.15 {
|
255 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
256 _service_count--;
257 if (_service_count.value() == 0 )
258 {
|
259 mday 1.76
|
260 mday 1.53 _stop_polling++;
261 _polling_sem.signal();
|
262 kumpf 1.62 _polling_thread->join();
263 delete _polling_thread;
|
264 kumpf 1.65 _polling_thread = 0;
|
265 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
266 delete _meta_dispatcher;
|
267 kumpf 1.45 _meta_dispatcher = 0;
|
268 mday 1.76
|
269 kumpf 1.65 delete _thread_pool;
270 _thread_pool = 0;
|
271 a.arora 1.87 }
272 } // mutex unlocks here
|
273 mday 1.53 _polling_list.remove(this);
|
274 konrad.r 1.72 // Clean up in case there are extra stuff on the queue.
275 while (_incoming.count())
276 {
277 delete _incoming.remove_first();
278 }
|
279 mday 1.53 }
|
280 mday 1.1
|
281 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
282 {
283
|
284 mday 1.76
|
285 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
286 return ;
|
287 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
288 0,
289 _queueId,
290 _queueId,
291 true,
292 AsyncIoctl::IO_CLOSE,
293 0,
294 0);
|
295 mday 1.9
|
296 mday 1.8 msg->op = get_op();
|
297 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
298 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
299 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
300 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
301 mday 1.41
|
302 mday 1.32 msg->op->_op_dest = this;
|
303 mday 1.8 msg->op->_request.insert_first(msg);
304
305 _incoming.insert_last_wait(msg->op);
|
306 mday 1.32
|
307 mday 1.7 }
308
|
309 mday 1.22
310
|
// Enqueue a message on this queue; simply delegates to the base
// MessageQueue implementation, with method tracing.
void MessageQueueService::enqueue(Message *msg)
{
   PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

   Base::enqueue(msg);

   PEG_METHOD_EXIT();
}
319
320
|
321 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
322 {
323 Thread *myself = reinterpret_cast<Thread *>(parm);
324 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
325 AsyncOpNode *operation = 0;
326
327 while ( service->_die.value() == 0 )
328 {
329 service->_callback_ready.wait();
330
331 service->_callback.lock();
332 operation = service->_callback.next(0);
333 while( operation != NULL)
334 {
335 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
336 {
337 operation = service->_callback.remove_no_lock(operation);
338 PEGASUS_ASSERT(operation != NULL);
339 operation->_thread_ptr = myself;
340 operation->_service_ptr = service;
341 service->_handle_async_callback(operation);
342 mday 1.39 break;
343 }
344 operation = service->_callback.next(operation);
345 }
346 service->_callback.unlock();
347 }
348 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
349 return(0);
350 }
351
|
352 mday 1.22
|
353 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
354 mday 1.1 {
|
355 mday 1.53 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
356 mday 1.5 // pull messages off the incoming queue and dispatch them. then
357 // check pending messages that are non-blocking
|
358 mday 1.7 AsyncOpNode *operation = 0;
359
|
360 mday 1.56 if ( service->_die.value() == 0 )
361 {
|
362 mday 1.41 try
363 {
|
364 mday 1.53 operation = service->_incoming.remove_first();
|
365 mday 1.41 }
366 catch(ListClosed & )
367 {
|
368 mday 1.56 operation = 0;
369
370 return(0);
|
371 mday 1.41 }
372 if( operation )
373 {
374 operation->_service_ptr = service;
375 service->_handle_incoming_operation(operation);
376 }
|
377 mday 1.56 }
|
378 mday 1.44
|
379 mday 1.5 return(0);
|
380 mday 1.1 }
381
|
// Number of op nodes currently parked on the callback list awaiting
// completion.
Uint32 MessageQueueService::get_pending_callback_count(void)
{
   return _callback.count();
}
386
387
388
|
// Completion callback used by SendWait: wakes the thread blocked on
// the op node's client semaphore.  'q' and 'parm' are unused.
void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
                                             MessageQueue *q,
                                             void *parm)
{
   op->_client_sem.signal();
}
395
|
396 mday 1.30
397 // callback function is responsible for cleaning up all resources
398 // including op, op->_callback_node, and op->_callback_ptr
|
399 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
400 {
|
401 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
402 {
403
404 Message *msg = op->get_request();
405 if( msg && ( msg->getMask() & message_mask::ha_async))
406 {
407 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
408 {
409 AsyncLegacyOperationStart *wrapper =
410 static_cast<AsyncLegacyOperationStart *>(msg);
411 msg = wrapper->get_action();
412 delete wrapper;
413 }
414 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
415 {
416 AsyncModuleOperationStart *wrapper =
417 static_cast<AsyncModuleOperationStart *>(msg);
418 msg = wrapper->get_action();
419 delete wrapper;
420 }
421 else if (msg->getType() == async_messages::ASYNC_OP_START)
422 mday 1.39 {
423 AsyncOperationStart *wrapper =
424 static_cast<AsyncOperationStart *>(msg);
425 msg = wrapper->get_action();
426 delete wrapper;
427 }
428 delete msg;
429 }
430
431 msg = op->get_response();
432 if( msg && ( msg->getMask() & message_mask::ha_async))
433 {
434 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
435 {
436 AsyncLegacyOperationResult *wrapper =
437 static_cast<AsyncLegacyOperationResult *>(msg);
438 msg = wrapper->get_result();
439 delete wrapper;
440 }
441 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
442 {
443 mday 1.39 AsyncModuleOperationResult *wrapper =
444 static_cast<AsyncModuleOperationResult *>(msg);
445 msg = wrapper->get_result();
446 delete wrapper;
447 }
448 }
449 void (*callback)(Message *, void *, void *) = op->__async_callback;
450 void *handle = op->_callback_handle;
451 void *parm = op->_callback_parameter;
452 op->release();
453 return_op(op);
454 callback(msg, handle, parm);
455 }
456 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
457 {
458 // note that _callback_node may be different from op
459 // op->_callback_response_q is a "this" pointer we can use for
460 // static callback methods
461 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
462 }
|
463 mday 1.22 }
464
|
465 mday 1.1
|
466 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
467 // Thread *thread,
468 // MessageQueue *queue)
|
469 mday 1.1 {
|
470 mday 1.5 if ( operation != 0 )
471 {
|
472 mday 1.29
|
473 mday 1.22 // ATTN: optimization
474 // << Tue Feb 19 14:10:38 2002 mdd >>
|
475 mday 1.6 operation->lock();
|
476 mday 1.29
|
477 mday 1.6 Message *rq = operation->_request.next(0);
|
478 mday 1.29
|
479 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
480 // move this to the bottom of the loop when the majority of
481 // messages become async messages.
482
|
483 mday 1.18 // divert legacy messages to handleEnqueue
|
484 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
485 mday 1.18 {
486 rq = operation->_request.remove_first() ;
487 operation->unlock();
488 // delete the op node
|
489 mday 1.39 operation->release();
490 return_op( operation);
|
491 mday 1.24
|
492 mday 1.26 handleEnqueue(rq);
|
493 mday 1.18 return;
494 }
495
|
496 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
497 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
498 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
499 {
|
500 mday 1.49
|
501 mday 1.29 operation->unlock();
502 _handle_async_callback(operation);
503 }
504 else
505 {
506 PEGASUS_ASSERT(rq != 0 );
507 operation->unlock();
508 _handle_async_request(static_cast<AsyncRequest *>(rq));
509 }
|
510 mday 1.5 }
511 return;
|
512 mday 1.1 }
513
514 void MessageQueueService::_handle_async_request(AsyncRequest *req)
515 {
|
516 mday 1.4 if ( req != 0 )
517 {
518 req->op->processing();
|
519 mday 1.1
|
520 mday 1.4 Uint32 type = req->getType();
521 if( type == async_messages::HEARTBEAT )
522 handle_heartbeat_request(req);
523 else if (type == async_messages::IOCTL)
524 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
525 else if (type == async_messages::CIMSERVICE_START)
526 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
527 else if (type == async_messages::CIMSERVICE_STOP)
528 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
529 else if (type == async_messages::CIMSERVICE_PAUSE)
530 handle_CimServicePause(static_cast<CimServicePause *>(req));
531 else if (type == async_messages::CIMSERVICE_RESUME)
532 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
533 else if ( type == async_messages::ASYNC_OP_START)
534 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
535 else
536 {
537 // we don't handle this request message
538 _make_response(req, async_results::CIM_NAK );
539 }
|
540 mday 1.1 }
541 }
542
|
543 mday 1.17
544 Boolean MessageQueueService::_enqueueResponse(
545 Message* request,
546 Message* response)
|
547 mday 1.18
|
548 mday 1.17 {
|
549 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
550 "MessageQueueService::_enqueueResponse");
|
551 mday 1.25
552 if( request->getMask() & message_mask::ha_async)
553 {
554 if (response->getMask() & message_mask::ha_async )
555 {
556 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
557 static_cast<AsyncReply *>(response),
558 ASYNC_OPSTATE_COMPLETE, 0 );
|
559 kumpf 1.37 PEG_METHOD_EXIT();
|
560 mday 1.25 return true;
561 }
562 }
563
|
564 mday 1.17 if(request->_async != 0 )
565 {
566 Uint32 mask = request->_async->getMask();
|
567 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
568
569 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
570 AsyncOpNode *op = async->op;
571 request->_async = 0;
|
572 mday 1.63 // the legacy request is going to be deleted by its handler
|
573 mday 1.27 // remove it from the op node
|
574 mday 1.63
575 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
576 mday 1.18
|
577 mday 1.63
|
578 mday 1.18 AsyncLegacyOperationResult *async_result =
579 new AsyncLegacyOperationResult(
580 async->getKey(),
581 async->getRouting(),
582 op,
583 response);
584 _completeAsyncResponse(async,
585 async_result,
586 ASYNC_OPSTATE_COMPLETE,
587 0);
|
588 kumpf 1.37 PEG_METHOD_EXIT();
|
589 mday 1.18 return true;
|
590 mday 1.17 }
|
591 mday 1.18
592 // ensure that the destination queue is in response->dest
|
593 kumpf 1.37 PEG_METHOD_EXIT();
|
594 mday 1.24 return SendForget(response);
|
595 mday 1.18
|
596 mday 1.17 }
597
|
// Build and route a generic response for 'req' carrying result 'code';
// delegates to the cimom base implementation.
void MessageQueueService::_make_response(Message *req, Uint32 code)
{
   cimom::_make_response(req, code);
}
602
603
|
// Complete the async operation attached to 'request' with 'reply',
// applying the given op-node state/flag bits; delegates to cimom.
void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
                                                 AsyncReply *reply,
                                                 Uint32 state,
                                                 Uint32 flag)
{
   PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
                    "MessageQueueService::_completeAsyncResponse");

   cimom::_completeAsyncResponse(request, reply, state, flag);

   PEG_METHOD_EXIT();
}
616
617
|
// Mark the op node complete with the given state/flag/result code;
// delegates to the cimom base implementation.
void MessageQueueService::_complete_op_node(AsyncOpNode *op,
                                            Uint32 state,
                                            Uint32 flag,
                                            Uint32 code)
{
   cimom::_complete_op_node(op, state, flag, code);
}
625
|
626 mday 1.5
627 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
628 {
|
629 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
630 return false;
631
|
632 mday 1.20 // ATTN optimization remove the message checking altogether in the base
633 // << Mon Feb 18 14:02:20 2002 mdd >>
|
634 mday 1.6 op->lock();
635 Message *rq = op->_request.next(0);
|
636 mday 1.20 Message *rp = op->_response.next(0);
|
637 mday 1.6 op->unlock();
638
|
639 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
640 _die.value() == 0 )
|
641 mday 1.5 {
|
642 mday 1.6 _incoming.insert_last_wait(op);
|
643 mday 1.53 _polling_sem.signal();
|
644 mday 1.5 return true;
645 }
646 return false;
647 }
648
// Hook for subclasses to veto incoming messages.  The base accepts
// everything unless the incoming queue is shutting down.
Boolean MessageQueueService::messageOK(const Message *msg)
{
   if (_incoming_queue_shutdown.value() > 0)
      return false;
   return true;
}
655
|
656 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
657 {
658 // default action is to echo a heartbeat response
659
660 AsyncReply *reply =
661 new AsyncReply(async_messages::HEARTBEAT,
662 req->getKey(),
663 req->getRouting(),
664 0,
665 req->op,
666 async_results::OK,
667 req->resp,
668 false);
|
669 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
670 mday 1.1 }
671
672
// Heartbeat replies require no action; intentionally a no-op.
void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
{
   ;
}
677
678 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
679 {
|
680 mday 1.8
681 switch( req->ctl )
682 {
683 case AsyncIoctl::IO_CLOSE:
684 {
|
685 mday 1.76
|
686 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
687 kumpf 1.81
688 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
689 mday 1.79 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
|
690 kumpf 1.81 #endif
|
691 mday 1.8
|
692 mday 1.76 // respond to this message. this is fire and forget, so we don't need to delete anything.
693 // this takes care of two problems that were being found
694 // << Thu Oct 9 10:52:48 2003 mdd >>
695 _make_response(req, async_results::OK);
|
696 mday 1.8 // ensure we do not accept any further messages
697
698 // ensure we don't recurse on IO_CLOSE
699 if( _incoming_queue_shutdown.value() > 0 )
700 break;
701
702 // set the closing flag
703 service->_incoming_queue_shutdown = 1;
704 // empty out the queue
705 while( 1 )
706 {
707 AsyncOpNode *operation;
708 try
709 {
710 operation = service->_incoming.remove_first();
711 }
712 catch(IPCException & )
713 {
714 break;
715 }
716 if( operation )
717 mday 1.8 {
|
718 mday 1.34 operation->_service_ptr = service;
719 service->_handle_incoming_operation(operation);
|
720 mday 1.8 }
721 else
722 break;
723 } // message processing loop
|
724 mday 1.76
|
725 mday 1.8 // shutdown the AsyncDQueue
726 service->_incoming.shutdown_queue();
727 return;
728 }
729
730 default:
731 _make_response(req, async_results::CIM_NAK);
732 }
|
733 mday 1.1 }
|
734 mday 1.8
|
735 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
736 {
|
737 mday 1.79
|
738 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
739 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
740 kumpf 1.81 #endif
|
741 mday 1.79
|
742 mday 1.10 // clear the stoped bit and update
|
743 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
744 mday 1.10 _make_response(req, async_results::OK);
745 // now tell the meta dispatcher we are stopped
746 update_service(_capabilities, _mask);
747
|
748 mday 1.1 }
749 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
750 {
|
751 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
752 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
753 kumpf 1.81 #endif
|
754 mday 1.10 // set the stopeed bit and update
755 _capabilities |= module_capabilities::stopped;
756 _make_response(req, async_results::CIM_STOPPED);
757 // now tell the meta dispatcher we are stopped
758 update_service(_capabilities, _mask);
759
|
760 mday 1.1 }
761 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
762 {
|
763 mday 1.10 // set the paused bit and update
|
764 mday 1.13 _capabilities |= module_capabilities::paused;
|
765 mday 1.11 update_service(_capabilities, _mask);
|
766 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
767 // now tell the meta dispatcher we are stopped
|
768 mday 1.1 }
769 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
770 {
|
771 mday 1.10 // clear the paused bit and update
|
772 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
773 mday 1.11 update_service(_capabilities, _mask);
|
774 mday 1.10 _make_response(req, async_results::OK);
775 // now tell the meta dispatcher we are stopped
|
776 mday 1.1 }
777
// The base service does not accept generic ASYNC_OP_START requests;
// reply with a NAK.  Presumably overridden by services that do --
// verify against subclasses.
void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
{
   _make_response(req, async_results::CIM_NAK);
}
782
// Async operation results require no action in the base service;
// intentionally a no-op.
void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
{
   ;
}
787
|
788 mday 1.10
|
789 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
790 {
791 // remove the legacy message from the request and enqueue it to its destination
792 Uint32 result = async_results::CIM_NAK;
793
|
794 mday 1.25 Message *legacy = req->_act;
|
795 mday 1.14 if ( legacy != 0 )
796 {
|
797 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
798 mday 1.14 if( queue != 0 )
799 {
|
800 mday 1.25 if(queue->isAsync() == true )
801 {
802 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
803 }
804 else
805 {
806 // Enqueue the response:
807 queue->enqueue(req->get_action());
808 }
809
|
810 mday 1.14 result = async_results::OK;
811 }
812 }
813 _make_response(req, result);
814 }
815
// Legacy operation results require no action in the base service;
// intentionally a no-op.
void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
{
   ;
}
820
821 AsyncOpNode *MessageQueueService::get_op(void)
822 {
|
823 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
824 mday 1.1
|
825 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
826 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
827 mday 1.4
|
828 mday 1.1 return op;
829 }
830
// Destroy an op node previously obtained from get_op().  The node
// must already carry the RELEASED state bit.
void MessageQueueService::return_op(AsyncOpNode *op)
{
   PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED);
   delete op;
}
836
|
837 mday 1.18
|
838 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
839 Uint32 destination)
840 {
841 PEGASUS_ASSERT(op != 0 );
|
842 mday 1.30 op->lock();
843 op->_op_dest = MessageQueue::lookup(destination);
|
844 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
845 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
846 mday 1.30 op->unlock();
847 if ( op->_op_dest == 0 )
848 return false;
849
|
850 mday 1.29 return _meta_dispatcher->route_async(op);
851 }
852
|
853 mday 1.39
|
854 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
855 Uint32 destination,
|
856 mday 1.18 void (*callback)(AsyncOpNode *,
|
857 mday 1.32 MessageQueue *,
|
858 mday 1.30 void *),
859 MessageQueue *callback_response_q,
860 void *callback_ptr)
|
861 mday 1.20 {
|
862 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
863 mday 1.18
|
864 mday 1.21 // get the queue handle for the destination
865
|
866 mday 1.30 op->lock();
|
867 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
868 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
869 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
|
870 mday 1.30 // initialize the callback data
|
871 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
872 op->_callback_node = op; // the op node
873 op->_callback_response_q = callback_response_q; // the queue that will receive the response
874 op->_callback_ptr = callback_ptr; // user data for callback
875 op->_callback_request_q = this; // I am the originator of this request
|
876 mday 1.30
877 op->unlock();
878 if(op->_op_dest == 0)
879 return false;
|
880 mday 1.21
881 return _meta_dispatcher->route_async(op);
|
882 mday 1.18 }
883
884
|
885 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
886 Uint32 destination,
887 void (*callback)(Message *response,
888 void *handle,
889 void *parameter),
890 void *handle,
891 void *parameter)
892 {
893 if(msg == NULL)
894 return false;
895 if(callback == NULL)
896 return SendForget(msg);
897 AsyncOpNode *op = get_op();
|
898 mday 1.43 msg->dest = destination;
|
899 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
900 {
901 op->release();
902 return_op(op);
903 return false;
904 }
905 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
906 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
907 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
908 op->__async_callback = callback;
909 op->_callback_node = op;
910 op->_callback_handle = handle;
911 op->_callback_parameter = parameter;
912 op->_callback_response_q = this;
913
914
915 if( ! (msg->getMask() & message_mask::ha_async) )
916 {
917 AsyncLegacyOperationStart *wrapper =
918 new AsyncLegacyOperationStart(get_next_xid(),
919 op,
920 mday 1.39 destination,
921 msg,
922 destination);
923 }
924 else
925 {
926 op->_request.insert_first(msg);
927 (static_cast<AsyncMessage *>(msg))->op = op;
928 }
929
930 _callback.insert_last(op);
931 return _meta_dispatcher->route_async(op);
932 }
933
934
|
935 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
936 {
937
|
938 mday 1.24
|
939 mday 1.18 AsyncOpNode *op = 0;
|
940 mday 1.22 Uint32 mask = msg->getMask();
941
942 if (mask & message_mask::ha_async)
|
943 mday 1.18 {
944 op = (static_cast<AsyncMessage *>(msg))->op ;
945 }
|
946 mday 1.22
|
947 mday 1.18 if( op == 0 )
|
948 mday 1.20 {
|
949 mday 1.18 op = get_op();
|
950 mday 1.20 op->_request.insert_first(msg);
|
951 mday 1.22 if (mask & message_mask::ha_async)
952 (static_cast<AsyncMessage *>(msg))->op = op;
|
953 mday 1.20 }
|
954 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
955 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
956 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
957 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
958 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
959 mday 1.30 if ( op->_op_dest == 0 )
|
960 mday 1.39 {
961 op->release();
962 return_op(op);
|
963 mday 1.21 return false;
|
964 mday 1.39 }
|
965 mday 1.24
|
966 mday 1.18 // now see if the meta dispatcher will take it
|
967 mday 1.30 return _meta_dispatcher->route_async(op);
|
968 mday 1.18 }
|
969 mday 1.2
|
970 mday 1.1
|
// Send an async request and block the calling thread until the reply
// arrives.  Returns the detached reply (caller takes ownership and must
// delete it), or 0 when request is null.
//
// If the request does not already carry an AsyncOpNode, one is allocated
// here and recycled again after the reply has been removed (destroy_op).
// The wait is implemented by SendAsync with _sendwait_callback, which
// signals the op's client semaphore once the response is queued.
AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
{
   if ( request == 0 )
      return 0 ;

   Boolean destroy_op = false;

   if (request->op == 0)
   {
      request->op = get_op();
      request->op->_request.insert_first(request);
      destroy_op = true;
   }

   request->block = false;
   // pseudo-callback: completion is reported through the semaphore below,
   // not through a real caller-supplied callback
   request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
   SendAsync(request->op,
             request->dest,
             _sendwait_callback,
             this,
             (void *)0);

   // blocks until _sendwait_callback signals that the reply is queued
   request->op->_client_sem.wait();
   request->op->lock();
   AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
   rpl->op = 0;
   request->op->unlock();

   if( destroy_op == true)
   {
      // we created the op node above; detach the request, mark the node
      // released, and hand it back to the pool
      request->op->lock();
      request->op->_request.remove(request);
      request->op->_state |= ASYNC_OPSTATE_RELEASED;
      request->op->unlock();
      return_op(request->op);
      request->op = 0;
   }
   return rpl;
}
1010
1011
1012 Boolean MessageQueueService::register_service(String name,
1013 Uint32 capabilities,
1014 Uint32 mask)
1015
1016 {
1017 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
1018 mday 1.5 0,
|
1019 mday 1.1 true,
1020 name,
1021 capabilities,
1022 mask,
1023 _queueId);
|
1024 mday 1.44 msg->dest = CIMOM_Q_ID;
1025
|
1026 mday 1.1 Boolean registered = false;
|
1027 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
1028 mday 1.1
|
1029 mday 1.2 if ( reply != 0 )
|
1030 mday 1.1 {
1031 if(reply->getMask() & message_mask:: ha_async)
1032 {
1033 if(reply->getMask() & message_mask::ha_reply)
1034 {
|
1035 mday 1.15 if(reply->result == async_results::OK ||
1036 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
1037 mday 1.1 registered = true;
1038 }
1039 }
1040
|
1041 mday 1.7 delete reply;
|
1042 mday 1.1 }
|
1043 mday 1.5 delete msg;
|
1044 mday 1.1 return registered;
1045 }
1046
1047 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1048 {
1049
1050
1051 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
1052 mday 1.5 0,
|
1053 mday 1.1 true,
1054 _queueId,
1055 _capabilities,
1056 _mask);
1057 Boolean registered = false;
|
1058 mday 1.2
1059 AsyncMessage *reply = SendWait(msg);
1060 if (reply)
|
1061 mday 1.1 {
1062 if(reply->getMask() & message_mask:: ha_async)
1063 {
1064 if(reply->getMask() & message_mask::ha_reply)
1065 {
|
1066 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
1067 mday 1.1 registered = true;
1068 }
1069 }
1070 delete reply;
1071 }
|
1072 mday 1.5 delete msg;
|
1073 mday 1.1 return registered;
1074 }
1075
1076
// Deregister this service from the meta dispatcher.
//
// Drops the module registered under our queue id.  Always reports success;
// deregister_module itself returns no failure indication here.
Boolean MessageQueueService::deregister_service(void)
{

   _meta_dispatcher->deregister_module(_queueId);
   return true;
}
1083
1084
1085 void MessageQueueService::find_services(String name,
1086 Uint32 capabilities,
1087 Uint32 mask,
1088 Array<Uint32> *results)
1089 {
1090
1091 if( results == 0 )
1092 throw NullPointer();
|
1093 mday 1.5
|
1094 mday 1.1 results->clear();
1095
1096 FindServiceQueue *req =
1097 new FindServiceQueue(get_next_xid(),
|
1098 mday 1.5 0,
|
1099 mday 1.1 _queueId,
1100 true,
1101 name,
1102 capabilities,
1103 mask);
|
1104 mday 1.44
1105 req->dest = CIMOM_Q_ID;
|
1106 mday 1.1
|
1107 mday 1.2 AsyncMessage *reply = SendWait(req);
1108 if(reply)
|
1109 mday 1.1 {
1110 if( reply->getMask() & message_mask::ha_async)
1111 {
1112 if(reply->getMask() & message_mask::ha_reply)
1113 {
1114 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1115 {
1116 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1117 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1118 }
1119 }
1120 }
1121 delete reply;
1122 }
|
1123 mday 1.5 delete req;
|
1124 mday 1.1 return ;
1125 }
1126
1127 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1128 {
1129 if(result == 0)
1130 throw NullPointer();
1131
1132 EnumerateService *req
1133 = new EnumerateService(get_next_xid(),
|
1134 mday 1.5 0,
|
1135 mday 1.1 _queueId,
1136 true,
1137 queue);
1138
|
1139 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1140 mday 1.1
|
1141 mday 1.2 if (reply)
|
1142 mday 1.1 {
1143 Boolean found = false;
1144
1145 if( reply->getMask() & message_mask::ha_async)
1146 {
1147 if(reply->getMask() & message_mask::ha_reply)
1148 {
1149 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1150 {
1151 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1152 {
1153 if( found == false)
1154 {
1155 found = true;
1156
1157 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1158 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1159 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1160 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1161 }
1162 }
1163 mday 1.1 }
1164 }
1165 }
1166 delete reply;
1167 }
|
1168 mday 1.5 delete req;
1169
|
1170 mday 1.1 return;
1171 }
1172
1173 Uint32 MessageQueueService::get_next_xid(void)
1174 {
|
1175 mday 1.78 static Mutex _monitor;
1176 Uint32 value;
|
1177 a.arora 1.87 AutoMutex autoMut(_monitor);
|
1178 mday 1.1 _xid++;
|
1179 mday 1.78 value = _xid.value();
1180 return value;
1181
|
1182 mday 1.1 }
1183
1184 PEGASUS_NAMESPACE_END
|