]> git.saurik.com Git - redis.git/blame_incremental - deps/hiredis/hiredis.c
Don't execute commands for clients when they are unblocked
[redis.git] / deps / hiredis / hiredis.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include <string.h>
31#include <stdlib.h>
32#include <unistd.h>
33#include <assert.h>
34#include <errno.h>
35
36#include "hiredis.h"
37#include "net.h"
38#include "sds.h"
39#include "util.h"
40
41typedef struct redisReader {
42 struct redisReplyObjectFunctions *fn;
43 sds error; /* holds optional error */
44 void *reply; /* holds temporary reply */
45
46 sds buf; /* read buffer */
47 unsigned int pos; /* buffer cursor */
48
49 redisReadTask rstack[3]; /* stack of read tasks */
50 int ridx; /* index of stack */
51} redisReader;
52
53static redisReply *createReplyObject(int type);
54static void *createStringObject(const redisReadTask *task, char *str, size_t len);
55static void *createArrayObject(const redisReadTask *task, int elements);
56static void *createIntegerObject(const redisReadTask *task, long long value);
57static void *createNilObject(const redisReadTask *task);
58static void redisSetReplyReaderError(redisReader *r, sds err);
59
60/* Default set of functions to build the reply. */
61static redisReplyObjectFunctions defaultFunctions = {
62 createStringObject,
63 createArrayObject,
64 createIntegerObject,
65 createNilObject,
66 freeReplyObject
67};
68
69/* Create a reply object */
70static redisReply *createReplyObject(int type) {
71 redisReply *r = calloc(sizeof(*r),1);
72
73 if (!r) redisOOM();
74 r->type = type;
75 return r;
76}
77
78/* Free a reply object */
79void freeReplyObject(void *reply) {
80 redisReply *r = reply;
81 size_t j;
82
83 switch(r->type) {
84 case REDIS_REPLY_INTEGER:
85 break; /* Nothing to free */
86 case REDIS_REPLY_ARRAY:
87 for (j = 0; j < r->elements; j++)
88 if (r->element[j]) freeReplyObject(r->element[j]);
89 free(r->element);
90 break;
91 default:
92 if (r->str != NULL)
93 free(r->str);
94 break;
95 }
96 free(r);
97}
98
99static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
100 redisReply *r = createReplyObject(task->type);
101 char *value = malloc(len+1);
102 if (!value) redisOOM();
103 assert(task->type == REDIS_REPLY_ERROR ||
104 task->type == REDIS_REPLY_STATUS ||
105 task->type == REDIS_REPLY_STRING);
106
107 /* Copy string value */
108 memcpy(value,str,len);
109 value[len] = '\0';
110 r->str = value;
111 r->len = len;
112
113 if (task->parent) {
114 redisReply *parent = task->parent;
115 assert(parent->type == REDIS_REPLY_ARRAY);
116 parent->element[task->idx] = r;
117 }
118 return r;
119}
120
121static void *createArrayObject(const redisReadTask *task, int elements) {
122 redisReply *r = createReplyObject(REDIS_REPLY_ARRAY);
123 r->elements = elements;
124 if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
125 redisOOM();
126 if (task->parent) {
127 redisReply *parent = task->parent;
128 assert(parent->type == REDIS_REPLY_ARRAY);
129 parent->element[task->idx] = r;
130 }
131 return r;
132}
133
134static void *createIntegerObject(const redisReadTask *task, long long value) {
135 redisReply *r = createReplyObject(REDIS_REPLY_INTEGER);
136 r->integer = value;
137 if (task->parent) {
138 redisReply *parent = task->parent;
139 assert(parent->type == REDIS_REPLY_ARRAY);
140 parent->element[task->idx] = r;
141 }
142 return r;
143}
144
145static void *createNilObject(const redisReadTask *task) {
146 redisReply *r = createReplyObject(REDIS_REPLY_NIL);
147 if (task->parent) {
148 redisReply *parent = task->parent;
149 assert(parent->type == REDIS_REPLY_ARRAY);
150 parent->element[task->idx] = r;
151 }
152 return r;
153}
154
155static char *readBytes(redisReader *r, unsigned int bytes) {
156 char *p;
157 if (sdslen(r->buf)-r->pos >= bytes) {
158 p = r->buf+r->pos;
159 r->pos += bytes;
160 return p;
161 }
162 return NULL;
163}
164
/* Locate the first "\r\n" sequence in the NUL-terminated string s without
 * using strstr. Returns a pointer to the '\r', or NULL when s is NULL or
 * contains no such sequence. */
static char *seekNewline(char *s) {
    char *cr;

    if (s == NULL)
        return NULL;

    /* Visit every carriage return and accept the first one that is
     * directly followed by a line feed. */
    for (cr = strchr(s,'\r'); cr != NULL; cr = strchr(cr+1,'\r')) {
        if (cr[1] == '\n')
            return cr;
    }
    return NULL;
}
180
181static char *readLine(redisReader *r, int *_len) {
182 char *p, *s;
183 int len;
184
185 p = r->buf+r->pos;
186 s = seekNewline(p);
187 if (s != NULL) {
188 len = s-(r->buf+r->pos);
189 r->pos += len+2; /* skip \r\n */
190 if (_len) *_len = len;
191 return p;
192 }
193 return NULL;
194}
195
196static void moveToNextTask(redisReader *r) {
197 redisReadTask *cur, *prv;
198 while (r->ridx >= 0) {
199 /* Return a.s.a.p. when the stack is now empty. */
200 if (r->ridx == 0) {
201 r->ridx--;
202 return;
203 }
204
205 cur = &(r->rstack[r->ridx]);
206 prv = &(r->rstack[r->ridx-1]);
207 assert(prv->type == REDIS_REPLY_ARRAY);
208 if (cur->idx == prv->elements-1) {
209 r->ridx--;
210 } else {
211 /* Reset the type because the next item can be anything */
212 assert(cur->idx < prv->elements);
213 cur->type = -1;
214 cur->elements = -1;
215 cur->idx++;
216 return;
217 }
218 }
219}
220
221static int processLineItem(redisReader *r) {
222 redisReadTask *cur = &(r->rstack[r->ridx]);
223 void *obj;
224 char *p;
225 int len;
226
227 if ((p = readLine(r,&len)) != NULL) {
228 if (r->fn) {
229 if (cur->type == REDIS_REPLY_INTEGER) {
230 obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
231 } else {
232 obj = r->fn->createString(cur,p,len);
233 }
234 } else {
235 obj = (void*)(size_t)(cur->type);
236 }
237
238 /* If there is no root yet, register this object as root. */
239 if (r->reply == NULL)
240 r->reply = obj;
241 moveToNextTask(r);
242 return 0;
243 }
244 return -1;
245}
246
247static int processBulkItem(redisReader *r) {
248 redisReadTask *cur = &(r->rstack[r->ridx]);
249 void *obj = NULL;
250 char *p, *s;
251 long len;
252 unsigned long bytelen;
253
254 p = r->buf+r->pos;
255 s = seekNewline(p);
256 if (s != NULL) {
257 p = r->buf+r->pos;
258 bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
259 len = strtol(p,NULL,10);
260
261 if (len < 0) {
262 /* The nil object can always be created. */
263 obj = r->fn ? r->fn->createNil(cur) :
264 (void*)REDIS_REPLY_NIL;
265 } else {
266 /* Only continue when the buffer contains the entire bulk item. */
267 bytelen += len+2; /* include \r\n */
268 if (r->pos+bytelen <= sdslen(r->buf)) {
269 obj = r->fn ? r->fn->createString(cur,s+2,len) :
270 (void*)REDIS_REPLY_STRING;
271 }
272 }
273
274 /* Proceed when obj was created. */
275 if (obj != NULL) {
276 r->pos += bytelen;
277 if (r->reply == NULL)
278 r->reply = obj;
279 moveToNextTask(r);
280 return 0;
281 }
282 }
283 return -1;
284}
285
286static int processMultiBulkItem(redisReader *r) {
287 redisReadTask *cur = &(r->rstack[r->ridx]);
288 void *obj;
289 char *p;
290 long elements;
291
292 if ((p = readLine(r,NULL)) != NULL) {
293 elements = strtol(p,NULL,10);
294 if (elements == -1) {
295 obj = r->fn ? r->fn->createNil(cur) :
296 (void*)REDIS_REPLY_NIL;
297 moveToNextTask(r);
298 } else {
299 obj = r->fn ? r->fn->createArray(cur,elements) :
300 (void*)REDIS_REPLY_ARRAY;
301
302 /* Modify task stack when there are more than 0 elements. */
303 if (elements > 0) {
304 cur->elements = elements;
305 r->ridx++;
306 r->rstack[r->ridx].type = -1;
307 r->rstack[r->ridx].elements = -1;
308 r->rstack[r->ridx].parent = obj;
309 r->rstack[r->ridx].idx = 0;
310 } else {
311 moveToNextTask(r);
312 }
313 }
314
315 /* Object was created, so we can always continue. */
316 if (r->reply == NULL)
317 r->reply = obj;
318 return 0;
319 }
320 return -1;
321}
322
323static int processItem(redisReader *r) {
324 redisReadTask *cur = &(r->rstack[r->ridx]);
325 char *p;
326 sds byte;
327
328 /* check if we need to read type */
329 if (cur->type < 0) {
330 if ((p = readBytes(r,1)) != NULL) {
331 switch (p[0]) {
332 case '-':
333 cur->type = REDIS_REPLY_ERROR;
334 break;
335 case '+':
336 cur->type = REDIS_REPLY_STATUS;
337 break;
338 case ':':
339 cur->type = REDIS_REPLY_INTEGER;
340 break;
341 case '$':
342 cur->type = REDIS_REPLY_STRING;
343 break;
344 case '*':
345 cur->type = REDIS_REPLY_ARRAY;
346 break;
347 default:
348 byte = sdscatrepr(sdsempty(),p,1);
349 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
350 "protocol error, got %s as reply type byte", byte));
351 sdsfree(byte);
352 return -1;
353 }
354 } else {
355 /* could not consume 1 byte */
356 return -1;
357 }
358 }
359
360 /* process typed item */
361 switch(cur->type) {
362 case REDIS_REPLY_ERROR:
363 case REDIS_REPLY_STATUS:
364 case REDIS_REPLY_INTEGER:
365 return processLineItem(r);
366 case REDIS_REPLY_STRING:
367 return processBulkItem(r);
368 case REDIS_REPLY_ARRAY:
369 return processMultiBulkItem(r);
370 default:
371 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
372 "unknown item type '%d'", cur->type));
373 return -1;
374 }
375}
376
377void *redisReplyReaderCreate() {
378 redisReader *r = calloc(sizeof(redisReader),1);
379 r->error = NULL;
380 r->fn = &defaultFunctions;
381 r->buf = sdsempty();
382 r->ridx = -1;
383 return r;
384}
385
386/* Set the function set to build the reply. Returns REDIS_OK when there
387 * is no temporary object and it can be set, REDIS_ERR otherwise. */
388int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFunctions *fn) {
389 redisReader *r = reader;
390 if (r->reply == NULL) {
391 r->fn = fn;
392 return REDIS_OK;
393 }
394 return REDIS_ERR;
395}
396
397/* External libraries wrapping hiredis might need access to the temporary
398 * variable while the reply is built up. When the reader contains an
399 * object in between receiving some bytes to parse, this object might
400 * otherwise be free'd by garbage collection. */
401void *redisReplyReaderGetObject(void *reader) {
402 redisReader *r = reader;
403 return r->reply;
404}
405
406void redisReplyReaderFree(void *reader) {
407 redisReader *r = reader;
408 if (r->error != NULL)
409 sdsfree(r->error);
410 if (r->reply != NULL && r->fn)
411 r->fn->freeObject(r->reply);
412 if (r->buf != NULL)
413 sdsfree(r->buf);
414 free(r);
415}
416
417static void redisSetReplyReaderError(redisReader *r, sds err) {
418 if (r->reply != NULL)
419 r->fn->freeObject(r->reply);
420
421 /* Clear remaining buffer when we see a protocol error. */
422 if (r->buf != NULL) {
423 sdsfree(r->buf);
424 r->buf = sdsempty();
425 r->pos = 0;
426 }
427 r->ridx = -1;
428 r->error = err;
429}
430
431char *redisReplyReaderGetError(void *reader) {
432 redisReader *r = reader;
433 return r->error;
434}
435
436void redisReplyReaderFeed(void *reader, char *buf, size_t len) {
437 redisReader *r = reader;
438
439 /* Copy the provided buffer. */
440 if (buf != NULL && len >= 1)
441 r->buf = sdscatlen(r->buf,buf,len);
442}
443
444int redisReplyReaderGetReply(void *reader, void **reply) {
445 redisReader *r = reader;
446 if (reply != NULL) *reply = NULL;
447
448 /* When the buffer is empty, there will never be a reply. */
449 if (sdslen(r->buf) == 0)
450 return REDIS_OK;
451
452 /* Set first item to process when the stack is empty. */
453 if (r->ridx == -1) {
454 r->rstack[0].type = -1;
455 r->rstack[0].elements = -1;
456 r->rstack[0].parent = NULL;
457 r->rstack[0].idx = -1;
458 r->ridx = 0;
459 }
460
461 /* Process items in reply. */
462 while (r->ridx >= 0)
463 if (processItem(r) < 0)
464 break;
465
466 /* Discard the consumed part of the buffer. */
467 if (r->pos > 0) {
468 if (r->pos == sdslen(r->buf)) {
469 /* sdsrange has a quirck on this edge case. */
470 sdsfree(r->buf);
471 r->buf = sdsempty();
472 } else {
473 r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
474 }
475 r->pos = 0;
476 }
477
478 /* Emit a reply when there is one. */
479 if (r->ridx == -1) {
480 void *aux = r->reply;
481 r->reply = NULL;
482
483 /* Destroy the buffer when it is empty and is quite large. */
484 if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
485 sdsfree(r->buf);
486 r->buf = sdsempty();
487 r->pos = 0;
488 }
489
490 /* Check if there actually *is* a reply. */
491 if (r->error != NULL) {
492 return REDIS_ERR;
493 } else {
494 if (reply != NULL) *reply = aux;
495 }
496 }
497 return REDIS_OK;
498}
499
/* Calculate the number of bytes needed to represent an integer as a
 * decimal string, including the '-' sign for negative values.
 *
 * The magnitude is computed in unsigned arithmetic: negating the most
 * negative int as a signed value would overflow, which is undefined
 * behavior. */
static int intlen(int i) {
    unsigned int magnitude;
    int len = 0;

    if (i < 0) {
        len++; /* room for the sign */
        magnitude = 0u - (unsigned int)i;
    } else {
        magnitude = (unsigned int)i;
    }

    do {
        len++;
        magnitude /= 10;
    } while (magnitude != 0);
    return len;
}
513
514/* Helper function for redisvFormatCommand(). */
515static void addArgument(sds a, char ***argv, int *argc, int *totlen) {
516 (*argc)++;
517 if ((*argv = realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
518 if (totlen) *totlen = *totlen+1+intlen(sdslen(a))+2+sdslen(a)+2;
519 (*argv)[(*argc)-1] = a;
520}
521
522int redisvFormatCommand(char **target, const char *format, va_list ap) {
523 size_t size;
524 const char *arg, *c = format;
525 char *cmd = NULL; /* final command */
526 int pos; /* position in final command */
527 sds current; /* current argument */
528 char **argv = NULL;
529 int argc = 0, j;
530 int totlen = 0;
531
532 /* Abort if there is not target to set */
533 if (target == NULL)
534 return -1;
535
536 /* Build the command string accordingly to protocol */
537 current = sdsempty();
538 while(*c != '\0') {
539 if (*c != '%' || c[1] == '\0') {
540 if (*c == ' ') {
541 if (sdslen(current) != 0) {
542 addArgument(current, &argv, &argc, &totlen);
543 current = sdsempty();
544 }
545 } else {
546 current = sdscatlen(current,c,1);
547 }
548 } else {
549 switch(c[1]) {
550 case 's':
551 arg = va_arg(ap,char*);
552 current = sdscat(current,arg);
553 break;
554 case 'b':
555 arg = va_arg(ap,char*);
556 size = va_arg(ap,size_t);
557 current = sdscatlen(current,arg,size);
558 break;
559 case '%':
560 cmd = sdscat(cmd,"%");
561 break;
562 }
563 c++;
564 }
565 c++;
566 }
567
568 /* Add the last argument if needed */
569 if (sdslen(current) != 0) {
570 addArgument(current, &argv, &argc, &totlen);
571 } else {
572 sdsfree(current);
573 }
574
575 /* Add bytes needed to hold multi bulk count */
576 totlen += 1+intlen(argc)+2;
577
578 /* Build the command at protocol level */
579 cmd = malloc(totlen+1);
580 if (!cmd) redisOOM();
581 pos = sprintf(cmd,"*%d\r\n",argc);
582 for (j = 0; j < argc; j++) {
583 pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(argv[j]));
584 memcpy(cmd+pos,argv[j],sdslen(argv[j]));
585 pos += sdslen(argv[j]);
586 sdsfree(argv[j]);
587 cmd[pos++] = '\r';
588 cmd[pos++] = '\n';
589 }
590 assert(pos == totlen);
591 free(argv);
592 cmd[totlen] = '\0';
593 *target = cmd;
594 return totlen;
595}
596
/* Format a command according to the Redis protocol using a printf-like
 * format string:
 *
 * %s interpolates a C null terminated string
 * %b interpolates a binary safe string; pass the pointer to the data
 *    followed by its length in bytes
 *
 * Examples:
 *
 * len = redisFormatCommand(target, "GET %s", mykey);
 * len = redisFormatCommand(target, "SET %s %b", mykey, myval, myvallen);
 *
 * Variadic front end of redisvFormatCommand(); returns its result.
 */
int redisFormatCommand(char **target, const char *format, ...) {
    va_list args;
    int cmdlen;

    va_start(args,format);
    cmdlen = redisvFormatCommand(target,format,args);
    va_end(args);
    return cmdlen;
}
617
/* Format a command according to the Redis protocol from an argument
 * vector. argc is the number of arguments, argv the arguments and argvlen
 * their lengths; when argvlen is NULL, strlen is used to compute the
 * argument lengths.
 *
 * *target is set to a malloc'ed, NUL-terminated command that the caller
 * must free. Returns the length of the command.
 */
int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
    char *out = NULL; /* formatted command */
    size_t arglen;
    int total, written, i;

    /* First pass: number of bytes needed for the whole command. */
    total = 1+intlen(argc)+2;
    for (i = 0; i < argc; i++) {
        arglen = argvlen ? argvlen[i] : strlen(argv[i]);
        total += 1+intlen(arglen)+2+arglen+2;
    }

    /* Second pass: emit the argument count and every argument. */
    out = malloc(total+1);
    if (out == NULL)
        redisOOM();
    written = sprintf(out,"*%d\r\n",argc);
    for (i = 0; i < argc; i++) {
        arglen = argvlen ? argvlen[i] : strlen(argv[i]);
        written += sprintf(out+written,"$%zu\r\n",arglen);
        memcpy(out+written,argv[i],arglen);
        written += arglen;
        out[written++] = '\r';
        out[written++] = '\n';
    }
    assert(written == total);
    out[total] = '\0';
    *target = out;
    return total;
}
653
654void __redisSetError(redisContext *c, int type, const sds errstr) {
655 c->err = type;
656 if (errstr != NULL) {
657 c->errstr = errstr;
658 } else {
659 /* Only REDIS_ERR_IO may lack a description! */
660 assert(type == REDIS_ERR_IO);
661 c->errstr = sdsnew(strerror(errno));
662 }
663}
664
665static redisContext *redisContextInit() {
666 redisContext *c = calloc(sizeof(redisContext),1);
667 c->fd = -1; /* quick fix for a bug that should be addressed differently */
668 c->err = 0;
669 c->errstr = NULL;
670 c->obuf = sdsempty();
671 c->fn = &defaultFunctions;
672 c->reader = NULL;
673 return c;
674}
675
676void redisFree(redisContext *c) {
677 /* Disconnect before free'ing if not yet disconnected. */
678 if (c->flags & REDIS_CONNECTED)
679 close(c->fd);
680 if (c->errstr != NULL)
681 sdsfree(c->errstr);
682 if (c->obuf != NULL)
683 sdsfree(c->obuf);
684 if (c->reader != NULL)
685 redisReplyReaderFree(c->reader);
686 free(c);
687}
688
689/* Connect to a Redis instance. On error the field error in the returned
690 * context will be set to the return value of the error function.
691 * When no set of reply functions is given, the default set will be used. */
692redisContext *redisConnect(const char *ip, int port) {
693 redisContext *c = redisContextInit();
694 c->flags |= REDIS_BLOCK;
695 c->flags |= REDIS_CONNECTED;
696 redisContextConnectTcp(c,ip,port);
697 return c;
698}
699
700redisContext *redisConnectNonBlock(const char *ip, int port) {
701 redisContext *c = redisContextInit();
702 c->flags &= ~REDIS_BLOCK;
703 c->flags |= REDIS_CONNECTED;
704 redisContextConnectTcp(c,ip,port);
705 return c;
706}
707
708redisContext *redisConnectUnix(const char *path) {
709 redisContext *c = redisContextInit();
710 c->flags |= REDIS_BLOCK;
711 c->flags |= REDIS_CONNECTED;
712 redisContextConnectUnix(c,path);
713 return c;
714}
715
716redisContext *redisConnectUnixNonBlock(const char *path) {
717 redisContext *c = redisContextInit();
718 c->flags &= ~REDIS_BLOCK;
719 c->flags |= REDIS_CONNECTED;
720 redisContextConnectUnix(c,path);
721 return c;
722}
723
724/* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
725 * was already initialized and the function set could not be re-set.
726 * Return REDIS_OK when they could be set. */
727int redisSetReplyObjectFunctions(redisContext *c, redisReplyObjectFunctions *fn) {
728 if (c->reader != NULL)
729 return REDIS_ERR;
730 c->fn = fn;
731 return REDIS_OK;
732}
733
734/* Helper function to lazily create a reply reader. */
735static void __redisCreateReplyReader(redisContext *c) {
736 if (c->reader == NULL) {
737 c->reader = redisReplyReaderCreate();
738 assert(redisReplyReaderSetReplyObjectFunctions(c->reader,c->fn) == REDIS_OK);
739 }
740}
741
742/* Use this function to handle a read event on the descriptor. It will try
743 * and read some bytes from the socket and feed them to the reply parser.
744 *
745 * After this function is called, you may use redisContextReadReply to
746 * see if there is a reply available. */
747int redisBufferRead(redisContext *c) {
748 char buf[2048];
749 int nread = read(c->fd,buf,sizeof(buf));
750 if (nread == -1) {
751 if (errno == EAGAIN) {
752 /* Try again later */
753 } else {
754 __redisSetError(c,REDIS_ERR_IO,NULL);
755 return REDIS_ERR;
756 }
757 } else if (nread == 0) {
758 __redisSetError(c,REDIS_ERR_EOF,
759 sdsnew("Server closed the connection"));
760 return REDIS_ERR;
761 } else {
762 __redisCreateReplyReader(c);
763 redisReplyReaderFeed(c->reader,buf,nread);
764 }
765 return REDIS_OK;
766}
767
768/* Write the output buffer to the socket.
769 *
770 * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
771 * succesfully written to the socket. When the buffer is empty after the
772 * write operation, "wdone" is set to 1 (if given).
773 *
774 * Returns REDIS_ERR if an error occured trying to write and sets
775 * c->error to hold the appropriate error string.
776 */
777int redisBufferWrite(redisContext *c, int *done) {
778 int nwritten;
779 if (sdslen(c->obuf) > 0) {
780 nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
781 if (nwritten == -1) {
782 if (errno == EAGAIN) {
783 /* Try again later */
784 } else {
785 __redisSetError(c,REDIS_ERR_IO,NULL);
786 return REDIS_ERR;
787 }
788 } else if (nwritten > 0) {
789 if (nwritten == (signed)sdslen(c->obuf)) {
790 sdsfree(c->obuf);
791 c->obuf = sdsempty();
792 } else {
793 c->obuf = sdsrange(c->obuf,nwritten,-1);
794 }
795 }
796 }
797 if (done != NULL) *done = (sdslen(c->obuf) == 0);
798 return REDIS_OK;
799}
800
801/* Internal helper function to try and get a reply from the reader,
802 * or set an error in the context otherwise. */
803int redisGetReplyFromReader(redisContext *c, void **reply) {
804 __redisCreateReplyReader(c);
805 if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
806 __redisSetError(c,REDIS_ERR_PROTOCOL,
807 sdsnew(((redisReader*)c->reader)->error));
808 return REDIS_ERR;
809 }
810 return REDIS_OK;
811}
812
813int redisGetReply(redisContext *c, void **reply) {
814 int wdone = 0;
815 void *aux = NULL;
816
817 /* Try to read pending replies */
818 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
819 return REDIS_ERR;
820
821 /* For the blocking context, flush output buffer and read reply */
822 if (aux == NULL && c->flags & REDIS_BLOCK) {
823 /* Write until done */
824 do {
825 if (redisBufferWrite(c,&wdone) == REDIS_ERR)
826 return REDIS_ERR;
827 } while (!wdone);
828
829 /* Read until there is a reply */
830 do {
831 if (redisBufferRead(c) == REDIS_ERR)
832 return REDIS_ERR;
833 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
834 return REDIS_ERR;
835 } while (aux == NULL);
836 }
837
838 /* Set reply object */
839 if (reply != NULL) *reply = aux;
840 return REDIS_OK;
841}
842
843
844/* Helper function for the redisAppendCommand* family of functions.
845 *
846 * Write a formatted command to the output buffer. When this family
847 * is used, you need to call redisGetReply yourself to retrieve
848 * the reply (or replies in pub/sub).
849 */
850void __redisAppendCommand(redisContext *c, char *cmd, size_t len) {
851 c->obuf = sdscatlen(c->obuf,cmd,len);
852}
853
854void redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
855 char *cmd;
856 int len;
857 len = redisvFormatCommand(&cmd,format,ap);
858 __redisAppendCommand(c,cmd,len);
859 free(cmd);
860}
861
862void redisAppendCommand(redisContext *c, const char *format, ...) {
863 va_list ap;
864 va_start(ap,format);
865 redisvAppendCommand(c,format,ap);
866 va_end(ap);
867}
868
869void redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
870 char *cmd;
871 int len;
872 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
873 __redisAppendCommand(c,cmd,len);
874 free(cmd);
875}
876
877/* Helper function for the redisCommand* family of functions.
878 *
879 * Write a formatted command to the output buffer. If the given context is
880 * blocking, immediately read the reply into the "reply" pointer. When the
881 * context is non-blocking, the "reply" pointer will not be used and the
882 * command is simply appended to the write buffer.
883 *
884 * Returns the reply when a reply was succesfully retrieved. Returns NULL
885 * otherwise. When NULL is returned in a blocking context, the error field
886 * in the context will be set.
887 */
888static void *__redisCommand(redisContext *c, char *cmd, size_t len) {
889 void *aux = NULL;
890 __redisAppendCommand(c,cmd,len);
891
892 if (c->flags & REDIS_BLOCK) {
893 if (redisGetReply(c,&aux) == REDIS_OK)
894 return aux;
895 return NULL;
896 }
897 return NULL;
898}
899
900void *redisvCommand(redisContext *c, const char *format, va_list ap) {
901 char *cmd;
902 int len;
903 void *reply = NULL;
904 len = redisvFormatCommand(&cmd,format,ap);
905 reply = __redisCommand(c,cmd,len);
906 free(cmd);
907 return reply;
908}
909
910void *redisCommand(redisContext *c, const char *format, ...) {
911 va_list ap;
912 void *reply = NULL;
913 va_start(ap,format);
914 reply = redisvCommand(c,format,ap);
915 va_end(ap);
916 return reply;
917}
918
919void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
920 char *cmd;
921 int len;
922 void *reply = NULL;
923 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
924 reply = __redisCommand(c,cmd,len);
925 free(cmd);
926 return reply;
927}