]> git.saurik.com Git - redis.git/blame - src/networking.c
Free the sds in addReplySds when it cannot be added to the reply
[redis.git] / src / networking.c
CommitLineData
e2641e09 1#include "redis.h"
e2641e09 2#include <sys/uio.h>
3
4void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7}
8
9int listMatchObjects(void *a, void *b) {
10 return equalStringObjects(a,b);
11}
12
13redisClient *createClient(int fd) {
834ef78e
PN
14 redisClient *c;
15
16 /* Make sure to allocate a multiple of the page size to prevent wasting
17 * memory. A page size of 4096 is assumed here. We need to compensate
18 * for the zmalloc overhead of sizeof(size_t) bytes. */
19 size_t size = 8192-sizeof(size_t);
20 redisAssert(size > sizeof(redisClient));
21 c = zmalloc(size);
22 c->buflen = size-sizeof(redisClient);
23 c->bufpos = 0;
e2641e09 24
25 anetNonBlock(NULL,fd);
26 anetTcpNoDelay(NULL,fd);
27 if (!c) return NULL;
28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
54 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
55 readQueryFromClient, c) == AE_ERR) {
56 freeClient(c);
57 return NULL;
58 }
59 listAddNodeTail(server.clients,c);
60 initClientMultiState(c);
61 return c;
62}
63
834ef78e 64int _ensureFileEvent(redisClient *c) {
57b07380 65 if (c->fd <= 0) return REDIS_ERR;
834ef78e 66 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
e2641e09 67 (c->replstate == REDIS_REPL_NONE ||
68 c->replstate == REDIS_REPL_ONLINE) &&
69 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
834ef78e
PN
70 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
71 return REDIS_OK;
72}
73
74void _addReplyObjectToList(redisClient *c, robj *obj) {
75 redisAssert(obj->type == REDIS_STRING &&
76 obj->encoding == REDIS_ENCODING_RAW);
77 listAddNodeTail(c->reply,obj);
78}
79
80void _ensureBufferInReplyList(redisClient *c) {
81 sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE);
82 sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */
83 listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer));
84}
85
86void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) {
87 size_t available = 0;
88 redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD);
89 if (listLength(c->reply) > 0) {
90 robj *o = listNodeValue(listLast(c->reply));
91
92 /* Make sure to append to a reply node with enough bytes available. */
93 if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr);
94 if (o->type != REDIS_REPLY_NODE || len > available) {
95 _ensureBufferInReplyList(c);
96 _addReplyStringToBuffer(c,s,len);
97 } else {
98 o->ptr = sdscatlen(o->ptr,s,len);
99 }
100 } else {
101 available = c->buflen-c->bufpos;
102 if (len > available) {
103 _ensureBufferInReplyList(c);
104 _addReplyStringToBuffer(c,s,len);
105 } else {
106 memcpy(c->buf+c->bufpos,s,len);
107 c->bufpos += len;
108 }
109 }
110}
e2641e09 111
834ef78e
PN
112void addReply(redisClient *c, robj *obj) {
113 if (_ensureFileEvent(c) != REDIS_OK) return;
e2641e09 114 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
834ef78e 115 /* Returns a new object with refcount 1 */
e2641e09 116 obj = dupStringObject(obj);
834ef78e
PN
117 } else {
118 /* This increments the refcount. */
119 obj = getDecodedObject(obj);
120 }
121
122 if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) {
123 _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr));
124 decrRefCount(obj);
125 } else {
126 _addReplyObjectToList(c,obj);
e2641e09 127 }
e2641e09 128}
129
130void addReplySds(redisClient *c, sds s) {
cd76bb65
PN
131 if (_ensureFileEvent(c) != REDIS_OK) {
132 /* The caller expects the sds to be free'd. */
133 sdsfree(s);
134 return;
135 }
834ef78e
PN
136 if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) {
137 _addReplyStringToBuffer(c,s,sdslen(s));
138 sdsfree(s);
139 } else {
140 _addReplyObjectToList(c,createObject(REDIS_STRING,s));
141 }
e2641e09 142}
143
834ef78e
PN
144void addReplyString(redisClient *c, char *s, size_t len) {
145 if (_ensureFileEvent(c) != REDIS_OK) return;
146 if (len < REDIS_REPLY_CHUNK_THRESHOLD) {
147 _addReplyStringToBuffer(c,s,len);
148 } else {
149 _addReplyObjectToList(c,createStringObject(s,len));
150 }
151}
e2641e09 152
b301c1fc
PN
153/* Adds an empty object to the reply list that will contain the multi bulk
154 * length, which is not known when this function is called. */
155void *addDeferredMultiBulkLength(redisClient *c) {
156 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
157 _addReplyObjectToList(c,createObject(REDIS_STRING,NULL));
158 return listLast(c->reply);
159}
160
161/* Populate the length object and try glueing it to the next chunk. */
162void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
163 listNode *ln = (listNode*)node;
164 robj *len, *next;
165
166 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
167 if (node == NULL) return;
168
169 len = listNodeValue(ln);
170 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
171 if (ln->next != NULL) {
172 next = listNodeValue(ln->next);
173 /* Only glue when the next node is a reply chunk. */
174 if (next->type == REDIS_REPLY_NODE) {
175 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
176 listDelNode(c->reply,ln->next);
177 }
178 }
179}
180
834ef78e
PN
181void addReplyDouble(redisClient *c, double d) {
182 char dbuf[128], sbuf[128];
183 int dlen, slen;
184 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
185 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
186 addReplyString(c,sbuf,slen);
e2641e09 187}
188
834ef78e 189void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
e2641e09 190 char buf[128];
834ef78e
PN
191 int len;
192 buf[0] = prefix;
e2641e09 193 len = ll2string(buf+1,sizeof(buf)-1,ll);
194 buf[len+1] = '\r';
195 buf[len+2] = '\n';
834ef78e 196 addReplyString(c,buf,len+3);
e2641e09 197}
198
834ef78e
PN
199void addReplyLongLong(redisClient *c, long long ll) {
200 _addReplyLongLong(c,ll,':');
201}
e2641e09 202
834ef78e
PN
203void addReplyUlong(redisClient *c, unsigned long ul) {
204 _addReplyLongLong(c,(long long)ul,':');
e2641e09 205}
206
0537e7bf
PN
207void addReplyMultiBulkLen(redisClient *c, long length) {
208 _addReplyLongLong(c,length,'*');
209}
210
e2641e09 211void addReplyBulkLen(redisClient *c, robj *obj) {
834ef78e 212 size_t len;
e2641e09 213
214 if (obj->encoding == REDIS_ENCODING_RAW) {
215 len = sdslen(obj->ptr);
216 } else {
217 long n = (long)obj->ptr;
218
219 /* Compute how many bytes will take this integer as a radix 10 string */
220 len = 1;
221 if (n < 0) {
222 len++;
223 n = -n;
224 }
225 while((n = n/10) != 0) {
226 len++;
227 }
228 }
834ef78e 229 _addReplyLongLong(c,len,'$');
e2641e09 230}
231
232void addReplyBulk(redisClient *c, robj *obj) {
233 addReplyBulkLen(c,obj);
234 addReply(c,obj);
235 addReply(c,shared.crlf);
236}
237
238/* In the CONFIG command we need to add vanilla C string as bulk replies */
239void addReplyBulkCString(redisClient *c, char *s) {
240 if (s == NULL) {
241 addReply(c,shared.nullbulk);
242 } else {
243 robj *o = createStringObject(s,strlen(s));
244 addReplyBulk(c,o);
245 decrRefCount(o);
246 }
247}
248
249void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
250 int cport, cfd;
251 char cip[128];
252 redisClient *c;
253 REDIS_NOTUSED(el);
254 REDIS_NOTUSED(mask);
255 REDIS_NOTUSED(privdata);
256
257 cfd = anetAccept(server.neterr, fd, cip, &cport);
258 if (cfd == AE_ERR) {
259 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
260 return;
261 }
262 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
263 if ((c = createClient(cfd)) == NULL) {
264 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
265 close(cfd); /* May be already closed, just ingore errors */
266 return;
267 }
268 /* If maxclient directive is set and this is one client more... close the
269 * connection. Note that we create the client instead to check before
270 * for this condition, since now the socket is already set in nonblocking
271 * mode and we can send an error for free using the Kernel I/O */
272 if (server.maxclients && listLength(server.clients) > server.maxclients) {
273 char *err = "-ERR max number of clients reached\r\n";
274
275 /* That's a best effort error message, don't check write errors */
276 if (write(c->fd,err,strlen(err)) == -1) {
277 /* Nothing to do, Just to avoid the warning... */
278 }
279 freeClient(c);
280 return;
281 }
282 server.stat_numconnections++;
283}
284
285static void freeClientArgv(redisClient *c) {
286 int j;
287
288 for (j = 0; j < c->argc; j++)
289 decrRefCount(c->argv[j]);
290 for (j = 0; j < c->mbargc; j++)
291 decrRefCount(c->mbargv[j]);
292 c->argc = 0;
293 c->mbargc = 0;
294}
295
296void freeClient(redisClient *c) {
297 listNode *ln;
298
299 /* Note that if the client we are freeing is blocked into a blocking
300 * call, we have to set querybuf to NULL *before* to call
301 * unblockClientWaitingData() to avoid processInputBuffer() will get
302 * called. Also it is important to remove the file events after
303 * this, because this call adds the READABLE event. */
304 sdsfree(c->querybuf);
305 c->querybuf = NULL;
306 if (c->flags & REDIS_BLOCKED)
307 unblockClientWaitingData(c);
308
309 /* UNWATCH all the keys */
310 unwatchAllKeys(c);
311 listRelease(c->watched_keys);
312 /* Unsubscribe from all the pubsub channels */
313 pubsubUnsubscribeAllChannels(c,0);
314 pubsubUnsubscribeAllPatterns(c,0);
315 dictRelease(c->pubsub_channels);
316 listRelease(c->pubsub_patterns);
317 /* Obvious cleanup */
318 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
319 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
320 listRelease(c->reply);
321 freeClientArgv(c);
322 close(c->fd);
323 /* Remove from the list of clients */
324 ln = listSearchKey(server.clients,c);
325 redisAssert(ln != NULL);
326 listDelNode(server.clients,ln);
1a71fb96 327 /* Remove from the list of clients waiting for swapped keys, or ready
328 * to be restarted, but not yet woken up again. */
329 if (c->flags & REDIS_IO_WAIT) {
330 redisAssert(server.vm_enabled);
331 if (listLength(c->io_keys) == 0) {
332 ln = listSearchKey(server.io_ready_clients,c);
333
334 /* When this client is waiting to be woken up (REDIS_IO_WAIT),
335 * it should be present in the list io_ready_clients */
336 redisAssert(ln != NULL);
e2641e09 337 listDelNode(server.io_ready_clients,ln);
1a71fb96 338 } else {
339 while (listLength(c->io_keys)) {
340 ln = listFirst(c->io_keys);
341 dontWaitForSwappedKey(c,ln->value);
342 }
e2641e09 343 }
1a71fb96 344 server.vm_blocked_clients--;
e2641e09 345 }
346 listRelease(c->io_keys);
778b2210 347 /* Master/slave cleanup.
348 * Case 1: we lost the connection with a slave. */
e2641e09 349 if (c->flags & REDIS_SLAVE) {
350 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
351 close(c->repldbfd);
352 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
353 ln = listSearchKey(l,c);
354 redisAssert(ln != NULL);
355 listDelNode(l,ln);
356 }
778b2210 357
358 /* Case 2: we lost the connection with the master. */
e2641e09 359 if (c->flags & REDIS_MASTER) {
360 server.master = NULL;
361 server.replstate = REDIS_REPL_CONNECT;
778b2210 362 /* Since we lost the connection with the master, we should also
363 * close the connection with all our slaves if we have any, so
364 * when we'll resync with the master the other slaves will sync again
365 * with us as well. Note that also when the slave is not connected
366 * to the master it will keep refusing connections by other slaves. */
367 while (listLength(server.slaves)) {
368 ln = listFirst(server.slaves);
369 freeClient((redisClient*)ln->value);
370 }
e2641e09 371 }
372 /* Release memory */
373 zfree(c->argv);
374 zfree(c->mbargv);
375 freeClientMultiState(c);
376 zfree(c);
377}
378
e2641e09 379void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
380 redisClient *c = privdata;
381 int nwritten = 0, totwritten = 0, objlen;
382 robj *o;
383 REDIS_NOTUSED(el);
384 REDIS_NOTUSED(mask);
385
386 /* Use writev() if we have enough buffers to send */
387 if (!server.glueoutputbuf &&
388 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
389 !(c->flags & REDIS_MASTER))
390 {
391 sendReplyToClientWritev(el, fd, privdata, mask);
392 return;
393 }
394
834ef78e
PN
395 while(c->bufpos > 0 || listLength(c->reply)) {
396 if (c->bufpos > 0) {
397 if (c->flags & REDIS_MASTER) {
398 /* Don't reply to a master */
399 nwritten = c->bufpos - c->sentlen;
400 } else {
401 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
402 if (nwritten <= 0) break;
403 }
404 c->sentlen += nwritten;
405 totwritten += nwritten;
406
407 /* If the buffer was sent, set bufpos to zero to continue with
408 * the remainder of the reply. */
409 if (c->sentlen == c->bufpos) {
410 c->bufpos = 0;
411 c->sentlen = 0;
412 }
413 } else {
414 o = listNodeValue(listFirst(c->reply));
415 objlen = sdslen(o->ptr);
e2641e09 416
834ef78e
PN
417 if (objlen == 0) {
418 listDelNode(c->reply,listFirst(c->reply));
419 continue;
420 }
e2641e09 421
834ef78e
PN
422 if (c->flags & REDIS_MASTER) {
423 /* Don't reply to a master */
424 nwritten = objlen - c->sentlen;
425 } else {
426 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
427 if (nwritten <= 0) break;
428 }
429 c->sentlen += nwritten;
430 totwritten += nwritten;
e2641e09 431
834ef78e
PN
432 /* If we fully sent the object on head go to the next one */
433 if (c->sentlen == objlen) {
434 listDelNode(c->reply,listFirst(c->reply));
435 c->sentlen = 0;
436 }
e2641e09 437 }
438 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
439 * bytes, in a single threaded server it's a good idea to serve
440 * other clients as well, even if a very large request comes from
441 * super fast link that is always able to accept data (in real world
442 * scenario think about 'KEYS *' against the loopback interfae) */
443 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
444 }
445 if (nwritten == -1) {
446 if (errno == EAGAIN) {
447 nwritten = 0;
448 } else {
449 redisLog(REDIS_VERBOSE,
450 "Error writing to client: %s", strerror(errno));
451 freeClient(c);
452 return;
453 }
454 }
455 if (totwritten > 0) c->lastinteraction = time(NULL);
456 if (listLength(c->reply) == 0) {
457 c->sentlen = 0;
458 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
459 }
460}
461
462void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
463{
464 redisClient *c = privdata;
465 int nwritten = 0, totwritten = 0, objlen, willwrite;
466 robj *o;
467 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
468 int offset, ion = 0;
469 REDIS_NOTUSED(el);
470 REDIS_NOTUSED(mask);
471
472 listNode *node;
473 while (listLength(c->reply)) {
474 offset = c->sentlen;
475 ion = 0;
476 willwrite = 0;
477
478 /* fill-in the iov[] array */
479 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
480 o = listNodeValue(node);
481 objlen = sdslen(o->ptr);
482
483 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
484 break;
485
486 if(ion == REDIS_WRITEV_IOVEC_COUNT)
487 break; /* no more iovecs */
488
489 iov[ion].iov_base = ((char*)o->ptr) + offset;
490 iov[ion].iov_len = objlen - offset;
491 willwrite += objlen - offset;
492 offset = 0; /* just for the first item */
493 ion++;
494 }
495
496 if(willwrite == 0)
497 break;
498
499 /* write all collected blocks at once */
500 if((nwritten = writev(fd, iov, ion)) < 0) {
501 if (errno != EAGAIN) {
502 redisLog(REDIS_VERBOSE,
503 "Error writing to client: %s", strerror(errno));
504 freeClient(c);
505 return;
506 }
507 break;
508 }
509
510 totwritten += nwritten;
511 offset = c->sentlen;
512
513 /* remove written robjs from c->reply */
514 while (nwritten && listLength(c->reply)) {
515 o = listNodeValue(listFirst(c->reply));
516 objlen = sdslen(o->ptr);
517
518 if(nwritten >= objlen - offset) {
519 listDelNode(c->reply, listFirst(c->reply));
520 nwritten -= objlen - offset;
521 c->sentlen = 0;
522 } else {
523 /* partial write */
524 c->sentlen += nwritten;
525 break;
526 }
527 offset = 0;
528 }
529 }
530
531 if (totwritten > 0)
532 c->lastinteraction = time(NULL);
533
534 if (listLength(c->reply) == 0) {
535 c->sentlen = 0;
536 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
537 }
538}
539
540/* resetClient prepare the client to process the next command */
541void resetClient(redisClient *c) {
542 freeClientArgv(c);
543 c->bulklen = -1;
544 c->multibulk = 0;
545}
546
547void closeTimedoutClients(void) {
548 redisClient *c;
549 listNode *ln;
550 time_t now = time(NULL);
551 listIter li;
552
553 listRewind(server.clients,&li);
554 while ((ln = listNext(&li)) != NULL) {
555 c = listNodeValue(ln);
556 if (server.maxidletime &&
557 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
558 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
e452436a 559 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
e2641e09 560 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
561 listLength(c->pubsub_patterns) == 0 &&
562 (now - c->lastinteraction > server.maxidletime))
563 {
564 redisLog(REDIS_VERBOSE,"Closing idle client");
565 freeClient(c);
566 } else if (c->flags & REDIS_BLOCKED) {
567 if (c->blockingto != 0 && c->blockingto < now) {
568 addReply(c,shared.nullmultibulk);
569 unblockClientWaitingData(c);
570 }
571 }
572 }
573}
574
575void processInputBuffer(redisClient *c) {
576again:
577 /* Before to process the input buffer, make sure the client is not
578 * waitig for a blocking operation such as BLPOP. Note that the first
579 * iteration the client is never blocked, otherwise the processInputBuffer
580 * would not be called at all, but after the execution of the first commands
581 * in the input buffer the client may be blocked, and the "goto again"
582 * will try to reiterate. The following line will make it return asap. */
583 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
584 if (c->bulklen == -1) {
585 /* Read the first line of the query */
586 char *p = strchr(c->querybuf,'\n');
587 size_t querylen;
588
589 if (p) {
590 sds query, *argv;
591 int argc, j;
592
593 query = c->querybuf;
594 c->querybuf = sdsempty();
595 querylen = 1+(p-(query));
596 if (sdslen(query) > querylen) {
597 /* leave data after the first line of the query in the buffer */
598 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
599 }
600 *p = '\0'; /* remove "\n" */
601 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
602 sdsupdatelen(query);
603
604 /* Now we can split the query in arguments */
605 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
606 sdsfree(query);
607
608 if (c->argv) zfree(c->argv);
609 c->argv = zmalloc(sizeof(robj*)*argc);
610
611 for (j = 0; j < argc; j++) {
612 if (sdslen(argv[j])) {
613 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
614 c->argc++;
615 } else {
616 sdsfree(argv[j]);
617 }
618 }
619 zfree(argv);
620 if (c->argc) {
621 /* Execute the command. If the client is still valid
622 * after processCommand() return and there is something
623 * on the query buffer try to process the next command. */
624 if (processCommand(c) && sdslen(c->querybuf)) goto again;
625 } else {
626 /* Nothing to process, argc == 0. Just process the query
627 * buffer if it's not empty or return to the caller */
628 if (sdslen(c->querybuf)) goto again;
629 }
630 return;
631 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
632 redisLog(REDIS_VERBOSE, "Client protocol error");
633 freeClient(c);
634 return;
635 }
636 } else {
637 /* Bulk read handling. Note that if we are at this point
638 the client already sent a command terminated with a newline,
639 we are reading the bulk data that is actually the last
640 argument of the command. */
641 int qbl = sdslen(c->querybuf);
642
643 if (c->bulklen <= qbl) {
644 /* Copy everything but the final CRLF as final argument */
645 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
646 c->argc++;
647 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
648 /* Process the command. If the client is still valid after
649 * the processing and there is more data in the buffer
650 * try to parse it. */
651 if (processCommand(c) && sdslen(c->querybuf)) goto again;
652 return;
653 }
654 }
655}
656
657void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
658 redisClient *c = (redisClient*) privdata;
659 char buf[REDIS_IOBUF_LEN];
660 int nread;
661 REDIS_NOTUSED(el);
662 REDIS_NOTUSED(mask);
663
664 nread = read(fd, buf, REDIS_IOBUF_LEN);
665 if (nread == -1) {
666 if (errno == EAGAIN) {
667 nread = 0;
668 } else {
669 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
670 freeClient(c);
671 return;
672 }
673 } else if (nread == 0) {
674 redisLog(REDIS_VERBOSE, "Client closed connection");
675 freeClient(c);
676 return;
677 }
678 if (nread) {
679 c->querybuf = sdscatlen(c->querybuf, buf, nread);
680 c->lastinteraction = time(NULL);
681 } else {
682 return;
683 }
684 processInputBuffer(c);
685}