]>
git.saurik.com Git - redis.git/blob - src/ae_evport.c
1 /* ae.c module for illumos event ports.
2 * Copyright (c) 2012, Joyent, Inc. All rights reserved.
3 * Released under the BSD license. See the COPYING file for more info. */
10 #include <sys/types.h>
15 static int evport_debug
= 0;
18 * This file implements the ae API using event ports, present on Solaris-based
19 * systems since Solaris 10. Using the event port interface, we associate file
20 * descriptors with the port. Each association also includes the set of poll(2)
21 * events that the consumer is interested in (e.g., POLLIN and POLLOUT).
23 * There's one tricky piece to this implementation: when we return events via
24 * aeApiPoll, the corresponding file descriptors become dissociated from the
25 * port. This is necessary because poll events are level-triggered, so if the
26 * fd didn't become dissociated, it would immediately fire another event since
27 * the underlying state hasn't changed yet. We must reassociate the file
28 * descriptor, but only after we know that our caller has actually read from it.
29 * The ae API does not tell us exactly when that happens, but we do know that
30 * it must happen by the time aeApiPoll is called again. Our solution is to
31 * keep track of the last fds returned by aeApiPoll and reassociate them next
32 * time aeApiPoll is invoked.
34 * To summarize, in this module, each fd association is EITHER (a) represented
35 * only via the in-kernel association OR (b) represented by pending_fds and
36 * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
37 * and only until we enter aeApiPoll again (at which point we restore the
38 * in-kernel association).
40 #define MAX_EVENT_BATCHSZ 512
42 typedef struct aeApiState
{
43 int portfd
; /* event port */
44 int npending
; /* # of pending fds */
45 int pending_fds
[MAX_EVENT_BATCHSZ
]; /* pending fds */
46 int pending_masks
[MAX_EVENT_BATCHSZ
]; /* pending fds' masks */
49 static int aeApiCreate(aeEventLoop
*eventLoop
) {
51 aeApiState
*state
= zmalloc(sizeof(aeApiState
));
52 if (!state
) return -1;
54 state
->portfd
= port_create();
55 if (state
->portfd
== -1) {
62 for (i
= 0; i
< MAX_EVENT_BATCHSZ
; i
++) {
63 state
->pending_fds
[i
] = -1;
64 state
->pending_masks
[i
] = AE_NONE
;
67 eventLoop
->apidata
= state
;
71 static void aeApiFree(aeEventLoop
*eventLoop
) {
72 aeApiState
*state
= eventLoop
->apidata
;
78 static int aeApiLookupPending(aeApiState
*state
, int fd
) {
81 for (i
= 0; i
< state
->npending
; i
++) {
82 if (state
->pending_fds
[i
] == fd
)
90 * Helper function to invoke port_associate for the given fd and mask.
92 static int aeApiAssociate(const char *where
, int portfd
, int fd
, int mask
) {
96 if (mask
& AE_READABLE
)
98 if (mask
& AE_WRITABLE
)
102 fprintf(stderr
, "%s: port_associate(%d, 0x%x) = ", where
, fd
, events
);
104 rv
= port_associate(portfd
, PORT_SOURCE_FD
, fd
, events
,
105 (void *)(uintptr_t)mask
);
109 fprintf(stderr
, "%d (%s)\n", rv
, rv
== 0 ? "no error" : strerror(err
));
112 fprintf(stderr
, "%s: port_associate: %s\n", where
, strerror(err
));
115 fprintf(stderr
, "aeApiAssociate: event port limit exceeded.");
121 static int aeApiAddEvent(aeEventLoop
*eventLoop
, int fd
, int mask
) {
122 aeApiState
*state
= eventLoop
->apidata
;
126 fprintf(stderr
, "aeApiAddEvent: fd %d mask 0x%x\n", fd
, mask
);
129 * Since port_associate's "events" argument replaces any existing events, we
130 * must be sure to include whatever events are already associated when
131 * we call port_associate() again.
133 fullmask
= mask
| eventLoop
->events
[fd
].mask
;
134 pfd
= aeApiLookupPending(state
, fd
);
138 * This fd was recently returned from aeApiPoll. It should be safe to
139 * assume that the consumer has processed that poll event, but we play
140 * it safer by simply updating pending_masks. The fd will be
141 * reassociated as usual when aeApiPoll is called again.
144 fprintf(stderr
, "aeApiAddEvent: adding to pending fd %d\n", fd
);
145 state
->pending_masks
[pfd
] |= fullmask
;
149 return (aeApiAssociate("aeApiAddEvent", state
->portfd
, fd
, fullmask
));
152 static void aeApiDelEvent(aeEventLoop
*eventLoop
, int fd
, int mask
) {
153 aeApiState
*state
= eventLoop
->apidata
;
157 fprintf(stderr
, "del fd %d mask 0x%x\n", fd
, mask
);
159 pfd
= aeApiLookupPending(state
, fd
);
163 fprintf(stderr
, "deleting event from pending fd %d\n", fd
);
166 * This fd was just returned from aeApiPoll, so it's not currently
167 * associated with the port. All we need to do is update
168 * pending_masks appropriately.
170 state
->pending_masks
[pfd
] &= ~mask
;
172 if (state
->pending_masks
[pfd
] == AE_NONE
)
173 state
->pending_fds
[pfd
] = -1;
179 * The fd is currently associated with the port. Like with the add case
180 * above, we must look at the full mask for the file descriptor before
181 * updating that association. We don't have a good way of knowing what the
182 * events are without looking into the eventLoop state directly. We rely on
183 * the fact that our caller has already updated the mask in the eventLoop.
186 fullmask
= eventLoop
->events
[fd
].mask
;
187 if (fullmask
== AE_NONE
) {
189 * We're removing *all* events, so use port_dissociate to remove the
190 * association completely. Failure here indicates a bug.
193 fprintf(stderr
, "aeApiDelEvent: port_dissociate(%d)\n", fd
);
195 if (port_dissociate(state
->portfd
, PORT_SOURCE_FD
, fd
) != 0) {
196 perror("aeApiDelEvent: port_dissociate");
197 abort(); /* will not return */
199 } else if (aeApiAssociate("aeApiDelEvent", state
->portfd
, fd
,
202 * ENOMEM is a potentially transient condition, but the kernel won't
203 * generally return it unless things are really bad. EAGAIN indicates
204 * we've reached a resource limit, for which it doesn't make sense to
205 * retry (counterintuitively). All other errors indicate a bug. In any
206 * of these cases, the best we can do is to abort.
208 abort(); /* will not return */
212 static int aeApiPoll(aeEventLoop
*eventLoop
, struct timeval
*tvp
) {
213 aeApiState
*state
= eventLoop
->apidata
;
214 struct timespec timeout
, *tsp
;
217 port_event_t event
[MAX_EVENT_BATCHSZ
];
220 * If we've returned fd events before, we must reassociate them with the
221 * port now, before calling port_getn(). See the block comment at the top of
222 * this file for an explanation of why.
224 for (i
= 0; i
< state
->npending
; i
++) {
225 if (state
->pending_fds
[i
] == -1)
226 /* This fd has since been deleted. */
229 if (aeApiAssociate("aeApiPoll", state
->portfd
,
230 state
->pending_fds
[i
], state
->pending_masks
[i
]) != 0) {
231 /* See aeApiDelEvent for why this case is fatal. */
235 state
->pending_masks
[i
] = AE_NONE
;
236 state
->pending_fds
[i
] = -1;
242 timeout
.tv_sec
= tvp
->tv_sec
;
243 timeout
.tv_nsec
= tvp
->tv_usec
* 1000;
250 * port_getn can return with errno == ETIME having returned some events (!).
251 * So if we get ETIME, we check nevents, too.
254 if (port_getn(state
->portfd
, event
, MAX_EVENT_BATCHSZ
, &nevents
,
255 tsp
) == -1 && (errno
!= ETIME
|| nevents
== 0)) {
256 if (errno
== ETIME
|| errno
== EINTR
)
259 /* Any other error indicates a bug. */
260 perror("aeApiPoll: port_get");
264 state
->npending
= nevents
;
266 for (i
= 0; i
< nevents
; i
++) {
268 if (event
[i
].portev_events
& POLLIN
)
270 if (event
[i
].portev_events
& POLLOUT
)
273 eventLoop
->fired
[i
].fd
= event
[i
].portev_object
;
274 eventLoop
->fired
[i
].mask
= mask
;
277 fprintf(stderr
, "aeApiPoll: fd %d mask 0x%x\n",
278 (int)event
[i
].portev_object
, mask
);
280 state
->pending_fds
[i
] = event
[i
].portev_object
;
281 state
->pending_masks
[i
] = (uintptr_t)event
[i
].portev_user
;
287 static char *aeApiName(void) {