]> git.saurik.com Git - redis.git/blob - src/networking.c
Changed reply buildup internals
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
/* "match" callback for lists of objects: the result is whatever
 * equalStringObjects() reports for the two values. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
12
13 redisClient *createClient(int fd) {
14 redisClient *c;
15
16 /* Allocate more space to hold a static write buffer. */
17 c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES);
18 c->buflen = REDIS_REPLY_CHUNK_BYTES;
19 c->bufpos = 0;
20
21 anetNonBlock(NULL,fd);
22 anetTcpNoDelay(NULL,fd);
23 if (!c) return NULL;
24 selectDb(c,0);
25 c->fd = fd;
26 c->querybuf = sdsempty();
27 c->argc = 0;
28 c->argv = NULL;
29 c->bulklen = -1;
30 c->multibulk = 0;
31 c->mbargc = 0;
32 c->mbargv = NULL;
33 c->sentlen = 0;
34 c->flags = 0;
35 c->lastinteraction = time(NULL);
36 c->authenticated = 0;
37 c->replstate = REDIS_REPL_NONE;
38 c->reply = listCreate();
39 listSetFreeMethod(c->reply,decrRefCount);
40 listSetDupMethod(c->reply,dupClientReplyValue);
41 c->blocking_keys = NULL;
42 c->blocking_keys_num = 0;
43 c->io_keys = listCreate();
44 c->watched_keys = listCreate();
45 listSetFreeMethod(c->io_keys,decrRefCount);
46 c->pubsub_channels = dictCreate(&setDictType,NULL);
47 c->pubsub_patterns = listCreate();
48 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
49 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
50 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
51 readQueryFromClient, c) == AE_ERR) {
52 freeClient(c);
53 return NULL;
54 }
55 listAddNodeTail(server.clients,c);
56 initClientMultiState(c);
57 return c;
58 }
59
60 int _ensureFileEvent(redisClient *c) {
61 if (c->fd <= 0) return REDIS_ERR;
62 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
63 (c->replstate == REDIS_REPL_NONE ||
64 c->replstate == REDIS_REPL_ONLINE) &&
65 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
66 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
67 return REDIS_OK;
68 }
69
70 /* Create a duplicate of the last object in the reply list when
71 * it is not exclusively owned by the reply list. */
72 robj *dupLastObjectIfNeeded(list *reply) {
73 robj *new, *cur;
74 listNode *ln;
75 redisAssert(listLength(reply) > 0);
76 ln = listLast(reply);
77 cur = listNodeValue(ln);
78 if (cur->refcount > 1) {
79 new = dupStringObject(cur);
80 decrRefCount(cur);
81 listNodeValue(ln) = new;
82 }
83 return listNodeValue(ln);
84 }
85
86 int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
87 size_t available = c->buflen-c->bufpos;
88
89 /* If there already are entries in the reply list, we cannot
90 * add anything more to the static buffer. */
91 if (listLength(c->reply) > 0) return REDIS_ERR;
92
93 /* Check that the buffer has enough space available for this string. */
94 if (len > available) return REDIS_ERR;
95
96 memcpy(c->buf+c->bufpos,s,len);
97 c->bufpos+=len;
98 return REDIS_OK;
99 }
100
101 void _addReplyObjectToList(redisClient *c, robj *o) {
102 robj *tail;
103 if (listLength(c->reply) == 0) {
104 incrRefCount(o);
105 listAddNodeTail(c->reply,o);
106 } else {
107 tail = listNodeValue(listLast(c->reply));
108
109 /* Append to this object when possible. */
110 if (tail->ptr != NULL &&
111 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
112 {
113 tail = dupLastObjectIfNeeded(c->reply);
114 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
115 } else {
116 incrRefCount(o);
117 listAddNodeTail(c->reply,o);
118 }
119 }
120 }
121
122 /* This method takes responsibility over the sds. When it is no longer
123 * needed it will be free'd, otherwise it ends up in a robj. */
124 void _addReplySdsToList(redisClient *c, sds s) {
125 robj *tail;
126 if (listLength(c->reply) == 0) {
127 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
128 } else {
129 tail = listNodeValue(listLast(c->reply));
130
131 /* Append to this object when possible. */
132 if (tail->ptr != NULL &&
133 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
134 {
135 tail = dupLastObjectIfNeeded(c->reply);
136 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
137 sdsfree(s);
138 } else {
139 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
140 }
141 }
142 }
143
144 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
145 robj *tail;
146 if (listLength(c->reply) == 0) {
147 listAddNodeTail(c->reply,createStringObject(s,len));
148 } else {
149 tail = listNodeValue(listLast(c->reply));
150
151 /* Append to this object when possible. */
152 if (tail->ptr != NULL &&
153 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
154 {
155 tail = dupLastObjectIfNeeded(c->reply);
156 tail->ptr = sdscatlen(tail->ptr,s,len);
157 } else {
158 listAddNodeTail(c->reply,createStringObject(s,len));
159 }
160 }
161 }
162
163 void addReply(redisClient *c, robj *obj) {
164 if (_ensureFileEvent(c) != REDIS_OK) return;
165 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
166 /* Returns a new object with refcount 1 */
167 obj = dupStringObject(obj);
168 } else {
169 /* This increments the refcount. */
170 obj = getDecodedObject(obj);
171 }
172 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
173 _addReplyObjectToList(c,obj);
174 decrRefCount(obj);
175 }
176
177 void addReplySds(redisClient *c, sds s) {
178 if (_ensureFileEvent(c) != REDIS_OK) {
179 /* The caller expects the sds to be free'd. */
180 sdsfree(s);
181 return;
182 }
183 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
184 sdsfree(s);
185 } else {
186 /* This method free's the sds when it is no longer needed. */
187 _addReplySdsToList(c,s);
188 }
189 }
190
191 void addReplyString(redisClient *c, char *s, size_t len) {
192 if (_ensureFileEvent(c) != REDIS_OK) return;
193 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
194 _addReplyStringToList(c,s,len);
195 }
196
197 /* Adds an empty object to the reply list that will contain the multi bulk
198 * length, which is not known when this function is called. */
199 void *addDeferredMultiBulkLength(redisClient *c) {
200 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
201 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
202 return listLast(c->reply);
203 }
204
205 /* Populate the length object and try glueing it to the next chunk. */
206 void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
207 listNode *ln = (listNode*)node;
208 robj *len, *next;
209
210 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
211 if (node == NULL) return;
212
213 len = listNodeValue(ln);
214 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
215 if (ln->next != NULL) {
216 next = listNodeValue(ln->next);
217
218 /* Only glue when the next node is an sds */
219 if (next->ptr != NULL) {
220 len->ptr = sdscat(len->ptr,next->ptr);
221 listDelNode(c->reply,ln->next);
222 }
223 }
224 }
225
226 void addReplyDouble(redisClient *c, double d) {
227 char dbuf[128], sbuf[128];
228 int dlen, slen;
229 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
230 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
231 addReplyString(c,sbuf,slen);
232 }
233
234 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
235 char buf[128];
236 int len;
237 buf[0] = prefix;
238 len = ll2string(buf+1,sizeof(buf)-1,ll);
239 buf[len+1] = '\r';
240 buf[len+2] = '\n';
241 addReplyString(c,buf,len+3);
242 }
243
244 void addReplyLongLong(redisClient *c, long long ll) {
245 _addReplyLongLong(c,ll,':');
246 }
247
248 void addReplyMultiBulkLen(redisClient *c, long length) {
249 _addReplyLongLong(c,length,'*');
250 }
251
252 void addReplyBulkLen(redisClient *c, robj *obj) {
253 size_t len;
254
255 if (obj->encoding == REDIS_ENCODING_RAW) {
256 len = sdslen(obj->ptr);
257 } else {
258 long n = (long)obj->ptr;
259
260 /* Compute how many bytes will take this integer as a radix 10 string */
261 len = 1;
262 if (n < 0) {
263 len++;
264 n = -n;
265 }
266 while((n = n/10) != 0) {
267 len++;
268 }
269 }
270 _addReplyLongLong(c,len,'$');
271 }
272
273 void addReplyBulk(redisClient *c, robj *obj) {
274 addReplyBulkLen(c,obj);
275 addReply(c,obj);
276 addReply(c,shared.crlf);
277 }
278
279 /* In the CONFIG command we need to add vanilla C string as bulk replies */
280 void addReplyBulkCString(redisClient *c, char *s) {
281 if (s == NULL) {
282 addReply(c,shared.nullbulk);
283 } else {
284 robj *o = createStringObject(s,strlen(s));
285 addReplyBulk(c,o);
286 decrRefCount(o);
287 }
288 }
289
290 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
291 int cport, cfd;
292 char cip[128];
293 redisClient *c;
294 REDIS_NOTUSED(el);
295 REDIS_NOTUSED(mask);
296 REDIS_NOTUSED(privdata);
297
298 cfd = anetAccept(server.neterr, fd, cip, &cport);
299 if (cfd == AE_ERR) {
300 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
301 return;
302 }
303 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
304 if ((c = createClient(cfd)) == NULL) {
305 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
306 close(cfd); /* May be already closed, just ingore errors */
307 return;
308 }
309 /* If maxclient directive is set and this is one client more... close the
310 * connection. Note that we create the client instead to check before
311 * for this condition, since now the socket is already set in nonblocking
312 * mode and we can send an error for free using the Kernel I/O */
313 if (server.maxclients && listLength(server.clients) > server.maxclients) {
314 char *err = "-ERR max number of clients reached\r\n";
315
316 /* That's a best effort error message, don't check write errors */
317 if (write(c->fd,err,strlen(err)) == -1) {
318 /* Nothing to do, Just to avoid the warning... */
319 }
320 freeClient(c);
321 return;
322 }
323 server.stat_numconnections++;
324 }
325
326 static void freeClientArgv(redisClient *c) {
327 int j;
328
329 for (j = 0; j < c->argc; j++)
330 decrRefCount(c->argv[j]);
331 for (j = 0; j < c->mbargc; j++)
332 decrRefCount(c->mbargv[j]);
333 c->argc = 0;
334 c->mbargc = 0;
335 }
336
337 void freeClient(redisClient *c) {
338 listNode *ln;
339
340 /* Note that if the client we are freeing is blocked into a blocking
341 * call, we have to set querybuf to NULL *before* to call
342 * unblockClientWaitingData() to avoid processInputBuffer() will get
343 * called. Also it is important to remove the file events after
344 * this, because this call adds the READABLE event. */
345 sdsfree(c->querybuf);
346 c->querybuf = NULL;
347 if (c->flags & REDIS_BLOCKED)
348 unblockClientWaitingData(c);
349
350 /* UNWATCH all the keys */
351 unwatchAllKeys(c);
352 listRelease(c->watched_keys);
353 /* Unsubscribe from all the pubsub channels */
354 pubsubUnsubscribeAllChannels(c,0);
355 pubsubUnsubscribeAllPatterns(c,0);
356 dictRelease(c->pubsub_channels);
357 listRelease(c->pubsub_patterns);
358 /* Obvious cleanup */
359 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
360 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
361 listRelease(c->reply);
362 freeClientArgv(c);
363 close(c->fd);
364 /* Remove from the list of clients */
365 ln = listSearchKey(server.clients,c);
366 redisAssert(ln != NULL);
367 listDelNode(server.clients,ln);
368 /* Remove from the list of clients waiting for swapped keys, or ready
369 * to be restarted, but not yet woken up again. */
370 if (c->flags & REDIS_IO_WAIT) {
371 redisAssert(server.vm_enabled);
372 if (listLength(c->io_keys) == 0) {
373 ln = listSearchKey(server.io_ready_clients,c);
374
375 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
376 * it should be present in the list io_ready_clients */
377 redisAssert(ln != NULL);
378 listDelNode(server.io_ready_clients,ln);
379 } else {
380 while (listLength(c->io_keys)) {
381 ln = listFirst(c->io_keys);
382 dontWaitForSwappedKey(c,ln->value);
383 }
384 }
385 server.vm_blocked_clients--;
386 }
387 listRelease(c->io_keys);
388 /* Master/slave cleanup.
389 * Case 1: we lost the connection with a slave. */
390 if (c->flags & REDIS_SLAVE) {
391 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
392 close(c->repldbfd);
393 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
394 ln = listSearchKey(l,c);
395 redisAssert(ln != NULL);
396 listDelNode(l,ln);
397 }
398
399 /* Case 2: we lost the connection with the master. */
400 if (c->flags & REDIS_MASTER) {
401 server.master = NULL;
402 server.replstate = REDIS_REPL_CONNECT;
403 /* Since we lost the connection with the master, we should also
404 * close the connection with all our slaves if we have any, so
405 * when we'll resync with the master the other slaves will sync again
406 * with us as well. Note that also when the slave is not connected
407 * to the master it will keep refusing connections by other slaves. */
408 while (listLength(server.slaves)) {
409 ln = listFirst(server.slaves);
410 freeClient((redisClient*)ln->value);
411 }
412 }
413 /* Release memory */
414 zfree(c->argv);
415 zfree(c->mbargv);
416 freeClientMultiState(c);
417 zfree(c);
418 }
419
420 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
421 redisClient *c = privdata;
422 int nwritten = 0, totwritten = 0, objlen;
423 robj *o;
424 REDIS_NOTUSED(el);
425 REDIS_NOTUSED(mask);
426
427 /* Use writev() if we have enough buffers to send */
428 if (!server.glueoutputbuf &&
429 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
430 !(c->flags & REDIS_MASTER))
431 {
432 sendReplyToClientWritev(el, fd, privdata, mask);
433 return;
434 }
435
436 while(c->bufpos > 0 || listLength(c->reply)) {
437 if (c->bufpos > 0) {
438 if (c->flags & REDIS_MASTER) {
439 /* Don't reply to a master */
440 nwritten = c->bufpos - c->sentlen;
441 } else {
442 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
443 if (nwritten <= 0) break;
444 }
445 c->sentlen += nwritten;
446 totwritten += nwritten;
447
448 /* If the buffer was sent, set bufpos to zero to continue with
449 * the remainder of the reply. */
450 if (c->sentlen == c->bufpos) {
451 c->bufpos = 0;
452 c->sentlen = 0;
453 }
454 } else {
455 o = listNodeValue(listFirst(c->reply));
456 objlen = sdslen(o->ptr);
457
458 if (objlen == 0) {
459 listDelNode(c->reply,listFirst(c->reply));
460 continue;
461 }
462
463 if (c->flags & REDIS_MASTER) {
464 /* Don't reply to a master */
465 nwritten = objlen - c->sentlen;
466 } else {
467 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
468 if (nwritten <= 0) break;
469 }
470 c->sentlen += nwritten;
471 totwritten += nwritten;
472
473 /* If we fully sent the object on head go to the next one */
474 if (c->sentlen == objlen) {
475 listDelNode(c->reply,listFirst(c->reply));
476 c->sentlen = 0;
477 }
478 }
479 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
480 * bytes, in a single threaded server it's a good idea to serve
481 * other clients as well, even if a very large request comes from
482 * super fast link that is always able to accept data (in real world
483 * scenario think about 'KEYS *' against the loopback interfae) */
484 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
485 }
486 if (nwritten == -1) {
487 if (errno == EAGAIN) {
488 nwritten = 0;
489 } else {
490 redisLog(REDIS_VERBOSE,
491 "Error writing to client: %s", strerror(errno));
492 freeClient(c);
493 return;
494 }
495 }
496 if (totwritten > 0) c->lastinteraction = time(NULL);
497 if (listLength(c->reply) == 0) {
498 c->sentlen = 0;
499 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
500 }
501 }
502
503 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
504 {
505 redisClient *c = privdata;
506 int nwritten = 0, totwritten = 0, objlen, willwrite;
507 robj *o;
508 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
509 int offset, ion = 0;
510 REDIS_NOTUSED(el);
511 REDIS_NOTUSED(mask);
512
513 listNode *node;
514 while (listLength(c->reply)) {
515 offset = c->sentlen;
516 ion = 0;
517 willwrite = 0;
518
519 /* fill-in the iov[] array */
520 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
521 o = listNodeValue(node);
522 objlen = sdslen(o->ptr);
523
524 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
525 break;
526
527 if(ion == REDIS_WRITEV_IOVEC_COUNT)
528 break; /* no more iovecs */
529
530 iov[ion].iov_base = ((char*)o->ptr) + offset;
531 iov[ion].iov_len = objlen - offset;
532 willwrite += objlen - offset;
533 offset = 0; /* just for the first item */
534 ion++;
535 }
536
537 if(willwrite == 0)
538 break;
539
540 /* write all collected blocks at once */
541 if((nwritten = writev(fd, iov, ion)) < 0) {
542 if (errno != EAGAIN) {
543 redisLog(REDIS_VERBOSE,
544 "Error writing to client: %s", strerror(errno));
545 freeClient(c);
546 return;
547 }
548 break;
549 }
550
551 totwritten += nwritten;
552 offset = c->sentlen;
553
554 /* remove written robjs from c->reply */
555 while (nwritten && listLength(c->reply)) {
556 o = listNodeValue(listFirst(c->reply));
557 objlen = sdslen(o->ptr);
558
559 if(nwritten >= objlen - offset) {
560 listDelNode(c->reply, listFirst(c->reply));
561 nwritten -= objlen - offset;
562 c->sentlen = 0;
563 } else {
564 /* partial write */
565 c->sentlen += nwritten;
566 break;
567 }
568 offset = 0;
569 }
570 }
571
572 if (totwritten > 0)
573 c->lastinteraction = time(NULL);
574
575 if (listLength(c->reply) == 0) {
576 c->sentlen = 0;
577 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
578 }
579 }
580
581 /* resetClient prepare the client to process the next command */
582 void resetClient(redisClient *c) {
583 freeClientArgv(c);
584 c->bulklen = -1;
585 c->multibulk = 0;
586 }
587
588 void closeTimedoutClients(void) {
589 redisClient *c;
590 listNode *ln;
591 time_t now = time(NULL);
592 listIter li;
593
594 listRewind(server.clients,&li);
595 while ((ln = listNext(&li)) != NULL) {
596 c = listNodeValue(ln);
597 if (server.maxidletime &&
598 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
599 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
600 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
601 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
602 listLength(c->pubsub_patterns) == 0 &&
603 (now - c->lastinteraction > server.maxidletime))
604 {
605 redisLog(REDIS_VERBOSE,"Closing idle client");
606 freeClient(c);
607 } else if (c->flags & REDIS_BLOCKED) {
608 if (c->blockingto != 0 && c->blockingto < now) {
609 addReply(c,shared.nullmultibulk);
610 unblockClientWaitingData(c);
611 }
612 }
613 }
614 }
615
616 void processInputBuffer(redisClient *c) {
617 again:
618 /* Before to process the input buffer, make sure the client is not
619 * waitig for a blocking operation such as BLPOP. Note that the first
620 * iteration the client is never blocked, otherwise the processInputBuffer
621 * would not be called at all, but after the execution of the first commands
622 * in the input buffer the client may be blocked, and the "goto again"
623 * will try to reiterate. The following line will make it return asap. */
624 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
625 if (c->bulklen == -1) {
626 /* Read the first line of the query */
627 char *p = strchr(c->querybuf,'\n');
628 size_t querylen;
629
630 if (p) {
631 sds query, *argv;
632 int argc, j;
633
634 query = c->querybuf;
635 c->querybuf = sdsempty();
636 querylen = 1+(p-(query));
637 if (sdslen(query) > querylen) {
638 /* leave data after the first line of the query in the buffer */
639 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
640 }
641 *p = '\0'; /* remove "\n" */
642 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
643 sdsupdatelen(query);
644
645 /* Now we can split the query in arguments */
646 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
647 sdsfree(query);
648
649 if (c->argv) zfree(c->argv);
650 c->argv = zmalloc(sizeof(robj*)*argc);
651
652 for (j = 0; j < argc; j++) {
653 if (sdslen(argv[j])) {
654 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
655 c->argc++;
656 } else {
657 sdsfree(argv[j]);
658 }
659 }
660 zfree(argv);
661 if (c->argc) {
662 /* Execute the command. If the client is still valid
663 * after processCommand() return and there is something
664 * on the query buffer try to process the next command. */
665 if (processCommand(c) && sdslen(c->querybuf)) goto again;
666 } else {
667 /* Nothing to process, argc == 0. Just process the query
668 * buffer if it's not empty or return to the caller */
669 if (sdslen(c->querybuf)) goto again;
670 }
671 return;
672 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
673 redisLog(REDIS_VERBOSE, "Client protocol error");
674 freeClient(c);
675 return;
676 }
677 } else {
678 /* Bulk read handling. Note that if we are at this point
679 the client already sent a command terminated with a newline,
680 we are reading the bulk data that is actually the last
681 argument of the command. */
682 int qbl = sdslen(c->querybuf);
683
684 if (c->bulklen <= qbl) {
685 /* Copy everything but the final CRLF as final argument */
686 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
687 c->argc++;
688 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
689 /* Process the command. If the client is still valid after
690 * the processing and there is more data in the buffer
691 * try to parse it. */
692 if (processCommand(c) && sdslen(c->querybuf)) goto again;
693 return;
694 }
695 }
696 }
697
698 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
699 redisClient *c = (redisClient*) privdata;
700 char buf[REDIS_IOBUF_LEN];
701 int nread;
702 REDIS_NOTUSED(el);
703 REDIS_NOTUSED(mask);
704
705 nread = read(fd, buf, REDIS_IOBUF_LEN);
706 if (nread == -1) {
707 if (errno == EAGAIN) {
708 nread = 0;
709 } else {
710 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
711 freeClient(c);
712 return;
713 }
714 } else if (nread == 0) {
715 redisLog(REDIS_VERBOSE, "Client closed connection");
716 freeClient(c);
717 return;
718 }
719 if (nread) {
720 c->querybuf = sdscatlen(c->querybuf, buf, nread);
721 c->lastinteraction = time(NULL);
722 } else {
723 return;
724 }
725 processInputBuffer(c);
726 }