1 martin 1.153 //%LICENSE////////////////////////////////////////////////////////////////
|
2 martin 1.154 //
|
3 martin 1.153 // Licensed to The Open Group (TOG) under one or more contributor license
4 // agreements. Refer to the OpenPegasusNOTICE.txt file distributed with
5 // this work for additional information regarding copyright ownership.
6 // Each contributor licenses this file to you under the OpenPegasus Open
7 // Source License; you may not use this file except in compliance with the
8 // License.
|
9 martin 1.154 //
|
10 martin 1.153 // Permission is hereby granted, free of charge, to any person obtaining a
11 // copy of this software and associated documentation files (the "Software"),
12 // to deal in the Software without restriction, including without limitation
13 // the rights to use, copy, modify, merge, publish, distribute, sublicense,
14 // and/or sell copies of the Software, and to permit persons to whom the
15 // Software is furnished to do so, subject to the following conditions:
|
16 martin 1.154 //
|
17 martin 1.153 // The above copyright notice and this permission notice shall be included
18 // in all copies or substantial portions of the Software.
|
19 martin 1.154 //
|
20 martin 1.153 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
|
21 martin 1.154 // OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
22 martin 1.153 // MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
23 // IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
24 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
25 // TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
26 // SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
27 martin 1.154 //
|
28 martin 1.153 //////////////////////////////////////////////////////////////////////////
|
29 mday 1.1 //
30 //%/////////////////////////////////////////////////////////////////////////////
31
32 #include "MessageQueueService.h"
|
33 mday 1.22 #include <Pegasus/Common/Tracer.h>
|
34 kumpf 1.126 #include <Pegasus/Common/MessageLoader.h>
|
35 mday 1.1
|
36 venkat.puvvada 1.158 PEGASUS_USING_STD;
37
|
38 mday 1.1 PEGASUS_NAMESPACE_BEGIN
39
|
40 mday 1.15 cimom *MessageQueueService::_meta_dispatcher = 0;
|
41 mike 1.118 AtomicInt MessageQueueService::_service_count(0);
|
42 venkat.puvvada 1.156 Mutex MessageQueueService::_meta_dispatcher_mutex;
|
43 mday 1.15
|
44 kumpf 1.104 static struct timeval deallocateWait = {300, 0};
|
45 mday 1.53
|
46 mday 1.55 ThreadPool *MessageQueueService::_thread_pool = 0;
|
47 mday 1.53
|
48 mike 1.120 MessageQueueService::PollingList* MessageQueueService::_polling_list;
|
49 venkat.puvvada 1.158 Mutex MessageQueueService::_polling_list_mutex;
|
50 mday 1.53
|
51 kumpf 1.62 Thread* MessageQueueService::_polling_thread = 0;
|
52 mday 1.61
|
53 kumpf 1.104 ThreadPool *MessageQueueService::get_thread_pool()
|
54 mday 1.69 {
55 return _thread_pool;
56 }
|
57 kumpf 1.117
|
58 jim.wunderlich 1.110 //
|
59 kumpf 1.117 // MAX_THREADS_PER_SVC_QUEUE
|
60 jim.wunderlich 1.110 //
61 // JR Wunderlich Jun 6, 2005
62 //
63
|
64 kumpf 1.131 #define MAX_THREADS_PER_SVC_QUEUE_LIMIT 5000
|
65 jim.wunderlich 1.114 #define MAX_THREADS_PER_SVC_QUEUE_DEFAULT 5
|
66 jim.wunderlich 1.110
|
67 kumpf 1.117 #ifndef MAX_THREADS_PER_SVC_QUEUE
68 # define MAX_THREADS_PER_SVC_QUEUE MAX_THREADS_PER_SVC_QUEUE_DEFAULT
69 #endif
70
|
71 jim.wunderlich 1.110 Uint32 max_threads_per_svc_queue;
|
72 mday 1.69
|
73 kumpf 1.131 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::polling_routine(
74 void* parm)
|
75 mday 1.53 {
|
76 kumpf 1.131 Thread *myself = reinterpret_cast<Thread *>(parm);
|
77 venkat.puvvada 1.155 MessageQueueService::PollingList *list =
78 reinterpret_cast<MessageQueueService::PollingList*>(myself->get_parm());
|
79 kumpf 1.131
|
80 venkat.puvvada 1.158 try
|
81 kumpf 1.131 {
|
82 venkat.puvvada 1.158 while (_stop_polling.get() == 0)
|
83 kumpf 1.131 {
|
84 venkat.puvvada 1.158 _polling_sem.wait();
|
85 kumpf 1.131
|
86 venkat.puvvada 1.158 if (_stop_polling.get() != 0)
87 {
88 break;
89 }
|
90 kumpf 1.131
|
91 venkat.puvvada 1.158 // The polling_routine thread must hold the lock on the
92 // _polling_list while processing incoming messages.
93 // This lock is used to give this thread ownership of
94 // services on the _polling_routine list.
95
96 // This is necessary to avoid confict with other threads
97 // processing the _polling_list
98 // (e.g., MessageQueueServer::~MessageQueueService).
99
100 _polling_list_mutex.lock();
101 MessageQueueService *service = list->front();
102 ThreadStatus rtn = PEGASUS_THREAD_OK;
103 while (service != NULL)
|
104 kumpf 1.131 {
|
105 venkat.puvvada 1.158 if ((service->_incoming.count() > 0) &&
106 (service->_die.get() == 0) &&
107 (service->_threads.get() < max_threads_per_svc_queue))
108 {
109 // The _threads count is used to track the
110 // number of active threads that have been allocated
111 // to process messages for this service.
112
113 // The _threads count MUST be incremented while
114 // the polling_routine owns the _polling_thread
115 // lock and has ownership of the service object.
|
116 kumpf 1.131
|
117 venkat.puvvada 1.158 service->_threads++;
|
118 kumpf 1.131 rtn = _thread_pool->allocate_and_awaken(
119 service, _req_proc, &_polling_sem);
|
120 venkat.puvvada 1.158 // if no more threads available, break from processing loop
121 if (rtn != PEGASUS_THREAD_OK )
122 {
123 service->_threads--;
124 PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL1,
125 "Could not allocate thread for %s. Queue has %d "
126 "messages waiting and %d threads servicing."
127 "Skipping the service for right now. ",
128 service->getQueueName(),
129 service->_incoming.count(),
130 service->_threads.get()));
131
132 Threads::yield();
133 break;
134 }
|
135 kumpf 1.131 }
|
136 venkat.puvvada 1.158 service = list->next_of(service);
137 }
138 _polling_list_mutex.unlock();
139 }
140 }
141 catch(const Exception &e)
142 {
143 PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
144 "Exception caught in MessageQueueService::polling_routine : %s",
145 (const char*)e.getMessage().getCString()));
146 }
147 catch(const exception &e)
148 {
149 PEG_TRACE((TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
150 "Exception caught in MessageQueueService::polling_routine : %s",
151 e.what()));
152 }
153 catch(...)
154 {
155 PEG_TRACE_CSTRING(TRC_MESSAGEQUEUESERVICE,Tracer::LEVEL1,
156 "Unknown Exception caught in MessageQueueService::polling_routine");
157 venkat.puvvada 1.158 }
|
158 kumpf 1.131
|
159 venkat.puvvada 1.158 PEGASUS_ASSERT(_stop_polling.get());
|
160 kumpf 1.131
|
161 kumpf 1.135 return ThreadReturnType(0);
|
162 mday 1.53 }
163
164
165 Semaphore MessageQueueService::_polling_sem(0);
166 AtomicInt MessageQueueService::_stop_polling(0);
167
|
168 mday 1.15
|
169 kumpf 1.104 MessageQueueService::MessageQueueService(
|
170 kumpf 1.131 const char* name,
|
171 venkat.puvvada 1.148 Uint32 queueID)
|
172 kumpf 1.131 : Base(name, true, queueID),
|
173 venkat.puvvada 1.158 _die(0),
|
174 kumpf 1.131 _threads(0),
175 _incoming(),
176 _incoming_queue_shutdown(0)
177 {
|
178 venkat.puvvada 1.148 _isRunning = true;
|
179 kumpf 1.131
180 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE;
181
182 // if requested thread max is out of range, then set to
183 // MAX_THREADS_PER_SVC_QUEUE_LIMIT
184
185 if ((max_threads_per_svc_queue < 1) ||
186 (max_threads_per_svc_queue > MAX_THREADS_PER_SVC_QUEUE_LIMIT))
187 {
188 max_threads_per_svc_queue = MAX_THREADS_PER_SVC_QUEUE_LIMIT;
189 }
190
|
191 marek 1.138 PEG_TRACE((TRC_MESSAGEQUEUESERVICE, Tracer::LEVEL3,
|
192 marek 1.132 "max_threads_per_svc_queue set to %u.", max_threads_per_svc_queue));
|
193 kumpf 1.159
|
194 venkat.puvvada 1.158 AutoMutex autoMut(_meta_dispatcher_mutex);
|
195 venkat.puvvada 1.156
|
196 kumpf 1.131 if (_meta_dispatcher == 0)
197 {
198 _stop_polling = 0;
199 PEGASUS_ASSERT(_service_count.get() == 0);
200 _meta_dispatcher = new cimom();
201
202 // _thread_pool = new ThreadPool(initial_cnt, "MessageQueueService",
203 // minimum_cnt, maximum_cnt, deallocateWait);
204 //
205 _thread_pool =
206 new ThreadPool(0, "MessageQueueService", 0, 0, deallocateWait);
207 }
208 _service_count++;
209
|
210 venkat.puvvada 1.155 // Add to the polling list
211 if (!_polling_list)
212 {
213 _polling_list = new PollingList;
214 }
|
215 venkat.puvvada 1.158 _polling_list->insert_back(this);
216 _meta_dispatcher->registerCIMService(this);
|
217 mday 1.1 }
218
|
219 mday 1.4
|
220 kumpf 1.104 MessageQueueService::~MessageQueueService()
|
221 mday 1.1 {
|
222 venkat.puvvada 1.158
|
223 venkat.puvvada 1.152 // Close incoming queue.
|
224 kumpf 1.131 if (_incoming_queue_shutdown.get() == 0)
225 {
|
226 venkat.puvvada 1.152 AsyncIoClose *msg = new AsyncIoClose(
227 0,
228 _queueId,
229 _queueId,
230 true);
231 SendForget(msg);
232 // Wait until our queue has been shutdown.
233 while (_incoming_queue_shutdown.get() == 0)
234 {
235 Threads::yield();
236 }
|
237 kumpf 1.131 }
|
238 mday 1.76
|
239 venkat.puvvada 1.158 // die now.
240 _die = 1;
241
242 _meta_dispatcher->deregisterCIMService(this);
|
243 venkat.puvvada 1.152
|
244 kumpf 1.131 // Wait until all threads processing the messages
245 // for this service have completed.
246 while (_threads.get() > 0)
|
247 konrad.r 1.109 {
|
248 kumpf 1.131 Threads::yield();
249 }
250
|
251 venkat.puvvada 1.158
252 // The polling_routine locks the _polling_list while
253 // processing the incoming messages for services on the
254 // list. Deleting the service from the _polling_list
255 // prior to processing, avoids synchronization issues
256 // with the _polling_routine.
257 _removeFromPollingList(this);
|
258 venkat.puvvada 1.157
|
259 kumpf 1.131 {
|
260 venkat.puvvada 1.158 AutoMutex autoMut(_meta_dispatcher_mutex);
|
261 venkat.puvvada 1.156
|
262 kumpf 1.131 _service_count--;
|
263 venkat.puvvada 1.152 // If we are last service to die, delete metadispatcher.
|
264 kumpf 1.131 if (_service_count.get() == 0)
265 {
266 _stop_polling++;
267 _polling_sem.signal();
268 if (_polling_thread)
269 {
270 _polling_thread->join();
271 delete _polling_thread;
272 _polling_thread = 0;
273 }
274 delete _meta_dispatcher;
275 _meta_dispatcher = 0;
276
277 delete _thread_pool;
278 _thread_pool = 0;
279 }
|
280 venkat.puvvada 1.155 }
|
281 kumpf 1.145
282 // Clean up any extra stuff on the queue.
283 AsyncOpNode* op = 0;
284 while ((op = _incoming.dequeue()))
|
285 kumpf 1.131 {
|
286 kumpf 1.145 delete op;
|
287 konrad.r 1.109 }
|
288 kumpf 1.104 }
|
289 mday 1.1
|
290 kumpf 1.131 void MessageQueueService::enqueue(Message* msg)
|
291 mday 1.22 {
|
292 kumpf 1.131 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE, "MessageQueueService::enqueue()");
|
293 kumpf 1.28
|
294 kumpf 1.131 Base::enqueue(msg);
|
295 kumpf 1.28
|
296 kumpf 1.131 PEG_METHOD_EXIT();
|
297 mday 1.22 }
298
299
|
300 mike 1.124 ThreadReturnType PEGASUS_THREAD_CDECL MessageQueueService::_req_proc(
|
301 kumpf 1.131 void* parm)
|
302 kumpf 1.103 {
|
303 konrad.r 1.107 MessageQueueService* service =
|
304 kumpf 1.131 reinterpret_cast<MessageQueueService*>(parm);
|
305 konrad.r 1.107 PEGASUS_ASSERT(service != 0);
|
306 kumpf 1.103 try
307 {
|
308 venkat.puvvada 1.158 if (service->_die.get() != 0)
|
309 kumpf 1.103 {
|
310 denise.eckstein 1.116 service->_threads--;
|
311 kumpf 1.131 return 0;
|
312 kumpf 1.103 }
313 // pull messages off the incoming queue and dispatch them. then
314 // check pending messages that are non-blocking
315 AsyncOpNode *operation = 0;
316
317 // many operations may have been queued.
318 do
319 {
|
320 kumpf 1.145 operation = service->_incoming.dequeue();
|
321 kumpf 1.103
322 if (operation)
323 {
324 operation->_service_ptr = service;
325 service->_handle_incoming_operation(operation);
326 }
327 } while (operation);
328 }
329 catch (const Exception& e)
330 {
|
331 thilo.boehm 1.142 PEG_TRACE((TRC_DISCARDED_DATA, Tracer::LEVEL1,
332 "Caught exception: \"%s\". Exiting _req_proc.",
333 (const char*)e.getMessage().getCString()));
|
334 kumpf 1.103 }
335 catch (...)
336 {
|
337 marek 1.138 PEG_TRACE_CSTRING(TRC_DISCARDED_DATA, Tracer::LEVEL1,
|
338 kumpf 1.103 "Caught unrecognized exception. Exiting _req_proc.");
339 }
|
340 konrad.r 1.107 service->_threads--;
|
341 kumpf 1.131 return 0;
|
342 mday 1.1 }
343
|
344 mday 1.43
|
345 kumpf 1.104 void MessageQueueService::_sendwait_callback(
|
346 kumpf 1.131 AsyncOpNode* op,
347 MessageQueue* q,
|
348 kumpf 1.104 void *parm)
|
349 mday 1.33 {
|
350 kumpf 1.131 op->_client_sem.signal();
|
351 mday 1.33 }
352
|
353 mday 1.30
354 // callback function is responsible for cleaning up all resources
355 // including op, op->_callback_node, and op->_callback_ptr
|
356 kumpf 1.131 void MessageQueueService::_handle_async_callback(AsyncOpNode* op)
|
357 mday 1.22 {
|
358 venkat.puvvada 1.147 PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_CALLBACK);
359 // note that _callback_node may be different from op
360 // op->_callback_response_q is a "this" pointer we can use for
361 // static callback methods
362 op->_async_callback(
363 op->_callback_node, op->_callback_response_q, op->_callback_ptr);
|
364 mday 1.22 }
365
|
366 mday 1.1
|
367 kumpf 1.131 void MessageQueueService::_handle_incoming_operation(AsyncOpNode* operation)
|
368 mday 1.1 {
|
369 kumpf 1.131 if (operation != 0)
370 {
371 Message *rq = operation->_request.get();
|
372 kumpf 1.104
|
373 mday 1.31 // optimization <<< Thu Mar 7 21:04:05 2002 mdd >>>
|
374 kumpf 1.104 // move this to the bottom of the loop when the majority of
375 // messages become async messages.
|
376 mday 1.31
|
377 kumpf 1.131 // divert legacy messages to handleEnqueue
378 if ((rq != 0) && (!(rq->getMask() & MessageMask::ha_async)))
379 {
380 operation->_request.release();
381 // delete the op node
382 return_op(operation);
383 handleEnqueue(rq);
384 return;
385 }
386
|
387 venkat.puvvada 1.147 if ((operation->_flags & ASYNC_OPFLAGS_CALLBACK) &&
|
388 kumpf 1.131 (operation->_state & ASYNC_OPSTATE_COMPLETE))
389 {
390 _handle_async_callback(operation);
391 }
392 else
393 {
394 PEGASUS_ASSERT(rq != 0);
395 _handle_async_request(static_cast<AsyncRequest *>(rq));
396 }
397 }
398 return;
|
399 mday 1.1 }
400
401 void MessageQueueService::_handle_async_request(AsyncRequest *req)
402 {
|
403 venkat.puvvada 1.147 MessageType type = req->getType();
|
404 venkat.puvvada 1.152 if (type == ASYNC_IOCLOSE)
|
405 venkat.puvvada 1.147 {
|
406 venkat.puvvada 1.152 handle_AsyncIoClose(static_cast<AsyncIoClose*>(req));
|
407 venkat.puvvada 1.147 }
408 else if (type == ASYNC_CIMSERVICE_START)
409 {
410 handle_CimServiceStart(static_cast<CimServiceStart *>(req));
411 }
412 else if (type == ASYNC_CIMSERVICE_STOP)
|
413 kumpf 1.131 {
|
414 venkat.puvvada 1.147 handle_CimServiceStop(static_cast<CimServiceStop *>(req));
415 }
416 else
417 {
418 // we don't handle this request message
419 _make_response(req, async_results::CIM_NAK);
|
420 kumpf 1.131 }
|
421 mday 1.1 }
422
|
423 mday 1.17 Boolean MessageQueueService::_enqueueResponse(
|
424 kumpf 1.131 Message* request,
425 Message* response)
|
426 mday 1.17 {
|
427 kumpf 1.131 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
428 "MessageQueueService::_enqueueResponse");
|
429 mday 1.25
|
430 kumpf 1.131 if (request->getMask() & MessageMask::ha_async)
431 {
432 if (response->getMask() & MessageMask::ha_async)
433 {
434 _completeAsyncResponse(
435 static_cast<AsyncRequest *>(request),
|
436 venkat.puvvada 1.149 static_cast<AsyncReply *>(response));
437
|
438 kumpf 1.131 PEG_METHOD_EXIT();
439 return true;
440 }
441 }
|
442 mday 1.63
|
443 kumpf 1.134 AsyncRequest* asyncRequest =
444 static_cast<AsyncRequest*>(request->get_async());
445
446 if (asyncRequest != 0)
|
447 kumpf 1.131 {
|
448 kumpf 1.134 PEGASUS_ASSERT(asyncRequest->getMask() &
|
449 kumpf 1.131 (MessageMask::ha_async | MessageMask::ha_request));
450
|
451 kumpf 1.134 AsyncOpNode* op = asyncRequest->op;
452
|
453 kumpf 1.131 // the legacy request is going to be deleted by its handler
454 // remove it from the op node
455
|
456 kumpf 1.134 static_cast<AsyncLegacyOperationStart *>(asyncRequest)->get_action();
|
457 kumpf 1.131
458 AsyncLegacyOperationResult *async_result =
459 new AsyncLegacyOperationResult(
460 op,
461 response);
462 _completeAsyncResponse(
|
463 kumpf 1.134 asyncRequest,
|
464 venkat.puvvada 1.149 async_result);
465
|
466 kumpf 1.131 PEG_METHOD_EXIT();
467 return true;
468 }
|
469 kumpf 1.104
|
470 kumpf 1.131 // ensure that the destination queue is in response->dest
471 PEG_METHOD_EXIT();
472 return SendForget(response);
|
473 mday 1.17 }
474
|
475 kumpf 1.131 void MessageQueueService::_make_response(Message* req, Uint32 code)
|
476 mday 1.1 {
|
477 kumpf 1.131 cimom::_make_response(req, code);
|
478 mday 1.1 }
479
|
480 kumpf 1.104 void MessageQueueService::_completeAsyncResponse(
|
481 kumpf 1.131 AsyncRequest* request,
|
482 venkat.puvvada 1.149 AsyncReply* reply)
|
483 mday 1.5 {
|
484 kumpf 1.131 PEG_METHOD_ENTER(TRC_MESSAGEQUEUESERVICE,
485 "MessageQueueService::_completeAsyncResponse");
|
486 kumpf 1.37
|
487 venkat.puvvada 1.149 cimom::_completeAsyncResponse(request, reply);
|
488 kumpf 1.37
|
489 kumpf 1.131 PEG_METHOD_EXIT();
|
490 mday 1.5 }
491
492
|
493 kumpf 1.104 void MessageQueueService::_complete_op_node(
|
494 venkat.puvvada 1.149 AsyncOpNode* op)
|
495 mday 1.32 {
|
496 venkat.puvvada 1.149 cimom::_complete_op_node(op);
|
497 mday 1.32 }
498
|
499 mday 1.5
|
500 kumpf 1.131 Boolean MessageQueueService::accept_async(AsyncOpNode* op)
|
501 mday 1.5 {
|
502 venkat.puvvada 1.158 if (!_isRunning)
503 {
504 // Don't accept any messages other than start.
505 if (op->_request.get()->getType() != ASYNC_CIMSERVICE_START)
506 {
507 return false;
508 }
509 }
510
|
511 kumpf 1.131 if (_incoming_queue_shutdown.get() > 0)
512 return false;
|
513 venkat.puvvada 1.158
|
514 kumpf 1.131 if (_polling_thread == NULL)
515 {
|
516 venkat.puvvada 1.155 PEGASUS_ASSERT(_polling_list);
|
517 kumpf 1.131 _polling_thread = new Thread(
518 polling_routine,
|
519 venkat.puvvada 1.155 reinterpret_cast<void *>(_polling_list),
|
520 kumpf 1.131 false);
521 ThreadStatus tr = PEGASUS_THREAD_OK;
522 while ( (tr =_polling_thread->run()) != PEGASUS_THREAD_OK)
523 {
524 if (tr == PEGASUS_THREAD_INSUFFICIENT_RESOURCES)
525 Threads::yield();
526 else
527 throw Exception(MessageLoaderParms(
528 "Common.MessageQueueService.NOT_ENOUGH_THREAD",
529 "Could not allocate thread for the polling thread."));
530 }
531 }
|
532 venkat.puvvada 1.158 if (_die.get() == 0)
|
533 kumpf 1.131 {
|
534 kumpf 1.145 if (_incoming.enqueue(op))
535 {
536 _polling_sem.signal();
537 return true;
538 }
|
539 kumpf 1.131 }
540 return false;
|
541 mday 1.5 }
542
|
543 venkat.puvvada 1.152 void MessageQueueService::handle_AsyncIoClose(AsyncIoClose *req)
|
544 mday 1.1 {
|
545 kumpf 1.159 MessageQueueService *service =
|
546 venkat.puvvada 1.152 static_cast<MessageQueueService*>(req->op->_op_dest);
|
547 kumpf 1.81
548 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
549 venkat.puvvada 1.152 PEGASUS_STD(cout) << service->getQueueName() <<
550 " Received AsyncIoClose " << PEGASUS_STD(endl);
|
551 kumpf 1.81 #endif
|
552 venkat.puvvada 1.152 // set the closing flag, don't accept any more messages
553 service->_incoming_queue_shutdown = 1;
|
554 kumpf 1.104
|
555 venkat.puvvada 1.152 // respond to this message. this is fire and forget, so we
556 // don't need to delete anything.
557 // this takes care of two problems that were being found
558 // << Thu Oct 9 10:52:48 2003 mdd >>
559 _make_response(req, async_results::OK);
|
560 mday 1.1 }
|
561 mday 1.8
|
562 kumpf 1.131 void MessageQueueService::handle_CimServiceStart(CimServiceStart* req)
|
563 mday 1.1 {
|
564 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
565 kumpf 1.131 PEGASUS_STD(cout) << getQueueName() << "received START" <<
566 PEGASUS_STD(endl);
|
567 kumpf 1.81 #endif
|
568 venkat.puvvada 1.148 PEGASUS_ASSERT(!_isRunning);
569 _isRunning = true;
|
570 kumpf 1.131 _make_response(req, async_results::OK);
571 }
|
572 mday 1.10
|
573 kumpf 1.131 void MessageQueueService::handle_CimServiceStop(CimServiceStop* req)
|
574 mday 1.1 {
|
575 kumpf 1.81 #ifdef MESSAGEQUEUESERVICE_DEBUG
|
576 kumpf 1.131 PEGASUS_STD(cout) << getQueueName() << "received STOP" << PEGASUS_STD(endl);
|
577 kumpf 1.81 #endif
|
578 venkat.puvvada 1.148 PEGASUS_ASSERT(_isRunning);
579 _isRunning = false;
|
580 venkat.puvvada 1.150 _make_response(req, async_results::CIM_SERVICE_STOPPED);
|
581 mday 1.14 }
582
|
583 kumpf 1.131 AsyncOpNode* MessageQueueService::get_op()
|
584 mday 1.1 {
|
585 kumpf 1.131 AsyncOpNode* op = new AsyncOpNode();
|
586 kumpf 1.104
|
587 mday 1.9 op->_state = ASYNC_OPSTATE_UNKNOWN;
|
588 venkat.puvvada 1.147 op->_flags = ASYNC_OPFLAGS_UNKNOWN;
|
589 kumpf 1.104
|
590 mday 1.1 return op;
591 }
592
|
593 kumpf 1.131 void MessageQueueService::return_op(AsyncOpNode* op)
|
594 mday 1.1 {
|
595 kumpf 1.131 delete op;
|
596 mday 1.1 }
597
|
598 mday 1.18
|
599 kumpf 1.104 Boolean MessageQueueService::SendAsync(
|
600 kumpf 1.131 AsyncOpNode* op,
|
601 kumpf 1.104 Uint32 destination,
|
602 kumpf 1.131 void (*callback)(AsyncOpNode*, MessageQueue*, void*),
603 MessageQueue* callback_response_q,
604 void* callback_ptr)
605 {
|
606 venkat.puvvada 1.147 return _sendAsync(
607 op,
608 destination,
609 callback,
610 callback_response_q,
611 callback_ptr,
612 ASYNC_OPFLAGS_CALLBACK);
613
614 }
615
616 Boolean MessageQueueService::_sendAsync(
617 AsyncOpNode* op,
618 Uint32 destination,
619 void (*callback)(AsyncOpNode*, MessageQueue*, void*),
620 MessageQueue* callback_response_q,
621 void* callback_ptr,
622 Uint32 flags)
623 {
|
624 kumpf 1.131 PEGASUS_ASSERT(op != 0 && callback != 0);
625
626 // destination of this message
627 op->_op_dest = MessageQueue::lookup(destination);
|
628 venkat.puvvada 1.147 if (op->_op_dest == 0)
629 {
630 return false;
631 }
632 op->_flags = flags;
|
633 kumpf 1.131 // initialize the callback data
634 // callback function to be executed by recpt. of response
635 op->_async_callback = callback;
636 // the op node
637 op->_callback_node = op;
638 // the queue that will receive the response
639 op->_callback_response_q = callback_response_q;
640 // user data for callback
641 op->_callback_ptr = callback_ptr;
642 // I am the originator of this request
643 op->_callback_request_q = this;
644
645 return _meta_dispatcher->route_async(op);
|
646 mday 1.18 }
647
|
648 kumpf 1.131 Boolean MessageQueueService::SendForget(Message* msg)
649 {
650 AsyncOpNode* op = 0;
651 Uint32 mask = msg->getMask();
652
653 if (mask & MessageMask::ha_async)
654 {
655 op = (static_cast<AsyncMessage *>(msg))->op;
656 }
657
658 if (op == 0)
659 {
660 op = get_op();
661 op->_request.reset(msg);
662 if (mask & MessageMask::ha_async)
663 {
664 (static_cast<AsyncMessage *>(msg))->op = op;
665 }
666 }
|
667 venkat.puvvada 1.147
668 PEGASUS_ASSERT(op->_flags == ASYNC_OPFLAGS_UNKNOWN);
669 PEGASUS_ASSERT(op->_state == ASYNC_OPSTATE_UNKNOWN);
|
670 kumpf 1.131 op->_op_dest = MessageQueue::lookup(msg->dest);
671 if (op->_op_dest == 0)
672 {
673 return_op(op);
674 return false;
675 }
676
|
677 venkat.puvvada 1.147 op->_flags = ASYNC_OPFLAGS_FIRE_AND_FORGET;
678
|
679 kumpf 1.131 // now see if the meta dispatcher will take it
680 return _meta_dispatcher->route_async(op);
681 }
682
683
684 AsyncReply *MessageQueueService::SendWait(AsyncRequest* request)
685 {
686 if (request == 0)
687 return 0;
688
689 Boolean destroy_op = false;
690
691 if (request->op == 0)
692 {
693 request->op = get_op();
694 request->op->_request.reset(request);
695 destroy_op = true;
696 }
|
697 kumpf 1.159
|
698 venkat.puvvada 1.147 PEGASUS_ASSERT(request->op->_flags == ASYNC_OPFLAGS_UNKNOWN);
699 PEGASUS_ASSERT(request->op->_state == ASYNC_OPSTATE_UNKNOWN);
|
700 kumpf 1.131
701 request->block = false;
|
702 venkat.puvvada 1.147 _sendAsync(
|
703 kumpf 1.131 request->op,
704 request->dest,
705 _sendwait_callback,
706 this,
|
707 venkat.puvvada 1.147 (void *)0,
708 ASYNC_OPFLAGS_PSEUDO_CALLBACK);
|
709 kumpf 1.131
710 request->op->_client_sem.wait();
711
712 AsyncReply* rpl = static_cast<AsyncReply *>(request->op->removeResponse());
713 rpl->op = 0;
714
715 if (destroy_op == true)
716 {
717 request->op->_request.release();
718 return_op(request->op);
719 request->op = 0;
720 }
721 return rpl;
|
722 mday 1.1 }
723
|
724 kumpf 1.160 Uint32 MessageQueueService::find_service_qid(const char* name)
|
725 mday 1.1 {
|
726 kumpf 1.160 MessageQueue* queue = MessageQueue::lookup(name);
|
727 venkat.puvvada 1.146 PEGASUS_ASSERT(queue);
728 return queue->getQueueId();
|
729 mday 1.1 }
730
|
731 venkat.puvvada 1.158 void MessageQueueService::_removeFromPollingList(MessageQueueService *service)
732 {
733 _polling_list_mutex.lock();
734 _polling_list->remove(service);
735 _polling_list_mutex.unlock();
736 }
737
|
738 mday 1.1 PEGASUS_NAMESPACE_END
|