]> git.saurik.com Git - redis.git/blame - src/networking.c
Fixed a crash loading the AOF file containing MULTI/EXEC, a result of WATCH implement...
[redis.git] / src / networking.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <sys/uio.h>
4
5void *dupClientReplyValue(void *o) {
6 incrRefCount((robj*)o);
7 return o;
8}
9
/* Match callback installed on the pubsub pattern list by createClient():
 * returns whatever equalStringObjects() reports for the two entries. */
int listMatchObjects(void *a, void *b) {
    int matched = equalStringObjects(a,b);

    return matched;
}
13
14redisClient *createClient(int fd) {
15 redisClient *c = zmalloc(sizeof(*c));
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 selectDb(c,0);
21 c->fd = fd;
22 c->querybuf = sdsempty();
23 c->argc = 0;
24 c->argv = NULL;
25 c->bulklen = -1;
26 c->multibulk = 0;
27 c->mbargc = 0;
28 c->mbargv = NULL;
29 c->sentlen = 0;
30 c->flags = 0;
31 c->lastinteraction = time(NULL);
32 c->authenticated = 0;
33 c->replstate = REDIS_REPL_NONE;
34 c->reply = listCreate();
35 listSetFreeMethod(c->reply,decrRefCount);
36 listSetDupMethod(c->reply,dupClientReplyValue);
37 c->blocking_keys = NULL;
38 c->blocking_keys_num = 0;
39 c->io_keys = listCreate();
40 c->watched_keys = listCreate();
41 listSetFreeMethod(c->io_keys,decrRefCount);
42 c->pubsub_channels = dictCreate(&setDictType,NULL);
43 c->pubsub_patterns = listCreate();
44 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
45 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
46 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
47 readQueryFromClient, c) == AE_ERR) {
48 freeClient(c);
49 return NULL;
50 }
51 listAddNodeTail(server.clients,c);
52 initClientMultiState(c);
53 return c;
54}
55
56void addReply(redisClient *c, robj *obj) {
57 if (listLength(c->reply) == 0 &&
58 (c->replstate == REDIS_REPL_NONE ||
59 c->replstate == REDIS_REPL_ONLINE) &&
60 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
61 sendReplyToClient, c) == AE_ERR) return;
62
63 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
64 obj = dupStringObject(obj);
65 obj->refcount = 0; /* getDecodedObject() will increment the refcount */
66 }
67 listAddNodeTail(c->reply,getDecodedObject(obj));
68}
69
70void addReplySds(redisClient *c, sds s) {
71 robj *o = createObject(REDIS_STRING,s);
72 addReply(c,o);
73 decrRefCount(o);
74}
75
76void addReplyDouble(redisClient *c, double d) {
77 char buf[128];
78
79 snprintf(buf,sizeof(buf),"%.17g",d);
80 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
81 (unsigned long) strlen(buf),buf));
82}
83
84void addReplyLongLong(redisClient *c, long long ll) {
85 char buf[128];
86 size_t len;
87
88 if (ll == 0) {
89 addReply(c,shared.czero);
90 return;
91 } else if (ll == 1) {
92 addReply(c,shared.cone);
93 return;
94 }
95 buf[0] = ':';
96 len = ll2string(buf+1,sizeof(buf)-1,ll);
97 buf[len+1] = '\r';
98 buf[len+2] = '\n';
99 addReplySds(c,sdsnewlen(buf,len+3));
100}
101
102void addReplyUlong(redisClient *c, unsigned long ul) {
103 char buf[128];
104 size_t len;
105
106 if (ul == 0) {
107 addReply(c,shared.czero);
108 return;
109 } else if (ul == 1) {
110 addReply(c,shared.cone);
111 return;
112 }
113 len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
114 addReplySds(c,sdsnewlen(buf,len));
115}
116
117void addReplyBulkLen(redisClient *c, robj *obj) {
118 size_t len, intlen;
119 char buf[128];
120
121 if (obj->encoding == REDIS_ENCODING_RAW) {
122 len = sdslen(obj->ptr);
123 } else {
124 long n = (long)obj->ptr;
125
126 /* Compute how many bytes will take this integer as a radix 10 string */
127 len = 1;
128 if (n < 0) {
129 len++;
130 n = -n;
131 }
132 while((n = n/10) != 0) {
133 len++;
134 }
135 }
136 buf[0] = '$';
137 intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
138 buf[intlen+1] = '\r';
139 buf[intlen+2] = '\n';
140 addReplySds(c,sdsnewlen(buf,intlen+3));
141}
142
143void addReplyBulk(redisClient *c, robj *obj) {
144 addReplyBulkLen(c,obj);
145 addReply(c,obj);
146 addReply(c,shared.crlf);
147}
148
149/* In the CONFIG command we need to add vanilla C string as bulk replies */
150void addReplyBulkCString(redisClient *c, char *s) {
151 if (s == NULL) {
152 addReply(c,shared.nullbulk);
153 } else {
154 robj *o = createStringObject(s,strlen(s));
155 addReplyBulk(c,o);
156 decrRefCount(o);
157 }
158}
159
160void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
161 int cport, cfd;
162 char cip[128];
163 redisClient *c;
164 REDIS_NOTUSED(el);
165 REDIS_NOTUSED(mask);
166 REDIS_NOTUSED(privdata);
167
168 cfd = anetAccept(server.neterr, fd, cip, &cport);
169 if (cfd == AE_ERR) {
170 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
171 return;
172 }
173 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
174 if ((c = createClient(cfd)) == NULL) {
175 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
176 close(cfd); /* May be already closed, just ingore errors */
177 return;
178 }
179 /* If maxclient directive is set and this is one client more... close the
180 * connection. Note that we create the client instead to check before
181 * for this condition, since now the socket is already set in nonblocking
182 * mode and we can send an error for free using the Kernel I/O */
183 if (server.maxclients && listLength(server.clients) > server.maxclients) {
184 char *err = "-ERR max number of clients reached\r\n";
185
186 /* That's a best effort error message, don't check write errors */
187 if (write(c->fd,err,strlen(err)) == -1) {
188 /* Nothing to do, Just to avoid the warning... */
189 }
190 freeClient(c);
191 return;
192 }
193 server.stat_numconnections++;
194}
195
196static void freeClientArgv(redisClient *c) {
197 int j;
198
199 for (j = 0; j < c->argc; j++)
200 decrRefCount(c->argv[j]);
201 for (j = 0; j < c->mbargc; j++)
202 decrRefCount(c->mbargv[j]);
203 c->argc = 0;
204 c->mbargc = 0;
205}
206
/* Tear down client 'c' completely: release its buffers and lists, remove
 * its file events, close its socket, unlink it from every server-side list
 * it may be part of and finally free the structure itself.
 *
 * The client must be present in server.clients (this is asserted). After
 * the call 'c' is a dangling pointer. The order of the steps below matters,
 * see the inline notes. */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked in a blocking
     * call, we have to set querybuf to NULL *before* calling
     * unblockClientWaitingData(), so that processInputBuffer() does not get
     * called. It is also important to remove the file events only after
     * this step, because that call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys, then release the (now unused) list */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels and patterns before
     * releasing the containers that tracked them */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Event handlers, pending replies, parsed arguments and the socket */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients; every live client must be there */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients that are now ready to be restarted
     * after waiting for swapped keys */
    if (c->flags & REDIS_IO_WAIT && listLength(c->io_keys) == 0) {
        ln = listSearchKey(server.io_ready_clients,c);
        if (ln) {
            listDelNode(server.io_ready_clients,ln);
            server.vm_blocked_clients--;
        }
    }
    /* Remove from the list of clients waiting for swapped keys. The loop
     * always looks at the first node again, so it relies on
     * dontWaitForSwappedKey() removing that key from c->io_keys
     * (NOTE(review): confirm against its definition). */
    while (server.vm_enabled && listLength(c->io_keys)) {
        ln = listFirst(c->io_keys);
        dontWaitForSwappedKey(c,ln->value);
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup */
    if (c->flags & REDIS_SLAVE) {
        /* Close the descriptor used while the bulk transfer is in progress */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Clients flagged REDIS_MONITOR are tracked in server.monitors,
         * the other REDIS_SLAVE clients in server.slaves */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        /* The link with the master is gone: forget it and move the
         * replication state back to REDIS_REPL_CONNECT */
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
272
273#define GLUEREPLY_UP_TO (1024)
274static void glueReplyBuffersIfNeeded(redisClient *c) {
275 int copylen = 0;
276 char buf[GLUEREPLY_UP_TO];
277 listNode *ln;
278 listIter li;
279 robj *o;
280
281 listRewind(c->reply,&li);
282 while((ln = listNext(&li))) {
283 int objlen;
284
285 o = ln->value;
286 objlen = sdslen(o->ptr);
287 if (copylen + objlen <= GLUEREPLY_UP_TO) {
288 memcpy(buf+copylen,o->ptr,objlen);
289 copylen += objlen;
290 listDelNode(c->reply,ln);
291 } else {
292 if (copylen == 0) return;
293 break;
294 }
295 }
296 /* Now the output buffer is empty, add the new single element */
297 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
298 listAddNodeHead(c->reply,o);
299}
300
/* Event-loop write handler: send as much of the client's pending reply list
 * as possible on socket 'fd'.
 *
 * 'privdata' is the redisClient. c->sentlen tracks how many bytes of the
 * object at the head of the list have already been written, so a partially
 * sent object is resumed on the next call. When the list becomes empty the
 * AE_WRITABLE event is removed. On a write error other than EAGAIN the
 * client is freed. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send. This path is only
     * taken when output gluing is disabled and the client is not a master. */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        /* Optionally coalesce small objects at the head of the list */
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects carry nothing to send: just drop them */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master: pretend the remaining bytes of the
             * object were written so that it gets discarded below */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* Error (-1) or nothing written: stop, handled after the loop */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from a
         * super fast link that is always able to accept data (in a real
         * world scenario think about 'KEYS *' against the loopback
         * interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* Socket buffer full: not an error, retry on the next event */
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    /* Any byte sent counts as activity for the idle-timeout logic */
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        /* Everything was sent: stop listening for writability */
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
366
367void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
368{
369 redisClient *c = privdata;
370 int nwritten = 0, totwritten = 0, objlen, willwrite;
371 robj *o;
372 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
373 int offset, ion = 0;
374 REDIS_NOTUSED(el);
375 REDIS_NOTUSED(mask);
376
377 listNode *node;
378 while (listLength(c->reply)) {
379 offset = c->sentlen;
380 ion = 0;
381 willwrite = 0;
382
383 /* fill-in the iov[] array */
384 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
385 o = listNodeValue(node);
386 objlen = sdslen(o->ptr);
387
388 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
389 break;
390
391 if(ion == REDIS_WRITEV_IOVEC_COUNT)
392 break; /* no more iovecs */
393
394 iov[ion].iov_base = ((char*)o->ptr) + offset;
395 iov[ion].iov_len = objlen - offset;
396 willwrite += objlen - offset;
397 offset = 0; /* just for the first item */
398 ion++;
399 }
400
401 if(willwrite == 0)
402 break;
403
404 /* write all collected blocks at once */
405 if((nwritten = writev(fd, iov, ion)) < 0) {
406 if (errno != EAGAIN) {
407 redisLog(REDIS_VERBOSE,
408 "Error writing to client: %s", strerror(errno));
409 freeClient(c);
410 return;
411 }
412 break;
413 }
414
415 totwritten += nwritten;
416 offset = c->sentlen;
417
418 /* remove written robjs from c->reply */
419 while (nwritten && listLength(c->reply)) {
420 o = listNodeValue(listFirst(c->reply));
421 objlen = sdslen(o->ptr);
422
423 if(nwritten >= objlen - offset) {
424 listDelNode(c->reply, listFirst(c->reply));
425 nwritten -= objlen - offset;
426 c->sentlen = 0;
427 } else {
428 /* partial write */
429 c->sentlen += nwritten;
430 break;
431 }
432 offset = 0;
433 }
434 }
435
436 if (totwritten > 0)
437 c->lastinteraction = time(NULL);
438
439 if (listLength(c->reply) == 0) {
440 c->sentlen = 0;
441 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
442 }
443}
444
445/* resetClient prepare the client to process the next command */
446void resetClient(redisClient *c) {
447 freeClientArgv(c);
448 c->bulklen = -1;
449 c->multibulk = 0;
450}
451
452void closeTimedoutClients(void) {
453 redisClient *c;
454 listNode *ln;
455 time_t now = time(NULL);
456 listIter li;
457
458 listRewind(server.clients,&li);
459 while ((ln = listNext(&li)) != NULL) {
460 c = listNodeValue(ln);
461 if (server.maxidletime &&
462 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
463 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
464 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
465 listLength(c->pubsub_patterns) == 0 &&
466 (now - c->lastinteraction > server.maxidletime))
467 {
468 redisLog(REDIS_VERBOSE,"Closing idle client");
469 freeClient(c);
470 } else if (c->flags & REDIS_BLOCKED) {
471 if (c->blockingto != 0 && c->blockingto < now) {
472 addReply(c,shared.nullmultibulk);
473 unblockClientWaitingData(c);
474 }
475 }
476 }
477}
478
479void processInputBuffer(redisClient *c) {
480again:
481 /* Before to process the input buffer, make sure the client is not
482 * waitig for a blocking operation such as BLPOP. Note that the first
483 * iteration the client is never blocked, otherwise the processInputBuffer
484 * would not be called at all, but after the execution of the first commands
485 * in the input buffer the client may be blocked, and the "goto again"
486 * will try to reiterate. The following line will make it return asap. */
487 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
488 if (c->bulklen == -1) {
489 /* Read the first line of the query */
490 char *p = strchr(c->querybuf,'\n');
491 size_t querylen;
492
493 if (p) {
494 sds query, *argv;
495 int argc, j;
496
497 query = c->querybuf;
498 c->querybuf = sdsempty();
499 querylen = 1+(p-(query));
500 if (sdslen(query) > querylen) {
501 /* leave data after the first line of the query in the buffer */
502 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
503 }
504 *p = '\0'; /* remove "\n" */
505 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
506 sdsupdatelen(query);
507
508 /* Now we can split the query in arguments */
509 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
510 sdsfree(query);
511
512 if (c->argv) zfree(c->argv);
513 c->argv = zmalloc(sizeof(robj*)*argc);
514
515 for (j = 0; j < argc; j++) {
516 if (sdslen(argv[j])) {
517 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
518 c->argc++;
519 } else {
520 sdsfree(argv[j]);
521 }
522 }
523 zfree(argv);
524 if (c->argc) {
525 /* Execute the command. If the client is still valid
526 * after processCommand() return and there is something
527 * on the query buffer try to process the next command. */
528 if (processCommand(c) && sdslen(c->querybuf)) goto again;
529 } else {
530 /* Nothing to process, argc == 0. Just process the query
531 * buffer if it's not empty or return to the caller */
532 if (sdslen(c->querybuf)) goto again;
533 }
534 return;
535 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
536 redisLog(REDIS_VERBOSE, "Client protocol error");
537 freeClient(c);
538 return;
539 }
540 } else {
541 /* Bulk read handling. Note that if we are at this point
542 the client already sent a command terminated with a newline,
543 we are reading the bulk data that is actually the last
544 argument of the command. */
545 int qbl = sdslen(c->querybuf);
546
547 if (c->bulklen <= qbl) {
548 /* Copy everything but the final CRLF as final argument */
549 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
550 c->argc++;
551 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
552 /* Process the command. If the client is still valid after
553 * the processing and there is more data in the buffer
554 * try to parse it. */
555 if (processCommand(c) && sdslen(c->querybuf)) goto again;
556 return;
557 }
558 }
559}
560
561void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
562 redisClient *c = (redisClient*) privdata;
563 char buf[REDIS_IOBUF_LEN];
564 int nread;
565 REDIS_NOTUSED(el);
566 REDIS_NOTUSED(mask);
567
568 nread = read(fd, buf, REDIS_IOBUF_LEN);
569 if (nread == -1) {
570 if (errno == EAGAIN) {
571 nread = 0;
572 } else {
573 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
574 freeClient(c);
575 return;
576 }
577 } else if (nread == 0) {
578 redisLog(REDIS_VERBOSE, "Client closed connection");
579 freeClient(c);
580 return;
581 }
582 if (nread) {
583 c->querybuf = sdscatlen(c->querybuf, buf, nread);
584 c->lastinteraction = time(NULL);
585 } else {
586 return;
587 }
588 processInputBuffer(c);
589}