]> git.saurik.com Git - redis.git/blob - src/networking.c
Merge branch 'zset-mem' into zrevrangebyscore
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
/* Match method for lists holding string objects: hands both values to
 * equalStringObjects() and returns its result unchanged. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
12
13 redisClient *createClient(int fd) {
14 redisClient *c = zmalloc(sizeof(redisClient));
15 c->bufpos = 0;
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
21 readQueryFromClient, c) == AE_ERR)
22 {
23 close(fd);
24 zfree(c);
25 return NULL;
26 }
27
28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
54 listAddNodeTail(server.clients,c);
55 initClientMultiState(c);
56 return c;
57 }
58
59 int _installWriteEvent(redisClient *c) {
60 if (c->fd <= 0) return REDIS_ERR;
61 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
62 (c->replstate == REDIS_REPL_NONE ||
63 c->replstate == REDIS_REPL_ONLINE) &&
64 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
65 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
66 return REDIS_OK;
67 }
68
69 /* Create a duplicate of the last object in the reply list when
70 * it is not exclusively owned by the reply list. */
71 robj *dupLastObjectIfNeeded(list *reply) {
72 robj *new, *cur;
73 listNode *ln;
74 redisAssert(listLength(reply) > 0);
75 ln = listLast(reply);
76 cur = listNodeValue(ln);
77 if (cur->refcount > 1) {
78 new = dupStringObject(cur);
79 decrRefCount(cur);
80 listNodeValue(ln) = new;
81 }
82 return listNodeValue(ln);
83 }
84
85 int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
86 size_t available = sizeof(c->buf)-c->bufpos;
87
88 /* If there already are entries in the reply list, we cannot
89 * add anything more to the static buffer. */
90 if (listLength(c->reply) > 0) return REDIS_ERR;
91
92 /* Check that the buffer has enough space available for this string. */
93 if (len > available) return REDIS_ERR;
94
95 memcpy(c->buf+c->bufpos,s,len);
96 c->bufpos+=len;
97 return REDIS_OK;
98 }
99
100 void _addReplyObjectToList(redisClient *c, robj *o) {
101 robj *tail;
102 if (listLength(c->reply) == 0) {
103 incrRefCount(o);
104 listAddNodeTail(c->reply,o);
105 } else {
106 tail = listNodeValue(listLast(c->reply));
107
108 /* Append to this object when possible. */
109 if (tail->ptr != NULL &&
110 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
111 {
112 tail = dupLastObjectIfNeeded(c->reply);
113 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
114 } else {
115 incrRefCount(o);
116 listAddNodeTail(c->reply,o);
117 }
118 }
119 }
120
121 /* This method takes responsibility over the sds. When it is no longer
122 * needed it will be free'd, otherwise it ends up in a robj. */
123 void _addReplySdsToList(redisClient *c, sds s) {
124 robj *tail;
125 if (listLength(c->reply) == 0) {
126 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
127 } else {
128 tail = listNodeValue(listLast(c->reply));
129
130 /* Append to this object when possible. */
131 if (tail->ptr != NULL &&
132 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
133 {
134 tail = dupLastObjectIfNeeded(c->reply);
135 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
136 sdsfree(s);
137 } else {
138 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
139 }
140 }
141 }
142
143 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
144 robj *tail;
145 if (listLength(c->reply) == 0) {
146 listAddNodeTail(c->reply,createStringObject(s,len));
147 } else {
148 tail = listNodeValue(listLast(c->reply));
149
150 /* Append to this object when possible. */
151 if (tail->ptr != NULL &&
152 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
153 {
154 tail = dupLastObjectIfNeeded(c->reply);
155 tail->ptr = sdscatlen(tail->ptr,s,len);
156 } else {
157 listAddNodeTail(c->reply,createStringObject(s,len));
158 }
159 }
160 }
161
162 void addReply(redisClient *c, robj *obj) {
163 if (_installWriteEvent(c) != REDIS_OK) return;
164 redisAssert(!server.vm_enabled || obj->storage == REDIS_VM_MEMORY);
165
166 /* This is an important place where we can avoid copy-on-write
167 * when there is a saving child running, avoiding touching the
168 * refcount field of the object if it's not needed.
169 *
170 * If the encoding is RAW and there is room in the static buffer
171 * we'll be able to send the object to the client without
172 * messing with its page. */
173 if (obj->encoding == REDIS_ENCODING_RAW) {
174 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
175 _addReplyObjectToList(c,obj);
176 } else {
177 obj = getDecodedObject(obj);
178 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
179 _addReplyObjectToList(c,obj);
180 decrRefCount(obj);
181 }
182 }
183
184 void addReplySds(redisClient *c, sds s) {
185 if (_installWriteEvent(c) != REDIS_OK) {
186 /* The caller expects the sds to be free'd. */
187 sdsfree(s);
188 return;
189 }
190 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
191 sdsfree(s);
192 } else {
193 /* This method free's the sds when it is no longer needed. */
194 _addReplySdsToList(c,s);
195 }
196 }
197
198 void addReplyString(redisClient *c, char *s, size_t len) {
199 if (_installWriteEvent(c) != REDIS_OK) return;
200 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
201 _addReplyStringToList(c,s,len);
202 }
203
204 void _addReplyError(redisClient *c, char *s, size_t len) {
205 addReplyString(c,"-ERR ",5);
206 addReplyString(c,s,len);
207 addReplyString(c,"\r\n",2);
208 }
209
210 void addReplyError(redisClient *c, char *err) {
211 _addReplyError(c,err,strlen(err));
212 }
213
214 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
215 va_list ap;
216 va_start(ap,fmt);
217 sds s = sdscatvprintf(sdsempty(),fmt,ap);
218 va_end(ap);
219 _addReplyError(c,s,sdslen(s));
220 sdsfree(s);
221 }
222
223 void _addReplyStatus(redisClient *c, char *s, size_t len) {
224 addReplyString(c,"+",1);
225 addReplyString(c,s,len);
226 addReplyString(c,"\r\n",2);
227 }
228
229 void addReplyStatus(redisClient *c, char *status) {
230 _addReplyStatus(c,status,strlen(status));
231 }
232
233 void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
234 va_list ap;
235 va_start(ap,fmt);
236 sds s = sdscatvprintf(sdsempty(),fmt,ap);
237 va_end(ap);
238 _addReplyStatus(c,s,sdslen(s));
239 sdsfree(s);
240 }
241
242 /* Adds an empty object to the reply list that will contain the multi bulk
243 * length, which is not known when this function is called. */
244 void *addDeferredMultiBulkLength(redisClient *c) {
245 /* Note that we install the write event here even if the object is not
246 * ready to be sent, since we are sure that before returning to the
247 * event loop setDeferredMultiBulkLength() will be called. */
248 if (_installWriteEvent(c) != REDIS_OK) return NULL;
249 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
250 return listLast(c->reply);
251 }
252
253 /* Populate the length object and try glueing it to the next chunk. */
254 void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
255 listNode *ln = (listNode*)node;
256 robj *len, *next;
257
258 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
259 if (node == NULL) return;
260
261 len = listNodeValue(ln);
262 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
263 if (ln->next != NULL) {
264 next = listNodeValue(ln->next);
265
266 /* Only glue when the next node is non-NULL (an sds in this case) */
267 if (next->ptr != NULL) {
268 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
269 listDelNode(c->reply,ln->next);
270 }
271 }
272 }
273
274 void addReplyDouble(redisClient *c, double d) {
275 char dbuf[128], sbuf[128];
276 int dlen, slen;
277 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
278 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
279 addReplyString(c,sbuf,slen);
280 }
281
282 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
283 char buf[128];
284 int len;
285 buf[0] = prefix;
286 len = ll2string(buf+1,sizeof(buf)-1,ll);
287 buf[len+1] = '\r';
288 buf[len+2] = '\n';
289 addReplyString(c,buf,len+3);
290 }
291
292 void addReplyLongLong(redisClient *c, long long ll) {
293 _addReplyLongLong(c,ll,':');
294 }
295
296 void addReplyMultiBulkLen(redisClient *c, long length) {
297 _addReplyLongLong(c,length,'*');
298 }
299
300 void addReplyBulkLen(redisClient *c, robj *obj) {
301 size_t len;
302
303 if (obj->encoding == REDIS_ENCODING_RAW) {
304 len = sdslen(obj->ptr);
305 } else {
306 long n = (long)obj->ptr;
307
308 /* Compute how many bytes will take this integer as a radix 10 string */
309 len = 1;
310 if (n < 0) {
311 len++;
312 n = -n;
313 }
314 while((n = n/10) != 0) {
315 len++;
316 }
317 }
318 _addReplyLongLong(c,len,'$');
319 }
320
321 void addReplyBulk(redisClient *c, robj *obj) {
322 addReplyBulkLen(c,obj);
323 addReply(c,obj);
324 addReply(c,shared.crlf);
325 }
326
327 /* In the CONFIG command we need to add vanilla C string as bulk replies */
328 void addReplyBulkCString(redisClient *c, char *s) {
329 if (s == NULL) {
330 addReply(c,shared.nullbulk);
331 } else {
332 robj *o = createStringObject(s,strlen(s));
333 addReplyBulk(c,o);
334 decrRefCount(o);
335 }
336 }
337
338 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
339 int cport, cfd;
340 char cip[128];
341 redisClient *c;
342 REDIS_NOTUSED(el);
343 REDIS_NOTUSED(mask);
344 REDIS_NOTUSED(privdata);
345
346 cfd = anetAccept(server.neterr, fd, cip, &cport);
347 if (cfd == AE_ERR) {
348 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
349 return;
350 }
351 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
352 if ((c = createClient(cfd)) == NULL) {
353 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
354 close(cfd); /* May be already closed, just ingore errors */
355 return;
356 }
357 /* If maxclient directive is set and this is one client more... close the
358 * connection. Note that we create the client instead to check before
359 * for this condition, since now the socket is already set in nonblocking
360 * mode and we can send an error for free using the Kernel I/O */
361 if (server.maxclients && listLength(server.clients) > server.maxclients) {
362 char *err = "-ERR max number of clients reached\r\n";
363
364 /* That's a best effort error message, don't check write errors */
365 if (write(c->fd,err,strlen(err)) == -1) {
366 /* Nothing to do, Just to avoid the warning... */
367 }
368 freeClient(c);
369 return;
370 }
371 server.stat_numconnections++;
372 }
373
374 static void freeClientArgv(redisClient *c) {
375 int j;
376
377 for (j = 0; j < c->argc; j++)
378 decrRefCount(c->argv[j]);
379 for (j = 0; j < c->mbargc; j++)
380 decrRefCount(c->mbargv[j]);
381 c->argc = 0;
382 c->mbargc = 0;
383 }
384
385 void freeClient(redisClient *c) {
386 listNode *ln;
387
388 /* Note that if the client we are freeing is blocked into a blocking
389 * call, we have to set querybuf to NULL *before* to call
390 * unblockClientWaitingData() to avoid processInputBuffer() will get
391 * called. Also it is important to remove the file events after
392 * this, because this call adds the READABLE event. */
393 sdsfree(c->querybuf);
394 c->querybuf = NULL;
395 if (c->flags & REDIS_BLOCKED)
396 unblockClientWaitingData(c);
397
398 /* UNWATCH all the keys */
399 unwatchAllKeys(c);
400 listRelease(c->watched_keys);
401 /* Unsubscribe from all the pubsub channels */
402 pubsubUnsubscribeAllChannels(c,0);
403 pubsubUnsubscribeAllPatterns(c,0);
404 dictRelease(c->pubsub_channels);
405 listRelease(c->pubsub_patterns);
406 /* Obvious cleanup */
407 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
408 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
409 listRelease(c->reply);
410 freeClientArgv(c);
411 close(c->fd);
412 /* Remove from the list of clients */
413 ln = listSearchKey(server.clients,c);
414 redisAssert(ln != NULL);
415 listDelNode(server.clients,ln);
416 /* Remove from the list of clients waiting for swapped keys, or ready
417 * to be restarted, but not yet woken up again. */
418 if (c->flags & REDIS_IO_WAIT) {
419 redisAssert(server.vm_enabled);
420 if (listLength(c->io_keys) == 0) {
421 ln = listSearchKey(server.io_ready_clients,c);
422
423 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
424 * it should be present in the list io_ready_clients */
425 redisAssert(ln != NULL);
426 listDelNode(server.io_ready_clients,ln);
427 } else {
428 while (listLength(c->io_keys)) {
429 ln = listFirst(c->io_keys);
430 dontWaitForSwappedKey(c,ln->value);
431 }
432 }
433 server.vm_blocked_clients--;
434 }
435 listRelease(c->io_keys);
436 /* Master/slave cleanup.
437 * Case 1: we lost the connection with a slave. */
438 if (c->flags & REDIS_SLAVE) {
439 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
440 close(c->repldbfd);
441 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
442 ln = listSearchKey(l,c);
443 redisAssert(ln != NULL);
444 listDelNode(l,ln);
445 }
446
447 /* Case 2: we lost the connection with the master. */
448 if (c->flags & REDIS_MASTER) {
449 server.master = NULL;
450 server.replstate = REDIS_REPL_CONNECT;
451 /* Since we lost the connection with the master, we should also
452 * close the connection with all our slaves if we have any, so
453 * when we'll resync with the master the other slaves will sync again
454 * with us as well. Note that also when the slave is not connected
455 * to the master it will keep refusing connections by other slaves. */
456 while (listLength(server.slaves)) {
457 ln = listFirst(server.slaves);
458 freeClient((redisClient*)ln->value);
459 }
460 }
461 /* Release memory */
462 zfree(c->argv);
463 zfree(c->mbargv);
464 freeClientMultiState(c);
465 zfree(c);
466 }
467
468 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
469 redisClient *c = privdata;
470 int nwritten = 0, totwritten = 0, objlen;
471 robj *o;
472 REDIS_NOTUSED(el);
473 REDIS_NOTUSED(mask);
474
475 /* Use writev() if we have enough buffers to send */
476 if (!server.glueoutputbuf &&
477 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
478 !(c->flags & REDIS_MASTER))
479 {
480 sendReplyToClientWritev(el, fd, privdata, mask);
481 return;
482 }
483
484 while(c->bufpos > 0 || listLength(c->reply)) {
485 if (c->bufpos > 0) {
486 if (c->flags & REDIS_MASTER) {
487 /* Don't reply to a master */
488 nwritten = c->bufpos - c->sentlen;
489 } else {
490 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
491 if (nwritten <= 0) break;
492 }
493 c->sentlen += nwritten;
494 totwritten += nwritten;
495
496 /* If the buffer was sent, set bufpos to zero to continue with
497 * the remainder of the reply. */
498 if (c->sentlen == c->bufpos) {
499 c->bufpos = 0;
500 c->sentlen = 0;
501 }
502 } else {
503 o = listNodeValue(listFirst(c->reply));
504 objlen = sdslen(o->ptr);
505
506 if (objlen == 0) {
507 listDelNode(c->reply,listFirst(c->reply));
508 continue;
509 }
510
511 if (c->flags & REDIS_MASTER) {
512 /* Don't reply to a master */
513 nwritten = objlen - c->sentlen;
514 } else {
515 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
516 if (nwritten <= 0) break;
517 }
518 c->sentlen += nwritten;
519 totwritten += nwritten;
520
521 /* If we fully sent the object on head go to the next one */
522 if (c->sentlen == objlen) {
523 listDelNode(c->reply,listFirst(c->reply));
524 c->sentlen = 0;
525 }
526 }
527 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
528 * bytes, in a single threaded server it's a good idea to serve
529 * other clients as well, even if a very large request comes from
530 * super fast link that is always able to accept data (in real world
531 * scenario think about 'KEYS *' against the loopback interfae) */
532 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
533 }
534 if (nwritten == -1) {
535 if (errno == EAGAIN) {
536 nwritten = 0;
537 } else {
538 redisLog(REDIS_VERBOSE,
539 "Error writing to client: %s", strerror(errno));
540 freeClient(c);
541 return;
542 }
543 }
544 if (totwritten > 0) c->lastinteraction = time(NULL);
545 if (listLength(c->reply) == 0) {
546 c->sentlen = 0;
547 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
548 }
549 }
550
551 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
552 {
553 redisClient *c = privdata;
554 int nwritten = 0, totwritten = 0, objlen, willwrite;
555 robj *o;
556 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
557 int offset, ion = 0;
558 REDIS_NOTUSED(el);
559 REDIS_NOTUSED(mask);
560
561 listNode *node;
562 while (listLength(c->reply)) {
563 offset = c->sentlen;
564 ion = 0;
565 willwrite = 0;
566
567 /* fill-in the iov[] array */
568 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
569 o = listNodeValue(node);
570 objlen = sdslen(o->ptr);
571
572 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
573 break;
574
575 if(ion == REDIS_WRITEV_IOVEC_COUNT)
576 break; /* no more iovecs */
577
578 iov[ion].iov_base = ((char*)o->ptr) + offset;
579 iov[ion].iov_len = objlen - offset;
580 willwrite += objlen - offset;
581 offset = 0; /* just for the first item */
582 ion++;
583 }
584
585 if(willwrite == 0)
586 break;
587
588 /* write all collected blocks at once */
589 if((nwritten = writev(fd, iov, ion)) < 0) {
590 if (errno != EAGAIN) {
591 redisLog(REDIS_VERBOSE,
592 "Error writing to client: %s", strerror(errno));
593 freeClient(c);
594 return;
595 }
596 break;
597 }
598
599 totwritten += nwritten;
600 offset = c->sentlen;
601
602 /* remove written robjs from c->reply */
603 while (nwritten && listLength(c->reply)) {
604 o = listNodeValue(listFirst(c->reply));
605 objlen = sdslen(o->ptr);
606
607 if(nwritten >= objlen - offset) {
608 listDelNode(c->reply, listFirst(c->reply));
609 nwritten -= objlen - offset;
610 c->sentlen = 0;
611 } else {
612 /* partial write */
613 c->sentlen += nwritten;
614 break;
615 }
616 offset = 0;
617 }
618 }
619
620 if (totwritten > 0)
621 c->lastinteraction = time(NULL);
622
623 if (listLength(c->reply) == 0) {
624 c->sentlen = 0;
625 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
626 }
627 }
628
629 /* resetClient prepare the client to process the next command */
630 void resetClient(redisClient *c) {
631 freeClientArgv(c);
632 c->bulklen = -1;
633 c->multibulk = 0;
634 }
635
636 void closeTimedoutClients(void) {
637 redisClient *c;
638 listNode *ln;
639 time_t now = time(NULL);
640 listIter li;
641
642 listRewind(server.clients,&li);
643 while ((ln = listNext(&li)) != NULL) {
644 c = listNodeValue(ln);
645 if (server.maxidletime &&
646 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
647 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
648 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
649 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
650 listLength(c->pubsub_patterns) == 0 &&
651 (now - c->lastinteraction > server.maxidletime))
652 {
653 redisLog(REDIS_VERBOSE,"Closing idle client");
654 freeClient(c);
655 } else if (c->flags & REDIS_BLOCKED) {
656 if (c->blockingto != 0 && c->blockingto < now) {
657 addReply(c,shared.nullmultibulk);
658 unblockClientWaitingData(c);
659 }
660 }
661 }
662 }
663
664 void processInputBuffer(redisClient *c) {
665 again:
666 /* Before to process the input buffer, make sure the client is not
667 * waitig for a blocking operation such as BLPOP. Note that the first
668 * iteration the client is never blocked, otherwise the processInputBuffer
669 * would not be called at all, but after the execution of the first commands
670 * in the input buffer the client may be blocked, and the "goto again"
671 * will try to reiterate. The following line will make it return asap. */
672 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
673 if (c->bulklen == -1) {
674 /* Read the first line of the query */
675 char *p = strchr(c->querybuf,'\n');
676 size_t querylen;
677
678 if (p) {
679 sds query, *argv;
680 int argc, j;
681
682 query = c->querybuf;
683 c->querybuf = sdsempty();
684 querylen = 1+(p-(query));
685 if (sdslen(query) > querylen) {
686 /* leave data after the first line of the query in the buffer */
687 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
688 }
689 *p = '\0'; /* remove "\n" */
690 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
691 sdsupdatelen(query);
692
693 /* Now we can split the query in arguments */
694 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
695 sdsfree(query);
696
697 if (c->argv) zfree(c->argv);
698 c->argv = zmalloc(sizeof(robj*)*argc);
699
700 for (j = 0; j < argc; j++) {
701 if (sdslen(argv[j])) {
702 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
703 c->argc++;
704 } else {
705 sdsfree(argv[j]);
706 }
707 }
708 zfree(argv);
709 if (c->argc) {
710 /* Execute the command. If the client is still valid
711 * after processCommand() return and there is something
712 * on the query buffer try to process the next command. */
713 if (processCommand(c) && sdslen(c->querybuf)) goto again;
714 } else {
715 /* Nothing to process, argc == 0. Just process the query
716 * buffer if it's not empty or return to the caller */
717 if (sdslen(c->querybuf)) goto again;
718 }
719 return;
720 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
721 redisLog(REDIS_VERBOSE, "Client protocol error");
722 freeClient(c);
723 return;
724 }
725 } else {
726 /* Bulk read handling. Note that if we are at this point
727 the client already sent a command terminated with a newline,
728 we are reading the bulk data that is actually the last
729 argument of the command. */
730 int qbl = sdslen(c->querybuf);
731
732 if (c->bulklen <= qbl) {
733 /* Copy everything but the final CRLF as final argument */
734 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
735 c->argc++;
736 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
737 /* Process the command. If the client is still valid after
738 * the processing and there is more data in the buffer
739 * try to parse it. */
740 if (processCommand(c) && sdslen(c->querybuf)) goto again;
741 return;
742 }
743 }
744 }
745
746 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
747 redisClient *c = (redisClient*) privdata;
748 char buf[REDIS_IOBUF_LEN];
749 int nread;
750 REDIS_NOTUSED(el);
751 REDIS_NOTUSED(mask);
752
753 nread = read(fd, buf, REDIS_IOBUF_LEN);
754 if (nread == -1) {
755 if (errno == EAGAIN) {
756 nread = 0;
757 } else {
758 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
759 freeClient(c);
760 return;
761 }
762 } else if (nread == 0) {
763 redisLog(REDIS_VERBOSE, "Client closed connection");
764 freeClient(c);
765 return;
766 }
767 if (nread) {
768 c->querybuf = sdscatlen(c->querybuf, buf, nread);
769 c->lastinteraction = time(NULL);
770 } else {
771 return;
772 }
773 processInputBuffer(c);
774 }