]> git.saurik.com Git - redis.git/blame - deps/hiredis/hiredis.c
Update hiredis
[redis.git] / deps / hiredis / hiredis.c
CommitLineData
24f753a8
PN
1/*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30#include <string.h>
31#include <stdlib.h>
32#include <unistd.h>
33#include <assert.h>
34#include <errno.h>
35
36#include "hiredis.h"
37#include "net.h"
38#include "sds.h"
39#include "util.h"
40
41typedef struct redisReader {
42 struct redisReplyObjectFunctions *fn;
43 sds error; /* holds optional error */
44 void *reply; /* holds temporary reply */
45
46 sds buf; /* read buffer */
47 unsigned int pos; /* buffer cursor */
48
49 redisReadTask rstack[3]; /* stack of read tasks */
50 int ridx; /* index of stack */
51} redisReader;
52
53static redisReply *createReplyObject(int type);
54static void *createStringObject(const redisReadTask *task, char *str, size_t len);
55static void *createArrayObject(const redisReadTask *task, int elements);
56static void *createIntegerObject(const redisReadTask *task, long long value);
57static void *createNilObject(const redisReadTask *task);
58static void redisSetReplyReaderError(redisReader *r, sds err);
59
60/* Default set of functions to build the reply. */
61static redisReplyObjectFunctions defaultFunctions = {
62 createStringObject,
63 createArrayObject,
64 createIntegerObject,
65 createNilObject,
66 freeReplyObject
67};
68
69/* Create a reply object */
70static redisReply *createReplyObject(int type) {
71 redisReply *r = calloc(sizeof(*r),1);
72
73 if (!r) redisOOM();
74 r->type = type;
75 return r;
76}
77
78/* Free a reply object */
79void freeReplyObject(void *reply) {
80 redisReply *r = reply;
81 size_t j;
82
83 switch(r->type) {
84 case REDIS_REPLY_INTEGER:
85 break; /* Nothing to free */
86 case REDIS_REPLY_ARRAY:
87 for (j = 0; j < r->elements; j++)
88 if (r->element[j]) freeReplyObject(r->element[j]);
89 free(r->element);
90 break;
91 default:
92 if (r->str != NULL)
93 free(r->str);
94 break;
95 }
96 free(r);
97}
98
99static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
100 redisReply *r = createReplyObject(task->type);
101 char *value = malloc(len+1);
102 if (!value) redisOOM();
103 assert(task->type == REDIS_REPLY_ERROR ||
104 task->type == REDIS_REPLY_STATUS ||
105 task->type == REDIS_REPLY_STRING);
106
107 /* Copy string value */
108 memcpy(value,str,len);
109 value[len] = '\0';
110 r->str = value;
111 r->len = len;
112
113 if (task->parent) {
114 redisReply *parent = task->parent;
115 assert(parent->type == REDIS_REPLY_ARRAY);
116 parent->element[task->idx] = r;
117 }
118 return r;
119}
120
121static void *createArrayObject(const redisReadTask *task, int elements) {
122 redisReply *r = createReplyObject(REDIS_REPLY_ARRAY);
123 r->elements = elements;
124 if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
125 redisOOM();
126 if (task->parent) {
127 redisReply *parent = task->parent;
128 assert(parent->type == REDIS_REPLY_ARRAY);
129 parent->element[task->idx] = r;
130 }
131 return r;
132}
133
134static void *createIntegerObject(const redisReadTask *task, long long value) {
135 redisReply *r = createReplyObject(REDIS_REPLY_INTEGER);
136 r->integer = value;
137 if (task->parent) {
138 redisReply *parent = task->parent;
139 assert(parent->type == REDIS_REPLY_ARRAY);
140 parent->element[task->idx] = r;
141 }
142 return r;
143}
144
145static void *createNilObject(const redisReadTask *task) {
146 redisReply *r = createReplyObject(REDIS_REPLY_NIL);
147 if (task->parent) {
148 redisReply *parent = task->parent;
149 assert(parent->type == REDIS_REPLY_ARRAY);
150 parent->element[task->idx] = r;
151 }
152 return r;
153}
154
155static char *readBytes(redisReader *r, unsigned int bytes) {
156 char *p;
157 if (sdslen(r->buf)-r->pos >= bytes) {
158 p = r->buf+r->pos;
159 r->pos += bytes;
160 return p;
161 }
162 return NULL;
163}
164
afc156c2
PN
165static char *seekNewline(char *s) {
166 /* Find pointer to \r\n without strstr */
167 while(s != NULL && s[0] != '\r' && s[1] != '\n')
168 s = strchr(s,'\r');
169 return s;
170}
171
24f753a8 172static char *readLine(redisReader *r, int *_len) {
afc156c2 173 char *p, *s;
24f753a8 174 int len;
afc156c2
PN
175
176 p = r->buf+r->pos;
177 s = seekNewline(p);
24f753a8 178 if (s != NULL) {
24f753a8
PN
179 len = s-(r->buf+r->pos);
180 r->pos += len+2; /* skip \r\n */
181 if (_len) *_len = len;
182 return p;
183 }
184 return NULL;
185}
186
187static void moveToNextTask(redisReader *r) {
188 redisReadTask *cur, *prv;
afc156c2
PN
189 while (r->ridx >= 0) {
190 /* Return a.s.a.p. when the stack is now empty. */
191 if (r->ridx == 0) {
192 r->ridx--;
193 return;
194 }
24f753a8 195
afc156c2
PN
196 cur = &(r->rstack[r->ridx]);
197 prv = &(r->rstack[r->ridx-1]);
198 assert(prv->type == REDIS_REPLY_ARRAY);
199 if (cur->idx == prv->elements-1) {
200 r->ridx--;
201 } else {
202 /* Reset the type because the next item can be anything */
203 assert(cur->idx < prv->elements);
204 cur->type = -1;
205 cur->elements = -1;
206 cur->idx++;
207 return;
208 }
24f753a8
PN
209 }
210}
211
212static int processLineItem(redisReader *r) {
213 redisReadTask *cur = &(r->rstack[r->ridx]);
214 void *obj;
215 char *p;
216 int len;
217
218 if ((p = readLine(r,&len)) != NULL) {
afc156c2
PN
219 if (r->fn) {
220 if (cur->type == REDIS_REPLY_INTEGER) {
221 obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
222 } else {
223 obj = r->fn->createString(cur,p,len);
224 }
24f753a8 225 } else {
afc156c2 226 obj = (void*)(size_t)(cur->type);
24f753a8
PN
227 }
228
229 /* If there is no root yet, register this object as root. */
230 if (r->reply == NULL)
231 r->reply = obj;
232 moveToNextTask(r);
233 return 0;
234 }
235 return -1;
236}
237
238static int processBulkItem(redisReader *r) {
239 redisReadTask *cur = &(r->rstack[r->ridx]);
240 void *obj = NULL;
241 char *p, *s;
242 long len;
243 unsigned long bytelen;
244
245 p = r->buf+r->pos;
afc156c2 246 s = seekNewline(p);
24f753a8
PN
247 if (s != NULL) {
248 p = r->buf+r->pos;
249 bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
250 len = strtol(p,NULL,10);
251
252 if (len < 0) {
253 /* The nil object can always be created. */
afc156c2
PN
254 obj = r->fn ? r->fn->createNil(cur) :
255 (void*)REDIS_REPLY_NIL;
24f753a8
PN
256 } else {
257 /* Only continue when the buffer contains the entire bulk item. */
258 bytelen += len+2; /* include \r\n */
259 if (r->pos+bytelen <= sdslen(r->buf)) {
afc156c2
PN
260 obj = r->fn ? r->fn->createString(cur,s+2,len) :
261 (void*)REDIS_REPLY_STRING;
24f753a8
PN
262 }
263 }
264
265 /* Proceed when obj was created. */
266 if (obj != NULL) {
267 r->pos += bytelen;
268 if (r->reply == NULL)
269 r->reply = obj;
270 moveToNextTask(r);
271 return 0;
272 }
273 }
274 return -1;
275}
276
277static int processMultiBulkItem(redisReader *r) {
278 redisReadTask *cur = &(r->rstack[r->ridx]);
279 void *obj;
280 char *p;
281 long elements;
282
283 if ((p = readLine(r,NULL)) != NULL) {
284 elements = strtol(p,NULL,10);
285 if (elements == -1) {
afc156c2
PN
286 obj = r->fn ? r->fn->createNil(cur) :
287 (void*)REDIS_REPLY_NIL;
24f753a8
PN
288 moveToNextTask(r);
289 } else {
afc156c2
PN
290 obj = r->fn ? r->fn->createArray(cur,elements) :
291 (void*)REDIS_REPLY_ARRAY;
24f753a8
PN
292
293 /* Modify task stack when there are more than 0 elements. */
294 if (elements > 0) {
295 cur->elements = elements;
296 r->ridx++;
297 r->rstack[r->ridx].type = -1;
298 r->rstack[r->ridx].elements = -1;
299 r->rstack[r->ridx].parent = obj;
300 r->rstack[r->ridx].idx = 0;
301 } else {
302 moveToNextTask(r);
303 }
304 }
305
306 /* Object was created, so we can always continue. */
307 if (r->reply == NULL)
308 r->reply = obj;
309 return 0;
310 }
311 return -1;
312}
313
314static int processItem(redisReader *r) {
315 redisReadTask *cur = &(r->rstack[r->ridx]);
316 char *p;
317 sds byte;
318
319 /* check if we need to read type */
320 if (cur->type < 0) {
321 if ((p = readBytes(r,1)) != NULL) {
322 switch (p[0]) {
323 case '-':
324 cur->type = REDIS_REPLY_ERROR;
325 break;
326 case '+':
327 cur->type = REDIS_REPLY_STATUS;
328 break;
329 case ':':
330 cur->type = REDIS_REPLY_INTEGER;
331 break;
332 case '$':
333 cur->type = REDIS_REPLY_STRING;
334 break;
335 case '*':
336 cur->type = REDIS_REPLY_ARRAY;
337 break;
338 default:
339 byte = sdscatrepr(sdsempty(),p,1);
340 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
341 "protocol error, got %s as reply type byte", byte));
342 sdsfree(byte);
343 return -1;
344 }
345 } else {
346 /* could not consume 1 byte */
347 return -1;
348 }
349 }
350
351 /* process typed item */
352 switch(cur->type) {
353 case REDIS_REPLY_ERROR:
354 case REDIS_REPLY_STATUS:
355 case REDIS_REPLY_INTEGER:
356 return processLineItem(r);
357 case REDIS_REPLY_STRING:
358 return processBulkItem(r);
359 case REDIS_REPLY_ARRAY:
360 return processMultiBulkItem(r);
361 default:
362 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
363 "unknown item type '%d'", cur->type));
364 return -1;
365 }
366}
367
afc156c2 368void *redisReplyReaderCreate() {
24f753a8
PN
369 redisReader *r = calloc(sizeof(redisReader),1);
370 r->error = NULL;
afc156c2 371 r->fn = &defaultFunctions;
24f753a8
PN
372 r->buf = sdsempty();
373 r->ridx = -1;
374 return r;
375}
376
afc156c2
PN
377/* Set the function set to build the reply. Returns REDIS_OK when there
378 * is no temporary object and it can be set, REDIS_ERR otherwise. */
379int redisReplyReaderSetReplyObjectFunctions(void *reader, redisReplyObjectFunctions *fn) {
380 redisReader *r = reader;
381 if (r->reply == NULL) {
382 r->fn = fn;
383 return REDIS_OK;
384 }
385 return REDIS_ERR;
386}
387
24f753a8
PN
388/* External libraries wrapping hiredis might need access to the temporary
389 * variable while the reply is built up. When the reader contains an
390 * object in between receiving some bytes to parse, this object might
391 * otherwise be free'd by garbage collection. */
392void *redisReplyReaderGetObject(void *reader) {
393 redisReader *r = reader;
394 return r->reply;
395}
396
397void redisReplyReaderFree(void *reader) {
398 redisReader *r = reader;
399 if (r->error != NULL)
400 sdsfree(r->error);
afc156c2 401 if (r->reply != NULL && r->fn)
24f753a8
PN
402 r->fn->freeObject(r->reply);
403 if (r->buf != NULL)
404 sdsfree(r->buf);
405 free(r);
406}
407
408static void redisSetReplyReaderError(redisReader *r, sds err) {
409 if (r->reply != NULL)
410 r->fn->freeObject(r->reply);
411
412 /* Clear remaining buffer when we see a protocol error. */
413 if (r->buf != NULL) {
414 sdsfree(r->buf);
415 r->buf = sdsempty();
416 r->pos = 0;
417 }
418 r->ridx = -1;
419 r->error = err;
420}
421
422char *redisReplyReaderGetError(void *reader) {
423 redisReader *r = reader;
424 return r->error;
425}
426
427void redisReplyReaderFeed(void *reader, char *buf, int len) {
428 redisReader *r = reader;
429
430 /* Copy the provided buffer. */
431 if (buf != NULL && len >= 1)
432 r->buf = sdscatlen(r->buf,buf,len);
433}
434
435int redisReplyReaderGetReply(void *reader, void **reply) {
436 redisReader *r = reader;
437 if (reply != NULL) *reply = NULL;
438
439 /* When the buffer is empty, there will never be a reply. */
440 if (sdslen(r->buf) == 0)
441 return REDIS_OK;
442
443 /* Set first item to process when the stack is empty. */
444 if (r->ridx == -1) {
445 r->rstack[0].type = -1;
446 r->rstack[0].elements = -1;
447 r->rstack[0].parent = NULL;
448 r->rstack[0].idx = -1;
449 r->ridx = 0;
450 }
451
452 /* Process items in reply. */
453 while (r->ridx >= 0)
454 if (processItem(r) < 0)
455 break;
456
457 /* Discard the consumed part of the buffer. */
458 if (r->pos > 0) {
459 if (r->pos == sdslen(r->buf)) {
460 /* sdsrange has a quirck on this edge case. */
461 sdsfree(r->buf);
462 r->buf = sdsempty();
463 } else {
464 r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
465 }
466 r->pos = 0;
467 }
468
469 /* Emit a reply when there is one. */
470 if (r->ridx == -1) {
471 void *aux = r->reply;
472 r->reply = NULL;
473
474 /* Destroy the buffer when it is empty and is quite large. */
475 if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
476 sdsfree(r->buf);
477 r->buf = sdsempty();
478 r->pos = 0;
479 }
480
481 /* Check if there actually *is* a reply. */
482 if (r->error != NULL) {
483 return REDIS_ERR;
484 } else {
485 if (reply != NULL) *reply = aux;
486 }
487 }
488 return REDIS_OK;
489}
490
491/* Calculate the number of bytes needed to represent an integer as string. */
492static int intlen(int i) {
493 int len = 0;
494 if (i < 0) {
495 len++;
496 i = -i;
497 }
498 do {
499 len++;
500 i /= 10;
501 } while(i);
502 return len;
503}
504
505/* Helper function for redisvFormatCommand(). */
506static void addArgument(sds a, char ***argv, int *argc, int *totlen) {
507 (*argc)++;
508 if ((*argv = realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
509 if (totlen) *totlen = *totlen+1+intlen(sdslen(a))+2+sdslen(a)+2;
510 (*argv)[(*argc)-1] = a;
511}
512
513int redisvFormatCommand(char **target, const char *format, va_list ap) {
514 size_t size;
515 const char *arg, *c = format;
516 char *cmd = NULL; /* final command */
517 int pos; /* position in final command */
518 sds current; /* current argument */
519 char **argv = NULL;
520 int argc = 0, j;
521 int totlen = 0;
522
523 /* Abort if there is not target to set */
524 if (target == NULL)
525 return -1;
526
527 /* Build the command string accordingly to protocol */
528 current = sdsempty();
529 while(*c != '\0') {
530 if (*c != '%' || c[1] == '\0') {
531 if (*c == ' ') {
532 if (sdslen(current) != 0) {
533 addArgument(current, &argv, &argc, &totlen);
534 current = sdsempty();
535 }
536 } else {
537 current = sdscatlen(current,c,1);
538 }
539 } else {
540 switch(c[1]) {
541 case 's':
542 arg = va_arg(ap,char*);
543 current = sdscat(current,arg);
544 break;
545 case 'b':
546 arg = va_arg(ap,char*);
547 size = va_arg(ap,size_t);
548 current = sdscatlen(current,arg,size);
549 break;
550 case '%':
551 cmd = sdscat(cmd,"%");
552 break;
553 }
554 c++;
555 }
556 c++;
557 }
558
559 /* Add the last argument if needed */
560 if (sdslen(current) != 0) {
561 addArgument(current, &argv, &argc, &totlen);
562 } else {
563 sdsfree(current);
564 }
565
566 /* Add bytes needed to hold multi bulk count */
567 totlen += 1+intlen(argc)+2;
568
569 /* Build the command at protocol level */
570 cmd = malloc(totlen+1);
571 if (!cmd) redisOOM();
572 pos = sprintf(cmd,"*%d\r\n",argc);
573 for (j = 0; j < argc; j++) {
574 pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(argv[j]));
575 memcpy(cmd+pos,argv[j],sdslen(argv[j]));
576 pos += sdslen(argv[j]);
577 sdsfree(argv[j]);
578 cmd[pos++] = '\r';
579 cmd[pos++] = '\n';
580 }
581 assert(pos == totlen);
582 free(argv);
583 cmd[totlen] = '\0';
584 *target = cmd;
585 return totlen;
586}
587
588/* Format a command according to the Redis protocol. This function
589 * takes a format similar to printf:
590 *
591 * %s represents a C null terminated string you want to interpolate
592 * %b represents a binary safe string
593 *
594 * When using %b you need to provide both the pointer to the string
595 * and the length in bytes. Examples:
596 *
597 * len = redisFormatCommand(target, "GET %s", mykey);
598 * len = redisFormatCommand(target, "SET %s %b", mykey, myval, myvallen);
599 */
600int redisFormatCommand(char **target, const char *format, ...) {
601 va_list ap;
602 int len;
603 va_start(ap,format);
604 len = redisvFormatCommand(target,format,ap);
605 va_end(ap);
606 return len;
607}
608
609/* Format a command according to the Redis protocol. This function takes the
610 * number of arguments, an array with arguments and an array with their
611 * lengths. If the latter is set to NULL, strlen will be used to compute the
612 * argument lengths.
613 */
614int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
615 char *cmd = NULL; /* final command */
616 int pos; /* position in final command */
617 size_t len;
618 int totlen, j;
619
620 /* Calculate number of bytes needed for the command */
621 totlen = 1+intlen(argc)+2;
622 for (j = 0; j < argc; j++) {
623 len = argvlen ? argvlen[j] : strlen(argv[j]);
624 totlen += 1+intlen(len)+2+len+2;
625 }
626
627 /* Build the command at protocol level */
628 cmd = malloc(totlen+1);
629 if (!cmd) redisOOM();
630 pos = sprintf(cmd,"*%d\r\n",argc);
631 for (j = 0; j < argc; j++) {
632 len = argvlen ? argvlen[j] : strlen(argv[j]);
633 pos += sprintf(cmd+pos,"$%zu\r\n",len);
634 memcpy(cmd+pos,argv[j],len);
635 pos += len;
636 cmd[pos++] = '\r';
637 cmd[pos++] = '\n';
638 }
639 assert(pos == totlen);
640 cmd[totlen] = '\0';
641 *target = cmd;
642 return totlen;
643}
644
645void __redisSetError(redisContext *c, int type, const sds errstr) {
646 c->err = type;
647 if (errstr != NULL) {
648 c->errstr = errstr;
649 } else {
650 /* Only REDIS_ERR_IO may lack a description! */
651 assert(type == REDIS_ERR_IO);
652 c->errstr = sdsnew(strerror(errno));
653 }
654}
655
656static redisContext *redisContextInit() {
657 redisContext *c = calloc(sizeof(redisContext),1);
658 c->err = 0;
659 c->errstr = NULL;
660 c->obuf = sdsempty();
661 c->fn = &defaultFunctions;
662 c->reader = NULL;
663 return c;
664}
665
666void redisFree(redisContext *c) {
667 /* Disconnect before free'ing if not yet disconnected. */
668 if (c->flags & REDIS_CONNECTED)
669 close(c->fd);
670 if (c->errstr != NULL)
671 sdsfree(c->errstr);
672 if (c->obuf != NULL)
673 sdsfree(c->obuf);
674 if (c->reader != NULL)
675 redisReplyReaderFree(c->reader);
676 free(c);
677}
678
679/* Connect to a Redis instance. On error the field error in the returned
680 * context will be set to the return value of the error function.
681 * When no set of reply functions is given, the default set will be used. */
682redisContext *redisConnect(const char *ip, int port) {
683 redisContext *c = redisContextInit();
684 c->flags |= REDIS_BLOCK;
685 c->flags |= REDIS_CONNECTED;
686 redisContextConnectTcp(c,ip,port);
687 return c;
688}
689
690redisContext *redisConnectNonBlock(const char *ip, int port) {
691 redisContext *c = redisContextInit();
692 c->flags &= ~REDIS_BLOCK;
693 c->flags |= REDIS_CONNECTED;
694 redisContextConnectTcp(c,ip,port);
695 return c;
696}
697
698redisContext *redisConnectUnix(const char *path) {
699 redisContext *c = redisContextInit();
700 c->flags |= REDIS_BLOCK;
701 c->flags |= REDIS_CONNECTED;
702 redisContextConnectUnix(c,path);
703 return c;
704}
705
706redisContext *redisConnectUnixNonBlock(const char *path) {
707 redisContext *c = redisContextInit();
708 c->flags &= ~REDIS_BLOCK;
709 c->flags |= REDIS_CONNECTED;
710 redisContextConnectUnix(c,path);
711 return c;
712}
713
714/* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
715 * was already initialized and the function set could not be re-set.
716 * Return REDIS_OK when they could be set. */
717int redisSetReplyObjectFunctions(redisContext *c, redisReplyObjectFunctions *fn) {
718 if (c->reader != NULL)
719 return REDIS_ERR;
720 c->fn = fn;
721 return REDIS_OK;
722}
723
724/* Helper function to lazily create a reply reader. */
725static void __redisCreateReplyReader(redisContext *c) {
afc156c2
PN
726 if (c->reader == NULL) {
727 c->reader = redisReplyReaderCreate();
728 assert(redisReplyReaderSetReplyObjectFunctions(c->reader,c->fn) == REDIS_OK);
729 }
24f753a8
PN
730}
731
732/* Use this function to handle a read event on the descriptor. It will try
733 * and read some bytes from the socket and feed them to the reply parser.
734 *
735 * After this function is called, you may use redisContextReadReply to
736 * see if there is a reply available. */
737int redisBufferRead(redisContext *c) {
738 char buf[2048];
739 int nread = read(c->fd,buf,sizeof(buf));
740 if (nread == -1) {
741 if (errno == EAGAIN) {
742 /* Try again later */
743 } else {
744 __redisSetError(c,REDIS_ERR_IO,NULL);
745 return REDIS_ERR;
746 }
747 } else if (nread == 0) {
748 __redisSetError(c,REDIS_ERR_EOF,
749 sdsnew("Server closed the connection"));
750 return REDIS_ERR;
751 } else {
752 __redisCreateReplyReader(c);
753 redisReplyReaderFeed(c->reader,buf,nread);
754 }
755 return REDIS_OK;
756}
757
758/* Write the output buffer to the socket.
759 *
760 * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
761 * succesfully written to the socket. When the buffer is empty after the
762 * write operation, "wdone" is set to 1 (if given).
763 *
764 * Returns REDIS_ERR if an error occured trying to write and sets
765 * c->error to hold the appropriate error string.
766 */
767int redisBufferWrite(redisContext *c, int *done) {
768 int nwritten;
769 if (sdslen(c->obuf) > 0) {
770 nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
771 if (nwritten == -1) {
772 if (errno == EAGAIN) {
773 /* Try again later */
774 } else {
775 __redisSetError(c,REDIS_ERR_IO,NULL);
776 return REDIS_ERR;
777 }
778 } else if (nwritten > 0) {
779 if (nwritten == (signed)sdslen(c->obuf)) {
780 sdsfree(c->obuf);
781 c->obuf = sdsempty();
782 } else {
783 c->obuf = sdsrange(c->obuf,nwritten,-1);
784 }
785 }
786 }
787 if (done != NULL) *done = (sdslen(c->obuf) == 0);
788 return REDIS_OK;
789}
790
791/* Internal helper function to try and get a reply from the reader,
792 * or set an error in the context otherwise. */
793int redisGetReplyFromReader(redisContext *c, void **reply) {
794 __redisCreateReplyReader(c);
795 if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
796 __redisSetError(c,REDIS_ERR_PROTOCOL,
797 sdsnew(((redisReader*)c->reader)->error));
798 return REDIS_ERR;
799 }
800 return REDIS_OK;
801}
802
803int redisGetReply(redisContext *c, void **reply) {
804 int wdone = 0;
805 void *aux = NULL;
806
807 /* Try to read pending replies */
808 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
809 return REDIS_ERR;
810
811 /* For the blocking context, flush output buffer and read reply */
812 if (aux == NULL && c->flags & REDIS_BLOCK) {
813 /* Write until done */
814 do {
815 if (redisBufferWrite(c,&wdone) == REDIS_ERR)
816 return REDIS_ERR;
817 } while (!wdone);
818
819 /* Read until there is a reply */
820 do {
821 if (redisBufferRead(c) == REDIS_ERR)
822 return REDIS_ERR;
823 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
824 return REDIS_ERR;
825 } while (aux == NULL);
826 }
827
828 /* Set reply object */
829 if (reply != NULL) *reply = aux;
830 return REDIS_OK;
831}
832
833
834/* Helper function for the redisAppendCommand* family of functions.
835 *
836 * Write a formatted command to the output buffer. When this family
837 * is used, you need to call redisGetReply yourself to retrieve
838 * the reply (or replies in pub/sub).
839 */
840void __redisAppendCommand(redisContext *c, char *cmd, size_t len) {
841 c->obuf = sdscatlen(c->obuf,cmd,len);
842}
843
844void redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
845 char *cmd;
846 int len;
847 len = redisvFormatCommand(&cmd,format,ap);
848 __redisAppendCommand(c,cmd,len);
849 free(cmd);
850}
851
852void redisAppendCommand(redisContext *c, const char *format, ...) {
853 va_list ap;
854 va_start(ap,format);
855 redisvAppendCommand(c,format,ap);
856 va_end(ap);
857}
858
859void redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
860 char *cmd;
861 int len;
862 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
863 __redisAppendCommand(c,cmd,len);
864 free(cmd);
865}
866
867/* Helper function for the redisCommand* family of functions.
868 *
869 * Write a formatted command to the output buffer. If the given context is
870 * blocking, immediately read the reply into the "reply" pointer. When the
871 * context is non-blocking, the "reply" pointer will not be used and the
872 * command is simply appended to the write buffer.
873 *
874 * Returns the reply when a reply was succesfully retrieved. Returns NULL
875 * otherwise. When NULL is returned in a blocking context, the error field
876 * in the context will be set.
877 */
878static void *__redisCommand(redisContext *c, char *cmd, size_t len) {
879 void *aux = NULL;
880 __redisAppendCommand(c,cmd,len);
881
882 if (c->flags & REDIS_BLOCK) {
883 if (redisGetReply(c,&aux) == REDIS_OK)
884 return aux;
885 return NULL;
886 }
887 return NULL;
888}
889
890void *redisvCommand(redisContext *c, const char *format, va_list ap) {
891 char *cmd;
892 int len;
893 void *reply = NULL;
894 len = redisvFormatCommand(&cmd,format,ap);
895 reply = __redisCommand(c,cmd,len);
896 free(cmd);
897 return reply;
898}
899
900void *redisCommand(redisContext *c, const char *format, ...) {
901 va_list ap;
902 void *reply = NULL;
903 va_start(ap,format);
904 reply = redisvCommand(c,format,ap);
905 va_end(ap);
906 return reply;
907}
908
909void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
910 char *cmd;
911 int len;
912 void *reply = NULL;
913 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
914 reply = __redisCommand(c,cmd,len);
915 free(cmd);
916 return reply;
917}