]> git.saurik.com Git - redis.git/blob - src/ae.c
Fix for slaves chains. Force resync of slaves (simply disconnecting them) when SLAVE...
[redis.git] / src / ae.c
1 /* A simple event-driven programming library. Originally I wrote this code
2 * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
3 * it in form of a library for easy reuse.
4 *
5 * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * * Redistributions of source code must retain the above copyright notice,
12 * this list of conditions and the following disclaimer.
13 * * Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * * Neither the name of Redis nor the names of its contributors may be used
17 * to endorse or promote products derived from this software without
18 * specific prior written permission.
19 *
20 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
24 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30 * POSSIBILITY OF SUCH DAMAGE.
31 */
32
33 #include <stdio.h>
34 #include <sys/time.h>
35 #include <sys/types.h>
36 #include <unistd.h>
37 #include <stdlib.h>
38 #include <string.h>
39
40 #include "ae.h"
41 #include "zmalloc.h"
42 #include "config.h"
43
44 /* Include the best multiplexing layer supported by this system.
45 * The following should be ordered by performances, descending. */
46 #ifdef HAVE_EPOLL
47 #include "ae_epoll.c"
48 #else
49 #ifdef HAVE_KQUEUE
50 #include "ae_kqueue.c"
51 #else
52 #include "ae_select.c"
53 #endif
54 #endif
55
56 aeEventLoop *aeCreateEventLoop(int setsize) {
57 aeEventLoop *eventLoop;
58 int i;
59
60 if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
61 eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
62 eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
63 if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
64 eventLoop->setsize = setsize;
65 eventLoop->timeEventHead = NULL;
66 eventLoop->timeEventNextId = 0;
67 eventLoop->stop = 0;
68 eventLoop->maxfd = -1;
69 eventLoop->beforesleep = NULL;
70 if (aeApiCreate(eventLoop) == -1) goto err;
71 /* Events with mask == AE_NONE are not set. So let's initialize the
72 * vector with it. */
73 for (i = 0; i < setsize; i++)
74 eventLoop->events[i].mask = AE_NONE;
75 return eventLoop;
76
77 err:
78 if (eventLoop) {
79 zfree(eventLoop->events);
80 zfree(eventLoop->fired);
81 zfree(eventLoop);
82 }
83 return NULL;
84 }
85
86 void aeDeleteEventLoop(aeEventLoop *eventLoop) {
87 aeApiFree(eventLoop);
88 zfree(eventLoop->events);
89 zfree(eventLoop->fired);
90 zfree(eventLoop);
91 }
92
93 void aeStop(aeEventLoop *eventLoop) {
94 eventLoop->stop = 1;
95 }
96
97 int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
98 aeFileProc *proc, void *clientData)
99 {
100 if (fd >= eventLoop->setsize) return AE_ERR;
101 aeFileEvent *fe = &eventLoop->events[fd];
102
103 if (aeApiAddEvent(eventLoop, fd, mask) == -1)
104 return AE_ERR;
105 fe->mask |= mask;
106 if (mask & AE_READABLE) fe->rfileProc = proc;
107 if (mask & AE_WRITABLE) fe->wfileProc = proc;
108 fe->clientData = clientData;
109 if (fd > eventLoop->maxfd)
110 eventLoop->maxfd = fd;
111 return AE_OK;
112 }
113
114 void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
115 {
116 if (fd >= eventLoop->setsize) return;
117 aeFileEvent *fe = &eventLoop->events[fd];
118
119 if (fe->mask == AE_NONE) return;
120 fe->mask = fe->mask & (~mask);
121 if (fd == eventLoop->maxfd && fe->mask == AE_NONE) {
122 /* Update the max fd */
123 int j;
124
125 for (j = eventLoop->maxfd-1; j >= 0; j--)
126 if (eventLoop->events[j].mask != AE_NONE) break;
127 eventLoop->maxfd = j;
128 }
129 aeApiDelEvent(eventLoop, fd, mask);
130 }
131
132 int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
133 if (fd >= eventLoop->setsize) return 0;
134 aeFileEvent *fe = &eventLoop->events[fd];
135
136 return fe->mask;
137 }
138
139 static void aeGetTime(long *seconds, long *milliseconds)
140 {
141 struct timeval tv;
142
143 gettimeofday(&tv, NULL);
144 *seconds = tv.tv_sec;
145 *milliseconds = tv.tv_usec/1000;
146 }
147
148 static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
149 long cur_sec, cur_ms, when_sec, when_ms;
150
151 aeGetTime(&cur_sec, &cur_ms);
152 when_sec = cur_sec + milliseconds/1000;
153 when_ms = cur_ms + milliseconds%1000;
154 if (when_ms >= 1000) {
155 when_sec ++;
156 when_ms -= 1000;
157 }
158 *sec = when_sec;
159 *ms = when_ms;
160 }
161
162 long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
163 aeTimeProc *proc, void *clientData,
164 aeEventFinalizerProc *finalizerProc)
165 {
166 long long id = eventLoop->timeEventNextId++;
167 aeTimeEvent *te;
168
169 te = zmalloc(sizeof(*te));
170 if (te == NULL) return AE_ERR;
171 te->id = id;
172 aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
173 te->timeProc = proc;
174 te->finalizerProc = finalizerProc;
175 te->clientData = clientData;
176 te->next = eventLoop->timeEventHead;
177 eventLoop->timeEventHead = te;
178 return id;
179 }
180
181 int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
182 {
183 aeTimeEvent *te, *prev = NULL;
184
185 te = eventLoop->timeEventHead;
186 while(te) {
187 if (te->id == id) {
188 if (prev == NULL)
189 eventLoop->timeEventHead = te->next;
190 else
191 prev->next = te->next;
192 if (te->finalizerProc)
193 te->finalizerProc(eventLoop, te->clientData);
194 zfree(te);
195 return AE_OK;
196 }
197 prev = te;
198 te = te->next;
199 }
200 return AE_ERR; /* NO event with the specified ID found */
201 }
202
203 /* Search the first timer to fire.
204 * This operation is useful to know how many time the select can be
205 * put in sleep without to delay any event.
206 * If there are no timers NULL is returned.
207 *
208 * Note that's O(N) since time events are unsorted.
209 * Possible optimizations (not needed by Redis so far, but...):
210 * 1) Insert the event in order, so that the nearest is just the head.
211 * Much better but still insertion or deletion of timers is O(N).
212 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
213 */
214 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
215 {
216 aeTimeEvent *te = eventLoop->timeEventHead;
217 aeTimeEvent *nearest = NULL;
218
219 while(te) {
220 if (!nearest || te->when_sec < nearest->when_sec ||
221 (te->when_sec == nearest->when_sec &&
222 te->when_ms < nearest->when_ms))
223 nearest = te;
224 te = te->next;
225 }
226 return nearest;
227 }
228
229 /* Process time events */
230 static int processTimeEvents(aeEventLoop *eventLoop) {
231 int processed = 0;
232 aeTimeEvent *te;
233 long long maxId;
234
235 te = eventLoop->timeEventHead;
236 maxId = eventLoop->timeEventNextId-1;
237 while(te) {
238 long now_sec, now_ms;
239 long long id;
240
241 if (te->id > maxId) {
242 te = te->next;
243 continue;
244 }
245 aeGetTime(&now_sec, &now_ms);
246 if (now_sec > te->when_sec ||
247 (now_sec == te->when_sec && now_ms >= te->when_ms))
248 {
249 int retval;
250
251 id = te->id;
252 retval = te->timeProc(eventLoop, id, te->clientData);
253 processed++;
254 /* After an event is processed our time event list may
255 * no longer be the same, so we restart from head.
256 * Still we make sure to don't process events registered
257 * by event handlers itself in order to don't loop forever.
258 * To do so we saved the max ID we want to handle.
259 *
260 * FUTURE OPTIMIZATIONS:
261 * Note that this is NOT great algorithmically. Redis uses
262 * a single time event so it's not a problem but the right
263 * way to do this is to add the new elements on head, and
264 * to flag deleted elements in a special way for later
265 * deletion (putting references to the nodes to delete into
266 * another linked list). */
267 if (retval != AE_NOMORE) {
268 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
269 } else {
270 aeDeleteTimeEvent(eventLoop, id);
271 }
272 te = eventLoop->timeEventHead;
273 } else {
274 te = te->next;
275 }
276 }
277 return processed;
278 }
279
280 /* Process every pending time event, then every pending file event
281 * (that may be registered by time event callbacks just processed).
282 * Without special flags the function sleeps until some file event
283 * fires, or when the next time event occurrs (if any).
284 *
285 * If flags is 0, the function does nothing and returns.
286 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
287 * if flags has AE_FILE_EVENTS set, file events are processed.
288 * if flags has AE_TIME_EVENTS set, time events are processed.
289 * if flags has AE_DONT_WAIT set the function returns ASAP until all
290 * the events that's possible to process without to wait are processed.
291 *
292 * The function returns the number of events processed. */
293 int aeProcessEvents(aeEventLoop *eventLoop, int flags)
294 {
295 int processed = 0, numevents;
296
297 /* Nothing to do? return ASAP */
298 if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
299
300 /* Note that we want call select() even if there are no
301 * file events to process as long as we want to process time
302 * events, in order to sleep until the next time event is ready
303 * to fire. */
304 if (eventLoop->maxfd != -1 ||
305 ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
306 int j;
307 aeTimeEvent *shortest = NULL;
308 struct timeval tv, *tvp;
309
310 if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
311 shortest = aeSearchNearestTimer(eventLoop);
312 if (shortest) {
313 long now_sec, now_ms;
314
315 /* Calculate the time missing for the nearest
316 * timer to fire. */
317 aeGetTime(&now_sec, &now_ms);
318 tvp = &tv;
319 tvp->tv_sec = shortest->when_sec - now_sec;
320 if (shortest->when_ms < now_ms) {
321 tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
322 tvp->tv_sec --;
323 } else {
324 tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
325 }
326 if (tvp->tv_sec < 0) tvp->tv_sec = 0;
327 if (tvp->tv_usec < 0) tvp->tv_usec = 0;
328 } else {
329 /* If we have to check for events but need to return
330 * ASAP because of AE_DONT_WAIT we need to se the timeout
331 * to zero */
332 if (flags & AE_DONT_WAIT) {
333 tv.tv_sec = tv.tv_usec = 0;
334 tvp = &tv;
335 } else {
336 /* Otherwise we can block */
337 tvp = NULL; /* wait forever */
338 }
339 }
340
341 numevents = aeApiPoll(eventLoop, tvp);
342 for (j = 0; j < numevents; j++) {
343 aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
344 int mask = eventLoop->fired[j].mask;
345 int fd = eventLoop->fired[j].fd;
346 int rfired = 0;
347
348 /* note the fe->mask & mask & ... code: maybe an already processed
349 * event removed an element that fired and we still didn't
350 * processed, so we check if the event is still valid. */
351 if (fe->mask & mask & AE_READABLE) {
352 rfired = 1;
353 fe->rfileProc(eventLoop,fd,fe->clientData,mask);
354 }
355 if (fe->mask & mask & AE_WRITABLE) {
356 if (!rfired || fe->wfileProc != fe->rfileProc)
357 fe->wfileProc(eventLoop,fd,fe->clientData,mask);
358 }
359 processed++;
360 }
361 }
362 /* Check time events */
363 if (flags & AE_TIME_EVENTS)
364 processed += processTimeEvents(eventLoop);
365
366 return processed; /* return the number of processed file/time events */
367 }
368
369 /* Wait for millseconds until the given file descriptor becomes
370 * writable/readable/exception */
371 int aeWait(int fd, int mask, long long milliseconds) {
372 struct timeval tv;
373 fd_set rfds, wfds, efds;
374 int retmask = 0, retval;
375
376 tv.tv_sec = milliseconds/1000;
377 tv.tv_usec = (milliseconds%1000)*1000;
378 FD_ZERO(&rfds);
379 FD_ZERO(&wfds);
380 FD_ZERO(&efds);
381
382 if (mask & AE_READABLE) FD_SET(fd,&rfds);
383 if (mask & AE_WRITABLE) FD_SET(fd,&wfds);
384 if ((retval = select(fd+1, &rfds, &wfds, &efds, &tv)) > 0) {
385 if (FD_ISSET(fd,&rfds)) retmask |= AE_READABLE;
386 if (FD_ISSET(fd,&wfds)) retmask |= AE_WRITABLE;
387 return retmask;
388 } else {
389 return retval;
390 }
391 }
392
393 void aeMain(aeEventLoop *eventLoop) {
394 eventLoop->stop = 0;
395 while (!eventLoop->stop) {
396 if (eventLoop->beforesleep != NULL)
397 eventLoop->beforesleep(eventLoop);
398 aeProcessEvents(eventLoop, AE_ALL_EVENTS);
399 }
400 }
401
402 char *aeGetApiName(void) {
403 return aeApiName();
404 }
405
406 void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
407 eventLoop->beforesleep = beforesleep;
408 }