#include <stdio.h>
-int evport_debug = 0;
+static int evport_debug = 0;
/*
* This file implements the ae API using event ports, present on Solaris-based
* events that the consumer is interested in (e.g., POLLIN and POLLOUT).
*
* There's one tricky piece to this implementation: when we return events via
- * aeApiPoll, the corresponding file descriptor becomes dissociated from the
+ * aeApiPoll, the corresponding file descriptors become dissociated from the
* port. This is necessary because poll events are level-triggered, so if the
* fd didn't become dissociated, it would immediately fire another event since
* the underlying state hasn't changed yet. We must reassociate the file
* descriptor, but only after we know that our caller has actually read from it.
* The ae API does not tell us exactly when that happens, but we do know that
* it must happen by the time aeApiPoll is called again. Our solution is to
- * keep track of the last fd returned by aeApiPoll and reassociate it next time
- * aeApiPoll is invoked.
+ * keep track of the last fds returned by aeApiPoll and reassociate them next
+ * time aeApiPoll is invoked.
*
* To summarize, in this module, each fd association is EITHER (a) represented
- * only via the in-kernel assocation OR (b) represented by pending_fd and
- * pending_mask. (b) is only true for the last fd we returned from aeApiPoll,
+ * only via the in-kernel assocation OR (b) represented by pending_fds and
+ * pending_masks. (b) is only true for the last fds we returned from aeApiPoll,
* and only until we enter aeApiPoll again (at which point we restore the
* in-kernel association).
- *
- * We currently only return one fd event per call to aeApiPoll. This could be
- * extended to return more than one by extending the corresponding pending
- * fields and using port_getn().
*/
+#define MAX_EVENT_BATCHSZ 512
+
typedef struct aeApiState {
- int portfd; /* event port */
- int pending_fd; /* pending fd */
- int pending_mask; /* pending fd's mask */
+ int portfd; /* event port */
+ int npending; /* # of pending fds */
+ int pending_fds[MAX_EVENT_BATCHSZ]; /* pending fds */
+ int pending_masks[MAX_EVENT_BATCHSZ]; /* pending fds' masks */
} aeApiState;
static int aeApiCreate(aeEventLoop *eventLoop) {
+ int i;
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
return -1;
}
- state->pending_fd = -1;
- state->pending_mask = AE_NONE;
+ state->npending = 0;
+
+ for (i = 0; i < MAX_EVENT_BATCHSZ; i++) {
+ state->pending_fds[i] = -1;
+ state->pending_masks[i] = AE_NONE;
+ }
eventLoop->apidata = state;
- return 0;
+ return 0;
}
static void aeApiFree(aeEventLoop *eventLoop) {
zfree(state);
}
+static int aeApiLookupPending(aeApiState *state, int fd) {
+ int i;
+
+ for (i = 0; i < state->npending; i++) {
+ if (state->pending_fds[i] == fd)
+ return (i);
+ }
+
+ return (-1);
+}
+
/*
* Helper function to invoke port_associate for the given fd and mask.
*/
-static int aeApiAssociate(const char *where, int portfd, int fd, int mask)
-{
+static int aeApiAssociate(const char *where, int portfd, int fd, int mask) {
int events = 0;
int rv, err;
if (evport_debug)
fprintf(stderr, "%s: port_associate(%d, 0x%x) = ", where, fd, events);
- rv = port_associate(portfd, PORT_SOURCE_FD, fd, events, (void *)mask);
+ rv = port_associate(portfd, PORT_SOURCE_FD, fd, events,
+ (void *)(uintptr_t)mask);
err = errno;
if (evport_debug)
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
- int fullmask;
+ int fullmask, pfd;
if (evport_debug)
fprintf(stderr, "aeApiAddEvent: fd %d mask 0x%x\n", fd, mask);
* we call port_associate() again.
*/
fullmask = mask | eventLoop->events[fd].mask;
+ pfd = aeApiLookupPending(state, fd);
- if (fd == state->pending_fd) {
+ if (pfd != -1) {
/*
* This fd was recently returned from aeApiPoll. It should be safe to
* assume that the consumer has processed that poll event, but we play
*/
if (evport_debug)
fprintf(stderr, "aeApiAddEvent: adding to pending fd %d\n", fd);
- state->pending_mask |= fullmask;
+ state->pending_masks[pfd] |= fullmask;
return 0;
}
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
- int fullmask;
+ int fullmask, pfd;
if (evport_debug)
fprintf(stderr, "del fd %d mask 0x%x\n", fd, mask);
- if (fd == state->pending_fd) {
+ pfd = aeApiLookupPending(state, fd);
+
+ if (pfd != -1) {
if (evport_debug)
fprintf(stderr, "deleting event from pending fd %d\n", fd);
* associated with the port. All we need to do is update
* pending_mask appropriately.
*/
- state->pending_mask &= ~mask;
+ state->pending_masks[pfd] &= ~mask;
- if (state->pending_mask == AE_NONE)
- state->pending_fd = -1;
+ if (state->pending_masks[pfd] == AE_NONE)
+ state->pending_fds[pfd] = -1;
return;
}
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
struct timespec timeout, *tsp;
- int mask;
- port_event_t event;
+ int mask, i;
+ uint_t nevents;
+ port_event_t event[MAX_EVENT_BATCHSZ];
/*
- * If we've returned an fd event before, we must reassociated that fd with
- * the port now, before calling port_get(). See the block comment at the
- * top of this file for an explanation of why.
+ * If we've returned fd events before, we must reassociate them with the
+ * port now, before calling port_get(). See the block comment at the top of
+ * this file for an explanation of why.
*/
- if (state->pending_mask != AE_NONE) {
- if (aeApiAssociate("aeApiPoll", state->portfd, state->pending_fd,
- state->pending_mask) != 0) {
+ for (i = 0; i < state->npending; i++) {
+ if (state->pending_fds[i] == -1)
+ /* This fd has since been deleted. */
+ continue;
+
+ if (aeApiAssociate("aeApiPoll", state->portfd,
+ state->pending_fds[i], state->pending_masks[i]) != 0) {
/* See aeApiDelEvent for why this case is fatal. */
abort();
}
- state->pending_mask = AE_NONE;
- state->pending_fd = -1;
+ state->pending_masks[i] = AE_NONE;
+ state->pending_fds[i] = -1;
}
+ state->npending = 0;
+
if (tvp != NULL) {
timeout.tv_sec = tvp->tv_sec;
timeout.tv_nsec = tvp->tv_usec * 1000;
tsp = NULL;
}
- if (port_get(state->portfd, &event, tsp) == -1) {
+ /*
+ * port_getn can return with errno == ETIME having returned some events (!).
+ * So if we get ETIME, we check nevents, too.
+ */
+ nevents = 1;
+ if (port_getn(state->portfd, event, MAX_EVENT_BATCHSZ, &nevents,
+ tsp) == -1 && (errno != ETIME || nevents == 0)) {
if (errno == ETIME || errno == EINTR)
return 0;
abort();
}
- mask = 0;
- if (event.portev_events & POLLIN)
- mask |= AE_READABLE;
- if (event.portev_events & POLLOUT)
- mask |= AE_WRITABLE;
+ state->npending = nevents;
- eventLoop->fired[0].fd = event.portev_object;
- eventLoop->fired[0].mask = mask;
+ for (i = 0; i < nevents; i++) {
+ mask = 0;
+ if (event[i].portev_events & POLLIN)
+ mask |= AE_READABLE;
+ if (event[i].portev_events & POLLOUT)
+ mask |= AE_WRITABLE;
- if (evport_debug)
- fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n", event.portev_object,
- mask);
+ eventLoop->fired[i].fd = event[i].portev_object;
+ eventLoop->fired[i].mask = mask;
- state->pending_fd = event.portev_object;
- state->pending_mask = (uintptr_t)event.portev_user;
+ if (evport_debug)
+ fprintf(stderr, "aeApiPoll: fd %d mask 0x%x\n",
+ (int)event[i].portev_object, mask);
+
+ state->pending_fds[i] = event[i].portev_object;
+ state->pending_masks[i] = (uintptr_t)event[i].portev_user;
+ }
- return 1;
+ return nevents;
}
static char *aeApiName(void) {