X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ed9b544e10b84cd43348ddfab7068b610a5df1f7..6e0e5bedd9c3a4bf0f53f43c427c88e2866bda0a:/ae.c?ds=sidebyside diff --git a/ae.c b/ae.c index 375f28a4..c7918ee1 100644 --- a/ae.c +++ b/ae.c @@ -2,7 +2,7 @@ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated * it in form of a library for easy reuse. * - * Copyright (c) 2006-2009, Salvatore Sanfilippo + * Copyright (c) 2006-2010, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -38,20 +38,44 @@ #include "ae.h" #include "zmalloc.h" +#include "config.h" + +/* Include the best multiplexing layer supported by this system. + * The following should be ordered by performances, descending. */ +#ifdef HAVE_EPOLL +#include "ae_epoll.c" +#else + #ifdef HAVE_KQUEUE + #include "ae_kqueue.c" + #else + #include "ae_select.c" + #endif +#endif aeEventLoop *aeCreateEventLoop(void) { aeEventLoop *eventLoop; + int i; eventLoop = zmalloc(sizeof(*eventLoop)); if (!eventLoop) return NULL; - eventLoop->fileEventHead = NULL; eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; + eventLoop->maxfd = -1; + eventLoop->beforesleep = NULL; + if (aeApiCreate(eventLoop) == -1) { + zfree(eventLoop); + return NULL; + } + /* Events with mask == AE_NONE are not set. So let's initialize the + * vector with it. */ + for (i = 0; i < AE_SETSIZE; i++) + eventLoop->events[i].mask = AE_NONE; return eventLoop; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { + aeApiFree(eventLoop); zfree(eventLoop); } @@ -60,42 +84,38 @@ void aeStop(aeEventLoop *eventLoop) { } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, - aeEventFinalizerProc *finalizerProc) + aeFileProc *proc, void *clientData) { - aeFileEvent *fe; - - fe = zmalloc(sizeof(*fe)); - if (fe == NULL) return AE_ERR; - fe->fd = fd; - fe->mask = mask; - fe->fileProc = proc; - fe->finalizerProc = finalizerProc; + if (fd >= AE_SETSIZE) return AE_ERR; + aeFileEvent *fe = &eventLoop->events[fd]; + + if (aeApiAddEvent(eventLoop, fd, mask) == -1) + return AE_ERR; + fe->mask |= mask; + if (mask & AE_READABLE) fe->rfileProc = proc; + if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; - fe->next = eventLoop->fileEventHead; - eventLoop->fileEventHead = fe; + if (fd > eventLoop->maxfd) + eventLoop->maxfd = fd; return AE_OK; } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeFileEvent *fe, *prev = NULL; - - fe = eventLoop->fileEventHead; - while(fe) { - if (fe->fd == fd && fe->mask == mask) { - if (prev == NULL) - eventLoop->fileEventHead = fe->next; - else - prev->next = fe->next; - if (fe->finalizerProc) - fe->finalizerProc(eventLoop, fe->clientData); - zfree(fe); - return; - } - prev = fe; - fe = fe->next; + if (fd >= AE_SETSIZE) return; + aeFileEvent *fe = &eventLoop->events[fd]; + + if (fe->mask == AE_NONE) return; + fe->mask = fe->mask & (~mask); + if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { + /* Update the max fd */ + int j; + + for (j = eventLoop->maxfd-1; j >= 0; j--) + if (eventLoop->events[j].mask != AE_NONE) break; + eventLoop->maxfd = j; } + aeApiDelEvent(eventLoop, fd, mask); } static void aeGetTime(long *seconds, long *milliseconds) @@ -167,7 +187,12 @@ int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) * put in sleep without to delay any event. * If there are no timers NULL is returned. * - * Note that's O(N) since time events are unsorted. */ + * Note that's O(N) since time events are unsorted. + * Possible optimizations (not needed by Redis so far, but...): + * 1) Insert the event in order, so that the nearest is just the head. + * Much better but still insertion or deletion of timers is O(N). + * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). + */ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; @@ -183,6 +208,57 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) return nearest; } +/* Process time events */ +static int processTimeEvents(aeEventLoop *eventLoop) { + int processed = 0; + aeTimeEvent *te; + long long maxId; + + te = eventLoop->timeEventHead; + maxId = eventLoop->timeEventNextId-1; + while(te) { + long now_sec, now_ms; + long long id; + + if (te->id > maxId) { + te = te->next; + continue; + } + aeGetTime(&now_sec, &now_ms); + if (now_sec > te->when_sec || + (now_sec == te->when_sec && now_ms >= te->when_ms)) + { + int retval; + + id = te->id; + retval = te->timeProc(eventLoop, id, te->clientData); + processed++; + /* After an event is processed our time event list may + * no longer be the same, so we restart from head. + * Still we make sure to don't process events registered + * by event handlers itself in order to don't loop forever. + * To do so we saved the max ID we want to handle. + * + * FUTURE OPTIMIZATIONS: + * Note that this is NOT great algorithmically. Redis uses + * a single time event so it's not a problem but the right + * way to do this is to add the new elements on head, and + * to flag deleted elements in a special way for later + * deletion (putting references to the nodes to delete into + * another linked list). */ + if (retval != AE_NOMORE) { + aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); + } else { + aeDeleteTimeEvent(eventLoop, id); + } + te = eventLoop->timeEventHead; + } else { + te = te->next; + } + } + return processed; +} + /* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event @@ -198,37 +274,18 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { - int maxfd = 0, numfd = 0, processed = 0; - fd_set rfds, wfds, efds; - aeFileEvent *fe = eventLoop->fileEventHead; - aeTimeEvent *te; - long long maxId; - AE_NOTUSED(flags); + int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - /* Check file events */ - if (flags & AE_FILE_EVENTS) { - while (fe != NULL) { - if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds); - if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds); - if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds); - if (maxfd < fe->fd) maxfd = fe->fd; - numfd++; - fe = fe->next; - } - } /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ - if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { - int retval; + if (eventLoop->maxfd != -1 || + ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { + int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; @@ -248,6 +305,8 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } + if (tvp->tv_sec < 0) tvp->tv_sec = 0; + if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to se the timeout @@ -261,76 +320,31 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } } - retval = select(maxfd+1, &rfds, &wfds, &efds, tvp); - if (retval > 0) { - fe = eventLoop->fileEventHead; - while(fe != NULL) { - int fd = (int) fe->fd; - - if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) || - (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) || - (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))) - { - int mask = 0; - - if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) - mask |= AE_READABLE; - if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) - mask |= AE_WRITABLE; - if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)) - mask |= AE_EXCEPTION; - fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); - processed++; - /* After an event is processed our file event list - * may no longer be the same, so what we do - * is to clear the bit for this file descriptor and - * restart again from the head. */ - fe = eventLoop->fileEventHead; - FD_CLR(fd, &rfds); - FD_CLR(fd, &wfds); - FD_CLR(fd, &efds); - } else { - fe = fe->next; - } + numevents = aeApiPoll(eventLoop, tvp); + for (j = 0; j < numevents; j++) { + aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; + int mask = eventLoop->fired[j].mask; + int fd = eventLoop->fired[j].fd; + int rfired = 0; + + /* note the fe->mask & mask & ... code: maybe an already processed + * event removed an element that fired and we still didn't + * processed, so we check if the event is still valid. */ + if (fe->mask & mask & AE_READABLE) { + rfired = 1; + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + } + if (fe->mask & mask & AE_WRITABLE) { + if (!rfired || fe->wfileProc != fe->rfileProc) + fe->wfileProc(eventLoop,fd,fe->clientData,mask); } + processed++; } } /* Check time events */ - if (flags & AE_TIME_EVENTS) { - te = eventLoop->timeEventHead; - maxId = eventLoop->timeEventNextId-1; - while(te) { - long now_sec, now_ms; - long long id; + if (flags & AE_TIME_EVENTS) + processed += processTimeEvents(eventLoop); - if (te->id > maxId) { - te = te->next; - continue; - } - aeGetTime(&now_sec, &now_ms); - if (now_sec > te->when_sec || - (now_sec == te->when_sec && now_ms >= te->when_ms)) - { - int retval; - - id = te->id; - retval = te->timeProc(eventLoop, id, te->clientData); - /* After an event is processed our time event list may - * no longer be the same, so we restart from head. - * Still we make sure to don't process events registered - * by event handlers itself in order to don't loop forever. - * To do so we saved the max ID we want to handle. */ - if (retval != AE_NOMORE) { - aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); - } else { - aeDeleteTimeEvent(eventLoop, id); - } - te = eventLoop->timeEventHead; - } else { - te = te->next; - } - } - } return processed; /* return the number of processed file/time events */ } @@ -349,20 +363,28 @@ int aeWait(int fd, int mask, long long milliseconds) { if (mask & AE_READABLE) FD_SET(fd,&rfds); if (mask & AE_WRITABLE) FD_SET(fd,&wfds); - if (mask & AE_EXCEPTION) FD_SET(fd,&efds); if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) { if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE; if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE; - if (FD_ISSET(fd,&efds)) retmask |= AE_EXCEPTION; return retmask; } else { return retval; } } -void aeMain(aeEventLoop *eventLoop) -{ +void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; - while (!eventLoop->stop) + while (!eventLoop->stop) { + if (eventLoop->beforesleep != NULL) + eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS); + } +} + +char *aeGetApiName(void) { + return aeApiName(); +} + +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { + eventLoop->beforesleep = beforesleep; }