From 266373b2834efa11b83e681bb211a0cd6c922eaa Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 23 Nov 2009 18:50:39 +0100 Subject: [PATCH] ae.c now supports multiple polling API modules, even if only ae_select.c is implemented currently. Also adding and removing an event is now O(1). --- Makefile | 6 ++- ae.c | 150 ++++++++++++++++++++++++---------------------------- ae.h | 59 ++++++++++++--------- benchmark.c | 23 ++++++-- config.h | 5 ++ redis.c | 10 ++-- 6 files changed, 137 insertions(+), 116 deletions(-) diff --git a/Makefile b/Makefile index 9e687d25..7d9bdc57 100644 --- a/Makefile +++ b/Makefile @@ -25,7 +25,8 @@ all: redis-server redis-benchmark redis-cli # Deps (use make dep to generate this) adlist.o: adlist.c adlist.h zmalloc.h -ae.o: ae.c ae.h zmalloc.h +ae.o: ae.c ae.h zmalloc.h ae_select.c +ae_select.o: ae_select.c anet.o: anet.c fmacros.h anet.h benchmark.o: benchmark.c fmacros.h ae.h anet.h sds.h adlist.h zmalloc.h dict.o: dict.c fmacros.h dict.h zmalloc.h @@ -33,7 +34,8 @@ lzf_c.o: lzf_c.c lzfP.h lzf_d.o: lzf_d.c lzfP.h pqsort.o: pqsort.c redis-cli.o: redis-cli.c fmacros.h anet.h sds.h adlist.h zmalloc.h -redis.o: redis.c fmacros.h ae.h sds.h anet.h dict.h adlist.h zmalloc.h lzf.h pqsort.h config.h +redis.o: redis.c fmacros.h config.h redis.h ae.h sds.h anet.h dict.h \ + adlist.h zmalloc.h lzf.h pqsort.h staticsymbols.h sds.o: sds.c sds.h zmalloc.h zmalloc.o: zmalloc.c config.h diff --git a/ae.c b/ae.c index d5eff76c..4f12e410 100644 --- a/ae.c +++ b/ae.c @@ -38,20 +38,39 @@ #include "ae.h" #include "zmalloc.h" +#include "config.h" + +/* Include the best multiplexing layer supported by this system. + * The following should be ordered by performances, descending. */ +#ifdef HAVE_EPOLL +#include "ae_epoll.c" +#else +#include "ae_select.c" +#endif aeEventLoop *aeCreateEventLoop(void) { aeEventLoop *eventLoop; + int i; eventLoop = zmalloc(sizeof(*eventLoop)); if (!eventLoop) return NULL; - eventLoop->fileEventHead = NULL; eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; + eventLoop->maxfd = -1; + if (aeApiCreate(eventLoop) == -1) { + zfree(eventLoop); + return NULL; + } + /* Events with mask == AE_NONE are not set. So let's initialize the + * vector with it. */ + for (i = 0; i < AE_SETSIZE; i++) + eventLoop->events[i].mask = AE_NONE; return eventLoop; } void aeDeleteEventLoop(aeEventLoop *eventLoop) { + aeApiFree(eventLoop); zfree(eventLoop); } @@ -60,42 +79,39 @@ void aeStop(aeEventLoop *eventLoop) { } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, - aeEventFinalizerProc *finalizerProc) + aeFileProc *proc, void *clientData) { - aeFileEvent *fe; - - fe = zmalloc(sizeof(*fe)); - if (fe == NULL) return AE_ERR; - fe->fd = fd; - fe->mask = mask; - fe->fileProc = proc; - fe->finalizerProc = finalizerProc; + if (fd >= AE_SETSIZE) return AE_ERR; + aeFileEvent *fe = &eventLoop->events[fd]; + + if (aeApiAddEvent(eventLoop, fd, mask) == -1) + return AE_ERR; + fe->mask |= mask; + if (mask & AE_READABLE) fe->rfileProc = proc; + if (mask & AE_WRITABLE) fe->wfileProc = proc; + if (mask & AE_EXCEPTION) fe->efileProc = proc; fe->clientData = clientData; - fe->next = eventLoop->fileEventHead; - eventLoop->fileEventHead = fe; + if (fd > eventLoop->maxfd) + eventLoop->maxfd = fd; return AE_OK; } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { - aeFileEvent *fe, *prev = NULL; - - fe = eventLoop->fileEventHead; - while(fe) { - if (fe->fd == fd && fe->mask == mask) { - if (prev == NULL) - eventLoop->fileEventHead = fe->next; - else - prev->next = fe->next; - if (fe->finalizerProc) - fe->finalizerProc(eventLoop, fe->clientData); - zfree(fe); - return; - } - prev = fe; - fe = fe->next; + if (fd >= AE_SETSIZE) return; + aeFileEvent *fe = &eventLoop->events[fd]; + + if (fe->mask == AE_NONE) return; + fe->mask = fe->mask & (~mask); + if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { + /* Update the max fd */ + int j; + + for (j = eventLoop->maxfd-1; j >= 0; j--) + if (eventLoop->events[j].mask != AE_NONE) break; + eventLoop->maxfd = j; } + aeApiDelEvent(eventLoop, fd, mask); } static void aeGetTime(long *seconds, long *milliseconds) @@ -254,34 +270,18 @@ static int processTimeEvents(aeEventLoop *eventLoop) { * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { - int maxfd = 0, numfd = 0, processed = 0; - fd_set rfds, wfds, efds; - aeFileEvent *fe = eventLoop->fileEventHead; + int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; - FD_ZERO(&rfds); - FD_ZERO(&wfds); - FD_ZERO(&efds); - - /* Check file events */ - if (flags & AE_FILE_EVENTS) { - while (fe != NULL) { - if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds); - if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds); - if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds); - if (maxfd < fe->fd) maxfd = fe->fd; - numfd++; - fe = fe->next; - } - } /* Note that we want call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ - if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { - int retval; + if (eventLoop->maxfd != -1 || + ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { + int j; aeTimeEvent *shortest = NULL; struct timeval tv, *tvp; @@ -301,6 +301,8 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } else { tvp->tv_usec = (shortest->when_ms - now_ms)*1000; } + if (tvp->tv_sec < 0) tvp->tv_sec = 0; + if (tvp->tv_usec < 0) tvp->tv_usec = 0; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to se the timeout @@ -314,38 +316,24 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) } } - retval = select(maxfd+1, &rfds, &wfds, &efds, tvp); - if (retval > 0) { - fe = eventLoop->fileEventHead; - while(fe != NULL) { - int fd = (int) fe->fd; - - if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) || - (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) || - (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))) - { - int mask = 0; - - if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) - mask |= AE_READABLE; - if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) - mask |= AE_WRITABLE; - if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)) - mask |= AE_EXCEPTION; - fe->fileProc(eventLoop, fe->fd, fe->clientData, mask); - processed++; - /* After an event is processed our file event list - * may no longer be the same, so what we do - * is to clear the bit for this file descriptor and - * restart again from the head. */ - fe = eventLoop->fileEventHead; - FD_CLR(fd, &rfds); - FD_CLR(fd, &wfds); - FD_CLR(fd, &efds); - } else { - fe = fe->next; - } - } + numevents = aeApiPoll(eventLoop, tvp); + for (j = 0; j < numevents; j++) { + aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; + int mask = eventLoop->fired[j].mask; + int fd = eventLoop->fired[j].fd; + + /* note the fe->mask & mask & ... code: maybe an already processed + * event removed an element that fired and we still didn't + * processed, so we check if the event is still valid. */ + if (fe->mask & mask & AE_READABLE) + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + if (fe->mask & mask & AE_WRITABLE && fe->wfileProc != fe->rfileProc) + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + if (fe->mask & mask & AE_EXCEPTION && + fe->efileProc != fe->wfileProc && + fe->efileProc != fe->rfileProc) + fe->efileProc(eventLoop,fd,fe->clientData,mask); + processed++; } } /* Check time events */ diff --git a/ae.h b/ae.h index 69bbbee9..499063ca 100644 --- a/ae.h +++ b/ae.h @@ -33,6 +33,26 @@ #ifndef __AE_H__ #define __AE_H__ +#define AE_SETSIZE (1024*10) /* Max number of fd supported */ + +#define AE_OK 0 +#define AE_ERR -1 + +#define AE_NONE 0 +#define AE_READABLE 1 +#define AE_WRITABLE 2 +#define AE_EXCEPTION 4 + +#define AE_FILE_EVENTS 1 +#define AE_TIME_EVENTS 2 +#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) +#define AE_DONT_WAIT 4 + +#define AE_NOMORE -1 + +/* Macros */ +#define AE_NOTUSED(V) ((void) V) + struct aeEventLoop; /* Types and data structures */ @@ -42,12 +62,11 @@ typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientDat /* File event structure */ typedef struct aeFileEvent { - int fd; int mask; /* one of AE_(READABLE|WRITABLE|EXCEPTION) */ - aeFileProc *fileProc; - aeEventFinalizerProc *finalizerProc; + aeFileProc *rfileProc; + aeFileProc *wfileProc; + aeFileProc *efileProc; void *clientData; - struct aeFileEvent *next; } aeFileEvent; /* Time event structure */ @@ -61,39 +80,29 @@ typedef struct aeTimeEvent { struct aeTimeEvent *next; } aeTimeEvent; +/* A fired event */ +typedef struct aeFiredEvent { + int fd; + int mask; +} aeFiredEvent; + /* State of an event based program */ typedef struct aeEventLoop { + int maxfd; long long timeEventNextId; - aeFileEvent *fileEventHead; + aeFileEvent events[AE_SETSIZE]; /* Registered events */ + aeFiredEvent fired[AE_SETSIZE]; /* Fired events */ aeTimeEvent *timeEventHead; int stop; + void *apidata; /* This is used for polling API specific data */ } aeEventLoop; -/* Defines */ -#define AE_OK 0 -#define AE_ERR -1 - -#define AE_READABLE 1 -#define AE_WRITABLE 2 -#define AE_EXCEPTION 4 - -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 -#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 - -#define AE_NOMORE -1 - -/* Macros */ -#define AE_NOTUSED(V) ((void) V) - /* Prototypes */ aeEventLoop *aeCreateEventLoop(void); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData, - aeEventFinalizerProc *finalizerProc); + aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, diff --git a/benchmark.c b/benchmark.c index 2994186b..fd949ea3 100644 --- a/benchmark.c +++ b/benchmark.c @@ -78,6 +78,7 @@ static struct config { list *clients; int quiet; int loop; + int idlemode; } config; typedef struct _client { @@ -136,7 +137,7 @@ static void freeAllClients(void) { static void resetClient(client c) { aeDeleteFileEvent(config.el,c->fd,AE_WRITABLE); aeDeleteFileEvent(config.el,c->fd,AE_READABLE); - aeCreateFileEvent(config.el,c->fd, AE_WRITABLE,writeHandler,c,NULL); + aeCreateFileEvent(config.el,c->fd, AE_WRITABLE,writeHandler,c); sdsfree(c->ibuf); c->ibuf = sdsempty(); c->readlen = (c->replytype == REPLY_BULK || @@ -328,7 +329,7 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) c->written += nwritten; if (sdslen(c->obuf) == c->written) { aeDeleteFileEvent(config.el,c->fd,AE_WRITABLE); - aeCreateFileEvent(config.el,c->fd,AE_READABLE,readHandler,c,NULL); + aeCreateFileEvent(config.el,c->fd,AE_READABLE,readHandler,c); c->state = CLIENT_READREPLY; } } @@ -352,7 +353,7 @@ static client createClient(void) { c->written = 0; c->totreceived = 0; c->state = CLIENT_CONNECTING; - aeCreateFileEvent(config.el, c->fd, AE_WRITABLE, writeHandler, c, NULL); + aeCreateFileEvent(config.el, c->fd, AE_WRITABLE, writeHandler, c); config.liveclients++; listAddNodeTail(config.clients,c); return c; @@ -451,6 +452,8 @@ void parseOptions(int argc, char **argv) { config.loop = 1; } else if (!strcmp(argv[i],"-D")) { config.debug = 1; + } else if (!strcmp(argv[i],"-I")) { + config.idlemode = 1; } else { printf("Wrong option '%s' or option argument missing\n\n",argv[i]); printf("Usage: redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]\n\n"); @@ -469,6 +472,7 @@ void parseOptions(int argc, char **argv) { printf(" range will be allowed.\n"); printf(" -q Quiet. Just show query/sec values\n"); printf(" -l Loop. Run the tests forever\n"); + printf(" -I Idle mode. Just open N idle connections and wait.\n"); printf(" -D Debug mode. more verbose.\n"); exit(1); } @@ -493,6 +497,7 @@ int main(int argc, char **argv) { config.randomkeys_keyspacelen = 0; config.quiet = 0; config.loop = 0; + config.idlemode = 0; config.latency = NULL; config.clients = listCreate(); config.latency = zmalloc(sizeof(int)*(MAX_LATENCY+1)); @@ -506,6 +511,18 @@ int main(int argc, char **argv) { printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n"); } + if (config.idlemode) { + printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdsempty(); + prepareClientForReply(c,REPLY_RETCODE); /* will never receive it */ + createMissingClients(c); + aeMain(config.el); + /* and will wait for every */ + } + do { prepareForBenchmark(); c = createClient(); diff --git a/config.h b/config.h index c19cfc74..af733895 100644 --- a/config.h +++ b/config.h @@ -26,4 +26,9 @@ #define HAVE_BACKTRACE 1 #endif +/* test for polling API */ +#ifdef __linux__ +#define HAVE_EPOLL 1 +#endif + #endif diff --git a/redis.c b/redis.c index d1d3010f..438a8b9e 100644 --- a/redis.c +++ b/redis.c @@ -1912,7 +1912,7 @@ static redisClient *createClient(int fd) { listSetFreeMethod(c->reply,decrRefCount); listSetDupMethod(c->reply,dupClientReplyValue); if (aeCreateFileEvent(server.el, c->fd, AE_READABLE, - readQueryFromClient, c, NULL) == AE_ERR) { + readQueryFromClient, c) == AE_ERR) { freeClient(c); return NULL; } @@ -1925,7 +1925,7 @@ static void addReply(redisClient *c, robj *obj) { (c->replstate == REDIS_REPL_NONE || c->replstate == REDIS_REPL_ONLINE) && aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, - sendReplyToClient, c, NULL) == AE_ERR) return; + sendReplyToClient, c) == AE_ERR) return; if (obj->encoding != REDIS_ENCODING_RAW) { obj = getDecodedObject(obj); } else { @@ -5304,7 +5304,7 @@ static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); slave->replstate = REDIS_REPL_ONLINE; if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, - sendReplyToClient, slave, NULL) == AE_ERR) { + sendReplyToClient, slave) == AE_ERR) { freeClient(slave); return; } @@ -5348,7 +5348,7 @@ static void updateSlavesWaitingBgsave(int bgsaveerr) { slave->repldbsize = buf.st_size; slave->replstate = REDIS_REPL_SEND_BULK; aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) { + if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } @@ -5834,7 +5834,7 @@ int main(int argc, char **argv) { redisLog(REDIS_NOTICE,"DB loaded from disk"); } if (aeCreateFileEvent(server.el, server.fd, AE_READABLE, - acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event"); + acceptHandler, NULL) == AE_ERR) oom("creating file event"); redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port); aeMain(server.el); aeDeleteEventLoop(server.el); -- 2.47.2