]> git.saurik.com Git - redis.git/blob - redis.c
Now the PUSH side of RPOPLPUSH is able to unblock clients blocked on BLPOP
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.3.1"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
 * this flag will return an error when the 'maxmemory' option is set in the
 * config file and the server is using more than maxmemory bytes of memory.
 * In short, these commands are denied in low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
/* Defines related to the dump file format. Storing 32 bit lengths for short
 * keys would waste a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specifies the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
/* When the length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160 #define REDIS_MULTI 16 /* This client is in a MULTI context */
161 #define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */
162
163 /* Slave replication state - slave side */
164 #define REDIS_REPL_NONE 0 /* No active replication */
165 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
166 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
167
168 /* Slave replication state - from the point of view of master
169 * Note that in SEND_BULK and ONLINE state the slave receives new updates
170 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
171 * to start the next background saving in order to send updates to it. */
172 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
173 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
174 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
175 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
176
177 /* List related stuff */
178 #define REDIS_HEAD 0
179 #define REDIS_TAIL 1
180
181 /* Sort operations */
182 #define REDIS_SORT_GET 0
183 #define REDIS_SORT_ASC 1
184 #define REDIS_SORT_DESC 2
185 #define REDIS_SORTKEY_MAX 1024
186
187 /* Log levels */
188 #define REDIS_DEBUG 0
189 #define REDIS_NOTICE 1
190 #define REDIS_WARNING 2
191
/* Anti-warning macro: evaluates V and discards the result, so that an
 * intentionally unused parameter or variable does not trigger a compiler
 * warning. The argument is parenthesized so the cast applies to the whole
 * expression and not just to its first operand. */
#define REDIS_NOTUSED(V) ((void) (V))
194
195 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
196 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
197
198 /* Append only defines */
199 #define APPENDFSYNC_NO 0
200 #define APPENDFSYNC_ALWAYS 1
201 #define APPENDFSYNC_EVERYSEC 2
202
/* Custom assertion: when the condition is false the stringified expression
 * is handed to _redisAssert() and the process then exits with status 1.
 * The macro is an expression of type void, like the standard assert(). */
static void _redisAssert(char *estr);
#define redisAssert(_e) \
    (!(_e) ? (_redisAssert(#_e), exit(1)) : (void)0)
206
207 /*================================= Data types ============================== */
208
/* A redis object: a reference counted container able to hold a
 * string / list / set. */
typedef struct redisObject {
    void *ptr;                  /* payload */
    unsigned char type;         /* one of the "Object types" defines */
    unsigned char encoding;     /* one of the REDIS_ENCODING_* defines */
    unsigned char notused[2];   /* currently unused */
    int refcount;               /* reference count */
} robj;
217
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * _var is the robj variable itself (not a pointer to it), _ptr the value
 * stored into its 'ptr' field. The expansion intentionally does NOT end
 * with a semicolon: the caller supplies it, which keeps
 * "if (x) initStaticStringObject(o,p); else ..." well formed. Both
 * arguments are parenthesized so that arbitrary expressions can be used. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).ptr = (_ptr); \
} while(0)
228
229 typedef struct redisDb {
230 dict *dict; /* The keyspace for this DB */
231 dict *expires; /* Timeout of keys with a timeout set */
232 dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
233 int id;
234 } redisDb;
235
236 /* Client MULTI/EXEC state */
237 typedef struct multiCmd {
238 robj **argv;
239 int argc;
240 struct redisCommand *cmd;
241 } multiCmd;
242
243 typedef struct multiState {
244 multiCmd *commands; /* Array of MULTI commands */
245 int count; /* Total number of MULTI commands */
246 } multiState;
247
248 /* With multiplexing we need to take per-clinet state.
249 * Clients are taken in a liked list. */
250 typedef struct redisClient {
251 int fd;
252 redisDb *db;
253 int dictid;
254 sds querybuf;
255 robj **argv, **mbargv;
256 int argc, mbargc;
257 int bulklen; /* bulk read len. -1 if not in bulk read mode */
258 int multibulk; /* multi bulk command format active */
259 list *reply;
260 int sentlen;
261 time_t lastinteraction; /* time of the last interaction, used for timeout */
262 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
263 /* REDIS_MULTI */
264 int slaveseldb; /* slave selected db, if this client is a slave */
265 int authenticated; /* when requirepass is non-NULL */
266 int replstate; /* replication state if this is a slave */
267 int repldbfd; /* replication DB file descriptor */
268 long repldboff; /* replication DB file offset */
269 off_t repldbsize; /* replication DB file size */
270 multiState mstate; /* MULTI/EXEC state */
271 robj **blockingkeys; /* The key we waiting to terminate a blocking
272 * operation such as BLPOP. Otherwise NULL. */
273 int blockingkeysnum; /* Number of blocking keys */
274 time_t blockingto; /* Blocking operation timeout. If UNIX current time
275 * is >= blockingto then the operation timed out. */
276 } redisClient;
277
/* One save point: a pair of a number of seconds and a number of changes. */
struct saveparam {
    time_t seconds;     /* seconds component of the save point */
    int changes;        /* changes component of the save point */
};
282
283 /* Global server state structure */
284 struct redisServer {
285 int port;
286 int fd;
287 redisDb *db;
288 dict *sharingpool; /* Poll used for object sharing */
289 unsigned int sharingpoolsize;
290 long long dirty; /* changes to DB from the last save */
291 list *clients;
292 list *slaves, *monitors;
293 char neterr[ANET_ERR_LEN];
294 aeEventLoop *el;
295 int cronloops; /* number of times the cron function run */
296 list *objfreelist; /* A list of freed objects to avoid malloc() */
297 time_t lastsave; /* Unix time of last save succeeede */
298 size_t usedmemory; /* Used memory in megabytes */
299 /* Fields used only for stats */
300 time_t stat_starttime; /* server start time */
301 long long stat_numcommands; /* number of processed commands */
302 long long stat_numconnections; /* number of connections received */
303 /* Configuration */
304 int verbosity;
305 int glueoutputbuf;
306 int maxidletime;
307 int dbnum;
308 int daemonize;
309 int appendonly;
310 int appendfsync;
311 time_t lastfsync;
312 int appendfd;
313 int appendseldb;
314 char *pidfile;
315 pid_t bgsavechildpid;
316 pid_t bgrewritechildpid;
317 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
318 struct saveparam *saveparams;
319 int saveparamslen;
320 char *logfile;
321 char *bindaddr;
322 char *dbfilename;
323 char *appendfilename;
324 char *requirepass;
325 int shareobjects;
326 int rdbcompression;
327 /* Replication related */
328 int isslave;
329 char *masterauth;
330 char *masterhost;
331 int masterport;
332 redisClient *master; /* client that is master for this slave */
333 int replstate;
334 unsigned int maxclients;
335 unsigned long maxmemory;
336 unsigned int blockedclients;
337 /* Sort parameters - qsort_r() is only available under BSD so we
338 * have to take this state global, in order to pass it to sortCompare() */
339 int sort_desc;
340 int sort_alpha;
341 int sort_bypattern;
342 };
343
344 typedef void redisCommandProc(redisClient *c);
345 struct redisCommand {
346 char *name;
347 redisCommandProc *proc;
348 int arity;
349 int flags;
350 };
351
/* A (name, pointer) pair describing a function symbol. */
struct redisFunctionSym {
    char *name;             /* symbol name */
    unsigned long pointer;  /* pointer value stored as an integer */
};
356
357 typedef struct _redisSortObject {
358 robj *obj;
359 union {
360 double score;
361 robj *cmpobj;
362 } u;
363 } redisSortObject;
364
365 typedef struct _redisSortOperation {
366 int type;
367 robj *pattern;
368 } redisSortOperation;
369
370 /* ZSETs use a specialized version of Skiplists */
371
372 typedef struct zskiplistNode {
373 struct zskiplistNode **forward;
374 struct zskiplistNode *backward;
375 double score;
376 robj *obj;
377 } zskiplistNode;
378
379 typedef struct zskiplist {
380 struct zskiplistNode *header, *tail;
381 unsigned long length;
382 int level;
383 } zskiplist;
384
385 typedef struct zset {
386 dict *dict;
387 zskiplist *zsl;
388 } zset;
389
390 /* Our shared "common" objects */
391
392 struct sharedObjectsStruct {
393 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
394 *colon, *nullbulk, *nullmultibulk, *queued,
395 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
396 *outofrangeerr, *plus,
397 *select0, *select1, *select2, *select3, *select4,
398 *select5, *select6, *select7, *select8, *select9;
399 } shared;
400
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero;
static double R_PosInf;
static double R_NegInf;
static double R_Nan;
406
407 /*================================ Prototypes =============================== */
408
/* Forward declarations of file-local helpers, so that they can be used
 * before their definition further down in this file. */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
450
/* Forward declarations of the command implementations referenced by the
 * command table. They all share the redisCommandProc signature. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
528
529 /*================================= Globals ================================= */
530
/* Global vars */
static struct redisServer server; /* server global state */

/* The command table. Every row is a struct redisCommand, that is:
 * {name, implementation, arity, flags}. The table is terminated by an
 * all-NULL/zero row.
 * NOTE(review): negative arity values presumably mean "at least N
 * arguments" - confirm against the arity check in processCommand(). */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"del",delCommand,-2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
    {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"smove",smoveCommand,4,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"spop",spopCommand,2,REDIS_CMD_INLINE},
    {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"zrem",zremCommand,3,REDIS_CMD_BULK},
    {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
    {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
    {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
    {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
    {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
    {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"multi",multiCommand,1,REDIS_CMD_INLINE},
    {"exec",execCommand,1,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
    {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
    {"debug",debugCommand,-2,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
614
615 /*============================ Utility functions ============================ */
616
/* Glob-style pattern matching.
 *
 * Returns 1 if the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. When 'nocase' is non-zero
 * characters are compared after tolower() folding.
 *
 * Supported syntax:
 *   *       matches any sequence of characters, including the empty one
 *   ?       matches exactly one character
 *   [...]   matches one character of the set: a leading '^' negates the
 *           set, "a-z" is a range, '\' quotes the next character. A set
 *           without the closing ']' extends to the end of the pattern.
 *   \x      matches the character x literally
 *
 * Both buffers are length-delimited: nothing at or past pattern+patternLen
 * or string+stringLen is read, so neither has to be NUL terminated. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse a run of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            /* Try the rest of the pattern at every remaining position. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A set always consumes one character: with nothing left to
             * consume it can't match, not even when negated. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back one byte so that the
                     * shared advance after the switch lands exactly on
                     * the end of the pattern. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() is only defined for unsigned char
                         * values and EOF. */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            /* A trailing backslash stands for itself. */
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: only trailing '*' may still match. */
            while(patternLen && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
739
740 static void redisLog(int level, const char *fmt, ...) {
741 va_list ap;
742 FILE *fp;
743
744 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
745 if (!fp) return;
746
747 va_start(ap, fmt);
748 if (level >= server.verbosity) {
749 char *c = ".-*";
750 char buf[64];
751 time_t now;
752
753 now = time(NULL);
754 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
755 fprintf(fp,"%s %c ",buf,c[level]);
756 vfprintf(fp, fmt, ap);
757 fprintf(fp,"\n");
758 fflush(fp);
759 }
760 va_end(ap);
761
762 if (server.logfile) fclose(fp);
763 }
764
765 /*====================== Hash table type implementation ==================== */
766
/* This is a hash table type that uses the SDS dynamic strings library as
 * keys and redis objects as values (objects can hold SDS strings,
 * lists, sets). */
770
/* Destructor that releases 'val' with zfree(). 'privdata' is unused. */
static void dictVanillaFree(void *privdata, void *val) {
    DICT_NOTUSED(privdata);

    zfree(val);
}
776
777 static void dictListDestructor(void *privdata, void *val)
778 {
779 DICT_NOTUSED(privdata);
780 listRelease((list*)val);
781 }
782
783 static int sdsDictKeyCompare(void *privdata, const void *key1,
784 const void *key2)
785 {
786 int l1,l2;
787 DICT_NOTUSED(privdata);
788
789 l1 = sdslen((sds)key1);
790 l2 = sdslen((sds)key2);
791 if (l1 != l2) return 0;
792 return memcmp(key1, key2, l1) == 0;
793 }
794
/* Destructor for redis objects: drops one reference from 'val' with
 * decrRefCount(). 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val) {
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
801
802 static int dictObjKeyCompare(void *privdata, const void *key1,
803 const void *key2)
804 {
805 const robj *o1 = key1, *o2 = key2;
806 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
807 }
808
809 static unsigned int dictObjHash(const void *key) {
810 const robj *o = key;
811 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
812 }
813
814 static int dictEncObjKeyCompare(void *privdata, const void *key1,
815 const void *key2)
816 {
817 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
818 int cmp;
819
820 o1 = getDecodedObject(o1);
821 o2 = getDecodedObject(o2);
822 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
823 decrRefCount(o1);
824 decrRefCount(o2);
825 return cmp;
826 }
827
828 static unsigned int dictEncObjHash(const void *key) {
829 robj *o = (robj*) key;
830
831 o = getDecodedObject(o);
832 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
833 decrRefCount(o);
834 return hash;
835 }
836
/* Dictionary type for sets: keys are hashed and compared through the
 * "Enc" callbacks, which decode the objects first; there is no value
 * destructor. */
static dictType setDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL /* val destructor */
};

/* Dictionary type for sorted sets: same key handling as setDictType, with
 * values released by dictVanillaFree. */
static dictType zsetDictType = {
    dictEncObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictEncObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictVanillaFree /* val destructor of malloc(sizeof(double)) */
};

/* Dictionary type whose keys and values are both redis objects; keys are
 * hashed and compared directly on their sds string, without decoding. */
static dictType hashDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor /* val destructor */
};

/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP) */
static dictType keylistDictType = {
    dictObjHash, /* hash function */
    NULL, /* key dup */
    NULL, /* val dup */
    dictObjKeyCompare, /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictListDestructor /* val destructor */
};
874
875 /* ========================= Random utility functions ======================= */
876
877 /* Redis generally does not try to recover from out of memory conditions
878 * when allocating objects or strings, it is not clear if it will be possible
879 * to report this condition to the client since the networking layer itself
880 * is based on heap allocation for send buffers, so we simply abort.
881 * At least the code will be simpler to read... */
882 static void oom(const char *msg) {
883 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
884 sleep(1);
885 abort();
886 }
887
888 /* ====================== Redis server networking stuff ===================== */
889 static void closeTimedoutClients(void) {
890 redisClient *c;
891 listNode *ln;
892 time_t now = time(NULL);
893
894 listRewind(server.clients);
895 while ((ln = listYield(server.clients)) != NULL) {
896 c = listNodeValue(ln);
897 if (server.maxidletime &&
898 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
899 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
900 (now - c->lastinteraction > server.maxidletime))
901 {
902 redisLog(REDIS_DEBUG,"Closing idle client");
903 freeClient(c);
904 } else if (c->flags & REDIS_BLOCKED) {
905 if (c->blockingto != 0 && c->blockingto < now) {
906 addReply(c,shared.nullmultibulk);
907 unblockClient(c);
908 }
909 }
910 }
911 }
912
913 static int htNeedsResize(dict *dict) {
914 long long size, used;
915
916 size = dictSlots(dict);
917 used = dictSize(dict);
918 return (size && used && size > DICT_HT_INITIAL_SIZE &&
919 (used*100/size < REDIS_HT_MINFILL));
920 }
921
922 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
923 * we resize the hash table to save memory */
924 static void tryResizeHashTables(void) {
925 int j;
926
927 for (j = 0; j < server.dbnum; j++) {
928 if (htNeedsResize(server.db[j].dict)) {
929 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
930 dictResize(server.db[j].dict);
931 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
932 }
933 if (htNeedsResize(server.db[j].expires))
934 dictResize(server.db[j].expires);
935 }
936 }
937
938 /* A background saving child (BGSAVE) terminated its work. Handle this. */
939 void backgroundSaveDoneHandler(int statloc) {
940 int exitcode = WEXITSTATUS(statloc);
941 int bysignal = WIFSIGNALED(statloc);
942
943 if (!bysignal && exitcode == 0) {
944 redisLog(REDIS_NOTICE,
945 "Background saving terminated with success");
946 server.dirty = 0;
947 server.lastsave = time(NULL);
948 } else if (!bysignal && exitcode != 0) {
949 redisLog(REDIS_WARNING, "Background saving error");
950 } else {
951 redisLog(REDIS_WARNING,
952 "Background saving terminated by signal");
953 rdbRemoveTempFile(server.bgsavechildpid);
954 }
955 server.bgsavechildpid = -1;
956 /* Possibly there are slaves waiting for a BGSAVE in order to be served
957 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
958 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
959 }
960
961 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
962 * Handle this. */
963 void backgroundRewriteDoneHandler(int statloc) {
964 int exitcode = WEXITSTATUS(statloc);
965 int bysignal = WIFSIGNALED(statloc);
966
967 if (!bysignal && exitcode == 0) {
968 int fd;
969 char tmpfile[256];
970
971 redisLog(REDIS_NOTICE,
972 "Background append only file rewriting terminated with success");
973 /* Now it's time to flush the differences accumulated by the parent */
974 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
975 fd = open(tmpfile,O_WRONLY|O_APPEND);
976 if (fd == -1) {
977 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
978 goto cleanup;
979 }
980 /* Flush our data... */
981 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
982 (signed) sdslen(server.bgrewritebuf)) {
983 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
984 close(fd);
985 goto cleanup;
986 }
987 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
988 /* Now our work is to rename the temp file into the stable file. And
989 * switch the file descriptor used by the server for append only. */
990 if (rename(tmpfile,server.appendfilename) == -1) {
991 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
992 close(fd);
993 goto cleanup;
994 }
995 /* Mission completed... almost */
996 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
997 if (server.appendfd != -1) {
998 /* If append only is actually enabled... */
999 close(server.appendfd);
1000 server.appendfd = fd;
1001 fsync(fd);
1002 server.appendseldb = -1; /* Make sure it will issue SELECT */
1003 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1004 } else {
1005 /* If append only is disabled we just generate a dump in this
1006 * format. Why not? */
1007 close(fd);
1008 }
1009 } else if (!bysignal && exitcode != 0) {
1010 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1011 } else {
1012 redisLog(REDIS_WARNING,
1013 "Background append only file rewriting terminated by signal");
1014 }
1015 cleanup:
1016 sdsfree(server.bgrewritebuf);
1017 server.bgrewritebuf = sdsempty();
1018 aofRemoveTempFile(server.bgrewritechildpid);
1019 server.bgrewritechildpid = -1;
1020 }
1021
1022 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1023 int j, loops = server.cronloops++;
1024 REDIS_NOTUSED(eventLoop);
1025 REDIS_NOTUSED(id);
1026 REDIS_NOTUSED(clientData);
1027
1028 /* Update the global state with the amount of used memory */
1029 server.usedmemory = zmalloc_used_memory();
1030
1031 /* Show some info about non-empty databases */
1032 for (j = 0; j < server.dbnum; j++) {
1033 long long size, used, vkeys;
1034
1035 size = dictSlots(server.db[j].dict);
1036 used = dictSize(server.db[j].dict);
1037 vkeys = dictSize(server.db[j].expires);
1038 if (!(loops % 5) && (used || vkeys)) {
1039 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1040 /* dictPrintStats(server.dict); */
1041 }
1042 }
1043
1044 /* We don't want to resize the hash tables while a bacground saving
1045 * is in progress: the saving child is created using fork() that is
1046 * implemented with a copy-on-write semantic in most modern systems, so
1047 * if we resize the HT while there is the saving child at work actually
1048 * a lot of memory movements in the parent will cause a lot of pages
1049 * copied. */
1050 if (server.bgsavechildpid == -1) tryResizeHashTables();
1051
1052 /* Show information about connected clients */
1053 if (!(loops % 5)) {
1054 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1055 listLength(server.clients)-listLength(server.slaves),
1056 listLength(server.slaves),
1057 server.usedmemory,
1058 dictSize(server.sharingpool));
1059 }
1060
1061 /* Close connections of timedout clients */
1062 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1063 closeTimedoutClients();
1064
1065 /* Check if a background saving or AOF rewrite in progress terminated */
1066 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1067 int statloc;
1068 pid_t pid;
1069
1070 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1071 if (pid == server.bgsavechildpid) {
1072 backgroundSaveDoneHandler(statloc);
1073 } else {
1074 backgroundRewriteDoneHandler(statloc);
1075 }
1076 }
1077 } else {
1078 /* If there is not a background saving in progress check if
1079 * we have to save now */
1080 time_t now = time(NULL);
1081 for (j = 0; j < server.saveparamslen; j++) {
1082 struct saveparam *sp = server.saveparams+j;
1083
1084 if (server.dirty >= sp->changes &&
1085 now-server.lastsave > sp->seconds) {
1086 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1087 sp->changes, sp->seconds);
1088 rdbSaveBackground(server.dbfilename);
1089 break;
1090 }
1091 }
1092 }
1093
1094 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1095 * will use few CPU cycles if there are few expiring keys, otherwise
1096 * it will get more aggressive to avoid that too much memory is used by
1097 * keys that can be removed from the keyspace. */
1098 for (j = 0; j < server.dbnum; j++) {
1099 int expired;
1100 redisDb *db = server.db+j;
1101
1102 /* Continue to expire if at the end of the cycle more than 25%
1103 * of the keys were expired. */
1104 do {
1105 int num = dictSize(db->expires);
1106 time_t now = time(NULL);
1107
1108 expired = 0;
1109 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1110 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1111 while (num--) {
1112 dictEntry *de;
1113 time_t t;
1114
1115 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1116 t = (time_t) dictGetEntryVal(de);
1117 if (now > t) {
1118 deleteKey(db,dictGetEntryKey(de));
1119 expired++;
1120 }
1121 }
1122 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1123 }
1124
1125 /* Check if we should connect to a MASTER */
1126 if (server.replstate == REDIS_REPL_CONNECT) {
1127 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1128 if (syncWithMaster() == REDIS_OK) {
1129 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1130 }
1131 }
1132 return 1000;
1133 }
1134
1135 static void createSharedObjects(void) {
1136 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1137 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1138 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1139 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1140 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1141 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1142 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1143 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1144 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1145 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1146 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1147 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1148 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1149 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1150 "-ERR no such key\r\n"));
1151 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1152 "-ERR syntax error\r\n"));
1153 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1154 "-ERR source and destination objects are the same\r\n"));
1155 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1156 "-ERR index out of range\r\n"));
1157 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1158 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1159 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1160 shared.select0 = createStringObject("select 0\r\n",10);
1161 shared.select1 = createStringObject("select 1\r\n",10);
1162 shared.select2 = createStringObject("select 2\r\n",10);
1163 shared.select3 = createStringObject("select 3\r\n",10);
1164 shared.select4 = createStringObject("select 4\r\n",10);
1165 shared.select5 = createStringObject("select 5\r\n",10);
1166 shared.select6 = createStringObject("select 6\r\n",10);
1167 shared.select7 = createStringObject("select 7\r\n",10);
1168 shared.select8 = createStringObject("select 8\r\n",10);
1169 shared.select9 = createStringObject("select 9\r\n",10);
1170 }
1171
1172 static void appendServerSaveParams(time_t seconds, int changes) {
1173 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1174 server.saveparams[server.saveparamslen].seconds = seconds;
1175 server.saveparams[server.saveparamslen].changes = changes;
1176 server.saveparamslen++;
1177 }
1178
1179 static void resetServerSaveParams() {
1180 zfree(server.saveparams);
1181 server.saveparams = NULL;
1182 server.saveparamslen = 0;
1183 }
1184
1185 static void initServerConfig() {
1186 server.dbnum = REDIS_DEFAULT_DBNUM;
1187 server.port = REDIS_SERVERPORT;
1188 server.verbosity = REDIS_DEBUG;
1189 server.maxidletime = REDIS_MAXIDLETIME;
1190 server.saveparams = NULL;
1191 server.logfile = NULL; /* NULL = log on standard output */
1192 server.bindaddr = NULL;
1193 server.glueoutputbuf = 1;
1194 server.daemonize = 0;
1195 server.appendonly = 0;
1196 server.appendfsync = APPENDFSYNC_ALWAYS;
1197 server.lastfsync = time(NULL);
1198 server.appendfd = -1;
1199 server.appendseldb = -1; /* Make sure the first time will not match */
1200 server.pidfile = "/var/run/redis.pid";
1201 server.dbfilename = "dump.rdb";
1202 server.appendfilename = "appendonly.aof";
1203 server.requirepass = NULL;
1204 server.shareobjects = 0;
1205 server.rdbcompression = 1;
1206 server.sharingpoolsize = 1024;
1207 server.maxclients = 0;
1208 server.blockedclients = 0;
1209 server.maxmemory = 0;
1210 resetServerSaveParams();
1211
1212 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1213 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1214 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1215 /* Replication related */
1216 server.isslave = 0;
1217 server.masterauth = NULL;
1218 server.masterhost = NULL;
1219 server.masterport = 6379;
1220 server.master = NULL;
1221 server.replstate = REDIS_REPL_NONE;
1222
1223 /* Double constants initialization */
1224 R_Zero = 0.0;
1225 R_PosInf = 1.0/R_Zero;
1226 R_NegInf = -1.0/R_Zero;
1227 R_Nan = R_Zero/R_Zero;
1228 }
1229
1230 static void initServer() {
1231 int j;
1232
1233 signal(SIGHUP, SIG_IGN);
1234 signal(SIGPIPE, SIG_IGN);
1235 setupSigSegvAction();
1236
1237 server.clients = listCreate();
1238 server.slaves = listCreate();
1239 server.monitors = listCreate();
1240 server.objfreelist = listCreate();
1241 createSharedObjects();
1242 server.el = aeCreateEventLoop();
1243 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1244 server.sharingpool = dictCreate(&setDictType,NULL);
1245 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1246 if (server.fd == -1) {
1247 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1248 exit(1);
1249 }
1250 for (j = 0; j < server.dbnum; j++) {
1251 server.db[j].dict = dictCreate(&hashDictType,NULL);
1252 server.db[j].expires = dictCreate(&setDictType,NULL);
1253 server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
1254 server.db[j].id = j;
1255 }
1256 server.cronloops = 0;
1257 server.bgsavechildpid = -1;
1258 server.bgrewritechildpid = -1;
1259 server.bgrewritebuf = sdsempty();
1260 server.lastsave = time(NULL);
1261 server.dirty = 0;
1262 server.usedmemory = 0;
1263 server.stat_numcommands = 0;
1264 server.stat_numconnections = 0;
1265 server.stat_starttime = time(NULL);
1266 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1267
1268 if (server.appendonly) {
1269 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1270 if (server.appendfd == -1) {
1271 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1272 strerror(errno));
1273 exit(1);
1274 }
1275 }
1276 }
1277
1278 /* Empty the whole database */
1279 static long long emptyDb() {
1280 int j;
1281 long long removed = 0;
1282
1283 for (j = 0; j < server.dbnum; j++) {
1284 removed += dictSize(server.db[j].dict);
1285 dictEmpty(server.db[j].dict);
1286 dictEmpty(server.db[j].expires);
1287 }
1288 return removed;
1289 }
1290
/* Convert a "yes" / "no" string (compared case insensitively) into an
 * integer: 1 for "yes", 0 for "no", -1 for any other string. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    return (strcasecmp(s,"no") == 0) ? 0 : -1;
}
1296
/* Load the server configuration from 'filename', or from the standard
 * input when 'filename' is exactly "-".
 *
 * The input is read with fgets(), at most REDIS_CONFIGLINE_MAX characters
 * per call. Every line is trimmed of spaces, tabs, CR and LF; empty lines
 * and lines starting with '#' are skipped. What remains is split on single
 * space characters: the first token is the directive name, the other
 * tokens are its arguments. Directives modify the global 'server' state
 * (the "dir" directive changes the working directory instead).
 *
 * An invalid value, an unknown directive or a wrong number of arguments
 * jumps to the 'loaderr' label, which prints the offending line number,
 * the line itself and an error message on stderr and then exits with
 * status 1. The process also exits with status 1 if the file can't be
 * opened or if the chdir() of the "dir" directive fails.
 *
 * I agree, this is a very rudimental way to load a configuration...
 * will improve later if the config gets more complex */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    /* A file name of "-" means: read the configuration from stdin */
    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines */
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments. The separator is a single space, so two
         * consecutive spaces produce an empty argument. */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives. Every branch also checks the number
         * of arguments, so a known directive with the wrong argument count
         * ends in the final "Bad directive" branch. */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* save <seconds> <changes>: add a save point */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* The working directory is changed immediately */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* The special name "stdout" selects the standard output,
             * represented by a NULL log file name. */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* slaveof <host> <port>: the replication state is set to
             * REDIS_REPL_CONNECT so that serverCron() will try to connect
             * to the master. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            /* yesnotoi() returns -1 for anything but "yes" or "no" */
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* The directive was processed: release the tokens and the line */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

    /* Reached only via goto from inside the loop, so 'line' still holds
     * the offending line. Nothing is released as the process exits. */
loaderr:
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1460
1461 static void freeClientArgv(redisClient *c) {
1462 int j;
1463
1464 for (j = 0; j < c->argc; j++)
1465 decrRefCount(c->argv[j]);
1466 for (j = 0; j < c->mbargc; j++)
1467 decrRefCount(c->mbargv[j]);
1468 c->argc = 0;
1469 c->mbargc = 0;
1470 }
1471
1472 static void freeClient(redisClient *c) {
1473 listNode *ln;
1474
1475 /* Note that if the client we are freeing is blocked into a blocking
1476 * call, we have to set querybuf to NULL *before* to call unblockClient()
1477 * to avoid processInputBuffer() will get called. Also it is important
1478 * to remove the file events after this, because this call adds
1479 * the READABLE event. */
1480 sdsfree(c->querybuf);
1481 c->querybuf = NULL;
1482 if (c->flags & REDIS_BLOCKED)
1483 unblockClient(c);
1484
1485 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1486 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1487 listRelease(c->reply);
1488 freeClientArgv(c);
1489 close(c->fd);
1490 ln = listSearchKey(server.clients,c);
1491 redisAssert(ln != NULL);
1492 listDelNode(server.clients,ln);
1493 if (c->flags & REDIS_SLAVE) {
1494 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1495 close(c->repldbfd);
1496 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1497 ln = listSearchKey(l,c);
1498 redisAssert(ln != NULL);
1499 listDelNode(l,ln);
1500 }
1501 if (c->flags & REDIS_MASTER) {
1502 server.master = NULL;
1503 server.replstate = REDIS_REPL_CONNECT;
1504 }
1505 zfree(c->argv);
1506 zfree(c->mbargv);
1507 freeClientMultiState(c);
1508 zfree(c);
1509 }
1510
1511 #define GLUEREPLY_UP_TO (1024)
1512 static void glueReplyBuffersIfNeeded(redisClient *c) {
1513 int copylen = 0;
1514 char buf[GLUEREPLY_UP_TO];
1515 listNode *ln;
1516 robj *o;
1517
1518 listRewind(c->reply);
1519 while((ln = listYield(c->reply))) {
1520 int objlen;
1521
1522 o = ln->value;
1523 objlen = sdslen(o->ptr);
1524 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1525 memcpy(buf+copylen,o->ptr,objlen);
1526 copylen += objlen;
1527 listDelNode(c->reply,ln);
1528 } else {
1529 if (copylen == 0) return;
1530 break;
1531 }
1532 }
1533 /* Now the output buffer is empty, add the new single element */
1534 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1535 listAddNodeHead(c->reply,o);
1536 }
1537
1538 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1539 redisClient *c = privdata;
1540 int nwritten = 0, totwritten = 0, objlen;
1541 robj *o;
1542 REDIS_NOTUSED(el);
1543 REDIS_NOTUSED(mask);
1544
1545 /* Use writev() if we have enough buffers to send */
1546 if (!server.glueoutputbuf &&
1547 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1548 !(c->flags & REDIS_MASTER))
1549 {
1550 sendReplyToClientWritev(el, fd, privdata, mask);
1551 return;
1552 }
1553
1554 while(listLength(c->reply)) {
1555 if (server.glueoutputbuf && listLength(c->reply) > 1)
1556 glueReplyBuffersIfNeeded(c);
1557
1558 o = listNodeValue(listFirst(c->reply));
1559 objlen = sdslen(o->ptr);
1560
1561 if (objlen == 0) {
1562 listDelNode(c->reply,listFirst(c->reply));
1563 continue;
1564 }
1565
1566 if (c->flags & REDIS_MASTER) {
1567 /* Don't reply to a master */
1568 nwritten = objlen - c->sentlen;
1569 } else {
1570 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1571 if (nwritten <= 0) break;
1572 }
1573 c->sentlen += nwritten;
1574 totwritten += nwritten;
1575 /* If we fully sent the object on head go to the next one */
1576 if (c->sentlen == objlen) {
1577 listDelNode(c->reply,listFirst(c->reply));
1578 c->sentlen = 0;
1579 }
1580 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1581 * bytes, in a single threaded server it's a good idea to serve
1582 * other clients as well, even if a very large request comes from
1583 * super fast link that is always able to accept data (in real world
1584 * scenario think about 'KEYS *' against the loopback interfae) */
1585 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1586 }
1587 if (nwritten == -1) {
1588 if (errno == EAGAIN) {
1589 nwritten = 0;
1590 } else {
1591 redisLog(REDIS_DEBUG,
1592 "Error writing to client: %s", strerror(errno));
1593 freeClient(c);
1594 return;
1595 }
1596 }
1597 if (totwritten > 0) c->lastinteraction = time(NULL);
1598 if (listLength(c->reply) == 0) {
1599 c->sentlen = 0;
1600 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1601 }
1602 }
1603
1604 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1605 {
1606 redisClient *c = privdata;
1607 int nwritten = 0, totwritten = 0, objlen, willwrite;
1608 robj *o;
1609 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1610 int offset, ion = 0;
1611 REDIS_NOTUSED(el);
1612 REDIS_NOTUSED(mask);
1613
1614 listNode *node;
1615 while (listLength(c->reply)) {
1616 offset = c->sentlen;
1617 ion = 0;
1618 willwrite = 0;
1619
1620 /* fill-in the iov[] array */
1621 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1622 o = listNodeValue(node);
1623 objlen = sdslen(o->ptr);
1624
1625 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1626 break;
1627
1628 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1629 break; /* no more iovecs */
1630
1631 iov[ion].iov_base = ((char*)o->ptr) + offset;
1632 iov[ion].iov_len = objlen - offset;
1633 willwrite += objlen - offset;
1634 offset = 0; /* just for the first item */
1635 ion++;
1636 }
1637
1638 if(willwrite == 0)
1639 break;
1640
1641 /* write all collected blocks at once */
1642 if((nwritten = writev(fd, iov, ion)) < 0) {
1643 if (errno != EAGAIN) {
1644 redisLog(REDIS_DEBUG,
1645 "Error writing to client: %s", strerror(errno));
1646 freeClient(c);
1647 return;
1648 }
1649 break;
1650 }
1651
1652 totwritten += nwritten;
1653 offset = c->sentlen;
1654
1655 /* remove written robjs from c->reply */
1656 while (nwritten && listLength(c->reply)) {
1657 o = listNodeValue(listFirst(c->reply));
1658 objlen = sdslen(o->ptr);
1659
1660 if(nwritten >= objlen - offset) {
1661 listDelNode(c->reply, listFirst(c->reply));
1662 nwritten -= objlen - offset;
1663 c->sentlen = 0;
1664 } else {
1665 /* partial write */
1666 c->sentlen += nwritten;
1667 break;
1668 }
1669 offset = 0;
1670 }
1671 }
1672
1673 if (totwritten > 0)
1674 c->lastinteraction = time(NULL);
1675
1676 if (listLength(c->reply) == 0) {
1677 c->sentlen = 0;
1678 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1679 }
1680 }
1681
1682 static struct redisCommand *lookupCommand(char *name) {
1683 int j = 0;
1684 while(cmdTable[j].name != NULL) {
1685 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1686 j++;
1687 }
1688 return NULL;
1689 }
1690
1691 /* resetClient prepare the client to process the next command */
1692 static void resetClient(redisClient *c) {
1693 freeClientArgv(c);
1694 c->bulklen = -1;
1695 c->multibulk = 0;
1696 }
1697
1698 /* Call() is the core of Redis execution of a command */
1699 static void call(redisClient *c, struct redisCommand *cmd) {
1700 long long dirty;
1701
1702 dirty = server.dirty;
1703 cmd->proc(c);
1704 if (server.appendonly && server.dirty-dirty)
1705 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1706 if (server.dirty-dirty && listLength(server.slaves))
1707 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1708 if (listLength(server.monitors))
1709 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1710 server.stat_numcommands++;
1711 }
1712
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
1721 static int processCommand(redisClient *c) {
1722 struct redisCommand *cmd;
1723
1724 /* Free some memory if needed (maxmemory setting) */
1725 if (server.maxmemory) freeMemoryIfNeeded();
1726
1727 /* Handle the multi bulk command type. This is an alternative protocol
1728 * supported by Redis in order to receive commands that are composed of
1729 * multiple binary-safe "bulk" arguments. The latency of processing is
1730 * a bit higher but this allows things like multi-sets, so if this
1731 * protocol is used only for MSET and similar commands this is a big win. */
1732 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1733 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1734 if (c->multibulk <= 0) {
1735 resetClient(c);
1736 return 1;
1737 } else {
1738 decrRefCount(c->argv[c->argc-1]);
1739 c->argc--;
1740 return 1;
1741 }
1742 } else if (c->multibulk) {
1743 if (c->bulklen == -1) {
1744 if (((char*)c->argv[0]->ptr)[0] != '$') {
1745 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1746 resetClient(c);
1747 return 1;
1748 } else {
1749 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1750 decrRefCount(c->argv[0]);
1751 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1752 c->argc--;
1753 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1754 resetClient(c);
1755 return 1;
1756 }
1757 c->argc--;
1758 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1759 return 1;
1760 }
1761 } else {
1762 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1763 c->mbargv[c->mbargc] = c->argv[0];
1764 c->mbargc++;
1765 c->argc--;
1766 c->multibulk--;
1767 if (c->multibulk == 0) {
1768 robj **auxargv;
1769 int auxargc;
1770
1771 /* Here we need to swap the multi-bulk argc/argv with the
1772 * normal argc/argv of the client structure. */
1773 auxargv = c->argv;
1774 c->argv = c->mbargv;
1775 c->mbargv = auxargv;
1776
1777 auxargc = c->argc;
1778 c->argc = c->mbargc;
1779 c->mbargc = auxargc;
1780
1781 /* We need to set bulklen to something different than -1
1782 * in order for the code below to process the command without
1783 * to try to read the last argument of a bulk command as
1784 * a special argument. */
1785 c->bulklen = 0;
1786 /* continue below and process the command */
1787 } else {
1788 c->bulklen = -1;
1789 return 1;
1790 }
1791 }
1792 }
1793 /* -- end of multi bulk commands processing -- */
1794
1795 /* The QUIT command is handled as a special case. Normal command
1796 * procs are unable to close the client connection safely */
1797 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1798 freeClient(c);
1799 return 0;
1800 }
1801 cmd = lookupCommand(c->argv[0]->ptr);
1802 if (!cmd) {
1803 addReplySds(c,
1804 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1805 (char*)c->argv[0]->ptr));
1806 resetClient(c);
1807 return 1;
1808 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1809 (c->argc < -cmd->arity)) {
1810 addReplySds(c,
1811 sdscatprintf(sdsempty(),
1812 "-ERR wrong number of arguments for '%s' command\r\n",
1813 cmd->name));
1814 resetClient(c);
1815 return 1;
1816 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1817 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1818 resetClient(c);
1819 return 1;
1820 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1821 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1822
1823 decrRefCount(c->argv[c->argc-1]);
1824 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1825 c->argc--;
1826 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1827 resetClient(c);
1828 return 1;
1829 }
1830 c->argc--;
1831 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1832 /* It is possible that the bulk read is already in the
1833 * buffer. Check this condition and handle it accordingly.
1834 * This is just a fast path, alternative to call processInputBuffer().
1835 * It's a good idea since the code is small and this condition
1836 * happens most of the times. */
1837 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1838 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1839 c->argc++;
1840 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1841 } else {
1842 return 1;
1843 }
1844 }
1845 /* Let's try to share objects on the command arguments vector */
1846 if (server.shareobjects) {
1847 int j;
1848 for(j = 1; j < c->argc; j++)
1849 c->argv[j] = tryObjectSharing(c->argv[j]);
1850 }
1851 /* Let's try to encode the bulk object to save space. */
1852 if (cmd->flags & REDIS_CMD_BULK)
1853 tryObjectEncoding(c->argv[c->argc-1]);
1854
1855 /* Check if the user is authenticated */
1856 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1857 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1858 resetClient(c);
1859 return 1;
1860 }
1861
1862 /* Exec the command */
1863 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1864 queueMultiCommand(c,cmd);
1865 addReply(c,shared.queued);
1866 } else {
1867 call(c,cmd);
1868 }
1869
1870 /* Prepare the client for the next command */
1871 if (c->flags & REDIS_CLOSE) {
1872 freeClient(c);
1873 return 0;
1874 }
1875 resetClient(c);
1876 return 1;
1877 }
1878
1879 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1880 listNode *ln;
1881 int outc = 0, j;
1882 robj **outv;
1883 /* (args*2)+1 is enough room for args, spaces, newlines */
1884 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1885
1886 if (argc <= REDIS_STATIC_ARGS) {
1887 outv = static_outv;
1888 } else {
1889 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1890 }
1891
1892 for (j = 0; j < argc; j++) {
1893 if (j != 0) outv[outc++] = shared.space;
1894 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1895 robj *lenobj;
1896
1897 lenobj = createObject(REDIS_STRING,
1898 sdscatprintf(sdsempty(),"%lu\r\n",
1899 (unsigned long) stringObjectLen(argv[j])));
1900 lenobj->refcount = 0;
1901 outv[outc++] = lenobj;
1902 }
1903 outv[outc++] = argv[j];
1904 }
1905 outv[outc++] = shared.crlf;
1906
1907 /* Increment all the refcounts at start and decrement at end in order to
1908 * be sure to free objects if there is no slave in a replication state
1909 * able to be feed with commands */
1910 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1911 listRewind(slaves);
1912 while((ln = listYield(slaves))) {
1913 redisClient *slave = ln->value;
1914
1915 /* Don't feed slaves that are still waiting for BGSAVE to start */
1916 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1917
1918 /* Feed all the other slaves, MONITORs and so on */
1919 if (slave->slaveseldb != dictid) {
1920 robj *selectcmd;
1921
1922 switch(dictid) {
1923 case 0: selectcmd = shared.select0; break;
1924 case 1: selectcmd = shared.select1; break;
1925 case 2: selectcmd = shared.select2; break;
1926 case 3: selectcmd = shared.select3; break;
1927 case 4: selectcmd = shared.select4; break;
1928 case 5: selectcmd = shared.select5; break;
1929 case 6: selectcmd = shared.select6; break;
1930 case 7: selectcmd = shared.select7; break;
1931 case 8: selectcmd = shared.select8; break;
1932 case 9: selectcmd = shared.select9; break;
1933 default:
1934 selectcmd = createObject(REDIS_STRING,
1935 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1936 selectcmd->refcount = 0;
1937 break;
1938 }
1939 addReply(slave,selectcmd);
1940 slave->slaveseldb = dictid;
1941 }
1942 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1943 }
1944 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1945 if (outv != static_outv) zfree(outv);
1946 }
1947
1948 static void processInputBuffer(redisClient *c) {
1949 again:
1950 /* Before to process the input buffer, make sure the client is not
1951 * waitig for a blocking operation such as BLPOP. Note that the first
1952 * iteration the client is never blocked, otherwise the processInputBuffer
1953 * would not be called at all, but after the execution of the first commands
1954 * in the input buffer the client may be blocked, and the "goto again"
1955 * will try to reiterate. The following line will make it return asap. */
1956 if (c->flags & REDIS_BLOCKED) return;
1957 if (c->bulklen == -1) {
1958 /* Read the first line of the query */
1959 char *p = strchr(c->querybuf,'\n');
1960 size_t querylen;
1961
1962 if (p) {
1963 sds query, *argv;
1964 int argc, j;
1965
1966 query = c->querybuf;
1967 c->querybuf = sdsempty();
1968 querylen = 1+(p-(query));
1969 if (sdslen(query) > querylen) {
1970 /* leave data after the first line of the query in the buffer */
1971 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1972 }
1973 *p = '\0'; /* remove "\n" */
1974 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1975 sdsupdatelen(query);
1976
1977 /* Now we can split the query in arguments */
1978 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1979 sdsfree(query);
1980
1981 if (c->argv) zfree(c->argv);
1982 c->argv = zmalloc(sizeof(robj*)*argc);
1983
1984 for (j = 0; j < argc; j++) {
1985 if (sdslen(argv[j])) {
1986 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1987 c->argc++;
1988 } else {
1989 sdsfree(argv[j]);
1990 }
1991 }
1992 zfree(argv);
1993 if (c->argc) {
1994 /* Execute the command. If the client is still valid
1995 * after processCommand() return and there is something
1996 * on the query buffer try to process the next command. */
1997 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1998 } else {
1999 /* Nothing to process, argc == 0. Just process the query
2000 * buffer if it's not empty or return to the caller */
2001 if (sdslen(c->querybuf)) goto again;
2002 }
2003 return;
2004 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2005 redisLog(REDIS_DEBUG, "Client protocol error");
2006 freeClient(c);
2007 return;
2008 }
2009 } else {
2010 /* Bulk read handling. Note that if we are at this point
2011 the client already sent a command terminated with a newline,
2012 we are reading the bulk data that is actually the last
2013 argument of the command. */
2014 int qbl = sdslen(c->querybuf);
2015
2016 if (c->bulklen <= qbl) {
2017 /* Copy everything but the final CRLF as final argument */
2018 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2019 c->argc++;
2020 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2021 /* Process the command. If the client is still valid after
2022 * the processing and there is more data in the buffer
2023 * try to parse it. */
2024 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2025 return;
2026 }
2027 }
2028 }
2029
2030 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2031 redisClient *c = (redisClient*) privdata;
2032 char buf[REDIS_IOBUF_LEN];
2033 int nread;
2034 REDIS_NOTUSED(el);
2035 REDIS_NOTUSED(mask);
2036
2037 nread = read(fd, buf, REDIS_IOBUF_LEN);
2038 if (nread == -1) {
2039 if (errno == EAGAIN) {
2040 nread = 0;
2041 } else {
2042 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2043 freeClient(c);
2044 return;
2045 }
2046 } else if (nread == 0) {
2047 redisLog(REDIS_DEBUG, "Client closed connection");
2048 freeClient(c);
2049 return;
2050 }
2051 if (nread) {
2052 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2053 c->lastinteraction = time(NULL);
2054 } else {
2055 return;
2056 }
2057 processInputBuffer(c);
2058 }
2059
2060 static int selectDb(redisClient *c, int id) {
2061 if (id < 0 || id >= server.dbnum)
2062 return REDIS_ERR;
2063 c->db = &server.db[id];
2064 return REDIS_OK;
2065 }
2066
2067 static void *dupClientReplyValue(void *o) {
2068 incrRefCount((robj*)o);
2069 return 0;
2070 }
2071
2072 static redisClient *createClient(int fd) {
2073 redisClient *c = zmalloc(sizeof(*c));
2074
2075 anetNonBlock(NULL,fd);
2076 anetTcpNoDelay(NULL,fd);
2077 if (!c) return NULL;
2078 selectDb(c,0);
2079 c->fd = fd;
2080 c->querybuf = sdsempty();
2081 c->argc = 0;
2082 c->argv = NULL;
2083 c->bulklen = -1;
2084 c->multibulk = 0;
2085 c->mbargc = 0;
2086 c->mbargv = NULL;
2087 c->sentlen = 0;
2088 c->flags = 0;
2089 c->lastinteraction = time(NULL);
2090 c->authenticated = 0;
2091 c->replstate = REDIS_REPL_NONE;
2092 c->reply = listCreate();
2093 c->blockingkeys = NULL;
2094 c->blockingkeysnum = 0;
2095 listSetFreeMethod(c->reply,decrRefCount);
2096 listSetDupMethod(c->reply,dupClientReplyValue);
2097 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2098 readQueryFromClient, c) == AE_ERR) {
2099 freeClient(c);
2100 return NULL;
2101 }
2102 listAddNodeTail(server.clients,c);
2103 initClientMultiState(c);
2104 return c;
2105 }
2106
2107 static void addReply(redisClient *c, robj *obj) {
2108 if (listLength(c->reply) == 0 &&
2109 (c->replstate == REDIS_REPL_NONE ||
2110 c->replstate == REDIS_REPL_ONLINE) &&
2111 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2112 sendReplyToClient, c) == AE_ERR) return;
2113 listAddNodeTail(c->reply,getDecodedObject(obj));
2114 }
2115
2116 static void addReplySds(redisClient *c, sds s) {
2117 robj *o = createObject(REDIS_STRING,s);
2118 addReply(c,o);
2119 decrRefCount(o);
2120 }
2121
2122 static void addReplyDouble(redisClient *c, double d) {
2123 char buf[128];
2124
2125 snprintf(buf,sizeof(buf),"%.17g",d);
2126 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2127 (unsigned long) strlen(buf),buf));
2128 }
2129
2130 static void addReplyBulkLen(redisClient *c, robj *obj) {
2131 size_t len;
2132
2133 if (obj->encoding == REDIS_ENCODING_RAW) {
2134 len = sdslen(obj->ptr);
2135 } else {
2136 long n = (long)obj->ptr;
2137
2138 /* Compute how many bytes will take this integer as a radix 10 string */
2139 len = 1;
2140 if (n < 0) {
2141 len++;
2142 n = -n;
2143 }
2144 while((n = n/10) != 0) {
2145 len++;
2146 }
2147 }
2148 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2149 }
2150
2151 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2152 int cport, cfd;
2153 char cip[128];
2154 redisClient *c;
2155 REDIS_NOTUSED(el);
2156 REDIS_NOTUSED(mask);
2157 REDIS_NOTUSED(privdata);
2158
2159 cfd = anetAccept(server.neterr, fd, cip, &cport);
2160 if (cfd == AE_ERR) {
2161 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2162 return;
2163 }
2164 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2165 if ((c = createClient(cfd)) == NULL) {
2166 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2167 close(cfd); /* May be already closed, just ingore errors */
2168 return;
2169 }
2170 /* If maxclient directive is set and this is one client more... close the
2171 * connection. Note that we create the client instead to check before
2172 * for this condition, since now the socket is already set in nonblocking
2173 * mode and we can send an error for free using the Kernel I/O */
2174 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2175 char *err = "-ERR max number of clients reached\r\n";
2176
2177 /* That's a best effort error message, don't check write errors */
2178 if (write(c->fd,err,strlen(err)) == -1) {
2179 /* Nothing to do, Just to avoid the warning... */
2180 }
2181 freeClient(c);
2182 return;
2183 }
2184 server.stat_numconnections++;
2185 }
2186
2187 /* ======================= Redis objects implementation ===================== */
2188
2189 static robj *createObject(int type, void *ptr) {
2190 robj *o;
2191
2192 if (listLength(server.objfreelist)) {
2193 listNode *head = listFirst(server.objfreelist);
2194 o = listNodeValue(head);
2195 listDelNode(server.objfreelist,head);
2196 } else {
2197 o = zmalloc(sizeof(*o));
2198 }
2199 o->type = type;
2200 o->encoding = REDIS_ENCODING_RAW;
2201 o->ptr = ptr;
2202 o->refcount = 1;
2203 return o;
2204 }
2205
2206 static robj *createStringObject(char *ptr, size_t len) {
2207 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2208 }
2209
2210 static robj *createListObject(void) {
2211 list *l = listCreate();
2212
2213 listSetFreeMethod(l,decrRefCount);
2214 return createObject(REDIS_LIST,l);
2215 }
2216
2217 static robj *createSetObject(void) {
2218 dict *d = dictCreate(&setDictType,NULL);
2219 return createObject(REDIS_SET,d);
2220 }
2221
2222 static robj *createZsetObject(void) {
2223 zset *zs = zmalloc(sizeof(*zs));
2224
2225 zs->dict = dictCreate(&zsetDictType,NULL);
2226 zs->zsl = zslCreate();
2227 return createObject(REDIS_ZSET,zs);
2228 }
2229
2230 static void freeStringObject(robj *o) {
2231 if (o->encoding == REDIS_ENCODING_RAW) {
2232 sdsfree(o->ptr);
2233 }
2234 }
2235
2236 static void freeListObject(robj *o) {
2237 listRelease((list*) o->ptr);
2238 }
2239
2240 static void freeSetObject(robj *o) {
2241 dictRelease((dict*) o->ptr);
2242 }
2243
2244 static void freeZsetObject(robj *o) {
2245 zset *zs = o->ptr;
2246
2247 dictRelease(zs->dict);
2248 zslFree(zs->zsl);
2249 zfree(zs);
2250 }
2251
2252 static void freeHashObject(robj *o) {
2253 dictRelease((dict*) o->ptr);
2254 }
2255
2256 static void incrRefCount(robj *o) {
2257 o->refcount++;
2258 #ifdef DEBUG_REFCOUNT
2259 if (o->type == REDIS_STRING)
2260 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2261 #endif
2262 }
2263
2264 static void decrRefCount(void *obj) {
2265 robj *o = obj;
2266
2267 #ifdef DEBUG_REFCOUNT
2268 if (o->type == REDIS_STRING)
2269 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2270 #endif
2271 if (--(o->refcount) == 0) {
2272 switch(o->type) {
2273 case REDIS_STRING: freeStringObject(o); break;
2274 case REDIS_LIST: freeListObject(o); break;
2275 case REDIS_SET: freeSetObject(o); break;
2276 case REDIS_ZSET: freeZsetObject(o); break;
2277 case REDIS_HASH: freeHashObject(o); break;
2278 default: redisAssert(0 != 0); break;
2279 }
2280 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2281 !listAddNodeHead(server.objfreelist,o))
2282 zfree(o);
2283 }
2284 }
2285
2286 static robj *lookupKey(redisDb *db, robj *key) {
2287 dictEntry *de = dictFind(db->dict,key);
2288 return de ? dictGetEntryVal(de) : NULL;
2289 }
2290
2291 static robj *lookupKeyRead(redisDb *db, robj *key) {
2292 expireIfNeeded(db,key);
2293 return lookupKey(db,key);
2294 }
2295
2296 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2297 deleteIfVolatile(db,key);
2298 return lookupKey(db,key);
2299 }
2300
2301 static int deleteKey(redisDb *db, robj *key) {
2302 int retval;
2303
2304 /* We need to protect key from destruction: after the first dictDelete()
2305 * it may happen that 'key' is no longer valid if we don't increment
2306 * it's count. This may happen when we get the object reference directly
2307 * from the hash table with dictRandomKey() or dict iterators */
2308 incrRefCount(key);
2309 if (dictSize(db->expires)) dictDelete(db->expires,key);
2310 retval = dictDelete(db->dict,key);
2311 decrRefCount(key);
2312
2313 return retval == DICT_OK;
2314 }
2315
2316 /* Try to share an object against the shared objects pool */
2317 static robj *tryObjectSharing(robj *o) {
2318 struct dictEntry *de;
2319 unsigned long c;
2320
2321 if (o == NULL || server.shareobjects == 0) return o;
2322
2323 redisAssert(o->type == REDIS_STRING);
2324 de = dictFind(server.sharingpool,o);
2325 if (de) {
2326 robj *shared = dictGetEntryKey(de);
2327
2328 c = ((unsigned long) dictGetEntryVal(de))+1;
2329 dictGetEntryVal(de) = (void*) c;
2330 incrRefCount(shared);
2331 decrRefCount(o);
2332 return shared;
2333 } else {
2334 /* Here we are using a stream algorihtm: Every time an object is
2335 * shared we increment its count, everytime there is a miss we
2336 * recrement the counter of a random object. If this object reaches
2337 * zero we remove the object and put the current object instead. */
2338 if (dictSize(server.sharingpool) >=
2339 server.sharingpoolsize) {
2340 de = dictGetRandomKey(server.sharingpool);
2341 redisAssert(de != NULL);
2342 c = ((unsigned long) dictGetEntryVal(de))-1;
2343 dictGetEntryVal(de) = (void*) c;
2344 if (c == 0) {
2345 dictDelete(server.sharingpool,de->key);
2346 }
2347 } else {
2348 c = 0; /* If the pool is empty we want to add this object */
2349 }
2350 if (c == 0) {
2351 int retval;
2352
2353 retval = dictAdd(server.sharingpool,o,(void*)1);
2354 redisAssert(retval == DICT_OK);
2355 incrRefCount(o);
2356 }
2357 return o;
2358 }
2359 }
2360
2361 /* Check if the nul-terminated string 's' can be represented by a long
2362 * (that is, is a number that fits into long without any other space or
2363 * character before or after the digits).
2364 *
2365 * If so, the function returns REDIS_OK and *longval is set to the value
2366 * of the number. Otherwise REDIS_ERR is returned */
2367 static int isStringRepresentableAsLong(sds s, long *longval) {
2368 char buf[32], *endptr;
2369 long value;
2370 int slen;
2371
2372 value = strtol(s, &endptr, 10);
2373 if (endptr[0] != '\0') return REDIS_ERR;
2374 slen = snprintf(buf,32,"%ld",value);
2375
2376 /* If the number converted back into a string is not identical
2377 * then it's not possible to encode the string as integer */
2378 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2379 if (longval) *longval = value;
2380 return REDIS_OK;
2381 }
2382
2383 /* Try to encode a string object in order to save space */
2384 static int tryObjectEncoding(robj *o) {
2385 long value;
2386 sds s = o->ptr;
2387
2388 if (o->encoding != REDIS_ENCODING_RAW)
2389 return REDIS_ERR; /* Already encoded */
2390
2391 /* It's not save to encode shared objects: shared objects can be shared
2392 * everywhere in the "object space" of Redis. Encoded objects can only
2393 * appear as "values" (and not, for instance, as keys) */
2394 if (o->refcount > 1) return REDIS_ERR;
2395
2396 /* Currently we try to encode only strings */
2397 redisAssert(o->type == REDIS_STRING);
2398
2399 /* Check if we can represent this string as a long integer */
2400 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2401
2402 /* Ok, this object can be encoded */
2403 o->encoding = REDIS_ENCODING_INT;
2404 sdsfree(o->ptr);
2405 o->ptr = (void*) value;
2406 return REDIS_OK;
2407 }
2408
2409 /* Get a decoded version of an encoded object (returned as a new object).
2410 * If the object is already raw-encoded just increment the ref count. */
2411 static robj *getDecodedObject(robj *o) {
2412 robj *dec;
2413
2414 if (o->encoding == REDIS_ENCODING_RAW) {
2415 incrRefCount(o);
2416 return o;
2417 }
2418 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2419 char buf[32];
2420
2421 snprintf(buf,32,"%ld",(long)o->ptr);
2422 dec = createStringObject(buf,strlen(buf));
2423 return dec;
2424 } else {
2425 redisAssert(1 != 1);
2426 }
2427 }
2428
2429 /* Compare two string objects via strcmp() or alike.
2430 * Note that the objects may be integer-encoded. In such a case we
2431 * use snprintf() to get a string representation of the numbers on the stack
2432 * and compare the strings, it's much faster than calling getDecodedObject().
2433 *
2434 * Important note: if objects are not integer encoded, but binary-safe strings,
2435 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2436 * binary safe. */
2437 static int compareStringObjects(robj *a, robj *b) {
2438 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2439 char bufa[128], bufb[128], *astr, *bstr;
2440 int bothsds = 1;
2441
2442 if (a == b) return 0;
2443 if (a->encoding != REDIS_ENCODING_RAW) {
2444 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2445 astr = bufa;
2446 bothsds = 0;
2447 } else {
2448 astr = a->ptr;
2449 }
2450 if (b->encoding != REDIS_ENCODING_RAW) {
2451 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2452 bstr = bufb;
2453 bothsds = 0;
2454 } else {
2455 bstr = b->ptr;
2456 }
2457 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2458 }
2459
2460 static size_t stringObjectLen(robj *o) {
2461 redisAssert(o->type == REDIS_STRING);
2462 if (o->encoding == REDIS_ENCODING_RAW) {
2463 return sdslen(o->ptr);
2464 } else {
2465 char buf[32];
2466
2467 return snprintf(buf,32,"%ld",(long)o->ptr);
2468 }
2469 }
2470
2471 /*============================ DB saving/loading ============================ */
2472
/* Write the one byte type/opcode 'type' to 'fp'.
 * Returns 0 on success, -1 on write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2477
/* Write the time 't' to 'fp' as a 4 byte signed integer in host byte
 * order. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;
    size_t written = fwrite(&stamp,4,1,fp);

    return (written == 0) ? -1 : 0;
}
2483
2484 /* check rdbLoadLen() comments for more info */
2485 static int rdbSaveLen(FILE *fp, uint32_t len) {
2486 unsigned char buf[2];
2487
2488 if (len < (1<<6)) {
2489 /* Save a 6 bit len */
2490 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2491 if (fwrite(buf,1,1,fp) == 0) return -1;
2492 } else if (len < (1<<14)) {
2493 /* Save a 14 bit len */
2494 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2495 buf[1] = len&0xFF;
2496 if (fwrite(buf,2,1,fp) == 0) return -1;
2497 } else {
2498 /* Save a 32 bit len */
2499 buf[0] = (REDIS_RDB_32BITLEN<<6);
2500 if (fwrite(buf,1,1,fp) == 0) return -1;
2501 len = htonl(len);
2502 if (fwrite(&len,4,1,fp) == 0) return -1;
2503 }
2504 return 0;
2505 }
2506
2507 /* String objects in the form "2391" "-100" without any space and with a
2508 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2509 * encoded as integers to save space */
2510 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2511 long long value;
2512 char *endptr, buf[32];
2513
2514 /* Check if it's possible to encode this value as a number */
2515 value = strtoll(s, &endptr, 10);
2516 if (endptr[0] != '\0') return 0;
2517 snprintf(buf,32,"%lld",value);
2518
2519 /* If the number converted back into a string is not identical
2520 * then it's not possible to encode the string as integer */
2521 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2522
2523 /* Finally check if it fits in our ranges */
2524 if (value >= -(1<<7) && value <= (1<<7)-1) {
2525 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2526 enc[1] = value&0xFF;
2527 return 2;
2528 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2529 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2530 enc[1] = value&0xFF;
2531 enc[2] = (value>>8)&0xFF;
2532 return 3;
2533 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2534 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2535 enc[1] = value&0xFF;
2536 enc[2] = (value>>8)&0xFF;
2537 enc[3] = (value>>16)&0xFF;
2538 enc[4] = (value>>24)&0xFF;
2539 return 5;
2540 } else {
2541 return 0;
2542 }
2543 }
2544
2545 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2546 unsigned int comprlen, outlen;
2547 unsigned char byte;
2548 void *out;
2549
2550 /* We require at least four bytes compression for this to be worth it */
2551 outlen = sdslen(obj->ptr)-4;
2552 if (outlen <= 0) return 0;
2553 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2554 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2555 if (comprlen == 0) {
2556 zfree(out);
2557 return 0;
2558 }
2559 /* Data compressed! Let's save it on disk */
2560 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2561 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2562 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2563 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2564 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2565 zfree(out);
2566 return comprlen;
2567
2568 writeerr:
2569 zfree(out);
2570 return -1;
2571 }
2572
2573 /* Save a string objet as [len][data] on disk. If the object is a string
2574 * representation of an integer value we try to safe it in a special form */
2575 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2576 size_t len;
2577 int enclen;
2578
2579 len = sdslen(obj->ptr);
2580
2581 /* Try integer encoding */
2582 if (len <= 11) {
2583 unsigned char buf[5];
2584 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2585 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2586 return 0;
2587 }
2588 }
2589
2590 /* Try LZF compression - under 20 bytes it's unable to compress even
2591 * aaaaaaaaaaaaaaaaaa so skip it */
2592 if (server.rdbcompression && len > 20) {
2593 int retval;
2594
2595 retval = rdbSaveLzfStringObject(fp,obj);
2596 if (retval == -1) return -1;
2597 if (retval > 0) return 0;
2598 /* retval == 0 means data can't be compressed, save the old way */
2599 }
2600
2601 /* Store verbatim */
2602 if (rdbSaveLen(fp,len) == -1) return -1;
2603 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2604 return 0;
2605 }
2606
2607 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2608 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2609 int retval;
2610
2611 obj = getDecodedObject(obj);
2612 retval = rdbSaveStringObjectRaw(fp,obj);
2613 decrRefCount(obj);
2614 return retval;
2615 }
2616
/* Save a double value. Doubles are saved as strings ("%.17g" format)
 * prefixed by an unsigned 8 bit integer specifying the length of the
 * representation. This prefix has special values for the following
 * conditions, in which case no string follows:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int outlen;

    if (isnan(val)) {
        out[0] = 253;
        outlen = 1;
    } else if (isfinite(val)) {
        char *digits = (char*)out+1;

        snprintf(digits,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(digits);
        outlen = out[0]+1;
    } else {
        out[0] = (val < 0) ? 255 : 254;
        outlen = 1;
    }
    return (fwrite(out,outlen,1,fp) == 0) ? -1 : 0;
}
2643
2644 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2645 static int rdbSave(char *filename) {
2646 dictIterator *di = NULL;
2647 dictEntry *de;
2648 FILE *fp;
2649 char tmpfile[256];
2650 int j;
2651 time_t now = time(NULL);
2652
2653 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2654 fp = fopen(tmpfile,"w");
2655 if (!fp) {
2656 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2657 return REDIS_ERR;
2658 }
2659 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2660 for (j = 0; j < server.dbnum; j++) {
2661 redisDb *db = server.db+j;
2662 dict *d = db->dict;
2663 if (dictSize(d) == 0) continue;
2664 di = dictGetIterator(d);
2665 if (!di) {
2666 fclose(fp);
2667 return REDIS_ERR;
2668 }
2669
2670 /* Write the SELECT DB opcode */
2671 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2672 if (rdbSaveLen(fp,j) == -1) goto werr;
2673
2674 /* Iterate this DB writing every entry */
2675 while((de = dictNext(di)) != NULL) {
2676 robj *key = dictGetEntryKey(de);
2677 robj *o = dictGetEntryVal(de);
2678 time_t expiretime = getExpire(db,key);
2679
2680 /* Save the expire time */
2681 if (expiretime != -1) {
2682 /* If this key is already expired skip it */
2683 if (expiretime < now) continue;
2684 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2685 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2686 }
2687 /* Save the key and associated value */
2688 if (rdbSaveType(fp,o->type) == -1) goto werr;
2689 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2690 if (o->type == REDIS_STRING) {
2691 /* Save a string value */
2692 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2693 } else if (o->type == REDIS_LIST) {
2694 /* Save a list value */
2695 list *list = o->ptr;
2696 listNode *ln;
2697
2698 listRewind(list);
2699 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2700 while((ln = listYield(list))) {
2701 robj *eleobj = listNodeValue(ln);
2702
2703 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2704 }
2705 } else if (o->type == REDIS_SET) {
2706 /* Save a set value */
2707 dict *set = o->ptr;
2708 dictIterator *di = dictGetIterator(set);
2709 dictEntry *de;
2710
2711 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2712 while((de = dictNext(di)) != NULL) {
2713 robj *eleobj = dictGetEntryKey(de);
2714
2715 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2716 }
2717 dictReleaseIterator(di);
2718 } else if (o->type == REDIS_ZSET) {
2719 /* Save a set value */
2720 zset *zs = o->ptr;
2721 dictIterator *di = dictGetIterator(zs->dict);
2722 dictEntry *de;
2723
2724 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2725 while((de = dictNext(di)) != NULL) {
2726 robj *eleobj = dictGetEntryKey(de);
2727 double *score = dictGetEntryVal(de);
2728
2729 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2730 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2731 }
2732 dictReleaseIterator(di);
2733 } else {
2734 redisAssert(0 != 0);
2735 }
2736 }
2737 dictReleaseIterator(di);
2738 }
2739 /* EOF opcode */
2740 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2741
2742 /* Make sure data will not remain on the OS's output buffers */
2743 fflush(fp);
2744 fsync(fileno(fp));
2745 fclose(fp);
2746
2747 /* Use RENAME to make sure the DB file is changed atomically only
2748 * if the generate DB file is ok. */
2749 if (rename(tmpfile,filename) == -1) {
2750 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2751 unlink(tmpfile);
2752 return REDIS_ERR;
2753 }
2754 redisLog(REDIS_NOTICE,"DB saved on disk");
2755 server.dirty = 0;
2756 server.lastsave = time(NULL);
2757 return REDIS_OK;
2758
2759 werr:
2760 fclose(fp);
2761 unlink(tmpfile);
2762 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2763 if (di) dictReleaseIterator(di);
2764 return REDIS_ERR;
2765 }
2766
2767 static int rdbSaveBackground(char *filename) {
2768 pid_t childpid;
2769
2770 if (server.bgsavechildpid != -1) return REDIS_ERR;
2771 if ((childpid = fork()) == 0) {
2772 /* Child */
2773 close(server.fd);
2774 if (rdbSave(filename) == REDIS_OK) {
2775 exit(0);
2776 } else {
2777 exit(1);
2778 }
2779 } else {
2780 /* Parent */
2781 if (childpid == -1) {
2782 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2783 strerror(errno));
2784 return REDIS_ERR;
2785 }
2786 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2787 server.bgsavechildpid = childpid;
2788 return REDIS_OK;
2789 }
2790 return REDIS_OK; /* unreached */
2791 }
2792
/* Remove the temporary dump file "temp-<pid>.rdb" associated with the
 * process id 'childpid' (the name rdbSave() uses for its temporary
 * file). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];
    int pid = (int) childpid;

    snprintf(path,256,"temp-%d.rdb", pid);
    unlink(path);
}
2799
/* Read a one byte type/opcode from 'fp'.
 * Returns the byte value (0-255), or -1 when nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;
    size_t got = fread(&type,1,1,fp);

    if (got == 0) return -1;
    return type;
}
2805
/* Read a time saved as a 4 byte signed integer in host byte order
 * from 'fp'. Returns the time, or -1 when it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;
    size_t got = fread(&stamp,4,1,fp);

    if (got == 0) return -1;
    return (time_t) stamp;
}
2811
2812 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2813 * of this file for a description of how this are stored on disk.
2814 *
2815 * isencoded is set to 1 if the readed length is not actually a length but
2816 * an "encoding type", check the above comments for more info */
2817 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2818 unsigned char buf[2];
2819 uint32_t len;
2820
2821 if (isencoded) *isencoded = 0;
2822 if (rdbver == 0) {
2823 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2824 return ntohl(len);
2825 } else {
2826 int type;
2827
2828 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2829 type = (buf[0]&0xC0)>>6;
2830 if (type == REDIS_RDB_6BITLEN) {
2831 /* Read a 6 bit len */
2832 return buf[0]&0x3F;
2833 } else if (type == REDIS_RDB_ENCVAL) {
2834 /* Read a 6 bit len encoding type */
2835 if (isencoded) *isencoded = 1;
2836 return buf[0]&0x3F;
2837 } else if (type == REDIS_RDB_14BITLEN) {
2838 /* Read a 14 bit len */
2839 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2840 return ((buf[0]&0x3F)<<8)|buf[1];
2841 } else {
2842 /* Read a 32 bit len */
2843 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2844 return ntohl(len);
2845 }
2846 }
2847 }
2848
2849 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2850 unsigned char enc[4];
2851 long long val;
2852
2853 if (enctype == REDIS_RDB_ENC_INT8) {
2854 if (fread(enc,1,1,fp) == 0) return NULL;
2855 val = (signed char)enc[0];
2856 } else if (enctype == REDIS_RDB_ENC_INT16) {
2857 uint16_t v;
2858 if (fread(enc,2,1,fp) == 0) return NULL;
2859 v = enc[0]|(enc[1]<<8);
2860 val = (int16_t)v;
2861 } else if (enctype == REDIS_RDB_ENC_INT32) {
2862 uint32_t v;
2863 if (fread(enc,4,1,fp) == 0) return NULL;
2864 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2865 val = (int32_t)v;
2866 } else {
2867 val = 0; /* anti-warning */
2868 redisAssert(0!=0);
2869 }
2870 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2871 }
2872
2873 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2874 unsigned int len, clen;
2875 unsigned char *c = NULL;
2876 sds val = NULL;
2877
2878 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2879 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2880 if ((c = zmalloc(clen)) == NULL) goto err;
2881 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2882 if (fread(c,clen,1,fp) == 0) goto err;
2883 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2884 zfree(c);
2885 return createObject(REDIS_STRING,val);
2886 err:
2887 zfree(c);
2888 sdsfree(val);
2889 return NULL;
2890 }
2891
2892 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2893 int isencoded;
2894 uint32_t len;
2895 sds val;
2896
2897 len = rdbLoadLen(fp,rdbver,&isencoded);
2898 if (isencoded) {
2899 switch(len) {
2900 case REDIS_RDB_ENC_INT8:
2901 case REDIS_RDB_ENC_INT16:
2902 case REDIS_RDB_ENC_INT32:
2903 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2904 case REDIS_RDB_ENC_LZF:
2905 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2906 default:
2907 redisAssert(0!=0);
2908 }
2909 }
2910
2911 if (len == REDIS_RDB_LENERR) return NULL;
2912 val = sdsnewlen(NULL,len);
2913 if (len && fread(val,len,1,fp) == 0) {
2914 sdsfree(val);
2915 return NULL;
2916 }
2917 return tryObjectSharing(createObject(REDIS_STRING,val));
2918 }
2919
2920 /* For information about double serialization check rdbSaveDoubleValue() */
2921 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2922 char buf[128];
2923 unsigned char len;
2924
2925 if (fread(&len,1,1,fp) == 0) return -1;
2926 switch(len) {
2927 case 255: *val = R_NegInf; return 0;
2928 case 254: *val = R_PosInf; return 0;
2929 case 253: *val = R_Nan; return 0;
2930 default:
2931 if (fread(buf,len,1,fp) == 0) return -1;
2932 buf[len] = '\0';
2933 sscanf(buf, "%lg", val);
2934 return 0;
2935 }
2936 }
2937
2938 static int rdbLoad(char *filename) {
2939 FILE *fp;
2940 robj *keyobj = NULL;
2941 uint32_t dbid;
2942 int type, retval, rdbver;
2943 dict *d = server.db[0].dict;
2944 redisDb *db = server.db+0;
2945 char buf[1024];
2946 time_t expiretime = -1, now = time(NULL);
2947
2948 fp = fopen(filename,"r");
2949 if (!fp) return REDIS_ERR;
2950 if (fread(buf,9,1,fp) == 0) goto eoferr;
2951 buf[9] = '\0';
2952 if (memcmp(buf,"REDIS",5) != 0) {
2953 fclose(fp);
2954 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2955 return REDIS_ERR;
2956 }
2957 rdbver = atoi(buf+5);
2958 if (rdbver > 1) {
2959 fclose(fp);
2960 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2961 return REDIS_ERR;
2962 }
2963 while(1) {
2964 robj *o;
2965
2966 /* Read type. */
2967 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2968 if (type == REDIS_EXPIRETIME) {
2969 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2970 /* We read the time so we need to read the object type again */
2971 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2972 }
2973 if (type == REDIS_EOF) break;
2974 /* Handle SELECT DB opcode as a special case */
2975 if (type == REDIS_SELECTDB) {
2976 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2977 goto eoferr;
2978 if (dbid >= (unsigned)server.dbnum) {
2979 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2980 exit(1);
2981 }
2982 db = server.db+dbid;
2983 d = db->dict;
2984 continue;
2985 }
2986 /* Read key */
2987 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2988
2989 if (type == REDIS_STRING) {
2990 /* Read string value */
2991 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2992 tryObjectEncoding(o);
2993 } else if (type == REDIS_LIST || type == REDIS_SET) {
2994 /* Read list/set value */
2995 uint32_t listlen;
2996
2997 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2998 goto eoferr;
2999 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3000 /* Load every single element of the list/set */
3001 while(listlen--) {
3002 robj *ele;
3003
3004 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3005 tryObjectEncoding(ele);
3006 if (type == REDIS_LIST) {
3007 listAddNodeTail((list*)o->ptr,ele);
3008 } else {
3009 dictAdd((dict*)o->ptr,ele,NULL);
3010 }
3011 }
3012 } else if (type == REDIS_ZSET) {
3013 /* Read list/set value */
3014 uint32_t zsetlen;
3015 zset *zs;
3016
3017 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3018 goto eoferr;
3019 o = createZsetObject();
3020 zs = o->ptr;
3021 /* Load every single element of the list/set */
3022 while(zsetlen--) {
3023 robj *ele;
3024 double *score = zmalloc(sizeof(double));
3025
3026 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3027 tryObjectEncoding(ele);
3028 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3029 dictAdd(zs->dict,ele,score);
3030 zslInsert(zs->zsl,*score,ele);
3031 incrRefCount(ele); /* added to skiplist */
3032 }
3033 } else {
3034 redisAssert(0 != 0);
3035 }
3036 /* Add the new object in the hash table */
3037 retval = dictAdd(d,keyobj,o);
3038 if (retval == DICT_ERR) {
3039 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3040 exit(1);
3041 }
3042 /* Set the expire time if needed */
3043 if (expiretime != -1) {
3044 setExpire(db,keyobj,expiretime);
3045 /* Delete this key if already expired */
3046 if (expiretime < now) deleteKey(db,keyobj);
3047 expiretime = -1;
3048 }
3049 keyobj = o = NULL;
3050 }
3051 fclose(fp);
3052 return REDIS_OK;
3053
3054 eoferr: /* unexpected end of file is handled here with a fatal exit */
3055 if (keyobj) decrRefCount(keyobj);
3056 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3057 exit(1);
3058 return REDIS_ERR; /* Just to avoid warning */
3059 }
3060
3061 /*================================== Commands =============================== */
3062
3063 static void authCommand(redisClient *c) {
3064 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3065 c->authenticated = 1;
3066 addReply(c,shared.ok);
3067 } else {
3068 c->authenticated = 0;
3069 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3070 }
3071 }
3072
3073 static void pingCommand(redisClient *c) {
3074 addReply(c,shared.pong);
3075 }
3076
3077 static void echoCommand(redisClient *c) {
3078 addReplyBulkLen(c,c->argv[1]);
3079 addReply(c,c->argv[1]);
3080 addReply(c,shared.crlf);
3081 }
3082
3083 /*=================================== Strings =============================== */
3084
3085 static void setGenericCommand(redisClient *c, int nx) {
3086 int retval;
3087
3088 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3089 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3090 if (retval == DICT_ERR) {
3091 if (!nx) {
3092 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3093 incrRefCount(c->argv[2]);
3094 } else {
3095 addReply(c,shared.czero);
3096 return;
3097 }
3098 } else {
3099 incrRefCount(c->argv[1]);
3100 incrRefCount(c->argv[2]);
3101 }
3102 server.dirty++;
3103 removeExpire(c->db,c->argv[1]);
3104 addReply(c, nx ? shared.cone : shared.ok);
3105 }
3106
3107 static void setCommand(redisClient *c) {
3108 setGenericCommand(c,0);
3109 }
3110
3111 static void setnxCommand(redisClient *c) {
3112 setGenericCommand(c,1);
3113 }
3114
3115 static int getGenericCommand(redisClient *c) {
3116 robj *o = lookupKeyRead(c->db,c->argv[1]);
3117
3118 if (o == NULL) {
3119 addReply(c,shared.nullbulk);
3120 return REDIS_OK;
3121 } else {
3122 if (o->type != REDIS_STRING) {
3123 addReply(c,shared.wrongtypeerr);
3124 return REDIS_ERR;
3125 } else {
3126 addReplyBulkLen(c,o);
3127 addReply(c,o);
3128 addReply(c,shared.crlf);
3129 return REDIS_OK;
3130 }
3131 }
3132 }
3133
3134 static void getCommand(redisClient *c) {
3135 getGenericCommand(c);
3136 }
3137
3138 static void getsetCommand(redisClient *c) {
3139 if (getGenericCommand(c) == REDIS_ERR) return;
3140 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3141 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3142 } else {
3143 incrRefCount(c->argv[1]);
3144 }
3145 incrRefCount(c->argv[2]);
3146 server.dirty++;
3147 removeExpire(c->db,c->argv[1]);
3148 }
3149
3150 static void mgetCommand(redisClient *c) {
3151 int j;
3152
3153 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3154 for (j = 1; j < c->argc; j++) {
3155 robj *o = lookupKeyRead(c->db,c->argv[j]);
3156 if (o == NULL) {
3157 addReply(c,shared.nullbulk);
3158 } else {
3159 if (o->type != REDIS_STRING) {
3160 addReply(c,shared.nullbulk);
3161 } else {
3162 addReplyBulkLen(c,o);
3163 addReply(c,o);
3164 addReply(c,shared.crlf);
3165 }
3166 }
3167 }
3168 }
3169
3170 static void msetGenericCommand(redisClient *c, int nx) {
3171 int j, busykeys = 0;
3172
3173 if ((c->argc % 2) == 0) {
3174 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3175 return;
3176 }
3177 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3178 * set nothing at all if at least one already key exists. */
3179 if (nx) {
3180 for (j = 1; j < c->argc; j += 2) {
3181 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3182 busykeys++;
3183 }
3184 }
3185 }
3186 if (busykeys) {
3187 addReply(c, shared.czero);
3188 return;
3189 }
3190
3191 for (j = 1; j < c->argc; j += 2) {
3192 int retval;
3193
3194 tryObjectEncoding(c->argv[j+1]);
3195 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3196 if (retval == DICT_ERR) {
3197 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3198 incrRefCount(c->argv[j+1]);
3199 } else {
3200 incrRefCount(c->argv[j]);
3201 incrRefCount(c->argv[j+1]);
3202 }
3203 removeExpire(c->db,c->argv[j]);
3204 }
3205 server.dirty += (c->argc-1)/2;
3206 addReply(c, nx ? shared.cone : shared.ok);
3207 }
3208
3209 static void msetCommand(redisClient *c) {
3210 msetGenericCommand(c,0);
3211 }
3212
3213 static void msetnxCommand(redisClient *c) {
3214 msetGenericCommand(c,1);
3215 }
3216
3217 static void incrDecrCommand(redisClient *c, long long incr) {
3218 long long value;
3219 int retval;
3220 robj *o;
3221
3222 o = lookupKeyWrite(c->db,c->argv[1]);
3223 if (o == NULL) {
3224 value = 0;
3225 } else {
3226 if (o->type != REDIS_STRING) {
3227 value = 0;
3228 } else {
3229 char *eptr;
3230
3231 if (o->encoding == REDIS_ENCODING_RAW)
3232 value = strtoll(o->ptr, &eptr, 10);
3233 else if (o->encoding == REDIS_ENCODING_INT)
3234 value = (long)o->ptr;
3235 else
3236 redisAssert(1 != 1);
3237 }
3238 }
3239
3240 value += incr;
3241 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3242 tryObjectEncoding(o);
3243 retval = dictAdd(c->db->dict,c->argv[1],o);
3244 if (retval == DICT_ERR) {
3245 dictReplace(c->db->dict,c->argv[1],o);
3246 removeExpire(c->db,c->argv[1]);
3247 } else {
3248 incrRefCount(c->argv[1]);
3249 }
3250 server.dirty++;
3251 addReply(c,shared.colon);
3252 addReply(c,o);
3253 addReply(c,shared.crlf);
3254 }
3255
3256 static void incrCommand(redisClient *c) {
3257 incrDecrCommand(c,1);
3258 }
3259
3260 static void decrCommand(redisClient *c) {
3261 incrDecrCommand(c,-1);
3262 }
3263
3264 static void incrbyCommand(redisClient *c) {
3265 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3266 incrDecrCommand(c,incr);
3267 }
3268
3269 static void decrbyCommand(redisClient *c) {
3270 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3271 incrDecrCommand(c,-incr);
3272 }
3273
3274 /* ========================= Type agnostic commands ========================= */
3275
3276 static void delCommand(redisClient *c) {
3277 int deleted = 0, j;
3278
3279 for (j = 1; j < c->argc; j++) {
3280 if (deleteKey(c->db,c->argv[j])) {
3281 server.dirty++;
3282 deleted++;
3283 }
3284 }
3285 switch(deleted) {
3286 case 0:
3287 addReply(c,shared.czero);
3288 break;
3289 case 1:
3290 addReply(c,shared.cone);
3291 break;
3292 default:
3293 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3294 break;
3295 }
3296 }
3297
3298 static void existsCommand(redisClient *c) {
3299 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3300 }
3301
3302 static void selectCommand(redisClient *c) {
3303 int id = atoi(c->argv[1]->ptr);
3304
3305 if (selectDb(c,id) == REDIS_ERR) {
3306 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3307 } else {
3308 addReply(c,shared.ok);
3309 }
3310 }
3311
3312 static void randomkeyCommand(redisClient *c) {
3313 dictEntry *de;
3314
3315 while(1) {
3316 de = dictGetRandomKey(c->db->dict);
3317 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3318 }
3319 if (de == NULL) {
3320 addReply(c,shared.plus);
3321 addReply(c,shared.crlf);
3322 } else {
3323 addReply(c,shared.plus);
3324 addReply(c,dictGetEntryKey(de));
3325 addReply(c,shared.crlf);
3326 }
3327 }
3328
3329 static void keysCommand(redisClient *c) {
3330 dictIterator *di;
3331 dictEntry *de;
3332 sds pattern = c->argv[1]->ptr;
3333 int plen = sdslen(pattern);
3334 unsigned long numkeys = 0, keyslen = 0;
3335 robj *lenobj = createObject(REDIS_STRING,NULL);
3336
3337 di = dictGetIterator(c->db->dict);
3338 addReply(c,lenobj);
3339 decrRefCount(lenobj);
3340 while((de = dictNext(di)) != NULL) {
3341 robj *keyobj = dictGetEntryKey(de);
3342
3343 sds key = keyobj->ptr;
3344 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3345 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3346 if (expireIfNeeded(c->db,keyobj) == 0) {
3347 if (numkeys != 0)
3348 addReply(c,shared.space);
3349 addReply(c,keyobj);
3350 numkeys++;
3351 keyslen += sdslen(key);
3352 }
3353 }
3354 }
3355 dictReleaseIterator(di);
3356 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3357 addReply(c,shared.crlf);
3358 }
3359
3360 static void dbsizeCommand(redisClient *c) {
3361 addReplySds(c,
3362 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3363 }
3364
3365 static void lastsaveCommand(redisClient *c) {
3366 addReplySds(c,
3367 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3368 }
3369
3370 static void typeCommand(redisClient *c) {
3371 robj *o;
3372 char *type;
3373
3374 o = lookupKeyRead(c->db,c->argv[1]);
3375 if (o == NULL) {
3376 type = "+none";
3377 } else {
3378 switch(o->type) {
3379 case REDIS_STRING: type = "+string"; break;
3380 case REDIS_LIST: type = "+list"; break;
3381 case REDIS_SET: type = "+set"; break;
3382 case REDIS_ZSET: type = "+zset"; break;
3383 default: type = "unknown"; break;
3384 }
3385 }
3386 addReplySds(c,sdsnew(type));
3387 addReply(c,shared.crlf);
3388 }
3389
3390 static void saveCommand(redisClient *c) {
3391 if (server.bgsavechildpid != -1) {
3392 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3393 return;
3394 }
3395 if (rdbSave(server.dbfilename) == REDIS_OK) {
3396 addReply(c,shared.ok);
3397 } else {
3398 addReply(c,shared.err);
3399 }
3400 }
3401
3402 static void bgsaveCommand(redisClient *c) {
3403 if (server.bgsavechildpid != -1) {
3404 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3405 return;
3406 }
3407 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3408 char *status = "+Background saving started\r\n";
3409 addReplySds(c,sdsnew(status));
3410 } else {
3411 addReply(c,shared.err);
3412 }
3413 }
3414
3415 static void shutdownCommand(redisClient *c) {
3416 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3417 /* Kill the saving child if there is a background saving in progress.
3418 We want to avoid race conditions, for instance our saving child may
3419 overwrite the synchronous saving did by SHUTDOWN. */
3420 if (server.bgsavechildpid != -1) {
3421 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3422 kill(server.bgsavechildpid,SIGKILL);
3423 rdbRemoveTempFile(server.bgsavechildpid);
3424 }
3425 if (server.appendonly) {
3426 /* Append only file: fsync() the AOF and exit */
3427 fsync(server.appendfd);
3428 exit(0);
3429 } else {
3430 /* Snapshotting. Perform a SYNC SAVE and exit */
3431 if (rdbSave(server.dbfilename) == REDIS_OK) {
3432 if (server.daemonize)
3433 unlink(server.pidfile);
3434 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3435 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3436 exit(0);
3437 } else {
3438 /* Ooops.. error saving! The best we can do is to continue operating.
3439 * Note that if there was a background saving process, in the next
3440 * cron() Redis will be notified that the background saving aborted,
3441 * handling special stuff like slaves pending for synchronization... */
3442 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3443 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3444 }
3445 }
3446 }
3447
3448 static void renameGenericCommand(redisClient *c, int nx) {
3449 robj *o;
3450
3451 /* To use the same key as src and dst is probably an error */
3452 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3453 addReply(c,shared.sameobjecterr);
3454 return;
3455 }
3456
3457 o = lookupKeyWrite(c->db,c->argv[1]);
3458 if (o == NULL) {
3459 addReply(c,shared.nokeyerr);
3460 return;
3461 }
3462 incrRefCount(o);
3463 deleteIfVolatile(c->db,c->argv[2]);
3464 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3465 if (nx) {
3466 decrRefCount(o);
3467 addReply(c,shared.czero);
3468 return;
3469 }
3470 dictReplace(c->db->dict,c->argv[2],o);
3471 } else {
3472 incrRefCount(c->argv[2]);
3473 }
3474 deleteKey(c->db,c->argv[1]);
3475 server.dirty++;
3476 addReply(c,nx ? shared.cone : shared.ok);
3477 }
3478
3479 static void renameCommand(redisClient *c) {
3480 renameGenericCommand(c,0);
3481 }
3482
3483 static void renamenxCommand(redisClient *c) {
3484 renameGenericCommand(c,1);
3485 }
3486
3487 static void moveCommand(redisClient *c) {
3488 robj *o;
3489 redisDb *src, *dst;
3490 int srcid;
3491
3492 /* Obtain source and target DB pointers */
3493 src = c->db;
3494 srcid = c->db->id;
3495 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3496 addReply(c,shared.outofrangeerr);
3497 return;
3498 }
3499 dst = c->db;
3500 selectDb(c,srcid); /* Back to the source DB */
3501
3502 /* If the user is moving using as target the same
3503 * DB as the source DB it is probably an error. */
3504 if (src == dst) {
3505 addReply(c,shared.sameobjecterr);
3506 return;
3507 }
3508
3509 /* Check if the element exists and get a reference */
3510 o = lookupKeyWrite(c->db,c->argv[1]);
3511 if (!o) {
3512 addReply(c,shared.czero);
3513 return;
3514 }
3515
3516 /* Try to add the element to the target DB */
3517 deleteIfVolatile(dst,c->argv[1]);
3518 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3519 addReply(c,shared.czero);
3520 return;
3521 }
3522 incrRefCount(c->argv[1]);
3523 incrRefCount(o);
3524
3525 /* OK! key moved, free the entry in the source DB */
3526 deleteKey(src,c->argv[1]);
3527 server.dirty++;
3528 addReply(c,shared.cone);
3529 }
3530
3531 /* =================================== Lists ================================ */
3532 static void pushGenericCommand(redisClient *c, int where) {
3533 robj *lobj;
3534 list *list;
3535
3536 lobj = lookupKeyWrite(c->db,c->argv[1]);
3537 if (lobj == NULL) {
3538 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3539 addReply(c,shared.ok);
3540 return;
3541 }
3542 lobj = createListObject();
3543 list = lobj->ptr;
3544 if (where == REDIS_HEAD) {
3545 listAddNodeHead(list,c->argv[2]);
3546 } else {
3547 listAddNodeTail(list,c->argv[2]);
3548 }
3549 dictAdd(c->db->dict,c->argv[1],lobj);
3550 incrRefCount(c->argv[1]);
3551 incrRefCount(c->argv[2]);
3552 } else {
3553 if (lobj->type != REDIS_LIST) {
3554 addReply(c,shared.wrongtypeerr);
3555 return;
3556 }
3557 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3558 addReply(c,shared.ok);
3559 return;
3560 }
3561 list = lobj->ptr;
3562 if (where == REDIS_HEAD) {
3563 listAddNodeHead(list,c->argv[2]);
3564 } else {
3565 listAddNodeTail(list,c->argv[2]);
3566 }
3567 incrRefCount(c->argv[2]);
3568 }
3569 server.dirty++;
3570 addReply(c,shared.ok);
3571 }
3572
3573 static void lpushCommand(redisClient *c) {
3574 pushGenericCommand(c,REDIS_HEAD);
3575 }
3576
3577 static void rpushCommand(redisClient *c) {
3578 pushGenericCommand(c,REDIS_TAIL);
3579 }
3580
3581 static void llenCommand(redisClient *c) {
3582 robj *o;
3583 list *l;
3584
3585 o = lookupKeyRead(c->db,c->argv[1]);
3586 if (o == NULL) {
3587 addReply(c,shared.czero);
3588 return;
3589 } else {
3590 if (o->type != REDIS_LIST) {
3591 addReply(c,shared.wrongtypeerr);
3592 } else {
3593 l = o->ptr;
3594 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3595 }
3596 }
3597 }
3598
3599 static void lindexCommand(redisClient *c) {
3600 robj *o;
3601 int index = atoi(c->argv[2]->ptr);
3602
3603 o = lookupKeyRead(c->db,c->argv[1]);
3604 if (o == NULL) {
3605 addReply(c,shared.nullbulk);
3606 } else {
3607 if (o->type != REDIS_LIST) {
3608 addReply(c,shared.wrongtypeerr);
3609 } else {
3610 list *list = o->ptr;
3611 listNode *ln;
3612
3613 ln = listIndex(list, index);
3614 if (ln == NULL) {
3615 addReply(c,shared.nullbulk);
3616 } else {
3617 robj *ele = listNodeValue(ln);
3618 addReplyBulkLen(c,ele);
3619 addReply(c,ele);
3620 addReply(c,shared.crlf);
3621 }
3622 }
3623 }
3624 }
3625
3626 static void lsetCommand(redisClient *c) {
3627 robj *o;
3628 int index = atoi(c->argv[2]->ptr);
3629
3630 o = lookupKeyWrite(c->db,c->argv[1]);
3631 if (o == NULL) {
3632 addReply(c,shared.nokeyerr);
3633 } else {
3634 if (o->type != REDIS_LIST) {
3635 addReply(c,shared.wrongtypeerr);
3636 } else {
3637 list *list = o->ptr;
3638 listNode *ln;
3639
3640 ln = listIndex(list, index);
3641 if (ln == NULL) {
3642 addReply(c,shared.outofrangeerr);
3643 } else {
3644 robj *ele = listNodeValue(ln);
3645
3646 decrRefCount(ele);
3647 listNodeValue(ln) = c->argv[3];
3648 incrRefCount(c->argv[3]);
3649 addReply(c,shared.ok);
3650 server.dirty++;
3651 }
3652 }
3653 }
3654 }
3655
3656 static void popGenericCommand(redisClient *c, int where) {
3657 robj *o;
3658
3659 o = lookupKeyWrite(c->db,c->argv[1]);
3660 if (o == NULL) {
3661 addReply(c,shared.nullbulk);
3662 } else {
3663 if (o->type != REDIS_LIST) {
3664 addReply(c,shared.wrongtypeerr);
3665 } else {
3666 list *list = o->ptr;
3667 listNode *ln;
3668
3669 if (where == REDIS_HEAD)
3670 ln = listFirst(list);
3671 else
3672 ln = listLast(list);
3673
3674 if (ln == NULL) {
3675 addReply(c,shared.nullbulk);
3676 } else {
3677 robj *ele = listNodeValue(ln);
3678 addReplyBulkLen(c,ele);
3679 addReply(c,ele);
3680 addReply(c,shared.crlf);
3681 listDelNode(list,ln);
3682 server.dirty++;
3683 }
3684 }
3685 }
3686 }
3687
3688 static void lpopCommand(redisClient *c) {
3689 popGenericCommand(c,REDIS_HEAD);
3690 }
3691
3692 static void rpopCommand(redisClient *c) {
3693 popGenericCommand(c,REDIS_TAIL);
3694 }
3695
3696 static void lrangeCommand(redisClient *c) {
3697 robj *o;
3698 int start = atoi(c->argv[2]->ptr);
3699 int end = atoi(c->argv[3]->ptr);
3700
3701 o = lookupKeyRead(c->db,c->argv[1]);
3702 if (o == NULL) {
3703 addReply(c,shared.nullmultibulk);
3704 } else {
3705 if (o->type != REDIS_LIST) {
3706 addReply(c,shared.wrongtypeerr);
3707 } else {
3708 list *list = o->ptr;
3709 listNode *ln;
3710 int llen = listLength(list);
3711 int rangelen, j;
3712 robj *ele;
3713
3714 /* convert negative indexes */
3715 if (start < 0) start = llen+start;
3716 if (end < 0) end = llen+end;
3717 if (start < 0) start = 0;
3718 if (end < 0) end = 0;
3719
3720 /* indexes sanity checks */
3721 if (start > end || start >= llen) {
3722 /* Out of range start or start > end result in empty list */
3723 addReply(c,shared.emptymultibulk);
3724 return;
3725 }
3726 if (end >= llen) end = llen-1;
3727 rangelen = (end-start)+1;
3728
3729 /* Return the result in form of a multi-bulk reply */
3730 ln = listIndex(list, start);
3731 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3732 for (j = 0; j < rangelen; j++) {
3733 ele = listNodeValue(ln);
3734 addReplyBulkLen(c,ele);
3735 addReply(c,ele);
3736 addReply(c,shared.crlf);
3737 ln = ln->next;
3738 }
3739 }
3740 }
3741 }
3742
3743 static void ltrimCommand(redisClient *c) {
3744 robj *o;
3745 int start = atoi(c->argv[2]->ptr);
3746 int end = atoi(c->argv[3]->ptr);
3747
3748 o = lookupKeyWrite(c->db,c->argv[1]);
3749 if (o == NULL) {
3750 addReply(c,shared.ok);
3751 } else {
3752 if (o->type != REDIS_LIST) {
3753 addReply(c,shared.wrongtypeerr);
3754 } else {
3755 list *list = o->ptr;
3756 listNode *ln;
3757 int llen = listLength(list);
3758 int j, ltrim, rtrim;
3759
3760 /* convert negative indexes */
3761 if (start < 0) start = llen+start;
3762 if (end < 0) end = llen+end;
3763 if (start < 0) start = 0;
3764 if (end < 0) end = 0;
3765
3766 /* indexes sanity checks */
3767 if (start > end || start >= llen) {
3768 /* Out of range start or start > end result in empty list */
3769 ltrim = llen;
3770 rtrim = 0;
3771 } else {
3772 if (end >= llen) end = llen-1;
3773 ltrim = start;
3774 rtrim = llen-end-1;
3775 }
3776
3777 /* Remove list elements to perform the trim */
3778 for (j = 0; j < ltrim; j++) {
3779 ln = listFirst(list);
3780 listDelNode(list,ln);
3781 }
3782 for (j = 0; j < rtrim; j++) {
3783 ln = listLast(list);
3784 listDelNode(list,ln);
3785 }
3786 server.dirty++;
3787 addReply(c,shared.ok);
3788 }
3789 }
3790 }
3791
3792 static void lremCommand(redisClient *c) {
3793 robj *o;
3794
3795 o = lookupKeyWrite(c->db,c->argv[1]);
3796 if (o == NULL) {
3797 addReply(c,shared.czero);
3798 } else {
3799 if (o->type != REDIS_LIST) {
3800 addReply(c,shared.wrongtypeerr);
3801 } else {
3802 list *list = o->ptr;
3803 listNode *ln, *next;
3804 int toremove = atoi(c->argv[2]->ptr);
3805 int removed = 0;
3806 int fromtail = 0;
3807
3808 if (toremove < 0) {
3809 toremove = -toremove;
3810 fromtail = 1;
3811 }
3812 ln = fromtail ? list->tail : list->head;
3813 while (ln) {
3814 robj *ele = listNodeValue(ln);
3815
3816 next = fromtail ? ln->prev : ln->next;
3817 if (compareStringObjects(ele,c->argv[3]) == 0) {
3818 listDelNode(list,ln);
3819 server.dirty++;
3820 removed++;
3821 if (toremove && removed == toremove) break;
3822 }
3823 ln = next;
3824 }
3825 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3826 }
3827 }
3828 }
3829
3830 /* This is the semantic of this command:
3831 * RPOPLPUSH srclist dstlist:
3832 * IF LLEN(srclist) > 0
3833 * element = RPOP srclist
3834 * LPUSH dstlist element
3835 * RETURN element
3836 * ELSE
3837 * RETURN nil
3838 * END
3839 * END
3840 *
3841 * The idea is to be able to get an element from a list in a reliable way
3842 * since the element is not just returned but pushed against another list
3843 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3844 */
3845 static void rpoplpushcommand(redisClient *c) {
3846 robj *sobj;
3847
3848 sobj = lookupKeyWrite(c->db,c->argv[1]);
3849 if (sobj == NULL) {
3850 addReply(c,shared.nullbulk);
3851 } else {
3852 if (sobj->type != REDIS_LIST) {
3853 addReply(c,shared.wrongtypeerr);
3854 } else {
3855 list *srclist = sobj->ptr;
3856 listNode *ln = listLast(srclist);
3857
3858 if (ln == NULL) {
3859 addReply(c,shared.nullbulk);
3860 } else {
3861 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3862 robj *ele = listNodeValue(ln);
3863 list *dstlist;
3864
3865 if (dobj && dobj->type != REDIS_LIST) {
3866 addReply(c,shared.wrongtypeerr);
3867 return;
3868 }
3869
3870 /* Add the element to the target list (unless it's directly
3871 * passed to some BLPOP-ing client */
3872 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3873 if (dobj == NULL) {
3874 /* Create the list if the key does not exist */
3875 dobj = createListObject();
3876 dictAdd(c->db->dict,c->argv[2],dobj);
3877 incrRefCount(c->argv[2]);
3878 }
3879 dstlist = dobj->ptr;
3880 listAddNodeHead(dstlist,ele);
3881 incrRefCount(ele);
3882 }
3883
3884 /* Send the element to the client as reply as well */
3885 addReplyBulkLen(c,ele);
3886 addReply(c,ele);
3887 addReply(c,shared.crlf);
3888
3889 /* Finally remove the element from the source list */
3890 listDelNode(srclist,ln);
3891 server.dirty++;
3892 }
3893 }
3894 }
3895 }
3896
3897
3898 /* ==================================== Sets ================================ */
3899
3900 static void saddCommand(redisClient *c) {
3901 robj *set;
3902
3903 set = lookupKeyWrite(c->db,c->argv[1]);
3904 if (set == NULL) {
3905 set = createSetObject();
3906 dictAdd(c->db->dict,c->argv[1],set);
3907 incrRefCount(c->argv[1]);
3908 } else {
3909 if (set->type != REDIS_SET) {
3910 addReply(c,shared.wrongtypeerr);
3911 return;
3912 }
3913 }
3914 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3915 incrRefCount(c->argv[2]);
3916 server.dirty++;
3917 addReply(c,shared.cone);
3918 } else {
3919 addReply(c,shared.czero);
3920 }
3921 }
3922
3923 static void sremCommand(redisClient *c) {
3924 robj *set;
3925
3926 set = lookupKeyWrite(c->db,c->argv[1]);
3927 if (set == NULL) {
3928 addReply(c,shared.czero);
3929 } else {
3930 if (set->type != REDIS_SET) {
3931 addReply(c,shared.wrongtypeerr);
3932 return;
3933 }
3934 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3935 server.dirty++;
3936 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3937 addReply(c,shared.cone);
3938 } else {
3939 addReply(c,shared.czero);
3940 }
3941 }
3942 }
3943
3944 static void smoveCommand(redisClient *c) {
3945 robj *srcset, *dstset;
3946
3947 srcset = lookupKeyWrite(c->db,c->argv[1]);
3948 dstset = lookupKeyWrite(c->db,c->argv[2]);
3949
3950 /* If the source key does not exist return 0, if it's of the wrong type
3951 * raise an error */
3952 if (srcset == NULL || srcset->type != REDIS_SET) {
3953 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3954 return;
3955 }
3956 /* Error if the destination key is not a set as well */
3957 if (dstset && dstset->type != REDIS_SET) {
3958 addReply(c,shared.wrongtypeerr);
3959 return;
3960 }
3961 /* Remove the element from the source set */
3962 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3963 /* Key not found in the src set! return zero */
3964 addReply(c,shared.czero);
3965 return;
3966 }
3967 server.dirty++;
3968 /* Add the element to the destination set */
3969 if (!dstset) {
3970 dstset = createSetObject();
3971 dictAdd(c->db->dict,c->argv[2],dstset);
3972 incrRefCount(c->argv[2]);
3973 }
3974 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3975 incrRefCount(c->argv[3]);
3976 addReply(c,shared.cone);
3977 }
3978
3979 static void sismemberCommand(redisClient *c) {
3980 robj *set;
3981
3982 set = lookupKeyRead(c->db,c->argv[1]);
3983 if (set == NULL) {
3984 addReply(c,shared.czero);
3985 } else {
3986 if (set->type != REDIS_SET) {
3987 addReply(c,shared.wrongtypeerr);
3988 return;
3989 }
3990 if (dictFind(set->ptr,c->argv[2]))
3991 addReply(c,shared.cone);
3992 else
3993 addReply(c,shared.czero);
3994 }
3995 }
3996
3997 static void scardCommand(redisClient *c) {
3998 robj *o;
3999 dict *s;
4000
4001 o = lookupKeyRead(c->db,c->argv[1]);
4002 if (o == NULL) {
4003 addReply(c,shared.czero);
4004 return;
4005 } else {
4006 if (o->type != REDIS_SET) {
4007 addReply(c,shared.wrongtypeerr);
4008 } else {
4009 s = o->ptr;
4010 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4011 dictSize(s)));
4012 }
4013 }
4014 }
4015
4016 static void spopCommand(redisClient *c) {
4017 robj *set;
4018 dictEntry *de;
4019
4020 set = lookupKeyWrite(c->db,c->argv[1]);
4021 if (set == NULL) {
4022 addReply(c,shared.nullbulk);
4023 } else {
4024 if (set->type != REDIS_SET) {
4025 addReply(c,shared.wrongtypeerr);
4026 return;
4027 }
4028 de = dictGetRandomKey(set->ptr);
4029 if (de == NULL) {
4030 addReply(c,shared.nullbulk);
4031 } else {
4032 robj *ele = dictGetEntryKey(de);
4033
4034 addReplyBulkLen(c,ele);
4035 addReply(c,ele);
4036 addReply(c,shared.crlf);
4037 dictDelete(set->ptr,ele);
4038 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4039 server.dirty++;
4040 }
4041 }
4042 }
4043
4044 static void srandmemberCommand(redisClient *c) {
4045 robj *set;
4046 dictEntry *de;
4047
4048 set = lookupKeyRead(c->db,c->argv[1]);
4049 if (set == NULL) {
4050 addReply(c,shared.nullbulk);
4051 } else {
4052 if (set->type != REDIS_SET) {
4053 addReply(c,shared.wrongtypeerr);
4054 return;
4055 }
4056 de = dictGetRandomKey(set->ptr);
4057 if (de == NULL) {
4058 addReply(c,shared.nullbulk);
4059 } else {
4060 robj *ele = dictGetEntryKey(de);
4061
4062 addReplyBulkLen(c,ele);
4063 addReply(c,ele);
4064 addReply(c,shared.crlf);
4065 }
4066 }
4067 }
4068
4069 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4070 dict **d1 = (void*) s1, **d2 = (void*) s2;
4071
4072 return dictSize(*d1)-dictSize(*d2);
4073 }
4074
4075 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4076 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4077 dictIterator *di;
4078 dictEntry *de;
4079 robj *lenobj = NULL, *dstset = NULL;
4080 unsigned long j, cardinality = 0;
4081
4082 for (j = 0; j < setsnum; j++) {
4083 robj *setobj;
4084
4085 setobj = dstkey ?
4086 lookupKeyWrite(c->db,setskeys[j]) :
4087 lookupKeyRead(c->db,setskeys[j]);
4088 if (!setobj) {
4089 zfree(dv);
4090 if (dstkey) {
4091 if (deleteKey(c->db,dstkey))
4092 server.dirty++;
4093 addReply(c,shared.czero);
4094 } else {
4095 addReply(c,shared.nullmultibulk);
4096 }
4097 return;
4098 }
4099 if (setobj->type != REDIS_SET) {
4100 zfree(dv);
4101 addReply(c,shared.wrongtypeerr);
4102 return;
4103 }
4104 dv[j] = setobj->ptr;
4105 }
4106 /* Sort sets from the smallest to largest, this will improve our
4107 * algorithm's performace */
4108 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4109
4110 /* The first thing we should output is the total number of elements...
4111 * since this is a multi-bulk write, but at this stage we don't know
4112 * the intersection set size, so we use a trick, append an empty object
4113 * to the output list and save the pointer to later modify it with the
4114 * right length */
4115 if (!dstkey) {
4116 lenobj = createObject(REDIS_STRING,NULL);
4117 addReply(c,lenobj);
4118 decrRefCount(lenobj);
4119 } else {
4120 /* If we have a target key where to store the resulting set
4121 * create this key with an empty set inside */
4122 dstset = createSetObject();
4123 }
4124
4125 /* Iterate all the elements of the first (smallest) set, and test
4126 * the element against all the other sets, if at least one set does
4127 * not include the element it is discarded */
4128 di = dictGetIterator(dv[0]);
4129
4130 while((de = dictNext(di)) != NULL) {
4131 robj *ele;
4132
4133 for (j = 1; j < setsnum; j++)
4134 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4135 if (j != setsnum)
4136 continue; /* at least one set does not contain the member */
4137 ele = dictGetEntryKey(de);
4138 if (!dstkey) {
4139 addReplyBulkLen(c,ele);
4140 addReply(c,ele);
4141 addReply(c,shared.crlf);
4142 cardinality++;
4143 } else {
4144 dictAdd(dstset->ptr,ele,NULL);
4145 incrRefCount(ele);
4146 }
4147 }
4148 dictReleaseIterator(di);
4149
4150 if (dstkey) {
4151 /* Store the resulting set into the target */
4152 deleteKey(c->db,dstkey);
4153 dictAdd(c->db->dict,dstkey,dstset);
4154 incrRefCount(dstkey);
4155 }
4156
4157 if (!dstkey) {
4158 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4159 } else {
4160 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4161 dictSize((dict*)dstset->ptr)));
4162 server.dirty++;
4163 }
4164 zfree(dv);
4165 }
4166
4167 static void sinterCommand(redisClient *c) {
4168 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4169 }
4170
4171 static void sinterstoreCommand(redisClient *c) {
4172 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4173 }
4174
/* Values accepted by the 'op' argument of sunionDiffGenericCommand() */
#define REDIS_OP_UNION 0 /* collect the members of all the sets */
#define REDIS_OP_DIFF 1 /* members of the first set not found in the others */
4177
4178 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4179 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4180 dictIterator *di;
4181 dictEntry *de;
4182 robj *dstset = NULL;
4183 int j, cardinality = 0;
4184
4185 for (j = 0; j < setsnum; j++) {
4186 robj *setobj;
4187
4188 setobj = dstkey ?
4189 lookupKeyWrite(c->db,setskeys[j]) :
4190 lookupKeyRead(c->db,setskeys[j]);
4191 if (!setobj) {
4192 dv[j] = NULL;
4193 continue;
4194 }
4195 if (setobj->type != REDIS_SET) {
4196 zfree(dv);
4197 addReply(c,shared.wrongtypeerr);
4198 return;
4199 }
4200 dv[j] = setobj->ptr;
4201 }
4202
4203 /* We need a temp set object to store our union. If the dstkey
4204 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4205 * this set object will be the resulting object to set into the target key*/
4206 dstset = createSetObject();
4207
4208 /* Iterate all the elements of all the sets, add every element a single
4209 * time to the result set */
4210 for (j = 0; j < setsnum; j++) {
4211 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4212 if (!dv[j]) continue; /* non existing keys are like empty sets */
4213
4214 di = dictGetIterator(dv[j]);
4215
4216 while((de = dictNext(di)) != NULL) {
4217 robj *ele;
4218
4219 /* dictAdd will not add the same element multiple times */
4220 ele = dictGetEntryKey(de);
4221 if (op == REDIS_OP_UNION || j == 0) {
4222 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4223 incrRefCount(ele);
4224 cardinality++;
4225 }
4226 } else if (op == REDIS_OP_DIFF) {
4227 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4228 cardinality--;
4229 }
4230 }
4231 }
4232 dictReleaseIterator(di);
4233
4234 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4235 }
4236
4237 /* Output the content of the resulting set, if not in STORE mode */
4238 if (!dstkey) {
4239 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4240 di = dictGetIterator(dstset->ptr);
4241 while((de = dictNext(di)) != NULL) {
4242 robj *ele;
4243
4244 ele = dictGetEntryKey(de);
4245 addReplyBulkLen(c,ele);
4246 addReply(c,ele);
4247 addReply(c,shared.crlf);
4248 }
4249 dictReleaseIterator(di);
4250 } else {
4251 /* If we have a target key where to store the resulting set
4252 * create this key with the result set inside */
4253 deleteKey(c->db,dstkey);
4254 dictAdd(c->db->dict,dstkey,dstset);
4255 incrRefCount(dstkey);
4256 }
4257
4258 /* Cleanup */
4259 if (!dstkey) {
4260 decrRefCount(dstset);
4261 } else {
4262 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4263 dictSize((dict*)dstset->ptr)));
4264 server.dirty++;
4265 }
4266 zfree(dv);
4267 }
4268
4269 static void sunionCommand(redisClient *c) {
4270 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4271 }
4272
4273 static void sunionstoreCommand(redisClient *c) {
4274 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4275 }
4276
4277 static void sdiffCommand(redisClient *c) {
4278 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4279 }
4280
4281 static void sdiffstoreCommand(redisClient *c) {
4282 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4283 }
4284
4285 /* ==================================== ZSets =============================== */
4286
4287 /* ZSETs are ordered sets using two data structures to hold the same elements
4288 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4289 * data structure.
4290 *
4291 * The elements are added to a hash table mapping Redis objects to scores.
4292 * At the same time the elements are added to a skip list mapping scores
4293 * to Redis objects (so objects are sorted by scores in this "view"). */
4294
4295 /* This skiplist implementation is almost a C translation of the original
4296 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4297 * Alternative to Balanced Trees", modified in three ways:
4298 * a) this implementation allows for repeated values.
4299 * b) the comparison is not just by key (our 'score') but by satellite data.
4300 * c) there is a back pointer, so it's a doubly linked list with the back
4301 * pointers being only at "level 1". This allows to traverse the list
4302 * from tail to head, useful for ZREVRANGE. */
4303
4304 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4305 zskiplistNode *zn = zmalloc(sizeof(*zn));
4306
4307 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4308 zn->score = score;
4309 zn->obj = obj;
4310 return zn;
4311 }
4312
4313 static zskiplist *zslCreate(void) {
4314 int j;
4315 zskiplist *zsl;
4316
4317 zsl = zmalloc(sizeof(*zsl));
4318 zsl->level = 1;
4319 zsl->length = 0;
4320 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4321 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4322 zsl->header->forward[j] = NULL;
4323 zsl->header->backward = NULL;
4324 zsl->tail = NULL;
4325 return zsl;
4326 }
4327
4328 static void zslFreeNode(zskiplistNode *node) {
4329 decrRefCount(node->obj);
4330 zfree(node->forward);
4331 zfree(node);
4332 }
4333
4334 static void zslFree(zskiplist *zsl) {
4335 zskiplistNode *node = zsl->header->forward[0], *next;
4336
4337 zfree(zsl->header->forward);
4338 zfree(zsl->header);
4339 while(node) {
4340 next = node->forward[0];
4341 zslFreeNode(node);
4342 node = next;
4343 }
4344 zfree(zsl);
4345 }
4346
4347 static int zslRandomLevel(void) {
4348 int level = 1;
4349 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4350 level += 1;
4351 return level;
4352 }
4353
4354 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4355 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4356 int i, level;
4357
4358 x = zsl->header;
4359 for (i = zsl->level-1; i >= 0; i--) {
4360 while (x->forward[i] &&
4361 (x->forward[i]->score < score ||
4362 (x->forward[i]->score == score &&
4363 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4364 x = x->forward[i];
4365 update[i] = x;
4366 }
4367 /* we assume the key is not already inside, since we allow duplicated
4368 * scores, and the re-insertion of score and redis object should never
4369 * happpen since the caller of zslInsert() should test in the hash table
4370 * if the element is already inside or not. */
4371 level = zslRandomLevel();
4372 if (level > zsl->level) {
4373 for (i = zsl->level; i < level; i++)
4374 update[i] = zsl->header;
4375 zsl->level = level;
4376 }
4377 x = zslCreateNode(level,score,obj);
4378 for (i = 0; i < level; i++) {
4379 x->forward[i] = update[i]->forward[i];
4380 update[i]->forward[i] = x;
4381 }
4382 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4383 if (x->forward[0])
4384 x->forward[0]->backward = x;
4385 else
4386 zsl->tail = x;
4387 zsl->length++;
4388 }
4389
4390 /* Delete an element with matching score/object from the skiplist. */
4391 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4392 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4393 int i;
4394
4395 x = zsl->header;
4396 for (i = zsl->level-1; i >= 0; i--) {
4397 while (x->forward[i] &&
4398 (x->forward[i]->score < score ||
4399 (x->forward[i]->score == score &&
4400 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4401 x = x->forward[i];
4402 update[i] = x;
4403 }
4404 /* We may have multiple elements with the same score, what we need
4405 * is to find the element with both the right score and object. */
4406 x = x->forward[0];
4407 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4408 for (i = 0; i < zsl->level; i++) {
4409 if (update[i]->forward[i] != x) break;
4410 update[i]->forward[i] = x->forward[i];
4411 }
4412 if (x->forward[0]) {
4413 x->forward[0]->backward = (x->backward == zsl->header) ?
4414 NULL : x->backward;
4415 } else {
4416 zsl->tail = x->backward;
4417 }
4418 zslFreeNode(x);
4419 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4420 zsl->level--;
4421 zsl->length--;
4422 return 1;
4423 } else {
4424 return 0; /* not found */
4425 }
4426 return 0; /* not found */
4427 }
4428
4429 /* Delete all the elements with score between min and max from the skiplist.
4430 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4431 * Note that this function takes the reference to the hash table view of the
4432 * sorted set, in order to remove the elements from the hash table too. */
4433 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4434 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4435 unsigned long removed = 0;
4436 int i;
4437
4438 x = zsl->header;
4439 for (i = zsl->level-1; i >= 0; i--) {
4440 while (x->forward[i] && x->forward[i]->score < min)
4441 x = x->forward[i];
4442 update[i] = x;
4443 }
4444 /* We may have multiple elements with the same score, what we need
4445 * is to find the element with both the right score and object. */
4446 x = x->forward[0];
4447 while (x && x->score <= max) {
4448 zskiplistNode *next;
4449
4450 for (i = 0; i < zsl->level; i++) {
4451 if (update[i]->forward[i] != x) break;
4452 update[i]->forward[i] = x->forward[i];
4453 }
4454 if (x->forward[0]) {
4455 x->forward[0]->backward = (x->backward == zsl->header) ?
4456 NULL : x->backward;
4457 } else {
4458 zsl->tail = x->backward;
4459 }
4460 next = x->forward[0];
4461 dictDelete(dict,x->obj);
4462 zslFreeNode(x);
4463 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4464 zsl->level--;
4465 zsl->length--;
4466 removed++;
4467 x = next;
4468 }
4469 return removed; /* not found */
4470 }
4471
4472 /* Find the first node having a score equal or greater than the specified one.
4473 * Returns NULL if there is no match. */
4474 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4475 zskiplistNode *x;
4476 int i;
4477
4478 x = zsl->header;
4479 for (i = zsl->level-1; i >= 0; i--) {
4480 while (x->forward[i] && x->forward[i]->score < score)
4481 x = x->forward[i];
4482 }
4483 /* We may have multiple elements with the same score, what we need
4484 * is to find the element with both the right score and object. */
4485 return x->forward[0];
4486 }
4487
4488 /* The actual Z-commands implementations */
4489
4490 /* This generic command implements both ZADD and ZINCRBY.
4491 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4492 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4493 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4494 robj *zsetobj;
4495 zset *zs;
4496 double *score;
4497
4498 zsetobj = lookupKeyWrite(c->db,key);
4499 if (zsetobj == NULL) {
4500 zsetobj = createZsetObject();
4501 dictAdd(c->db->dict,key,zsetobj);
4502 incrRefCount(key);
4503 } else {
4504 if (zsetobj->type != REDIS_ZSET) {
4505 addReply(c,shared.wrongtypeerr);
4506 return;
4507 }
4508 }
4509 zs = zsetobj->ptr;
4510
4511 /* Ok now since we implement both ZADD and ZINCRBY here the code
4512 * needs to handle the two different conditions. It's all about setting
4513 * '*score', that is, the new score to set, to the right value. */
4514 score = zmalloc(sizeof(double));
4515 if (doincrement) {
4516 dictEntry *de;
4517
4518 /* Read the old score. If the element was not present starts from 0 */
4519 de = dictFind(zs->dict,ele);
4520 if (de) {
4521 double *oldscore = dictGetEntryVal(de);
4522 *score = *oldscore + scoreval;
4523 } else {
4524 *score = scoreval;
4525 }
4526 } else {
4527 *score = scoreval;
4528 }
4529
4530 /* What follows is a simple remove and re-insert operation that is common
4531 * to both ZADD and ZINCRBY... */
4532 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4533 /* case 1: New element */
4534 incrRefCount(ele); /* added to hash */
4535 zslInsert(zs->zsl,*score,ele);
4536 incrRefCount(ele); /* added to skiplist */
4537 server.dirty++;
4538 if (doincrement)
4539 addReplyDouble(c,*score);
4540 else
4541 addReply(c,shared.cone);
4542 } else {
4543 dictEntry *de;
4544 double *oldscore;
4545
4546 /* case 2: Score update operation */
4547 de = dictFind(zs->dict,ele);
4548 redisAssert(de != NULL);
4549 oldscore = dictGetEntryVal(de);
4550 if (*score != *oldscore) {
4551 int deleted;
4552
4553 /* Remove and insert the element in the skip list with new score */
4554 deleted = zslDelete(zs->zsl,*oldscore,ele);
4555 redisAssert(deleted != 0);
4556 zslInsert(zs->zsl,*score,ele);
4557 incrRefCount(ele);
4558 /* Update the score in the hash table */
4559 dictReplace(zs->dict,ele,score);
4560 server.dirty++;
4561 } else {
4562 zfree(score);
4563 }
4564 if (doincrement)
4565 addReplyDouble(c,*score);
4566 else
4567 addReply(c,shared.czero);
4568 }
4569 }
4570
4571 static void zaddCommand(redisClient *c) {
4572 double scoreval;
4573
4574 scoreval = strtod(c->argv[2]->ptr,NULL);
4575 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4576 }
4577
4578 static void zincrbyCommand(redisClient *c) {
4579 double scoreval;
4580
4581 scoreval = strtod(c->argv[2]->ptr,NULL);
4582 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4583 }
4584
4585 static void zremCommand(redisClient *c) {
4586 robj *zsetobj;
4587 zset *zs;
4588
4589 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4590 if (zsetobj == NULL) {
4591 addReply(c,shared.czero);
4592 } else {
4593 dictEntry *de;
4594 double *oldscore;
4595 int deleted;
4596
4597 if (zsetobj->type != REDIS_ZSET) {
4598 addReply(c,shared.wrongtypeerr);
4599 return;
4600 }
4601 zs = zsetobj->ptr;
4602 de = dictFind(zs->dict,c->argv[2]);
4603 if (de == NULL) {
4604 addReply(c,shared.czero);
4605 return;
4606 }
4607 /* Delete from the skiplist */
4608 oldscore = dictGetEntryVal(de);
4609 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4610 redisAssert(deleted != 0);
4611
4612 /* Delete from the hash table */
4613 dictDelete(zs->dict,c->argv[2]);
4614 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4615 server.dirty++;
4616 addReply(c,shared.cone);
4617 }
4618 }
4619
4620 static void zremrangebyscoreCommand(redisClient *c) {
4621 double min = strtod(c->argv[2]->ptr,NULL);
4622 double max = strtod(c->argv[3]->ptr,NULL);
4623 robj *zsetobj;
4624 zset *zs;
4625
4626 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4627 if (zsetobj == NULL) {
4628 addReply(c,shared.czero);
4629 } else {
4630 long deleted;
4631
4632 if (zsetobj->type != REDIS_ZSET) {
4633 addReply(c,shared.wrongtypeerr);
4634 return;
4635 }
4636 zs = zsetobj->ptr;
4637 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4638 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4639 server.dirty += deleted;
4640 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4641 }
4642 }
4643
4644 static void zrangeGenericCommand(redisClient *c, int reverse) {
4645 robj *o;
4646 int start = atoi(c->argv[2]->ptr);
4647 int end = atoi(c->argv[3]->ptr);
4648 int withscores = 0;
4649
4650 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4651 withscores = 1;
4652 } else if (c->argc >= 5) {
4653 addReply(c,shared.syntaxerr);
4654 return;
4655 }
4656
4657 o = lookupKeyRead(c->db,c->argv[1]);
4658 if (o == NULL) {
4659 addReply(c,shared.nullmultibulk);
4660 } else {
4661 if (o->type != REDIS_ZSET) {
4662 addReply(c,shared.wrongtypeerr);
4663 } else {
4664 zset *zsetobj = o->ptr;
4665 zskiplist *zsl = zsetobj->zsl;
4666 zskiplistNode *ln;
4667
4668 int llen = zsl->length;
4669 int rangelen, j;
4670 robj *ele;
4671
4672 /* convert negative indexes */
4673 if (start < 0) start = llen+start;
4674 if (end < 0) end = llen+end;
4675 if (start < 0) start = 0;
4676 if (end < 0) end = 0;
4677
4678 /* indexes sanity checks */
4679 if (start > end || start >= llen) {
4680 /* Out of range start or start > end result in empty list */
4681 addReply(c,shared.emptymultibulk);
4682 return;
4683 }
4684 if (end >= llen) end = llen-1;
4685 rangelen = (end-start)+1;
4686
4687 /* Return the result in form of a multi-bulk reply */
4688 if (reverse) {
4689 ln = zsl->tail;
4690 while (start--)
4691 ln = ln->backward;
4692 } else {
4693 ln = zsl->header->forward[0];
4694 while (start--)
4695 ln = ln->forward[0];
4696 }
4697
4698 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4699 withscores ? (rangelen*2) : rangelen));
4700 for (j = 0; j < rangelen; j++) {
4701 ele = ln->obj;
4702 addReplyBulkLen(c,ele);
4703 addReply(c,ele);
4704 addReply(c,shared.crlf);
4705 if (withscores)
4706 addReplyDouble(c,ln->score);
4707 ln = reverse ? ln->backward : ln->forward[0];
4708 }
4709 }
4710 }
4711 }
4712
4713 static void zrangeCommand(redisClient *c) {
4714 zrangeGenericCommand(c,0);
4715 }
4716
4717 static void zrevrangeCommand(redisClient *c) {
4718 zrangeGenericCommand(c,1);
4719 }
4720
4721 static void zrangebyscoreCommand(redisClient *c) {
4722 robj *o;
4723 double min = strtod(c->argv[2]->ptr,NULL);
4724 double max = strtod(c->argv[3]->ptr,NULL);
4725 int offset = 0, limit = -1;
4726
4727 if (c->argc != 4 && c->argc != 7) {
4728 addReplySds(c,
4729 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4730 return;
4731 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4732 addReply(c,shared.syntaxerr);
4733 return;
4734 } else if (c->argc == 7) {
4735 offset = atoi(c->argv[5]->ptr);
4736 limit = atoi(c->argv[6]->ptr);
4737 if (offset < 0) offset = 0;
4738 }
4739
4740 o = lookupKeyRead(c->db,c->argv[1]);
4741 if (o == NULL) {
4742 addReply(c,shared.nullmultibulk);
4743 } else {
4744 if (o->type != REDIS_ZSET) {
4745 addReply(c,shared.wrongtypeerr);
4746 } else {
4747 zset *zsetobj = o->ptr;
4748 zskiplist *zsl = zsetobj->zsl;
4749 zskiplistNode *ln;
4750 robj *ele, *lenobj;
4751 unsigned int rangelen = 0;
4752
4753 /* Get the first node with the score >= min */
4754 ln = zslFirstWithScore(zsl,min);
4755 if (ln == NULL) {
4756 /* No element matching the speciifed interval */
4757 addReply(c,shared.emptymultibulk);
4758 return;
4759 }
4760
4761 /* We don't know in advance how many matching elements there
4762 * are in the list, so we push this object that will represent
4763 * the multi-bulk length in the output buffer, and will "fix"
4764 * it later */
4765 lenobj = createObject(REDIS_STRING,NULL);
4766 addReply(c,lenobj);
4767 decrRefCount(lenobj);
4768
4769 while(ln && ln->score <= max) {
4770 if (offset) {
4771 offset--;
4772 ln = ln->forward[0];
4773 continue;
4774 }
4775 if (limit == 0) break;
4776 ele = ln->obj;
4777 addReplyBulkLen(c,ele);
4778 addReply(c,ele);
4779 addReply(c,shared.crlf);
4780 ln = ln->forward[0];
4781 rangelen++;
4782 if (limit > 0) limit--;
4783 }
4784 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4785 }
4786 }
4787 }
4788
4789 static void zcardCommand(redisClient *c) {
4790 robj *o;
4791 zset *zs;
4792
4793 o = lookupKeyRead(c->db,c->argv[1]);
4794 if (o == NULL) {
4795 addReply(c,shared.czero);
4796 return;
4797 } else {
4798 if (o->type != REDIS_ZSET) {
4799 addReply(c,shared.wrongtypeerr);
4800 } else {
4801 zs = o->ptr;
4802 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4803 }
4804 }
4805 }
4806
4807 static void zscoreCommand(redisClient *c) {
4808 robj *o;
4809 zset *zs;
4810
4811 o = lookupKeyRead(c->db,c->argv[1]);
4812 if (o == NULL) {
4813 addReply(c,shared.nullbulk);
4814 return;
4815 } else {
4816 if (o->type != REDIS_ZSET) {
4817 addReply(c,shared.wrongtypeerr);
4818 } else {
4819 dictEntry *de;
4820
4821 zs = o->ptr;
4822 de = dictFind(zs->dict,c->argv[2]);
4823 if (!de) {
4824 addReply(c,shared.nullbulk);
4825 } else {
4826 double *score = dictGetEntryVal(de);
4827
4828 addReplyDouble(c,*score);
4829 }
4830 }
4831 }
4832 }
4833
4834 /* ========================= Non type-specific commands ==================== */
4835
4836 static void flushdbCommand(redisClient *c) {
4837 server.dirty += dictSize(c->db->dict);
4838 dictEmpty(c->db->dict);
4839 dictEmpty(c->db->expires);
4840 addReply(c,shared.ok);
4841 }
4842
4843 static void flushallCommand(redisClient *c) {
4844 server.dirty += emptyDb();
4845 addReply(c,shared.ok);
4846 rdbSave(server.dbfilename);
4847 server.dirty++;
4848 }
4849
4850 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4851 redisSortOperation *so = zmalloc(sizeof(*so));
4852 so->type = type;
4853 so->pattern = pattern;
4854 return so;
4855 }
4856
4857 /* Return the value associated to the key with a name obtained
4858 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4859 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4860 char *p;
4861 sds spat, ssub;
4862 robj keyobj;
4863 int prefixlen, sublen, postfixlen;
4864 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4865 struct {
4866 long len;
4867 long free;
4868 char buf[REDIS_SORTKEY_MAX+1];
4869 } keyname;
4870
4871 /* If the pattern is "#" return the substitution object itself in order
4872 * to implement the "SORT ... GET #" feature. */
4873 spat = pattern->ptr;
4874 if (spat[0] == '#' && spat[1] == '\0') {
4875 return subst;
4876 }
4877
4878 /* The substitution object may be specially encoded. If so we create
4879 * a decoded object on the fly. Otherwise getDecodedObject will just
4880 * increment the ref count, that we'll decrement later. */
4881 subst = getDecodedObject(subst);
4882
4883 ssub = subst->ptr;
4884 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4885 p = strchr(spat,'*');
4886 if (!p) {
4887 decrRefCount(subst);
4888 return NULL;
4889 }
4890
4891 prefixlen = p-spat;
4892 sublen = sdslen(ssub);
4893 postfixlen = sdslen(spat)-(prefixlen+1);
4894 memcpy(keyname.buf,spat,prefixlen);
4895 memcpy(keyname.buf+prefixlen,ssub,sublen);
4896 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4897 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4898 keyname.len = prefixlen+sublen+postfixlen;
4899
4900 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4901 decrRefCount(subst);
4902
4903 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4904 return lookupKeyRead(db,&keyobj);
4905 }
4906
4907 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4908 * the additional parameter is not standard but a BSD-specific we have to
4909 * pass sorting parameters via the global 'server' structure */
4910 static int sortCompare(const void *s1, const void *s2) {
4911 const redisSortObject *so1 = s1, *so2 = s2;
4912 int cmp;
4913
4914 if (!server.sort_alpha) {
4915 /* Numeric sorting. Here it's trivial as we precomputed scores */
4916 if (so1->u.score > so2->u.score) {
4917 cmp = 1;
4918 } else if (so1->u.score < so2->u.score) {
4919 cmp = -1;
4920 } else {
4921 cmp = 0;
4922 }
4923 } else {
4924 /* Alphanumeric sorting */
4925 if (server.sort_bypattern) {
4926 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4927 /* At least one compare object is NULL */
4928 if (so1->u.cmpobj == so2->u.cmpobj)
4929 cmp = 0;
4930 else if (so1->u.cmpobj == NULL)
4931 cmp = -1;
4932 else
4933 cmp = 1;
4934 } else {
4935 /* We have both the objects, use strcoll */
4936 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4937 }
4938 } else {
4939 /* Compare elements directly */
4940 robj *dec1, *dec2;
4941
4942 dec1 = getDecodedObject(so1->obj);
4943 dec2 = getDecodedObject(so2->obj);
4944 cmp = strcoll(dec1->ptr,dec2->ptr);
4945 decrRefCount(dec1);
4946 decrRefCount(dec2);
4947 }
4948 }
4949 return server.sort_desc ? -cmp : cmp;
4950 }
4951
4952 /* The SORT command is the most complex command in Redis. Warning: this code
4953 * is optimized for speed and a bit less for readability */
4954 static void sortCommand(redisClient *c) {
4955 list *operations;
4956 int outputlen = 0;
4957 int desc = 0, alpha = 0;
4958 int limit_start = 0, limit_count = -1, start, end;
4959 int j, dontsort = 0, vectorlen;
4960 int getop = 0; /* GET operation counter */
4961 robj *sortval, *sortby = NULL, *storekey = NULL;
4962 redisSortObject *vector; /* Resulting vector to sort */
4963
4964 /* Lookup the key to sort. It must be of the right types */
4965 sortval = lookupKeyRead(c->db,c->argv[1]);
4966 if (sortval == NULL) {
4967 addReply(c,shared.nullmultibulk);
4968 return;
4969 }
4970 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
4971 sortval->type != REDIS_ZSET)
4972 {
4973 addReply(c,shared.wrongtypeerr);
4974 return;
4975 }
4976
4977 /* Create a list of operations to perform for every sorted element.
4978 * Operations can be GET/DEL/INCR/DECR */
4979 operations = listCreate();
4980 listSetFreeMethod(operations,zfree);
4981 j = 2;
4982
4983 /* Now we need to protect sortval incrementing its count, in the future
4984 * SORT may have options able to overwrite/delete keys during the sorting
4985 * and the sorted key itself may get destroied */
4986 incrRefCount(sortval);
4987
4988 /* The SORT command has an SQL-alike syntax, parse it */
4989 while(j < c->argc) {
4990 int leftargs = c->argc-j-1;
4991 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4992 desc = 0;
4993 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4994 desc = 1;
4995 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4996 alpha = 1;
4997 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4998 limit_start = atoi(c->argv[j+1]->ptr);
4999 limit_count = atoi(c->argv[j+2]->ptr);
5000 j+=2;
5001 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5002 storekey = c->argv[j+1];
5003 j++;
5004 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5005 sortby = c->argv[j+1];
5006 /* If the BY pattern does not contain '*', i.e. it is constant,
5007 * we don't need to sort nor to lookup the weight keys. */
5008 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5009 j++;
5010 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5011 listAddNodeTail(operations,createSortOperation(
5012 REDIS_SORT_GET,c->argv[j+1]));
5013 getop++;
5014 j++;
5015 } else {
5016 decrRefCount(sortval);
5017 listRelease(operations);
5018 addReply(c,shared.syntaxerr);
5019 return;
5020 }
5021 j++;
5022 }
5023
5024 /* Load the sorting vector with all the objects to sort */
5025 switch(sortval->type) {
5026 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5027 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5028 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5029 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5030 }
5031 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5032 j = 0;
5033
5034 if (sortval->type == REDIS_LIST) {
5035 list *list = sortval->ptr;
5036 listNode *ln;
5037
5038 listRewind(list);
5039 while((ln = listYield(list))) {
5040 robj *ele = ln->value;
5041 vector[j].obj = ele;
5042 vector[j].u.score = 0;
5043 vector[j].u.cmpobj = NULL;
5044 j++;
5045 }
5046 } else {
5047 dict *set;
5048 dictIterator *di;
5049 dictEntry *setele;
5050
5051 if (sortval->type == REDIS_SET) {
5052 set = sortval->ptr;
5053 } else {
5054 zset *zs = sortval->ptr;
5055 set = zs->dict;
5056 }
5057
5058 di = dictGetIterator(set);
5059 while((setele = dictNext(di)) != NULL) {
5060 vector[j].obj = dictGetEntryKey(setele);
5061 vector[j].u.score = 0;
5062 vector[j].u.cmpobj = NULL;
5063 j++;
5064 }
5065 dictReleaseIterator(di);
5066 }
5067 redisAssert(j == vectorlen);
5068
5069 /* Now it's time to load the right scores in the sorting vector */
5070 if (dontsort == 0) {
5071 for (j = 0; j < vectorlen; j++) {
5072 if (sortby) {
5073 robj *byval;
5074
5075 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5076 if (!byval || byval->type != REDIS_STRING) continue;
5077 if (alpha) {
5078 vector[j].u.cmpobj = getDecodedObject(byval);
5079 } else {
5080 if (byval->encoding == REDIS_ENCODING_RAW) {
5081 vector[j].u.score = strtod(byval->ptr,NULL);
5082 } else {
5083 /* Don't need to decode the object if it's
5084 * integer-encoded (the only encoding supported) so
5085 * far. We can just cast it */
5086 if (byval->encoding == REDIS_ENCODING_INT) {
5087 vector[j].u.score = (long)byval->ptr;
5088 } else
5089 redisAssert(1 != 1);
5090 }
5091 }
5092 } else {
5093 if (!alpha) {
5094 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5095 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5096 else {
5097 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5098 vector[j].u.score = (long) vector[j].obj->ptr;
5099 else
5100 redisAssert(1 != 1);
5101 }
5102 }
5103 }
5104 }
5105 }
5106
5107 /* We are ready to sort the vector... perform a bit of sanity check
5108 * on the LIMIT option too. We'll use a partial version of quicksort. */
5109 start = (limit_start < 0) ? 0 : limit_start;
5110 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5111 if (start >= vectorlen) {
5112 start = vectorlen-1;
5113 end = vectorlen-2;
5114 }
5115 if (end >= vectorlen) end = vectorlen-1;
5116
5117 if (dontsort == 0) {
5118 server.sort_desc = desc;
5119 server.sort_alpha = alpha;
5120 server.sort_bypattern = sortby ? 1 : 0;
5121 if (sortby && (start != 0 || end != vectorlen-1))
5122 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5123 else
5124 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5125 }
5126
5127 /* Send command output to the output buffer, performing the specified
5128 * GET/DEL/INCR/DECR operations if any. */
5129 outputlen = getop ? getop*(end-start+1) : end-start+1;
5130 if (storekey == NULL) {
5131 /* STORE option not specified, sent the sorting result to client */
5132 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5133 for (j = start; j <= end; j++) {
5134 listNode *ln;
5135 if (!getop) {
5136 addReplyBulkLen(c,vector[j].obj);
5137 addReply(c,vector[j].obj);
5138 addReply(c,shared.crlf);
5139 }
5140 listRewind(operations);
5141 while((ln = listYield(operations))) {
5142 redisSortOperation *sop = ln->value;
5143 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5144 vector[j].obj);
5145
5146 if (sop->type == REDIS_SORT_GET) {
5147 if (!val || val->type != REDIS_STRING) {
5148 addReply(c,shared.nullbulk);
5149 } else {
5150 addReplyBulkLen(c,val);
5151 addReply(c,val);
5152 addReply(c,shared.crlf);
5153 }
5154 } else {
5155 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5156 }
5157 }
5158 }
5159 } else {
5160 robj *listObject = createListObject();
5161 list *listPtr = (list*) listObject->ptr;
5162
5163 /* STORE option specified, set the sorting result as a List object */
5164 for (j = start; j <= end; j++) {
5165 listNode *ln;
5166 if (!getop) {
5167 listAddNodeTail(listPtr,vector[j].obj);
5168 incrRefCount(vector[j].obj);
5169 }
5170 listRewind(operations);
5171 while((ln = listYield(operations))) {
5172 redisSortOperation *sop = ln->value;
5173 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5174 vector[j].obj);
5175
5176 if (sop->type == REDIS_SORT_GET) {
5177 if (!val || val->type != REDIS_STRING) {
5178 listAddNodeTail(listPtr,createStringObject("",0));
5179 } else {
5180 listAddNodeTail(listPtr,val);
5181 incrRefCount(val);
5182 }
5183 } else {
5184 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5185 }
5186 }
5187 }
5188 if (dictReplace(c->db->dict,storekey,listObject)) {
5189 incrRefCount(storekey);
5190 }
5191 /* Note: we add 1 because the DB is dirty anyway since even if the
5192 * SORT result is empty a new key is set and maybe the old content
5193 * replaced. */
5194 server.dirty += 1+outputlen;
5195 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5196 }
5197
5198 /* Cleanup */
5199 decrRefCount(sortval);
5200 listRelease(operations);
5201 for (j = 0; j < vectorlen; j++) {
5202 if (sortby && alpha && vector[j].u.cmpobj)
5203 decrRefCount(vector[j].u.cmpobj);
5204 }
5205 zfree(vector);
5206 }
5207
5208 /* Create the string returned by the INFO command. This is decoupled
5209 * by the INFO command itself as we need to report the same information
5210 * on memory corruption problems. */
5211 static sds genRedisInfoString(void) {
5212 sds info;
5213 time_t uptime = time(NULL)-server.stat_starttime;
5214 int j;
5215
5216 info = sdscatprintf(sdsempty(),
5217 "redis_version:%s\r\n"
5218 "arch_bits:%s\r\n"
5219 "multiplexing_api:%s\r\n"
5220 "uptime_in_seconds:%ld\r\n"
5221 "uptime_in_days:%ld\r\n"
5222 "connected_clients:%d\r\n"
5223 "connected_slaves:%d\r\n"
5224 "blocked_clients:%d\r\n"
5225 "used_memory:%zu\r\n"
5226 "changes_since_last_save:%lld\r\n"
5227 "bgsave_in_progress:%d\r\n"
5228 "last_save_time:%ld\r\n"
5229 "bgrewriteaof_in_progress:%d\r\n"
5230 "total_connections_received:%lld\r\n"
5231 "total_commands_processed:%lld\r\n"
5232 "role:%s\r\n"
5233 ,REDIS_VERSION,
5234 (sizeof(long) == 8) ? "64" : "32",
5235 aeGetApiName(),
5236 uptime,
5237 uptime/(3600*24),
5238 listLength(server.clients)-listLength(server.slaves),
5239 listLength(server.slaves),
5240 server.blockedclients,
5241 server.usedmemory,
5242 server.dirty,
5243 server.bgsavechildpid != -1,
5244 server.lastsave,
5245 server.bgrewritechildpid != -1,
5246 server.stat_numconnections,
5247 server.stat_numcommands,
5248 server.masterhost == NULL ? "master" : "slave"
5249 );
5250 if (server.masterhost) {
5251 info = sdscatprintf(info,
5252 "master_host:%s\r\n"
5253 "master_port:%d\r\n"
5254 "master_link_status:%s\r\n"
5255 "master_last_io_seconds_ago:%d\r\n"
5256 ,server.masterhost,
5257 server.masterport,
5258 (server.replstate == REDIS_REPL_CONNECTED) ?
5259 "up" : "down",
5260 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5261 );
5262 }
5263 for (j = 0; j < server.dbnum; j++) {
5264 long long keys, vkeys;
5265
5266 keys = dictSize(server.db[j].dict);
5267 vkeys = dictSize(server.db[j].expires);
5268 if (keys || vkeys) {
5269 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5270 j, keys, vkeys);
5271 }
5272 }
5273 return info;
5274 }
5275
5276 static void infoCommand(redisClient *c) {
5277 sds info = genRedisInfoString();
5278 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5279 (unsigned long)sdslen(info)));
5280 addReplySds(c,info);
5281 addReply(c,shared.crlf);
5282 }
5283
5284 static void monitorCommand(redisClient *c) {
5285 /* ignore MONITOR if aleady slave or in monitor mode */
5286 if (c->flags & REDIS_SLAVE) return;
5287
5288 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5289 c->slaveseldb = 0;
5290 listAddNodeTail(server.monitors,c);
5291 addReply(c,shared.ok);
5292 }
5293
5294 /* ================================= Expire ================================= */
5295 static int removeExpire(redisDb *db, robj *key) {
5296 if (dictDelete(db->expires,key) == DICT_OK) {
5297 return 1;
5298 } else {
5299 return 0;
5300 }
5301 }
5302
5303 static int setExpire(redisDb *db, robj *key, time_t when) {
5304 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5305 return 0;
5306 } else {
5307 incrRefCount(key);
5308 return 1;
5309 }
5310 }
5311
5312 /* Return the expire time of the specified key, or -1 if no expire
5313 * is associated with this key (i.e. the key is non volatile) */
5314 static time_t getExpire(redisDb *db, robj *key) {
5315 dictEntry *de;
5316
5317 /* No expire? return ASAP */
5318 if (dictSize(db->expires) == 0 ||
5319 (de = dictFind(db->expires,key)) == NULL) return -1;
5320
5321 return (time_t) dictGetEntryVal(de);
5322 }
5323
5324 static int expireIfNeeded(redisDb *db, robj *key) {
5325 time_t when;
5326 dictEntry *de;
5327
5328 /* No expire? return ASAP */
5329 if (dictSize(db->expires) == 0 ||
5330 (de = dictFind(db->expires,key)) == NULL) return 0;
5331
5332 /* Lookup the expire */
5333 when = (time_t) dictGetEntryVal(de);
5334 if (time(NULL) <= when) return 0;
5335
5336 /* Delete the key */
5337 dictDelete(db->expires,key);
5338 return dictDelete(db->dict,key) == DICT_OK;
5339 }
5340
5341 static int deleteIfVolatile(redisDb *db, robj *key) {
5342 dictEntry *de;
5343
5344 /* No expire? return ASAP */
5345 if (dictSize(db->expires) == 0 ||
5346 (de = dictFind(db->expires,key)) == NULL) return 0;
5347
5348 /* Delete the key */
5349 server.dirty++;
5350 dictDelete(db->expires,key);
5351 return dictDelete(db->dict,key) == DICT_OK;
5352 }
5353
5354 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5355 dictEntry *de;
5356
5357 de = dictFind(c->db->dict,key);
5358 if (de == NULL) {
5359 addReply(c,shared.czero);
5360 return;
5361 }
5362 if (seconds < 0) {
5363 if (deleteKey(c->db,key)) server.dirty++;
5364 addReply(c, shared.cone);
5365 return;
5366 } else {
5367 time_t when = time(NULL)+seconds;
5368 if (setExpire(c->db,key,when)) {
5369 addReply(c,shared.cone);
5370 server.dirty++;
5371 } else {
5372 addReply(c,shared.czero);
5373 }
5374 return;
5375 }
5376 }
5377
5378 static void expireCommand(redisClient *c) {
5379 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5380 }
5381
5382 static void expireatCommand(redisClient *c) {
5383 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5384 }
5385
5386 static void ttlCommand(redisClient *c) {
5387 time_t expire;
5388 int ttl = -1;
5389
5390 expire = getExpire(c->db,c->argv[1]);
5391 if (expire != -1) {
5392 ttl = (int) (expire-time(NULL));
5393 if (ttl < 0) ttl = -1;
5394 }
5395 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5396 }
5397
5398 /* ================================ MULTI/EXEC ============================== */
5399
5400 /* Client state initialization for MULTI/EXEC */
5401 static void initClientMultiState(redisClient *c) {
5402 c->mstate.commands = NULL;
5403 c->mstate.count = 0;
5404 }
5405
5406 /* Release all the resources associated with MULTI/EXEC state */
5407 static void freeClientMultiState(redisClient *c) {
5408 int j;
5409
5410 for (j = 0; j < c->mstate.count; j++) {
5411 int i;
5412 multiCmd *mc = c->mstate.commands+j;
5413
5414 for (i = 0; i < mc->argc; i++)
5415 decrRefCount(mc->argv[i]);
5416 zfree(mc->argv);
5417 }
5418 zfree(c->mstate.commands);
5419 }
5420
5421 /* Add a new command into the MULTI commands queue */
5422 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5423 multiCmd *mc;
5424 int j;
5425
5426 c->mstate.commands = zrealloc(c->mstate.commands,
5427 sizeof(multiCmd)*(c->mstate.count+1));
5428 mc = c->mstate.commands+c->mstate.count;
5429 mc->cmd = cmd;
5430 mc->argc = c->argc;
5431 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5432 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5433 for (j = 0; j < c->argc; j++)
5434 incrRefCount(mc->argv[j]);
5435 c->mstate.count++;
5436 }
5437
5438 static void multiCommand(redisClient *c) {
5439 c->flags |= REDIS_MULTI;
5440 addReply(c,shared.ok);
5441 }
5442
5443 static void execCommand(redisClient *c) {
5444 int j;
5445 robj **orig_argv;
5446 int orig_argc;
5447
5448 if (!(c->flags & REDIS_MULTI)) {
5449 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5450 return;
5451 }
5452
5453 orig_argv = c->argv;
5454 orig_argc = c->argc;
5455 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5456 for (j = 0; j < c->mstate.count; j++) {
5457 c->argc = c->mstate.commands[j].argc;
5458 c->argv = c->mstate.commands[j].argv;
5459 call(c,c->mstate.commands[j].cmd);
5460 }
5461 c->argv = orig_argv;
5462 c->argc = orig_argc;
5463 freeClientMultiState(c);
5464 initClientMultiState(c);
5465 c->flags &= (~REDIS_MULTI);
5466 }
5467
5468 /* =========================== Blocking Operations ========================= */
5469
5470 /* Currently Redis blocking operations support is limited to list POP ops,
5471 * so the current implementation is not fully generic, but it is also not
5472 * completely specific so it will not require a rewrite to support new
5473 * kind of blocking operations in the future.
5474 *
5475 * Still it's important to note that list blocking operations can be already
5476 * used as a notification mechanism in order to implement other blocking
5477 * operations at application level, so there must be a very strong evidence
5478 * of usefulness and generality before new blocking operations are implemented.
5479 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests while the blocking request is not served). We also put the
 *   client in a dictionary (db->blockingkeys) mapping keys to a list of
 *   clients blocked on these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocked client, unblock the client, and remove it from the list.
5494 *
5495 * The above comment and the source code should be enough in order to understand
5496 * the implementation and modify / fix it later.
5497 */
5498
5499 /* Set a client in blocking mode for the specified key, with the specified
5500 * timeout */
5501 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5502 dictEntry *de;
5503 list *l;
5504 int j;
5505
5506 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5507 c->blockingkeysnum = numkeys;
5508 c->blockingto = timeout;
5509 for (j = 0; j < numkeys; j++) {
5510 /* Add the key in the client structure, to map clients -> keys */
5511 c->blockingkeys[j] = keys[j];
5512 incrRefCount(keys[j]);
5513
5514 /* And in the other "side", to map keys -> clients */
5515 de = dictFind(c->db->blockingkeys,keys[j]);
5516 if (de == NULL) {
5517 int retval;
5518
5519 /* For every key we take a list of clients blocked for it */
5520 l = listCreate();
5521 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5522 incrRefCount(keys[j]);
5523 assert(retval == DICT_OK);
5524 } else {
5525 l = dictGetEntryVal(de);
5526 }
5527 listAddNodeTail(l,c);
5528 }
5529 /* Mark the client as a blocked client */
5530 c->flags |= REDIS_BLOCKED;
5531 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5532 server.blockedclients++;
5533 }
5534
5535 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5536 static void unblockClient(redisClient *c) {
5537 dictEntry *de;
5538 list *l;
5539 int j;
5540
5541 assert(c->blockingkeys != NULL);
5542 /* The client may wait for multiple keys, so unblock it for every key. */
5543 for (j = 0; j < c->blockingkeysnum; j++) {
5544 /* Remove this client from the list of clients waiting for this key. */
5545 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5546 assert(de != NULL);
5547 l = dictGetEntryVal(de);
5548 listDelNode(l,listSearchKey(l,c));
5549 /* If the list is empty we need to remove it to avoid wasting memory */
5550 if (listLength(l) == 0)
5551 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5552 decrRefCount(c->blockingkeys[j]);
5553 }
5554 /* Cleanup the client structure */
5555 zfree(c->blockingkeys);
5556 c->blockingkeys = NULL;
5557 c->flags &= (~REDIS_BLOCKED);
5558 server.blockedclients--;
5559 /* Ok now we are ready to get read events from socket, note that we
5560 * can't trap errors here as it's possible that unblockClients() is
5561 * called from freeClient() itself, and the only thing we can do
5562 * if we failed to register the READABLE event is to kill the client.
5563 * Still the following function should never fail in the real world as
5564 * we are sure the file descriptor is sane, and we exit on out of mem. */
5565 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5566 /* As a final step we want to process data if there is some command waiting
5567 * in the input buffer. Note that this is safe even if unblockClient()
5568 * gets called from freeClient() because freeClient() will be smart
5569 * enough to call this function *after* c->querybuf was set to NULL. */
5570 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5571 }
5572
5573 /* This should be called from any function PUSHing into lists.
5574 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5575 * 'ele' is the element pushed.
5576 *
5577 * If the function returns 0 there was no client waiting for a list push
5578 * against this key.
5579 *
5580 * If the function returns 1 there was a client waiting for a list push
5581 * against this key, the element was passed to this client thus it's not
5582 * needed to actually add it to the list and the caller should return asap. */
5583 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5584 struct dictEntry *de;
5585 redisClient *receiver;
5586 list *l;
5587 listNode *ln;
5588
5589 de = dictFind(c->db->blockingkeys,key);
5590 if (de == NULL) return 0;
5591 l = dictGetEntryVal(de);
5592 ln = listFirst(l);
5593 assert(ln != NULL);
5594 receiver = ln->value;
5595
5596 addReplySds(receiver,sdsnew("*2\r\n"));
5597 addReplyBulkLen(receiver,key);
5598 addReply(receiver,key);
5599 addReply(receiver,shared.crlf);
5600 addReplyBulkLen(receiver,ele);
5601 addReply(receiver,ele);
5602 addReply(receiver,shared.crlf);
5603 unblockClient(receiver);
5604 return 1;
5605 }
5606
5607 /* Blocking RPOP/LPOP */
5608 static void blockingPopGenericCommand(redisClient *c, int where) {
5609 robj *o;
5610 time_t timeout;
5611 int j;
5612
5613 for (j = 1; j < c->argc-1; j++) {
5614 o = lookupKeyWrite(c->db,c->argv[j]);
5615 if (o != NULL) {
5616 if (o->type != REDIS_LIST) {
5617 addReply(c,shared.wrongtypeerr);
5618 return;
5619 } else {
5620 list *list = o->ptr;
5621 if (listLength(list) != 0) {
5622 /* If the list contains elements fall back to the usual
5623 * non-blocking POP operation */
5624 robj *argv[2], **orig_argv;
5625 int orig_argc;
5626
5627 /* We need to alter the command arguments before to call
5628 * popGenericCommand() as the command takes a single key. */
5629 orig_argv = c->argv;
5630 orig_argc = c->argc;
5631 argv[1] = c->argv[j];
5632 c->argv = argv;
5633 c->argc = 2;
5634
5635 /* Also the return value is different, we need to output
5636 * the multi bulk reply header and the key name. The
5637 * "real" command will add the last element (the value)
5638 * for us. If this souds like an hack to you it's just
5639 * because it is... */
5640 addReplySds(c,sdsnew("*2\r\n"));
5641 addReplyBulkLen(c,argv[1]);
5642 addReply(c,argv[1]);
5643 addReply(c,shared.crlf);
5644 popGenericCommand(c,where);
5645
5646 /* Fix the client structure with the original stuff */
5647 c->argv = orig_argv;
5648 c->argc = orig_argc;
5649 return;
5650 }
5651 }
5652 }
5653 }
5654 /* If the list is empty or the key does not exists we must block */
5655 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5656 if (timeout > 0) timeout += time(NULL);
5657 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5658 }
5659
5660 static void blpopCommand(redisClient *c) {
5661 blockingPopGenericCommand(c,REDIS_HEAD);
5662 }
5663
5664 static void brpopCommand(redisClient *c) {
5665 blockingPopGenericCommand(c,REDIS_TAIL);
5666 }
5667
5668 /* =============================== Replication ============================= */
5669
5670 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5671 ssize_t nwritten, ret = size;
5672 time_t start = time(NULL);
5673
5674 timeout++;
5675 while(size) {
5676 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5677 nwritten = write(fd,ptr,size);
5678 if (nwritten == -1) return -1;
5679 ptr += nwritten;
5680 size -= nwritten;
5681 }
5682 if ((time(NULL)-start) > timeout) {
5683 errno = ETIMEDOUT;
5684 return -1;
5685 }
5686 }
5687 return ret;
5688 }
5689
5690 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5691 ssize_t nread, totread = 0;
5692 time_t start = time(NULL);
5693
5694 timeout++;
5695 while(size) {
5696 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5697 nread = read(fd,ptr,size);
5698 if (nread == -1) return -1;
5699 ptr += nread;
5700 size -= nread;
5701 totread += nread;
5702 }
5703 if ((time(NULL)-start) > timeout) {
5704 errno = ETIMEDOUT;
5705 return -1;
5706 }
5707 }
5708 return totread;
5709 }
5710
/* Read a line from 'fd' one byte at a time into 'ptr', a buffer of 'size'
 * bytes. 'timeout' is passed to syncRead() for every single byte.
 *
 * Reading stops at the first '\n', which is not stored; a '\r' right
 * before it is replaced by the string terminator. Reading also stops when
 * size-1 bytes have been stored, so that the buffer is never overrun: in
 * that case the line is returned truncated. Whenever 'size' is at least 1
 * the buffer is null terminated on return.
 *
 * Returns the number of bytes stored before the '\n' (a stripped '\r' is
 * still counted), or -1 if syncRead() failed. Returns 0 without touching
 * the buffer if 'size' is not positive. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* Reserve room for the string terminator. */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One slot of the buffer was consumed. */
        size--;
    }
    return nread;
}
5731
5732 static void syncCommand(redisClient *c) {
5733 /* ignore SYNC if aleady slave or in monitor mode */
5734 if (c->flags & REDIS_SLAVE) return;
5735
5736 /* SYNC can't be issued when the server has pending data to send to
5737 * the client about already issued commands. We need a fresh reply
5738 * buffer registering the differences between the BGSAVE and the current
5739 * dataset, so that we can copy to other slaves if needed. */
5740 if (listLength(c->reply) != 0) {
5741 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5742 return;
5743 }
5744
5745 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5746 /* Here we need to check if there is a background saving operation
5747 * in progress, or if it is required to start one */
5748 if (server.bgsavechildpid != -1) {
5749 /* Ok a background save is in progress. Let's check if it is a good
5750 * one for replication, i.e. if there is another slave that is
5751 * registering differences since the server forked to save */
5752 redisClient *slave;
5753 listNode *ln;
5754
5755 listRewind(server.slaves);
5756 while((ln = listYield(server.slaves))) {
5757 slave = ln->value;
5758 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5759 }
5760 if (ln) {
5761 /* Perfect, the server is already registering differences for
5762 * another slave. Set the right state, and copy the buffer. */
5763 listRelease(c->reply);
5764 c->reply = listDup(slave->reply);
5765 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5766 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5767 } else {
5768 /* No way, we need to wait for the next BGSAVE in order to
5769 * register differences */
5770 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5771 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5772 }
5773 } else {
5774 /* Ok we don't have a BGSAVE in progress, let's start one */
5775 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5776 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5777 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5778 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5779 return;
5780 }
5781 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5782 }
5783 c->repldbfd = -1;
5784 c->flags |= REDIS_SLAVE;
5785 c->slaveseldb = 0;
5786 listAddNodeTail(server.slaves,c);
5787 return;
5788 }
5789
5790 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5791 redisClient *slave = privdata;
5792 REDIS_NOTUSED(el);
5793 REDIS_NOTUSED(mask);
5794 char buf[REDIS_IOBUF_LEN];
5795 ssize_t nwritten, buflen;
5796
5797 if (slave->repldboff == 0) {
5798 /* Write the bulk write count before to transfer the DB. In theory here
5799 * we don't know how much room there is in the output buffer of the
5800 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5801 * operations) will never be smaller than the few bytes we need. */
5802 sds bulkcount;
5803
5804 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5805 slave->repldbsize);
5806 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5807 {
5808 sdsfree(bulkcount);
5809 freeClient(slave);
5810 return;
5811 }
5812 sdsfree(bulkcount);
5813 }
5814 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5815 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5816 if (buflen <= 0) {
5817 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5818 (buflen == 0) ? "premature EOF" : strerror(errno));
5819 freeClient(slave);
5820 return;
5821 }
5822 if ((nwritten = write(fd,buf,buflen)) == -1) {
5823 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5824 strerror(errno));
5825 freeClient(slave);
5826 return;
5827 }
5828 slave->repldboff += nwritten;
5829 if (slave->repldboff == slave->repldbsize) {
5830 close(slave->repldbfd);
5831 slave->repldbfd = -1;
5832 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5833 slave->replstate = REDIS_REPL_ONLINE;
5834 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5835 sendReplyToClient, slave) == AE_ERR) {
5836 freeClient(slave);
5837 return;
5838 }
5839 addReplySds(slave,sdsempty());
5840 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5841 }
5842 }
5843
5844 /* This function is called at the end of every backgrond saving.
5845 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5846 * otherwise REDIS_ERR is passed to the function.
5847 *
5848 * The goal of this function is to handle slaves waiting for a successful
5849 * background saving in order to perform non-blocking synchronization. */
5850 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5851 listNode *ln;
5852 int startbgsave = 0;
5853
5854 listRewind(server.slaves);
5855 while((ln = listYield(server.slaves))) {
5856 redisClient *slave = ln->value;
5857
5858 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5859 startbgsave = 1;
5860 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5861 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5862 struct redis_stat buf;
5863
5864 if (bgsaveerr != REDIS_OK) {
5865 freeClient(slave);
5866 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5867 continue;
5868 }
5869 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5870 redis_fstat(slave->repldbfd,&buf) == -1) {
5871 freeClient(slave);
5872 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5873 continue;
5874 }
5875 slave->repldboff = 0;
5876 slave->repldbsize = buf.st_size;
5877 slave->replstate = REDIS_REPL_SEND_BULK;
5878 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5879 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5880 freeClient(slave);
5881 continue;
5882 }
5883 }
5884 }
5885 if (startbgsave) {
5886 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5887 listRewind(server.slaves);
5888 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5889 while((ln = listYield(server.slaves))) {
5890 redisClient *slave = ln->value;
5891
5892 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5893 freeClient(slave);
5894 }
5895 }
5896 }
5897 }
5898
5899 static int syncWithMaster(void) {
5900 char buf[1024], tmpfile[256], authcmd[1024];
5901 int dumpsize;
5902 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5903 int dfd;
5904
5905 if (fd == -1) {
5906 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5907 strerror(errno));
5908 return REDIS_ERR;
5909 }
5910
5911 /* AUTH with the master if required. */
5912 if(server.masterauth) {
5913 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5914 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5915 close(fd);
5916 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5917 strerror(errno));
5918 return REDIS_ERR;
5919 }
5920 /* Read the AUTH result. */
5921 if (syncReadLine(fd,buf,1024,3600) == -1) {
5922 close(fd);
5923 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5924 strerror(errno));
5925 return REDIS_ERR;
5926 }
5927 if (buf[0] != '+') {
5928 close(fd);
5929 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5930 return REDIS_ERR;
5931 }
5932 }
5933
5934 /* Issue the SYNC command */
5935 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5936 close(fd);
5937 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5938 strerror(errno));
5939 return REDIS_ERR;
5940 }
5941 /* Read the bulk write count */
5942 if (syncReadLine(fd,buf,1024,3600) == -1) {
5943 close(fd);
5944 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5945 strerror(errno));
5946 return REDIS_ERR;
5947 }
5948 if (buf[0] != '$') {
5949 close(fd);
5950 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5951 return REDIS_ERR;
5952 }
5953 dumpsize = atoi(buf+1);
5954 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5955 /* Read the bulk write data on a temp file */
5956 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5957 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5958 if (dfd == -1) {
5959 close(fd);
5960 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5961 return REDIS_ERR;
5962 }
5963 while(dumpsize) {
5964 int nread, nwritten;
5965
5966 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5967 if (nread == -1) {
5968 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5969 strerror(errno));
5970 close(fd);
5971 close(dfd);
5972 return REDIS_ERR;
5973 }
5974 nwritten = write(dfd,buf,nread);
5975 if (nwritten == -1) {
5976 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5977 close(fd);
5978 close(dfd);
5979 return REDIS_ERR;
5980 }
5981 dumpsize -= nread;
5982 }
5983 close(dfd);
5984 if (rename(tmpfile,server.dbfilename) == -1) {
5985 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5986 unlink(tmpfile);
5987 close(fd);
5988 return REDIS_ERR;
5989 }
5990 emptyDb();
5991 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5992 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5993 close(fd);
5994 return REDIS_ERR;
5995 }
5996 server.master = createClient(fd);
5997 server.master->flags |= REDIS_MASTER;
5998 server.master->authenticated = 1;
5999 server.replstate = REDIS_REPL_CONNECTED;
6000 return REDIS_OK;
6001 }
6002
6003 static void slaveofCommand(redisClient *c) {
6004 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6005 !strcasecmp(c->argv[2]->ptr,"one")) {
6006 if (server.masterhost) {
6007 sdsfree(server.masterhost);
6008 server.masterhost = NULL;
6009 if (server.master) freeClient(server.master);
6010 server.replstate = REDIS_REPL_NONE;
6011 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6012 }
6013 } else {
6014 sdsfree(server.masterhost);
6015 server.masterhost = sdsdup(c->argv[1]->ptr);
6016 server.masterport = atoi(c->argv[2]->ptr);
6017 if (server.master) freeClient(server.master);
6018 server.replstate = REDIS_REPL_CONNECT;
6019 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6020 server.masterhost, server.masterport);
6021 }
6022 addReply(c,shared.ok);
6023 }
6024
6025 /* ============================ Maxmemory directive ======================== */
6026
6027 /* This function gets called when 'maxmemory' is set on the config file to limit
6028 * the max memory used by the server, and we are out of memory.
6029 * This function will try to, in order:
6030 *
6031 * - Free objects from the free list
6032 * - Try to remove keys with an EXPIRE set
6033 *
6034 * It is not possible to free enough memory to reach used-memory < maxmemory
6035 * the server will start refusing commands that will enlarge even more the
6036 * memory usage.
6037 */
6038 static void freeMemoryIfNeeded(void) {
6039 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6040 if (listLength(server.objfreelist)) {
6041 robj *o;
6042
6043 listNode *head = listFirst(server.objfreelist);
6044 o = listNodeValue(head);
6045 listDelNode(server.objfreelist,head);
6046 zfree(o);
6047 } else {
6048 int j, k, freed = 0;
6049
6050 for (j = 0; j < server.dbnum; j++) {
6051 int minttl = -1;
6052 robj *minkey = NULL;
6053 struct dictEntry *de;
6054
6055 if (dictSize(server.db[j].expires)) {
6056 freed = 1;
6057 /* From a sample of three keys drop the one nearest to
6058 * the natural expire */
6059 for (k = 0; k < 3; k++) {
6060 time_t t;
6061
6062 de = dictGetRandomKey(server.db[j].expires);
6063 t = (time_t) dictGetEntryVal(de);
6064 if (minttl == -1 || t < minttl) {
6065 minkey = dictGetEntryKey(de);
6066 minttl = t;
6067 }
6068 }
6069 deleteKey(server.db+j,minkey);
6070 }
6071 }
6072 if (!freed) return; /* nothing to free... */
6073 }
6074 }
6075 }
6076
6077 /* ============================== Append Only file ========================== */
6078
6079 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6080 sds buf = sdsempty();
6081 int j;
6082 ssize_t nwritten;
6083 time_t now;
6084 robj *tmpargv[3];
6085
6086 /* The DB this command was targetting is not the same as the last command
6087 * we appendend. To issue a SELECT command is needed. */
6088 if (dictid != server.appendseldb) {
6089 char seldb[64];
6090
6091 snprintf(seldb,sizeof(seldb),"%d",dictid);
6092 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6093 (unsigned long)strlen(seldb),seldb);
6094 server.appendseldb = dictid;
6095 }
6096
6097 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6098 * EXPIREs into EXPIREATs calls */
6099 if (cmd->proc == expireCommand) {
6100 long when;
6101
6102 tmpargv[0] = createStringObject("EXPIREAT",8);
6103 tmpargv[1] = argv[1];
6104 incrRefCount(argv[1]);
6105 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6106 tmpargv[2] = createObject(REDIS_STRING,
6107 sdscatprintf(sdsempty(),"%ld",when));
6108 argv = tmpargv;
6109 }
6110
6111 /* Append the actual command */
6112 buf = sdscatprintf(buf,"*%d\r\n",argc);
6113 for (j = 0; j < argc; j++) {
6114 robj *o = argv[j];
6115
6116 o = getDecodedObject(o);
6117 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6118 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6119 buf = sdscatlen(buf,"\r\n",2);
6120 decrRefCount(o);
6121 }
6122
6123 /* Free the objects from the modified argv for EXPIREAT */
6124 if (cmd->proc == expireCommand) {
6125 for (j = 0; j < 3; j++)
6126 decrRefCount(argv[j]);
6127 }
6128
6129 /* We want to perform a single write. This should be guaranteed atomic
6130 * at least if the filesystem we are writing is a real physical one.
6131 * While this will save us against the server being killed I don't think
6132 * there is much to do about the whole server stopping for power problems
6133 * or alike */
6134 nwritten = write(server.appendfd,buf,sdslen(buf));
6135 if (nwritten != (signed)sdslen(buf)) {
6136 /* Ooops, we are in troubles. The best thing to do for now is
6137 * to simply exit instead to give the illusion that everything is
6138 * working as expected. */
6139 if (nwritten == -1) {
6140 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6141 } else {
6142 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6143 }
6144 exit(1);
6145 }
6146 /* If a background append only file rewriting is in progress we want to
6147 * accumulate the differences between the child DB and the current one
6148 * in a buffer, so that when the child process will do its work we
6149 * can append the differences to the new append only file. */
6150 if (server.bgrewritechildpid != -1)
6151 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6152
6153 sdsfree(buf);
6154 now = time(NULL);
6155 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6156 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6157 now-server.lastfsync > 1))
6158 {
6159 fsync(server.appendfd); /* Let's try to get this data on the disk */
6160 server.lastfsync = now;
6161 }
6162 }
6163
6164 /* In Redis commands are always executed in the context of a client, so in
6165 * order to load the append only file we need to create a fake client. */
6166 static struct redisClient *createFakeClient(void) {
6167 struct redisClient *c = zmalloc(sizeof(*c));
6168
6169 selectDb(c,0);
6170 c->fd = -1;
6171 c->querybuf = sdsempty();
6172 c->argc = 0;
6173 c->argv = NULL;
6174 c->flags = 0;
6175 /* We set the fake client as a slave waiting for the synchronization
6176 * so that Redis will not try to send replies to this client. */
6177 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6178 c->reply = listCreate();
6179 listSetFreeMethod(c->reply,decrRefCount);
6180 listSetDupMethod(c->reply,dupClientReplyValue);
6181 return c;
6182 }
6183
6184 static void freeFakeClient(struct redisClient *c) {
6185 sdsfree(c->querybuf);
6186 listRelease(c->reply);
6187 zfree(c);
6188 }
6189
6190 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6191 * error (the append only file is zero-length) REDIS_ERR is returned. On
6192 * fatal error an error message is logged and the program exists. */
6193 int loadAppendOnlyFile(char *filename) {
6194 struct redisClient *fakeClient;
6195 FILE *fp = fopen(filename,"r");
6196 struct redis_stat sb;
6197
6198 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6199 return REDIS_ERR;
6200
6201 if (fp == NULL) {
6202 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6203 exit(1);
6204 }
6205
6206 fakeClient = createFakeClient();
6207 while(1) {
6208 int argc, j;
6209 unsigned long len;
6210 robj **argv;
6211 char buf[128];
6212 sds argsds;
6213 struct redisCommand *cmd;
6214
6215 if (fgets(buf,sizeof(buf),fp) == NULL) {
6216 if (feof(fp))
6217 break;
6218 else
6219 goto readerr;
6220 }
6221 if (buf[0] != '*') goto fmterr;
6222 argc = atoi(buf+1);
6223 argv = zmalloc(sizeof(robj*)*argc);
6224 for (j = 0; j < argc; j++) {
6225 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6226 if (buf[0] != '$') goto fmterr;
6227 len = strtol(buf+1,NULL,10);
6228 argsds = sdsnewlen(NULL,len);
6229 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6230 argv[j] = createObject(REDIS_STRING,argsds);
6231 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6232 }
6233
6234 /* Command lookup */
6235 cmd = lookupCommand(argv[0]->ptr);
6236 if (!cmd) {
6237 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6238 exit(1);
6239 }
6240 /* Try object sharing and encoding */
6241 if (server.shareobjects) {
6242 int j;
6243 for(j = 1; j < argc; j++)
6244 argv[j] = tryObjectSharing(argv[j]);
6245 }
6246 if (cmd->flags & REDIS_CMD_BULK)
6247 tryObjectEncoding(argv[argc-1]);
6248 /* Run the command in the context of a fake client */
6249 fakeClient->argc = argc;
6250 fakeClient->argv = argv;
6251 cmd->proc(fakeClient);
6252 /* Discard the reply objects list from the fake client */
6253 while(listLength(fakeClient->reply))
6254 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6255 /* Clean up, ready for the next command */
6256 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6257 zfree(argv);
6258 }
6259 fclose(fp);
6260 freeFakeClient(fakeClient);
6261 return REDIS_OK;
6262
6263 readerr:
6264 if (feof(fp)) {
6265 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6266 } else {
6267 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6268 }
6269 exit(1);
6270 fmterr:
6271 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6272 exit(1);
6273 }
6274
6275 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6276 static int fwriteBulk(FILE *fp, robj *obj) {
6277 char buf[128];
6278 obj = getDecodedObject(obj);
6279 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6280 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6281 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6282 goto err;
6283 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6284 decrRefCount(obj);
6285 return 1;
6286 err:
6287 decrRefCount(obj);
6288 return 0;
6289 }
6290
/* Write a double value in bulk format $<count>\r\n<payload>\r\n
 *
 * The value is formatted with "%.17g". Returns 1 on success, 0 if one of
 * the two fwrite() calls failed. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload already carries the trailing CRLF, that must not be
     * accounted in the advertised length (hence the -2). */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
6301
/* Write a long value in bulk format $<count>\r\n<payload>\r\n
 *
 * Returns 1 on success, 0 if one of the two fwrite() calls failed. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t paylen;

    /* The payload already carries the trailing CRLF, that must not be
     * accounted in the advertised length (hence the -2). */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
6312
6313 /* Write a sequence of commands able to fully rebuild the dataset into
6314 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6315 static int rewriteAppendOnlyFile(char *filename) {
6316 dictIterator *di = NULL;
6317 dictEntry *de;
6318 FILE *fp;
6319 char tmpfile[256];
6320 int j;
6321 time_t now = time(NULL);
6322
6323 /* Note that we have to use a different temp name here compared to the
6324 * one used by rewriteAppendOnlyFileBackground() function. */
6325 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6326 fp = fopen(tmpfile,"w");
6327 if (!fp) {
6328 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6329 return REDIS_ERR;
6330 }
6331 for (j = 0; j < server.dbnum; j++) {
6332 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6333 redisDb *db = server.db+j;
6334 dict *d = db->dict;
6335 if (dictSize(d) == 0) continue;
6336 di = dictGetIterator(d);
6337 if (!di) {
6338 fclose(fp);
6339 return REDIS_ERR;
6340 }
6341
6342 /* SELECT the new DB */
6343 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6344 if (fwriteBulkLong(fp,j) == 0) goto werr;
6345
6346 /* Iterate this DB writing every entry */
6347 while((de = dictNext(di)) != NULL) {
6348 robj *key = dictGetEntryKey(de);
6349 robj *o = dictGetEntryVal(de);
6350 time_t expiretime = getExpire(db,key);
6351
6352 /* Save the key and associated value */
6353 if (o->type == REDIS_STRING) {
6354 /* Emit a SET command */
6355 char cmd[]="*3\r\n$3\r\nSET\r\n";
6356 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6357 /* Key and value */
6358 if (fwriteBulk(fp,key) == 0) goto werr;
6359 if (fwriteBulk(fp,o) == 0) goto werr;
6360 } else if (o->type == REDIS_LIST) {
6361 /* Emit the RPUSHes needed to rebuild the list */
6362 list *list = o->ptr;
6363 listNode *ln;
6364
6365 listRewind(list);
6366 while((ln = listYield(list))) {
6367 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6368 robj *eleobj = listNodeValue(ln);
6369
6370 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6371 if (fwriteBulk(fp,key) == 0) goto werr;
6372 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6373 }
6374 } else if (o->type == REDIS_SET) {
6375 /* Emit the SADDs needed to rebuild the set */
6376 dict *set = o->ptr;
6377 dictIterator *di = dictGetIterator(set);
6378 dictEntry *de;
6379
6380 while((de = dictNext(di)) != NULL) {
6381 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6382 robj *eleobj = dictGetEntryKey(de);
6383
6384 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6385 if (fwriteBulk(fp,key) == 0) goto werr;
6386 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6387 }
6388 dictReleaseIterator(di);
6389 } else if (o->type == REDIS_ZSET) {
6390 /* Emit the ZADDs needed to rebuild the sorted set */
6391 zset *zs = o->ptr;
6392 dictIterator *di = dictGetIterator(zs->dict);
6393 dictEntry *de;
6394
6395 while((de = dictNext(di)) != NULL) {
6396 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6397 robj *eleobj = dictGetEntryKey(de);
6398 double *score = dictGetEntryVal(de);
6399
6400 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6401 if (fwriteBulk(fp,key) == 0) goto werr;
6402 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6403 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6404 }
6405 dictReleaseIterator(di);
6406 } else {
6407 redisAssert(0 != 0);
6408 }
6409 /* Save the expire time */
6410 if (expiretime != -1) {
6411 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6412 /* If this key is already expired skip it */
6413 if (expiretime < now) continue;
6414 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6415 if (fwriteBulk(fp,key) == 0) goto werr;
6416 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6417 }
6418 }
6419 dictReleaseIterator(di);
6420 }
6421
6422 /* Make sure data will not remain on the OS's output buffers */
6423 fflush(fp);
6424 fsync(fileno(fp));
6425 fclose(fp);
6426
6427 /* Use RENAME to make sure the DB file is changed atomically only
6428 * if the generate DB file is ok. */
6429 if (rename(tmpfile,filename) == -1) {
6430 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6431 unlink(tmpfile);
6432 return REDIS_ERR;
6433 }
6434 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6435 return REDIS_OK;
6436
6437 werr:
6438 fclose(fp);
6439 unlink(tmpfile);
6440 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6441 if (di) dictReleaseIterator(di);
6442 return REDIS_ERR;
6443 }
6444
6445 /* This is how rewriting of the append only file in background works:
6446 *
6447 * 1) The user calls BGREWRITEAOF
6448 * 2) Redis calls this function, that forks():
6449 * 2a) the child rewrite the append only file in a temp file.
6450 * 2b) the parent accumulates differences in server.bgrewritebuf.
6451 * 3) When the child finished '2a' exists.
6452 * 4) The parent will trap the exit code, if it's OK, will append the
6453 * data accumulated into server.bgrewritebuf into the temp file, and
6454 * finally will rename(2) the temp file in the actual file name.
6455 * The the new file is reopened as the new append only file. Profit!
6456 */
6457 static int rewriteAppendOnlyFileBackground(void) {
6458 pid_t childpid;
6459
6460 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6461 if ((childpid = fork()) == 0) {
6462 /* Child */
6463 char tmpfile[256];
6464 close(server.fd);
6465
6466 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6467 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6468 exit(0);
6469 } else {
6470 exit(1);
6471 }
6472 } else {
6473 /* Parent */
6474 if (childpid == -1) {
6475 redisLog(REDIS_WARNING,
6476 "Can't rewrite append only file in background: fork: %s",
6477 strerror(errno));
6478 return REDIS_ERR;
6479 }
6480 redisLog(REDIS_NOTICE,
6481 "Background append only file rewriting started by pid %d",childpid);
6482 server.bgrewritechildpid = childpid;
6483 /* We set appendseldb to -1 in order to force the next call to the
6484 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6485 * accumulated by the parent into server.bgrewritebuf will start
6486 * with a SELECT statement and it will be safe to merge. */
6487 server.appendseldb = -1;
6488 return REDIS_OK;
6489 }
6490 return REDIS_OK; /* unreached */
6491 }
6492
6493 static void bgrewriteaofCommand(redisClient *c) {
6494 if (server.bgrewritechildpid != -1) {
6495 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6496 return;
6497 }
6498 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6499 char *status = "+Background append only file rewriting started\r\n";
6500 addReplySds(c,sdsnew(status));
6501 } else {
6502 addReply(c,shared.err);
6503 }
6504 }
6505
/* Remove the temp file used by the background append only file rewriting
 * child having the specified pid. The result of unlink(2) is ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    /* Same naming scheme used by the child when creating the file. */
    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6512
6513 /* ================================= Debugging ============================== */
6514
6515 static void debugCommand(redisClient *c) {
6516 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6517 *((char*)-1) = 'x';
6518 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6519 if (rdbSave(server.dbfilename) != REDIS_OK) {
6520 addReply(c,shared.err);
6521 return;
6522 }
6523 emptyDb();
6524 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6525 addReply(c,shared.err);
6526 return;
6527 }
6528 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6529 addReply(c,shared.ok);
6530 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6531 emptyDb();
6532 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6533 addReply(c,shared.err);
6534 return;
6535 }
6536 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6537 addReply(c,shared.ok);
6538 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6539 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6540 robj *key, *val;
6541
6542 if (!de) {
6543 addReply(c,shared.nokeyerr);
6544 return;
6545 }
6546 key = dictGetEntryKey(de);
6547 val = dictGetEntryVal(de);
6548 addReplySds(c,sdscatprintf(sdsempty(),
6549 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6550 (void*)key, key->refcount, (void*)val, val->refcount,
6551 val->encoding));
6552 } else {
6553 addReplySds(c,sdsnew(
6554 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6555 }
6556 }
6557
6558 static void _redisAssert(char *estr) {
6559 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6560 redisLog(REDIS_WARNING,"==> %s\n",estr);
6561 #ifdef HAVE_BACKTRACE
6562 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6563 *((char*)-1) = 'x';
6564 #endif
6565 }
6566
6567 /* =================================== Main! ================================ */
6568
6569 #ifdef __linux__
/* Return the integer found in /proc/sys/vm/overcommit_memory, or -1 if the
 * file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return value;
    if (fgets(line,sizeof(line),fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
6583
6584 void linuxOvercommitMemoryWarning(void) {
6585 if (linuxOvercommitMemoryValue() == 0) {
6586 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6587 }
6588 }
6589 #endif /* __linux__ */
6590
6591 static void daemonize(void) {
6592 int fd;
6593 FILE *fp;
6594
6595 if (fork() != 0) exit(0); /* parent exits */
6596 printf("New pid: %d\n", getpid());
6597 setsid(); /* create a new session */
6598
6599 /* Every output goes to /dev/null. If Redis is daemonized but
6600 * the 'logfile' is set to 'stdout' in the configuration file
6601 * it will not log at all. */
6602 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6603 dup2(fd, STDIN_FILENO);
6604 dup2(fd, STDOUT_FILENO);
6605 dup2(fd, STDERR_FILENO);
6606 if (fd > STDERR_FILENO) close(fd);
6607 }
6608 /* Try to write the pid file */
6609 fp = fopen(server.pidfile,"w");
6610 if (fp) {
6611 fprintf(fp,"%d\n",getpid());
6612 fclose(fp);
6613 }
6614 }
6615
6616 int main(int argc, char **argv) {
6617 initServerConfig();
6618 if (argc == 2) {
6619 resetServerSaveParams();
6620 loadServerConfig(argv[1]);
6621 } else if (argc > 2) {
6622 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6623 exit(1);
6624 } else {
6625 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6626 }
6627 if (server.daemonize) daemonize();
6628 initServer();
6629 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6630 #ifdef __linux__
6631 linuxOvercommitMemoryWarning();
6632 #endif
6633 if (server.appendonly) {
6634 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6635 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6636 } else {
6637 if (rdbLoad(server.dbfilename) == REDIS_OK)
6638 redisLog(REDIS_NOTICE,"DB loaded from disk");
6639 }
6640 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6641 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6642 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6643 aeMain(server.el);
6644 aeDeleteEventLoop(server.el);
6645 return 0;
6646 }
6647
6648 /* ============================= Backtrace support ========================= */
6649
6650 #ifdef HAVE_BACKTRACE
6651 static char *findFuncName(void *pointer, unsigned long *offset);
6652
6653 static void *getMcontextEip(ucontext_t *uc) {
6654 #if defined(__FreeBSD__)
6655 return (void*) uc->uc_mcontext.mc_eip;
6656 #elif defined(__dietlibc__)
6657 return (void*) uc->uc_mcontext.eip;
6658 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6659 #if __x86_64__
6660 return (void*) uc->uc_mcontext->__ss.__rip;
6661 #else
6662 return (void*) uc->uc_mcontext->__ss.__eip;
6663 #endif
6664 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6665 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6666 return (void*) uc->uc_mcontext->__ss.__rip;
6667 #else
6668 return (void*) uc->uc_mcontext->__ss.__eip;
6669 #endif
6670 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6671 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6672 #elif defined(__ia64__) /* Linux IA64 */
6673 return (void*) uc->uc_mcontext.sc_ip;
6674 #else
6675 return NULL;
6676 #endif
6677 }
6678
6679 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6680 void *trace[100];
6681 char **messages = NULL;
6682 int i, trace_size = 0;
6683 unsigned long offset=0;
6684 ucontext_t *uc = (ucontext_t*) secret;
6685 sds infostring;
6686 REDIS_NOTUSED(info);
6687
6688 redisLog(REDIS_WARNING,
6689 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6690 infostring = genRedisInfoString();
6691 redisLog(REDIS_WARNING, "%s",infostring);
6692 /* It's not safe to sdsfree() the returned string under memory
6693 * corruption conditions. Let it leak as we are going to abort */
6694
6695 trace_size = backtrace(trace, 100);
6696 /* overwrite sigaction with caller's address */
6697 if (getMcontextEip(uc) != NULL) {
6698 trace[1] = getMcontextEip(uc);
6699 }
6700 messages = backtrace_symbols(trace, trace_size);
6701
6702 for (i=1; i<trace_size; ++i) {
6703 char *fn = findFuncName(trace[i], &offset), *p;
6704
6705 p = strchr(messages[i],'+');
6706 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6707 redisLog(REDIS_WARNING,"%s", messages[i]);
6708 } else {
6709 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6710 }
6711 }
6712 /* free(messages); Don't call free() with possibly corrupted memory. */
6713 exit(0);
6714 }
6715
6716 static void setupSigSegvAction(void) {
6717 struct sigaction act;
6718
6719 sigemptyset (&act.sa_mask);
6720 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6721 * is used. Otherwise, sa_handler is used */
6722 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6723 act.sa_sigaction = segvHandler;
6724 sigaction (SIGSEGV, &act, NULL);
6725 sigaction (SIGBUS, &act, NULL);
6726 sigaction (SIGFPE, &act, NULL);
6727 sigaction (SIGILL, &act, NULL);
6728 sigaction (SIGBUS, &act, NULL);
6729 return;
6730 }
6731
6732 #include "staticsymbols.h"
6733 /* This function try to convert a pointer into a function name. It's used in
6734 * oreder to provide a backtrace under segmentation fault that's able to
6735 * display functions declared as static (otherwise the backtrace is useless). */
6736 static char *findFuncName(void *pointer, unsigned long *offset){
6737 int i, ret = -1;
6738 unsigned long off, minoff = 0;
6739
6740 /* Try to match against the Symbol with the smallest offset */
6741 for (i=0; symsTable[i].pointer; i++) {
6742 unsigned long lp = (unsigned long) pointer;
6743
6744 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6745 off=lp-symsTable[i].pointer;
6746 if (ret < 0 || off < minoff) {
6747 minoff=off;
6748 ret=i;
6749 }
6750 }
6751 }
6752 if (ret == -1) return NULL;
6753 *offset = minoff;
6754 return symsTable[ret].name;
6755 }
6756 #else /* HAVE_BACKTRACE */
/* Without backtrace support there is no handler to install: this stub only
 * exists so that callers do not need to be conditionally compiled. */
static void setupSigSegvAction(void) {
    /* nothing to do */
}
6759 #endif /* HAVE_BACKTRACE */
6760
6761
6762
6763 /* The End */
6764
6765
6766