]> git.saurik.com Git - redis.git/blobdiff - ae.c
Merge branch 'issue_191' of git://github.com/gnrfan/redis
[redis.git] / ae.c
diff --git a/ae.c b/ae.c
index 375f28a402f3139387a85a5b4c5a9b7d8070e36c..c7918ee1d6c52732856e3335197c0dff675f9474 100644 (file)
--- a/ae.c
+++ b/ae.c
@@ -2,7 +2,7 @@
  * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
  * it in form of a library for easy reuse.
  *
- * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
 
 #include "ae.h"
 #include "zmalloc.h"
+#include "config.h"
+
+/* Include the best multiplexing layer supported by this system.
+ * The following should be ordered by performances, descending. */
+#ifdef HAVE_EPOLL
+#include "ae_epoll.c"
+#else
+    #ifdef HAVE_KQUEUE
+    #include "ae_kqueue.c"
+    #else
+    #include "ae_select.c"
+    #endif
+#endif
 
 aeEventLoop *aeCreateEventLoop(void) {
     aeEventLoop *eventLoop;
+    int i;
 
     eventLoop = zmalloc(sizeof(*eventLoop));
     if (!eventLoop) return NULL;
-    eventLoop->fileEventHead = NULL;
     eventLoop->timeEventHead = NULL;
     eventLoop->timeEventNextId = 0;
     eventLoop->stop = 0;
+    eventLoop->maxfd = -1;
+    eventLoop->beforesleep = NULL;
+    if (aeApiCreate(eventLoop) == -1) {
+        zfree(eventLoop);
+        return NULL;
+    }
+    /* Events with mask == AE_NONE are not set. So let's initialize the
+     * vector with it. */
+    for (i = 0; i < AE_SETSIZE; i++)
+        eventLoop->events[i].mask = AE_NONE;
     return eventLoop;
 }
 
 void aeDeleteEventLoop(aeEventLoop *eventLoop) {
+    aeApiFree(eventLoop);
     zfree(eventLoop);
 }
 
@@ -60,42 +84,38 @@ void aeStop(aeEventLoop *eventLoop) {
 }
 
 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
-        aeFileProc *proc, void *clientData,
-        aeEventFinalizerProc *finalizerProc)
+        aeFileProc *proc, void *clientData)
 {
-    aeFileEvent *fe;
-
-    fe = zmalloc(sizeof(*fe));
-    if (fe == NULL) return AE_ERR;
-    fe->fd = fd;
-    fe->mask = mask;
-    fe->fileProc = proc;
-    fe->finalizerProc = finalizerProc;
+    if (fd >= AE_SETSIZE) return AE_ERR;
+    aeFileEvent *fe = &eventLoop->events[fd];
+
+    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
+        return AE_ERR;
+    fe->mask |= mask;
+    if (mask & AE_READABLE) fe->rfileProc = proc;
+    if (mask & AE_WRITABLE) fe->wfileProc = proc;
     fe->clientData = clientData;
-    fe->next = eventLoop->fileEventHead;
-    eventLoop->fileEventHead = fe;
+    if (fd > eventLoop->maxfd)
+        eventLoop->maxfd = fd;
     return AE_OK;
 }
 
 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
 {
-    aeFileEvent *fe, *prev = NULL;
-
-    fe = eventLoop->fileEventHead;
-    while(fe) {
-        if (fe->fd == fd && fe->mask == mask) {
-            if (prev == NULL)
-                eventLoop->fileEventHead = fe->next;
-            else
-                prev->next = fe->next;
-            if (fe->finalizerProc)
-                fe->finalizerProc(eventLoop, fe->clientData);
-            zfree(fe);
-            return;
-        }
-        prev = fe;
-        fe = fe->next;
+    if (fd >= AE_SETSIZE) return;
+    aeFileEvent *fe = &eventLoop->events[fd];
+
+    if (fe->mask == AE_NONE) return;
+    fe->mask = fe->mask & (~mask);
+    if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
+        /* Update the max fd */
+        int j;
+
+        for (j = eventLoop->maxfd-1; j >= 0; j--)
+            if (eventLoop->events[j].mask != AE_NONE) break;
+        eventLoop->maxfd = j;
     }
+    aeApiDelEvent(eventLoop, fd, mask);
 }
 
 static void aeGetTime(long *seconds, long *milliseconds)
@@ -167,7 +187,12 @@ int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
  * put in sleep without to delay any event.
  * If there are no timers NULL is returned.
  *
- * Note that's O(N) since time events are unsorted. */
+ * Note that's O(N) since time events are unsorted.
+ * Possible optimizations (not needed by Redis so far, but...):
+ * 1) Insert the event in order, so that the nearest is just the head.
+ *    Much better but still insertion or deletion of timers is O(N).
+ * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
+ */
 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
 {
     aeTimeEvent *te = eventLoop->timeEventHead;
@@ -183,6 +208,57 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
     return nearest;
 }
 
+/* Process time events */
+static int processTimeEvents(aeEventLoop *eventLoop) {
+    int processed = 0;
+    aeTimeEvent *te;
+    long long maxId;
+
+    te = eventLoop->timeEventHead;
+    maxId = eventLoop->timeEventNextId-1;
+    while(te) {
+        long now_sec, now_ms;
+        long long id;
+
+        if (te->id > maxId) {
+            te = te->next;
+            continue;
+        }
+        aeGetTime(&now_sec, &now_ms);
+        if (now_sec > te->when_sec ||
+            (now_sec == te->when_sec && now_ms >= te->when_ms))
+        {
+            int retval;
+
+            id = te->id;
+            retval = te->timeProc(eventLoop, id, te->clientData);
+            processed++;
+            /* After an event is processed our time event list may
+             * no longer be the same, so we restart from head.
+             * Still we make sure to don't process events registered
+             * by event handlers itself in order to don't loop forever.
+             * To do so we saved the max ID we want to handle.
+             *
+             * FUTURE OPTIMIZATIONS:
+             * Note that this is NOT great algorithmically. Redis uses
+             * a single time event so it's not a problem but the right
+             * way to do this is to add the new elements on head, and
+             * to flag deleted elements in a special way for later
+             * deletion (putting references to the nodes to delete into
+             * another linked list). */
+            if (retval != AE_NOMORE) {
+                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
+            } else {
+                aeDeleteTimeEvent(eventLoop, id);
+            }
+            te = eventLoop->timeEventHead;
+        } else {
+            te = te->next;
+        }
+    }
+    return processed;
+}
+
 /* Process every pending time event, then every pending file event
  * (that may be registered by time event callbacks just processed).
  * Without special flags the function sleeps until some file event
@@ -198,37 +274,18 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
  * The function returns the number of events processed. */
 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
 {
-    int maxfd = 0, numfd = 0, processed = 0;
-    fd_set rfds, wfds, efds;
-    aeFileEvent *fe = eventLoop->fileEventHead;
-    aeTimeEvent *te;
-    long long maxId;
-    AE_NOTUSED(flags);
+    int processed = 0, numevents;
 
     /* Nothing to do? return ASAP */
     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
 
-    FD_ZERO(&rfds);
-    FD_ZERO(&wfds);
-    FD_ZERO(&efds);
-
-    /* Check file events */
-    if (flags & AE_FILE_EVENTS) {
-        while (fe != NULL) {
-            if (fe->mask & AE_READABLE) FD_SET(fe->fd, &rfds);
-            if (fe->mask & AE_WRITABLE) FD_SET(fe->fd, &wfds);
-            if (fe->mask & AE_EXCEPTION) FD_SET(fe->fd, &efds);
-            if (maxfd < fe->fd) maxfd = fe->fd;
-            numfd++;
-            fe = fe->next;
-        }
-    }
     /* Note that we want call select() even if there are no
      * file events to process as long as we want to process time
      * events, in order to sleep until the next time event is ready
      * to fire. */
-    if (numfd || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
-        int retval;
+    if (eventLoop->maxfd != -1 ||
+        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
+        int j;
         aeTimeEvent *shortest = NULL;
         struct timeval tv, *tvp;
 
@@ -248,6 +305,8 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
             } else {
                 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
             }
+            if (tvp->tv_sec < 0) tvp->tv_sec = 0;
+            if (tvp->tv_usec < 0) tvp->tv_usec = 0;
         } else {
             /* If we have to check for events but need to return
              * ASAP because of AE_DONT_WAIT we need to se the timeout
@@ -261,76 +320,31 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
             }
         }
 
-        retval = select(maxfd+1, &rfds, &wfds, &efds, tvp);
-        if (retval > 0) {
-            fe = eventLoop->fileEventHead;
-            while(fe != NULL) {
-                int fd = (int) fe->fd;
-
-                if ((fe->mask & AE_READABLE && FD_ISSET(fd, &rfds)) ||
-                    (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds)) ||
-                    (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds)))
-                {
-                    int mask = 0;
-
-                    if (fe->mask & AE_READABLE && FD_ISSET(fd, &rfds))
-                        mask |= AE_READABLE;
-                    if (fe->mask & AE_WRITABLE && FD_ISSET(fd, &wfds))
-                        mask |= AE_WRITABLE;
-                    if (fe->mask & AE_EXCEPTION && FD_ISSET(fd, &efds))
-                        mask |= AE_EXCEPTION;
-                    fe->fileProc(eventLoop, fe->fd, fe->clientData, mask);
-                    processed++;
-                    /* After an event is processed our file event list
-                     * may no longer be the same, so what we do
-                     * is to clear the bit for this file descriptor and
-                     * restart again from the head. */
-                    fe = eventLoop->fileEventHead;
-                    FD_CLR(fd, &rfds);
-                    FD_CLR(fd, &wfds);
-                    FD_CLR(fd, &efds);
-                } else {
-                    fe = fe->next;
-                }
+        numevents = aeApiPoll(eventLoop, tvp);
+        for (j = 0; j < numevents; j++) {
+            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
+            int mask = eventLoop->fired[j].mask;
+            int fd = eventLoop->fired[j].fd;
+            int rfired = 0;
+
+           /* note the fe->mask & mask & ... code: maybe an already processed
+             * event removed an element that fired and we still didn't
+             * processed, so we check if the event is still valid. */
+            if (fe->mask & mask & AE_READABLE) {
+                rfired = 1;
+                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
+            }
+            if (fe->mask & mask & AE_WRITABLE) {
+                if (!rfired || fe->wfileProc != fe->rfileProc)
+                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
             }
+            processed++;
         }
     }
     /* Check time events */
-    if (flags & AE_TIME_EVENTS) {
-        te = eventLoop->timeEventHead;
-        maxId = eventLoop->timeEventNextId-1;
-        while(te) {
-            long now_sec, now_ms;
-            long long id;
+    if (flags & AE_TIME_EVENTS)
+        processed += processTimeEvents(eventLoop);
 
-            if (te->id > maxId) {
-                te = te->next;
-                continue;
-            }
-            aeGetTime(&now_sec, &now_ms);
-            if (now_sec > te->when_sec ||
-                (now_sec == te->when_sec && now_ms >= te->when_ms))
-            {
-                int retval;
-
-                id = te->id;
-                retval = te->timeProc(eventLoop, id, te->clientData);
-                /* After an event is processed our time event list may
-                 * no longer be the same, so we restart from head.
-                 * Still we make sure to don't process events registered
-                 * by event handlers itself in order to don't loop forever.
-                 * To do so we saved the max ID we want to handle. */
-                if (retval != AE_NOMORE) {
-                    aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
-                } else {
-                    aeDeleteTimeEvent(eventLoop, id);
-                }
-                te = eventLoop->timeEventHead;
-            } else {
-                te = te->next;
-            }
-        }
-    }
     return processed; /* return the number of processed file/time events */
 }
 
@@ -349,20 +363,28 @@ int aeWait(int fd, int mask, long long milliseconds) {
 
     if (mask & AE_READABLE) FD_SET(fd,&rfds);
     if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
-    if (mask & AE_EXCEPTION) FD_SET(fd,&efds);
     if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
         if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
         if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
-        if (FD_ISSET(fd,&efds)) retmask |= AE_EXCEPTION;
         return retmask;
     } else {
         return retval;
     }
 }
 
-void aeMain(aeEventLoop *eventLoop)
-{
+void aeMain(aeEventLoop *eventLoop) {
     eventLoop->stop = 0;
-    while (!eventLoop->stop)
+    while (!eventLoop->stop) {
+        if (eventLoop->beforesleep != NULL)
+            eventLoop->beforesleep(eventLoop);
         aeProcessEvents(eventLoop, AE_ALL_EVENTS);
+    }
+}
+
+char *aeGetApiName(void) {
+    return aeApiName();
+}
+
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
+    eventLoop->beforesleep = beforesleep;
 }