]> git.saurik.com Git - redis.git/blob - src/ae_evport.c
Test: more MIGRATE tests.
[redis.git] / src / ae_evport.c
1 /* ae.c module for illumos event ports.
2 *
3 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30
31 #include <assert.h>
32 #include <errno.h>
33 #include <port.h>
34 #include <poll.h>
35
36 #include <sys/types.h>
37 #include <sys/time.h>
38
39 #include <stdio.h>
40
41 static int evport_debug = 0;
42
43 /*
44 * This file implements the ae API using event ports, present on Solaris-based
45 * systems since Solaris 10. Using the event port interface, we associate file
46 * descriptors with the port. Each association also includes the set of poll(2)
47 * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
48 *
49 * There's one tricky piece to this implementation: when we return events via
50 * aeApiPoll, the corresponding file descriptors become dissociated from the
51 * port. This is necessary because poll events are level-triggered, so if the
52 * fd didn't become dissociated, it would immediately fire another event since
53 * the underlying state hasn't changed yet. We must reassociate the file
54 * descriptor, but only after we know that our caller has actually read from it.
55 * The ae API does not tell us exactly when that happens, but we do know that
56 * it must happen by the time aeApiPoll is called again. Our solution is to
57 * keep track of the last fds returned by aeApiPoll and reassociate them next
58 * time aeApiPoll is invoked.
59 *
60 * To summarize, in this module, each fd association is EITHER (a) represented
61 * only via the in-kernel assocation OR (b) represented by pending_fds and
62 * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
63 * and only until we enter aeApiPoll again (at which point we restore the
64 * in-kernel association).
65 */
66 #define MAX_EVENT_BATCHSZ 512
67
68 typedef struct aeApiState {
69 int portfd; /* event port */
70 int npending; /* # of pending fds */
71 int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */
72 int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */
73 } aeApiState;
74
75 static int aeApiCreate(aeEventLoop *eventLoop) {
76 int i;
77 aeApiState *state = zmalloc(sizeof(aeApiState));
78 if (!state) return -1;
79
80 state->portfd = port_create();
81 if (state->portfd == -1) {
82 zfree(state);
83 return -1;
84 }
85
86 state->npending = 0;
87
88 for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
89 state->pending_fds[i] = -1;
90 state->pending_masks[i] = AE_NONE;
91 }
92
93 eventLoop->apidata = state;
94 return 0;
95 }
96
97 static void aeApiFree(aeEventLoop *eventLoop) {
98 aeApiState *state = eventLoop->apidata;
99
100 close(state->portfd);
101 zfree(state);
102 }
103
104 static int aeApiLookupPending(aeApiState *state, int fd) {
105 int i;
106
107 for (i = 0; i < state->npending; i++) {
108 if (state->pending_fds[i] == fd)
109 return (i);
110 }
111
112 return (-1);
113 }
114
115 /*
116 * Helper function to invoke port_associate for the given fd and mask.
117 */
118 static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
119 int events = 0;
120 int rv, err;
121
122 if (mask & AE_READABLE)
123 events |= POLLIN;
124 if (mask & AE_WRITABLE)
125 events |= POLLOUT;
126
127 if (evport_debug)
128 fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
129
130 rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
131 (void *)(uintptr_t)mask);
132 err = errno;
133
134 if (evport_debug)
135 fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
136
137 if (rv == -1) {
138 fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
139
140 if (err == EAGAIN)
141 fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
142 }
143
144 return rv;
145 }
146
147 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
148 aeApiState *state = eventLoop->apidata;
149 int fullmask, pfd;
150
151 if (evport_debug)
152 fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
153
154 /*
155 * Since port_associate's "events" argument replaces any existing events, we
156 * must be sure to include whatever events are already associated when
157 * we call port_associate() again.
158 */
159 fullmask = mask | eventLoop->events[fd].mask;
160 pfd = aeApiLookupPending(state, fd);
161
162 if (pfd != -1) {
163 /*
164 * This fd was recently returned from aeApiPoll. It should be safe to
165 * assume that the consumer has processed that poll event, but we play
166 * it safer by simply updating pending_mask. The fd will be
167 * reassociated as usual when aeApiPoll is called again.
168 */
169 if (evport_debug)
170 fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
171 state->pending_masks[pfd] |= fullmask;
172 return 0;
173 }
174
175 return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
176 }
177
178 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
179 aeApiState *state = eventLoop->apidata;
180 int fullmask, pfd;
181
182 if (evport_debug)
183 fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
184
185 pfd = aeApiLookupPending(state, fd);
186
187 if (pfd != -1) {
188 if (evport_debug)
189 fprintf(stderr, "deleting event from pending fd %d\n", fd);
190
191 /*
192 * This fd was just returned from aeApiPoll, so it's not currently
193 * associated with the port. All we need to do is update
194 * pending_mask appropriately.
195 */
196 state->pending_masks[pfd] &= ~mask;
197
198 if (state->pending_masks[pfd] == AE_NONE)
199 state->pending_fds[pfd] = -1;
200
201 return;
202 }
203
204 /*
205 * The fd is currently associated with the port. Like with the add case
206 * above, we must look at the full mask for the file descriptor before
207 * updating that association. We don't have a good way of knowing what the
208 * events are without looking into the eventLoop state directly. We rely on
209 * the fact that our caller has already updated the mask in the eventLoop.
210 */
211
212 fullmask = eventLoop->events[fd].mask;
213 if (fullmask == AE_NONE) {
214 /*
215 * We're removing *all* events, so use port_dissociate to remove the
216 * association completely. Failure here indicates a bug.
217 */
218 if (evport_debug)
219 fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
220
221 if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
222 perror("aeApiDelEvent: port_dissociate");
223 abort(); /* will not return */
224 }
225 } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
226 fullmask) != 0) {
227 /*
228 * ENOMEM is a potentially transient condition, but the kernel won't
229 * generally return it unless things are really bad. EAGAIN indicates
230 * we've reached an resource limit, for which it doesn't make sense to
231 * retry (counterintuitively). All other errors indicate a bug. In any
232 * of these cases, the best we can do is to abort.
233 */
234 abort(); /* will not return */
235 }
236 }
237
238 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
239 aeApiState *state = eventLoop->apidata;
240 struct timespec timeout, *tsp;
241 int mask, i;
242 uint_t nevents;
243 port_event_t event[MAX_EVENT_BATCHSZ];
244
245 /*
246 * If we've returned fd events before, we must reassociate them with the
247 * port now, before calling port_get(). See the block comment at the top of
248 * this file for an explanation of why.
249 */
250 for (i = 0; i < state->npending; i++) {
251 if (state->pending_fds[i] == -1)
252 /* This fd has since been deleted. */
253 continue;
254
255 if (aeApiAssociate("aeApiPoll", state->portfd,
256 state->pending_fds[i], state->pending_masks[i]) != 0) {
257 /* See aeApiDelEvent for why this case is fatal. */
258 abort();
259 }
260
261 state->pending_masks[i] = AE_NONE;
262 state->pending_fds[i] = -1;
263 }
264
265 state->npending = 0;
266
267 if (tvp != NULL) {
268 timeout.tv_sec = tvp->tv_sec;
269 timeout.tv_nsec = tvp->tv_usec * 1000;
270 tsp = &timeout;
271 } else {
272 tsp = NULL;
273 }
274
275 /*
276 * port_getn can return with errno == ETIME having returned some events (!).
277 * So if we get ETIME, we check nevents, too.
278 */
279 nevents = 1;
280 if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
281 tsp) == -1 && (errno != ETIME || nevents == 0)) {
282 if (errno == ETIME || errno == EINTR)
283 return 0;
284
285 /* Any other error indicates a bug. */
286 perror("aeApiPoll: port_get");
287 abort();
288 }
289
290 state->npending = nevents;
291
292 for (i = 0; i < nevents; i++) {
293 mask = 0;
294 if (event[i].portev_events & POLLIN)
295 mask |= AE_READABLE;
296 if (event[i].portev_events & POLLOUT)
297 mask |= AE_WRITABLE;
298
299 eventLoop->fired[i].fd = event[i].portev_object;
300 eventLoop->fired[i].mask = mask;
301
302 if (evport_debug)
303 fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
304 (int)event[i].portev_object, mask);
305
306 state->pending_fds[i] = event[i].portev_object;
307 state->pending_masks[i] = (uintptr_t)event[i].portev_user;
308 }
309
310 return nevents;
311 }
312
313 static char *aeApiName(void) {
314 return "evport";
315 }