git.saurik.com Git - redis.git/blob - src/networking.c
sockaddr_un.sun_path appears to never hold anything after accept()
[redis.git] / src / networking.c
1 #include "redis.h"
2
3 #include <sys/uio.h>
4
5 void *dupClientReplyValue(void *o) {
6 incrRefCount((robj*)o);
7 return o;
8 }
9
/* List "match" callback: two list values match when equalStringObjects()
 * reports them equal. The result is returned unchanged. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);
    return same;
}
13
14 redisClient *createClient(int fd) {
15 redisClient *c = zmalloc(sizeof(*c));
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 selectDb(c,0);
21 c->fd = fd;
22 c->querybuf = sdsempty();
23 c->argc = 0;
24 c->argv = NULL;
25 c->bulklen = -1;
26 c->multibulk = 0;
27 c->mbargc = 0;
28 c->mbargv = NULL;
29 c->sentlen = 0;
30 c->flags = 0;
31 c->lastinteraction = time(NULL);
32 c->authenticated = 0;
33 c->replstate = REDIS_REPL_NONE;
34 c->reply = listCreate();
35 listSetFreeMethod(c->reply,decrRefCount);
36 listSetDupMethod(c->reply,dupClientReplyValue);
37 c->blocking_keys = NULL;
38 c->blocking_keys_num = 0;
39 c->io_keys = listCreate();
40 c->watched_keys = listCreate();
41 listSetFreeMethod(c->io_keys,decrRefCount);
42 c->pubsub_channels = dictCreate(&setDictType,NULL);
43 c->pubsub_patterns = listCreate();
44 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
45 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
46 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
47 readQueryFromClient, c) == AE_ERR) {
48 freeClient(c);
49 return NULL;
50 }
51 listAddNodeTail(server.clients,c);
52 initClientMultiState(c);
53 return c;
54 }
55
56 void addReply(redisClient *c, robj *obj) {
57 if (listLength(c->reply) == 0 &&
58 (c->replstate == REDIS_REPL_NONE ||
59 c->replstate == REDIS_REPL_ONLINE) &&
60 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
61 sendReplyToClient, c) == AE_ERR) return;
62
63 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
64 obj = dupStringObject(obj);
65 obj->refcount = 0; /* getDecodedObject() will increment the refcount */
66 }
67 listAddNodeTail(c->reply,getDecodedObject(obj));
68 }
69
70 void addReplySds(redisClient *c, sds s) {
71 robj *o = createObject(REDIS_STRING,s);
72 addReply(c,o);
73 decrRefCount(o);
74 }
75
76 void addReplyDouble(redisClient *c, double d) {
77 char buf[128];
78
79 snprintf(buf,sizeof(buf),"%.17g",d);
80 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
81 (unsigned long) strlen(buf),buf));
82 }
83
84 void addReplyLongLong(redisClient *c, long long ll) {
85 char buf[128];
86 size_t len;
87
88 if (ll == 0) {
89 addReply(c,shared.czero);
90 return;
91 } else if (ll == 1) {
92 addReply(c,shared.cone);
93 return;
94 }
95 buf[0] = ':';
96 len = ll2string(buf+1,sizeof(buf)-1,ll);
97 buf[len+1] = '\r';
98 buf[len+2] = '\n';
99 addReplySds(c,sdsnewlen(buf,len+3));
100 }
101
102 void addReplyUlong(redisClient *c, unsigned long ul) {
103 char buf[128];
104 size_t len;
105
106 if (ul == 0) {
107 addReply(c,shared.czero);
108 return;
109 } else if (ul == 1) {
110 addReply(c,shared.cone);
111 return;
112 }
113 len = snprintf(buf,sizeof(buf),":%lu\r\n",ul);
114 addReplySds(c,sdsnewlen(buf,len));
115 }
116
117 void addReplyBulkLen(redisClient *c, robj *obj) {
118 size_t len, intlen;
119 char buf[128];
120
121 if (obj->encoding == REDIS_ENCODING_RAW) {
122 len = sdslen(obj->ptr);
123 } else {
124 long n = (long)obj->ptr;
125
126 /* Compute how many bytes will take this integer as a radix 10 string */
127 len = 1;
128 if (n < 0) {
129 len++;
130 n = -n;
131 }
132 while((n = n/10) != 0) {
133 len++;
134 }
135 }
136 buf[0] = '$';
137 intlen = ll2string(buf+1,sizeof(buf)-1,(long long)len);
138 buf[intlen+1] = '\r';
139 buf[intlen+2] = '\n';
140 addReplySds(c,sdsnewlen(buf,intlen+3));
141 }
142
143 void addReplyBulk(redisClient *c, robj *obj) {
144 addReplyBulkLen(c,obj);
145 addReply(c,obj);
146 addReply(c,shared.crlf);
147 }
148
149 /* In the CONFIG command we need to add vanilla C string as bulk replies */
150 void addReplyBulkCString(redisClient *c, char *s) {
151 if (s == NULL) {
152 addReply(c,shared.nullbulk);
153 } else {
154 robj *o = createStringObject(s,strlen(s));
155 addReplyBulk(c,o);
156 decrRefCount(o);
157 }
158 }
159
160 static void acceptCommonHandler(int fd) {
161 redisClient *c;
162 if ((c = createClient(fd)) == NULL) {
163 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
164 close(fd); /* May be already closed, just ingore errors */
165 return;
166 }
167 /* If maxclient directive is set and this is one client more... close the
168 * connection. Note that we create the client instead to check before
169 * for this condition, since now the socket is already set in nonblocking
170 * mode and we can send an error for free using the Kernel I/O */
171 if (server.maxclients && listLength(server.clients) > server.maxclients) {
172 char *err = "-ERR max number of clients reached\r\n";
173
174 /* That's a best effort error message, don't check write errors */
175 if (write(c->fd,err,strlen(err)) == -1) {
176 /* Nothing to do, Just to avoid the warning... */
177 }
178 freeClient(c);
179 return;
180 }
181 server.stat_numconnections++;
182 }
183
184 void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
185 int cport, cfd;
186 char cip[128];
187 REDIS_NOTUSED(el);
188 REDIS_NOTUSED(mask);
189 REDIS_NOTUSED(privdata);
190
191 cfd = anetTcpAccept(server.neterr, fd, cip, &cport);
192 if (cfd == AE_ERR) {
193 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
194 return;
195 }
196 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
197 acceptCommonHandler(cfd);
198 }
199
200 void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
201 int cfd;
202 REDIS_NOTUSED(el);
203 REDIS_NOTUSED(mask);
204 REDIS_NOTUSED(privdata);
205
206 cfd = anetUnixAccept(server.neterr, fd);
207 if (cfd == AE_ERR) {
208 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
209 return;
210 }
211 redisLog(REDIS_VERBOSE,"Accepted connection to %s", server.unixsocket);
212 acceptCommonHandler(cfd);
213 }
214
215
216 static void freeClientArgv(redisClient *c) {
217 int j;
218
219 for (j = 0; j < c->argc; j++)
220 decrRefCount(c->argv[j]);
221 for (j = 0; j < c->mbargc; j++)
222 decrRefCount(c->mbargv[j]);
223 c->argc = 0;
224 c->mbargc = 0;
225 }
226
/* Release every resource owned by client c and unlink it from the server.
 *
 * Steps, in this order:
 *  - free the query buffer and, if c is REDIS_BLOCKED, unblock it;
 *  - drop WATCHed keys and all pubsub channel/pattern subscriptions;
 *  - remove the READABLE/WRITABLE events of c->fd, free pending replies and
 *    arguments, and close the socket;
 *  - remove c from server.clients (asserting it is present);
 *  - if c waits for VM I/O (REDIS_IO_WAIT), remove it from io_ready_clients
 *    or from the wait lists of its swapped keys;
 *  - perform slave/monitor and master bookkeeping;
 *  - free argv/mbargv, the MULTI state and the struct itself.
 * c must not be used after this call. */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked in a blocking
     * call, we have to set querybuf to NULL *before* calling
     * unblockClientWaitingData(), so that processInputBuffer() does not get
     * called. It is also important to remove the file events after
     * this, because that call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Obvious cleanup */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.vm_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            /* Each dontWaitForSwappedKey() call is expected to remove the
             * head of c->io_keys; otherwise this loop would not terminate.
             * NOTE(review): confirm against dontWaitForSwappedKey(). */
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.vm_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup */
    if (c->flags & REDIS_SLAVE) {
        /* A slave in REDIS_REPL_SEND_BULK state holds an open repldbfd
         * (presumably the DB dump being streamed to it); close it. */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* REDIS_SLAVE clients also flagged REDIS_MONITOR are kept in
         * server.monitors; the others are in server.slaves. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        /* Lost our master: forget it and go back to REDIS_REPL_CONNECT
         * (presumably so replication reconnects later). */
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
297
298 #define GLUEREPLY_UP_TO (1024)
299 static void glueReplyBuffersIfNeeded(redisClient *c) {
300 int copylen = 0;
301 char buf[GLUEREPLY_UP_TO];
302 listNode *ln;
303 listIter li;
304 robj *o;
305
306 listRewind(c->reply,&li);
307 while((ln = listNext(&li))) {
308 int objlen;
309
310 o = ln->value;
311 objlen = sdslen(o->ptr);
312 if (copylen + objlen <= GLUEREPLY_UP_TO) {
313 memcpy(buf+copylen,o->ptr,objlen);
314 copylen += objlen;
315 listDelNode(c->reply,ln);
316 } else {
317 if (copylen == 0) return;
318 break;
319 }
320 }
321 /* Now the output buffer is empty, add the new single element */
322 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
323 listAddNodeHead(c->reply,o);
324 }
325
/* WRITABLE event handler: send as much of c->reply as possible to fd.
 *
 * Delegates to sendReplyToClientWritev() when output-buffer gluing is off,
 * the reply list has more than REDIS_WRITEV_THRESHOLD objects, and c is not
 * our master.
 *
 * Otherwise it writes the head object starting at c->sentlen, gluing small
 * objects together first when server.glueoutputbuf is set. It stops when:
 * the list is empty, write() returns <= 0, or more than
 * REDIS_MAX_WRITE_PER_EVENT bytes have been written in this call.
 *
 * Nothing is really written to a master; its replies are discarded as if
 * they had been sent. A write error other than EAGAIN frees the client. Once
 * the list is drained, sentlen is reset and the WRITABLE event is removed. */
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects carry no bytes: just drop them. */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid sending more than REDIS_MAX_WRITE_PER_EVENT
         * bytes: in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from a
         * super fast link that is always able to accept data (in a real world
         * scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    /* nwritten can only be -1 here if the last write() failed; errno is
     * still the value that write() set. */
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
391
392 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
393 {
394 redisClient *c = privdata;
395 int nwritten = 0, totwritten = 0, objlen, willwrite;
396 robj *o;
397 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
398 int offset, ion = 0;
399 REDIS_NOTUSED(el);
400 REDIS_NOTUSED(mask);
401
402 listNode *node;
403 while (listLength(c->reply)) {
404 offset = c->sentlen;
405 ion = 0;
406 willwrite = 0;
407
408 /* fill-in the iov[] array */
409 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
410 o = listNodeValue(node);
411 objlen = sdslen(o->ptr);
412
413 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
414 break;
415
416 if(ion == REDIS_WRITEV_IOVEC_COUNT)
417 break; /* no more iovecs */
418
419 iov[ion].iov_base = ((char*)o->ptr) + offset;
420 iov[ion].iov_len = objlen - offset;
421 willwrite += objlen - offset;
422 offset = 0; /* just for the first item */
423 ion++;
424 }
425
426 if(willwrite == 0)
427 break;
428
429 /* write all collected blocks at once */
430 if((nwritten = writev(fd, iov, ion)) < 0) {
431 if (errno != EAGAIN) {
432 redisLog(REDIS_VERBOSE,
433 "Error writing to client: %s", strerror(errno));
434 freeClient(c);
435 return;
436 }
437 break;
438 }
439
440 totwritten += nwritten;
441 offset = c->sentlen;
442
443 /* remove written robjs from c->reply */
444 while (nwritten && listLength(c->reply)) {
445 o = listNodeValue(listFirst(c->reply));
446 objlen = sdslen(o->ptr);
447
448 if(nwritten >= objlen - offset) {
449 listDelNode(c->reply, listFirst(c->reply));
450 nwritten -= objlen - offset;
451 c->sentlen = 0;
452 } else {
453 /* partial write */
454 c->sentlen += nwritten;
455 break;
456 }
457 offset = 0;
458 }
459 }
460
461 if (totwritten > 0)
462 c->lastinteraction = time(NULL);
463
464 if (listLength(c->reply) == 0) {
465 c->sentlen = 0;
466 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
467 }
468 }
469
470 /* resetClient prepare the client to process the next command */
471 void resetClient(redisClient *c) {
472 freeClientArgv(c);
473 c->bulklen = -1;
474 c->multibulk = 0;
475 }
476
477 void closeTimedoutClients(void) {
478 redisClient *c;
479 listNode *ln;
480 time_t now = time(NULL);
481 listIter li;
482
483 listRewind(server.clients,&li);
484 while ((ln = listNext(&li)) != NULL) {
485 c = listNodeValue(ln);
486 if (server.maxidletime &&
487 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
488 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
489 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
490 listLength(c->pubsub_patterns) == 0 &&
491 (now - c->lastinteraction > server.maxidletime))
492 {
493 redisLog(REDIS_VERBOSE,"Closing idle client");
494 freeClient(c);
495 } else if (c->flags & REDIS_BLOCKED) {
496 if (c->blockingto != 0 && c->blockingto < now) {
497 addReply(c,shared.nullmultibulk);
498 unblockClientWaitingData(c);
499 }
500 }
501 }
502 }
503
504 void processInputBuffer(redisClient *c) {
505 again:
506 /* Before to process the input buffer, make sure the client is not
507 * waitig for a blocking operation such as BLPOP. Note that the first
508 * iteration the client is never blocked, otherwise the processInputBuffer
509 * would not be called at all, but after the execution of the first commands
510 * in the input buffer the client may be blocked, and the "goto again"
511 * will try to reiterate. The following line will make it return asap. */
512 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
513 if (c->bulklen == -1) {
514 /* Read the first line of the query */
515 char *p = strchr(c->querybuf,'\n');
516 size_t querylen;
517
518 if (p) {
519 sds query, *argv;
520 int argc, j;
521
522 query = c->querybuf;
523 c->querybuf = sdsempty();
524 querylen = 1+(p-(query));
525 if (sdslen(query) > querylen) {
526 /* leave data after the first line of the query in the buffer */
527 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
528 }
529 *p = '\0'; /* remove "\n" */
530 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
531 sdsupdatelen(query);
532
533 /* Now we can split the query in arguments */
534 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
535 sdsfree(query);
536
537 if (c->argv) zfree(c->argv);
538 c->argv = zmalloc(sizeof(robj*)*argc);
539
540 for (j = 0; j < argc; j++) {
541 if (sdslen(argv[j])) {
542 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
543 c->argc++;
544 } else {
545 sdsfree(argv[j]);
546 }
547 }
548 zfree(argv);
549 if (c->argc) {
550 /* Execute the command. If the client is still valid
551 * after processCommand() return and there is something
552 * on the query buffer try to process the next command. */
553 if (processCommand(c) && sdslen(c->querybuf)) goto again;
554 } else {
555 /* Nothing to process, argc == 0. Just process the query
556 * buffer if it's not empty or return to the caller */
557 if (sdslen(c->querybuf)) goto again;
558 }
559 return;
560 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
561 redisLog(REDIS_VERBOSE, "Client protocol error");
562 freeClient(c);
563 return;
564 }
565 } else {
566 /* Bulk read handling. Note that if we are at this point
567 the client already sent a command terminated with a newline,
568 we are reading the bulk data that is actually the last
569 argument of the command. */
570 int qbl = sdslen(c->querybuf);
571
572 if (c->bulklen <= qbl) {
573 /* Copy everything but the final CRLF as final argument */
574 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
575 c->argc++;
576 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
577 /* Process the command. If the client is still valid after
578 * the processing and there is more data in the buffer
579 * try to parse it. */
580 if (processCommand(c) && sdslen(c->querybuf)) goto again;
581 return;
582 }
583 }
584 }
585
586 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
587 redisClient *c = (redisClient*) privdata;
588 char buf[REDIS_IOBUF_LEN];
589 int nread;
590 REDIS_NOTUSED(el);
591 REDIS_NOTUSED(mask);
592
593 nread = read(fd, buf, REDIS_IOBUF_LEN);
594 if (nread == -1) {
595 if (errno == EAGAIN) {
596 nread = 0;
597 } else {
598 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
599 freeClient(c);
600 return;
601 }
602 } else if (nread == 0) {
603 redisLog(REDIS_VERBOSE, "Client closed connection");
604 freeClient(c);
605 return;
606 }
607 if (nread) {
608 c->querybuf = sdscatlen(c->querybuf, buf, nread);
609 c->lastinteraction = time(NULL);
610 } else {
611 return;
612 }
613 processInputBuffer(c);
614 }