1 karl 1.91 //%2005////////////////////////////////////////////////////////////////////////
|
2 mday 1.1 //
|
3 karl 1.88 // Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
4 // Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
5 // Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
|
6 karl 1.82 // IBM Corp.; EMC Corporation, The Open Group.
|
7 karl 1.88 // Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
8 // IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
|
9 karl 1.91 // Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
10 // EMC Corporation; VERITAS Software Corporation; The Open Group.
|
11 mday 1.1 //
12 // Permission is hereby granted, free of charge, to any person obtaining a copy
13 // of this software and associated documentation files (the "Software"), to
14 // deal in the Software without restriction, including without limitation the
15 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
16 // sell copies of the Software, and to permit persons to whom the Software is
17 // furnished to do so, subject to the following conditions:
|
18 karl 1.82 //
|
19 mday 1.1 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
20 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
21 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
22 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
23 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
24 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
25 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
26 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
27 //
28 //==============================================================================
29 //
30 // Author: Mike Day (mdday@us.ibm.com)
31 //
32 // Modified By:
|
33 a.arora 1.93 // Amit K Arora, IBM (amita@in.ibm.com) for Bug#1090,#2657
|
34 joyce.j 1.101 // Josephine Eskaline Joyce, IBM (jojustin@in.ibm.com) for Bug#3259
|
35 mday 1.1 //
36 //%/////////////////////////////////////////////////////////////////////////////
37
38 #include "MessageQueueService.h"
|
39 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
40 humberto 1.71 #include <Pegasus/Common/MessageLoader.h> //l10n
|
41 mday 1.1
42 PEGASUS_NAMESPACE_BEGIN
43
|
44 mday 1.15
45 cimom *MessageQueueService::_meta_dispatcher = 0;
46 AtomicInt MessageQueueService::_service_count = 0;
47 AtomicInt MessageQueueService::_xid(1);
|
48 mday 1.38 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
49 mday 1.15
|
50 kumpf 1.99 static struct timeval deallocateWait = {300, 0};
|
51 mday 1.53
|
52 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
53 mday 1.53
54 DQueue<MessageQueueService> MessageQueueService::_polling_list(true);
55
|
56 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
57 mday 1.61
|
58 mday 1.69 ThreadPool *MessageQueueService::get_thread_pool(void)
59 {
60 return _thread_pool;
61 }
62
63 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::kill_idle_threads(void *parm)
|
64 mday 1.53 {
|
65 mday 1.69
66 static struct timeval now, last = {0,0};
|
67 mday 1.53 gettimeofday(&now, NULL);
|
68 mday 1.56 int dead_threads = 0;
|
69 mday 1.53
|
70 mday 1.70 if( now.tv_sec - last.tv_sec > 120 )
|
71 mday 1.53 {
72 gettimeofday(&last, NULL);
|
73 mday 1.56 try
74 {
|
75 kumpf 1.99 dead_threads = MessageQueueService::_thread_pool->cleanupIdleThreads();
|
76 mday 1.56 }
|
77 mday 1.69 catch(...)
|
78 mday 1.56 {
79
80 }
|
81 mday 1.53 }
|
82 jenny.yu 1.92
|
83 w.otsuka 1.94 #ifdef PEGASUS_POINTER_64BIT
|
84 jenny.yu 1.92 return (PEGASUS_THREAD_RETURN)(Uint64)dead_threads;
85 #elif PEGASUS_PLATFORM_AIX_RS_IBMCXX
86 return (PEGASUS_THREAD_RETURN)(unsigned long)dead_threads;
87 #else
88 return (PEGASUS_THREAD_RETURN)(Uint32)dead_threads;
89 #endif
|
90 mday 1.53 }
91
92 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(void *parm)
93 {
94 Thread *myself = reinterpret_cast<Thread *>(parm);
95 DQueue<MessageQueueService> *list = reinterpret_cast<DQueue<MessageQueueService> *>(myself->get_parm());
96 while ( _stop_polling.value() == 0 )
97 {
|
98 baggio.br 1.100 _polling_sem.wait();
99
|
100 mday 1.61 if(_stop_polling.value() != 0 )
101 {
|
102 kumpf 1.62 break;
|
103 mday 1.61 }
104
|
105 mday 1.53 list->lock();
106 MessageQueueService *service = list->next(0);
107 while(service != NULL)
108 {
109 if(service->_incoming.count() > 0 )
110 {
|
111 mday 1.55 _thread_pool->allocate_and_awaken(service, _req_proc);
|
112 mday 1.53 }
113 service = list->next(service);
114 }
115 list->unlock();
|
116 mday 1.69 if(_check_idle_flag.value() != 0 )
117 {
118 _check_idle_flag = 0;
|
119 kumpf 1.84
120 // If there are insufficent resources to run
121 // kill_idle_threads, then just return.
122 _thread_pool->allocate_and_awaken(service, kill_idle_threads);
|
123 mday 1.69 }
|
124 mday 1.53 }
125 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
126 return(0);
127 }
128
129
130 Semaphore MessageQueueService::_polling_sem(0);
131 AtomicInt MessageQueueService::_stop_polling(0);
|
132 mday 1.69 AtomicInt MessageQueueService::_check_idle_flag(0);
|
133 mday 1.53
|
134 mday 1.15
|
135 mday 1.1 MessageQueueService::MessageQueueService(const char *name,
136 Uint32 queueID,
137 Uint32 capabilities,
138 Uint32 mask)
|
139 mday 1.15 : Base(name, true, queueID),
|
140 mday 1.22
|
141 mday 1.1 _mask(mask),
|
142 mday 1.4 _die(0),
|
143 mday 1.41 _incoming(true, 0),
|
144 mday 1.39 _callback(true),
|
145 mday 1.7 _incoming_queue_shutdown(0),
|
146 mday 1.39 _callback_ready(0),
147 _req_thread(_req_proc, this, false),
148 _callback_thread(_callback_proc, this, false)
|
149 mday 1.53
|
150 mday 1.1 {
|
151 mday 1.60
|
152 mday 1.22 _capabilities = (capabilities | module_capabilities::async);
153
|
154 mday 1.1 _default_op_timeout.tv_sec = 30;
155 _default_op_timeout.tv_usec = 100;
|
156 mday 1.15
|
157 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
|
158 mday 1.15
159 if( _meta_dispatcher == 0 )
160 {
|
161 joyce.j 1.101 _stop_polling = 0;
|
162 mday 1.15 PEGASUS_ASSERT( _service_count.value() == 0 );
163 _meta_dispatcher = new cimom();
164 if (_meta_dispatcher == NULL )
165 {
|
166 a.arora 1.87 throw NullPointer();
|
167 mday 1.15 }
|
168 kumpf 1.99 _thread_pool =
169 new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
|
170 mday 1.15 }
171 _service_count++;
172
173 if( false == register_service(name, _capabilities, _mask) )
174 {
|
175 humberto 1.71 //l10n
176 //throw BindFailedException("MessageQueueService Base Unable to register with Meta Dispatcher");
177 MessageLoaderParms parms("Common.MessageQueueService.UNABLE_TO_REGISTER",
|
178 humberto 1.74 "MessageQueueService Base Unable to register with Meta Dispatcher");
|
179 humberto 1.71
180 throw BindFailedException(parms);
|
181 mday 1.15 }
182
|
183 mday 1.53 _polling_list.insert_last(this);
184
|
185 a.arora 1.87 // _meta_dispatcher_mutex.unlock(); //Bug#1090
|
186 mday 1.39 // _callback_thread.run();
187
|
188 mday 1.53 // _req_thread.run();
|
189 mday 1.1 }
190
|
191 mday 1.4
|
192 mday 1.1 MessageQueueService::~MessageQueueService(void)
193 {
194 _die = 1;
|
195 mday 1.76
196 if (_incoming_queue_shutdown.value() == 0 )
197 {
198 _shutdown_incoming_queue();
199 }
|
200 mday 1.39 _callback_ready.signal();
201
|
202 mday 1.15 {
|
203 a.arora 1.87 AutoMutex autoMut(_meta_dispatcher_mutex);
204 _service_count--;
205 if (_service_count.value() == 0 )
206 {
|
207 mday 1.76
|
208 mday 1.53 _stop_polling++;
209 _polling_sem.signal();
|
210 a.arora 1.93 if (_polling_thread) {
211 _polling_thread->join();
212 delete _polling_thread;
213 _polling_thread = 0;
214 }
|
215 mday 1.15 _meta_dispatcher->_shutdown_routed_queue();
216 delete _meta_dispatcher;
|
217 kumpf 1.45 _meta_dispatcher = 0;
|
218 mday 1.76
|
219 kumpf 1.65 delete _thread_pool;
220 _thread_pool = 0;
|
221 a.arora 1.87 }
222 } // mutex unlocks here
|
223 mday 1.53 _polling_list.remove(this);
|
224 konrad.r 1.72 // Clean up in case there are extra stuff on the queue.
225 while (_incoming.count())
226 {
227 delete _incoming.remove_first();
228 }
|
229 mday 1.53 }
|
230 mday 1.1
|
231 mday 1.7 void MessageQueueService::_shutdown_incoming_queue(void)
232 {
233
|
234 mday 1.76
|
235 mday 1.9 if (_incoming_queue_shutdown.value() > 0 )
236 return ;
|
237 mday 1.8 AsyncIoctl *msg = new AsyncIoctl(get_next_xid(),
238 0,
239 _queueId,
240 _queueId,
241 true,
242 AsyncIoctl::IO_CLOSE,
243 0,
244 0);
|
245 mday 1.9
|
246 mday 1.8 msg->op = get_op();
|
247 mday 1.41 msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
248 msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
249 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
250 mday 1.32 msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
251 mday 1.41
|
252 mday 1.32 msg->op->_op_dest = this;
|
253 mday 1.8 msg->op->_request.insert_first(msg);
254
255 _incoming.insert_last_wait(msg->op);
|
256 mday 1.32
|
257 mday 1.7 }
258
|
259 mday 1.22
260
|
261 kumpf 1.85 void MessageQueueService::enqueue(Message *msg)
|
262 mday 1.22 {
|
263 kumpf 1.28 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
264
|
265 mday 1.22 Base::enqueue(msg);
|
266 kumpf 1.28
267 PEG_METHOD_EXIT();
|
268 mday 1.22 }
269
270
|
271 mday 1.39 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_callback_proc(void *parm)
272 {
273 Thread *myself = reinterpret_cast<Thread *>(parm);
274 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(myself->get_parm());
275 AsyncOpNode *operation = 0;
276
277 while ( service->_die.value() == 0 )
278 {
|
279 baggio.br 1.100 service->_callback_ready.wait();
|
280 mday 1.39
281 service->_callback.lock();
282 operation = service->_callback.next(0);
283 while( operation != NULL)
284 {
285 if( ASYNC_OPSTATE_COMPLETE & operation->read_state())
286 {
287 operation = service->_callback.remove_no_lock(operation);
288 PEGASUS_ASSERT(operation != NULL);
289 operation->_thread_ptr = myself;
290 operation->_service_ptr = service;
291 service->_handle_async_callback(operation);
292 break;
293 }
294 operation = service->_callback.next(operation);
295 }
296 service->_callback.unlock();
297 }
298 myself->exit_self( (PEGASUS_THREAD_RETURN) 1 );
299 return(0);
300 }
301 mday 1.39
|
302 mday 1.22
|
303 mday 1.5 PEGASUS_THREAD_RETURN PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(void * parm)
|
304 mday 1.1 {
|
305 mday 1.53 MessageQueueService *service = reinterpret_cast<MessageQueueService *>(parm);
|
306 brian.campbell 1.90
307 if ( service->_die.value() != 0)
308 return (0);
309
|
310 mday 1.5 // pull messages off the incoming queue and dispatch them. then
311 // check pending messages that are non-blocking
|
312 mday 1.7 AsyncOpNode *operation = 0;
|
313 brian.campbell 1.90
314 // many operations may have been queued.
315 do
|
316 mday 1.41 {
|
317 brian.campbell 1.90 try
318 {
319 operation = service->_incoming.remove_first();
320 }
321 catch(ListClosed &)
322 {
323 break;
324 }
325
326 if (operation)
327 {
328 operation->_service_ptr = service;
329 service->_handle_incoming_operation(operation);
330 }
331 } while (operation);
|
332 mday 1.44
|
333 mday 1.5 return(0);
|
334 mday 1.1 }
335
|
336 mday 1.43 Uint32 MessageQueueService::get_pending_callback_count(void)
337 {
338 return _callback.count();
339 }
340
341
342
|
343 mday 1.33 void MessageQueueService::_sendwait_callback(AsyncOpNode *op,
344 MessageQueue *q,
345 void *parm)
346 {
347 op->_client_sem.signal();
348 }
349
|
350 mday 1.30
351 // callback function is responsible for cleaning up all resources
352 // including op, op->_callback_node, and op->_callback_ptr
|
353 mday 1.22 void MessageQueueService::_handle_async_callback(AsyncOpNode *op)
354 {
|
355 mday 1.39 if( op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK )
356 {
357
358 Message *msg = op->get_request();
359 if( msg && ( msg->getMask() & message_mask::ha_async))
360 {
361 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_START )
362 {
363 AsyncLegacyOperationStart *wrapper =
364 static_cast<AsyncLegacyOperationStart *>(msg);
365 msg = wrapper->get_action();
366 delete wrapper;
367 }
368 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
369 {
370 AsyncModuleOperationStart *wrapper =
371 static_cast<AsyncModuleOperationStart *>(msg);
372 msg = wrapper->get_action();
373 delete wrapper;
374 }
375 else if (msg->getType() == async_messages::ASYNC_OP_START)
376 mday 1.39 {
377 AsyncOperationStart *wrapper =
378 static_cast<AsyncOperationStart *>(msg);
379 msg = wrapper->get_action();
380 delete wrapper;
381 }
382 delete msg;
383 }
384
385 msg = op->get_response();
386 if( msg && ( msg->getMask() & message_mask::ha_async))
387 {
388 if(msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT )
389 {
390 AsyncLegacyOperationResult *wrapper =
391 static_cast<AsyncLegacyOperationResult *>(msg);
392 msg = wrapper->get_result();
393 delete wrapper;
394 }
395 else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
396 {
397 mday 1.39 AsyncModuleOperationResult *wrapper =
398 static_cast<AsyncModuleOperationResult *>(msg);
399 msg = wrapper->get_result();
400 delete wrapper;
401 }
402 }
403 void (*callback)(Message *, void *, void *) = op->__async_callback;
404 void *handle = op->_callback_handle;
405 void *parm = op->_callback_parameter;
406 op->release();
407 return_op(op);
408 callback(msg, handle, parm);
409 }
410 else if( op->_flags & ASYNC_OPFLAGS_CALLBACK )
411 {
412 // note that _callback_node may be different from op
413 // op->_callback_response_q is a "this" pointer we can use for
414 // static callback methods
415 op->_async_callback(op->_callback_node, op->_callback_response_q, op->_callback_ptr);
416 }
|
417 mday 1.22 }
418
|
419 mday 1.1
|
420 mday 1.34 void MessageQueueService::_handle_incoming_operation(AsyncOpNode *operation)
421 // Thread *thread,
422 // MessageQueue *queue)
|
423 mday 1.1 {
|
424 mday 1.5 if ( operation != 0 )
425 {
|
426 mday 1.29
|
427 mday 1.22 // ATTN: optimization
428 // << Tue Feb 19 14:10:38 2002 mdd >>
|
429 mday 1.6 operation->lock();
|
430 mday 1.29
|
431 mday 1.6 Message *rq = operation->_request.next(0);
|
432 mday 1.29
|
433 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
434 // move this to the bottom of the loop when the majority of
435 // messages become async messages.
436
|
437 mday 1.18 // divert legacy messages to handleEnqueue
|
438 mday 1.29 if ((rq != 0) && (!(rq->getMask() & message_mask::ha_async)))
|
439 mday 1.18 {
440 rq = operation->_request.remove_first() ;
441 operation->unlock();
442 // delete the op node
|
443 mday 1.39 operation->release();
444 return_op( operation);
|
445 mday 1.24
|
446 mday 1.26 handleEnqueue(rq);
|
447 mday 1.18 return;
448 }
449
|
450 mday 1.39 if ( (operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
451 operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
|
452 mday 1.29 (operation->_state & ASYNC_OPSTATE_COMPLETE))
453 {
|
454 mday 1.49
|
455 mday 1.29 operation->unlock();
456 _handle_async_callback(operation);
457 }
458 else
459 {
460 PEGASUS_ASSERT(rq != 0 );
461 operation->unlock();
462 _handle_async_request(static_cast<AsyncRequest *>(rq));
463 }
|
464 mday 1.5 }
465 return;
|
466 mday 1.1 }
467
468 void MessageQueueService::_handle_async_request(AsyncRequest *req)
469 {
|
470 mday 1.4 if ( req != 0 )
471 {
472 req->op->processing();
|
473 mday 1.1
|
474 mday 1.4 Uint32 type = req->getType();
475 if( type == async_messages::HEARTBEAT )
476 handle_heartbeat_request(req);
477 else if (type == async_messages::IOCTL)
478 handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
479 else if (type == async_messages::CIMSERVICE_START)
480 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
481 else if (type == async_messages::CIMSERVICE_STOP)
482 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
483 else if (type == async_messages::CIMSERVICE_PAUSE)
484 handle_CimServicePause(static_cast<CimServicePause *>(req));
485 else if (type == async_messages::CIMSERVICE_RESUME)
486 handle_CimServiceResume(static_cast<CimServiceResume *>(req));
487 else if ( type == async_messages::ASYNC_OP_START)
488 handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
489 else
490 {
491 // we don't handle this request message
492 _make_response(req, async_results::CIM_NAK );
493 }
|
494 mday 1.1 }
495 }
496
|
497 mday 1.17
498 Boolean MessageQueueService::_enqueueResponse(
499 Message* request,
500 Message* response)
|
501 mday 1.18
|
502 mday 1.17 {
|
503 w.white 1.102
504 STAT_COPYDISPATCHER
505
|
506 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
507 "MessageQueueService::_enqueueResponse");
|
508 mday 1.25
509 if( request->getMask() & message_mask::ha_async)
510 {
511 if (response->getMask() & message_mask::ha_async )
512 {
513 _completeAsyncResponse(static_cast<AsyncRequest *>(request),
514 static_cast<AsyncReply *>(response),
515 ASYNC_OPSTATE_COMPLETE, 0 );
|
516 kumpf 1.37 PEG_METHOD_EXIT();
|
517 mday 1.25 return true;
518 }
519 }
520
|
521 mday 1.17 if(request->_async != 0 )
522 {
523 Uint32 mask = request->_async->getMask();
|
524 mday 1.18 PEGASUS_ASSERT(mask & (message_mask::ha_async | message_mask::ha_request ));
525
526 AsyncRequest *async = static_cast<AsyncRequest *>(request->_async);
527 AsyncOpNode *op = async->op;
528 request->_async = 0;
|
529 mday 1.63 // the legacy request is going to be deleted by its handler
|
530 mday 1.27 // remove it from the op node
|
531 mday 1.63
532 static_cast<AsyncLegacyOperationStart *>(async)->get_action();
|
533 mday 1.18
|
534 mday 1.63
|
535 mday 1.18 AsyncLegacyOperationResult *async_result =
536 new AsyncLegacyOperationResult(
537 async->getKey(),
538 async->getRouting(),
539 op,
540 response);
541 _completeAsyncResponse(async,
542 async_result,
543 ASYNC_OPSTATE_COMPLETE,
544 0);
|
545 kumpf 1.37 PEG_METHOD_EXIT();
|
546 mday 1.18 return true;
|
547 mday 1.17 }
|
548 mday 1.18
549 // ensure that the destination queue is in response->dest
|
550 kumpf 1.37 PEG_METHOD_EXIT();
|
551 mday 1.24 return SendForget(response);
|
552 mday 1.18
|
553 mday 1.17 }
554
|
555 mday 1.18 void MessageQueueService::_make_response(Message *req, Uint32 code)
|
556 mday 1.1 {
|
557 mday 1.19 cimom::_make_response(req, code);
|
558 mday 1.1 }
559
560
|
561 mday 1.5 void MessageQueueService::_completeAsyncResponse(AsyncRequest *request,
562 AsyncReply *reply,
563 Uint32 state,
564 Uint32 flag)
565 {
|
566 kumpf 1.37 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
567 "MessageQueueService::_completeAsyncResponse");
568
|
569 mday 1.19 cimom::_completeAsyncResponse(request, reply, state, flag);
|
570 kumpf 1.37
571 PEG_METHOD_EXIT();
|
572 mday 1.5 }
573
574
|
575 mday 1.32 void MessageQueueService::_complete_op_node(AsyncOpNode *op,
576 Uint32 state,
577 Uint32 flag,
578 Uint32 code)
579 {
580 cimom::_complete_op_node(op, state, flag, code);
581 }
582
|
583 mday 1.5
584 Boolean MessageQueueService::accept_async(AsyncOpNode *op)
585 {
|
586 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
587 return false;
|
588 a.arora 1.93 if (_polling_thread == NULL) {
589 _polling_thread = new Thread(polling_routine,
590 reinterpret_cast<void *>(&_polling_list),
591 false);
592 while (!_polling_thread->run())
593 {
594 pegasus_yield();
595 }
596 }
|
597 mday 1.20 // ATTN optimization remove the message checking altogether in the base
598 // << Mon Feb 18 14:02:20 2002 mdd >>
|
599 mday 1.6 op->lock();
600 Message *rq = op->_request.next(0);
|
601 mday 1.20 Message *rp = op->_response.next(0);
|
602 mday 1.6 op->unlock();
603
|
604 mday 1.22 if( (rq != 0 && (true == messageOK(rq))) || (rp != 0 && ( true == messageOK(rp) )) &&
605 _die.value() == 0 )
|
606 mday 1.5 {
|
607 mday 1.6 _incoming.insert_last_wait(op);
|
608 mday 1.53 _polling_sem.signal();
|
609 mday 1.5 return true;
610 }
611 return false;
612 }
613
614 Boolean MessageQueueService::messageOK(const Message *msg)
615 {
|
616 mday 1.8 if (_incoming_queue_shutdown.value() > 0 )
617 return false;
|
618 mday 1.18 return true;
619 }
620
|
621 mday 1.1 void MessageQueueService::handle_heartbeat_request(AsyncRequest *req)
622 {
623 // default action is to echo a heartbeat response
624
625 AsyncReply *reply =
626 new AsyncReply(async_messages::HEARTBEAT,
627 req->getKey(),
628 req->getRouting(),
629 0,
630 req->op,
631 async_results::OK,
632 req->resp,
633 false);
|
634 mday 1.4 _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0 );
|
635 mday 1.1 }
636
637
638 void MessageQueueService::handle_heartbeat_reply(AsyncReply *rep)
639 {
640 ;
641 }
642
643 void MessageQueueService::handle_AsyncIoctl(AsyncIoctl *req)
644 {
|
645 mday 1.8
646 switch( req->ctl )
647 {
648 case AsyncIoctl::IO_CLOSE:
649 {
|
650 mday 1.76
|
651 mday 1.34 MessageQueueService *service = static_cast<MessageQueueService *>(req->op->_service_ptr);
|
652 kumpf 1.81
653 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
654 mday 1.79 PEGASUS_STD(cout) << service->getQueueName() << " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
|
655 kumpf 1.81 #endif
|
656 mday 1.8
|
657 mday 1.76 // respond to this message. this is fire and forget, so we don't need to delete anything.
658 // this takes care of two problems that were being found
659 // << Thu Oct 9 10:52:48 2003 mdd >>
660 _make_response(req, async_results::OK);
|
661 mday 1.8 // ensure we do not accept any further messages
662
663 // ensure we don't recurse on IO_CLOSE
664 if( _incoming_queue_shutdown.value() > 0 )
665 break;
666
667 // set the closing flag
668 service->_incoming_queue_shutdown = 1;
669 // empty out the queue
670 while( 1 )
671 {
672 AsyncOpNode *operation;
673 try
674 {
675 operation = service->_incoming.remove_first();
676 }
677 catch(IPCException & )
678 {
679 break;
680 }
681 if( operation )
682 mday 1.8 {
|
683 mday 1.34 operation->_service_ptr = service;
684 service->_handle_incoming_operation(operation);
|
685 mday 1.8 }
686 else
687 break;
688 } // message processing loop
|
689 mday 1.76
|
690 mday 1.8 // shutdown the AsyncDQueue
691 service->_incoming.shutdown_queue();
692 return;
693 }
694
695 default:
696 _make_response(req, async_results::CIM_NAK);
697 }
|
698 mday 1.1 }
|
699 mday 1.8
|
700 mday 1.1 void MessageQueueService::handle_CimServiceStart(CimServiceStart *req)
701 {
|
702 mday 1.79
|
703 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
704 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received START" << PEGASUS_STD(endl);
|
705 kumpf 1.81 #endif
|
706 mday 1.79
|
707 mday 1.10 // clear the stoped bit and update
|
708 mday 1.13 _capabilities &= (~(module_capabilities::stopped));
|
709 mday 1.10 _make_response(req, async_results::OK);
710 // now tell the meta dispatcher we are stopped
711 update_service(_capabilities, _mask);
712
|
713 mday 1.1 }
714 void MessageQueueService::handle_CimServiceStop(CimServiceStop *req)
715 {
|
716 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
717 mday 1.79 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
718 kumpf 1.81 #endif
|
719 mday 1.10 // set the stopeed bit and update
720 _capabilities |= module_capabilities::stopped;
721 _make_response(req, async_results::CIM_STOPPED);
722 // now tell the meta dispatcher we are stopped
723 update_service(_capabilities, _mask);
724
|
725 mday 1.1 }
726 void MessageQueueService::handle_CimServicePause(CimServicePause *req)
727 {
|
728 mday 1.10 // set the paused bit and update
|
729 mday 1.13 _capabilities |= module_capabilities::paused;
|
730 mday 1.11 update_service(_capabilities, _mask);
|
731 mday 1.10 _make_response(req, async_results::CIM_PAUSED);
732 // now tell the meta dispatcher we are stopped
|
733 mday 1.1 }
734 void MessageQueueService::handle_CimServiceResume(CimServiceResume *req)
735 {
|
736 mday 1.10 // clear the paused bit and update
|
737 mday 1.13 _capabilities &= (~(module_capabilities::paused));
|
738 mday 1.11 update_service(_capabilities, _mask);
|
739 mday 1.10 _make_response(req, async_results::OK);
740 // now tell the meta dispatcher we are stopped
|
741 mday 1.1 }
742
743 void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart *req)
744 {
745 _make_response(req, async_results::CIM_NAK);
746 }
747
748 void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult *req)
749 {
|
750 mday 1.14 ;
751 }
752
|
753 mday 1.10
|
754 mday 1.14 void MessageQueueService::handle_AsyncLegacyOperationStart(AsyncLegacyOperationStart *req)
755 {
756 // remove the legacy message from the request and enqueue it to its destination
757 Uint32 result = async_results::CIM_NAK;
758
|
759 mday 1.25 Message *legacy = req->_act;
|
760 mday 1.14 if ( legacy != 0 )
761 {
|
762 mday 1.25 MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
|
763 mday 1.14 if( queue != 0 )
764 {
|
765 mday 1.25 if(queue->isAsync() == true )
766 {
767 (static_cast<MessageQueueService *>(queue))->handleEnqueue(legacy);
768 }
769 else
770 {
771 // Enqueue the response:
772 queue->enqueue(req->get_action());
773 }
774
|
775 mday 1.14 result = async_results::OK;
776 }
777 }
778 _make_response(req, result);
779 }
780
781 void MessageQueueService::handle_AsyncLegacyOperationResult(AsyncLegacyOperationResult *rep)
782 {
783 ;
|
784 mday 1.1 }
785
786 AsyncOpNode *MessageQueueService::get_op(void)
787 {
|
788 mday 1.4 AsyncOpNode *op = new AsyncOpNode();
|
789 mday 1.1
|
790 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
791 op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;
|
792 mday 1.4
|
793 mday 1.1 return op;
794 }
795
796 void MessageQueueService::return_op(AsyncOpNode *op)
797 {
798 PEGASUS_ASSERT(op->read_state() & ASYNC_OPSTATE_RELEASED );
|
799 mday 1.4 delete op;
|
800 mday 1.1 }
801
|
802 mday 1.18
|
803 mday 1.29 Boolean MessageQueueService::ForwardOp(AsyncOpNode *op,
804 Uint32 destination)
805 {
806 PEGASUS_ASSERT(op != 0 );
|
807 mday 1.30 op->lock();
808 op->_op_dest = MessageQueue::lookup(destination);
|
809 mday 1.29 op->_flags |= (ASYNC_OPFLAGS_FIRE_AND_FORGET | ASYNC_OPFLAGS_FORWARD);
810 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK);
|
811 mday 1.30 op->unlock();
812 if ( op->_op_dest == 0 )
813 return false;
814
|
815 mday 1.29 return _meta_dispatcher->route_async(op);
816 }
817
|
818 mday 1.39
|
819 mday 1.21 Boolean MessageQueueService::SendAsync(AsyncOpNode *op,
820 Uint32 destination,
|
821 mday 1.18 void (*callback)(AsyncOpNode *,
|
822 mday 1.32 MessageQueue *,
|
823 mday 1.30 void *),
824 MessageQueue *callback_response_q,
825 void *callback_ptr)
|
826 mday 1.20 {
|
827 mday 1.21 PEGASUS_ASSERT(op != 0 && callback != 0 );
|
828 mday 1.18
|
829 mday 1.21 // get the queue handle for the destination
830
|
831 mday 1.30 op->lock();
|
832 mday 1.32 op->_op_dest = MessageQueue::lookup(destination); // destination of this message
|
833 mday 1.22 op->_flags |= ASYNC_OPFLAGS_CALLBACK;
834 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
|
835 mday 1.30 // initialize the callback data
|
836 mday 1.32 op->_async_callback = callback; // callback function to be executed by recpt. of response
837 op->_callback_node = op; // the op node
838 op->_callback_response_q = callback_response_q; // the queue that will receive the response
839 op->_callback_ptr = callback_ptr; // user data for callback
840 op->_callback_request_q = this; // I am the originator of this request
|
841 mday 1.30
842 op->unlock();
843 if(op->_op_dest == 0)
844 return false;
|
845 mday 1.21
846 return _meta_dispatcher->route_async(op);
|
847 mday 1.18 }
848
849
|
850 mday 1.39 Boolean MessageQueueService::SendAsync(Message *msg,
851 Uint32 destination,
852 void (*callback)(Message *response,
853 void *handle,
854 void *parameter),
855 void *handle,
856 void *parameter)
857 {
858 if(msg == NULL)
859 return false;
860 if(callback == NULL)
861 return SendForget(msg);
862 AsyncOpNode *op = get_op();
|
863 mday 1.43 msg->dest = destination;
|
864 mday 1.39 if( NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
865 {
866 op->release();
867 return_op(op);
868 return false;
869 }
870 op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
871 op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
872 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
873 op->__async_callback = callback;
874 op->_callback_node = op;
875 op->_callback_handle = handle;
876 op->_callback_parameter = parameter;
877 op->_callback_response_q = this;
878
879
880 if( ! (msg->getMask() & message_mask::ha_async) )
881 {
882 AsyncLegacyOperationStart *wrapper =
883 new AsyncLegacyOperationStart(get_next_xid(),
884 op,
885 mday 1.39 destination,
886 msg,
887 destination);
888 }
889 else
890 {
891 op->_request.insert_first(msg);
892 (static_cast<AsyncMessage *>(msg))->op = op;
893 }
894
895 _callback.insert_last(op);
896 return _meta_dispatcher->route_async(op);
897 }
898
899
|
900 mday 1.18 Boolean MessageQueueService::SendForget(Message *msg)
901 {
902
|
903 mday 1.24
|
904 mday 1.18 AsyncOpNode *op = 0;
|
905 mday 1.22 Uint32 mask = msg->getMask();
906
907 if (mask & message_mask::ha_async)
|
908 mday 1.18 {
909 op = (static_cast<AsyncMessage *>(msg))->op ;
910 }
|
911 mday 1.22
|
912 mday 1.18 if( op == 0 )
|
913 mday 1.20 {
|
914 mday 1.18 op = get_op();
|
915 mday 1.20 op->_request.insert_first(msg);
|
916 mday 1.22 if (mask & message_mask::ha_async)
917 (static_cast<AsyncMessage *>(msg))->op = op;
|
918 mday 1.20 }
|
919 mday 1.30 op->_op_dest = MessageQueue::lookup(msg->dest);
|
920 mday 1.22 op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
|
921 mday 1.41 op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
922 | ASYNC_OPFLAGS_SIMPLE_STATUS);
|
923 mday 1.22 op->_state &= ~ASYNC_OPSTATE_COMPLETE;
|
924 mday 1.30 if ( op->_op_dest == 0 )
|
925 mday 1.39 {
926 op->release();
927 return_op(op);
|
928 mday 1.21 return false;
|
929 mday 1.39 }
|
930 mday 1.24
|
931 mday 1.18 // now see if the meta dispatcher will take it
|
932 mday 1.30 return _meta_dispatcher->route_async(op);
|
933 mday 1.18 }
|
934 mday 1.2
|
935 mday 1.1
|
936 mday 1.4 AsyncReply *MessageQueueService::SendWait(AsyncRequest *request)
|
937 mday 1.1 {
|
938 mday 1.4 if ( request == 0 )
939 return 0 ;
|
940 mday 1.5
941 Boolean destroy_op = false;
942
|
943 s.hills 1.80 if (request->op == 0)
|
944 mday 1.5 {
945 request->op = get_op();
|
946 mday 1.7 request->op->_request.insert_first(request);
|
947 mday 1.5 destroy_op = true;
948 }
|
949 mday 1.4
|
950 mday 1.33 request->block = false;
|
951 mday 1.35 request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
|
952 mday 1.33 SendAsync(request->op,
953 request->dest,
|
954 mday 1.36 _sendwait_callback,
|
955 mday 1.33 this,
956 (void *)0);
|
957 mday 1.4
|
958 baggio.br 1.100 request->op->_client_sem.wait();
959
|
960 mday 1.6 request->op->lock();
961 AsyncReply * rpl = static_cast<AsyncReply *>(request->op->_response.remove_first());
962 rpl->op = 0;
963 request->op->unlock();
964
|
965 mday 1.5 if( destroy_op == true)
966 {
|
967 mday 1.6 request->op->lock();
968 request->op->_request.remove(request);
969 request->op->_state |= ASYNC_OPSTATE_RELEASED;
970 request->op->unlock();
971 return_op(request->op);
|
972 mday 1.7 request->op = 0;
|
973 mday 1.5 }
974 return rpl;
|
975 mday 1.1 }
976
977
978 Boolean MessageQueueService::register_service(String name,
979 Uint32 capabilities,
980 Uint32 mask)
981
982 {
983 RegisterCimService *msg = new RegisterCimService(get_next_xid(),
|
984 mday 1.5 0,
|
985 mday 1.1 true,
986 name,
987 capabilities,
988 mask,
989 _queueId);
|
990 mday 1.44 msg->dest = CIMOM_Q_ID;
991
|
992 mday 1.1 Boolean registered = false;
|
993 mday 1.7 AsyncReply *reply = static_cast<AsyncReply *>(SendWait( msg ));
|
994 mday 1.1
|
995 mday 1.2 if ( reply != 0 )
|
996 mday 1.1 {
997 if(reply->getMask() & message_mask:: ha_async)
998 {
999 if(reply->getMask() & message_mask::ha_reply)
1000 {
|
1001 mday 1.15 if(reply->result == async_results::OK ||
1002 reply->result == async_results::MODULE_ALREADY_REGISTERED )
|
1003 mday 1.1 registered = true;
1004 }
1005 }
1006
|
1007 mday 1.7 delete reply;
|
1008 mday 1.1 }
|
1009 mday 1.5 delete msg;
|
1010 mday 1.1 return registered;
1011 }
1012
1013 Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
1014 {
1015
1016
1017 UpdateCimService *msg = new UpdateCimService(get_next_xid(),
|
1018 mday 1.5 0,
|
1019 mday 1.1 true,
1020 _queueId,
1021 _capabilities,
1022 _mask);
1023 Boolean registered = false;
|
1024 mday 1.2
1025 AsyncMessage *reply = SendWait(msg);
1026 if (reply)
|
1027 mday 1.1 {
1028 if(reply->getMask() & message_mask:: ha_async)
1029 {
1030 if(reply->getMask() & message_mask::ha_reply)
1031 {
|
1032 mday 1.2 if(static_cast<AsyncReply *>(reply)->result == async_results::OK)
|
1033 mday 1.1 registered = true;
1034 }
1035 }
1036 delete reply;
1037 }
|
1038 mday 1.5 delete msg;
|
1039 mday 1.1 return registered;
1040 }
1041
1042
1043 Boolean MessageQueueService::deregister_service(void)
1044 {
|
1045 mday 1.3
|
1046 mday 1.5 _meta_dispatcher->deregister_module(_queueId);
1047 return true;
|
1048 mday 1.1 }
1049
1050
1051 void MessageQueueService::find_services(String name,
1052 Uint32 capabilities,
1053 Uint32 mask,
1054 Array<Uint32> *results)
1055 {
1056
1057 if( results == 0 )
1058 throw NullPointer();
|
1059 mday 1.5
|
1060 mday 1.1 results->clear();
1061
1062 FindServiceQueue *req =
1063 new FindServiceQueue(get_next_xid(),
|
1064 mday 1.5 0,
|
1065 mday 1.1 _queueId,
1066 true,
1067 name,
1068 capabilities,
1069 mask);
|
1070 mday 1.44
1071 req->dest = CIMOM_Q_ID;
|
1072 mday 1.1
|
1073 mday 1.2 AsyncMessage *reply = SendWait(req);
1074 if(reply)
|
1075 mday 1.1 {
1076 if( reply->getMask() & message_mask::ha_async)
1077 {
1078 if(reply->getMask() & message_mask::ha_reply)
1079 {
1080 if(reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
1081 {
1082 if( (static_cast<FindServiceQueueResult *>(reply))->result == async_results::OK )
1083 *results = (static_cast<FindServiceQueueResult *>(reply))->qids;
1084 }
1085 }
1086 }
1087 delete reply;
1088 }
|
1089 mday 1.5 delete req;
|
1090 mday 1.1 return ;
1091 }
1092
1093 void MessageQueueService::enumerate_service(Uint32 queue, message_module *result)
1094 {
1095 if(result == 0)
1096 throw NullPointer();
1097
1098 EnumerateService *req
1099 = new EnumerateService(get_next_xid(),
|
1100 mday 1.5 0,
|
1101 mday 1.1 _queueId,
1102 true,
1103 queue);
1104
|
1105 mday 1.2 AsyncMessage *reply = SendWait(req);
|
1106 mday 1.1
|
1107 mday 1.2 if (reply)
|
1108 mday 1.1 {
1109 Boolean found = false;
1110
1111 if( reply->getMask() & message_mask::ha_async)
1112 {
1113 if(reply->getMask() & message_mask::ha_reply)
1114 {
1115 if(reply->getType() == async_messages::ENUMERATE_SERVICE_RESULT)
1116 {
1117 if( (static_cast<EnumerateServiceResponse *>(reply))->result == async_results::OK )
1118 {
1119 if( found == false)
1120 {
1121 found = true;
1122
1123 result->put_name( (static_cast<EnumerateServiceResponse *>(reply))->name);
1124 result->put_capabilities((static_cast<EnumerateServiceResponse *>(reply))->capabilities);
1125 result->put_mask((static_cast<EnumerateServiceResponse *>(reply))->mask);
1126 result->put_queue((static_cast<EnumerateServiceResponse *>(reply))->qid);
1127 }
1128 }
1129 mday 1.1 }
1130 }
1131 }
1132 delete reply;
1133 }
|
1134 mday 1.5 delete req;
1135
|
1136 mday 1.1 return;
1137 }
1138
1139 Uint32 MessageQueueService::get_next_xid(void)
1140 {
|
1141 mday 1.78 static Mutex _monitor;
1142 Uint32 value;
|
1143 a.arora 1.87 AutoMutex autoMut(_monitor);
|
1144 mday 1.1 _xid++;
|
1145 mday 1.78 value = _xid.value();
1146 return value;
1147
|
1148 mday 1.1 }
1149
1150 PEGASUS_NAMESPACE_END
|