//%2006////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2000, 2001, 2002 BMC Software; Hewlett-Packard Development
// Company, L.P.; IBM Corp.; The Open Group; Tivoli Systems.
// Copyright (c) 2003 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation, The Open Group.
// Copyright (c) 2004 BMC Software; Hewlett-Packard Development Company, L.P.;
// IBM Corp.; EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2005 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; VERITAS Software Corporation; The Open Group.
// Copyright (c) 2006 Hewlett-Packard Development Company, L.P.; IBM Corp.;
// EMC Corporation; Symantec Corporation; The Open Group.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to
// deal in the Software without restriction, including without limitation the
// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
// sell copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
// ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//==============================================================================
//
//%/////////////////////////////////////////////////////////////////////////////

#include "MessageQueueService.h"
#include <Pegasus/Common/Tracer.h>
#include <Pegasus/Common/MessageLoader.h>

PEGASUS_NAMESPACE_BEGIN

cimom* MessageQueueService::_meta_dispatcher = 0;
AtomicInt MessageQueueService::_service_count(0);
Mutex MessageQueueService::_meta_dispatcher_mutex;

static struct timeval deallocateWait = {300, 0};

ThreadPool* MessageQueueService::_thread_pool = 0;

MessageQueueService::PollingList* MessageQueueService::_polling_list;
Mutex MessageQueueService::_polling_list_mutex;

Thread* MessageQueueService::_polling_thread = 0;

ThreadPool* MessageQueueService::get_thread_pool()
{
    return _thread_pool;
}

//
// MAX_THREADS_PER_SVC_QUEUE
//
// JR Wunderlich Jun 6, 2005
//

#define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
#define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5

#ifndef MAX_THREADS_PER_SVC_QUEUE
# define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
#endif

Uint32 max_threads_per_svc_queue;
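
// The per-service thread cap can be overridden at build time by defining
// MAX_THREADS_PER_SVC_QUEUE; a sketch, assuming your build passes extra
// preprocessor flags through to the compiler:
//
//     CXXFLAGS += -DMAX_THREADS_PER_SVC_QUEUE=50
//
// Out-of-range values are clamped to MAX_THREADS_PER_SVC_QUEUE_LIMIT in the
// MessageQueueService constructor below.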

ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
    void* parm)
{
    Thread* myself = reinterpret_cast<Thread *>(parm);
    List<MessageQueueService, Mutex>* list =
        reinterpret_cast<List<MessageQueueService, Mutex>*>(myself->get_parm());

    while (_stop_polling.get() == 0)
    {
        _polling_sem.wait();

        if (_stop_polling.get() != 0)
        {
            break;
        }

        // The polling_routine thread must hold the lock on the
        // _polling_list while processing incoming messages.
        // This lock is used to give this thread ownership of
        // services on the _polling_list.

        // This is necessary to avoid conflict with other threads
        // processing the _polling_list
        // (e.g., MessageQueueService::~MessageQueueService).

        list->lock();
        MessageQueueService* service = list->front();
        ThreadStatus rtn = PEGASUS_THREAD_OK;
        while (service != NULL)
        {
            if ((service->_incoming.count() > 0) &&
                (service->_die.get() == 0) &&
                (service->_threads.get() < max_threads_per_svc_queue))
            {
                // The _threads count is used to track the
                // number of active threads that have been allocated
                // to process messages for this service.

                // The _threads count MUST be incremented while
                // the polling_routine owns the _polling_list
                // lock and has ownership of the service object.

                service->_threads++;
                try
                {
                    rtn = _thread_pool->allocate_and_awaken(
                        service, _req_proc, &_polling_sem);
                }
                catch (...)
                {
                    service->_threads--;

                    // allocate_and_awaken should never generate an exception.
                    PEGASUS_ASSERT(0);
                }
                // if no more threads are available, break from the
                // processing loop
                if (rtn != PEGASUS_THREAD_OK)
                {
                    service->_threads--;
                    Logger::put(
                        Logger::STANDARD_LOG, System::CIMSERVER, Logger::TRACE,
                        "Not enough threads to process this request. "
                        "Skipping.");

                    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
                        "Could not allocate thread for %s. Queue has %d "
                        "messages waiting and %d threads servicing. "
                        "Skipping the service for right now.",
                        service->getQueueName(),
                        service->_incoming.count(),
                        service->_threads.get()));

                    Threads::yield();
                    service = NULL;
                }
            }
            if (service != NULL)
            {
                service = list->next_of(service);
            }
        }
        list->unlock();
    }
    myself->exit_self((ThreadReturnType) 1);
    return 0;
}
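
// Polling overview: accept_async() enqueues an AsyncOpNode on a service's
// _incoming queue and signals _polling_sem; polling_routine() wakes, walks
// the _polling_list under its lock, and hands each service with pending
// work to a ThreadPool thread running _req_proc(), capped per service by
// max_threads_per_svc_queue.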

Semaphore MessageQueueService::_polling_sem(0);
AtomicInt MessageQueueService::_stop_polling(0);

MessageQueueService::MessageQueueService(
    const char* name,
    Uint32 queueID,
    Uint32 capabilities,
    Uint32 mask)
    : Base(name, true, queueID),
      _mask(mask),
      _die(0),
      _threads(0),
      _incoming(),
      _incoming_queue_shutdown(0)
{
    _capabilities = (capabilities | module_capabilities::async);

    _default_op_timeout.tv_sec = 30;
    _default_op_timeout.tv_usec = 100;

    max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;

    // if the requested thread max is out of range, set it to
    // MAX_THREADS_PER_SVC_QUEUE_LIMIT

    if ((max_threads_per_svc_queue < 1) ||
        (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
    {
        max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
    }

    PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL2,
        "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));

    AutoMutex autoMut(_meta_dispatcher_mutex);

    if (_meta_dispatcher == 0)
    {
        _stop_polling = 0;
        PEGASUS_ASSERT(_service_count.get() == 0);
        _meta_dispatcher = new cimom();

        // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
        //     minimum_cnt, maximum_cnt, deallocateWait);
        _thread_pool =
            new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
    }
    _service_count++;

    if (false == register_service(name, _capabilities, _mask))
    {
        MessageLoaderParms parms(
            "Common.MessageQueueService.UNABLE_TO_REGISTER",
            "CIM base message queue service is unable to register with the "
                "CIMOM dispatcher.");
        throw BindFailedException(parms);
    }

    _get_polling_list()->insert_back(this);
}
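
// Illustrative only: services are implemented as subclasses that chain to
// the constructor above. The class and queue names here are hypothetical;
// MessageQueue::getNextQueueId() supplies a fresh queue id:
//
//     class MyService : public MessageQueueService
//     {
//     public:
//         MyService()
//             : MessageQueueService(
//                   "Server::MyService",               // hypothetical name
//                   MessageQueue::getNextQueueId(),
//                   0,                                 // capabilities
//                   0)                                 // mask
//         {
//         }
//     };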

MessageQueueService::~MessageQueueService()
{
    _die = 1;

    // The polling_routine locks the _polling_list while
    // processing the incoming messages for services on the
    // list. Deleting the service from the _polling_list
    // prior to processing avoids synchronization issues
    // with the polling_routine.

    // ATTN: added to prevent an assertion in List when the list does not
    // contain this element.

    if (_get_polling_list()->contains(this))
        _get_polling_list()->remove(this);

    // ATTN: The code for closing the _incoming queue
    // is not working correctly. In OpenPegasus 2.5,
    // execution of the following code is very timing
    // dependent. This needs to be fixed.
    // See Bug 4079 for details.
    if (_incoming_queue_shutdown.get() == 0)
    {
        _shutdown_incoming_queue();
    }

    // Wait until all threads processing the messages
    // for this service have completed.

    while (_threads.get() > 0)
    {
        Threads::yield();
    }

    {
        AutoMutex autoMut(_meta_dispatcher_mutex);
        _service_count--;
        if (_service_count.get() == 0)
        {
            _stop_polling++;
            _polling_sem.signal();
            if (_polling_thread)
            {
                _polling_thread->join();
                delete _polling_thread;
                _polling_thread = 0;
            }
            _meta_dispatcher->_shutdown_routed_queue();
            delete _meta_dispatcher;
            _meta_dispatcher = 0;

            delete _thread_pool;
            _thread_pool = 0;
        }
    } // mutex unlocks here

    // Clean up in case any messages are left on the queue.
    while (_incoming.count())
    {
        try
        {
            delete _incoming.dequeue();
        }
        catch (const ListClosed&)
        {
            // If the list is closed, there is nothing we can do.
            break;
        }
    }
}

void MessageQueueService::_shutdown_incoming_queue()
{
    if (_incoming_queue_shutdown.get() > 0)
        return;

    AsyncIoctl* msg = new AsyncIoctl(
        0,
        _queueId,
        _queueId,
        true,
        AsyncIoctl::IO_CLOSE,
        0,
        0);

    msg->op = get_op();
    msg->op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    msg->op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    msg->op->_state &= ~ASYNC_OPSTATE_COMPLETE;

    msg->op->_op_dest = this;
    msg->op->_request.reset(msg);
    try
    {
        _incoming.enqueue_wait(msg->op);
        _polling_sem.signal();
    }
    catch (const ListClosed&)
    {
        // The queue has already been shut down. This happens when two
        // AsyncIoctl::IO_CLOSE messages are generated and the other one
        // was processed first.
        delete msg;
    }
    catch (const Permission&)
    {
        delete msg;
    }
}


void MessageQueueService::enqueue(Message* msg)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");

    Base::enqueue(msg);

    PEG_METHOD_EXIT();
}


ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
    void* parm)
{
    MessageQueueService* service =
        reinterpret_cast<MessageQueueService*>(parm);
    PEGASUS_ASSERT(service != 0);
    try
    {
        if (service->_die.get() != 0)
        {
            service->_threads--;
            return 0;
        }
        // pull messages off the incoming queue and dispatch them, then
        // check pending messages that are non-blocking
        AsyncOpNode* operation = 0;

        // many operations may have been queued
        do
        {
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (ListClosed&)
            {
                // ATTN: This appears to be a common loop exit path.
                //PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
                //    "Caught ListClosed exception. Exiting _req_proc.");
                break;
            }

            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
        } while (operation);
    }
    catch (const Exception& e)
    {
        PEG_TRACE_STRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            String("Caught exception: \"") + e.getMessage() +
                "\". Exiting _req_proc.");
    }
    catch (...)
    {
        PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL2,
            "Caught unrecognized exception. Exiting _req_proc.");
    }
    service->_threads--;
    return 0;
}


void MessageQueueService::_sendwait_callback(
    AsyncOpNode* op,
    MessageQueue* q,
    void* parm)
{
    op->_client_sem.signal();
}


// The callback function is responsible for cleaning up all resources,
// including op, op->_callback_node, and op->_callback_ptr.
void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
{
    if (op->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK)
    {
        Message* msg = op->removeRequest();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_START)
            {
                AsyncLegacyOperationStart* wrapper =
                    static_cast<AsyncLegacyOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_START)
            {
                AsyncModuleOperationStart* wrapper =
                    static_cast<AsyncModuleOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_OP_START)
            {
                AsyncOperationStart* wrapper =
                    static_cast<AsyncOperationStart *>(msg);
                msg = wrapper->get_action();
                delete wrapper;
            }
            delete msg;
        }

        msg = op->removeResponse();
        if (msg && (msg->getMask() & MessageMask::ha_async))
        {
            if (msg->getType() == async_messages::ASYNC_LEGACY_OP_RESULT)
            {
                AsyncLegacyOperationResult* wrapper =
                    static_cast<AsyncLegacyOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
            else if (msg->getType() == async_messages::ASYNC_MODULE_OP_RESULT)
            {
                AsyncModuleOperationResult* wrapper =
                    static_cast<AsyncModuleOperationResult *>(msg);
                msg = wrapper->get_result();
                delete wrapper;
            }
        }
        void (*callback)(Message *, void *, void *) = op->__async_callback;
        void* handle = op->_callback_handle;
        void* parm = op->_callback_parameter;
        op->release();
        return_op(op);
        callback(msg, handle, parm);
    }
    else if (op->_flags & ASYNC_OPFLAGS_CALLBACK)
    {
        // note that _callback_node may be different from op
        // op->_callback_response_q is a "this" pointer we can use for
        // static callback methods
        op->_async_callback(
            op->_callback_node, op->_callback_response_q, op->_callback_ptr);
    }
}
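
// For reference, the two callback flavors dispatched above use different
// signatures: ASYNC_OPFLAGS_SAFE_CALLBACK invokes
// void (*)(Message* response, void* handle, void* parameter) with the
// unwrapped response, and the op node is released here before the call;
// ASYNC_OPFLAGS_CALLBACK invokes
// void (*)(AsyncOpNode*, MessageQueue*, void*) and, per the note above,
// leaves all cleanup to the callee.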


void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
{
    if (operation != 0)
    {
        // ATTN: optimization
        // << Tue Feb 19 14:10:38 2002 mdd >>
        operation->lock();

        Message* rq = operation->_request.get();

        // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
        // move this to the bottom of the loop when the majority of
        // messages become async messages.

        // divert legacy messages to handleEnqueue
        if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
        {
            operation->_request.release();
            operation->unlock();
            // delete the op node
            operation->release();
            return_op(operation);

            handleEnqueue(rq);
            return;
        }

        if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK ||
             operation->_flags & ASYNC_OPFLAGS_SAFE_CALLBACK) &&
            (operation->_state & ASYNC_OPSTATE_COMPLETE))
        {
            operation->unlock();
            _handle_async_callback(operation);
        }
        else
        {
            PEGASUS_ASSERT(rq != 0);
            operation->unlock();
            _handle_async_request(static_cast<AsyncRequest *>(rq));
        }
    }
    return;
}

void MessageQueueService::_handle_async_request(AsyncRequest* req)
{
    if (req != 0)
    {
        req->op->processing();

        Uint32 type = req->getType();
        if (type == async_messages::HEARTBEAT)
            handle_heartbeat_request(req);
        else if (type == async_messages::IOCTL)
            handle_AsyncIoctl(static_cast<AsyncIoctl *>(req));
        else if (type == async_messages::CIMSERVICE_START)
            handle_CimServiceStart(static_cast<CimServiceStart *>(req));
        else if (type == async_messages::CIMSERVICE_STOP)
            handle_CimServiceStop(static_cast<CimServiceStop *>(req));
        else if (type == async_messages::CIMSERVICE_PAUSE)
            handle_CimServicePause(static_cast<CimServicePause *>(req));
        else if (type == async_messages::CIMSERVICE_RESUME)
            handle_CimServiceResume(static_cast<CimServiceResume *>(req));
        else if (type == async_messages::ASYNC_OP_START)
            handle_AsyncOperationStart(static_cast<AsyncOperationStart *>(req));
        else
        {
            // we don't handle this request message
            _make_response(req, async_results::CIM_NAK);
        }
    }
}

Boolean MessageQueueService::_enqueueResponse(
    Message* request,
    Message* response)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_enqueueResponse");

    if (request->getMask() & MessageMask::ha_async)
    {
        if (response->getMask() & MessageMask::ha_async)
        {
            _completeAsyncResponse(
                static_cast<AsyncRequest *>(request),
                static_cast<AsyncReply *>(response),
                ASYNC_OPSTATE_COMPLETE, 0);
            PEG_METHOD_EXIT();
            return true;
        }
    }

    if (request->_async != 0)
    {
        Uint32 mask = request->_async->getMask();
        PEGASUS_ASSERT(mask &
            (MessageMask::ha_async | MessageMask::ha_request));

        AsyncRequest* async = static_cast<AsyncRequest *>(request->_async);
        AsyncOpNode* op = async->op;
        request->_async = 0;
        // the legacy request is going to be deleted by its handler;
        // remove it from the op node

        static_cast<AsyncLegacyOperationStart *>(async)->get_action();

        AsyncLegacyOperationResult* async_result =
            new AsyncLegacyOperationResult(
                op,
                response);
        _completeAsyncResponse(
            async,
            async_result,
            ASYNC_OPSTATE_COMPLETE,
            0);
        PEG_METHOD_EXIT();
        return true;
    }

    // ensure that the destination queue is in response->dest
    PEG_METHOD_EXIT();
    return SendForget(response);
}
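
// _enqueueResponse() picks one of three delivery paths: a fully async
// request is completed in place via _completeAsyncResponse(); a legacy
// request that arrived wrapped in an AsyncLegacyOperationStart gets its
// response re-wrapped in an AsyncLegacyOperationResult; anything else is
// sent fire-and-forget via SendForget(), which requires response->dest to
// be set by the caller.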

void MessageQueueService::_make_response(Message* req, Uint32 code)
{
    cimom::_make_response(req, code);
}


void MessageQueueService::_completeAsyncResponse(
    AsyncRequest* request,
    AsyncReply* reply,
    Uint32 state,
    Uint32 flag)
{
    PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
        "MessageQueueService::_completeAsyncResponse");

    cimom::_completeAsyncResponse(request, reply, state, flag);

    PEG_METHOD_EXIT();
}


void MessageQueueService::_complete_op_node(
    AsyncOpNode* op,
    Uint32 state,
    Uint32 flag,
    Uint32 code)
{
    cimom::_complete_op_node(op, state, flag, code);
}


Boolean MessageQueueService::accept_async(AsyncOpNode* op)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    if (_polling_thread == NULL)
    {
        _polling_thread = new Thread(
            polling_routine,
            reinterpret_cast<void *>(_get_polling_list()),
            false);
        ThreadStatus tr = PEGASUS_THREAD_OK;
        while ((tr = _polling_thread->run()) != PEGASUS_THREAD_OK)
        {
            if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
                Threads::yield();
            else
                throw Exception(MessageLoaderParms(
                    "Common.MessageQueueService.NOT_ENOUGH_THREAD",
                    "Could not allocate thread for the polling thread."));
        }
    }
    // ATTN: optimization: remove the message checking altogether in the base
    // << Mon Feb 18 14:02:20 2002 mdd >>
    op->lock();
    Message* rq = op->_request.get();
    Message* rp = op->_response.get();
    op->unlock();

    // accept the op only if the request or response passes messageOK()
    // and this service is not shutting down
    if (((rq != 0 && messageOK(rq)) || (rp != 0 && messageOK(rp))) &&
        (_die.get() == 0))
    {
        _incoming.enqueue_wait(op);
        _polling_sem.signal();
        return true;
    }
    return false;
}

Boolean MessageQueueService::messageOK(const Message* msg)
{
    if (_incoming_queue_shutdown.get() > 0)
        return false;
    return true;
}

void MessageQueueService::handle_heartbeat_request(AsyncRequest* req)
{
    // default action is to echo a heartbeat response

    AsyncReply* reply = new AsyncReply(
        async_messages::HEARTBEAT,
        0,
        req->op,
        async_results::OK,
        req->resp,
        false);
    _completeAsyncResponse(req, reply, ASYNC_OPSTATE_COMPLETE, 0);
}


void MessageQueueService::handle_heartbeat_reply(AsyncReply* rep)
{
}

void MessageQueueService::handle_AsyncIoctl(AsyncIoctl* req)
{
    switch (req->ctl)
    {
    case AsyncIoctl::IO_CLOSE:
    {
        MessageQueueService* service =
            static_cast<MessageQueueService *>(req->op->_service_ptr);

#ifdef MESSAGEQUEUESERVICE_DEBUG
        PEGASUS_STD(cout) << service->getQueueName() <<
            " Received AsyncIoctl::IO_CLOSE " << PEGASUS_STD(endl);
#endif

        // Respond to this message. This is fire and forget, so we
        // don't need to delete anything.
        // This takes care of two problems that were being found
        // << Thu Oct 9 10:52:48 2003 mdd >>
        _make_response(req, async_results::OK);

        // ensure we don't recurse on IO_CLOSE
        if (_incoming_queue_shutdown.get() > 0)
            break;

        // set the closing flag to ensure we do not accept any
        // further messages
        service->_incoming_queue_shutdown = 1;

        // empty out the queue
        while (1)
        {
            AsyncOpNode* operation;
            try
            {
                operation = service->_incoming.dequeue();
            }
            catch (IPCException&)
            {
                break;
            }
            if (operation)
            {
                operation->_service_ptr = service;
                service->_handle_incoming_operation(operation);
            }
            else
                break;
        } // message processing loop

        // shut down the AsyncQueue
        service->_incoming.close();
        return;
    }

    default:
        _make_response(req, async_results::CIM_NAK);
    }
}

void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received START" <<
        PEGASUS_STD(endl);
#endif

    // clear the stopped bit and update
    _capabilities &= (~(module_capabilities::stopped));
    _make_response(req, async_results::OK);
    // now tell the meta dispatcher we are started
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
{
#ifdef MESSAGEQUEUESERVICE_DEBUG
    PEGASUS_STD(cout) << getQueueName() << " received STOP" <<
        PEGASUS_STD(endl);
#endif

    // set the stopped bit and update
    _capabilities |= module_capabilities::stopped;
    _make_response(req, async_results::CIM_STOPPED);
    // now tell the meta dispatcher we are stopped
    update_service(_capabilities, _mask);
}

void MessageQueueService::handle_CimServicePause(CimServicePause* req)
{
    // set the paused bit and tell the meta dispatcher we are paused
    _capabilities |= module_capabilities::paused;
    update_service(_capabilities, _mask);
    _make_response(req, async_results::CIM_PAUSED);
}

void MessageQueueService::handle_CimServiceResume(CimServiceResume* req)
{
    // clear the paused bit and tell the meta dispatcher we are resumed
    _capabilities &= (~(module_capabilities::paused));
    update_service(_capabilities, _mask);
    _make_response(req, async_results::OK);
}

void MessageQueueService::handle_AsyncOperationStart(AsyncOperationStart* req)
{
    _make_response(req, async_results::CIM_NAK);
}

void MessageQueueService::handle_AsyncOperationResult(AsyncOperationResult* req)
{
}


void MessageQueueService::handle_AsyncLegacyOperationStart(
    AsyncLegacyOperationStart* req)
{
    // remove the legacy message from the request and enqueue it to its
    // destination
    Uint32 result = async_results::CIM_NAK;

    Message* legacy = req->_act;
    if (legacy != 0)
    {
        MessageQueue* queue = MessageQueue::lookup(req->_legacy_destination);
        if (queue != 0)
        {
            if (queue->isAsync() == true)
            {
                (static_cast<MessageQueueService *>(queue))->handleEnqueue(
                    legacy);
            }
            else
            {
                // enqueue the response
                queue->enqueue(req->get_action());
            }

            result = async_results::OK;
        }
    }
    _make_response(req, result);
}

void MessageQueueService::handle_AsyncLegacyOperationResult(
    AsyncLegacyOperationResult* rep)
{
}

AsyncOpNode* MessageQueueService::get_op()
{
    AsyncOpNode* op = new AsyncOpNode();

    op->_state = ASYNC_OPSTATE_UNKNOWN;
    op->_flags = ASYNC_OPFLAGS_SINGLE | ASYNC_OPFLAGS_NORMAL;

    return op;
}

void MessageQueueService::return_op(AsyncOpNode* op)
{
    PEGASUS_ASSERT(op->_state & ASYNC_OPSTATE_RELEASED);
    delete op;
}
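
// AsyncOpNode lifecycle: get_op() allocates a node in the UNKNOWN state with
// SINGLE|NORMAL flags, the send paths (SendAsync/SendForget/SendWait)
// configure and route it, and return_op() deletes it once
// ASYNC_OPSTATE_RELEASED has been set (directly or via AsyncOpNode::release()).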


Boolean MessageQueueService::SendAsync(
    AsyncOpNode* op,
    Uint32 destination,
    void (*callback)(AsyncOpNode*, MessageQueue*, void*),
    MessageQueue* callback_response_q,
    void* callback_ptr)
{
    PEGASUS_ASSERT(op != 0 && callback != 0);

    // get the queue handle for the destination

    op->lock();
    // destination of this message
    op->_op_dest = MessageQueue::lookup(destination);
    op->_flags |= ASYNC_OPFLAGS_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    // initialize the callback data:
    // callback function to be executed by the recipient of the response
    op->_async_callback = callback;
    // the op node
    op->_callback_node = op;
    // the queue that will receive the response
    op->_callback_response_q = callback_response_q;
    // user data for the callback
    op->_callback_ptr = callback_ptr;
    // I am the originator of this request
    op->_callback_request_q = this;

    op->unlock();
    if (op->_op_dest == 0)
        return false;

    return _meta_dispatcher->route_async(op);
}
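
// A minimal usage sketch (illustrative only; the request and callback are
// hypothetical). This overload takes a preconfigured op node and a raw
// callback of the ASYNC_OPFLAGS_CALLBACK flavor:
//
//     static void _myCallback(AsyncOpNode* op, MessageQueue* q, void* parm);
//
//     AsyncOpNode* op = get_op();
//     op->_request.reset(request);        // request is an AsyncMessage
//     if (!SendAsync(op, CIMOM_Q_ID, _myCallback, this, 0))
//     {
//         op->release();                  // destination lookup failed;
//         return_op(op);                  // reclaim the op node
//     }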


Boolean MessageQueueService::SendAsync(
    Message* msg,
    Uint32 destination,
    void (*callback)(Message* response, void* handle, void* parameter),
    void* handle,
    void* parameter)
{
    if (msg == NULL)
        return false;
    if (callback == NULL)
        return SendForget(msg);
    AsyncOpNode* op = get_op();
    msg->dest = destination;
    if (NULL == (op->_op_dest = MessageQueue::lookup(msg->dest)))
    {
        op->release();
        return_op(op);
        return false;
    }
    op->_flags |= ASYNC_OPFLAGS_SAFE_CALLBACK;
    op->_flags &= ~(ASYNC_OPFLAGS_FIRE_AND_FORGET);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    op->__async_callback = callback;
    op->_callback_node = op;
    op->_callback_handle = handle;
    op->_callback_parameter = parameter;
    op->_callback_response_q = this;

    if (!(msg->getMask() & MessageMask::ha_async))
    {
        // constructed for its side effects; the local pointer is
        // intentionally unused
        AsyncLegacyOperationStart* wrapper = new AsyncLegacyOperationStart(
            op,
            destination,
            msg,
            destination);
    }
    else
    {
        op->_request.reset(msg);
        (static_cast<AsyncMessage *>(msg))->op = op;
    }
    return _meta_dispatcher->route_async(op);
}
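
// A minimal usage sketch for the message-based overload (illustrative only;
// _onResponse and the destination are hypothetical). The callback receives
// the unwrapped response via _handle_async_callback():
//
//     static void _onResponse(Message* response, void* handle, void* parameter)
//     {
//         // consume and dispose of the response here
//         delete response;
//     }
//
//     SendAsync(msg, CIMOM_Q_ID, _onResponse, this, 0);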


Boolean MessageQueueService::SendForget(Message* msg)
{
    AsyncOpNode* op = 0;
    Uint32 mask = msg->getMask();

    if (mask & MessageMask::ha_async)
    {
        op = (static_cast<AsyncMessage *>(msg))->op;
    }

    if (op == 0)
    {
        op = get_op();
        op->_request.reset(msg);
        if (mask & MessageMask::ha_async)
        {
            (static_cast<AsyncMessage *>(msg))->op = op;
        }
    }
    op->_op_dest = MessageQueue::lookup(msg->dest);
    op->_flags |= ASYNC_OPFLAGS_FIRE_AND_FORGET;
    op->_flags &= ~(ASYNC_OPFLAGS_CALLBACK | ASYNC_OPFLAGS_SAFE_CALLBACK
        | ASYNC_OPFLAGS_SIMPLE_STATUS);
    op->_state &= ~ASYNC_OPSTATE_COMPLETE;
    if (op->_op_dest == 0)
    {
        op->release();
        return_op(op);
        return false;
    }

    // now see if the meta dispatcher will take it
    return _meta_dispatcher->route_async(op);
}
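
// A minimal fire-and-forget sketch (illustrative only; the destination id is
// hypothetical, and msg ownership passes to the op node):
//
//     msg->dest = CIMOM_Q_ID;
//     if (!SendForget(msg))
//     {
//         // lookup of msg->dest failed; the op node (and with it the
//         // message) has already been reclaimed
//     }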


AsyncReply* MessageQueueService::SendWait(AsyncRequest* request)
{
    if (request == 0)
        return 0;

    Boolean destroy_op = false;

    if (request->op == 0)
    {
        request->op = get_op();
        request->op->_request.reset(request);
        destroy_op = true;
    }

    request->block = false;
    request->op->_flags |= ASYNC_OPFLAGS_PSEUDO_CALLBACK;
    SendAsync(
        request->op,
        request->dest,
        _sendwait_callback,
        this,
        (void *)0);

    request->op->_client_sem.wait();

    AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
    rpl->op = 0;

    if (destroy_op == true)
    {
        request->op->lock();
        request->op->_request.release();
        request->op->_state |= ASYNC_OPSTATE_RELEASED;
        request->op->unlock();
        return_op(request->op);
        request->op = 0;
    }
    return rpl;
}
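
// A minimal blocking request/reply sketch (illustrative only; the concrete
// request type is hypothetical). SendWait() parks the caller on the op's
// client semaphore until _sendwait_callback() signals it:
//
//     AsyncRequest* req = /* some concrete AsyncRequest subclass */;
//     req->dest = CIMOM_Q_ID;
//     AsyncReply* reply = SendWait(req);
//     if (reply != 0)
//     {
//         // inspect reply->result, then clean up
//         delete reply;
//     }
//     delete req;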


Boolean MessageQueueService::register_service(
    String name,
    Uint32 capabilities,
    Uint32 mask)
{
    RegisterCimService* msg = new RegisterCimService(
        0,
        true,
        name,
        capabilities,
        mask,
        _queueId);
    msg->dest = CIMOM_Q_ID;

    Boolean registered = false;
    AsyncReply* reply = static_cast<AsyncReply *>(SendWait(msg));

    if (reply != 0)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->result == async_results::OK ||
                    reply->result == async_results::MODULE_ALREADY_REGISTERED)
                {
                    registered = true;
                }
            }
        }

        delete reply;
    }
    delete msg;
    return registered;
}

Boolean MessageQueueService::update_service(Uint32 capabilities, Uint32 mask)
{
    // Note: the message carries the service's current _capabilities and
    // _mask members; the parameters are not referenced here.
    UpdateCimService* msg = new UpdateCimService(
        0,
        true,
        _queueId,
        _capabilities,
        _mask);
    Boolean registered = false;

    AsyncMessage* reply = SendWait(msg);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (static_cast<AsyncReply *>(reply)->result ==
                    async_results::OK)
                {
                    registered = true;
                }
            }
        }
        delete reply;
    }
    delete msg;
    return registered;
}


Boolean MessageQueueService::deregister_service()
{
    _meta_dispatcher->deregister_module(_queueId);
    return true;
}


void MessageQueueService::find_services(
    String name,
    Uint32 capabilities,
    Uint32 mask,
    Array<Uint32>* results)
{
    if (results == 0)
    {
        throw NullPointer();
    }

    results->clear();

    FindServiceQueue* req = new FindServiceQueue(
        0,
        _queueId,
        true,
        name,
        capabilities,
        mask);

    req->dest = CIMOM_Q_ID;

    AsyncMessage* reply = SendWait(req);
    if (reply)
    {
        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() == async_messages::FIND_SERVICE_Q_RESULT)
                {
                    if ((static_cast<FindServiceQueueResult*>(reply))->result ==
                        async_results::OK)
                        *results =
                            (static_cast<FindServiceQueueResult*>(reply))->qids;
                }
            }
        }
        delete reply;
    }
    delete req;
}

void MessageQueueService::enumerate_service(
    Uint32 queue,
    message_module* result)
{
    if (result == 0)
    {
        throw NullPointer();
    }

    EnumerateService* req = new EnumerateService(
        0,
        _queueId,
        true,
        queue);

    AsyncMessage* reply = SendWait(req);

    if (reply)
    {
        Boolean found = false;

        if (reply->getMask() & MessageMask::ha_async)
        {
            if (reply->getMask() & MessageMask::ha_reply)
            {
                if (reply->getType() ==
                    async_messages::ENUMERATE_SERVICE_RESULT)
                {
                    if ((static_cast<EnumerateServiceResponse*>(reply))->
                            result == async_results::OK)
                    {
                        if (found == false)
                        {
                            found = true;

                            result->put_name((static_cast<
                                EnumerateServiceResponse*>(reply))->name);
                            result->put_capabilities((static_cast<
                                EnumerateServiceResponse*>(reply))->
                                    capabilities);
                            result->put_mask((static_cast<
                                EnumerateServiceResponse*>(reply))->mask);
                            result->put_queue((static_cast<
                                EnumerateServiceResponse*>(reply))->qid);
                        }
                    }
                }
            }
        }
        delete reply;
    }
    delete req;
}

MessageQueueService::PollingList* MessageQueueService::_get_polling_list()
{
    _polling_list_mutex.lock();

    if (!_polling_list)
        _polling_list = new PollingList;

    _polling_list_mutex.unlock();

    return _polling_list;
}
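
// The polling list is created lazily under _polling_list_mutex rather than
// defined as a static object, presumably to avoid static initialization
// order problems among the file-scope statics above.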

PEGASUS_NAMESPACE_END