]> git.saurik.com Git - redis.git/blame - src/networking.c
Fixed missed use of INSTALL_TOP
[redis.git] / src / networking.c
CommitLineData
e2641e09 1#include "redis.h"
2
3#include <sys/uio.h>
4
5void *dupClientReplyValue(void *o) {
6 incrRefCount((robj*)o);
7 return o;
8}
9
/* Match callback for lists holding string objects: the result is whatever
 * equalStringObjects() reports for the two values. */
int listMatchObjects(void *a, void *b) {
    int matched = equalStringObjects(a,b);

    return matched;
}
13
14redisClient *createClient(int fd) {
15 redisClient *c = zmalloc(sizeof(*c));
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
106bd87a
PN
20 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
21 readQueryFromClient, c) == AE_ERR)
22 {
23 close(fd);
24 zfree(c);
25 return NULL;
26 }
27
e2641e09 28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
e2641e09 54 listAddNodeTail(server.clients,c);
55 initClientMultiState(c);
56 return c;
57}
58
59void addReply(redisClient *c, robj *obj) {
60 if (listLength(c->reply) == 0 &&
61 (c->replstate == REDIS_REPL_NONE ||
62 c->replstate == REDIS_REPL_ONLINE) &&
63 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
64 sendReplyToClient, c) == AE_ERR) return;
65
66 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
67 obj = dupStringObject(obj);
68 obj->refcount = 0; /* getDecodedObject() will increment the refcount */
69 }
70 listAddNodeTail(c->reply,getDecodedObject(obj));
71}
72
73void addReplySds(redisClient *c, sds s) {
74 robj *o = createObject(REDIS_STRING,s);
75 addReply(c,o);
76 decrRefCount(o);
77}
78
79void addReplyDouble(redisClient *c, double d) {
80 char buf[128];
81
82 snprintf(buf,sizeof(buf),"%.17g",d);
83 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
84 (unsigned long) strlen(buf),buf));
85}
86
87void addReplyLongLong(redisClient *c, long long ll) {
88 char buf[128];
89 size_t len;
90
91 if (ll == 0) {
92 addReply(c,shared.czero);
93 return;
94 } else if (ll == 1) {
95 addReply(c,shared.cone);
96 return;
97 }
98 buf[0] = ':';
99 len = ll2string(buf+1,sizeof(buf)-1,ll);
100 buf[len+1] = '\r';
101 buf[len+2] = '\n';
102 addReplySds(c,sdsnewlen(buf,len+3));
103}
104
105void addReplyUlong(redisClient *c, unsigned long ul) {
106 char buf[128];
107 size_t len;
108
109 if (ul == 0) {
110 addReply(c,shared.czero);
111 return;
112 } else if (ul == 1) {
113 addReply(c,shared.cone);
114 return;
115 }
116 len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
117 addReplySds(c,sdsnewlen(buf,len));
118}
119
120void addReplyBulkLen(redisClient *c, robj *obj) {
121 size_t len, intlen;
122 char buf[128];
123
124 if (obj->encoding == REDIS_ENCODING_RAW) {
125 len = sdslen(obj->ptr);
126 } else {
127 long n = (long)obj->ptr;
128
129 /* Compute how many bytes will take this integer as a radix 10 string */
130 len = 1;
131 if (n < 0) {
132 len++;
133 n = -n;
134 }
135 while((n = n/10) != 0) {
136 len++;
137 }
138 }
139 buf[0] = '$';
140 intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
141 buf[intlen+1] = '\r';
142 buf[intlen+2] = '\n';
143 addReplySds(c,sdsnewlen(buf,intlen+3));
144}
145
146void addReplyBulk(redisClient *c, robj *obj) {
147 addReplyBulkLen(c,obj);
148 addReply(c,obj);
149 addReply(c,shared.crlf);
150}
151
152/* In the CONFIG command we need to add vanilla C string as bulk replies */
153void addReplyBulkCString(redisClient *c, char *s) {
154 if (s == NULL) {
155 addReply(c,shared.nullbulk);
156 } else {
157 robj *o = createStringObject(s,strlen(s));
158 addReplyBulk(c,o);
159 decrRefCount(o);
160 }
161}
162
163void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
164 int cport, cfd;
165 char cip[128];
166 redisClient *c;
167 REDIS_NOTUSED(el);
168 REDIS_NOTUSED(mask);
169 REDIS_NOTUSED(privdata);
170
171 cfd = anetAccept(server.neterr, fd, cip, &cport);
172 if (cfd == AE_ERR) {
173 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
174 return;
175 }
176 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
177 if ((c = createClient(cfd)) == NULL) {
178 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
179 close(cfd); /* May be already closed, just ingore errors */
180 return;
181 }
182 /* If maxclient directive is set and this is one client more... close the
183 * connection. Note that we create the client instead to check before
184 * for this condition, since now the socket is already set in nonblocking
185 * mode and we can send an error for free using the Kernel I/O */
186 if (server.maxclients && listLength(server.clients) > server.maxclients) {
187 char *err = "-ERR max number of clients reached\r\n";
188
189 /* That's a best effort error message, don't check write errors */
190 if (write(c->fd,err,strlen(err)) == -1) {
191 /* Nothing to do, Just to avoid the warning... */
192 }
193 freeClient(c);
194 return;
195 }
196 server.stat_numconnections++;
197}
198
199static void freeClientArgv(redisClient *c) {
200 int j;
201
202 for (j = 0; j < c->argc; j++)
203 decrRefCount(c->argv[j]);
204 for (j = 0; j < c->mbargc; j++)
205 decrRefCount(c->mbargv[j]);
206 c->argc = 0;
207 c->mbargc = 0;
208}
209
210void freeClient(redisClient *c) {
211 listNode *ln;
212
213 /* Note that if the client we are freeing is blocked into a blocking
214 * call, we have to set querybuf to NULL *before* to call
215 * unblockClientWaitingData() to avoid processInputBuffer() will get
216 * called. Also it is important to remove the file events after
217 * this, because this call adds the READABLE event. */
218 sdsfree(c->querybuf);
219 c->querybuf = NULL;
220 if (c->flags & REDIS_BLOCKED)
221 unblockClientWaitingData(c);
222
223 /* UNWATCH all the keys */
224 unwatchAllKeys(c);
225 listRelease(c->watched_keys);
226 /* Unsubscribe from all the pubsub channels */
227 pubsubUnsubscribeAllChannels(c,0);
228 pubsubUnsubscribeAllPatterns(c,0);
229 dictRelease(c->pubsub_channels);
230 listRelease(c->pubsub_patterns);
231 /* Obvious cleanup */
232 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
233 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
234 listRelease(c->reply);
235 freeClientArgv(c);
236 close(c->fd);
237 /* Remove from the list of clients */
238 ln = listSearchKey(server.clients,c);
239 redisAssert(ln != NULL);
240 listDelNode(server.clients,ln);
1a71fb96 241 /* Remove from the list of clients waiting for swapped keys, or ready
242 * to be restarted, but not yet woken up again. */
243 if (c->flags & REDIS_IO_WAIT) {
244 redisAssert(server.vm_enabled);
245 if (listLength(c->io_keys) == 0) {
246 ln = listSearchKey(server.io_ready_clients,c);
247
248 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
249 * it should be present in the list io_ready_clients */
250 redisAssert(ln != NULL);
e2641e09 251 listDelNode(server.io_ready_clients,ln);
1a71fb96 252 } else {
253 while (listLength(c->io_keys)) {
254 ln = listFirst(c->io_keys);
255 dontWaitForSwappedKey(c,ln->value);
256 }
e2641e09 257 }
1a71fb96 258 server.vm_blocked_clients--;
e2641e09 259 }
260 listRelease(c->io_keys);
778b2210 261 /* Master/slave cleanup.
262 * Case 1: we lost the connection with a slave. */
e2641e09 263 if (c->flags & REDIS_SLAVE) {
264 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
265 close(c->repldbfd);
266 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
267 ln = listSearchKey(l,c);
268 redisAssert(ln != NULL);
269 listDelNode(l,ln);
270 }
778b2210 271
272 /* Case 2: we lost the connection with the master. */
e2641e09 273 if (c->flags & REDIS_MASTER) {
274 server.master = NULL;
275 server.replstate = REDIS_REPL_CONNECT;
778b2210 276 /* Since we lost the connection with the master, we should also
277 * close the connection with all our slaves if we have any, so
278 * when we'll resync with the master the other slaves will sync again
279 * with us as well. Note that also when the slave is not connected
280 * to the master it will keep refusing connections by other slaves. */
281 while (listLength(server.slaves)) {
282 ln = listFirst(server.slaves);
283 freeClient((redisClient*)ln->value);
284 }
e2641e09 285 }
286 /* Release memory */
287 zfree(c->argv);
288 zfree(c->mbargv);
289 freeClientMultiState(c);
290 zfree(c);
291}
292
293#define GLUEREPLY_UP_TO (1024)
294static void glueReplyBuffersIfNeeded(redisClient *c) {
295 int copylen = 0;
296 char buf[GLUEREPLY_UP_TO];
297 listNode *ln;
298 listIter li;
299 robj *o;
300
301 listRewind(c->reply,&li);
302 while((ln = listNext(&li))) {
303 int objlen;
304
305 o = ln->value;
306 objlen = sdslen(o->ptr);
307 if (copylen + objlen <= GLUEREPLY_UP_TO) {
308 memcpy(buf+copylen,o->ptr,objlen);
309 copylen += objlen;
310 listDelNode(c->reply,ln);
311 } else {
312 if (copylen == 0) return;
313 break;
314 }
315 }
316 /* Now the output buffer is empty, add the new single element */
317 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
318 listAddNodeHead(c->reply,o);
319}
320
321void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
322 redisClient *c = privdata;
323 int nwritten = 0, totwritten = 0, objlen;
324 robj *o;
325 REDIS_NOTUSED(el);
326 REDIS_NOTUSED(mask);
327
328 /* Use writev() if we have enough buffers to send */
329 if (!server.glueoutputbuf &&
330 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
331 !(c->flags & REDIS_MASTER))
332 {
333 sendReplyToClientWritev(el, fd, privdata, mask);
334 return;
335 }
336
337 while(listLength(c->reply)) {
338 if (server.glueoutputbuf && listLength(c->reply) > 1)
339 glueReplyBuffersIfNeeded(c);
340
341 o = listNodeValue(listFirst(c->reply));
342 objlen = sdslen(o->ptr);
343
344 if (objlen == 0) {
345 listDelNode(c->reply,listFirst(c->reply));
346 continue;
347 }
348
349 if (c->flags & REDIS_MASTER) {
350 /* Don't reply to a master */
351 nwritten = objlen - c->sentlen;
352 } else {
353 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
354 if (nwritten <= 0) break;
355 }
356 c->sentlen += nwritten;
357 totwritten += nwritten;
358 /* If we fully sent the object on head go to the next one */
359 if (c->sentlen == objlen) {
360 listDelNode(c->reply,listFirst(c->reply));
361 c->sentlen = 0;
362 }
363 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
364 * bytes, in a single threaded server it's a good idea to serve
365 * other clients as well, even if a very large request comes from
366 * super fast link that is always able to accept data (in real world
367 * scenario think about 'KEYS *' against the loopback interfae) */
368 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
369 }
370 if (nwritten == -1) {
371 if (errno == EAGAIN) {
372 nwritten = 0;
373 } else {
374 redisLog(REDIS_VERBOSE,
375 "Error writing to client: %s", strerror(errno));
376 freeClient(c);
377 return;
378 }
379 }
380 if (totwritten > 0) c->lastinteraction = time(NULL);
381 if (listLength(c->reply) == 0) {
382 c->sentlen = 0;
383 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
384 }
385}
386
387void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
388{
389 redisClient *c = privdata;
390 int nwritten = 0, totwritten = 0, objlen, willwrite;
391 robj *o;
392 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
393 int offset, ion = 0;
394 REDIS_NOTUSED(el);
395 REDIS_NOTUSED(mask);
396
397 listNode *node;
398 while (listLength(c->reply)) {
399 offset = c->sentlen;
400 ion = 0;
401 willwrite = 0;
402
403 /* fill-in the iov[] array */
404 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
405 o = listNodeValue(node);
406 objlen = sdslen(o->ptr);
407
408 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
409 break;
410
411 if(ion == REDIS_WRITEV_IOVEC_COUNT)
412 break; /* no more iovecs */
413
414 iov[ion].iov_base = ((char*)o->ptr) + offset;
415 iov[ion].iov_len = objlen - offset;
416 willwrite += objlen - offset;
417 offset = 0; /* just for the first item */
418 ion++;
419 }
420
421 if(willwrite == 0)
422 break;
423
424 /* write all collected blocks at once */
425 if((nwritten = writev(fd, iov, ion)) < 0) {
426 if (errno != EAGAIN) {
427 redisLog(REDIS_VERBOSE,
428 "Error writing to client: %s", strerror(errno));
429 freeClient(c);
430 return;
431 }
432 break;
433 }
434
435 totwritten += nwritten;
436 offset = c->sentlen;
437
438 /* remove written robjs from c->reply */
439 while (nwritten && listLength(c->reply)) {
440 o = listNodeValue(listFirst(c->reply));
441 objlen = sdslen(o->ptr);
442
443 if(nwritten >= objlen - offset) {
444 listDelNode(c->reply, listFirst(c->reply));
445 nwritten -= objlen - offset;
446 c->sentlen = 0;
447 } else {
448 /* partial write */
449 c->sentlen += nwritten;
450 break;
451 }
452 offset = 0;
453 }
454 }
455
456 if (totwritten > 0)
457 c->lastinteraction = time(NULL);
458
459 if (listLength(c->reply) == 0) {
460 c->sentlen = 0;
461 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
462 }
463}
464
465/* resetClient prepare the client to process the next command */
466void resetClient(redisClient *c) {
467 freeClientArgv(c);
468 c->bulklen = -1;
469 c->multibulk = 0;
470}
471
472void closeTimedoutClients(void) {
473 redisClient *c;
474 listNode *ln;
475 time_t now = time(NULL);
476 listIter li;
477
478 listRewind(server.clients,&li);
479 while ((ln = listNext(&li)) != NULL) {
480 c = listNodeValue(ln);
481 if (server.maxidletime &&
482 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
483 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
e452436a 484 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
e2641e09 485 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
486 listLength(c->pubsub_patterns) == 0 &&
487 (now - c->lastinteraction > server.maxidletime))
488 {
489 redisLog(REDIS_VERBOSE,"Closing idle client");
490 freeClient(c);
491 } else if (c->flags & REDIS_BLOCKED) {
492 if (c->blockingto != 0 && c->blockingto < now) {
493 addReply(c,shared.nullmultibulk);
494 unblockClientWaitingData(c);
495 }
496 }
497 }
498}
499
500void processInputBuffer(redisClient *c) {
501again:
502 /* Before to process the input buffer, make sure the client is not
503 * waitig for a blocking operation such as BLPOP. Note that the first
504 * iteration the client is never blocked, otherwise the processInputBuffer
505 * would not be called at all, but after the execution of the first commands
506 * in the input buffer the client may be blocked, and the "goto again"
507 * will try to reiterate. The following line will make it return asap. */
508 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
509 if (c->bulklen == -1) {
510 /* Read the first line of the query */
511 char *p = strchr(c->querybuf,'\n');
512 size_t querylen;
513
514 if (p) {
515 sds query, *argv;
516 int argc, j;
517
518 query = c->querybuf;
519 c->querybuf = sdsempty();
520 querylen = 1+(p-(query));
521 if (sdslen(query) > querylen) {
522 /* leave data after the first line of the query in the buffer */
523 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
524 }
525 *p = '\0'; /* remove "\n" */
526 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
527 sdsupdatelen(query);
528
529 /* Now we can split the query in arguments */
530 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
531 sdsfree(query);
532
533 if (c->argv) zfree(c->argv);
534 c->argv = zmalloc(sizeof(robj*)*argc);
535
536 for (j = 0; j < argc; j++) {
537 if (sdslen(argv[j])) {
538 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
539 c->argc++;
540 } else {
541 sdsfree(argv[j]);
542 }
543 }
544 zfree(argv);
545 if (c->argc) {
546 /* Execute the command. If the client is still valid
547 * after processCommand() return and there is something
548 * on the query buffer try to process the next command. */
549 if (processCommand(c) && sdslen(c->querybuf)) goto again;
550 } else {
551 /* Nothing to process, argc == 0. Just process the query
552 * buffer if it's not empty or return to the caller */
553 if (sdslen(c->querybuf)) goto again;
554 }
555 return;
556 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
557 redisLog(REDIS_VERBOSE, "Client protocol error");
558 freeClient(c);
559 return;
560 }
561 } else {
562 /* Bulk read handling. Note that if we are at this point
563 the client already sent a command terminated with a newline,
564 we are reading the bulk data that is actually the last
565 argument of the command. */
566 int qbl = sdslen(c->querybuf);
567
568 if (c->bulklen <= qbl) {
569 /* Copy everything but the final CRLF as final argument */
570 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
571 c->argc++;
572 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
573 /* Process the command. If the client is still valid after
574 * the processing and there is more data in the buffer
575 * try to parse it. */
576 if (processCommand(c) && sdslen(c->querybuf)) goto again;
577 return;
578 }
579 }
580}
581
582void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
583 redisClient *c = (redisClient*) privdata;
584 char buf[REDIS_IOBUF_LEN];
585 int nread;
586 REDIS_NOTUSED(el);
587 REDIS_NOTUSED(mask);
588
589 nread = read(fd, buf, REDIS_IOBUF_LEN);
590 if (nread == -1) {
591 if (errno == EAGAIN) {
592 nread = 0;
593 } else {
594 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
595 freeClient(c);
596 return;
597 }
598 } else if (nread == 0) {
599 redisLog(REDIS_VERBOSE, "Client closed connection");
600 freeClient(c);
601 return;
602 }
603 if (nread) {
604 c->querybuf = sdscatlen(c->querybuf, buf, nread);
605 c->lastinteraction = time(NULL);
606 } else {
607 return;
608 }
609 processInputBuffer(c);
610}