]> git.saurik.com Git - redis.git/blob - src/networking.c
Refactor reply buildup for speed on large multi bulk replies
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
/* Match method for lists holding string objects: the result is whatever
 * equalStringObjects() reports for the two values. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
12
13 redisClient *createClient(int fd) {
14 redisClient *c;
15
16 /* Make sure to allocate a multiple of the page size to prevent wasting
17 * memory. A page size of 4096 is assumed here. We need to compensate
18 * for the zmalloc overhead of sizeof(size_t) bytes. */
19 size_t size = 8192-sizeof(size_t);
20 redisAssert(size > sizeof(redisClient));
21 c = zmalloc(size);
22 c->buflen = size-sizeof(redisClient);
23 c->bufpos = 0;
24
25 anetNonBlock(NULL,fd);
26 anetTcpNoDelay(NULL,fd);
27 if (!c) return NULL;
28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
54 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
55 readQueryFromClient, c) == AE_ERR) {
56 freeClient(c);
57 return NULL;
58 }
59 listAddNodeTail(server.clients,c);
60 initClientMultiState(c);
61 return c;
62 }
63
64 int _ensureFileEvent(redisClient *c) {
65 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
66 (c->replstate == REDIS_REPL_NONE ||
67 c->replstate == REDIS_REPL_ONLINE) &&
68 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
69 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
70 return REDIS_OK;
71 }
72
73 void _addReplyObjectToList(redisClient *c, robj *obj) {
74 redisAssert(obj->type == REDIS_STRING &&
75 obj->encoding == REDIS_ENCODING_RAW);
76 listAddNodeTail(c->reply,obj);
77 }
78
79 void _ensureBufferInReplyList(redisClient *c) {
80 sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE);
81 sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */
82 listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer));
83 }
84
85 void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) {
86 size_t available = 0;
87 redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD);
88 if (listLength(c->reply) > 0) {
89 robj *o = listNodeValue(listLast(c->reply));
90
91 /* Make sure to append to a reply node with enough bytes available. */
92 if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr);
93 if (o->type != REDIS_REPLY_NODE || len > available) {
94 _ensureBufferInReplyList(c);
95 _addReplyStringToBuffer(c,s,len);
96 } else {
97 o->ptr = sdscatlen(o->ptr,s,len);
98 }
99 } else {
100 available = c->buflen-c->bufpos;
101 if (len > available) {
102 _ensureBufferInReplyList(c);
103 _addReplyStringToBuffer(c,s,len);
104 } else {
105 memcpy(c->buf+c->bufpos,s,len);
106 c->bufpos += len;
107 }
108 }
109 }
110
111 void addReply(redisClient *c, robj *obj) {
112 if (_ensureFileEvent(c) != REDIS_OK) return;
113 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
114 /* Returns a new object with refcount 1 */
115 obj = dupStringObject(obj);
116 } else {
117 /* This increments the refcount. */
118 obj = getDecodedObject(obj);
119 }
120
121 if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) {
122 _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr));
123 decrRefCount(obj);
124 } else {
125 _addReplyObjectToList(c,obj);
126 }
127 }
128
129 void addReplySds(redisClient *c, sds s) {
130 if (_ensureFileEvent(c) != REDIS_OK) return;
131 if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) {
132 _addReplyStringToBuffer(c,s,sdslen(s));
133 sdsfree(s);
134 } else {
135 _addReplyObjectToList(c,createObject(REDIS_STRING,s));
136 }
137 }
138
139 void addReplyString(redisClient *c, char *s, size_t len) {
140 if (_ensureFileEvent(c) != REDIS_OK) return;
141 if (len < REDIS_REPLY_CHUNK_THRESHOLD) {
142 _addReplyStringToBuffer(c,s,len);
143 } else {
144 _addReplyObjectToList(c,createStringObject(s,len));
145 }
146 }
147
148 void addReplyDouble(redisClient *c, double d) {
149 char dbuf[128], sbuf[128];
150 int dlen, slen;
151 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
152 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
153 addReplyString(c,sbuf,slen);
154 }
155
156 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
157 char buf[128];
158 int len;
159 buf[0] = prefix;
160 len = ll2string(buf+1,sizeof(buf)-1,ll);
161 buf[len+1] = '\r';
162 buf[len+2] = '\n';
163 addReplyString(c,buf,len+3);
164 }
165
166 void addReplyLongLong(redisClient *c, long long ll) {
167 _addReplyLongLong(c,ll,':');
168 }
169
170 void addReplyUlong(redisClient *c, unsigned long ul) {
171 _addReplyLongLong(c,(long long)ul,':');
172 }
173
174 void addReplyBulkLen(redisClient *c, robj *obj) {
175 size_t len;
176
177 if (obj->encoding == REDIS_ENCODING_RAW) {
178 len = sdslen(obj->ptr);
179 } else {
180 long n = (long)obj->ptr;
181
182 /* Compute how many bytes will take this integer as a radix 10 string */
183 len = 1;
184 if (n < 0) {
185 len++;
186 n = -n;
187 }
188 while((n = n/10) != 0) {
189 len++;
190 }
191 }
192 _addReplyLongLong(c,len,'$');
193 }
194
195 void addReplyBulk(redisClient *c, robj *obj) {
196 addReplyBulkLen(c,obj);
197 addReply(c,obj);
198 addReply(c,shared.crlf);
199 }
200
201 /* In the CONFIG command we need to add vanilla C string as bulk replies */
202 void addReplyBulkCString(redisClient *c, char *s) {
203 if (s == NULL) {
204 addReply(c,shared.nullbulk);
205 } else {
206 robj *o = createStringObject(s,strlen(s));
207 addReplyBulk(c,o);
208 decrRefCount(o);
209 }
210 }
211
212 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
213 int cport, cfd;
214 char cip[128];
215 redisClient *c;
216 REDIS_NOTUSED(el);
217 REDIS_NOTUSED(mask);
218 REDIS_NOTUSED(privdata);
219
220 cfd = anetAccept(server.neterr, fd, cip, &cport);
221 if (cfd == AE_ERR) {
222 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
223 return;
224 }
225 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
226 if ((c = createClient(cfd)) == NULL) {
227 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
228 close(cfd); /* May be already closed, just ingore errors */
229 return;
230 }
231 /* If maxclient directive is set and this is one client more... close the
232 * connection. Note that we create the client instead to check before
233 * for this condition, since now the socket is already set in nonblocking
234 * mode and we can send an error for free using the Kernel I/O */
235 if (server.maxclients && listLength(server.clients) > server.maxclients) {
236 char *err = "-ERR max number of clients reached\r\n";
237
238 /* That's a best effort error message, don't check write errors */
239 if (write(c->fd,err,strlen(err)) == -1) {
240 /* Nothing to do, Just to avoid the warning... */
241 }
242 freeClient(c);
243 return;
244 }
245 server.stat_numconnections++;
246 }
247
248 static void freeClientArgv(redisClient *c) {
249 int j;
250
251 for (j = 0; j < c->argc; j++)
252 decrRefCount(c->argv[j]);
253 for (j = 0; j < c->mbargc; j++)
254 decrRefCount(c->mbargv[j]);
255 c->argc = 0;
256 c->mbargc = 0;
257 }
258
/* Release every resource associated with the client 'c' and finally the
 * client structure itself.
 *
 * In order: the query buffer is freed and the client unblocked if needed,
 * watched keys and pubsub subscriptions are dropped, the file events are
 * removed, the reply list and argument vectors are released, the socket is
 * closed, and the client is unlinked from server.clients, from the VM
 * waiting structures and from the slaves/monitors lists. When the client
 * is the master, all the slaves are freed as well.
 *
 * The client must be present in server.clients (this is asserted). */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked in a blocking
     * call, we have to set querybuf to NULL *before* calling
     * unblockClientWaitingData() so that processInputBuffer() does not get
     * called. Also it is important to remove the file events after
     * this, because this call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels and patterns */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Obvious cleanup: event handlers, pending replies, arguments, socket */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.vm_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            /* Still waiting for some key: stop waiting for each of them.
             * NOTE(review): the loop terminates only if
             * dontWaitForSwappedKey() removes the key from c->io_keys --
             * confirm against its definition. */
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.vm_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup.
     * Case 1: we lost the connection with a slave. */
    if (c->flags & REDIS_SLAVE) {
        /* Close the dump file descriptor if a bulk transfer was in
         * progress. */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Clients flagged REDIS_MONITOR are kept in server.monitors, the
         * others in server.slaves. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }

    /* Case 2: we lost the connection with the master. */
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
        /* Since we lost the connection with the master, we should also
         * close the connection with all our slaves if we have any, so
         * when we'll resync with the master the other slaves will sync again
         * with us as well. Note that also when the slave is not connected
         * to the master it will keep refusing connections by other slaves. */
        while (listLength(server.slaves)) {
            ln = listFirst(server.slaves);
            /* The recursive call unlinks the slave from server.slaves
             * (case 1 above), so the list shrinks at every iteration. */
            freeClient((redisClient*)ln->value);
        }
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
341
342 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
343 redisClient *c = privdata;
344 int nwritten = 0, totwritten = 0, objlen;
345 robj *o;
346 REDIS_NOTUSED(el);
347 REDIS_NOTUSED(mask);
348
349 /* Use writev() if we have enough buffers to send */
350 if (!server.glueoutputbuf &&
351 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
352 !(c->flags & REDIS_MASTER))
353 {
354 sendReplyToClientWritev(el, fd, privdata, mask);
355 return;
356 }
357
358 while(c->bufpos > 0 || listLength(c->reply)) {
359 if (c->bufpos > 0) {
360 if (c->flags & REDIS_MASTER) {
361 /* Don't reply to a master */
362 nwritten = c->bufpos - c->sentlen;
363 } else {
364 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
365 if (nwritten <= 0) break;
366 }
367 c->sentlen += nwritten;
368 totwritten += nwritten;
369
370 /* If the buffer was sent, set bufpos to zero to continue with
371 * the remainder of the reply. */
372 if (c->sentlen == c->bufpos) {
373 c->bufpos = 0;
374 c->sentlen = 0;
375 }
376 } else {
377 o = listNodeValue(listFirst(c->reply));
378 objlen = sdslen(o->ptr);
379
380 if (objlen == 0) {
381 listDelNode(c->reply,listFirst(c->reply));
382 continue;
383 }
384
385 if (c->flags & REDIS_MASTER) {
386 /* Don't reply to a master */
387 nwritten = objlen - c->sentlen;
388 } else {
389 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
390 if (nwritten <= 0) break;
391 }
392 c->sentlen += nwritten;
393 totwritten += nwritten;
394
395 /* If we fully sent the object on head go to the next one */
396 if (c->sentlen == objlen) {
397 listDelNode(c->reply,listFirst(c->reply));
398 c->sentlen = 0;
399 }
400 }
401 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
402 * bytes, in a single threaded server it's a good idea to serve
403 * other clients as well, even if a very large request comes from
404 * super fast link that is always able to accept data (in real world
405 * scenario think about 'KEYS *' against the loopback interfae) */
406 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
407 }
408 if (nwritten == -1) {
409 if (errno == EAGAIN) {
410 nwritten = 0;
411 } else {
412 redisLog(REDIS_VERBOSE,
413 "Error writing to client: %s", strerror(errno));
414 freeClient(c);
415 return;
416 }
417 }
418 if (totwritten > 0) c->lastinteraction = time(NULL);
419 if (listLength(c->reply) == 0) {
420 c->sentlen = 0;
421 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
422 }
423 }
424
/* Flush the reply list of the client using writev(), sending several
 * queued objects with a single system call.
 *
 * Every round gathers up to REDIS_WRITEV_IOVEC_COUNT objects from the head
 * of c->reply (stopping before the per-event budget
 * REDIS_MAX_WRITE_PER_EVENT would be exceeded), writes them, then removes
 * the fully written objects from the list. c->sentlen holds how many bytes
 * of the first object were already sent by a previous partial write.
 *
 * NOTE(review): this function only looks at c->reply; the static buffer
 * (c->buf / c->bufpos) is never read here. */
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
{
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen, willwrite;
    robj *o;
    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
    int offset, ion = 0;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    listNode *node;
    while (listLength(c->reply)) {
        /* Only the first object may be partially sent already. */
        offset = c->sentlen;
        ion = 0;
        willwrite = 0;

        /* fill-in the iov[] array */
        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
            o = listNodeValue(node);
            objlen = sdslen(o->ptr);

            /* Stop gathering when this object would exceed the budget.
             * NOTE(review): if the very first object alone is over the
             * budget nothing is gathered, willwrite stays 0 and the
             * function leaves the loop without writing anything. */
            if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
                break;

            if(ion == REDIS_WRITEV_IOVEC_COUNT)
                break; /* no more iovecs */

            iov[ion].iov_base = ((char*)o->ptr) + offset;
            iov[ion].iov_len = objlen - offset;
            willwrite += objlen - offset;
            offset = 0; /* just for the first item */
            ion++;
        }

        /* Nothing could be gathered in this round: give up for now. */
        if(willwrite == 0)
            break;

        /* write all collected blocks at once */
        if((nwritten = writev(fd, iov, ion)) < 0) {
            if (errno != EAGAIN) {
                redisLog(REDIS_VERBOSE,
                    "Error writing to client: %s", strerror(errno));
                freeClient(c);
                return;
            }
            /* EAGAIN: the socket is not writable right now, stop here. */
            break;
        }

        totwritten += nwritten;
        offset = c->sentlen;

        /* remove written robjs from c->reply */
        while (nwritten && listLength(c->reply)) {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);

            if(nwritten >= objlen - offset) {
                /* The head object was sent entirely: drop it and account
                 * for the bytes it consumed. */
                listDelNode(c->reply, listFirst(c->reply));
                nwritten -= objlen - offset;
                c->sentlen = 0;
            } else {
                /* partial write */
                c->sentlen += nwritten;
                break;
            }
            offset = 0;
        }
    }

    if (totwritten > 0)
        c->lastinteraction = time(NULL);

    /* Once the reply list is empty the writable event is removed. */
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
502
503 /* resetClient prepare the client to process the next command */
504 void resetClient(redisClient *c) {
505 freeClientArgv(c);
506 c->bulklen = -1;
507 c->multibulk = 0;
508 }
509
510 void closeTimedoutClients(void) {
511 redisClient *c;
512 listNode *ln;
513 time_t now = time(NULL);
514 listIter li;
515
516 listRewind(server.clients,&li);
517 while ((ln = listNext(&li)) != NULL) {
518 c = listNodeValue(ln);
519 if (server.maxidletime &&
520 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
521 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
522 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
523 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
524 listLength(c->pubsub_patterns) == 0 &&
525 (now - c->lastinteraction > server.maxidletime))
526 {
527 redisLog(REDIS_VERBOSE,"Closing idle client");
528 freeClient(c);
529 } else if (c->flags & REDIS_BLOCKED) {
530 if (c->blockingto != 0 && c->blockingto < now) {
531 addReply(c,shared.nullmultibulk);
532 unblockClientWaitingData(c);
533 }
534 }
535 }
536 }
537
538 void processInputBuffer(redisClient *c) {
539 again:
540 /* Before to process the input buffer, make sure the client is not
541 * waitig for a blocking operation such as BLPOP. Note that the first
542 * iteration the client is never blocked, otherwise the processInputBuffer
543 * would not be called at all, but after the execution of the first commands
544 * in the input buffer the client may be blocked, and the "goto again"
545 * will try to reiterate. The following line will make it return asap. */
546 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
547 if (c->bulklen == -1) {
548 /* Read the first line of the query */
549 char *p = strchr(c->querybuf,'\n');
550 size_t querylen;
551
552 if (p) {
553 sds query, *argv;
554 int argc, j;
555
556 query = c->querybuf;
557 c->querybuf = sdsempty();
558 querylen = 1+(p-(query));
559 if (sdslen(query) > querylen) {
560 /* leave data after the first line of the query in the buffer */
561 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
562 }
563 *p = '\0'; /* remove "\n" */
564 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
565 sdsupdatelen(query);
566
567 /* Now we can split the query in arguments */
568 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
569 sdsfree(query);
570
571 if (c->argv) zfree(c->argv);
572 c->argv = zmalloc(sizeof(robj*)*argc);
573
574 for (j = 0; j < argc; j++) {
575 if (sdslen(argv[j])) {
576 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
577 c->argc++;
578 } else {
579 sdsfree(argv[j]);
580 }
581 }
582 zfree(argv);
583 if (c->argc) {
584 /* Execute the command. If the client is still valid
585 * after processCommand() return and there is something
586 * on the query buffer try to process the next command. */
587 if (processCommand(c) && sdslen(c->querybuf)) goto again;
588 } else {
589 /* Nothing to process, argc == 0. Just process the query
590 * buffer if it's not empty or return to the caller */
591 if (sdslen(c->querybuf)) goto again;
592 }
593 return;
594 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
595 redisLog(REDIS_VERBOSE, "Client protocol error");
596 freeClient(c);
597 return;
598 }
599 } else {
600 /* Bulk read handling. Note that if we are at this point
601 the client already sent a command terminated with a newline,
602 we are reading the bulk data that is actually the last
603 argument of the command. */
604 int qbl = sdslen(c->querybuf);
605
606 if (c->bulklen <= qbl) {
607 /* Copy everything but the final CRLF as final argument */
608 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
609 c->argc++;
610 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
611 /* Process the command. If the client is still valid after
612 * the processing and there is more data in the buffer
613 * try to parse it. */
614 if (processCommand(c) && sdslen(c->querybuf)) goto again;
615 return;
616 }
617 }
618 }
619
620 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
621 redisClient *c = (redisClient*) privdata;
622 char buf[REDIS_IOBUF_LEN];
623 int nread;
624 REDIS_NOTUSED(el);
625 REDIS_NOTUSED(mask);
626
627 nread = read(fd, buf, REDIS_IOBUF_LEN);
628 if (nread == -1) {
629 if (errno == EAGAIN) {
630 nread = 0;
631 } else {
632 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
633 freeClient(c);
634 return;
635 }
636 } else if (nread == 0) {
637 redisLog(REDIS_VERBOSE, "Client closed connection");
638 freeClient(c);
639 return;
640 }
641 if (nread) {
642 c->querybuf = sdscatlen(c->querybuf, buf, nread);
643 c->lastinteraction = time(NULL);
644 } else {
645 return;
646 }
647 processInputBuffer(c);
648 }