]> git.saurik.com Git - redis.git/blob - deps/hiredis/hiredis.c
top level make clean also clean hiredis and linoise (deps)
[redis.git] / deps / hiredis / hiredis.c
1 /*
2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
35
36 #include "hiredis.h"
37 #include "net.h"
38 #include "sds.h"
39 #include "util.h"
40
41 typedef struct redisReader {
42 struct redisReplyObjectFunctions *fn;
43 sds error; /* holds optional error */
44 void *reply; /* holds temporary reply */
45
46 sds buf; /* read buffer */
47 unsigned int pos; /* buffer cursor */
48
49 redisReadTask rstack[3]; /* stack of read tasks */
50 int ridx; /* index of stack */
51 } redisReader;
52
53 static redisReply *createReplyObject(int type);
54 static void *createStringObject(const redisReadTask *task, char *str, size_t len);
55 static void *createArrayObject(const redisReadTask *task, int elements);
56 static void *createIntegerObject(const redisReadTask *task, long long value);
57 static void *createNilObject(const redisReadTask *task);
58 static void redisSetReplyReaderError(redisReader *r, sds err);
59
60 /* Default set of functions to build the reply. */
61 static redisReplyObjectFunctions defaultFunctions = {
62 createStringObject,
63 createArrayObject,
64 createIntegerObject,
65 createNilObject,
66 freeReplyObject
67 };
68
69 /* Create a reply object */
70 static redisReply *createReplyObject(int type) {
71 redisReply *r = calloc(sizeof(*r),1);
72
73 if (!r) redisOOM();
74 r->type = type;
75 return r;
76 }
77
78 /* Free a reply object */
79 void freeReplyObject(void *reply) {
80 redisReply *r = reply;
81 size_t j;
82
83 switch(r->type) {
84 case REDIS_REPLY_INTEGER:
85 break; /* Nothing to free */
86 case REDIS_REPLY_ARRAY:
87 for (j = 0; j < r->elements; j++)
88 if (r->element[j]) freeReplyObject(r->element[j]);
89 free(r->element);
90 break;
91 default:
92 if (r->str != NULL)
93 free(r->str);
94 break;
95 }
96 free(r);
97 }
98
99 static void *createStringObject(const redisReadTask *task, char *str, size_t len) {
100 redisReply *r = createReplyObject(task->type);
101 char *value = malloc(len+1);
102 if (!value) redisOOM();
103 assert(task->type == REDIS_REPLY_ERROR ||
104 task->type == REDIS_REPLY_STATUS ||
105 task->type == REDIS_REPLY_STRING);
106
107 /* Copy string value */
108 memcpy(value,str,len);
109 value[len] = '\0';
110 r->str = value;
111 r->len = len;
112
113 if (task->parent) {
114 redisReply *parent = task->parent;
115 assert(parent->type == REDIS_REPLY_ARRAY);
116 parent->element[task->idx] = r;
117 }
118 return r;
119 }
120
121 static void *createArrayObject(const redisReadTask *task, int elements) {
122 redisReply *r = createReplyObject(REDIS_REPLY_ARRAY);
123 r->elements = elements;
124 if ((r->element = calloc(sizeof(redisReply*),elements)) == NULL)
125 redisOOM();
126 if (task->parent) {
127 redisReply *parent = task->parent;
128 assert(parent->type == REDIS_REPLY_ARRAY);
129 parent->element[task->idx] = r;
130 }
131 return r;
132 }
133
134 static void *createIntegerObject(const redisReadTask *task, long long value) {
135 redisReply *r = createReplyObject(REDIS_REPLY_INTEGER);
136 r->integer = value;
137 if (task->parent) {
138 redisReply *parent = task->parent;
139 assert(parent->type == REDIS_REPLY_ARRAY);
140 parent->element[task->idx] = r;
141 }
142 return r;
143 }
144
145 static void *createNilObject(const redisReadTask *task) {
146 redisReply *r = createReplyObject(REDIS_REPLY_NIL);
147 if (task->parent) {
148 redisReply *parent = task->parent;
149 assert(parent->type == REDIS_REPLY_ARRAY);
150 parent->element[task->idx] = r;
151 }
152 return r;
153 }
154
155 static char *readBytes(redisReader *r, unsigned int bytes) {
156 char *p;
157 if (sdslen(r->buf)-r->pos >= bytes) {
158 p = r->buf+r->pos;
159 r->pos += bytes;
160 return p;
161 }
162 return NULL;
163 }
164
165 static char *readLine(redisReader *r, int *_len) {
166 char *p, *s = strstr(r->buf+r->pos,"\r\n");
167 int len;
168 if (s != NULL) {
169 p = r->buf+r->pos;
170 len = s-(r->buf+r->pos);
171 r->pos += len+2; /* skip \r\n */
172 if (_len) *_len = len;
173 return p;
174 }
175 return NULL;
176 }
177
178 static void moveToNextTask(redisReader *r) {
179 redisReadTask *cur, *prv;
180 assert(r->ridx >= 0);
181
182 /* Return a.s.a.p. when the stack is now empty. */
183 if (r->ridx == 0) {
184 r->ridx--;
185 return;
186 }
187
188 cur = &(r->rstack[r->ridx]);
189 prv = &(r->rstack[r->ridx-1]);
190 assert(prv->type == REDIS_REPLY_ARRAY);
191 if (cur->idx == prv->elements-1) {
192 r->ridx--;
193 moveToNextTask(r);
194 } else {
195 /* Reset the type because the next item can be anything */
196 assert(cur->idx < prv->elements);
197 cur->type = -1;
198 cur->elements = -1;
199 cur->idx++;
200 }
201 }
202
203 static int processLineItem(redisReader *r) {
204 redisReadTask *cur = &(r->rstack[r->ridx]);
205 void *obj;
206 char *p;
207 int len;
208
209 if ((p = readLine(r,&len)) != NULL) {
210 if (cur->type == REDIS_REPLY_INTEGER) {
211 obj = r->fn->createInteger(cur,strtoll(p,NULL,10));
212 } else {
213 obj = r->fn->createString(cur,p,len);
214 }
215
216 /* If there is no root yet, register this object as root. */
217 if (r->reply == NULL)
218 r->reply = obj;
219 moveToNextTask(r);
220 return 0;
221 }
222 return -1;
223 }
224
225 static int processBulkItem(redisReader *r) {
226 redisReadTask *cur = &(r->rstack[r->ridx]);
227 void *obj = NULL;
228 char *p, *s;
229 long len;
230 unsigned long bytelen;
231
232 p = r->buf+r->pos;
233 s = strstr(p,"\r\n");
234 if (s != NULL) {
235 p = r->buf+r->pos;
236 bytelen = s-(r->buf+r->pos)+2; /* include \r\n */
237 len = strtol(p,NULL,10);
238
239 if (len < 0) {
240 /* The nil object can always be created. */
241 obj = r->fn->createNil(cur);
242 } else {
243 /* Only continue when the buffer contains the entire bulk item. */
244 bytelen += len+2; /* include \r\n */
245 if (r->pos+bytelen <= sdslen(r->buf)) {
246 obj = r->fn->createString(cur,s+2,len);
247 }
248 }
249
250 /* Proceed when obj was created. */
251 if (obj != NULL) {
252 r->pos += bytelen;
253 if (r->reply == NULL)
254 r->reply = obj;
255 moveToNextTask(r);
256 return 0;
257 }
258 }
259 return -1;
260 }
261
262 static int processMultiBulkItem(redisReader *r) {
263 redisReadTask *cur = &(r->rstack[r->ridx]);
264 void *obj;
265 char *p;
266 long elements;
267
268 if ((p = readLine(r,NULL)) != NULL) {
269 elements = strtol(p,NULL,10);
270 if (elements == -1) {
271 obj = r->fn->createNil(cur);
272 moveToNextTask(r);
273 } else {
274 obj = r->fn->createArray(cur,elements);
275
276 /* Modify task stack when there are more than 0 elements. */
277 if (elements > 0) {
278 cur->elements = elements;
279 r->ridx++;
280 r->rstack[r->ridx].type = -1;
281 r->rstack[r->ridx].elements = -1;
282 r->rstack[r->ridx].parent = obj;
283 r->rstack[r->ridx].idx = 0;
284 } else {
285 moveToNextTask(r);
286 }
287 }
288
289 /* Object was created, so we can always continue. */
290 if (r->reply == NULL)
291 r->reply = obj;
292 return 0;
293 }
294 return -1;
295 }
296
297 static int processItem(redisReader *r) {
298 redisReadTask *cur = &(r->rstack[r->ridx]);
299 char *p;
300 sds byte;
301
302 /* check if we need to read type */
303 if (cur->type < 0) {
304 if ((p = readBytes(r,1)) != NULL) {
305 switch (p[0]) {
306 case '-':
307 cur->type = REDIS_REPLY_ERROR;
308 break;
309 case '+':
310 cur->type = REDIS_REPLY_STATUS;
311 break;
312 case ':':
313 cur->type = REDIS_REPLY_INTEGER;
314 break;
315 case '$':
316 cur->type = REDIS_REPLY_STRING;
317 break;
318 case '*':
319 cur->type = REDIS_REPLY_ARRAY;
320 break;
321 default:
322 byte = sdscatrepr(sdsempty(),p,1);
323 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
324 "protocol error, got %s as reply type byte", byte));
325 sdsfree(byte);
326 return -1;
327 }
328 } else {
329 /* could not consume 1 byte */
330 return -1;
331 }
332 }
333
334 /* process typed item */
335 switch(cur->type) {
336 case REDIS_REPLY_ERROR:
337 case REDIS_REPLY_STATUS:
338 case REDIS_REPLY_INTEGER:
339 return processLineItem(r);
340 case REDIS_REPLY_STRING:
341 return processBulkItem(r);
342 case REDIS_REPLY_ARRAY:
343 return processMultiBulkItem(r);
344 default:
345 redisSetReplyReaderError(r,sdscatprintf(sdsempty(),
346 "unknown item type '%d'", cur->type));
347 return -1;
348 }
349 }
350
351 void *redisReplyReaderCreate(redisReplyObjectFunctions *fn) {
352 redisReader *r = calloc(sizeof(redisReader),1);
353 r->error = NULL;
354 r->fn = fn == NULL ? &defaultFunctions : fn;
355 r->buf = sdsempty();
356 r->ridx = -1;
357 return r;
358 }
359
360 /* External libraries wrapping hiredis might need access to the temporary
361 * variable while the reply is built up. When the reader contains an
362 * object in between receiving some bytes to parse, this object might
363 * otherwise be free'd by garbage collection. */
364 void *redisReplyReaderGetObject(void *reader) {
365 redisReader *r = reader;
366 return r->reply;
367 }
368
369 void redisReplyReaderFree(void *reader) {
370 redisReader *r = reader;
371 if (r->error != NULL)
372 sdsfree(r->error);
373 if (r->reply != NULL)
374 r->fn->freeObject(r->reply);
375 if (r->buf != NULL)
376 sdsfree(r->buf);
377 free(r);
378 }
379
380 static void redisSetReplyReaderError(redisReader *r, sds err) {
381 if (r->reply != NULL)
382 r->fn->freeObject(r->reply);
383
384 /* Clear remaining buffer when we see a protocol error. */
385 if (r->buf != NULL) {
386 sdsfree(r->buf);
387 r->buf = sdsempty();
388 r->pos = 0;
389 }
390 r->ridx = -1;
391 r->error = err;
392 }
393
394 char *redisReplyReaderGetError(void *reader) {
395 redisReader *r = reader;
396 return r->error;
397 }
398
399 void redisReplyReaderFeed(void *reader, char *buf, int len) {
400 redisReader *r = reader;
401
402 /* Copy the provided buffer. */
403 if (buf != NULL && len >= 1)
404 r->buf = sdscatlen(r->buf,buf,len);
405 }
406
407 int redisReplyReaderGetReply(void *reader, void **reply) {
408 redisReader *r = reader;
409 if (reply != NULL) *reply = NULL;
410
411 /* When the buffer is empty, there will never be a reply. */
412 if (sdslen(r->buf) == 0)
413 return REDIS_OK;
414
415 /* Set first item to process when the stack is empty. */
416 if (r->ridx == -1) {
417 r->rstack[0].type = -1;
418 r->rstack[0].elements = -1;
419 r->rstack[0].parent = NULL;
420 r->rstack[0].idx = -1;
421 r->ridx = 0;
422 }
423
424 /* Process items in reply. */
425 while (r->ridx >= 0)
426 if (processItem(r) < 0)
427 break;
428
429 /* Discard the consumed part of the buffer. */
430 if (r->pos > 0) {
431 if (r->pos == sdslen(r->buf)) {
432 /* sdsrange has a quirck on this edge case. */
433 sdsfree(r->buf);
434 r->buf = sdsempty();
435 } else {
436 r->buf = sdsrange(r->buf,r->pos,sdslen(r->buf));
437 }
438 r->pos = 0;
439 }
440
441 /* Emit a reply when there is one. */
442 if (r->ridx == -1) {
443 void *aux = r->reply;
444 r->reply = NULL;
445
446 /* Destroy the buffer when it is empty and is quite large. */
447 if (sdslen(r->buf) == 0 && sdsavail(r->buf) > 16*1024) {
448 sdsfree(r->buf);
449 r->buf = sdsempty();
450 r->pos = 0;
451 }
452
453 /* Check if there actually *is* a reply. */
454 if (r->error != NULL) {
455 return REDIS_ERR;
456 } else {
457 if (reply != NULL) *reply = aux;
458 }
459 }
460 return REDIS_OK;
461 }
462
/* Return the number of characters needed to print i in decimal, including
 * a leading '-' for negative values.
 *
 * The magnitude is computed in unsigned arithmetic so that INT_MIN, whose
 * negation does not fit in an int, is handled without signed overflow. */
static int intlen(int i) {
    int len = 0;
    unsigned int mag;

    if (i < 0) {
        len++; /* room for the sign */
        mag = 0u - (unsigned int)i;
    } else {
        mag = (unsigned int)i;
    }
    do {
        len++;
        mag /= 10;
    } while(mag);
    return len;
}
476
477 /* Helper function for redisvFormatCommand(). */
478 static void addArgument(sds a, char ***argv, int *argc, int *totlen) {
479 (*argc)++;
480 if ((*argv = realloc(*argv, sizeof(char*)*(*argc))) == NULL) redisOOM();
481 if (totlen) *totlen = *totlen+1+intlen(sdslen(a))+2+sdslen(a)+2;
482 (*argv)[(*argc)-1] = a;
483 }
484
485 int redisvFormatCommand(char **target, const char *format, va_list ap) {
486 size_t size;
487 const char *arg, *c = format;
488 char *cmd = NULL; /* final command */
489 int pos; /* position in final command */
490 sds current; /* current argument */
491 char **argv = NULL;
492 int argc = 0, j;
493 int totlen = 0;
494
495 /* Abort if there is not target to set */
496 if (target == NULL)
497 return -1;
498
499 /* Build the command string accordingly to protocol */
500 current = sdsempty();
501 while(*c != '\0') {
502 if (*c != '%' || c[1] == '\0') {
503 if (*c == ' ') {
504 if (sdslen(current) != 0) {
505 addArgument(current, &argv, &argc, &totlen);
506 current = sdsempty();
507 }
508 } else {
509 current = sdscatlen(current,c,1);
510 }
511 } else {
512 switch(c[1]) {
513 case 's':
514 arg = va_arg(ap,char*);
515 current = sdscat(current,arg);
516 break;
517 case 'b':
518 arg = va_arg(ap,char*);
519 size = va_arg(ap,size_t);
520 current = sdscatlen(current,arg,size);
521 break;
522 case '%':
523 cmd = sdscat(cmd,"%");
524 break;
525 }
526 c++;
527 }
528 c++;
529 }
530
531 /* Add the last argument if needed */
532 if (sdslen(current) != 0) {
533 addArgument(current, &argv, &argc, &totlen);
534 } else {
535 sdsfree(current);
536 }
537
538 /* Add bytes needed to hold multi bulk count */
539 totlen += 1+intlen(argc)+2;
540
541 /* Build the command at protocol level */
542 cmd = malloc(totlen+1);
543 if (!cmd) redisOOM();
544 pos = sprintf(cmd,"*%d\r\n",argc);
545 for (j = 0; j < argc; j++) {
546 pos += sprintf(cmd+pos,"$%zu\r\n",sdslen(argv[j]));
547 memcpy(cmd+pos,argv[j],sdslen(argv[j]));
548 pos += sdslen(argv[j]);
549 sdsfree(argv[j]);
550 cmd[pos++] = '\r';
551 cmd[pos++] = '\n';
552 }
553 assert(pos == totlen);
554 free(argv);
555 cmd[totlen] = '\0';
556 *target = cmd;
557 return totlen;
558 }
559
/* Format a command according to the Redis protocol using a printf-like
 * format string:
 *
 * %s interpolates a NUL-terminated C string
 * %b interpolates a binary safe string
 *
 * %b consumes two arguments: the pointer to the data and its length in
 * bytes. Examples:
 *
 * len = redisFormatCommand(&cmd, "GET %s", mykey);
 * len = redisFormatCommand(&cmd, "SET %s %b", mykey, myval, myvallen);
 *
 * This is the variadic front end of redisvFormatCommand() and returns
 * whatever that function returns.
 */
int redisFormatCommand(char **target, const char *format, ...) {
    va_list args;
    int result;

    va_start(args,format);
    result = redisvFormatCommand(target,format,args);
    va_end(args);
    return result;
}
580
/* Format a command according to the Redis protocol from an argument vector.
 *
 * argc is the number of arguments, argv the arguments and argvlen their
 * lengths. When argvlen is NULL, strlen() is used to compute the length of
 * every argument.
 *
 * On success *target points to a malloc()ed, NUL-terminated buffer that
 * the caller must free(), and the length of the command (without the
 * terminator) is returned. Returns -1 when target is NULL, like
 * redisvFormatCommand().
 */
int redisFormatCommandArgv(char **target, int argc, const char **argv, const size_t *argvlen) {
    char *cmd = NULL; /* final command */
    int pos; /* position in final command */
    size_t len;
    int totlen, j;

    /* Abort if there is no target to set */
    if (target == NULL)
        return -1;

    /* Calculate number of bytes needed for the command */
    totlen = 1+intlen(argc)+2;
    for (j = 0; j < argc; j++) {
        len = argvlen ? argvlen[j] : strlen(argv[j]);
        totlen += 1+intlen(len)+2+len+2;
    }

    /* Build the command at protocol level */
    cmd = malloc(totlen+1);
    if (!cmd) redisOOM();
    pos = sprintf(cmd,"*%d\r\n",argc);
    for (j = 0; j < argc; j++) {
        len = argvlen ? argvlen[j] : strlen(argv[j]);
        pos += sprintf(cmd+pos,"$%zu\r\n",len);
        memcpy(cmd+pos,argv[j],len);
        pos += len;
        cmd[pos++] = '\r';
        cmd[pos++] = '\n';
    }
    assert(pos == totlen);
    cmd[totlen] = '\0';
    *target = cmd;
    return totlen;
}
616
617 void __redisSetError(redisContext *c, int type, const sds errstr) {
618 c->err = type;
619 if (errstr != NULL) {
620 c->errstr = errstr;
621 } else {
622 /* Only REDIS_ERR_IO may lack a description! */
623 assert(type == REDIS_ERR_IO);
624 c->errstr = sdsnew(strerror(errno));
625 }
626 }
627
628 static redisContext *redisContextInit() {
629 redisContext *c = calloc(sizeof(redisContext),1);
630 c->err = 0;
631 c->errstr = NULL;
632 c->obuf = sdsempty();
633 c->fn = &defaultFunctions;
634 c->reader = NULL;
635 return c;
636 }
637
638 void redisFree(redisContext *c) {
639 /* Disconnect before free'ing if not yet disconnected. */
640 if (c->flags & REDIS_CONNECTED)
641 close(c->fd);
642 if (c->errstr != NULL)
643 sdsfree(c->errstr);
644 if (c->obuf != NULL)
645 sdsfree(c->obuf);
646 if (c->reader != NULL)
647 redisReplyReaderFree(c->reader);
648 free(c);
649 }
650
651 /* Connect to a Redis instance. On error the field error in the returned
652 * context will be set to the return value of the error function.
653 * When no set of reply functions is given, the default set will be used. */
654 redisContext *redisConnect(const char *ip, int port) {
655 redisContext *c = redisContextInit();
656 c->flags |= REDIS_BLOCK;
657 c->flags |= REDIS_CONNECTED;
658 redisContextConnectTcp(c,ip,port);
659 return c;
660 }
661
662 redisContext *redisConnectNonBlock(const char *ip, int port) {
663 redisContext *c = redisContextInit();
664 c->flags &= ~REDIS_BLOCK;
665 c->flags |= REDIS_CONNECTED;
666 redisContextConnectTcp(c,ip,port);
667 return c;
668 }
669
670 redisContext *redisConnectUnix(const char *path) {
671 redisContext *c = redisContextInit();
672 c->flags |= REDIS_BLOCK;
673 c->flags |= REDIS_CONNECTED;
674 redisContextConnectUnix(c,path);
675 return c;
676 }
677
678 redisContext *redisConnectUnixNonBlock(const char *path) {
679 redisContext *c = redisContextInit();
680 c->flags &= ~REDIS_BLOCK;
681 c->flags |= REDIS_CONNECTED;
682 redisContextConnectUnix(c,path);
683 return c;
684 }
685
686 /* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
687 * was already initialized and the function set could not be re-set.
688 * Return REDIS_OK when they could be set. */
689 int redisSetReplyObjectFunctions(redisContext *c, redisReplyObjectFunctions *fn) {
690 if (c->reader != NULL)
691 return REDIS_ERR;
692 c->fn = fn;
693 return REDIS_OK;
694 }
695
696 /* Helper function to lazily create a reply reader. */
697 static void __redisCreateReplyReader(redisContext *c) {
698 if (c->reader == NULL)
699 c->reader = redisReplyReaderCreate(c->fn);
700 }
701
702 /* Use this function to handle a read event on the descriptor. It will try
703 * and read some bytes from the socket and feed them to the reply parser.
704 *
705 * After this function is called, you may use redisContextReadReply to
706 * see if there is a reply available. */
707 int redisBufferRead(redisContext *c) {
708 char buf[2048];
709 int nread = read(c->fd,buf,sizeof(buf));
710 if (nread == -1) {
711 if (errno == EAGAIN) {
712 /* Try again later */
713 } else {
714 __redisSetError(c,REDIS_ERR_IO,NULL);
715 return REDIS_ERR;
716 }
717 } else if (nread == 0) {
718 __redisSetError(c,REDIS_ERR_EOF,
719 sdsnew("Server closed the connection"));
720 return REDIS_ERR;
721 } else {
722 __redisCreateReplyReader(c);
723 redisReplyReaderFeed(c->reader,buf,nread);
724 }
725 return REDIS_OK;
726 }
727
728 /* Write the output buffer to the socket.
729 *
730 * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
731 * succesfully written to the socket. When the buffer is empty after the
732 * write operation, "wdone" is set to 1 (if given).
733 *
734 * Returns REDIS_ERR if an error occured trying to write and sets
735 * c->error to hold the appropriate error string.
736 */
737 int redisBufferWrite(redisContext *c, int *done) {
738 int nwritten;
739 if (sdslen(c->obuf) > 0) {
740 nwritten = write(c->fd,c->obuf,sdslen(c->obuf));
741 if (nwritten == -1) {
742 if (errno == EAGAIN) {
743 /* Try again later */
744 } else {
745 __redisSetError(c,REDIS_ERR_IO,NULL);
746 return REDIS_ERR;
747 }
748 } else if (nwritten > 0) {
749 if (nwritten == (signed)sdslen(c->obuf)) {
750 sdsfree(c->obuf);
751 c->obuf = sdsempty();
752 } else {
753 c->obuf = sdsrange(c->obuf,nwritten,-1);
754 }
755 }
756 }
757 if (done != NULL) *done = (sdslen(c->obuf) == 0);
758 return REDIS_OK;
759 }
760
761 /* Internal helper function to try and get a reply from the reader,
762 * or set an error in the context otherwise. */
763 int redisGetReplyFromReader(redisContext *c, void **reply) {
764 __redisCreateReplyReader(c);
765 if (redisReplyReaderGetReply(c->reader,reply) == REDIS_ERR) {
766 __redisSetError(c,REDIS_ERR_PROTOCOL,
767 sdsnew(((redisReader*)c->reader)->error));
768 return REDIS_ERR;
769 }
770 return REDIS_OK;
771 }
772
773 int redisGetReply(redisContext *c, void **reply) {
774 int wdone = 0;
775 void *aux = NULL;
776
777 /* Try to read pending replies */
778 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
779 return REDIS_ERR;
780
781 /* For the blocking context, flush output buffer and read reply */
782 if (aux == NULL && c->flags & REDIS_BLOCK) {
783 /* Write until done */
784 do {
785 if (redisBufferWrite(c,&wdone) == REDIS_ERR)
786 return REDIS_ERR;
787 } while (!wdone);
788
789 /* Read until there is a reply */
790 do {
791 if (redisBufferRead(c) == REDIS_ERR)
792 return REDIS_ERR;
793 if (redisGetReplyFromReader(c,&aux) == REDIS_ERR)
794 return REDIS_ERR;
795 } while (aux == NULL);
796 }
797
798 /* Set reply object */
799 if (reply != NULL) *reply = aux;
800 return REDIS_OK;
801 }
802
803
804 /* Helper function for the redisAppendCommand* family of functions.
805 *
806 * Write a formatted command to the output buffer. When this family
807 * is used, you need to call redisGetReply yourself to retrieve
808 * the reply (or replies in pub/sub).
809 */
810 void __redisAppendCommand(redisContext *c, char *cmd, size_t len) {
811 c->obuf = sdscatlen(c->obuf,cmd,len);
812 }
813
814 void redisvAppendCommand(redisContext *c, const char *format, va_list ap) {
815 char *cmd;
816 int len;
817 len = redisvFormatCommand(&cmd,format,ap);
818 __redisAppendCommand(c,cmd,len);
819 free(cmd);
820 }
821
822 void redisAppendCommand(redisContext *c, const char *format, ...) {
823 va_list ap;
824 va_start(ap,format);
825 redisvAppendCommand(c,format,ap);
826 va_end(ap);
827 }
828
829 void redisAppendCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
830 char *cmd;
831 int len;
832 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
833 __redisAppendCommand(c,cmd,len);
834 free(cmd);
835 }
836
837 /* Helper function for the redisCommand* family of functions.
838 *
839 * Write a formatted command to the output buffer. If the given context is
840 * blocking, immediately read the reply into the "reply" pointer. When the
841 * context is non-blocking, the "reply" pointer will not be used and the
842 * command is simply appended to the write buffer.
843 *
844 * Returns the reply when a reply was succesfully retrieved. Returns NULL
845 * otherwise. When NULL is returned in a blocking context, the error field
846 * in the context will be set.
847 */
848 static void *__redisCommand(redisContext *c, char *cmd, size_t len) {
849 void *aux = NULL;
850 __redisAppendCommand(c,cmd,len);
851
852 if (c->flags & REDIS_BLOCK) {
853 if (redisGetReply(c,&aux) == REDIS_OK)
854 return aux;
855 return NULL;
856 }
857 return NULL;
858 }
859
860 void *redisvCommand(redisContext *c, const char *format, va_list ap) {
861 char *cmd;
862 int len;
863 void *reply = NULL;
864 len = redisvFormatCommand(&cmd,format,ap);
865 reply = __redisCommand(c,cmd,len);
866 free(cmd);
867 return reply;
868 }
869
870 void *redisCommand(redisContext *c, const char *format, ...) {
871 va_list ap;
872 void *reply = NULL;
873 va_start(ap,format);
874 reply = redisvCommand(c,format,ap);
875 va_end(ap);
876 return reply;
877 }
878
879 void *redisCommandArgv(redisContext *c, int argc, const char **argv, const size_t *argvlen) {
880 char *cmd;
881 int len;
882 void *reply = NULL;
883 len = redisFormatCommandArgv(&cmd,argc,argv,argvlen);
884 reply = __redisCommand(c,cmd,len);
885 free(cmd);
886 return reply;
887 }