2 * Copyright (c) 2009-2010, Salvatore Sanfilippo <antirez at gmail dot com>
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
41 typedef struct redisReader
{
42 struct redisReplyObjectFunctions
*fn
;
43 sds error
; /* holds optional error */
44 void *reply
; /* holds temporary reply */
46 sds buf
; /* read buffer */
47 unsigned int pos
; /* buffer cursor */
49 redisReadTask rstack
[3]; /* stack of read tasks */
50 int ridx
; /* index of stack */
53 static redisReply
*createReplyObject(int type
);
54 static void *createStringObject(const redisReadTask
*task
, char *str
, size_t len
);
55 static void *createArrayObject(const redisReadTask
*task
, int elements
);
56 static void *createIntegerObject(const redisReadTask
*task
, long long value
);
57 static void *createNilObject(const redisReadTask
*task
);
58 static void redisSetReplyReaderError(redisReader
*r
, sds err
);
60 /* Default set of functions to build the reply. */
61 static redisReplyObjectFunctions defaultFunctions
= {
69 /* Create a reply object */
70 static redisReply
*createReplyObject(int type
) {
71 redisReply
*r
= calloc(sizeof(*r
),1);
78 /* Free a reply object */
79 void freeReplyObject(void *reply
) {
80 redisReply
*r
= reply
;
84 case REDIS_REPLY_INTEGER
:
85 break; /* Nothing to free */
86 case REDIS_REPLY_ARRAY
:
87 for (j
= 0; j
< r
->elements
; j
++)
88 if (r
->element
[j
]) freeReplyObject(r
->element
[j
]);
99 static void *createStringObject(const redisReadTask
*task
, char *str
, size_t len
) {
100 redisReply
*r
= createReplyObject(task
->type
);
101 char *value
= malloc(len
+1);
102 if (!value
) redisOOM();
103 assert(task
->type
== REDIS_REPLY_ERROR
||
104 task
->type
== REDIS_REPLY_STATUS
||
105 task
->type
== REDIS_REPLY_STRING
);
107 /* Copy string value */
108 memcpy(value
,str
,len
);
114 redisReply
*parent
= task
->parent
;
115 assert(parent
->type
== REDIS_REPLY_ARRAY
);
116 parent
->element
[task
->idx
] = r
;
121 static void *createArrayObject(const redisReadTask
*task
, int elements
) {
122 redisReply
*r
= createReplyObject(REDIS_REPLY_ARRAY
);
123 r
->elements
= elements
;
124 if ((r
->element
= calloc(sizeof(redisReply
*),elements
)) == NULL
)
127 redisReply
*parent
= task
->parent
;
128 assert(parent
->type
== REDIS_REPLY_ARRAY
);
129 parent
->element
[task
->idx
] = r
;
134 static void *createIntegerObject(const redisReadTask
*task
, long long value
) {
135 redisReply
*r
= createReplyObject(REDIS_REPLY_INTEGER
);
138 redisReply
*parent
= task
->parent
;
139 assert(parent
->type
== REDIS_REPLY_ARRAY
);
140 parent
->element
[task
->idx
] = r
;
145 static void *createNilObject(const redisReadTask
*task
) {
146 redisReply
*r
= createReplyObject(REDIS_REPLY_NIL
);
148 redisReply
*parent
= task
->parent
;
149 assert(parent
->type
== REDIS_REPLY_ARRAY
);
150 parent
->element
[task
->idx
] = r
;
155 static char *readBytes(redisReader
*r
, unsigned int bytes
) {
157 if (sdslen(r
->buf
)-r
->pos
>= bytes
) {
165 static char *readLine(redisReader
*r
, int *_len
) {
166 char *p
, *s
= strstr(r
->buf
+r
->pos
,"\r\n");
170 len
= s
-(r
->buf
+r
->pos
);
171 r
->pos
+= len
+2; /* skip \r\n */
172 if (_len
) *_len
= len
;
178 static void moveToNextTask(redisReader
*r
) {
179 redisReadTask
*cur
, *prv
;
180 assert(r
->ridx
>= 0);
182 /* Return a.s.a.p. when the stack is now empty. */
188 cur
= &(r
->rstack
[r
->ridx
]);
189 prv
= &(r
->rstack
[r
->ridx
-1]);
190 assert(prv
->type
== REDIS_REPLY_ARRAY
);
191 if (cur
->idx
== prv
->elements
-1) {
195 /* Reset the type because the next item can be anything */
196 assert(cur
->idx
< prv
->elements
);
203 static int processLineItem(redisReader
*r
) {
204 redisReadTask
*cur
= &(r
->rstack
[r
->ridx
]);
209 if ((p
= readLine(r
,&len
)) != NULL
) {
210 if (cur
->type
== REDIS_REPLY_INTEGER
) {
211 obj
= r
->fn
->createInteger(cur
,strtoll(p
,NULL
,10));
213 obj
= r
->fn
->createString(cur
,p
,len
);
216 /* If there is no root yet, register this object as root. */
217 if (r
->reply
== NULL
)
225 static int processBulkItem(redisReader
*r
) {
226 redisReadTask
*cur
= &(r
->rstack
[r
->ridx
]);
230 unsigned long bytelen
;
233 s
= strstr(p
,"\r\n");
236 bytelen
= s
-(r
->buf
+r
->pos
)+2; /* include \r\n */
237 len
= strtol(p
,NULL
,10);
240 /* The nil object can always be created. */
241 obj
= r
->fn
->createNil(cur
);
243 /* Only continue when the buffer contains the entire bulk item. */
244 bytelen
+= len
+2; /* include \r\n */
245 if (r
->pos
+bytelen
<= sdslen(r
->buf
)) {
246 obj
= r
->fn
->createString(cur
,s
+2,len
);
250 /* Proceed when obj was created. */
253 if (r
->reply
== NULL
)
262 static int processMultiBulkItem(redisReader
*r
) {
263 redisReadTask
*cur
= &(r
->rstack
[r
->ridx
]);
268 if ((p
= readLine(r
,NULL
)) != NULL
) {
269 elements
= strtol(p
,NULL
,10);
270 if (elements
== -1) {
271 obj
= r
->fn
->createNil(cur
);
274 obj
= r
->fn
->createArray(cur
,elements
);
276 /* Modify task stack when there are more than 0 elements. */
278 cur
->elements
= elements
;
280 r
->rstack
[r
->ridx
].type
= -1;
281 r
->rstack
[r
->ridx
].elements
= -1;
282 r
->rstack
[r
->ridx
].parent
= obj
;
283 r
->rstack
[r
->ridx
].idx
= 0;
289 /* Object was created, so we can always continue. */
290 if (r
->reply
== NULL
)
297 static int processItem(redisReader
*r
) {
298 redisReadTask
*cur
= &(r
->rstack
[r
->ridx
]);
302 /* check if we need to read type */
304 if ((p
= readBytes(r
,1)) != NULL
) {
307 cur
->type
= REDIS_REPLY_ERROR
;
310 cur
->type
= REDIS_REPLY_STATUS
;
313 cur
->type
= REDIS_REPLY_INTEGER
;
316 cur
->type
= REDIS_REPLY_STRING
;
319 cur
->type
= REDIS_REPLY_ARRAY
;
322 byte
= sdscatrepr(sdsempty(),p
,1);
323 redisSetReplyReaderError(r
,sdscatprintf(sdsempty(),
324 "protocol error, got %s as reply type byte", byte
));
329 /* could not consume 1 byte */
334 /* process typed item */
336 case REDIS_REPLY_ERROR
:
337 case REDIS_REPLY_STATUS
:
338 case REDIS_REPLY_INTEGER
:
339 return processLineItem(r
);
340 case REDIS_REPLY_STRING
:
341 return processBulkItem(r
);
342 case REDIS_REPLY_ARRAY
:
343 return processMultiBulkItem(r
);
345 redisSetReplyReaderError(r
,sdscatprintf(sdsempty(),
346 "unknown item type '%d'", cur
->type
));
351 void *redisReplyReaderCreate(redisReplyObjectFunctions
*fn
) {
352 redisReader
*r
= calloc(sizeof(redisReader
),1);
354 r
->fn
= fn
== NULL
? &defaultFunctions
: fn
;
360 /* External libraries wrapping hiredis might need access to the temporary
361 * variable while the reply is built up. When the reader contains an
362 * object in between receiving some bytes to parse, this object might
363 * otherwise be free'd by garbage collection. */
364 void *redisReplyReaderGetObject(void *reader
) {
365 redisReader
*r
= reader
;
369 void redisReplyReaderFree(void *reader
) {
370 redisReader
*r
= reader
;
371 if (r
->error
!= NULL
)
373 if (r
->reply
!= NULL
)
374 r
->fn
->freeObject(r
->reply
);
380 static void redisSetReplyReaderError(redisReader
*r
, sds err
) {
381 if (r
->reply
!= NULL
)
382 r
->fn
->freeObject(r
->reply
);
384 /* Clear remaining buffer when we see a protocol error. */
385 if (r
->buf
!= NULL
) {
394 char *redisReplyReaderGetError(void *reader
) {
395 redisReader
*r
= reader
;
399 void redisReplyReaderFeed(void *reader
, char *buf
, int len
) {
400 redisReader
*r
= reader
;
402 /* Copy the provided buffer. */
403 if (buf
!= NULL
&& len
>= 1)
404 r
->buf
= sdscatlen(r
->buf
,buf
,len
);
407 int redisReplyReaderGetReply(void *reader
, void **reply
) {
408 redisReader
*r
= reader
;
409 if (reply
!= NULL
) *reply
= NULL
;
411 /* When the buffer is empty, there will never be a reply. */
412 if (sdslen(r
->buf
) == 0)
415 /* Set first item to process when the stack is empty. */
417 r
->rstack
[0].type
= -1;
418 r
->rstack
[0].elements
= -1;
419 r
->rstack
[0].parent
= NULL
;
420 r
->rstack
[0].idx
= -1;
424 /* Process items in reply. */
426 if (processItem(r
) < 0)
429 /* Discard the consumed part of the buffer. */
431 if (r
->pos
== sdslen(r
->buf
)) {
432 /* sdsrange has a quirk on this edge case. */
436 r
->buf
= sdsrange(r
->buf
,r
->pos
,sdslen(r
->buf
));
441 /* Emit a reply when there is one. */
443 void *aux
= r
->reply
;
446 /* Destroy the buffer when it is empty and is quite large. */
447 if (sdslen(r
->buf
) == 0 && sdsavail(r
->buf
) > 16*1024) {
453 /* Check if there actually *is* a reply. */
454 if (r
->error
!= NULL
) {
457 if (reply
!= NULL
) *reply
= aux
;
463 /* Calculate the number of bytes needed to represent an integer as string. */
464 static int intlen(int i
) {
477 /* Helper function for redisvFormatCommand(). */
478 static void addArgument(sds a
, char ***argv
, int *argc
, int *totlen
) {
480 if ((*argv
= realloc(*argv
, sizeof(char*)*(*argc
))) == NULL
) redisOOM();
481 if (totlen
) *totlen
= *totlen
+1+intlen(sdslen(a
))+2+sdslen(a
)+2;
482 (*argv
)[(*argc
)-1] = a
;
485 int redisvFormatCommand(char **target
, const char *format
, va_list ap
) {
487 const char *arg
, *c
= format
;
488 char *cmd
= NULL
; /* final command */
489 int pos
; /* position in final command */
490 sds current
; /* current argument */
495 /* Abort if there is no target to set */
499 /* Build the command string according to the protocol */
500 current
= sdsempty();
502 if (*c
!= '%' || c
[1] == '\0') {
504 if (sdslen(current
) != 0) {
505 addArgument(current
, &argv
, &argc
, &totlen
);
506 current
= sdsempty();
509 current
= sdscatlen(current
,c
,1);
514 arg
= va_arg(ap
,char*);
515 current
= sdscat(current
,arg
);
518 arg
= va_arg(ap
,char*);
519 size
= va_arg(ap
,size_t);
520 current
= sdscatlen(current
,arg
,size
);
523 cmd
= sdscat(cmd
,"%");
531 /* Add the last argument if needed */
532 if (sdslen(current
) != 0) {
533 addArgument(current
, &argv
, &argc
, &totlen
);
538 /* Add bytes needed to hold multi bulk count */
539 totlen
+= 1+intlen(argc
)+2;
541 /* Build the command at protocol level */
542 cmd
= malloc(totlen
+1);
543 if (!cmd
) redisOOM();
544 pos
= sprintf(cmd
,"*%d\r\n",argc
);
545 for (j
= 0; j
< argc
; j
++) {
546 pos
+= sprintf(cmd
+pos
,"$%zu\r\n",sdslen(argv
[j
]));
547 memcpy(cmd
+pos
,argv
[j
],sdslen(argv
[j
]));
548 pos
+= sdslen(argv
[j
]);
553 assert(pos
== totlen
);
560 /* Format a command according to the Redis protocol. This function
561 * takes a format similar to printf:
563 * %s represents a C null terminated string you want to interpolate
564 * %b represents a binary safe string
566 * When using %b you need to provide both the pointer to the string
567 * and the length in bytes. Examples:
569 * len = redisFormatCommand(target, "GET %s", mykey);
570 * len = redisFormatCommand(target, "SET %s %b", mykey, myval, myvallen);
572 int redisFormatCommand(char **target
, const char *format
, ...) {
576 len
= redisvFormatCommand(target
,format
,ap
);
581 /* Format a command according to the Redis protocol. This function takes the
582 * number of arguments, an array with arguments and an array with their
583 * lengths. If the latter is set to NULL, strlen will be used to compute the
586 int redisFormatCommandArgv(char **target
, int argc
, const char **argv
, const size_t *argvlen
) {
587 char *cmd
= NULL
; /* final command */
588 int pos
; /* position in final command */
592 /* Calculate number of bytes needed for the command */
593 totlen
= 1+intlen(argc
)+2;
594 for (j
= 0; j
< argc
; j
++) {
595 len
= argvlen
? argvlen
[j
] : strlen(argv
[j
]);
596 totlen
+= 1+intlen(len
)+2+len
+2;
599 /* Build the command at protocol level */
600 cmd
= malloc(totlen
+1);
601 if (!cmd
) redisOOM();
602 pos
= sprintf(cmd
,"*%d\r\n",argc
);
603 for (j
= 0; j
< argc
; j
++) {
604 len
= argvlen
? argvlen
[j
] : strlen(argv
[j
]);
605 pos
+= sprintf(cmd
+pos
,"$%zu\r\n",len
);
606 memcpy(cmd
+pos
,argv
[j
],len
);
611 assert(pos
== totlen
);
617 void __redisSetError(redisContext
*c
, int type
, const sds errstr
) {
619 if (errstr
!= NULL
) {
622 /* Only REDIS_ERR_IO may lack a description! */
623 assert(type
== REDIS_ERR_IO
);
624 c
->errstr
= sdsnew(strerror(errno
));
628 static redisContext
*redisContextInit() {
629 redisContext
*c
= calloc(sizeof(redisContext
),1);
632 c
->obuf
= sdsempty();
633 c
->fn
= &defaultFunctions
;
638 void redisFree(redisContext
*c
) {
639 /* Disconnect before free'ing if not yet disconnected. */
640 if (c
->flags
& REDIS_CONNECTED
)
642 if (c
->errstr
!= NULL
)
646 if (c
->reader
!= NULL
)
647 redisReplyReaderFree(c
->reader
);
651 /* Connect to a Redis instance. On error the field error in the returned
652 * context will be set to the return value of the error function.
653 * When no set of reply functions is given, the default set will be used. */
654 redisContext
*redisConnect(const char *ip
, int port
) {
655 redisContext
*c
= redisContextInit();
656 c
->flags
|= REDIS_BLOCK
;
657 c
->flags
|= REDIS_CONNECTED
;
658 redisContextConnectTcp(c
,ip
,port
);
662 redisContext
*redisConnectNonBlock(const char *ip
, int port
) {
663 redisContext
*c
= redisContextInit();
664 c
->flags
&= ~REDIS_BLOCK
;
665 c
->flags
|= REDIS_CONNECTED
;
666 redisContextConnectTcp(c
,ip
,port
);
670 redisContext
*redisConnectUnix(const char *path
) {
671 redisContext
*c
= redisContextInit();
672 c
->flags
|= REDIS_BLOCK
;
673 c
->flags
|= REDIS_CONNECTED
;
674 redisContextConnectUnix(c
,path
);
678 redisContext
*redisConnectUnixNonBlock(const char *path
) {
679 redisContext
*c
= redisContextInit();
680 c
->flags
&= ~REDIS_BLOCK
;
681 c
->flags
|= REDIS_CONNECTED
;
682 redisContextConnectUnix(c
,path
);
686 /* Set the replyObjectFunctions to use. Returns REDIS_ERR when the reader
687 * was already initialized and the function set could not be re-set.
688 * Return REDIS_OK when they could be set. */
689 int redisSetReplyObjectFunctions(redisContext
*c
, redisReplyObjectFunctions
*fn
) {
690 if (c
->reader
!= NULL
)
696 /* Helper function to lazily create a reply reader. */
697 static void __redisCreateReplyReader(redisContext
*c
) {
698 if (c
->reader
== NULL
)
699 c
->reader
= redisReplyReaderCreate(c
->fn
);
702 /* Use this function to handle a read event on the descriptor. It will try
703 * and read some bytes from the socket and feed them to the reply parser.
705 * After this function is called, you may use redisContextReadReply to
706 * see if there is a reply available. */
707 int redisBufferRead(redisContext
*c
) {
709 int nread
= read(c
->fd
,buf
,sizeof(buf
));
711 if (errno
== EAGAIN
) {
712 /* Try again later */
714 __redisSetError(c
,REDIS_ERR_IO
,NULL
);
717 } else if (nread
== 0) {
718 __redisSetError(c
,REDIS_ERR_EOF
,
719 sdsnew("Server closed the connection"));
722 __redisCreateReplyReader(c
);
723 redisReplyReaderFeed(c
->reader
,buf
,nread
);
728 /* Write the output buffer to the socket.
730 * Returns REDIS_OK when the buffer is empty, or (a part of) the buffer was
731 * successfully written to the socket. When the buffer is empty after the
732 * write operation, "done" is set to 1 (if given).
734 * Returns REDIS_ERR if an error occurred trying to write and sets
735 * c->errstr to hold the appropriate error string.
737 int redisBufferWrite(redisContext
*c
, int *done
) {
739 if (sdslen(c
->obuf
) > 0) {
740 nwritten
= write(c
->fd
,c
->obuf
,sdslen(c
->obuf
));
741 if (nwritten
== -1) {
742 if (errno
== EAGAIN
) {
743 /* Try again later */
745 __redisSetError(c
,REDIS_ERR_IO
,NULL
);
748 } else if (nwritten
> 0) {
749 if (nwritten
== (signed)sdslen(c
->obuf
)) {
751 c
->obuf
= sdsempty();
753 c
->obuf
= sdsrange(c
->obuf
,nwritten
,-1);
757 if (done
!= NULL
) *done
= (sdslen(c
->obuf
) == 0);
761 /* Internal helper function to try and get a reply from the reader,
762 * or set an error in the context otherwise. */
763 int redisGetReplyFromReader(redisContext
*c
, void **reply
) {
764 __redisCreateReplyReader(c
);
765 if (redisReplyReaderGetReply(c
->reader
,reply
) == REDIS_ERR
) {
766 __redisSetError(c
,REDIS_ERR_PROTOCOL
,
767 sdsnew(((redisReader
*)c
->reader
)->error
));
773 int redisGetReply(redisContext
*c
, void **reply
) {
777 /* Try to read pending replies */
778 if (redisGetReplyFromReader(c
,&aux
) == REDIS_ERR
)
781 /* For the blocking context, flush output buffer and read reply */
782 if (aux
== NULL
&& c
->flags
& REDIS_BLOCK
) {
783 /* Write until done */
785 if (redisBufferWrite(c
,&wdone
) == REDIS_ERR
)
789 /* Read until there is a reply */
791 if (redisBufferRead(c
) == REDIS_ERR
)
793 if (redisGetReplyFromReader(c
,&aux
) == REDIS_ERR
)
795 } while (aux
== NULL
);
798 /* Set reply object */
799 if (reply
!= NULL
) *reply
= aux
;
804 /* Helper function for the redisAppendCommand* family of functions.
806 * Write a formatted command to the output buffer. When this family
807 * is used, you need to call redisGetReply yourself to retrieve
808 * the reply (or replies in pub/sub).
810 void __redisAppendCommand(redisContext
*c
, char *cmd
, size_t len
) {
811 c
->obuf
= sdscatlen(c
->obuf
,cmd
,len
);
814 void redisvAppendCommand(redisContext
*c
, const char *format
, va_list ap
) {
817 len
= redisvFormatCommand(&cmd
,format
,ap
);
818 __redisAppendCommand(c
,cmd
,len
);
822 void redisAppendCommand(redisContext
*c
, const char *format
, ...) {
825 redisvAppendCommand(c
,format
,ap
);
829 void redisAppendCommandArgv(redisContext
*c
, int argc
, const char **argv
, const size_t *argvlen
) {
832 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
833 __redisAppendCommand(c
,cmd
,len
);
837 /* Helper function for the redisCommand* family of functions.
839 * Write a formatted command to the output buffer. If the given context is
840 * blocking, immediately read the reply into the "reply" pointer. When the
841 * context is non-blocking, the "reply" pointer will not be used and the
842 * command is simply appended to the write buffer.
844 * Returns the reply when a reply was successfully retrieved. Returns NULL
845 * otherwise. When NULL is returned in a blocking context, the error field
846 * in the context will be set.
848 static void *__redisCommand(redisContext
*c
, char *cmd
, size_t len
) {
850 __redisAppendCommand(c
,cmd
,len
);
852 if (c
->flags
& REDIS_BLOCK
) {
853 if (redisGetReply(c
,&aux
) == REDIS_OK
)
860 void *redisvCommand(redisContext
*c
, const char *format
, va_list ap
) {
864 len
= redisvFormatCommand(&cmd
,format
,ap
);
865 reply
= __redisCommand(c
,cmd
,len
);
870 void *redisCommand(redisContext
*c
, const char *format
, ...) {
874 reply
= redisvCommand(c
,format
,ap
);
879 void *redisCommandArgv(redisContext
*c
, int argc
, const char **argv
, const size_t *argvlen
) {
883 len
= redisFormatCommandArgv(&cmd
,argc
,argv
,argvlen
);
884 reply
= __redisCommand(c
,cmd
,len
);