]> git.saurik.com Git - redis.git/blob - src/networking.c
Merge remote branch 'pietern/networking-perf'
[redis.git] / src / networking.c
1 #include "redis.h"
2 #include <sys/uio.h>
3
4 void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7 }
8
/* Match callback for lists holding string objects: both values are passed
 * to equalStringObjects() and its result is returned unchanged. */
int listMatchObjects(void *a, void *b) {
    int matched = equalStringObjects(a,b);

    return matched;
}
12
13 redisClient *createClient(int fd) {
14 redisClient *c;
15
16 /* Allocate more space to hold a static write buffer. */
17 c = zmalloc(sizeof(redisClient)+REDIS_REPLY_CHUNK_BYTES);
18 c->buflen = REDIS_REPLY_CHUNK_BYTES;
19 c->bufpos = 0;
20
21 anetNonBlock(NULL,fd);
22 anetTcpNoDelay(NULL,fd);
23 if (!c) return NULL;
24 if (aeCreateFileEvent(server.el,fd,AE_READABLE,
25 readQueryFromClient, c) == AE_ERR)
26 {
27 close(fd);
28 zfree(c);
29 return NULL;
30 }
31
32 selectDb(c,0);
33 c->fd = fd;
34 c->querybuf = sdsempty();
35 c->argc = 0;
36 c->argv = NULL;
37 c->bulklen = -1;
38 c->multibulk = 0;
39 c->mbargc = 0;
40 c->mbargv = NULL;
41 c->sentlen = 0;
42 c->flags = 0;
43 c->lastinteraction = time(NULL);
44 c->authenticated = 0;
45 c->replstate = REDIS_REPL_NONE;
46 c->reply = listCreate();
47 listSetFreeMethod(c->reply,decrRefCount);
48 listSetDupMethod(c->reply,dupClientReplyValue);
49 c->blocking_keys = NULL;
50 c->blocking_keys_num = 0;
51 c->io_keys = listCreate();
52 c->watched_keys = listCreate();
53 listSetFreeMethod(c->io_keys,decrRefCount);
54 c->pubsub_channels = dictCreate(&setDictType,NULL);
55 c->pubsub_patterns = listCreate();
56 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
57 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
58 listAddNodeTail(server.clients,c);
59 initClientMultiState(c);
60 return c;
61 }
62
63 int _ensureFileEvent(redisClient *c) {
64 if (c->fd <= 0) return REDIS_ERR;
65 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
66 (c->replstate == REDIS_REPL_NONE ||
67 c->replstate == REDIS_REPL_ONLINE) &&
68 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
69 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
70 return REDIS_OK;
71 }
72
73 /* Create a duplicate of the last object in the reply list when
74 * it is not exclusively owned by the reply list. */
75 robj *dupLastObjectIfNeeded(list *reply) {
76 robj *new, *cur;
77 listNode *ln;
78 redisAssert(listLength(reply) > 0);
79 ln = listLast(reply);
80 cur = listNodeValue(ln);
81 if (cur->refcount > 1) {
82 new = dupStringObject(cur);
83 decrRefCount(cur);
84 listNodeValue(ln) = new;
85 }
86 return listNodeValue(ln);
87 }
88
89 int _addReplyToBuffer(redisClient *c, char *s, size_t len) {
90 size_t available = c->buflen-c->bufpos;
91
92 /* If there already are entries in the reply list, we cannot
93 * add anything more to the static buffer. */
94 if (listLength(c->reply) > 0) return REDIS_ERR;
95
96 /* Check that the buffer has enough space available for this string. */
97 if (len > available) return REDIS_ERR;
98
99 memcpy(c->buf+c->bufpos,s,len);
100 c->bufpos+=len;
101 return REDIS_OK;
102 }
103
104 void _addReplyObjectToList(redisClient *c, robj *o) {
105 robj *tail;
106 if (listLength(c->reply) == 0) {
107 incrRefCount(o);
108 listAddNodeTail(c->reply,o);
109 } else {
110 tail = listNodeValue(listLast(c->reply));
111
112 /* Append to this object when possible. */
113 if (tail->ptr != NULL &&
114 sdslen(tail->ptr)+sdslen(o->ptr) <= REDIS_REPLY_CHUNK_BYTES)
115 {
116 tail = dupLastObjectIfNeeded(c->reply);
117 tail->ptr = sdscatlen(tail->ptr,o->ptr,sdslen(o->ptr));
118 } else {
119 incrRefCount(o);
120 listAddNodeTail(c->reply,o);
121 }
122 }
123 }
124
125 /* This method takes responsibility over the sds. When it is no longer
126 * needed it will be free'd, otherwise it ends up in a robj. */
127 void _addReplySdsToList(redisClient *c, sds s) {
128 robj *tail;
129 if (listLength(c->reply) == 0) {
130 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
131 } else {
132 tail = listNodeValue(listLast(c->reply));
133
134 /* Append to this object when possible. */
135 if (tail->ptr != NULL &&
136 sdslen(tail->ptr)+sdslen(s) <= REDIS_REPLY_CHUNK_BYTES)
137 {
138 tail = dupLastObjectIfNeeded(c->reply);
139 tail->ptr = sdscatlen(tail->ptr,s,sdslen(s));
140 sdsfree(s);
141 } else {
142 listAddNodeTail(c->reply,createObject(REDIS_STRING,s));
143 }
144 }
145 }
146
147 void _addReplyStringToList(redisClient *c, char *s, size_t len) {
148 robj *tail;
149 if (listLength(c->reply) == 0) {
150 listAddNodeTail(c->reply,createStringObject(s,len));
151 } else {
152 tail = listNodeValue(listLast(c->reply));
153
154 /* Append to this object when possible. */
155 if (tail->ptr != NULL &&
156 sdslen(tail->ptr)+len <= REDIS_REPLY_CHUNK_BYTES)
157 {
158 tail = dupLastObjectIfNeeded(c->reply);
159 tail->ptr = sdscatlen(tail->ptr,s,len);
160 } else {
161 listAddNodeTail(c->reply,createStringObject(s,len));
162 }
163 }
164 }
165
166 void addReply(redisClient *c, robj *obj) {
167 if (_ensureFileEvent(c) != REDIS_OK) return;
168 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
169 /* Returns a new object with refcount 1 */
170 obj = dupStringObject(obj);
171 } else {
172 /* This increments the refcount. */
173 obj = getDecodedObject(obj);
174 }
175 if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != REDIS_OK)
176 _addReplyObjectToList(c,obj);
177 decrRefCount(obj);
178 }
179
180 void addReplySds(redisClient *c, sds s) {
181 if (_ensureFileEvent(c) != REDIS_OK) {
182 /* The caller expects the sds to be free'd. */
183 sdsfree(s);
184 return;
185 }
186 if (_addReplyToBuffer(c,s,sdslen(s)) == REDIS_OK) {
187 sdsfree(s);
188 } else {
189 /* This method free's the sds when it is no longer needed. */
190 _addReplySdsToList(c,s);
191 }
192 }
193
194 void addReplyString(redisClient *c, char *s, size_t len) {
195 if (_ensureFileEvent(c) != REDIS_OK) return;
196 if (_addReplyToBuffer(c,s,len) != REDIS_OK)
197 _addReplyStringToList(c,s,len);
198 }
199
200 void _addReplyError(redisClient *c, char *s, size_t len) {
201 addReplyString(c,"-ERR ",5);
202 addReplyString(c,s,len);
203 addReplyString(c,"\r\n",2);
204 }
205
206 void addReplyError(redisClient *c, char *err) {
207 _addReplyError(c,err,strlen(err));
208 }
209
210 void addReplyErrorFormat(redisClient *c, const char *fmt, ...) {
211 va_list ap;
212 va_start(ap,fmt);
213 sds s = sdscatvprintf(sdsempty(),fmt,ap);
214 va_end(ap);
215 _addReplyError(c,s,sdslen(s));
216 sdsfree(s);
217 }
218
219 void _addReplyStatus(redisClient *c, char *s, size_t len) {
220 addReplyString(c,"+",1);
221 addReplyString(c,s,len);
222 addReplyString(c,"\r\n",2);
223 }
224
225 void addReplyStatus(redisClient *c, char *status) {
226 _addReplyStatus(c,status,strlen(status));
227 }
228
229 void addReplyStatusFormat(redisClient *c, const char *fmt, ...) {
230 va_list ap;
231 va_start(ap,fmt);
232 sds s = sdscatvprintf(sdsempty(),fmt,ap);
233 va_end(ap);
234 _addReplyStatus(c,s,sdslen(s));
235 sdsfree(s);
236 }
237
238 /* Adds an empty object to the reply list that will contain the multi bulk
239 * length, which is not known when this function is called. */
240 void *addDeferredMultiBulkLength(redisClient *c) {
241 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
242 listAddNodeTail(c->reply,createObject(REDIS_STRING,NULL));
243 return listLast(c->reply);
244 }
245
246 /* Populate the length object and try glueing it to the next chunk. */
247 void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
248 listNode *ln = (listNode*)node;
249 robj *len, *next;
250
251 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
252 if (node == NULL) return;
253
254 len = listNodeValue(ln);
255 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
256 if (ln->next != NULL) {
257 next = listNodeValue(ln->next);
258
259 /* Only glue when the next node is non-NULL (an sds in this case) */
260 if (next->ptr != NULL) {
261 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
262 listDelNode(c->reply,ln->next);
263 }
264 }
265 }
266
267 void addReplyDouble(redisClient *c, double d) {
268 char dbuf[128], sbuf[128];
269 int dlen, slen;
270 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
271 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
272 addReplyString(c,sbuf,slen);
273 }
274
275 void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
276 char buf[128];
277 int len;
278 buf[0] = prefix;
279 len = ll2string(buf+1,sizeof(buf)-1,ll);
280 buf[len+1] = '\r';
281 buf[len+2] = '\n';
282 addReplyString(c,buf,len+3);
283 }
284
285 void addReplyLongLong(redisClient *c, long long ll) {
286 _addReplyLongLong(c,ll,':');
287 }
288
289 void addReplyMultiBulkLen(redisClient *c, long length) {
290 _addReplyLongLong(c,length,'*');
291 }
292
293 void addReplyBulkLen(redisClient *c, robj *obj) {
294 size_t len;
295
296 if (obj->encoding == REDIS_ENCODING_RAW) {
297 len = sdslen(obj->ptr);
298 } else {
299 long n = (long)obj->ptr;
300
301 /* Compute how many bytes will take this integer as a radix 10 string */
302 len = 1;
303 if (n < 0) {
304 len++;
305 n = -n;
306 }
307 while((n = n/10) != 0) {
308 len++;
309 }
310 }
311 _addReplyLongLong(c,len,'$');
312 }
313
314 void addReplyBulk(redisClient *c, robj *obj) {
315 addReplyBulkLen(c,obj);
316 addReply(c,obj);
317 addReply(c,shared.crlf);
318 }
319
320 /* In the CONFIG command we need to add vanilla C string as bulk replies */
321 void addReplyBulkCString(redisClient *c, char *s) {
322 if (s == NULL) {
323 addReply(c,shared.nullbulk);
324 } else {
325 robj *o = createStringObject(s,strlen(s));
326 addReplyBulk(c,o);
327 decrRefCount(o);
328 }
329 }
330
331 void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
332 int cport, cfd;
333 char cip[128];
334 redisClient *c;
335 REDIS_NOTUSED(el);
336 REDIS_NOTUSED(mask);
337 REDIS_NOTUSED(privdata);
338
339 cfd = anetAccept(server.neterr, fd, cip, &cport);
340 if (cfd == AE_ERR) {
341 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
342 return;
343 }
344 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
345 if ((c = createClient(cfd)) == NULL) {
346 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
347 close(cfd); /* May be already closed, just ingore errors */
348 return;
349 }
350 /* If maxclient directive is set and this is one client more... close the
351 * connection. Note that we create the client instead to check before
352 * for this condition, since now the socket is already set in nonblocking
353 * mode and we can send an error for free using the Kernel I/O */
354 if (server.maxclients && listLength(server.clients) > server.maxclients) {
355 char *err = "-ERR max number of clients reached\r\n";
356
357 /* That's a best effort error message, don't check write errors */
358 if (write(c->fd,err,strlen(err)) == -1) {
359 /* Nothing to do, Just to avoid the warning... */
360 }
361 freeClient(c);
362 return;
363 }
364 server.stat_numconnections++;
365 }
366
367 static void freeClientArgv(redisClient *c) {
368 int j;
369
370 for (j = 0; j < c->argc; j++)
371 decrRefCount(c->argv[j]);
372 for (j = 0; j < c->mbargc; j++)
373 decrRefCount(c->mbargv[j]);
374 c->argc = 0;
375 c->mbargc = 0;
376 }
377
378 void freeClient(redisClient *c) {
379 listNode *ln;
380
381 /* Note that if the client we are freeing is blocked into a blocking
382 * call, we have to set querybuf to NULL *before* to call
383 * unblockClientWaitingData() to avoid processInputBuffer() will get
384 * called. Also it is important to remove the file events after
385 * this, because this call adds the READABLE event. */
386 sdsfree(c->querybuf);
387 c->querybuf = NULL;
388 if (c->flags & REDIS_BLOCKED)
389 unblockClientWaitingData(c);
390
391 /* UNWATCH all the keys */
392 unwatchAllKeys(c);
393 listRelease(c->watched_keys);
394 /* Unsubscribe from all the pubsub channels */
395 pubsubUnsubscribeAllChannels(c,0);
396 pubsubUnsubscribeAllPatterns(c,0);
397 dictRelease(c->pubsub_channels);
398 listRelease(c->pubsub_patterns);
399 /* Obvious cleanup */
400 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
401 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
402 listRelease(c->reply);
403 freeClientArgv(c);
404 close(c->fd);
405 /* Remove from the list of clients */
406 ln = listSearchKey(server.clients,c);
407 redisAssert(ln != NULL);
408 listDelNode(server.clients,ln);
409 /* Remove from the list of clients waiting for swapped keys, or ready
410 * to be restarted, but not yet woken up again. */
411 if (c->flags & REDIS_IO_WAIT) {
412 redisAssert(server.vm_enabled);
413 if (listLength(c->io_keys) == 0) {
414 ln = listSearchKey(server.io_ready_clients,c);
415
416 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
417 * it should be present in the list io_ready_clients */
418 redisAssert(ln != NULL);
419 listDelNode(server.io_ready_clients,ln);
420 } else {
421 while (listLength(c->io_keys)) {
422 ln = listFirst(c->io_keys);
423 dontWaitForSwappedKey(c,ln->value);
424 }
425 }
426 server.vm_blocked_clients--;
427 }
428 listRelease(c->io_keys);
429 /* Master/slave cleanup.
430 * Case 1: we lost the connection with a slave. */
431 if (c->flags & REDIS_SLAVE) {
432 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
433 close(c->repldbfd);
434 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
435 ln = listSearchKey(l,c);
436 redisAssert(ln != NULL);
437 listDelNode(l,ln);
438 }
439
440 /* Case 2: we lost the connection with the master. */
441 if (c->flags & REDIS_MASTER) {
442 server.master = NULL;
443 server.replstate = REDIS_REPL_CONNECT;
444 /* Since we lost the connection with the master, we should also
445 * close the connection with all our slaves if we have any, so
446 * when we'll resync with the master the other slaves will sync again
447 * with us as well. Note that also when the slave is not connected
448 * to the master it will keep refusing connections by other slaves. */
449 while (listLength(server.slaves)) {
450 ln = listFirst(server.slaves);
451 freeClient((redisClient*)ln->value);
452 }
453 }
454 /* Release memory */
455 zfree(c->argv);
456 zfree(c->mbargv);
457 freeClientMultiState(c);
458 zfree(c);
459 }
460
461 void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
462 redisClient *c = privdata;
463 int nwritten = 0, totwritten = 0, objlen;
464 robj *o;
465 REDIS_NOTUSED(el);
466 REDIS_NOTUSED(mask);
467
468 /* Use writev() if we have enough buffers to send */
469 if (!server.glueoutputbuf &&
470 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
471 !(c->flags & REDIS_MASTER))
472 {
473 sendReplyToClientWritev(el, fd, privdata, mask);
474 return;
475 }
476
477 while(c->bufpos > 0 || listLength(c->reply)) {
478 if (c->bufpos > 0) {
479 if (c->flags & REDIS_MASTER) {
480 /* Don't reply to a master */
481 nwritten = c->bufpos - c->sentlen;
482 } else {
483 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
484 if (nwritten <= 0) break;
485 }
486 c->sentlen += nwritten;
487 totwritten += nwritten;
488
489 /* If the buffer was sent, set bufpos to zero to continue with
490 * the remainder of the reply. */
491 if (c->sentlen == c->bufpos) {
492 c->bufpos = 0;
493 c->sentlen = 0;
494 }
495 } else {
496 o = listNodeValue(listFirst(c->reply));
497 objlen = sdslen(o->ptr);
498
499 if (objlen == 0) {
500 listDelNode(c->reply,listFirst(c->reply));
501 continue;
502 }
503
504 if (c->flags & REDIS_MASTER) {
505 /* Don't reply to a master */
506 nwritten = objlen - c->sentlen;
507 } else {
508 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
509 if (nwritten <= 0) break;
510 }
511 c->sentlen += nwritten;
512 totwritten += nwritten;
513
514 /* If we fully sent the object on head go to the next one */
515 if (c->sentlen == objlen) {
516 listDelNode(c->reply,listFirst(c->reply));
517 c->sentlen = 0;
518 }
519 }
520 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
521 * bytes, in a single threaded server it's a good idea to serve
522 * other clients as well, even if a very large request comes from
523 * super fast link that is always able to accept data (in real world
524 * scenario think about 'KEYS *' against the loopback interfae) */
525 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
526 }
527 if (nwritten == -1) {
528 if (errno == EAGAIN) {
529 nwritten = 0;
530 } else {
531 redisLog(REDIS_VERBOSE,
532 "Error writing to client: %s", strerror(errno));
533 freeClient(c);
534 return;
535 }
536 }
537 if (totwritten > 0) c->lastinteraction = time(NULL);
538 if (listLength(c->reply) == 0) {
539 c->sentlen = 0;
540 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
541 }
542 }
543
544 void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
545 {
546 redisClient *c = privdata;
547 int nwritten = 0, totwritten = 0, objlen, willwrite;
548 robj *o;
549 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
550 int offset, ion = 0;
551 REDIS_NOTUSED(el);
552 REDIS_NOTUSED(mask);
553
554 listNode *node;
555 while (listLength(c->reply)) {
556 offset = c->sentlen;
557 ion = 0;
558 willwrite = 0;
559
560 /* fill-in the iov[] array */
561 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
562 o = listNodeValue(node);
563 objlen = sdslen(o->ptr);
564
565 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
566 break;
567
568 if(ion == REDIS_WRITEV_IOVEC_COUNT)
569 break; /* no more iovecs */
570
571 iov[ion].iov_base = ((char*)o->ptr) + offset;
572 iov[ion].iov_len = objlen - offset;
573 willwrite += objlen - offset;
574 offset = 0; /* just for the first item */
575 ion++;
576 }
577
578 if(willwrite == 0)
579 break;
580
581 /* write all collected blocks at once */
582 if((nwritten = writev(fd, iov, ion)) < 0) {
583 if (errno != EAGAIN) {
584 redisLog(REDIS_VERBOSE,
585 "Error writing to client: %s", strerror(errno));
586 freeClient(c);
587 return;
588 }
589 break;
590 }
591
592 totwritten += nwritten;
593 offset = c->sentlen;
594
595 /* remove written robjs from c->reply */
596 while (nwritten && listLength(c->reply)) {
597 o = listNodeValue(listFirst(c->reply));
598 objlen = sdslen(o->ptr);
599
600 if(nwritten >= objlen - offset) {
601 listDelNode(c->reply, listFirst(c->reply));
602 nwritten -= objlen - offset;
603 c->sentlen = 0;
604 } else {
605 /* partial write */
606 c->sentlen += nwritten;
607 break;
608 }
609 offset = 0;
610 }
611 }
612
613 if (totwritten > 0)
614 c->lastinteraction = time(NULL);
615
616 if (listLength(c->reply) == 0) {
617 c->sentlen = 0;
618 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
619 }
620 }
621
622 /* resetClient prepare the client to process the next command */
623 void resetClient(redisClient *c) {
624 freeClientArgv(c);
625 c->bulklen = -1;
626 c->multibulk = 0;
627 }
628
629 void closeTimedoutClients(void) {
630 redisClient *c;
631 listNode *ln;
632 time_t now = time(NULL);
633 listIter li;
634
635 listRewind(server.clients,&li);
636 while ((ln = listNext(&li)) != NULL) {
637 c = listNodeValue(ln);
638 if (server.maxidletime &&
639 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
640 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
641 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
642 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
643 listLength(c->pubsub_patterns) == 0 &&
644 (now - c->lastinteraction > server.maxidletime))
645 {
646 redisLog(REDIS_VERBOSE,"Closing idle client");
647 freeClient(c);
648 } else if (c->flags & REDIS_BLOCKED) {
649 if (c->blockingto != 0 && c->blockingto < now) {
650 addReply(c,shared.nullmultibulk);
651 unblockClientWaitingData(c);
652 }
653 }
654 }
655 }
656
657 void processInputBuffer(redisClient *c) {
658 again:
659 /* Before to process the input buffer, make sure the client is not
660 * waitig for a blocking operation such as BLPOP. Note that the first
661 * iteration the client is never blocked, otherwise the processInputBuffer
662 * would not be called at all, but after the execution of the first commands
663 * in the input buffer the client may be blocked, and the "goto again"
664 * will try to reiterate. The following line will make it return asap. */
665 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
666 if (c->bulklen == -1) {
667 /* Read the first line of the query */
668 char *p = strchr(c->querybuf,'\n');
669 size_t querylen;
670
671 if (p) {
672 sds query, *argv;
673 int argc, j;
674
675 query = c->querybuf;
676 c->querybuf = sdsempty();
677 querylen = 1+(p-(query));
678 if (sdslen(query) > querylen) {
679 /* leave data after the first line of the query in the buffer */
680 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
681 }
682 *p = '\0'; /* remove "\n" */
683 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
684 sdsupdatelen(query);
685
686 /* Now we can split the query in arguments */
687 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
688 sdsfree(query);
689
690 if (c->argv) zfree(c->argv);
691 c->argv = zmalloc(sizeof(robj*)*argc);
692
693 for (j = 0; j < argc; j++) {
694 if (sdslen(argv[j])) {
695 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
696 c->argc++;
697 } else {
698 sdsfree(argv[j]);
699 }
700 }
701 zfree(argv);
702 if (c->argc) {
703 /* Execute the command. If the client is still valid
704 * after processCommand() return and there is something
705 * on the query buffer try to process the next command. */
706 if (processCommand(c) && sdslen(c->querybuf)) goto again;
707 } else {
708 /* Nothing to process, argc == 0. Just process the query
709 * buffer if it's not empty or return to the caller */
710 if (sdslen(c->querybuf)) goto again;
711 }
712 return;
713 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
714 redisLog(REDIS_VERBOSE, "Client protocol error");
715 freeClient(c);
716 return;
717 }
718 } else {
719 /* Bulk read handling. Note that if we are at this point
720 the client already sent a command terminated with a newline,
721 we are reading the bulk data that is actually the last
722 argument of the command. */
723 int qbl = sdslen(c->querybuf);
724
725 if (c->bulklen <= qbl) {
726 /* Copy everything but the final CRLF as final argument */
727 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
728 c->argc++;
729 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
730 /* Process the command. If the client is still valid after
731 * the processing and there is more data in the buffer
732 * try to parse it. */
733 if (processCommand(c) && sdslen(c->querybuf)) goto again;
734 return;
735 }
736 }
737 }
738
739 void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
740 redisClient *c = (redisClient*) privdata;
741 char buf[REDIS_IOBUF_LEN];
742 int nread;
743 REDIS_NOTUSED(el);
744 REDIS_NOTUSED(mask);
745
746 nread = read(fd, buf, REDIS_IOBUF_LEN);
747 if (nread == -1) {
748 if (errno == EAGAIN) {
749 nread = 0;
750 } else {
751 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
752 freeClient(c);
753 return;
754 }
755 } else if (nread == 0) {
756 redisLog(REDIS_VERBOSE, "Client closed connection");
757 freeClient(c);
758 return;
759 }
760 if (nread) {
761 c->querybuf = sdscatlen(c->querybuf, buf, nread);
762 c->lastinteraction = time(NULL);
763 } else {
764 return;
765 }
766 processInputBuffer(c);
767 }