1 mike 1.1.2.1 //%/////////////////////////////////////////////////////////////////////////////
2 //
3 // Copyright (c) 2000, 2001 The Open group, BMC Software, Tivoli Systems, IBM
4 //
5 // Permission is hereby granted, free of charge, to any person obtaining a copy
6 // of this software and associated documentation files (the "Software"), to
7 // deal in the Software without restriction, including without limitation the
8 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
9 // sell copies of the Software, and to permit persons to whom the Software is
10 // furnished to do so, subject to the following conditions:
11 //
12 // THE ABOVE COPYRIGHT NOTICE AND THIS PERMISSION NOTICE SHALL BE INCLUDED IN
13 // ALL COPIES OR SUBSTANTIAL PORTIONS OF THE SOFTWARE. THE SOFTWARE IS PROVIDED
14 // "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
15 // LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
16 // PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
17 // HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
18 // ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
19 // WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20 //
21 //==============================================================================
22 mike 1.1.2.1 //
23 // Author: Mike Brasher (mbrasher@bmc.com)
24 //
25 // Modified By:
26 //
27 //%/////////////////////////////////////////////////////////////////////////////
28
|
29 sage 1.1.2.10 #include <Pegasus/Common/Config.h>
|
30 mike 1.1.2.8 #include <cstring>
31 #include "Monitor.h"
32 #include "MessageQueue.h"
33 #include "Socket.h"
|
34 mike 1.1.2.5
35 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
36 mike 1.1.2.11 # if defined(FD_SETSIZE) && FD_SETSIZE != 1024
37 # error "FD_SETSIZE was not set to 1024 prior to the last inclusion \
38 of <winsock.h>. It may have been indirectly included (e.g., by including \
39 <windows.h>). Find the inclusion of that header which is visible to this \
40 compilation unit and #define FD_SETSIZE to 1024 prior to that inclusion; \
41 otherwise, less than 64 clients (the default) will be able to connect to the \
42 CIMOM. PLEASE DO NOT SUPPRESS THIS WARNING; PLEASE FIX THE PROBLEM."
43 # endif
|
44 mike 1.1.2.3 # define FD_SETSIZE 1024
|
45 mday 1.1.2.9 # include <winsock2.h>
|
46 mike 1.1.2.3 #else
47 # include <sys/types.h>
48 # include <sys/socket.h>
49 # include <sys/time.h>
50 # include <netinet/in.h>
51 # include <netdb.h>
52 # include <arpa/inet.h>
53 # include <unistd.h>
54 #endif
55
|
56 mike 1.1.2.1 PEGASUS_USING_STD;
57
58 PEGASUS_NAMESPACE_BEGIN
59
60 ////////////////////////////////////////////////////////////////////////////////
61 //
62 // MonitorRep
63 //
64 ////////////////////////////////////////////////////////////////////////////////
65
// Private representation of a Monitor: the descriptor sets used with select().
//
// The rd/wr/ex sets record which sockets are registered for read, write and
// exception events. The active_* sets are the working copies that
// Monitor::run() fills from the registered sets and hands to select().
struct MonitorRep
{
    // Registered interest, one set per event type.
    fd_set rd_fd_set;
    fd_set wr_fd_set;
    fd_set ex_fd_set;

    // Working copies passed to select() and consumed as events are dispatched.
    fd_set active_rd_fd_set;
    fd_set active_wr_fd_set;
    fd_set active_ex_fd_set;
};
75
76 ////////////////////////////////////////////////////////////////////////////////
77 mike 1.1.2.1 //
78 // Monitor
79 //
80 ////////////////////////////////////////////////////////////////////////////////
81
82 Monitor::Monitor()
83 {
|
84 mike 1.1.2.8 Socket::initializeInterface();
|
85 mike 1.1.2.1
86 _rep = new MonitorRep;
87 FD_ZERO(&_rep->rd_fd_set);
88 FD_ZERO(&_rep->wr_fd_set);
89 FD_ZERO(&_rep->ex_fd_set);
90 FD_ZERO(&_rep->active_rd_fd_set);
91 FD_ZERO(&_rep->active_wr_fd_set);
92 FD_ZERO(&_rep->active_ex_fd_set);
93 }
94
95 Monitor::~Monitor()
96 {
|
97 mike 1.1.2.8 Socket::uninitializeInterface();
|
98 mike 1.1.2.1 }
99
|
100 mike 1.1.2.2 Boolean Monitor::run(Uint32 milliseconds)
|
101 mike 1.1.2.1 {
|
102 mike 1.1.2.5 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
103 mike 1.1.2.3
|
104 mike 1.1.2.1 // Windows select() has a strange little bug. It returns immediately if
105 // there are no descriptors in the set even if the timeout is non-zero.
106 // To work around this, we call Sleep() for now:
107
108 if (_entries.size() == 0)
109 Sleep(milliseconds);
110
|
111 mike 1.1.2.3 #endif
112
|
113 mike 1.1.2.1 // Check for events on the selected file descriptors. Only do this if
114 // there were no undispatched events from last time.
115
116 static int count = 0;
117
118 if (count == 0)
119 {
120 memcpy(&_rep->active_rd_fd_set, &_rep->rd_fd_set, sizeof(fd_set));
121 memcpy(&_rep->active_wr_fd_set, &_rep->wr_fd_set, sizeof(fd_set));
122 memcpy(&_rep->active_ex_fd_set, &_rep->ex_fd_set, sizeof(fd_set));
123
124 const Uint32 SEC = milliseconds / 1000;
125 const Uint32 USEC = (milliseconds % 1000) * 1000;
126 struct timeval tv = { SEC, USEC };
127
|
128 mike 1.1.2.2 count = select(
|
129 mike 1.1.2.1 FD_SETSIZE,
130 &_rep->active_rd_fd_set,
131 &_rep->active_wr_fd_set,
132 &_rep->active_ex_fd_set,
133 &tv);
134
135 if (count == 0)
136 return false;
|
137 mike 1.1.2.5 #ifdef PEGASUS_OS_TYPE_WINDOWS
|
138 mike 1.1.2.1 else if (count == SOCKET_ERROR)
|
139 mike 1.1.2.3 #else
140 else if (count == -1)
141 #endif
|
142 mike 1.1.2.1 {
143 count = 0;
144 return false;
145 }
146 }
147
148 for (Uint32 i = 0, n = _entries.size(); i < n; i++)
149 {
150 Sint32 socket = _entries[i].socket;
151 Uint32 events = 0;
152
153 if (FD_ISSET(socket, &_rep->active_rd_fd_set))
|
154 mike 1.1.2.2 events |= SocketMessage::READ;
|
155 mike 1.1.2.1
156 if (FD_ISSET(socket, &_rep->active_wr_fd_set))
|
157 mike 1.1.2.2 events |= SocketMessage::WRITE;
|
158 mike 1.1.2.1
159 if (FD_ISSET(socket, &_rep->active_ex_fd_set))
|
160 mike 1.1.2.2 events |= SocketMessage::EXCEPTION;
|
161 mike 1.1.2.1
162 if (events)
163 {
164 MessageQueue* queue = MessageQueue::lookup(_entries[i].queueId);
165
166 if (!queue)
167 unsolicitSocketMessages(_entries[i].queueId);
168
|
169 mike 1.1.2.6
|
170 mike 1.1.2.4 Message* message = new SocketMessage(socket, events);
171 queue->enqueue(message);
172
|
173 mike 1.1.2.2 if (events & SocketMessage::WRITE)
|
174 mike 1.1.2.1 {
175 FD_CLR(socket, &_rep->active_wr_fd_set);
176 }
177
|
178 mike 1.1.2.2 if (events & SocketMessage::EXCEPTION)
|
179 mike 1.1.2.1 {
180 FD_CLR(socket, &_rep->active_ex_fd_set);
181 }
182
|
183 mike 1.1.2.2 if (events & SocketMessage::READ)
|
184 mike 1.1.2.1 {
185 FD_CLR(socket, &_rep->active_rd_fd_set);
186 }
187
188 count--;
189 return true;
190 }
191 }
192
193 return false;
194 }
195
196 Boolean Monitor::solicitSocketMessages(
197 Sint32 socket,
198 Uint32 events,
|
199 mike 1.1.2.2 Uint32 queueId)
|
200 mike 1.1.2.1 {
201 // See whether a handler is already registered for this one:
202
203 Uint32 pos = _findEntry(socket);
204
|
205 mike 1.1.2.2 if (pos != PEGASUS_NOT_FOUND)
|
206 mike 1.1.2.1 return false;
207
208 // Set the events:
209
|
210 mike 1.1.2.2 if (events & SocketMessage::READ)
|
211 mike 1.1.2.1 FD_SET(socket, &_rep->rd_fd_set);
212
|
213 mike 1.1.2.2 if (events & SocketMessage::WRITE)
|
214 mike 1.1.2.1 FD_SET(socket, &_rep->wr_fd_set);
215
|
216 mike 1.1.2.2 if (events & SocketMessage::EXCEPTION)
|
217 mike 1.1.2.1 FD_SET(socket, &_rep->ex_fd_set);
218
219 // Add the entry to the list:
220
221 _MonitorEntry entry = { socket, queueId };
222 _entries.append(entry);
223
224 // Success!
225
226 return true;
227 }
228
|
229 mike 1.1.2.2 Boolean Monitor::unsolicitSocketMessages(Sint32 socket)
|
230 mike 1.1.2.1 {
231 // Look for the given entry and remove it:
232
233 for (Uint32 i = 0, n = _entries.size(); i < n; i++)
234 {
235 if (_entries[i].socket == socket)
236 {
237 Sint32 socket = _entries[i].socket;
238 FD_CLR(socket, &_rep->rd_fd_set);
239 FD_CLR(socket, &_rep->wr_fd_set);
240 FD_CLR(socket, &_rep->ex_fd_set);
241 _entries.remove(i);
242 return true;
243 }
244 }
245
246 return false;
247 }
248
249 Uint32 Monitor::_findEntry(Sint32 socket) const
250 {
251 mike 1.1.2.1 for (Uint32 i = 0, n = _entries.size(); i < n; i++)
252 {
253 if (_entries[i].socket == socket)
254 return i;
255 }
256
257 return PEG_NOT_FOUND;
258 }
259
260 PEGASUS_NAMESPACE_END
|