]> git.saurik.com Git - redis.git/blame_incremental - src/networking.c
Fix for bug 312, yet to verify in a couple of minutes...
[redis.git] / src / networking.c
... / ...
CommitLineData
1#include "redis.h"
2
3#include <sys/uio.h>
4
5void *dupClientReplyValue(void *o) {
6 incrRefCount((robj*)o);
7 return o;
8}
9
/* Match callback for lists holding string objects: forwards both values to
 * equalStringObjects() and returns its result unchanged. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
13
14redisClient *createClient(int fd) {
15 redisClient *c = zmalloc(sizeof(*c));
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 selectDb(c,0);
21 c->fd = fd;
22 c->querybuf = sdsempty();
23 c->argc = 0;
24 c->argv = NULL;
25 c->bulklen = -1;
26 c->multibulk = 0;
27 c->mbargc = 0;
28 c->mbargv = NULL;
29 c->sentlen = 0;
30 c->flags = 0;
31 c->lastinteraction = time(NULL);
32 c->authenticated = 0;
33 c->replstate = REDIS_REPL_NONE;
34 c->reply = listCreate();
35 listSetFreeMethod(c->reply,decrRefCount);
36 listSetDupMethod(c->reply,dupClientReplyValue);
37 c->blocking_keys = NULL;
38 c->blocking_keys_num = 0;
39 c->io_keys = listCreate();
40 c->watched_keys = listCreate();
41 listSetFreeMethod(c->io_keys,decrRefCount);
42 c->pubsub_channels = dictCreate(&setDictType,NULL);
43 c->pubsub_patterns = listCreate();
44 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
45 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
46 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
47 readQueryFromClient, c) == AE_ERR) {
48 freeClient(c);
49 return NULL;
50 }
51 listAddNodeTail(server.clients,c);
52 initClientMultiState(c);
53 return c;
54}
55
56void addReply(redisClient *c, robj *obj) {
57 if (listLength(c->reply) == 0 &&
58 (c->replstate == REDIS_REPL_NONE ||
59 c->replstate == REDIS_REPL_ONLINE) &&
60 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
61 sendReplyToClient, c) == AE_ERR) return;
62
63 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
64 obj = dupStringObject(obj);
65 obj->refcount = 0; /* getDecodedObject() will increment the refcount */
66 }
67 listAddNodeTail(c->reply,getDecodedObject(obj));
68}
69
70void addReplySds(redisClient *c, sds s) {
71 robj *o = createObject(REDIS_STRING,s);
72 addReply(c,o);
73 decrRefCount(o);
74}
75
76void addReplyDouble(redisClient *c, double d) {
77 char buf[128];
78
79 snprintf(buf,sizeof(buf),"%.17g",d);
80 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
81 (unsigned long) strlen(buf),buf));
82}
83
84void addReplyLongLong(redisClient *c, long long ll) {
85 char buf[128];
86 size_t len;
87
88 if (ll == 0) {
89 addReply(c,shared.czero);
90 return;
91 } else if (ll == 1) {
92 addReply(c,shared.cone);
93 return;
94 }
95 buf[0] = ':';
96 len = ll2string(buf+1,sizeof(buf)-1,ll);
97 buf[len+1] = '\r';
98 buf[len+2] = '\n';
99 addReplySds(c,sdsnewlen(buf,len+3));
100}
101
102void addReplyUlong(redisClient *c, unsigned long ul) {
103 char buf[128];
104 size_t len;
105
106 if (ul == 0) {
107 addReply(c,shared.czero);
108 return;
109 } else if (ul == 1) {
110 addReply(c,shared.cone);
111 return;
112 }
113 len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
114 addReplySds(c,sdsnewlen(buf,len));
115}
116
117void addReplyBulkLen(redisClient *c, robj *obj) {
118 size_t len, intlen;
119 char buf[128];
120
121 if (obj->encoding == REDIS_ENCODING_RAW) {
122 len = sdslen(obj->ptr);
123 } else {
124 long n = (long)obj->ptr;
125
126 /* Compute how many bytes will take this integer as a radix 10 string */
127 len = 1;
128 if (n < 0) {
129 len++;
130 n = -n;
131 }
132 while((n = n/10) != 0) {
133 len++;
134 }
135 }
136 buf[0] = '$';
137 intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
138 buf[intlen+1] = '\r';
139 buf[intlen+2] = '\n';
140 addReplySds(c,sdsnewlen(buf,intlen+3));
141}
142
143void addReplyBulk(redisClient *c, robj *obj) {
144 addReplyBulkLen(c,obj);
145 addReply(c,obj);
146 addReply(c,shared.crlf);
147}
148
149/* In the CONFIG command we need to add vanilla C string as bulk replies */
150void addReplyBulkCString(redisClient *c, char *s) {
151 if (s == NULL) {
152 addReply(c,shared.nullbulk);
153 } else {
154 robj *o = createStringObject(s,strlen(s));
155 addReplyBulk(c,o);
156 decrRefCount(o);
157 }
158}
159
160void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
161 int cport, cfd;
162 char cip[128];
163 redisClient *c;
164 REDIS_NOTUSED(el);
165 REDIS_NOTUSED(mask);
166 REDIS_NOTUSED(privdata);
167
168 cfd = anetAccept(server.neterr, fd, cip, &cport);
169 if (cfd == AE_ERR) {
170 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
171 return;
172 }
173 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
174 if ((c = createClient(cfd)) == NULL) {
175 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
176 close(cfd); /* May be already closed, just ingore errors */
177 return;
178 }
179 /* If maxclient directive is set and this is one client more... close the
180 * connection. Note that we create the client instead to check before
181 * for this condition, since now the socket is already set in nonblocking
182 * mode and we can send an error for free using the Kernel I/O */
183 if (server.maxclients && listLength(server.clients) > server.maxclients) {
184 char *err = "-ERR max number of clients reached\r\n";
185
186 /* That's a best effort error message, don't check write errors */
187 if (write(c->fd,err,strlen(err)) == -1) {
188 /* Nothing to do, Just to avoid the warning... */
189 }
190 freeClient(c);
191 return;
192 }
193 server.stat_numconnections++;
194}
195
196static void freeClientArgv(redisClient *c) {
197 int j;
198
199 for (j = 0; j < c->argc; j++)
200 decrRefCount(c->argv[j]);
201 for (j = 0; j < c->mbargc; j++)
202 decrRefCount(c->mbargv[j]);
203 c->argc = 0;
204 c->mbargc = 0;
205}
206
207void freeClient(redisClient *c) {
208 listNode *ln;
209
210 /* Note that if the client we are freeing is blocked into a blocking
211 * call, we have to set querybuf to NULL *before* to call
212 * unblockClientWaitingData() to avoid processInputBuffer() will get
213 * called. Also it is important to remove the file events after
214 * this, because this call adds the READABLE event. */
215 sdsfree(c->querybuf);
216 c->querybuf = NULL;
217 if (c->flags & REDIS_BLOCKED)
218 unblockClientWaitingData(c);
219
220 /* UNWATCH all the keys */
221 unwatchAllKeys(c);
222 listRelease(c->watched_keys);
223 /* Unsubscribe from all the pubsub channels */
224 pubsubUnsubscribeAllChannels(c,0);
225 pubsubUnsubscribeAllPatterns(c,0);
226 dictRelease(c->pubsub_channels);
227 listRelease(c->pubsub_patterns);
228 /* Obvious cleanup */
229 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
230 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
231 listRelease(c->reply);
232 freeClientArgv(c);
233 close(c->fd);
234 /* Remove from the list of clients */
235 ln = listSearchKey(server.clients,c);
236 redisAssert(ln != NULL);
237 listDelNode(server.clients,ln);
238 /* Remove from the list of clients waiting for swapped keys, or ready
239 * to be restarted, but not yet woken up again. */
240 if (c->flags & REDIS_IO_WAIT) {
241 redisAssert(server.vm_enabled);
242 if (listLength(c->io_keys) == 0) {
243 ln = listSearchKey(server.io_ready_clients,c);
244
245 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
246 * it should be present in the list io_ready_clients */
247 redisAssert(ln != NULL);
248 listDelNode(server.io_ready_clients,ln);
249 } else {
250 while (listLength(c->io_keys)) {
251 ln = listFirst(c->io_keys);
252 dontWaitForSwappedKey(c,ln->value);
253 }
254 }
255 server.vm_blocked_clients--;
256 }
257 listRelease(c->io_keys);
258 /* Master/slave cleanup.
259 * Case 1: we lost the connection with a slave. */
260 if (c->flags & REDIS_SLAVE) {
261 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
262 close(c->repldbfd);
263 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
264 ln = listSearchKey(l,c);
265 redisAssert(ln != NULL);
266 listDelNode(l,ln);
267 }
268
269 /* Case 2: we lost the connection with the master. */
270 if (c->flags & REDIS_MASTER) {
271 server.master = NULL;
272 server.replstate = REDIS_REPL_CONNECT;
273 /* Since we lost the connection with the master, we should also
274 * close the connection with all our slaves if we have any, so
275 * when we'll resync with the master the other slaves will sync again
276 * with us as well. Note that also when the slave is not connected
277 * to the master it will keep refusing connections by other slaves. */
278 while (listLength(server.slaves)) {
279 ln = listFirst(server.slaves);
280 freeClient((redisClient*)ln->value);
281 }
282 }
283 /* Release memory */
284 zfree(c->argv);
285 zfree(c->mbargv);
286 freeClientMultiState(c);
287 zfree(c);
288}
289
290#define GLUEREPLY_UP_TO (1024)
291static void glueReplyBuffersIfNeeded(redisClient *c) {
292 int copylen = 0;
293 char buf[GLUEREPLY_UP_TO];
294 listNode *ln;
295 listIter li;
296 robj *o;
297
298 listRewind(c->reply,&li);
299 while((ln = listNext(&li))) {
300 int objlen;
301
302 o = ln->value;
303 objlen = sdslen(o->ptr);
304 if (copylen + objlen <= GLUEREPLY_UP_TO) {
305 memcpy(buf+copylen,o->ptr,objlen);
306 copylen += objlen;
307 listDelNode(c->reply,ln);
308 } else {
309 if (copylen == 0) return;
310 break;
311 }
312 }
313 /* Now the output buffer is empty, add the new single element */
314 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
315 listAddNodeHead(c->reply,o);
316}
317
318void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
319 redisClient *c = privdata;
320 int nwritten = 0, totwritten = 0, objlen;
321 robj *o;
322 REDIS_NOTUSED(el);
323 REDIS_NOTUSED(mask);
324
325 /* Use writev() if we have enough buffers to send */
326 if (!server.glueoutputbuf &&
327 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
328 !(c->flags & REDIS_MASTER))
329 {
330 sendReplyToClientWritev(el, fd, privdata, mask);
331 return;
332 }
333
334 while(listLength(c->reply)) {
335 if (server.glueoutputbuf && listLength(c->reply) > 1)
336 glueReplyBuffersIfNeeded(c);
337
338 o = listNodeValue(listFirst(c->reply));
339 objlen = sdslen(o->ptr);
340
341 if (objlen == 0) {
342 listDelNode(c->reply,listFirst(c->reply));
343 continue;
344 }
345
346 if (c->flags & REDIS_MASTER) {
347 /* Don't reply to a master */
348 nwritten = objlen - c->sentlen;
349 } else {
350 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
351 if (nwritten <= 0) break;
352 }
353 c->sentlen += nwritten;
354 totwritten += nwritten;
355 /* If we fully sent the object on head go to the next one */
356 if (c->sentlen == objlen) {
357 listDelNode(c->reply,listFirst(c->reply));
358 c->sentlen = 0;
359 }
360 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
361 * bytes, in a single threaded server it's a good idea to serve
362 * other clients as well, even if a very large request comes from
363 * super fast link that is always able to accept data (in real world
364 * scenario think about 'KEYS *' against the loopback interfae) */
365 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
366 }
367 if (nwritten == -1) {
368 if (errno == EAGAIN) {
369 nwritten = 0;
370 } else {
371 redisLog(REDIS_VERBOSE,
372 "Error writing to client: %s", strerror(errno));
373 freeClient(c);
374 return;
375 }
376 }
377 if (totwritten > 0) c->lastinteraction = time(NULL);
378 if (listLength(c->reply) == 0) {
379 c->sentlen = 0;
380 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
381 }
382}
383
384void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
385{
386 redisClient *c = privdata;
387 int nwritten = 0, totwritten = 0, objlen, willwrite;
388 robj *o;
389 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
390 int offset, ion = 0;
391 REDIS_NOTUSED(el);
392 REDIS_NOTUSED(mask);
393
394 listNode *node;
395 while (listLength(c->reply)) {
396 offset = c->sentlen;
397 ion = 0;
398 willwrite = 0;
399
400 /* fill-in the iov[] array */
401 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
402 o = listNodeValue(node);
403 objlen = sdslen(o->ptr);
404
405 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
406 break;
407
408 if(ion == REDIS_WRITEV_IOVEC_COUNT)
409 break; /* no more iovecs */
410
411 iov[ion].iov_base = ((char*)o->ptr) + offset;
412 iov[ion].iov_len = objlen - offset;
413 willwrite += objlen - offset;
414 offset = 0; /* just for the first item */
415 ion++;
416 }
417
418 if(willwrite == 0)
419 break;
420
421 /* write all collected blocks at once */
422 if((nwritten = writev(fd, iov, ion)) < 0) {
423 if (errno != EAGAIN) {
424 redisLog(REDIS_VERBOSE,
425 "Error writing to client: %s", strerror(errno));
426 freeClient(c);
427 return;
428 }
429 break;
430 }
431
432 totwritten += nwritten;
433 offset = c->sentlen;
434
435 /* remove written robjs from c->reply */
436 while (nwritten && listLength(c->reply)) {
437 o = listNodeValue(listFirst(c->reply));
438 objlen = sdslen(o->ptr);
439
440 if(nwritten >= objlen - offset) {
441 listDelNode(c->reply, listFirst(c->reply));
442 nwritten -= objlen - offset;
443 c->sentlen = 0;
444 } else {
445 /* partial write */
446 c->sentlen += nwritten;
447 break;
448 }
449 offset = 0;
450 }
451 }
452
453 if (totwritten > 0)
454 c->lastinteraction = time(NULL);
455
456 if (listLength(c->reply) == 0) {
457 c->sentlen = 0;
458 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
459 }
460}
461
462/* resetClient prepare the client to process the next command */
463void resetClient(redisClient *c) {
464 freeClientArgv(c);
465 c->bulklen = -1;
466 c->multibulk = 0;
467}
468
469void closeTimedoutClients(void) {
470 redisClient *c;
471 listNode *ln;
472 time_t now = time(NULL);
473 listIter li;
474
475 listRewind(server.clients,&li);
476 while ((ln = listNext(&li)) != NULL) {
477 c = listNodeValue(ln);
478 if (server.maxidletime &&
479 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
480 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
481 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
482 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
483 listLength(c->pubsub_patterns) == 0 &&
484 (now - c->lastinteraction > server.maxidletime))
485 {
486 redisLog(REDIS_VERBOSE,"Closing idle client");
487 freeClient(c);
488 } else if (c->flags & REDIS_BLOCKED) {
489 if (c->blockingto != 0 && c->blockingto < now) {
490 addReply(c,shared.nullmultibulk);
491 unblockClientWaitingData(c);
492 }
493 }
494 }
495}
496
497void processInputBuffer(redisClient *c) {
498again:
499 /* Before to process the input buffer, make sure the client is not
500 * waitig for a blocking operation such as BLPOP. Note that the first
501 * iteration the client is never blocked, otherwise the processInputBuffer
502 * would not be called at all, but after the execution of the first commands
503 * in the input buffer the client may be blocked, and the "goto again"
504 * will try to reiterate. The following line will make it return asap. */
505 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
506 if (c->bulklen == -1) {
507 /* Read the first line of the query */
508 char *p = strchr(c->querybuf,'\n');
509 size_t querylen;
510
511 if (p) {
512 sds query, *argv;
513 int argc, j;
514
515 query = c->querybuf;
516 c->querybuf = sdsempty();
517 querylen = 1+(p-(query));
518 if (sdslen(query) > querylen) {
519 /* leave data after the first line of the query in the buffer */
520 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
521 }
522 *p = '\0'; /* remove "\n" */
523 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
524 sdsupdatelen(query);
525
526 /* Now we can split the query in arguments */
527 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
528 sdsfree(query);
529
530 if (c->argv) zfree(c->argv);
531 c->argv = zmalloc(sizeof(robj*)*argc);
532
533 for (j = 0; j < argc; j++) {
534 if (sdslen(argv[j])) {
535 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
536 c->argc++;
537 } else {
538 sdsfree(argv[j]);
539 }
540 }
541 zfree(argv);
542 if (c->argc) {
543 /* Execute the command. If the client is still valid
544 * after processCommand() return and there is something
545 * on the query buffer try to process the next command. */
546 if (processCommand(c) && sdslen(c->querybuf)) goto again;
547 } else {
548 /* Nothing to process, argc == 0. Just process the query
549 * buffer if it's not empty or return to the caller */
550 if (sdslen(c->querybuf)) goto again;
551 }
552 return;
553 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
554 redisLog(REDIS_VERBOSE, "Client protocol error");
555 freeClient(c);
556 return;
557 }
558 } else {
559 /* Bulk read handling. Note that if we are at this point
560 the client already sent a command terminated with a newline,
561 we are reading the bulk data that is actually the last
562 argument of the command. */
563 int qbl = sdslen(c->querybuf);
564
565 if (c->bulklen <= qbl) {
566 /* Copy everything but the final CRLF as final argument */
567 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
568 c->argc++;
569 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
570 /* Process the command. If the client is still valid after
571 * the processing and there is more data in the buffer
572 * try to parse it. */
573 if (processCommand(c) && sdslen(c->querybuf)) goto again;
574 return;
575 }
576 }
577}
578
579void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
580 redisClient *c = (redisClient*) privdata;
581 char buf[REDIS_IOBUF_LEN];
582 int nread;
583 REDIS_NOTUSED(el);
584 REDIS_NOTUSED(mask);
585
586 nread = read(fd, buf, REDIS_IOBUF_LEN);
587 if (nread == -1) {
588 if (errno == EAGAIN) {
589 nread = 0;
590 } else {
591 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
592 freeClient(c);
593 return;
594 }
595 } else if (nread == 0) {
596 redisLog(REDIS_VERBOSE, "Client closed connection");
597 freeClient(c);
598 return;
599 }
600 if (nread) {
601 c->querybuf = sdscatlen(c->querybuf, buf, nread);
602 c->lastinteraction = time(NULL);
603 } else {
604 return;
605 }
606 processInputBuffer(c);
607}