]> git.saurik.com Git - redis.git/blame - src/ae_evport.c
Allow Pub/Sub in contexts where other commands are blocked.
[redis.git] / src / ae_evport.c
CommitLineData
05da63da
DP
/* ae.c module for illumos event ports.
 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
 * Released under the BSD license. See the COPYING file for more info. */
4
#include <assert.h>
#include <errno.h>
#include <port.h>
#include <poll.h>

#include <sys/types.h>
#include <sys/time.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
14
/* When non-zero, the functions in this file trace their activity to stderr. */
static int evport_debug = 0;
05da63da
DP
16
/*
 * This file implements the ae API using event ports, present on Solaris-based
 * systems since Solaris 10. Using the event port interface, we associate file
 * descriptors with the port. Each association also includes the set of poll(2)
 * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
 *
 * There's one tricky piece to this implementation: when we return events via
 * aeApiPoll, the corresponding file descriptors become dissociated from the
 * port. This is necessary because poll events are level-triggered, so if the
 * fd didn't become dissociated, it would immediately fire another event since
 * the underlying state hasn't changed yet. We must reassociate the file
 * descriptor, but only after we know that our caller has actually read from it.
 * The ae API does not tell us exactly when that happens, but we do know that
 * it must happen by the time aeApiPoll is called again. Our solution is to
 * keep track of the last fds returned by aeApiPoll and reassociate them next
 * time aeApiPoll is invoked.
 *
 * To summarize, in this module, each fd association is EITHER (a) represented
 * only via the in-kernel association OR (b) represented by pending_fds and
 * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
 * and only until we enter aeApiPoll again (at which point we restore the
 * in-kernel association).
 */
6d312770
DP
/*
 * Capacity of the per-poll event buffer and of the pending_fds/pending_masks
 * arrays; also the maximum number of events requested from one port_getn()
 * call in aeApiPoll.
 */
#define MAX_EVENT_BATCHSZ 512

/*
 * Per-event-loop state for the event port backend.
 *
 * pending_fds[i] / pending_masks[i] (for 0 <= i < npending) record the fds
 * returned by the most recent aeApiPoll call together with the ae mask each
 * one must be reassociated with. A slot whose fd is -1 is unused or has been
 * deleted since it was returned.
 */
typedef struct aeApiState {
    int portfd;                             /* event port */
    int npending;                           /* # of pending fds */
    int pending_fds[MAX_EVENT_BATCHSZ];     /* pending fds */
    int pending_masks[MAX_EVENT_BATCHSZ];   /* pending fds' masks */
} aeApiState;
48
49static int aeApiCreate(aeEventLoop *eventLoop) {
6d312770 50 int i;
05da63da
DP
51 aeApiState *state = zmalloc(sizeof(aeApiState));
52 if (!state) return -1;
53
54 state->portfd = port_create();
55 if (state->portfd == -1) {
56 zfree(state);
57 return -1;
58 }
59
6d312770
DP
60 state->npending = 0;
61
62 for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
63 state->pending_fds[i] = -1;
64 state->pending_masks[i] = AE_NONE;
65 }
05da63da
DP
66
67 eventLoop->apidata = state;
3c72c94a 68 return 0;
05da63da
DP
69}
70
71static void aeApiFree(aeEventLoop *eventLoop) {
72 aeApiState *state = eventLoop->apidata;
73
74 close(state->portfd);
75 zfree(state);
76}
77
6d312770
DP
78static int aeApiLookupPending(aeApiState *state, int fd) {
79 int i;
80
81 for (i = 0; i < state->npending; i++) {
82 if (state->pending_fds[i] == fd)
83 return (i);
84 }
85
86 return (-1);
87}
88
05da63da
DP
89/*
90 * Helper function to invoke port_associate for the given fd and mask.
91 */
6d312770 92static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
05da63da
DP
93 int events = 0;
94 int rv, err;
95
96 if (mask & AE_READABLE)
97 events |= POLLIN;
98 if (mask & AE_WRITABLE)
99 events |= POLLOUT;
100
101 if (evport_debug)
102 fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
103
6d312770
DP
104 rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
105 (void *)(uintptr_t)mask);
05da63da
DP
106 err = errno;
107
108 if (evport_debug)
109 fprintf(stderr, "%d (%s)\n", rv, rv == 0 ? "no error" : strerror(err));
110
111 if (rv == -1) {
112 fprintf(stderr, "%s: port_associate: %s\n", where, strerror(err));
113
114 if (err == EAGAIN)
115 fprintf(stderr, "aeApiAssociate: event port limit exceeded.");
116 }
117
118 return rv;
119}
120
121static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
122 aeApiState *state = eventLoop->apidata;
6d312770 123 int fullmask, pfd;
05da63da
DP
124
125 if (evport_debug)
126 fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
127
128 /*
129 * Since port_associate's "events" argument replaces any existing events, we
130 * must be sure to include whatever events are already associated when
131 * we call port_associate() again.
132 */
133 fullmask = mask | eventLoop->events[fd].mask;
6d312770 134 pfd = aeApiLookupPending(state, fd);
05da63da 135
6d312770 136 if (pfd != -1) {
05da63da
DP
137 /*
138 * This fd was recently returned from aeApiPoll. It should be safe to
139 * assume that the consumer has processed that poll event, but we play
140 * it safer by simply updating pending_mask. The fd will be
141 * reassociated as usual when aeApiPoll is called again.
142 */
143 if (evport_debug)
144 fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
6d312770 145 state->pending_masks[pfd] |= fullmask;
05da63da
DP
146 return 0;
147 }
148
149 return (aeApiAssociate("aeApiAddEvent", state->portfd, fd, fullmask));
150}
151
152static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
153 aeApiState *state = eventLoop->apidata;
6d312770 154 int fullmask, pfd;
05da63da
DP
155
156 if (evport_debug)
157 fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
158
6d312770
DP
159 pfd = aeApiLookupPending(state, fd);
160
161 if (pfd != -1) {
05da63da
DP
162 if (evport_debug)
163 fprintf(stderr, "deleting event from pending fd %d\n", fd);
164
165 /*
166 * This fd was just returned from aeApiPoll, so it's not currently
167 * associated with the port. All we need to do is update
168 * pending_mask appropriately.
169 */
6d312770 170 state->pending_masks[pfd] &= ~mask;
05da63da 171
6d312770
DP
172 if (state->pending_masks[pfd] == AE_NONE)
173 state->pending_fds[pfd] = -1;
05da63da
DP
174
175 return;
176 }
177
178 /*
179 * The fd is currently associated with the port. Like with the add case
180 * above, we must look at the full mask for the file descriptor before
181 * updating that association. We don't have a good way of knowing what the
182 * events are without looking into the eventLoop state directly. We rely on
183 * the fact that our caller has already updated the mask in the eventLoop.
184 */
185
186 fullmask = eventLoop->events[fd].mask;
187 if (fullmask == AE_NONE) {
188 /*
189 * We're removing *all* events, so use port_dissociate to remove the
190 * association completely. Failure here indicates a bug.
191 */
192 if (evport_debug)
193 fprintf(stderr, "aeApiDelEvent: port_dissociate(%d)\n", fd);
194
195 if (port_dissociate(state->portfd, PORT_SOURCE_FD, fd) != 0) {
196 perror("aeApiDelEvent: port_dissociate");
197 abort(); /* will not return */
198 }
199 } else if (aeApiAssociate("aeApiDelEvent", state->portfd, fd,
200 fullmask) != 0) {
201 /*
202 * ENOMEM is a potentially transient condition, but the kernel won't
203 * generally return it unless things are really bad. EAGAIN indicates
204 * we've reached an resource limit, for which it doesn't make sense to
205 * retry (counterintuitively). All other errors indicate a bug. In any
206 * of these cases, the best we can do is to abort.
207 */
208 abort(); /* will not return */
209 }
210}
211
212static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
213 aeApiState *state = eventLoop->apidata;
214 struct timespec timeout, *tsp;
6d312770
DP
215 int mask, i;
216 uint_t nevents;
217 port_event_t event[MAX_EVENT_BATCHSZ];
05da63da
DP
218
219 /*
6d312770
DP
220 * If we've returned fd events before, we must reassociate them with the
221 * port now, before calling port_get(). See the block comment at the top of
222 * this file for an explanation of why.
05da63da 223 */
6d312770
DP
224 for (i = 0; i < state->npending; i++) {
225 if (state->pending_fds[i] == -1)
226 /* This fd has since been deleted. */
227 continue;
228
229 if (aeApiAssociate("aeApiPoll", state->portfd,
230 state->pending_fds[i], state->pending_masks[i]) != 0) {
05da63da
DP
231 /* See aeApiDelEvent for why this case is fatal. */
232 abort();
233 }
234
6d312770
DP
235 state->pending_masks[i] = AE_NONE;
236 state->pending_fds[i] = -1;
05da63da
DP
237 }
238
6d312770
DP
239 state->npending = 0;
240
05da63da
DP
241 if (tvp != NULL) {
242 timeout.tv_sec = tvp->tv_sec;
243 timeout.tv_nsec = tvp->tv_usec * 1000;
244 tsp = &timeout;
245 } else {
246 tsp = NULL;
247 }
248
6d312770 249 /*
3c72c94a 250 * port_getn can return with errno == ETIME having returned some events (!).
6d312770
DP
251 * So if we get ETIME, we check nevents, too.
252 */
253 nevents = 1;
254 if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
255 tsp) == -1 && (errno != ETIME || nevents == 0)) {
05da63da
DP
256 if (errno == ETIME || errno == EINTR)
257 return 0;
258
259 /* Any other error indicates a bug. */
260 perror("aeApiPoll: port_get");
261 abort();
262 }
263
6d312770 264 state->npending = nevents;
05da63da 265
6d312770
DP
266 for (i = 0; i < nevents; i++) {
267 mask = 0;
268 if (event[i].portev_events & POLLIN)
269 mask |= AE_READABLE;
270 if (event[i].portev_events & POLLOUT)
271 mask |= AE_WRITABLE;
05da63da 272
6d312770
DP
273 eventLoop->fired[i].fd = event[i].portev_object;
274 eventLoop->fired[i].mask = mask;
05da63da 275
6d312770
DP
276 if (evport_debug)
277 fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
278 (int)event[i].portev_object, mask);
279
280 state->pending_fds[i] = event[i].portev_object;
281 state->pending_masks[i] = (uintptr_t)event[i].portev_user;
282 }
05da63da 283
6d312770 284 return nevents;
05da63da
DP
285}
286
/* Return the name of this ae backend. */
static char *aeApiName(void) {
    return "evport";
}