]> git.saurik.com Git - redis.git/blob - src/networking.c
Merge branch 'master' into networking-perf
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
9 int listMatchObjects(void *a, void *b) {
10 return equalStringObjects(a,b);
11 }
12
13 redisClient *createClient(int fd) {
14 redisClient *c;
15
16 /* Allocate more space to hold a static write buffer. */
17 c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES);
18 c->buflen = REDIS_REPLY_CHUNK_BYTES;
19 c->bufpos = 0;
20
21 anetNonBlock(NULL,fd);
22 anetTcpNoDelay(NULL,fd);
23 if (!c) return NULL;
24 selectDb(c,0);
25 c->fd = fd;
26 c->querybuf = sdsempty();
27 c->argc = 0;
28 c->argv = NULL;
29 c->bulklen = -1;
30 c->multibulk = 0;
31 c->mbargc = 0;
32 c->mbargv = NULL;
33 c->sentlen = 0;
34 c->flags = 0;
35 c->lastinteraction = time(NULL);
36 c->authenticated = 0;
37 c->replstate = REDIS_REPL_NONE;
38 c->reply = listCreate();
39 listSetFreeMethod(c->reply,decrRefCount);
40 listSetDupMethod(c->reply,dupClientReplyValue);
41 c->blocking_keys = NULL;
42 c->blocking_keys_num = 0;
43 c->io_keys = listCreate();
44 c->watched_keys = listCreate();
45 listSetFreeMethod(c->io_keys,decrRefCount);
46 c->pubsub_channels = dictCreate(&setDictType,NULL);
47 c->pubsub_patterns = listCreate();
48 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
49 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
50 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
51 readQueryFromClient, c) == AE_ERR) {
52 freeClient(c);
53 return NULL;
54 }
55 listAddNodeTail(server.clients,c);
56 initClientMultiState(c);
57 return c;
58 }
59
60 int _ensureFileEvent(redisClient *c) {
61 if (c->fd <= 0) return REDIS_ERR;
62 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
63 (c->replstate == REDIS_REPL_NONE ||
64 c->replstate == REDIS_REPL_ONLINE) &&
65 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
66 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
67 return REDIS_OK;
68 }
69
70 /* Create a duplicate of the last object in the reply list when
71 * it is not exclusively owned by the reply list. */
72 robj *dupLastObjectIfNeeded(list *reply) {
73 robj *new, *cur;
74 listNode *ln;
75 redisAssert(listLength(reply) > 0);
76 ln = listLast(reply);
77 cur = listNodeValue(ln);
78 if (cur->refcount > 1) {
79 new = dupStringObject(cur);
80 decrRefCount(cur);
81 listNodeValue(ln) = new;
82 }
83 return listNodeValue(ln);
84 }
85
86 int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
87 size_t available = c->buflen-c->bufpos;
88
89 /* If there already are entries in the reply list, we cannot
90 * add anything more to the static buffer. */
91 if (listLength(c->reply) > 0) return REDIS_ERR;
92
93 /* Check that the buffer has enough space available for this string. */
94 if (len > available) return REDIS_ERR;
95
96 memcpy(c->buf+c->bufpos,s,len);
97 c->bufpos+=len;
98 return REDIS_OK;
99 }
100
101 void _addReplyObjectToList(redisClient *c, robj *o) {
102 robj *tail;
103 if (listLength(c->reply) == 0) {
104 incrRefCount(o);
105 listAddNodeTail(c->reply,o);
106 } else {
107 tail = listNodeValue(listLast(c->reply));
108
109 /* Append to this object when possible. */
110 if (tail->ptr != NULL &&
111 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
112 {
113 tail = dupLastObjectIfNeeded(c->reply);
114 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
115 } else {
116 incrRefCount(o);
117 listAddNodeTail(c->reply,o);
118 }
119 }
120 }
121
122 /* This method takes responsibility over the sds. When it is no longer
123 * needed it will be free'd, otherwise it ends up in a robj. */
124 void _addReplySdsToList(redisClient *c, sds s) {
125 robj *tail;
126 if (listLength(c->reply) == 0) {
127 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
128 } else {
129 tail = listNodeValue(listLast(c->reply));
130
131 /* Append to this object when possible. */
132 if (tail->ptr != NULL &&
133 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
134 {
135 tail = dupLastObjectIfNeeded(c->reply);
136 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
137 sdsfree(s);
138 } else {
139 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
140 }
141 }
142 }
143
144 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
145 robj *tail;
146 if (listLength(c->reply) == 0) {
147 listAddNodeTail(c->reply,createStringObject(s,len));
148 } else {
149 tail = listNodeValue(listLast(c->reply));
150
151 /* Append to this object when possible. */
152 if (tail->ptr != NULL &&
153 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
154 {
155 tail = dupLastObjectIfNeeded(c->reply);
156 tail->ptr = sdscatlen(tail->ptr,s,len);
157 } else {
158 listAddNodeTail(c->reply,createStringObject(s,len));
159 }
160 }
161 }
162
163 void addReply(redisClient *c, robj *obj) {
164 if (_ensureFileEvent(c) != REDIS_OK) return;
165 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
166 /* Returns a new object with refcount 1 */
167 obj = dupStringObject(obj);
168 } else {
169 /* This increments the refcount. */
170 obj = getDecodedObject(obj);
171 }
172 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
173 _addReplyObjectToList(c,obj);
174 decrRefCount(obj);
175 }
176
177 void addReplySds(redisClient *c, sds s) {
178 if (_ensureFileEvent(c) != REDIS_OK) {
179 /* The caller expects the sds to be free'd. */
180 sdsfree(s);
181 return;
182 }
183 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
184 sdsfree(s);
185 } else {
186 /* This method free's the sds when it is no longer needed. */
187 _addReplySdsToList(c,s);
188 }
189 }
190
191 void addReplyString(redisClient *c, char *s, size_t len) {
192 if (_ensureFileEvent(c) != REDIS_OK) return;
193 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
194 _addReplyStringToList(c,s,len);
195 }
196
197 void _addReplyError(redisClient *c, char *s, size_t len) {
198 addReplyString(c,"-ERR ",5);
199 addReplyString(c,s,len);
200 addReplyString(c,"\r\n",2);
201 }
202
203 void addReplyError(redisClient *c, char *err) {
204 _addReplyError(c,err,strlen(err));
205 }
206
207 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
208 va_list ap;
209 va_start(ap,fmt);
210 sds s = sdscatvprintf(sdsempty(),fmt,ap);
211 va_end(ap);
212 _addReplyError(c,s,sdslen(s));
213 sdsfree(s);
214 }
215
216 void _addReplyStatus(redisClient *c, char *s, size_t len) {
217 addReplyString(c,"+",1);
218 addReplyString(c,s,len);
219 addReplyString(c,"\r\n",2);
220 }
221
222 void addReplyStatus(redisClient *c, char *status) {
223 _addReplyStatus(c,status,strlen(status));
224 }
225
226 void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
227 va_list ap;
228 va_start(ap,fmt);
229 sds s = sdscatvprintf(sdsempty(),fmt,ap);
230 va_end(ap);
231 _addReplyStatus(c,s,sdslen(s));
232 sdsfree(s);
233 }
234
235 /* Adds an empty object to the reply list that will contain the multi bulk
236 * length, which is not known when this function is called. */
237 void *addDeferredMultiBulkLength(redisClient *c) {
238 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
239 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
240 return listLast(c->reply);
241 }
242
243 /* Populate the length object and try glueing it to the next chunk. */
244 void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
245 listNode *ln = (listNode*)node;
246 robj *len, *next;
247
248 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
249 if (node == NULL) return;
250
251 len = listNodeValue(ln);
252 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
253 if (ln->next != NULL) {
254 next = listNodeValue(ln->next);
255
256 /* Only glue when the next node is non-NULL (an sds in this case) */
257 if (next->ptr != NULL) {
258 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
259 listDelNode(c->reply,ln->next);
260 }
261 }
262 }
263
264 void addReplyDouble(redisClient *c, double d) {
265 char dbuf[128], sbuf[128];
266 int dlen, slen;
267 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
268 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
269 addReplyString(c,sbuf,slen);
270 }
271
272 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
273 char buf[128];
274 int len;
275 buf[0] = prefix;
276 len = ll2string(buf+1,sizeof(buf)-1,ll);
277 buf[len+1] = '\r';
278 buf[len+2] = '\n';
279 addReplyString(c,buf,len+3);
280 }
281
282 void addReplyLongLong(redisClient *c, long long ll) {
283 _addReplyLongLong(c,ll,':');
284 }
285
286 void addReplyMultiBulkLen(redisClient *c, long length) {
287 _addReplyLongLong(c,length,'*');
288 }
289
290 void addReplyBulkLen(redisClient *c, robj *obj) {
291 size_t len;
292
293 if (obj->encoding == REDIS_ENCODING_RAW) {
294 len = sdslen(obj->ptr);
295 } else {
296 long n = (long)obj->ptr;
297
298 /* Compute how many bytes will take this integer as a radix 10 string */
299 len = 1;
300 if (n < 0) {
301 len++;
302 n = -n;
303 }
304 while((n = n/10) != 0) {
305 len++;
306 }
307 }
308 _addReplyLongLong(c,len,'$');
309 }
310
311 void addReplyBulk(redisClient *c, robj *obj) {
312 addReplyBulkLen(c,obj);
313 addReply(c,obj);
314 addReply(c,shared.crlf);
315 }
316
317 /* In the CONFIG command we need to add vanilla C string as bulk replies */
318 void addReplyBulkCString(redisClient *c, char *s) {
319 if (s == NULL) {
320 addReply(c,shared.nullbulk);
321 } else {
322 robj *o = createStringObject(s,strlen(s));
323 addReplyBulk(c,o);
324 decrRefCount(o);
325 }
326 }
327
328 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
329 int cport, cfd;
330 char cip[128];
331 redisClient *c;
332 REDIS_NOTUSED(el);
333 REDIS_NOTUSED(mask);
334 REDIS_NOTUSED(privdata);
335
336 cfd = anetAccept(server.neterr, fd, cip, &cport);
337 if (cfd == AE_ERR) {
338 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
339 return;
340 }
341 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
342 if ((c = createClient(cfd)) == NULL) {
343 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
344 close(cfd); /* May be already closed, just ingore errors */
345 return;
346 }
347 /* If maxclient directive is set and this is one client more... close the
348 * connection. Note that we create the client instead to check before
349 * for this condition, since now the socket is already set in nonblocking
350 * mode and we can send an error for free using the Kernel I/O */
351 if (server.maxclients && listLength(server.clients) > server.maxclients) {
352 char *err = "-ERR max number of clients reached\r\n";
353
354 /* That's a best effort error message, don't check write errors */
355 if (write(c->fd,err,strlen(err)) == -1) {
356 /* Nothing to do, Just to avoid the warning... */
357 }
358 freeClient(c);
359 return;
360 }
361 server.stat_numconnections++;
362 }
363
364 static void freeClientArgv(redisClient *c) {
365 int j;
366
367 for (j = 0; j < c->argc; j++)
368 decrRefCount(c->argv[j]);
369 for (j = 0; j < c->mbargc; j++)
370 decrRefCount(c->mbargv[j]);
371 c->argc = 0;
372 c->mbargc = 0;
373 }
374
375 void freeClient(redisClient *c) {
376 listNode *ln;
377
378 /* Note that if the client we are freeing is blocked into a blocking
379 * call, we have to set querybuf to NULL *before* to call
380 * unblockClientWaitingData() to avoid processInputBuffer() will get
381 * called. Also it is important to remove the file events after
382 * this, because this call adds the READABLE event. */
383 sdsfree(c->querybuf);
384 c->querybuf = NULL;
385 if (c->flags & REDIS_BLOCKED)
386 unblockClientWaitingData(c);
387
388 /* UNWATCH all the keys */
389 unwatchAllKeys(c);
390 listRelease(c->watched_keys);
391 /* Unsubscribe from all the pubsub channels */
392 pubsubUnsubscribeAllChannels(c,0);
393 pubsubUnsubscribeAllPatterns(c,0);
394 dictRelease(c->pubsub_channels);
395 listRelease(c->pubsub_patterns);
396 /* Obvious cleanup */
397 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
398 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
399 listRelease(c->reply);
400 freeClientArgv(c);
401 close(c->fd);
402 /* Remove from the list of clients */
403 ln = listSearchKey(server.clients,c);
404 redisAssert(ln != NULL);
405 listDelNode(server.clients,ln);
406 /* Remove from the list of clients waiting for swapped keys, or ready
407 * to be restarted, but not yet woken up again. */
408 if (c->flags & REDIS_IO_WAIT) {
409 redisAssert(server.vm_enabled);
410 if (listLength(c->io_keys) == 0) {
411 ln = listSearchKey(server.io_ready_clients,c);
412
413 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
414 * it should be present in the list io_ready_clients */
415 redisAssert(ln != NULL);
416 listDelNode(server.io_ready_clients,ln);
417 } else {
418 while (listLength(c->io_keys)) {
419 ln = listFirst(c->io_keys);
420 dontWaitForSwappedKey(c,ln->value);
421 }
422 }
423 server.vm_blocked_clients--;
424 }
425 listRelease(c->io_keys);
426 /* Master/slave cleanup.
427 * Case 1: we lost the connection with a slave. */
428 if (c->flags & REDIS_SLAVE) {
429 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
430 close(c->repldbfd);
431 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
432 ln = listSearchKey(l,c);
433 redisAssert(ln != NULL);
434 listDelNode(l,ln);
435 }
436
437 /* Case 2: we lost the connection with the master. */
438 if (c->flags & REDIS_MASTER) {
439 server.master = NULL;
440 server.replstate = REDIS_REPL_CONNECT;
441 /* Since we lost the connection with the master, we should also
442 * close the connection with all our slaves if we have any, so
443 * when we'll resync with the master the other slaves will sync again
444 * with us as well. Note that also when the slave is not connected
445 * to the master it will keep refusing connections by other slaves. */
446 while (listLength(server.slaves)) {
447 ln = listFirst(server.slaves);
448 freeClient((redisClient*)ln->value);
449 }
450 }
451 /* Release memory */
452 zfree(c->argv);
453 zfree(c->mbargv);
454 freeClientMultiState(c);
455 zfree(c);
456 }
457
458 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
459 redisClient *c = privdata;
460 int nwritten = 0, totwritten = 0, objlen;
461 robj *o;
462 REDIS_NOTUSED(el);
463 REDIS_NOTUSED(mask);
464
465 /* Use writev() if we have enough buffers to send */
466 if (!server.glueoutputbuf &&
467 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
468 !(c->flags & REDIS_MASTER))
469 {
470 sendReplyToClientWritev(el, fd, privdata, mask);
471 return;
472 }
473
474 while(c->bufpos > 0 || listLength(c->reply)) {
475 if (c->bufpos > 0) {
476 if (c->flags & REDIS_MASTER) {
477 /* Don't reply to a master */
478 nwritten = c->bufpos - c->sentlen;
479 } else {
480 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
481 if (nwritten <= 0) break;
482 }
483 c->sentlen += nwritten;
484 totwritten += nwritten;
485
486 /* If the buffer was sent, set bufpos to zero to continue with
487 * the remainder of the reply. */
488 if (c->sentlen == c->bufpos) {
489 c->bufpos = 0;
490 c->sentlen = 0;
491 }
492 } else {
493 o = listNodeValue(listFirst(c->reply));
494 objlen = sdslen(o->ptr);
495
496 if (objlen == 0) {
497 listDelNode(c->reply,listFirst(c->reply));
498 continue;
499 }
500
501 if (c->flags & REDIS_MASTER) {
502 /* Don't reply to a master */
503 nwritten = objlen - c->sentlen;
504 } else {
505 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
506 if (nwritten <= 0) break;
507 }
508 c->sentlen += nwritten;
509 totwritten += nwritten;
510
511 /* If we fully sent the object on head go to the next one */
512 if (c->sentlen == objlen) {
513 listDelNode(c->reply,listFirst(c->reply));
514 c->sentlen = 0;
515 }
516 }
517 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
518 * bytes, in a single threaded server it's a good idea to serve
519 * other clients as well, even if a very large request comes from
520 * super fast link that is always able to accept data (in real world
521 * scenario think about 'KEYS *' against the loopback interfae) */
522 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
523 }
524 if (nwritten == -1) {
525 if (errno == EAGAIN) {
526 nwritten = 0;
527 } else {
528 redisLog(REDIS_VERBOSE,
529 "Error writing to client: %s", strerror(errno));
530 freeClient(c);
531 return;
532 }
533 }
534 if (totwritten > 0) c->lastinteraction = time(NULL);
535 if (listLength(c->reply) == 0) {
536 c->sentlen = 0;
537 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
538 }
539 }
540
541 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
542 {
543 redisClient *c = privdata;
544 int nwritten = 0, totwritten = 0, objlen, willwrite;
545 robj *o;
546 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
547 int offset, ion = 0;
548 REDIS_NOTUSED(el);
549 REDIS_NOTUSED(mask);
550
551 listNode *node;
552 while (listLength(c->reply)) {
553 offset = c->sentlen;
554 ion = 0;
555 willwrite = 0;
556
557 /* fill-in the iov[] array */
558 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
559 o = listNodeValue(node);
560 objlen = sdslen(o->ptr);
561
562 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
563 break;
564
565 if(ion == REDIS_WRITEV_IOVEC_COUNT)
566 break; /* no more iovecs */
567
568 iov[ion].iov_base = ((char*)o->ptr) + offset;
569 iov[ion].iov_len = objlen - offset;
570 willwrite += objlen - offset;
571 offset = 0; /* just for the first item */
572 ion++;
573 }
574
575 if(willwrite == 0)
576 break;
577
578 /* write all collected blocks at once */
579 if((nwritten = writev(fd, iov, ion)) < 0) {
580 if (errno != EAGAIN) {
581 redisLog(REDIS_VERBOSE,
582 "Error writing to client: %s", strerror(errno));
583 freeClient(c);
584 return;
585 }
586 break;
587 }
588
589 totwritten += nwritten;
590 offset = c->sentlen;
591
592 /* remove written robjs from c->reply */
593 while (nwritten && listLength(c->reply)) {
594 o = listNodeValue(listFirst(c->reply));
595 objlen = sdslen(o->ptr);
596
597 if(nwritten >= objlen - offset) {
598 listDelNode(c->reply, listFirst(c->reply));
599 nwritten -= objlen - offset;
600 c->sentlen = 0;
601 } else {
602 /* partial write */
603 c->sentlen += nwritten;
604 break;
605 }
606 offset = 0;
607 }
608 }
609
610 if (totwritten > 0)
611 c->lastinteraction = time(NULL);
612
613 if (listLength(c->reply) == 0) {
614 c->sentlen = 0;
615 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
616 }
617 }
618
619 /* resetClient prepare the client to process the next command */
620 void resetClient(redisClient *c) {
621 freeClientArgv(c);
622 c->bulklen = -1;
623 c->multibulk = 0;
624 }
625
626 void closeTimedoutClients(void) {
627 redisClient *c;
628 listNode *ln;
629 time_t now = time(NULL);
630 listIter li;
631
632 listRewind(server.clients,&li);
633 while ((ln = listNext(&li)) != NULL) {
634 c = listNodeValue(ln);
635 if (server.maxidletime &&
636 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
637 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
638 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
639 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
640 listLength(c->pubsub_patterns) == 0 &&
641 (now - c->lastinteraction > server.maxidletime))
642 {
643 redisLog(REDIS_VERBOSE,"Closing idle client");
644 freeClient(c);
645 } else if (c->flags & REDIS_BLOCKED) {
646 if (c->blockingto != 0 && c->blockingto < now) {
647 addReply(c,shared.nullmultibulk);
648 unblockClientWaitingData(c);
649 }
650 }
651 }
652 }
653
654 void processInputBuffer(redisClient *c) {
655 again:
656 /* Before to process the input buffer, make sure the client is not
657 * waitig for a blocking operation such as BLPOP. Note that the first
658 * iteration the client is never blocked, otherwise the processInputBuffer
659 * would not be called at all, but after the execution of the first commands
660 * in the input buffer the client may be blocked, and the "goto again"
661 * will try to reiterate. The following line will make it return asap. */
662 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
663 if (c->bulklen == -1) {
664 /* Read the first line of the query */
665 char *p = strchr(c->querybuf,'\n');
666 size_t querylen;
667
668 if (p) {
669 sds query, *argv;
670 int argc, j;
671
672 query = c->querybuf;
673 c->querybuf = sdsempty();
674 querylen = 1+(p-(query));
675 if (sdslen(query) > querylen) {
676 /* leave data after the first line of the query in the buffer */
677 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
678 }
679 *p = '\0'; /* remove "\n" */
680 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
681 sdsupdatelen(query);
682
683 /* Now we can split the query in arguments */
684 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
685 sdsfree(query);
686
687 if (c->argv) zfree(c->argv);
688 c->argv = zmalloc(sizeof(robj*)*argc);
689
690 for (j = 0; j < argc; j++) {
691 if (sdslen(argv[j])) {
692 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
693 c->argc++;
694 } else {
695 sdsfree(argv[j]);
696 }
697 }
698 zfree(argv);
699 if (c->argc) {
700 /* Execute the command. If the client is still valid
701 * after processCommand() return and there is something
702 * on the query buffer try to process the next command. */
703 if (processCommand(c) && sdslen(c->querybuf)) goto again;
704 } else {
705 /* Nothing to process, argc == 0. Just process the query
706 * buffer if it's not empty or return to the caller */
707 if (sdslen(c->querybuf)) goto again;
708 }
709 return;
710 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
711 redisLog(REDIS_VERBOSE, "Client protocol error");
712 freeClient(c);
713 return;
714 }
715 } else {
716 /* Bulk read handling. Note that if we are at this point
717 the client already sent a command terminated with a newline,
718 we are reading the bulk data that is actually the last
719 argument of the command. */
720 int qbl = sdslen(c->querybuf);
721
722 if (c->bulklen <= qbl) {
723 /* Copy everything but the final CRLF as final argument */
724 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
725 c->argc++;
726 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
727 /* Process the command. If the client is still valid after
728 * the processing and there is more data in the buffer
729 * try to parse it. */
730 if (processCommand(c) && sdslen(c->querybuf)) goto again;
731 return;
732 }
733 }
734 }
735
736 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
737 redisClient *c = (redisClient*) privdata;
738 char buf[REDIS_IOBUF_LEN];
739 int nread;
740 REDIS_NOTUSED(el);
741 REDIS_NOTUSED(mask);
742
743 nread = read(fd, buf, REDIS_IOBUF_LEN);
744 if (nread == -1) {
745 if (errno == EAGAIN) {
746 nread = 0;
747 } else {
748 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
749 freeClient(c);
750 return;
751 }
752 } else if (nread == 0) {
753 redisLog(REDIS_VERBOSE, "Client closed connection");
754 freeClient(c);
755 return;
756 }
757 if (nread) {
758 c->querybuf = sdscatlen(c->querybuf, buf, nread);
759 c->lastinteraction = time(NULL);
760 } else {
761 return;
762 }
763 processInputBuffer(c);
764 }