X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/e2641e09cc0daf44f63f654230f72d22acf3a9af..b7b2a1cc5eec13a714b02fc71dff3510a6b72155:/src/ae.c diff --git a/src/ae.c b/src/ae.c index c7918ee1..ee483802 100644 --- a/src/ae.c +++ b/src/ae.c @@ -35,6 +35,8 @@ #include #include #include +#include +#include #include "ae.h" #include "zmalloc.h" @@ -42,40 +44,55 @@ /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ -#ifdef HAVE_EPOLL -#include "ae_epoll.c" +#ifdef HAVE_EVPORT +#include "ae_evport.c" #else - #ifdef HAVE_KQUEUE - #include "ae_kqueue.c" + #ifdef HAVE_EPOLL + #include "ae_epoll.c" #else - #include "ae_select.c" + #ifdef HAVE_KQUEUE + #include "ae_kqueue.c" + #else + #include "ae_select.c" + #endif #endif #endif -aeEventLoop *aeCreateEventLoop(void) { +aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; - eventLoop = zmalloc(sizeof(*eventLoop)); - if (!eventLoop) return NULL; + if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; + eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); + eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); + if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; + eventLoop->setsize = setsize; + eventLoop->lastTime = time(NULL); eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; - if (aeApiCreate(eventLoop) == -1) { - zfree(eventLoop); - return NULL; - } + if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ - for (i = 0; i < AE_SETSIZE; i++) + for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; + +err: + if (eventLoop) { + zfree(eventLoop->events); + zfree(eventLoop->fired); + zfree(eventLoop); + } + return NULL; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); + zfree(eventLoop->events); + zfree(eventLoop->fired); zfree(eventLoop); } @@ -86,7 +103,7 @@ void aeStop(aeEventLoop *eventLoop) { int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { - if (fd >= AE_SETSIZE) return AE_ERR; + if (fd >= eventLoop->setsize) return AE_ERR; aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) @@ -102,7 +119,7 @@ int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - if (fd >= AE_SETSIZE) return; + if (fd >= eventLoop->setsize) return; aeFileEvent *fe = &eventLoop->events[fd]; if (fe->mask == AE_NONE) return; @@ -118,6 +135,13 @@ void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) aeApiDelEvent(eventLoop, fd, mask); } +int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { + if (fd >= eventLoop->setsize) return 0; + aeFileEvent *fe = &eventLoop->events[fd]; + + return fe->mask; +} + static void aeGetTime(long *seconds, long *milliseconds) { struct timeval tv; @@ -213,6 +237,24 @@ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; + time_t now = time(NULL); + + /* If the system clock is moved to the future, and then set back to the + * right value, time events may be delayed in a random way. Often this + * means that scheduled operations will not be performed soon enough. + * + * Here we try to detect system clock skews, and force all the time + * events to be processed ASAP when this happens: the idea is that + * processing events earlier is less dangerous than delaying them + * indefinitely, and practice suggests it is. */ + if (now < eventLoop->lastTime) { + te = eventLoop->timeEventHead; + while(te) { + te->when_sec = 0; + te = te->next; + } + } + eventLoop->lastTime = now; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; @@ -351,21 +393,19 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) /* Wait for millseconds until the given file descriptor becomes * writable/readable/exception */ int aeWait(int fd, int mask, long long milliseconds) { - struct timeval tv; - fd_set rfds, wfds, efds; + struct pollfd pfd; int retmask = 0, retval; - tv.tv_sec = milliseconds/1000; - tv.tv_usec = (milliseconds%1000)*1000; - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - if (mask & AE_READABLE) FD_SET(fd,&rfds); - if (mask & AE_WRITABLE) FD_SET(fd,&wfds); - if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) { - if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; - if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; + memset(&pfd, 0, sizeof(pfd)); + pfd.fd = fd; + if (mask & AE_READABLE) pfd.events |= POLLIN; + if (mask & AE_WRITABLE) pfd.events |= POLLOUT; + + if ((retval = poll(&pfd, 1, milliseconds))== 1) { + if (pfd.revents & POLLIN) retmask |= AE_READABLE; + if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE; + if (pfd.revents & POLLERR) retmask |= AE_WRITABLE; + if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE; return retmask; } else { return retval;