]> git.saurik.com Git - redis.git/blob - src/networking.c
Static buffer in client struct has a constant size
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
/* List "match" callback: two list values match when equalStringObjects()
 * says so. The result of that call is returned unchanged. */
int listMatchObjects(void *a, void *b) {
    int matched = equalStringObjects(a,b);

    return matched;
}
12
13 redisClient *createClient(int fd) {
14 redisClient *c = zmalloc(sizeof(redisClient));
15 c->bufpos = 0;
16
17 anetNonBlock(NULL,fd);
18 anetTcpNoDelay(NULL,fd);
19 if (!c) return NULL;
20 selectDb(c,0);
21 c->fd = fd;
22 c->querybuf = sdsempty();
23 c->argc = 0;
24 c->argv = NULL;
25 c->bulklen = -1;
26 c->multibulk = 0;
27 c->mbargc = 0;
28 c->mbargv = NULL;
29 c->sentlen = 0;
30 c->flags = 0;
31 c->lastinteraction = time(NULL);
32 c->authenticated = 0;
33 c->replstate = REDIS_REPL_NONE;
34 c->reply = listCreate();
35 listSetFreeMethod(c->reply,decrRefCount);
36 listSetDupMethod(c->reply,dupClientReplyValue);
37 c->blocking_keys = NULL;
38 c->blocking_keys_num = 0;
39 c->io_keys = listCreate();
40 c->watched_keys = listCreate();
41 listSetFreeMethod(c->io_keys,decrRefCount);
42 c->pubsub_channels = dictCreate(&setDictType,NULL);
43 c->pubsub_patterns = listCreate();
44 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
45 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
46 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
47 readQueryFromClient, c) == AE_ERR) {
48 freeClient(c);
49 return NULL;
50 }
51 listAddNodeTail(server.clients,c);
52 initClientMultiState(c);
53 return c;
54 }
55
56 int _ensureFileEvent(redisClient *c) {
57 if (c->fd <= 0) return REDIS_ERR;
58 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
59 (c->replstate == REDIS_REPL_NONE ||
60 c->replstate == REDIS_REPL_ONLINE) &&
61 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
62 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
63 return REDIS_OK;
64 }
65
66 /* Create a duplicate of the last object in the reply list when
67 * it is not exclusively owned by the reply list. */
68 robj *dupLastObjectIfNeeded(list *reply) {
69 robj *new, *cur;
70 listNode *ln;
71 redisAssert(listLength(reply) > 0);
72 ln = listLast(reply);
73 cur = listNodeValue(ln);
74 if (cur->refcount > 1) {
75 new = dupStringObject(cur);
76 decrRefCount(cur);
77 listNodeValue(ln) = new;
78 }
79 return listNodeValue(ln);
80 }
81
82 int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
83 size_t available = sizeof(c->buf)-c->bufpos;
84
85 /* If there already are entries in the reply list, we cannot
86 * add anything more to the static buffer. */
87 if (listLength(c->reply) > 0) return REDIS_ERR;
88
89 /* Check that the buffer has enough space available for this string. */
90 if (len > available) return REDIS_ERR;
91
92 memcpy(c->buf+c->bufpos,s,len);
93 c->bufpos+=len;
94 return REDIS_OK;
95 }
96
97 void _addReplyObjectToList(redisClient *c, robj *o) {
98 robj *tail;
99 if (listLength(c->reply) == 0) {
100 incrRefCount(o);
101 listAddNodeTail(c->reply,o);
102 } else {
103 tail = listNodeValue(listLast(c->reply));
104
105 /* Append to this object when possible. */
106 if (tail->ptr != NULL &&
107 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
108 {
109 tail = dupLastObjectIfNeeded(c->reply);
110 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
111 } else {
112 incrRefCount(o);
113 listAddNodeTail(c->reply,o);
114 }
115 }
116 }
117
118 /* This method takes responsibility over the sds. When it is no longer
119 * needed it will be free'd, otherwise it ends up in a robj. */
120 void _addReplySdsToList(redisClient *c, sds s) {
121 robj *tail;
122 if (listLength(c->reply) == 0) {
123 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
124 } else {
125 tail = listNodeValue(listLast(c->reply));
126
127 /* Append to this object when possible. */
128 if (tail->ptr != NULL &&
129 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
130 {
131 tail = dupLastObjectIfNeeded(c->reply);
132 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
133 sdsfree(s);
134 } else {
135 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
136 }
137 }
138 }
139
140 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
141 robj *tail;
142 if (listLength(c->reply) == 0) {
143 listAddNodeTail(c->reply,createStringObject(s,len));
144 } else {
145 tail = listNodeValue(listLast(c->reply));
146
147 /* Append to this object when possible. */
148 if (tail->ptr != NULL &&
149 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
150 {
151 tail = dupLastObjectIfNeeded(c->reply);
152 tail->ptr = sdscatlen(tail->ptr,s,len);
153 } else {
154 listAddNodeTail(c->reply,createStringObject(s,len));
155 }
156 }
157 }
158
159 void addReply(redisClient *c, robj *obj) {
160 if (_ensureFileEvent(c) != REDIS_OK) return;
161 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
162 /* Returns a new object with refcount 1 */
163 obj = dupStringObject(obj);
164 } else {
165 /* This increments the refcount. */
166 obj = getDecodedObject(obj);
167 }
168 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
169 _addReplyObjectToList(c,obj);
170 decrRefCount(obj);
171 }
172
173 void addReplySds(redisClient *c, sds s) {
174 if (_ensureFileEvent(c) != REDIS_OK) {
175 /* The caller expects the sds to be free'd. */
176 sdsfree(s);
177 return;
178 }
179 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
180 sdsfree(s);
181 } else {
182 /* This method free's the sds when it is no longer needed. */
183 _addReplySdsToList(c,s);
184 }
185 }
186
187 void addReplyString(redisClient *c, char *s, size_t len) {
188 if (_ensureFileEvent(c) != REDIS_OK) return;
189 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
190 _addReplyStringToList(c,s,len);
191 }
192
193 void _addReplyError(redisClient *c, char *s, size_t len) {
194 addReplyString(c,"-ERR ",5);
195 addReplyString(c,s,len);
196 addReplyString(c,"\r\n",2);
197 }
198
199 void addReplyError(redisClient *c, char *err) {
200 _addReplyError(c,err,strlen(err));
201 }
202
203 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
204 va_list ap;
205 va_start(ap,fmt);
206 sds s = sdscatvprintf(sdsempty(),fmt,ap);
207 va_end(ap);
208 _addReplyError(c,s,sdslen(s));
209 sdsfree(s);
210 }
211
212 void _addReplyStatus(redisClient *c, char *s, size_t len) {
213 addReplyString(c,"+",1);
214 addReplyString(c,s,len);
215 addReplyString(c,"\r\n",2);
216 }
217
218 void addReplyStatus(redisClient *c, char *status) {
219 _addReplyStatus(c,status,strlen(status));
220 }
221
222 void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
223 va_list ap;
224 va_start(ap,fmt);
225 sds s = sdscatvprintf(sdsempty(),fmt,ap);
226 va_end(ap);
227 _addReplyStatus(c,s,sdslen(s));
228 sdsfree(s);
229 }
230
231 /* Adds an empty object to the reply list that will contain the multi bulk
232 * length, which is not known when this function is called. */
233 void *addDeferredMultiBulkLength(redisClient *c) {
234 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
235 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
236 return listLast(c->reply);
237 }
238
239 /* Populate the length object and try glueing it to the next chunk. */
240 void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
241 listNode *ln = (listNode*)node;
242 robj *len, *next;
243
244 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
245 if (node == NULL) return;
246
247 len = listNodeValue(ln);
248 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
249 if (ln->next != NULL) {
250 next = listNodeValue(ln->next);
251
252 /* Only glue when the next node is non-NULL (an sds in this case) */
253 if (next->ptr != NULL) {
254 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
255 listDelNode(c->reply,ln->next);
256 }
257 }
258 }
259
260 void addReplyDouble(redisClient *c, double d) {
261 char dbuf[128], sbuf[128];
262 int dlen, slen;
263 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
264 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
265 addReplyString(c,sbuf,slen);
266 }
267
268 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
269 char buf[128];
270 int len;
271 buf[0] = prefix;
272 len = ll2string(buf+1,sizeof(buf)-1,ll);
273 buf[len+1] = '\r';
274 buf[len+2] = '\n';
275 addReplyString(c,buf,len+3);
276 }
277
278 void addReplyLongLong(redisClient *c, long long ll) {
279 _addReplyLongLong(c,ll,':');
280 }
281
282 void addReplyMultiBulkLen(redisClient *c, long length) {
283 _addReplyLongLong(c,length,'*');
284 }
285
286 void addReplyBulkLen(redisClient *c, robj *obj) {
287 size_t len;
288
289 if (obj->encoding == REDIS_ENCODING_RAW) {
290 len = sdslen(obj->ptr);
291 } else {
292 long n = (long)obj->ptr;
293
294 /* Compute how many bytes will take this integer as a radix 10 string */
295 len = 1;
296 if (n < 0) {
297 len++;
298 n = -n;
299 }
300 while((n = n/10) != 0) {
301 len++;
302 }
303 }
304 _addReplyLongLong(c,len,'$');
305 }
306
307 void addReplyBulk(redisClient *c, robj *obj) {
308 addReplyBulkLen(c,obj);
309 addReply(c,obj);
310 addReply(c,shared.crlf);
311 }
312
313 /* In the CONFIG command we need to add vanilla C string as bulk replies */
314 void addReplyBulkCString(redisClient *c, char *s) {
315 if (s == NULL) {
316 addReply(c,shared.nullbulk);
317 } else {
318 robj *o = createStringObject(s,strlen(s));
319 addReplyBulk(c,o);
320 decrRefCount(o);
321 }
322 }
323
324 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
325 int cport, cfd;
326 char cip[128];
327 redisClient *c;
328 REDIS_NOTUSED(el);
329 REDIS_NOTUSED(mask);
330 REDIS_NOTUSED(privdata);
331
332 cfd = anetAccept(server.neterr, fd, cip, &cport);
333 if (cfd == AE_ERR) {
334 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
335 return;
336 }
337 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
338 if ((c = createClient(cfd)) == NULL) {
339 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
340 close(cfd); /* May be already closed, just ingore errors */
341 return;
342 }
343 /* If maxclient directive is set and this is one client more... close the
344 * connection. Note that we create the client instead to check before
345 * for this condition, since now the socket is already set in nonblocking
346 * mode and we can send an error for free using the Kernel I/O */
347 if (server.maxclients && listLength(server.clients) > server.maxclients) {
348 char *err = "-ERR max number of clients reached\r\n";
349
350 /* That's a best effort error message, don't check write errors */
351 if (write(c->fd,err,strlen(err)) == -1) {
352 /* Nothing to do, Just to avoid the warning... */
353 }
354 freeClient(c);
355 return;
356 }
357 server.stat_numconnections++;
358 }
359
360 static void freeClientArgv(redisClient *c) {
361 int j;
362
363 for (j = 0; j < c->argc; j++)
364 decrRefCount(c->argv[j]);
365 for (j = 0; j < c->mbargc; j++)
366 decrRefCount(c->mbargv[j]);
367 c->argc = 0;
368 c->mbargc = 0;
369 }
370
/* Release a client: tear down its blocking/watch/pubsub state, remove its
 * file events, close the socket, unlink it from every server list it appears
 * in (clients, io_ready_clients, slaves or monitors) and free its memory.
 *
 * When the client is our master (REDIS_MASTER), every connected slave is
 * freed as well, by calling this function recursively.
 *
 * The client must be present in server.clients (asserted below). The order
 * of the steps matters, see the comments inside. */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* calling
     * unblockClientWaitingData(), to avoid processInputBuffer() being
     * called. It is also important to remove the file events after
     * this, because this call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels and patterns */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Obvious cleanup: file events, pending replies, arguments, socket */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.vm_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            /* NOTE(review): the loop only terminates if
             * dontWaitForSwappedKey() removes the key from c->io_keys -
             * confirm against its definition. */
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.vm_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup.
     * Case 1: we lost the connection with a slave. */
    if (c->flags & REDIS_SLAVE) {
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Clients flagged REDIS_MONITOR are looked up in server.monitors,
         * all the others in server.slaves. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }

    /* Case 2: we lost the connection with the master. */
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
        /* Since we lost the connection with the master, we should also
         * close the connection with all our slaves if we have any, so
         * when we'll resync with the master the other slaves will sync again
         * with us as well. Note that also when the slave is not connected
         * to the master it will keep refusing connections by other slaves. */
        while (listLength(server.slaves)) {
            ln = listFirst(server.slaves);
            freeClient((redisClient*)ln->value);
        }
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
453
454 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
455 redisClient *c = privdata;
456 int nwritten = 0, totwritten = 0, objlen;
457 robj *o;
458 REDIS_NOTUSED(el);
459 REDIS_NOTUSED(mask);
460
461 /* Use writev() if we have enough buffers to send */
462 if (!server.glueoutputbuf &&
463 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
464 !(c->flags & REDIS_MASTER))
465 {
466 sendReplyToClientWritev(el, fd, privdata, mask);
467 return;
468 }
469
470 while(c->bufpos > 0 || listLength(c->reply)) {
471 if (c->bufpos > 0) {
472 if (c->flags & REDIS_MASTER) {
473 /* Don't reply to a master */
474 nwritten = c->bufpos - c->sentlen;
475 } else {
476 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
477 if (nwritten <= 0) break;
478 }
479 c->sentlen += nwritten;
480 totwritten += nwritten;
481
482 /* If the buffer was sent, set bufpos to zero to continue with
483 * the remainder of the reply. */
484 if (c->sentlen == c->bufpos) {
485 c->bufpos = 0;
486 c->sentlen = 0;
487 }
488 } else {
489 o = listNodeValue(listFirst(c->reply));
490 objlen = sdslen(o->ptr);
491
492 if (objlen == 0) {
493 listDelNode(c->reply,listFirst(c->reply));
494 continue;
495 }
496
497 if (c->flags & REDIS_MASTER) {
498 /* Don't reply to a master */
499 nwritten = objlen - c->sentlen;
500 } else {
501 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
502 if (nwritten <= 0) break;
503 }
504 c->sentlen += nwritten;
505 totwritten += nwritten;
506
507 /* If we fully sent the object on head go to the next one */
508 if (c->sentlen == objlen) {
509 listDelNode(c->reply,listFirst(c->reply));
510 c->sentlen = 0;
511 }
512 }
513 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
514 * bytes, in a single threaded server it's a good idea to serve
515 * other clients as well, even if a very large request comes from
516 * super fast link that is always able to accept data (in real world
517 * scenario think about 'KEYS *' against the loopback interfae) */
518 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
519 }
520 if (nwritten == -1) {
521 if (errno == EAGAIN) {
522 nwritten = 0;
523 } else {
524 redisLog(REDIS_VERBOSE,
525 "Error writing to client: %s", strerror(errno));
526 freeClient(c);
527 return;
528 }
529 }
530 if (totwritten > 0) c->lastinteraction = time(NULL);
531 if (listLength(c->reply) == 0) {
532 c->sentlen = 0;
533 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
534 }
535 }
536
/* Variant of the writable handler that sends the objects queued in c->reply
 * with writev(), up to REDIS_WRITEV_IOVEC_COUNT objects per call.
 *
 * c->sentlen is used as the number of bytes already sent of the object on
 * the head of the list. On a write error other than EAGAIN the client is
 * freed. When the list becomes empty the writable event is removed.
 *
 * NOTE(review): this function never looks at c->buf / c->bufpos, so it
 * presumably expects the static buffer to be empty when called - confirm
 * against the caller. */
void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
{
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen, willwrite;
    robj *o;
    struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
    int offset, ion = 0;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    listNode *node;
    while (listLength(c->reply)) {
        /* Only the first object may have been partially sent already. */
        offset = c->sentlen;
        ion = 0;
        willwrite = 0;

        /* fill-in the iov[] array */
        for(node = listFirst(c->reply); node; node = listNextNode(node)) {
            o = listNodeValue(node);
            objlen = sdslen(o->ptr);

            /* Stop before going over the per-event write budget. */
            if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
                break;

            if(ion == REDIS_WRITEV_IOVEC_COUNT)
                break; /* no more iovecs */

            iov[ion].iov_base = ((char*)o->ptr) + offset;
            iov[ion].iov_len = objlen - offset;
            willwrite += objlen - offset;
            offset = 0; /* just for the first item */
            ion++;
        }

        /* Nothing could be scheduled in this round: give up for now. */
        if(willwrite == 0)
            break;

        /* write all collected blocks at once */
        if((nwritten = writev(fd, iov, ion)) < 0) {
            if (errno != EAGAIN) {
                redisLog(REDIS_VERBOSE,
                    "Error writing to client: %s", strerror(errno));
                freeClient(c);
                return;
            }
            break;
        }

        totwritten += nwritten;
        offset = c->sentlen;

        /* remove written robjs from c->reply */
        while (nwritten && listLength(c->reply)) {
            o = listNodeValue(listFirst(c->reply));
            objlen = sdslen(o->ptr);

            if(nwritten >= objlen - offset) {
                /* The head object was sent completely. */
                listDelNode(c->reply, listFirst(c->reply));
                nwritten -= objlen - offset;
                c->sentlen = 0;
            } else {
                /* partial write: remember how far we got in this object */
                c->sentlen += nwritten;
                break;
            }
            offset = 0;
        }
    }

    if (totwritten > 0)
        c->lastinteraction = time(NULL);

    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
614
615 /* resetClient prepare the client to process the next command */
616 void resetClient(redisClient *c) {
617 freeClientArgv(c);
618 c->bulklen = -1;
619 c->multibulk = 0;
620 }
621
622 void closeTimedoutClients(void) {
623 redisClient *c;
624 listNode *ln;
625 time_t now = time(NULL);
626 listIter li;
627
628 listRewind(server.clients,&li);
629 while ((ln = listNext(&li)) != NULL) {
630 c = listNodeValue(ln);
631 if (server.maxidletime &&
632 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
633 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
634 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
635 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
636 listLength(c->pubsub_patterns) == 0 &&
637 (now - c->lastinteraction > server.maxidletime))
638 {
639 redisLog(REDIS_VERBOSE,"Closing idle client");
640 freeClient(c);
641 } else if (c->flags & REDIS_BLOCKED) {
642 if (c->blockingto != 0 && c->blockingto < now) {
643 addReply(c,shared.nullmultibulk);
644 unblockClientWaitingData(c);
645 }
646 }
647 }
648 }
649
650 void processInputBuffer(redisClient *c) {
651 again:
652 /* Before to process the input buffer, make sure the client is not
653 * waitig for a blocking operation such as BLPOP. Note that the first
654 * iteration the client is never blocked, otherwise the processInputBuffer
655 * would not be called at all, but after the execution of the first commands
656 * in the input buffer the client may be blocked, and the "goto again"
657 * will try to reiterate. The following line will make it return asap. */
658 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
659 if (c->bulklen == -1) {
660 /* Read the first line of the query */
661 char *p = strchr(c->querybuf,'\n');
662 size_t querylen;
663
664 if (p) {
665 sds query, *argv;
666 int argc, j;
667
668 query = c->querybuf;
669 c->querybuf = sdsempty();
670 querylen = 1+(p-(query));
671 if (sdslen(query) > querylen) {
672 /* leave data after the first line of the query in the buffer */
673 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
674 }
675 *p = '\0'; /* remove "\n" */
676 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
677 sdsupdatelen(query);
678
679 /* Now we can split the query in arguments */
680 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
681 sdsfree(query);
682
683 if (c->argv) zfree(c->argv);
684 c->argv = zmalloc(sizeof(robj*)*argc);
685
686 for (j = 0; j < argc; j++) {
687 if (sdslen(argv[j])) {
688 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
689 c->argc++;
690 } else {
691 sdsfree(argv[j]);
692 }
693 }
694 zfree(argv);
695 if (c->argc) {
696 /* Execute the command. If the client is still valid
697 * after processCommand() return and there is something
698 * on the query buffer try to process the next command. */
699 if (processCommand(c) && sdslen(c->querybuf)) goto again;
700 } else {
701 /* Nothing to process, argc == 0. Just process the query
702 * buffer if it's not empty or return to the caller */
703 if (sdslen(c->querybuf)) goto again;
704 }
705 return;
706 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
707 redisLog(REDIS_VERBOSE, "Client protocol error");
708 freeClient(c);
709 return;
710 }
711 } else {
712 /* Bulk read handling. Note that if we are at this point
713 the client already sent a command terminated with a newline,
714 we are reading the bulk data that is actually the last
715 argument of the command. */
716 int qbl = sdslen(c->querybuf);
717
718 if (c->bulklen <= qbl) {
719 /* Copy everything but the final CRLF as final argument */
720 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
721 c->argc++;
722 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
723 /* Process the command. If the client is still valid after
724 * the processing and there is more data in the buffer
725 * try to parse it. */
726 if (processCommand(c) && sdslen(c->querybuf)) goto again;
727 return;
728 }
729 }
730 }
731
732 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
733 redisClient *c = (redisClient*) privdata;
734 char buf[REDIS_IOBUF_LEN];
735 int nread;
736 REDIS_NOTUSED(el);
737 REDIS_NOTUSED(mask);
738
739 nread = read(fd, buf, REDIS_IOBUF_LEN);
740 if (nread == -1) {
741 if (errno == EAGAIN) {
742 nread = 0;
743 } else {
744 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
745 freeClient(c);
746 return;
747 }
748 } else if (nread == 0) {
749 redisLog(REDIS_VERBOSE, "Client closed connection");
750 freeClient(c);
751 return;
752 }
753 if (nread) {
754 c->querybuf = sdscatlen(c->querybuf, buf, nread);
755 c->lastinteraction = time(NULL);
756 } else {
757 return;
758 }
759 processInputBuffer(c);
760 }