]> git.saurik.com Git - redis.git/blame - src/networking.c
Removed unneeded function
[redis.git] / src / networking.c
CommitLineData
e2641e09 1#include "redis.h"
e2641e09 2#include <sys/uio.h>
3
4void *dupClientReplyValue(void *o) {
5 incrRefCount((robj*)o);
6 return o;
7}
8
/* Match callback installed on the pubsub pattern list (see createClient()).
 * Returns whatever equalStringObjects() reports for the two values. */
int listMatchObjects(void *a, void *b) {
    int same = equalStringObjects(a,b);

    return same;
}
12
13redisClient *createClient(int fd) {
834ef78e
PN
14 redisClient *c;
15
16 /* Make sure to allocate a multiple of the page size to prevent wasting
17 * memory. A page size of 4096 is assumed here. We need to compensate
18 * for the zmalloc overhead of sizeof(size_t) bytes. */
19 size_t size = 8192-sizeof(size_t);
20 redisAssert(size > sizeof(redisClient));
21 c = zmalloc(size);
22 c->buflen = size-sizeof(redisClient);
23 c->bufpos = 0;
e2641e09 24
25 anetNonBlock(NULL,fd);
26 anetTcpNoDelay(NULL,fd);
27 if (!c) return NULL;
28 selectDb(c,0);
29 c->fd = fd;
30 c->querybuf = sdsempty();
31 c->argc = 0;
32 c->argv = NULL;
33 c->bulklen = -1;
34 c->multibulk = 0;
35 c->mbargc = 0;
36 c->mbargv = NULL;
37 c->sentlen = 0;
38 c->flags = 0;
39 c->lastinteraction = time(NULL);
40 c->authenticated = 0;
41 c->replstate = REDIS_REPL_NONE;
42 c->reply = listCreate();
43 listSetFreeMethod(c->reply,decrRefCount);
44 listSetDupMethod(c->reply,dupClientReplyValue);
45 c->blocking_keys = NULL;
46 c->blocking_keys_num = 0;
47 c->io_keys = listCreate();
48 c->watched_keys = listCreate();
49 listSetFreeMethod(c->io_keys,decrRefCount);
50 c->pubsub_channels = dictCreate(&setDictType,NULL);
51 c->pubsub_patterns = listCreate();
52 listSetFreeMethod(c->pubsub_patterns,decrRefCount);
53 listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
54 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
55 readQueryFromClient, c) == AE_ERR) {
56 freeClient(c);
57 return NULL;
58 }
59 listAddNodeTail(server.clients,c);
60 initClientMultiState(c);
61 return c;
62}
63
834ef78e 64int _ensureFileEvent(redisClient *c) {
57b07380 65 if (c->fd <= 0) return REDIS_ERR;
834ef78e 66 if (c->bufpos == 0 && listLength(c->reply) == 0 &&
e2641e09 67 (c->replstate == REDIS_REPL_NONE ||
68 c->replstate == REDIS_REPL_ONLINE) &&
69 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
834ef78e
PN
70 sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
71 return REDIS_OK;
72}
73
74void _addReplyObjectToList(redisClient *c, robj *obj) {
75 redisAssert(obj->type == REDIS_STRING &&
76 obj->encoding == REDIS_ENCODING_RAW);
77 listAddNodeTail(c->reply,obj);
78}
79
80void _ensureBufferInReplyList(redisClient *c) {
81 sds buffer = sdsnewlen(NULL,REDIS_REPLY_CHUNK_SIZE);
82 sdsupdatelen(buffer); /* sdsnewlen expects non-empty string */
83 listAddNodeTail(c->reply,createObject(REDIS_REPLY_NODE,buffer));
84}
85
86void _addReplyStringToBuffer(redisClient *c, char *s, size_t len) {
87 size_t available = 0;
88 redisAssert(len < REDIS_REPLY_CHUNK_THRESHOLD);
89 if (listLength(c->reply) > 0) {
90 robj *o = listNodeValue(listLast(c->reply));
91
92 /* Make sure to append to a reply node with enough bytes available. */
93 if (o->type == REDIS_REPLY_NODE) available = sdsavail(o->ptr);
94 if (o->type != REDIS_REPLY_NODE || len > available) {
95 _ensureBufferInReplyList(c);
96 _addReplyStringToBuffer(c,s,len);
97 } else {
98 o->ptr = sdscatlen(o->ptr,s,len);
99 }
100 } else {
101 available = c->buflen-c->bufpos;
102 if (len > available) {
103 _ensureBufferInReplyList(c);
104 _addReplyStringToBuffer(c,s,len);
105 } else {
106 memcpy(c->buf+c->bufpos,s,len);
107 c->bufpos += len;
108 }
109 }
110}
e2641e09 111
834ef78e
PN
112void addReply(redisClient *c, robj *obj) {
113 if (_ensureFileEvent(c) != REDIS_OK) return;
e2641e09 114 if (server.vm_enabled && obj->storage != REDIS_VM_MEMORY) {
834ef78e 115 /* Returns a new object with refcount 1 */
e2641e09 116 obj = dupStringObject(obj);
834ef78e
PN
117 } else {
118 /* This increments the refcount. */
119 obj = getDecodedObject(obj);
120 }
121
122 if (sdslen(obj->ptr) < REDIS_REPLY_CHUNK_THRESHOLD) {
123 _addReplyStringToBuffer(c,obj->ptr,sdslen(obj->ptr));
124 decrRefCount(obj);
125 } else {
126 _addReplyObjectToList(c,obj);
e2641e09 127 }
e2641e09 128}
129
130void addReplySds(redisClient *c, sds s) {
cd76bb65
PN
131 if (_ensureFileEvent(c) != REDIS_OK) {
132 /* The caller expects the sds to be free'd. */
133 sdsfree(s);
134 return;
135 }
834ef78e
PN
136 if (sdslen(s) < REDIS_REPLY_CHUNK_THRESHOLD) {
137 _addReplyStringToBuffer(c,s,sdslen(s));
138 sdsfree(s);
139 } else {
140 _addReplyObjectToList(c,createObject(REDIS_STRING,s));
141 }
e2641e09 142}
143
834ef78e
PN
144void addReplyString(redisClient *c, char *s, size_t len) {
145 if (_ensureFileEvent(c) != REDIS_OK) return;
146 if (len < REDIS_REPLY_CHUNK_THRESHOLD) {
147 _addReplyStringToBuffer(c,s,len);
148 } else {
149 _addReplyObjectToList(c,createStringObject(s,len));
150 }
151}
e2641e09 152
b301c1fc
PN
153/* Adds an empty object to the reply list that will contain the multi bulk
154 * length, which is not known when this function is called. */
155void *addDeferredMultiBulkLength(redisClient *c) {
156 if (_ensureFileEvent(c) != REDIS_OK) return NULL;
157 _addReplyObjectToList(c,createObject(REDIS_STRING,NULL));
158 return listLast(c->reply);
159}
160
161/* Populate the length object and try glueing it to the next chunk. */
162void setDeferredMultiBulkLength(redisClient *c, void *node, long length) {
163 listNode *ln = (listNode*)node;
164 robj *len, *next;
165
166 /* Abort when *node is NULL (see addDeferredMultiBulkLength). */
167 if (node == NULL) return;
168
169 len = listNodeValue(ln);
170 len->ptr = sdscatprintf(sdsempty(),"*%ld\r\n",length);
171 if (ln->next != NULL) {
172 next = listNodeValue(ln->next);
173 /* Only glue when the next node is a reply chunk. */
174 if (next->type == REDIS_REPLY_NODE) {
175 len->ptr = sdscatlen(len->ptr,next->ptr,sdslen(next->ptr));
176 listDelNode(c->reply,ln->next);
177 }
178 }
179}
180
834ef78e
PN
181void addReplyDouble(redisClient *c, double d) {
182 char dbuf[128], sbuf[128];
183 int dlen, slen;
184 dlen = snprintf(dbuf,sizeof(dbuf),"%.17g",d);
185 slen = snprintf(sbuf,sizeof(sbuf),"$%d\r\n%s\r\n",dlen,dbuf);
186 addReplyString(c,sbuf,slen);
e2641e09 187}
188
834ef78e 189void _addReplyLongLong(redisClient *c, long long ll, char prefix) {
e2641e09 190 char buf[128];
834ef78e
PN
191 int len;
192 buf[0] = prefix;
e2641e09 193 len = ll2string(buf+1,sizeof(buf)-1,ll);
194 buf[len+1] = '\r';
195 buf[len+2] = '\n';
834ef78e 196 addReplyString(c,buf,len+3);
e2641e09 197}
198
834ef78e
PN
199void addReplyLongLong(redisClient *c, long long ll) {
200 _addReplyLongLong(c,ll,':');
201}
e2641e09 202
0537e7bf
PN
203void addReplyMultiBulkLen(redisClient *c, long length) {
204 _addReplyLongLong(c,length,'*');
205}
206
e2641e09 207void addReplyBulkLen(redisClient *c, robj *obj) {
834ef78e 208 size_t len;
e2641e09 209
210 if (obj->encoding == REDIS_ENCODING_RAW) {
211 len = sdslen(obj->ptr);
212 } else {
213 long n = (long)obj->ptr;
214
215 /* Compute how many bytes will take this integer as a radix 10 string */
216 len = 1;
217 if (n < 0) {
218 len++;
219 n = -n;
220 }
221 while((n = n/10) != 0) {
222 len++;
223 }
224 }
834ef78e 225 _addReplyLongLong(c,len,'$');
e2641e09 226}
227
228void addReplyBulk(redisClient *c, robj *obj) {
229 addReplyBulkLen(c,obj);
230 addReply(c,obj);
231 addReply(c,shared.crlf);
232}
233
234/* In the CONFIG command we need to add vanilla C string as bulk replies */
235void addReplyBulkCString(redisClient *c, char *s) {
236 if (s == NULL) {
237 addReply(c,shared.nullbulk);
238 } else {
239 robj *o = createStringObject(s,strlen(s));
240 addReplyBulk(c,o);
241 decrRefCount(o);
242 }
243}
244
245void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
246 int cport, cfd;
247 char cip[128];
248 redisClient *c;
249 REDIS_NOTUSED(el);
250 REDIS_NOTUSED(mask);
251 REDIS_NOTUSED(privdata);
252
253 cfd = anetAccept(server.neterr, fd, cip, &cport);
254 if (cfd == AE_ERR) {
255 redisLog(REDIS_VERBOSE,"Accepting client connection: %s", server.neterr);
256 return;
257 }
258 redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
259 if ((c = createClient(cfd)) == NULL) {
260 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
261 close(cfd); /* May be already closed, just ingore errors */
262 return;
263 }
264 /* If maxclient directive is set and this is one client more... close the
265 * connection. Note that we create the client instead to check before
266 * for this condition, since now the socket is already set in nonblocking
267 * mode and we can send an error for free using the Kernel I/O */
268 if (server.maxclients && listLength(server.clients) > server.maxclients) {
269 char *err = "-ERR max number of clients reached\r\n";
270
271 /* That's a best effort error message, don't check write errors */
272 if (write(c->fd,err,strlen(err)) == -1) {
273 /* Nothing to do, Just to avoid the warning... */
274 }
275 freeClient(c);
276 return;
277 }
278 server.stat_numconnections++;
279}
280
281static void freeClientArgv(redisClient *c) {
282 int j;
283
284 for (j = 0; j < c->argc; j++)
285 decrRefCount(c->argv[j]);
286 for (j = 0; j < c->mbargc; j++)
287 decrRefCount(c->mbargv[j]);
288 c->argc = 0;
289 c->mbargc = 0;
290}
291
/* Tear down the client 'c': release everything it holds, unlink it from
 * every server-wide list it is registered in, close its socket and free the
 * client structure itself. 'c' must not be used after this call.
 *
 * The client is expected to be linked in server.clients (asserted below).
 * When the client is flagged REDIS_MASTER, every client in server.slaves is
 * freed as well through recursive calls. */
void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call
     * unblockClientWaitingData() to avoid processInputBuffer() will get
     * called. Also it is important to remove the file events after
     * this, because this call adds the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClientWaitingData(c);

    /* UNWATCH all the keys */
    unwatchAllKeys(c);
    listRelease(c->watched_keys);
    /* Unsubscribe from all the pubsub channels */
    pubsubUnsubscribeAllChannels(c,0);
    pubsubUnsubscribeAllPatterns(c,0);
    dictRelease(c->pubsub_channels);
    listRelease(c->pubsub_patterns);
    /* Obvious cleanup: the events are removed before the descriptor is
     * closed. */
    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Remove from the list of clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    /* Remove from the list of clients waiting for swapped keys, or ready
     * to be restarted, but not yet woken up again. */
    if (c->flags & REDIS_IO_WAIT) {
        redisAssert(server.vm_enabled);
        if (listLength(c->io_keys) == 0) {
            ln = listSearchKey(server.io_ready_clients,c);

            /* When this client is waiting to be woken up (REDIS_IO_WAIT),
             * it should be present in the list io_ready_clients */
            redisAssert(ln != NULL);
            listDelNode(server.io_ready_clients,ln);
        } else {
            /* NOTE(review): this loop only terminates if
             * dontWaitForSwappedKey() removes the key from c->io_keys -
             * confirm against its definition. */
            while (listLength(c->io_keys)) {
                ln = listFirst(c->io_keys);
                dontWaitForSwappedKey(c,ln->value);
            }
        }
        server.vm_blocked_clients--;
    }
    listRelease(c->io_keys);
    /* Master/slave cleanup.
     * Case 1: we lost the connection with a slave. */
    if (c->flags & REDIS_SLAVE) {
        /* Close the descriptor stored in repldbfd if a bulk transfer was in
         * progress. */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* A client flagged REDIS_SLAVE is looked up in server.monitors when
         * it is also flagged REDIS_MONITOR, in server.slaves otherwise. */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }

    /* Case 2: we lost the connection with the master. */
    if (c->flags & REDIS_MASTER) {
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
        /* Since we lost the connection with the master, we should also
         * close the connection with all our slaves if we have any, so
         * when we'll resync with the master the other slaves will sync again
         * with us as well. Note that also when the slave is not connected
         * to the master it will keep refusing connections by other slaves. */
        while (listLength(server.slaves)) {
            /* Each recursive call unlinks the freed slave from
             * server.slaves (case 1 above), so the list shrinks. */
            ln = listFirst(server.slaves);
            freeClient((redisClient*)ln->value);
        }
    }
    /* Release memory */
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
374
e2641e09 375void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
376 redisClient *c = privdata;
377 int nwritten = 0, totwritten = 0, objlen;
378 robj *o;
379 REDIS_NOTUSED(el);
380 REDIS_NOTUSED(mask);
381
382 /* Use writev() if we have enough buffers to send */
383 if (!server.glueoutputbuf &&
384 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
385 !(c->flags & REDIS_MASTER))
386 {
387 sendReplyToClientWritev(el, fd, privdata, mask);
388 return;
389 }
390
834ef78e
PN
391 while(c->bufpos > 0 || listLength(c->reply)) {
392 if (c->bufpos > 0) {
393 if (c->flags & REDIS_MASTER) {
394 /* Don't reply to a master */
395 nwritten = c->bufpos - c->sentlen;
396 } else {
397 nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
398 if (nwritten <= 0) break;
399 }
400 c->sentlen += nwritten;
401 totwritten += nwritten;
402
403 /* If the buffer was sent, set bufpos to zero to continue with
404 * the remainder of the reply. */
405 if (c->sentlen == c->bufpos) {
406 c->bufpos = 0;
407 c->sentlen = 0;
408 }
409 } else {
410 o = listNodeValue(listFirst(c->reply));
411 objlen = sdslen(o->ptr);
e2641e09 412
834ef78e
PN
413 if (objlen == 0) {
414 listDelNode(c->reply,listFirst(c->reply));
415 continue;
416 }
e2641e09 417
834ef78e
PN
418 if (c->flags & REDIS_MASTER) {
419 /* Don't reply to a master */
420 nwritten = objlen - c->sentlen;
421 } else {
422 nwritten = write(fd, ((char*)o->ptr)+c->sentlen,objlen-c->sentlen);
423 if (nwritten <= 0) break;
424 }
425 c->sentlen += nwritten;
426 totwritten += nwritten;
e2641e09 427
834ef78e
PN
428 /* If we fully sent the object on head go to the next one */
429 if (c->sentlen == objlen) {
430 listDelNode(c->reply,listFirst(c->reply));
431 c->sentlen = 0;
432 }
e2641e09 433 }
434 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
435 * bytes, in a single threaded server it's a good idea to serve
436 * other clients as well, even if a very large request comes from
437 * super fast link that is always able to accept data (in real world
438 * scenario think about 'KEYS *' against the loopback interfae) */
439 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
440 }
441 if (nwritten == -1) {
442 if (errno == EAGAIN) {
443 nwritten = 0;
444 } else {
445 redisLog(REDIS_VERBOSE,
446 "Error writing to client: %s", strerror(errno));
447 freeClient(c);
448 return;
449 }
450 }
451 if (totwritten > 0) c->lastinteraction = time(NULL);
452 if (listLength(c->reply) == 0) {
453 c->sentlen = 0;
454 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
455 }
456}
457
458void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
459{
460 redisClient *c = privdata;
461 int nwritten = 0, totwritten = 0, objlen, willwrite;
462 robj *o;
463 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
464 int offset, ion = 0;
465 REDIS_NOTUSED(el);
466 REDIS_NOTUSED(mask);
467
468 listNode *node;
469 while (listLength(c->reply)) {
470 offset = c->sentlen;
471 ion = 0;
472 willwrite = 0;
473
474 /* fill-in the iov[] array */
475 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
476 o = listNodeValue(node);
477 objlen = sdslen(o->ptr);
478
479 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
480 break;
481
482 if(ion == REDIS_WRITEV_IOVEC_COUNT)
483 break; /* no more iovecs */
484
485 iov[ion].iov_base = ((char*)o->ptr) + offset;
486 iov[ion].iov_len = objlen - offset;
487 willwrite += objlen - offset;
488 offset = 0; /* just for the first item */
489 ion++;
490 }
491
492 if(willwrite == 0)
493 break;
494
495 /* write all collected blocks at once */
496 if((nwritten = writev(fd, iov, ion)) < 0) {
497 if (errno != EAGAIN) {
498 redisLog(REDIS_VERBOSE,
499 "Error writing to client: %s", strerror(errno));
500 freeClient(c);
501 return;
502 }
503 break;
504 }
505
506 totwritten += nwritten;
507 offset = c->sentlen;
508
509 /* remove written robjs from c->reply */
510 while (nwritten && listLength(c->reply)) {
511 o = listNodeValue(listFirst(c->reply));
512 objlen = sdslen(o->ptr);
513
514 if(nwritten >= objlen - offset) {
515 listDelNode(c->reply, listFirst(c->reply));
516 nwritten -= objlen - offset;
517 c->sentlen = 0;
518 } else {
519 /* partial write */
520 c->sentlen += nwritten;
521 break;
522 }
523 offset = 0;
524 }
525 }
526
527 if (totwritten > 0)
528 c->lastinteraction = time(NULL);
529
530 if (listLength(c->reply) == 0) {
531 c->sentlen = 0;
532 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
533 }
534}
535
536/* resetClient prepare the client to process the next command */
537void resetClient(redisClient *c) {
538 freeClientArgv(c);
539 c->bulklen = -1;
540 c->multibulk = 0;
541}
542
543void closeTimedoutClients(void) {
544 redisClient *c;
545 listNode *ln;
546 time_t now = time(NULL);
547 listIter li;
548
549 listRewind(server.clients,&li);
550 while ((ln = listNext(&li)) != NULL) {
551 c = listNodeValue(ln);
552 if (server.maxidletime &&
553 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
554 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
e452436a 555 !(c->flags & REDIS_BLOCKED) && /* no timeout for BLPOP */
e2641e09 556 dictSize(c->pubsub_channels) == 0 && /* no timeout for pubsub */
557 listLength(c->pubsub_patterns) == 0 &&
558 (now - c->lastinteraction > server.maxidletime))
559 {
560 redisLog(REDIS_VERBOSE,"Closing idle client");
561 freeClient(c);
562 } else if (c->flags & REDIS_BLOCKED) {
563 if (c->blockingto != 0 && c->blockingto < now) {
564 addReply(c,shared.nullmultibulk);
565 unblockClientWaitingData(c);
566 }
567 }
568 }
569}
570
571void processInputBuffer(redisClient *c) {
572again:
573 /* Before to process the input buffer, make sure the client is not
574 * waitig for a blocking operation such as BLPOP. Note that the first
575 * iteration the client is never blocked, otherwise the processInputBuffer
576 * would not be called at all, but after the execution of the first commands
577 * in the input buffer the client may be blocked, and the "goto again"
578 * will try to reiterate. The following line will make it return asap. */
579 if (c->flags & REDIS_BLOCKED || c->flags & REDIS_IO_WAIT) return;
580 if (c->bulklen == -1) {
581 /* Read the first line of the query */
582 char *p = strchr(c->querybuf,'\n');
583 size_t querylen;
584
585 if (p) {
586 sds query, *argv;
587 int argc, j;
588
589 query = c->querybuf;
590 c->querybuf = sdsempty();
591 querylen = 1+(p-(query));
592 if (sdslen(query) > querylen) {
593 /* leave data after the first line of the query in the buffer */
594 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
595 }
596 *p = '\0'; /* remove "\n" */
597 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
598 sdsupdatelen(query);
599
600 /* Now we can split the query in arguments */
601 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
602 sdsfree(query);
603
604 if (c->argv) zfree(c->argv);
605 c->argv = zmalloc(sizeof(robj*)*argc);
606
607 for (j = 0; j < argc; j++) {
608 if (sdslen(argv[j])) {
609 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
610 c->argc++;
611 } else {
612 sdsfree(argv[j]);
613 }
614 }
615 zfree(argv);
616 if (c->argc) {
617 /* Execute the command. If the client is still valid
618 * after processCommand() return and there is something
619 * on the query buffer try to process the next command. */
620 if (processCommand(c) && sdslen(c->querybuf)) goto again;
621 } else {
622 /* Nothing to process, argc == 0. Just process the query
623 * buffer if it's not empty or return to the caller */
624 if (sdslen(c->querybuf)) goto again;
625 }
626 return;
627 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
628 redisLog(REDIS_VERBOSE, "Client protocol error");
629 freeClient(c);
630 return;
631 }
632 } else {
633 /* Bulk read handling. Note that if we are at this point
634 the client already sent a command terminated with a newline,
635 we are reading the bulk data that is actually the last
636 argument of the command. */
637 int qbl = sdslen(c->querybuf);
638
639 if (c->bulklen <= qbl) {
640 /* Copy everything but the final CRLF as final argument */
641 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
642 c->argc++;
643 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
644 /* Process the command. If the client is still valid after
645 * the processing and there is more data in the buffer
646 * try to parse it. */
647 if (processCommand(c) && sdslen(c->querybuf)) goto again;
648 return;
649 }
650 }
651}
652
653void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
654 redisClient *c = (redisClient*) privdata;
655 char buf[REDIS_IOBUF_LEN];
656 int nread;
657 REDIS_NOTUSED(el);
658 REDIS_NOTUSED(mask);
659
660 nread = read(fd, buf, REDIS_IOBUF_LEN);
661 if (nread == -1) {
662 if (errno == EAGAIN) {
663 nread = 0;
664 } else {
665 redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
666 freeClient(c);
667 return;
668 }
669 } else if (nread == 0) {
670 redisLog(REDIS_VERBOSE, "Client closed connection");
671 freeClient(c);
672 return;
673 }
674 if (nread) {
675 c->querybuf = sdscatlen(c->querybuf, buf, nread);
676 c->lastinteraction = time(NULL);
677 } else {
678 return;
679 }
680 processInputBuffer(c);
681}