]>
git.saurik.com Git - redis.git/blob - src/ae_evport.c
1 /* ae.c module for illumos event ports.
3 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
36 #include <sys/types.h>
41 static int evport_debug
= 0;
44 * This file implements the ae API using event ports, present on Solaris-based
45 * systems since Solaris 10. Using the event port interface, we associate file
46 * descriptors with the port. Each association also includes the set of poll(2)
47 * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
49 * There's one tricky piece to this implementation: when we return events via
50 * aeApiPoll, the corresponding file descriptors become dissociated from the
51 * port. This is necessary because poll events are level-triggered, so if the
52 * fd didn't become dissociated, it would immediately fire another event since
53 * the underlying state hasn't changed yet. We must reassociate the file
54 * descriptor, but only after we know that our caller has actually read from it.
55 * The ae API does not tell us exactly when that happens, but we do know that
56 * it must happen by the time aeApiPoll is called again. Our solution is to
57 * keep track of the last fds returned by aeApiPoll and reassociate them next
58 * time aeApiPoll is invoked.
60 * To summarize, in this module, each fd association is EITHER (a) represented
61 * only via the in-kernel association OR (b) represented by pending_fds and
62 * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
63 * and only until we enter aeApiPoll again (at which point we restore the
64 * in-kernel association).
66 #define MAX_EVENT_BATCHSZ 512
68 typedef struct aeApiState
{
69 int portfd
; /* event port */
70 int npending
; /* # of pending fds */
71 int pending_fds
[MAX_EVENT_BATCHSZ
]; /* pending fds */
72 int pending_masks
[MAX_EVENT_BATCHSZ
]; /* pending fds' masks */
75 static int aeApiCreate(aeEventLoop
*eventLoop
) {
77 aeApiState
*state
= zmalloc(sizeof(aeApiState
));
78 if (!state
) return -1;
80 state
->portfd
= port_create();
81 if (state
->portfd
== -1) {
88 for (i
= 0; i
< MAX_EVENT_BATCHSZ
; i
++) {
89 state
->pending_fds
[i
] = -1;
90 state
->pending_masks
[i
] = AE_NONE
;
93 eventLoop
->apidata
= state
;
97 static void aeApiFree(aeEventLoop
*eventLoop
) {
98 aeApiState
*state
= eventLoop
->apidata
;
100 close(state
->portfd
);
104 static int aeApiLookupPending(aeApiState
*state
, int fd
) {
107 for (i
= 0; i
< state
->npending
; i
++) {
108 if (state
->pending_fds
[i
] == fd
)
116 * Helper function to invoke port_associate for the given fd and mask.
118 static int aeApiAssociate(const char *where
, int portfd
, int fd
, int mask
) {
122 if (mask
& AE_READABLE
)
124 if (mask
& AE_WRITABLE
)
128 fprintf(stderr
, "%s: port_associate(%d, 0x%x) = ", where
, fd
, events
);
130 rv
= port_associate(portfd
, PORT_SOURCE_FD
, fd
, events
,
131 (void *)(uintptr_t)mask
);
135 fprintf(stderr
, "%d (%s)\n", rv
, rv
== 0 ? "no error" : strerror(err
));
138 fprintf(stderr
, "%s: port_associate: %s\n", where
, strerror(err
));
141 fprintf(stderr
, "aeApiAssociate: event port limit exceeded.");
147 static int aeApiAddEvent(aeEventLoop
*eventLoop
, int fd
, int mask
) {
148 aeApiState
*state
= eventLoop
->apidata
;
152 fprintf(stderr
, "aeApiAddEvent: fd %d mask 0x%x\n", fd
, mask
);
155 * Since port_associate's "events" argument replaces any existing events, we
156 * must be sure to include whatever events are already associated when
157 * we call port_associate() again.
159 fullmask
= mask
| eventLoop
->events
[fd
].mask
;
160 pfd
= aeApiLookupPending(state
, fd
);
164 * This fd was recently returned from aeApiPoll. It should be safe to
165 * assume that the consumer has processed that poll event, but we play
166 * it safer by simply updating pending_masks. The fd will be
167 * reassociated as usual when aeApiPoll is called again.
170 fprintf(stderr
, "aeApiAddEvent: adding to pending fd %d\n", fd
);
171 state
->pending_masks
[pfd
] |= fullmask
;
175 return (aeApiAssociate("aeApiAddEvent", state
->portfd
, fd
, fullmask
));
178 static void aeApiDelEvent(aeEventLoop
*eventLoop
, int fd
, int mask
) {
179 aeApiState
*state
= eventLoop
->apidata
;
183 fprintf(stderr
, "del fd %d mask 0x%x\n", fd
, mask
);
185 pfd
= aeApiLookupPending(state
, fd
);
189 fprintf(stderr
, "deleting event from pending fd %d\n", fd
);
192 * This fd was just returned from aeApiPoll, so it's not currently
193 * associated with the port. All we need to do is update
194 * pending_masks appropriately.
196 state
->pending_masks
[pfd
] &= ~mask
;
198 if (state
->pending_masks
[pfd
] == AE_NONE
)
199 state
->pending_fds
[pfd
] = -1;
205 * The fd is currently associated with the port. Like with the add case
206 * above, we must look at the full mask for the file descriptor before
207 * updating that association. We don't have a good way of knowing what the
208 * events are without looking into the eventLoop state directly. We rely on
209 * the fact that our caller has already updated the mask in the eventLoop.
212 fullmask
= eventLoop
->events
[fd
].mask
;
213 if (fullmask
== AE_NONE
) {
215 * We're removing *all* events, so use port_dissociate to remove the
216 * association completely. Failure here indicates a bug.
219 fprintf(stderr
, "aeApiDelEvent: port_dissociate(%d)\n", fd
);
221 if (port_dissociate(state
->portfd
, PORT_SOURCE_FD
, fd
) != 0) {
222 perror("aeApiDelEvent: port_dissociate");
223 abort(); /* will not return */
225 } else if (aeApiAssociate("aeApiDelEvent", state
->portfd
, fd
,
228 * ENOMEM is a potentially transient condition, but the kernel won't
229 * generally return it unless things are really bad. EAGAIN indicates
230 * we've reached a resource limit, for which it doesn't make sense to
231 * retry (counterintuitively). All other errors indicate a bug. In any
232 * of these cases, the best we can do is to abort.
234 abort(); /* will not return */
238 static int aeApiPoll(aeEventLoop
*eventLoop
, struct timeval
*tvp
) {
239 aeApiState
*state
= eventLoop
->apidata
;
240 struct timespec timeout
, *tsp
;
243 port_event_t event
[MAX_EVENT_BATCHSZ
];
246 * If we've returned fd events before, we must reassociate them with the
247 * port now, before calling port_getn(). See the block comment at the top of
248 * this file for an explanation of why.
250 for (i
= 0; i
< state
->npending
; i
++) {
251 if (state
->pending_fds
[i
] == -1)
252 /* This fd has since been deleted. */
255 if (aeApiAssociate("aeApiPoll", state
->portfd
,
256 state
->pending_fds
[i
], state
->pending_masks
[i
]) != 0) {
257 /* See aeApiDelEvent for why this case is fatal. */
261 state
->pending_masks
[i
] = AE_NONE
;
262 state
->pending_fds
[i
] = -1;
268 timeout
.tv_sec
= tvp
->tv_sec
;
269 timeout
.tv_nsec
= tvp
->tv_usec
* 1000;
276 * port_getn can return with errno == ETIME having returned some events (!).
277 * So if we get ETIME, we check nevents, too.
280 if (port_getn(state
->portfd
, event
, MAX_EVENT_BATCHSZ
, &nevents
,
281 tsp
) == -1 && (errno
!= ETIME
|| nevents
== 0)) {
282 if (errno
== ETIME
|| errno
== EINTR
)
285 /* Any other error indicates a bug. */
286 perror("aeApiPoll: port_get");
290 state
->npending
= nevents
;
292 for (i
= 0; i
< nevents
; i
++) {
294 if (event
[i
].portev_events
& POLLIN
)
296 if (event
[i
].portev_events
& POLLOUT
)
299 eventLoop
->fired
[i
].fd
= event
[i
].portev_object
;
300 eventLoop
->fired
[i
].mask
= mask
;
303 fprintf(stderr
, "aeApiPoll: fd %d mask 0x%x\n",
304 (int)event
[i
].portev_object
, mask
);
306 state
->pending_fds
[i
] = event
[i
].portev_object
;
307 state
->pending_masks
[i
] = (uintptr_t)event
[i
].portev_user
;
313 static char *aeApiName(void) {