]> git.saurik.com Git - redis.git/blob - redis.c
df3b413192bc98cda7c2460968fdb7e37eb785ca
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.050"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes: generic success / failure status values (serverCron() passes
 * one of them to updateSlavesWaitingBgsave() as the outcome of a bgsave). */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration: compile-time defaults and limits. */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout: 5 minutes, expressed in seconds */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second; serverCron() caps the keys examined per pass at this value */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command (256 MB) */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10%: htNeedsResize() reports a table whose fill is below this percentage */
102
103 /* Command flags: bit values OR-ed together in the 'flags' field of the
 * struct redisCommand entries of cmdTable. */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types (presumably the values stored in robj->type; confirm in
 * createObject()). */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding: values of robj->encoding. The dictEncObj* helpers
 * decode any object that is not REDIS_ENCODING_RAW before using it. */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX /* NOTE(review): presumably the "could not read a length" sentinel; confirm at the use sites */
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF (see lzf.h) */
154
155 /* Client flags: bit values tested against redisClient.flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits for bgsave to end, then starts the bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175 /* List related stuff */
176 #define REDIS_HEAD 0
177 #define REDIS_TAIL 1
178
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
184
185 /* Log levels: first argument of redisLog(). A message is written only when
 * its level is >= server.verbosity; redisLog() also uses the value as an
 * index into its ".-*" marker string. */
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
189
190 /* Anti-warning macro: evaluates V as a void expression so that an
 * intentionally unused parameter does not trigger a compiler warning. */
191 #define REDIS_NOTUSED(V) ((void) V)
192
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196 /* Append only defines (presumably the values of server.appendfsync; confirm) */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
200
201 /*================================= Data types ============================== */
202
203 /* A redis object, that is a type able to hold a string / list / set
 * (REDIS_ZSET and REDIS_HASH type codes are defined as well). */
204 typedef struct redisObject {
205 void *ptr; /* payload; the dictObj* helpers treat it as an sds for raw-encoded strings */
206 unsigned char type; /* presumably one of the REDIS_STRING ... REDIS_HASH codes; confirm in createObject() */
207 unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
208 unsigned char notused[2]; /* not used, as the name says */
209 int refcount; /* presumably maintained by incrRefCount() / decrRefCount(); confirm */
210 } robj;
211
/* One logical database. serverCron() reports dictSize(dict) as the number of
 * keys and dictSize(expires) as the number of volatile keys. */
212 typedef struct redisDb {
213 dict *dict; /* the keys of this database */
214 dict *expires; /* the volatile keys, i.e. the ones with an expire set */
215 int id; /* presumably the index of this DB inside server.db; confirm */
216 } redisDb;
217
218 /* With multiplexing we need to take per-client state.
219 * Clients are taken in a linked list. */
220 typedef struct redisClient {
221 int fd;
222 redisDb *db;
223 int dictid;
224 sds querybuf;
225 robj **argv, **mbargv;
226 int argc, mbargc;
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout (see closeTimedoutClients()) */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset (NOTE(review): long, while repldbsize is off_t) */
238 off_t repldbsize; /* replication DB file size */
239 } redisClient;
240
/* A save point. serverCron() starts a background save when server.dirty is
 * >= 'changes' and more than 'seconds' seconds passed since server.lastsave. */
241 struct saveparam {
242 time_t seconds;
243 int changes;
244 };
245
246 /* Global server state structure */
247 struct redisServer {
248 int port;
249 int fd;
250 redisDb *db; /* array of 'dbnum' databases */
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients;
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeded */
261 size_t usedmemory; /* Used memory in bytes, refreshed by serverCron() from zmalloc_used_memory() */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity; /* redisLog() drops messages whose level is below this */
268 int glueoutputbuf;
269 int maxidletime; /* idle seconds before a client is closed; serverCron() skips the check when 0 */
270 int dbnum; /* number of entries in 'db' */
271 int daemonize;
272 int appendonly;
273 int appendfsync;
274 time_t lastfsync;
275 int appendfd;
276 int appendseldb;
277 char *pidfile;
278 int bgsaveinprogress; /* non-zero while a background save child is running */
279 pid_t bgsavechildpid; /* serverCron() resets it to -1 once the child has terminated */
280 struct saveparam *saveparams; /* array of 'saveparamslen' save points */
281 int saveparamslen;
282 char *logfile; /* NULL means that redisLog() writes to stdout */
283 char *bindaddr;
284 char *dbfilename; /* file name serverCron() passes to rdbSaveBackground() */
285 char *appendfilename;
286 char *requirepass;
287 int shareobjects;
288 /* Replication related */
289 int isslave;
290 char *masterauth;
291 char *masterhost;
292 int masterport;
293 redisClient *master; /* client that is master for this slave */
294 int replstate;
295 unsigned int maxclients;
296 unsigned long maxmemory;
297 /* Sort parameters - qsort_r() is only available under BSD so we
298 * have to take this state global, in order to pass it to sortCompare() */
299 int sort_desc;
300 int sort_alpha;
301 int sort_bypattern;
302 };
303
/* Signature shared by all the command implementations. */
304 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
305 struct redisCommand {
306 char *name; /* command name, lowercase in cmdTable */
307 redisCommandProc *proc; /* function implementing the command */
308 int arity; /* NOTE(review): cmdTable also holds negative values, presumably meaning "at least N arguments"; confirm in processCommand() */
309 int flags; /* OR of REDIS_CMD_* values */
310 };
311
/* Pairs a function name with an address kept as an integer.
 * NOTE(review): presumably used to resolve addresses in crash backtraces
 * (see HAVE_BACKTRACE); confirm where the table is defined. */
312 struct redisFunctionSym {
313 char *name;
314 unsigned long pointer;
315 };
316
/* An element handled by SORT together with the value it is ordered by.
 * NOTE(review): presumably u.score is used for numeric sorting and u.cmpobj
 * for sorting by string; confirm in sortCommand() / sortCompare(). */
317 typedef struct _redisSortObject {
318 robj *obj;
319 union {
320 double score;
321 robj *cmpobj;
322 } u;
323 } redisSortObject;
324
/* An operation attached to a SORT invocation.
 * NOTE(review): 'type' is presumably one of the REDIS_SORT_* defines; confirm
 * in sortCommand(). */
325 typedef struct _redisSortOperation {
326 int type;
327 robj *pattern;
328 } redisSortOperation;
329
330 /* ZSETs use a specialized version of Skiplists */
331
/* A skiplist node: an object and its score, plus the links to the
 * neighbouring nodes. */
332 typedef struct zskiplistNode {
333 struct zskiplistNode **forward; /* array of forward pointers (presumably one per level of the node; confirm in the zsl* functions) */
334 struct zskiplistNode *backward; /* single backward pointer */
335 double score;
336 robj *obj;
337 } zskiplistNode;
338
/* The skiplist itself: entry points plus bookkeeping. */
339 typedef struct zskiplist {
340 struct zskiplistNode *header, *tail;
341 unsigned long length; /* presumably the number of elements; confirm in zslInsert() */
342 int level; /* presumably the highest level in use, bounded by ZSKIPLIST_MAXLEVEL; confirm */
343 } zskiplist;
344
/* A sorted set: a hash table and a skiplist.
 * NOTE(review): presumably both index the same elements, the dict by member
 * and the skiplist by score; confirm in zaddCommand(). */
345 typedef struct zset {
346 dict *dict;
347 zskiplist *zsl;
348 } zset;
349
350 /* Our shared "common" objects */
351
/* Pointers to objects kept in the single global 'shared' instance.
 * NOTE(review): judging from the names these are frequently used replies
 * plus the SELECT 0..9 commands; the objects are created outside the visible
 * part of the file, confirm there. */
352 struct sharedObjectsStruct {
353 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
354 *colon, *nullbulk, *nullmultibulk,
355 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
356 *outofrangeerr, *plus,
357 *select0, *select1, *select2, *select3, *select4,
358 *select5, *select6, *select7, *select8, *select9;
359 } shared;
360
361 /* Global vars that are actually used as constants. The following double
362 * values are used for double on-disk serialization, and are initialized
363 * at runtime to avoid strange compiler optimizations. */
364
365 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
366
367 /*================================ Prototypes =============================== */
368
369 static void freeStringObject(robj *o);
370 static void freeListObject(robj *o);
371 static void freeSetObject(robj *o);
372 static void decrRefCount(void *o);
373 static robj *createObject(int type, void *ptr);
374 static void freeClient(redisClient *c);
375 static int rdbLoad(char *filename);
376 static void addReply(redisClient *c, robj *obj);
377 static void addReplySds(redisClient *c, sds s);
378 static void incrRefCount(robj *o);
379 static int rdbSaveBackground(char *filename);
380 static robj *createStringObject(char *ptr, size_t len);
381 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
382 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
383 static int syncWithMaster(void);
384 static robj *tryObjectSharing(robj *o);
385 static int tryObjectEncoding(robj *o);
386 static robj *getDecodedObject(const robj *o);
387 static int removeExpire(redisDb *db, robj *key);
388 static int expireIfNeeded(redisDb *db, robj *key);
389 static int deleteIfVolatile(redisDb *db, robj *key);
390 static int deleteKey(redisDb *db, robj *key);
391 static time_t getExpire(redisDb *db, robj *key);
392 static int setExpire(redisDb *db, robj *key, time_t when);
393 static void updateSlavesWaitingBgsave(int bgsaveerr);
394 static void freeMemoryIfNeeded(void);
395 static int processCommand(redisClient *c);
396 static void setupSigSegvAction(void);
397 static void rdbRemoveTempFile(pid_t childpid);
398 static size_t stringObjectLen(robj *o);
399 static void processInputBuffer(redisClient *c);
400 static zskiplist *zslCreate(void);
401 static void zslFree(zskiplist *zsl);
402 static void zslInsert(zskiplist *zsl, double score, robj *obj);
403 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
404
405 static void authCommand(redisClient *c);
406 static void pingCommand(redisClient *c);
407 static void echoCommand(redisClient *c);
408 static void setCommand(redisClient *c);
409 static void setnxCommand(redisClient *c);
410 static void getCommand(redisClient *c);
411 static void delCommand(redisClient *c);
412 static void existsCommand(redisClient *c);
413 static void incrCommand(redisClient *c);
414 static void decrCommand(redisClient *c);
415 static void incrbyCommand(redisClient *c);
416 static void decrbyCommand(redisClient *c);
417 static void selectCommand(redisClient *c);
418 static void randomkeyCommand(redisClient *c);
419 static void keysCommand(redisClient *c);
420 static void dbsizeCommand(redisClient *c);
421 static void lastsaveCommand(redisClient *c);
422 static void saveCommand(redisClient *c);
423 static void bgsaveCommand(redisClient *c);
424 static void shutdownCommand(redisClient *c);
425 static void moveCommand(redisClient *c);
426 static void renameCommand(redisClient *c);
427 static void renamenxCommand(redisClient *c);
428 static void lpushCommand(redisClient *c);
429 static void rpushCommand(redisClient *c);
430 static void lpopCommand(redisClient *c);
431 static void rpopCommand(redisClient *c);
432 static void llenCommand(redisClient *c);
433 static void lindexCommand(redisClient *c);
434 static void lrangeCommand(redisClient *c);
435 static void ltrimCommand(redisClient *c);
436 static void typeCommand(redisClient *c);
437 static void lsetCommand(redisClient *c);
438 static void saddCommand(redisClient *c);
439 static void sremCommand(redisClient *c);
440 static void smoveCommand(redisClient *c);
441 static void sismemberCommand(redisClient *c);
442 static void scardCommand(redisClient *c);
443 static void spopCommand(redisClient *c);
444 static void srandmemberCommand(redisClient *c);
445 static void sinterCommand(redisClient *c);
446 static void sinterstoreCommand(redisClient *c);
447 static void sunionCommand(redisClient *c);
448 static void sunionstoreCommand(redisClient *c);
449 static void sdiffCommand(redisClient *c);
450 static void sdiffstoreCommand(redisClient *c);
451 static void syncCommand(redisClient *c);
452 static void flushdbCommand(redisClient *c);
453 static void flushallCommand(redisClient *c);
454 static void sortCommand(redisClient *c);
455 static void lremCommand(redisClient *c);
456 static void rpoplpushcommand(redisClient *c);
457 static void infoCommand(redisClient *c);
458 static void mgetCommand(redisClient *c);
459 static void monitorCommand(redisClient *c);
460 static void expireCommand(redisClient *c);
461 static void expireatCommand(redisClient *c);
462 static void getsetCommand(redisClient *c);
463 static void ttlCommand(redisClient *c);
464 static void slaveofCommand(redisClient *c);
465 static void debugCommand(redisClient *c);
466 static void msetCommand(redisClient *c);
467 static void msetnxCommand(redisClient *c);
468 static void zaddCommand(redisClient *c);
469 static void zrangeCommand(redisClient *c);
470 static void zrangebyscoreCommand(redisClient *c);
471 static void zrevrangeCommand(redisClient *c);
472 static void zcardCommand(redisClient *c);
473 static void zremCommand(redisClient *c);
474 static void zscoreCommand(redisClient *c);
475 static void zremrangebyscoreCommand(redisClient *c);
476
477 /*================================= Globals ================================= */
478
479 /* Global vars */
480 static struct redisServer server; /* server global state; static storage, so every field starts out zeroed */
/* The command table. Every entry is {name, handler, arity, flags} as laid
 * out in struct redisCommand, and the table is terminated by an entry whose
 * fields are all NULL / 0.
 *
 * NOTE(review): several arities are negative, presumably meaning "at least
 * that many arguments"; confirm in processCommand().
 * NOTE(review): "smembers" is served by sinterCommand.
 * NOTE(review): the rpoplpush handler is spelled rpoplpushcommand, with a
 * lowercase 'c', unlike every other handler.
 * NOTE(review): "zscore" carries REDIS_CMD_DENYOOM like the write commands;
 * confirm that this is intended. */
481 static struct redisCommand cmdTable[] = {
482 {"get",getCommand,2,REDIS_CMD_INLINE},
483 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
484 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
485 {"del",delCommand,-2,REDIS_CMD_INLINE},
486 {"exists",existsCommand,2,REDIS_CMD_INLINE},
487 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
488 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
489 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
490 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
491 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
492 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
493 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
494 {"llen",llenCommand,2,REDIS_CMD_INLINE},
495 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
496 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
497 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
498 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
499 {"lrem",lremCommand,4,REDIS_CMD_BULK},
500 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
501 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
502 {"srem",sremCommand,3,REDIS_CMD_BULK},
503 {"smove",smoveCommand,4,REDIS_CMD_BULK},
504 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
505 {"scard",scardCommand,2,REDIS_CMD_INLINE},
506 {"spop",spopCommand,2,REDIS_CMD_INLINE},
507 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
508 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
509 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
510 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
511 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
512 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
515 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
516 {"zrem",zremCommand,3,REDIS_CMD_BULK},
517 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
518 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
519 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
520 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
521 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
522 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
523 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
524 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
525 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
526 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
527 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
529 {"select",selectCommand,2,REDIS_CMD_INLINE},
530 {"move",moveCommand,3,REDIS_CMD_INLINE},
531 {"rename",renameCommand,3,REDIS_CMD_INLINE},
532 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
533 {"expire",expireCommand,3,REDIS_CMD_INLINE},
534 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
535 {"keys",keysCommand,2,REDIS_CMD_INLINE},
536 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
537 {"auth",authCommand,2,REDIS_CMD_INLINE},
538 {"ping",pingCommand,1,REDIS_CMD_INLINE},
539 {"echo",echoCommand,2,REDIS_CMD_BULK},
540 {"save",saveCommand,1,REDIS_CMD_INLINE},
541 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
542 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
543 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
544 {"type",typeCommand,2,REDIS_CMD_INLINE},
545 {"sync",syncCommand,1,REDIS_CMD_INLINE},
546 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
547 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
548 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
549 {"info",infoCommand,1,REDIS_CMD_INLINE},
550 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
551 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
552 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
553 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
554 {NULL,NULL,0,0}
555 };
556
557 /*============================ Utility functions ============================ */
558
/* Glob-style pattern matching.
 *
 * Returns 1 if the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. When 'nocase' is non-zero
 * literal bytes and ranges are compared case-insensitively.
 *
 * Supported syntax:
 *
 *   *        any sequence of bytes, including the empty one
 *   ?        exactly one byte
 *   [abc]    one byte among the listed ones
 *   [^abc]   one byte that is not listed
 *   [a-c]    one byte inside the range (bounds may be given reversed)
 *   \x       the byte 'x' taken literally
 *
 * A class that is not closed by ']' is considered closed by the end of the
 * pattern. Neither buffer needs to be NUL terminated: every read is bounded
 * by the corresponding length. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        /* Every pattern element but '*' consumes one byte of the string,
         * so once the string is exhausted only '*' can still match.
         * Checking it here keeps all the string[0] reads below inside the
         * buffer and prevents stringLen from becoming negative (which used
         * to make "[^a]*" match the empty string). */
        if (stringLen <= 0 && pattern[0] != '*')
            return 0; /* no match */

        switch (pattern[0]) {
        case '*':
            /* Consecutive stars are equivalent to a single one. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match: a trailing '*' accepts what is left */
            /* Try to match the rest of the pattern against every suffix. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match = 0;

            pattern++;
            patternLen--;
            not = (patternLen > 0 && pattern[0] == '^');
            if (not) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen >= 2 && pattern[0] == '\\') {
                    /* Escaped byte. The length check makes a backslash
                     * that ends the pattern an ordinary byte instead of
                     * letting the scan run past the buffer. */
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (patternLen == 0) {
                    /* Unterminated class: step back so that the shared
                     * increment after the switch lands on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = pattern[0];
                    int end = pattern[2];
                    int c = string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        /* tolower() is undefined for negative values
                         * other than EOF, hence the casts. */
                        start = tolower((unsigned char)start);
                        end = tolower((unsigned char)end);
                        c = tolower((unsigned char)c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is over: what is left of the pattern matches
             * only if it is made of stars. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
681
682 static void redisLog(int level, const char *fmt, ...) {
683 va_list ap;
684 FILE *fp;
685
686 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
687 if (!fp) return;
688
689 va_start(ap, fmt);
690 if (level >= server.verbosity) {
691 char *c = ".-*";
692 char buf[64];
693 time_t now;
694
695 now = time(NULL);
696 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
697 fprintf(fp,"%s %c ",buf,c[level]);
698 vfprintf(fp, fmt, ap);
699 fprintf(fp,"\n");
700 fflush(fp);
701 }
702 va_end(ap);
703
704 if (server.logfile) fclose(fp);
705 }
706
707 /*====================== Hash table type implementation ==================== */
708
709 /* This is a hash table type that uses the SDS dynamic strings library as
710 * keys and redis objects as values (objects can hold SDS strings,
711 * lists, sets). */
712
/* Destructor callback that releases 'val' with zfree().
 * 'privdata' is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
718
719 static int sdsDictKeyCompare(void *privdata, const void *key1,
720 const void *key2)
721 {
722 int l1,l2;
723 DICT_NOTUSED(privdata);
724
725 l1 = sdslen((sds)key1);
726 l2 = sdslen((sds)key2);
727 if (l1 != l2) return 0;
728 return memcmp(key1, key2, l1) == 0;
729 }
730
/* Destructor callback for keys or values that are redis objects: hands
 * 'val' to decrRefCount(). 'privdata' is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
737
738 static int dictObjKeyCompare(void *privdata, const void *key1,
739 const void *key2)
740 {
741 const robj *o1 = key1, *o2 = key2;
742 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
743 }
744
745 static unsigned int dictObjHash(const void *key) {
746 const robj *o = key;
747 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
748 }
749
750 static int dictEncObjKeyCompare(void *privdata, const void *key1,
751 const void *key2)
752 {
753 const robj *o1 = key1, *o2 = key2;
754
755 if (o1->encoding == REDIS_ENCODING_RAW &&
756 o2->encoding == REDIS_ENCODING_RAW)
757 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
758 else {
759 robj *dec1, *dec2;
760 int cmp;
761
762 dec1 = o1->encoding != REDIS_ENCODING_RAW ?
763 getDecodedObject(o1) : (robj*)o1;
764 dec2 = o2->encoding != REDIS_ENCODING_RAW ?
765 getDecodedObject(o2) : (robj*)o2;
766 cmp = sdsDictKeyCompare(privdata,dec1->ptr,dec2->ptr);
767 if (dec1 != o1) decrRefCount(dec1);
768 if (dec2 != o2) decrRefCount(dec2);
769 return cmp;
770 }
771 }
772
773 static unsigned int dictEncObjHash(const void *key) {
774 const robj *o = key;
775
776 if (o->encoding == REDIS_ENCODING_RAW)
777 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
778 else {
779 robj *dec = getDecodedObject(o);
780 unsigned int hash = dictGenHashFunction(dec->ptr, sdslen((sds)dec->ptr));
781 decrRefCount(dec);
782 return hash;
783 }
784 }
785
/* Dict type whose keys are redis objects, possibly encoded: keys are hashed
 * and compared in decoded form and released with decrRefCount(). Values are
 * neither duplicated nor destroyed. Judging by the name, it is the type used
 * for sets (the users are outside the visible part of the file). */
786 static dictType setDictType = {
787 dictEncObjHash, /* hash function */
788 NULL, /* key dup */
789 NULL, /* val dup */
790 dictEncObjKeyCompare, /* key compare */
791 dictRedisObjectDestructor, /* key destructor */
792 NULL /* val destructor */
793 };
794
/* Same key handling as setDictType, but values are released with zfree()
 * through dictVanillaFree(). Judging by the name, it is the type used for
 * sorted sets (NOTE(review): the value is presumably the heap-allocated
 * score of the element; confirm in zaddCommand()). */
795 static dictType zsetDictType = {
796 dictEncObjHash, /* hash function */
797 NULL, /* key dup */
798 NULL, /* val dup */
799 dictEncObjKeyCompare, /* key compare */
800 dictRedisObjectDestructor, /* key destructor */
801 dictVanillaFree /* val destructor */
802 };
803
/* Dict type whose keys and values are both redis objects released with
 * decrRefCount(). Keys are hashed and compared through their 'ptr' field
 * taken as an sds string, without any decoding step. */
804 static dictType hashDictType = {
805 dictObjHash, /* hash function */
806 NULL, /* key dup */
807 NULL, /* val dup */
808 dictObjKeyCompare, /* key compare */
809 dictRedisObjectDestructor, /* key destructor */
810 dictRedisObjectDestructor /* val destructor */
811 };
812
813 /* ========================= Random utility functions ======================= */
814
/* Out of memory handler: never returns.
 *
 * Redis does not try to recover from a failed allocation of objects or
 * strings. The networking layer allocates its send buffers on the heap too,
 * so it is unclear whether the error could even be reported to the client.
 * We just print 'msg' followed by ": Out of memory" on stderr, wait one
 * second and abort. The code is simpler to read this way. */
static void oom(const char *msg) {
    FILE *out = stderr;

    fprintf(out, "%s: Out of memory\n",msg);
    fflush(out);
    sleep(1);
    abort();
}
826
827 /* ====================== Redis server networking stuff ===================== */
828 static void closeTimedoutClients(void) {
829 redisClient *c;
830 listNode *ln;
831 time_t now = time(NULL);
832
833 listRewind(server.clients);
834 while ((ln = listYield(server.clients)) != NULL) {
835 c = listNodeValue(ln);
836 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
837 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
838 (now - c->lastinteraction > server.maxidletime)) {
839 redisLog(REDIS_DEBUG,"Closing idle client");
840 freeClient(c);
841 }
842 }
843 }
844
845 static int htNeedsResize(dict *dict) {
846 long long size, used;
847
848 size = dictSlots(dict);
849 used = dictSize(dict);
850 return (size && used && size > DICT_HT_INITIAL_SIZE &&
851 (used*100/size < REDIS_HT_MINFILL));
852 }
853
854 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
855 * we resize the hash table to save memory */
856 static void tryResizeHashTables(void) {
857 int j;
858
859 for (j = 0; j < server.dbnum; j++) {
860 if (htNeedsResize(server.db[j].dict)) {
861 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
862 dictResize(server.db[j].dict);
863 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
864 }
865 if (htNeedsResize(server.db[j].expires))
866 dictResize(server.db[j].expires);
867 }
868 }
869
870 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
871 int j, loops = server.cronloops++;
872 REDIS_NOTUSED(eventLoop);
873 REDIS_NOTUSED(id);
874 REDIS_NOTUSED(clientData);
875
876 /* Update the global state with the amount of used memory */
877 server.usedmemory = zmalloc_used_memory();
878
879 /* Show some info about non-empty databases */
880 for (j = 0; j < server.dbnum; j++) {
881 long long size, used, vkeys;
882
883 size = dictSlots(server.db[j].dict);
884 used = dictSize(server.db[j].dict);
885 vkeys = dictSize(server.db[j].expires);
886 if (!(loops % 5) && (used || vkeys)) {
887 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
888 /* dictPrintStats(server.dict); */
889 }
890 }
891
892 /* We don't want to resize the hash tables while a bacground saving
893 * is in progress: the saving child is created using fork() that is
894 * implemented with a copy-on-write semantic in most modern systems, so
895 * if we resize the HT while there is the saving child at work actually
896 * a lot of memory movements in the parent will cause a lot of pages
897 * copied. */
898 if (!server.bgsaveinprogress) tryResizeHashTables();
899
900 /* Show information about connected clients */
901 if (!(loops % 5)) {
902 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
903 listLength(server.clients)-listLength(server.slaves),
904 listLength(server.slaves),
905 server.usedmemory,
906 dictSize(server.sharingpool));
907 }
908
909 /* Close connections of timedout clients */
910 if (server.maxidletime && !(loops % 10))
911 closeTimedoutClients();
912
913 /* Check if a background saving in progress terminated */
914 if (server.bgsaveinprogress) {
915 int statloc;
916 if (wait3(&statloc,WNOHANG,NULL)) {
917 int exitcode = WEXITSTATUS(statloc);
918 int bysignal = WIFSIGNALED(statloc);
919
920 if (!bysignal && exitcode == 0) {
921 redisLog(REDIS_NOTICE,
922 "Background saving terminated with success");
923 server.dirty = 0;
924 server.lastsave = time(NULL);
925 } else if (!bysignal && exitcode != 0) {
926 redisLog(REDIS_WARNING, "Background saving error");
927 } else {
928 redisLog(REDIS_WARNING,
929 "Background saving terminated by signal");
930 rdbRemoveTempFile(server.bgsavechildpid);
931 }
932 server.bgsaveinprogress = 0;
933 server.bgsavechildpid = -1;
934 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
935 }
936 } else {
937 /* If there is not a background saving in progress check if
938 * we have to save now */
939 time_t now = time(NULL);
940 for (j = 0; j < server.saveparamslen; j++) {
941 struct saveparam *sp = server.saveparams+j;
942
943 if (server.dirty >= sp->changes &&
944 now-server.lastsave > sp->seconds) {
945 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
946 sp->changes, sp->seconds);
947 rdbSaveBackground(server.dbfilename);
948 break;
949 }
950 }
951 }
952
953 /* Try to expire a few timed out keys. The algorithm used is adaptive and
954 * will use few CPU cycles if there are few expiring keys, otherwise
955 * it will get more aggressive to avoid that too much memory is used by
956 * keys that can be removed from the keyspace. */
957 for (j = 0; j < server.dbnum; j++) {
958 int expired;
959 redisDb *db = server.db+j;
960
961 /* Continue to expire if at the end of the cycle more than 25%
962 * of the keys were expired. */
963 do {
964 int num = dictSize(db->expires);
965 time_t now = time(NULL);
966
967 expired = 0;
968 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
969 num = REDIS_EXPIRELOOKUPS_PER_CRON;
970 while (num--) {
971 dictEntry *de;
972 time_t t;
973
974 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
975 t = (time_t) dictGetEntryVal(de);
976 if (now > t) {
977 deleteKey(db,dictGetEntryKey(de));
978 expired++;
979 }
980 }
981 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
982 }
983
984 /* Check if we should connect to a MASTER */
985 if (server.replstate == REDIS_REPL_CONNECT) {
986 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
987 if (syncWithMaster() == REDIS_OK) {
988 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
989 }
990 }
991 return 1000;
992 }
993
994 static void createSharedObjects(void) {
995 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
996 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
997 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
998 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
999 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1000 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1001 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1002 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1003 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1004 /* no such key */
1005 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1006 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1007 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1008 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1009 "-ERR no such key\r\n"));
1010 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1011 "-ERR syntax error\r\n"));
1012 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1013 "-ERR source and destination objects are the same\r\n"));
1014 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1015 "-ERR index out of range\r\n"));
1016 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1017 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1018 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1019 shared.select0 = createStringObject("select 0\r\n",10);
1020 shared.select1 = createStringObject("select 1\r\n",10);
1021 shared.select2 = createStringObject("select 2\r\n",10);
1022 shared.select3 = createStringObject("select 3\r\n",10);
1023 shared.select4 = createStringObject("select 4\r\n",10);
1024 shared.select5 = createStringObject("select 5\r\n",10);
1025 shared.select6 = createStringObject("select 6\r\n",10);
1026 shared.select7 = createStringObject("select 7\r\n",10);
1027 shared.select8 = createStringObject("select 8\r\n",10);
1028 shared.select9 = createStringObject("select 9\r\n",10);
1029 }
1030
1031 static void appendServerSaveParams(time_t seconds, int changes) {
1032 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1033 server.saveparams[server.saveparamslen].seconds = seconds;
1034 server.saveparams[server.saveparamslen].changes = changes;
1035 server.saveparamslen++;
1036 }
1037
1038 static void resetServerSaveParams() {
1039 zfree(server.saveparams);
1040 server.saveparams = NULL;
1041 server.saveparamslen = 0;
1042 }
1043
1044 static void initServerConfig() {
1045 server.dbnum = REDIS_DEFAULT_DBNUM;
1046 server.port = REDIS_SERVERPORT;
1047 server.verbosity = REDIS_DEBUG;
1048 server.maxidletime = REDIS_MAXIDLETIME;
1049 server.saveparams = NULL;
1050 server.logfile = NULL; /* NULL = log on standard output */
1051 server.bindaddr = NULL;
1052 server.glueoutputbuf = 1;
1053 server.daemonize = 0;
1054 server.appendonly = 0;
1055 server.appendfsync = APPENDFSYNC_ALWAYS;
1056 server.lastfsync = time(NULL);
1057 server.appendfd = -1;
1058 server.appendseldb = -1; /* Make sure the first time will not match */
1059 server.pidfile = "/var/run/redis.pid";
1060 server.dbfilename = "dump.rdb";
1061 server.appendfilename = "appendonly.log";
1062 server.requirepass = NULL;
1063 server.shareobjects = 0;
1064 server.sharingpoolsize = 1024;
1065 server.maxclients = 0;
1066 server.maxmemory = 0;
1067 resetServerSaveParams();
1068
1069 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1070 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1071 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1072 /* Replication related */
1073 server.isslave = 0;
1074 server.masterauth = NULL;
1075 server.masterhost = NULL;
1076 server.masterport = 6379;
1077 server.master = NULL;
1078 server.replstate = REDIS_REPL_NONE;
1079
1080 /* Double constants initialization */
1081 R_Zero = 0.0;
1082 R_PosInf = 1.0/R_Zero;
1083 R_NegInf = -1.0/R_Zero;
1084 R_Nan = R_Zero/R_Zero;
1085 }
1086
1087 static void initServer() {
1088 int j;
1089
1090 signal(SIGHUP, SIG_IGN);
1091 signal(SIGPIPE, SIG_IGN);
1092 setupSigSegvAction();
1093
1094 server.clients = listCreate();
1095 server.slaves = listCreate();
1096 server.monitors = listCreate();
1097 server.objfreelist = listCreate();
1098 createSharedObjects();
1099 server.el = aeCreateEventLoop();
1100 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1101 server.sharingpool = dictCreate(&setDictType,NULL);
1102 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1103 if (server.fd == -1) {
1104 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1105 exit(1);
1106 }
1107 for (j = 0; j < server.dbnum; j++) {
1108 server.db[j].dict = dictCreate(&hashDictType,NULL);
1109 server.db[j].expires = dictCreate(&setDictType,NULL);
1110 server.db[j].id = j;
1111 }
1112 server.cronloops = 0;
1113 server.bgsaveinprogress = 0;
1114 server.bgsavechildpid = -1;
1115 server.lastsave = time(NULL);
1116 server.dirty = 0;
1117 server.usedmemory = 0;
1118 server.stat_numcommands = 0;
1119 server.stat_numconnections = 0;
1120 server.stat_starttime = time(NULL);
1121 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1122
1123 if (server.appendonly) {
1124 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1125 if (server.appendfd == -1) {
1126 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1127 strerror(errno));
1128 exit(1);
1129 }
1130 }
1131 }
1132
1133 /* Empty the whole database */
1134 static long long emptyDb() {
1135 int j;
1136 long long removed = 0;
1137
1138 for (j = 0; j < server.dbnum; j++) {
1139 removed += dictSize(server.db[j].dict);
1140 dictEmpty(server.db[j].dict);
1141 dictEmpty(server.db[j].expires);
1142 }
1143 return removed;
1144 }
1145
/* Convert a yes/no configuration value to an integer: 1 for "yes", 0 for
 * "no" (both matched case-insensitively), -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1151
1152 /* I agree, this is a very rudimental way to load a configuration...
1153 will improve later if the config gets more complex */
1154 static void loadServerConfig(char *filename) {
1155 FILE *fp;
1156 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1157 int linenum = 0;
1158 sds line = NULL;
1159
1160 if (filename[0] == '-' && filename[1] == '\0')
1161 fp = stdin;
1162 else {
1163 if ((fp = fopen(filename,"r")) == NULL) {
1164 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1165 exit(1);
1166 }
1167 }
1168
1169 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1170 sds *argv;
1171 int argc, j;
1172
1173 linenum++;
1174 line = sdsnew(buf);
1175 line = sdstrim(line," \t\r\n");
1176
1177 /* Skip comments and blank lines*/
1178 if (line[0] == '#' || line[0] == '\0') {
1179 sdsfree(line);
1180 continue;
1181 }
1182
1183 /* Split into arguments */
1184 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1185 sdstolower(argv[0]);
1186
1187 /* Execute config directives */
1188 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1189 server.maxidletime = atoi(argv[1]);
1190 if (server.maxidletime < 0) {
1191 err = "Invalid timeout value"; goto loaderr;
1192 }
1193 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1194 server.port = atoi(argv[1]);
1195 if (server.port < 1 || server.port > 65535) {
1196 err = "Invalid port"; goto loaderr;
1197 }
1198 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1199 server.bindaddr = zstrdup(argv[1]);
1200 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1201 int seconds = atoi(argv[1]);
1202 int changes = atoi(argv[2]);
1203 if (seconds < 1 || changes < 0) {
1204 err = "Invalid save parameters"; goto loaderr;
1205 }
1206 appendServerSaveParams(seconds,changes);
1207 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1208 if (chdir(argv[1]) == -1) {
1209 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1210 argv[1], strerror(errno));
1211 exit(1);
1212 }
1213 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1214 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1215 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1216 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1217 else {
1218 err = "Invalid log level. Must be one of debug, notice, warning";
1219 goto loaderr;
1220 }
1221 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1222 FILE *logfp;
1223
1224 server.logfile = zstrdup(argv[1]);
1225 if (!strcasecmp(server.logfile,"stdout")) {
1226 zfree(server.logfile);
1227 server.logfile = NULL;
1228 }
1229 if (server.logfile) {
1230 /* Test if we are able to open the file. The server will not
1231 * be able to abort just for this problem later... */
1232 logfp = fopen(server.logfile,"a");
1233 if (logfp == NULL) {
1234 err = sdscatprintf(sdsempty(),
1235 "Can't open the log file: %s", strerror(errno));
1236 goto loaderr;
1237 }
1238 fclose(logfp);
1239 }
1240 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1241 server.dbnum = atoi(argv[1]);
1242 if (server.dbnum < 1) {
1243 err = "Invalid number of databases"; goto loaderr;
1244 }
1245 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1246 server.maxclients = atoi(argv[1]);
1247 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1248 server.maxmemory = strtoll(argv[1], NULL, 10);
1249 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1250 server.masterhost = sdsnew(argv[1]);
1251 server.masterport = atoi(argv[2]);
1252 server.replstate = REDIS_REPL_CONNECT;
1253 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1254 server.masterauth = zstrdup(argv[1]);
1255 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1256 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1257 err = "argument must be 'yes' or 'no'"; goto loaderr;
1258 }
1259 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1260 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1261 err = "argument must be 'yes' or 'no'"; goto loaderr;
1262 }
1263 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1264 server.sharingpoolsize = atoi(argv[1]);
1265 if (server.sharingpoolsize < 1) {
1266 err = "invalid object sharing pool size"; goto loaderr;
1267 }
1268 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1269 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1270 err = "argument must be 'yes' or 'no'"; goto loaderr;
1271 }
1272 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1273 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1274 err = "argument must be 'yes' or 'no'"; goto loaderr;
1275 }
1276 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1277 if (!strcasecmp(argv[1],"no")) {
1278 server.appendfsync = APPENDFSYNC_NO;
1279 } else if (!strcasecmp(argv[1],"always")) {
1280 server.appendfsync = APPENDFSYNC_ALWAYS;
1281 } else if (!strcasecmp(argv[1],"everysec")) {
1282 server.appendfsync = APPENDFSYNC_EVERYSEC;
1283 } else {
1284 err = "argument must be 'no', 'always' or 'everysec'";
1285 goto loaderr;
1286 }
1287 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1288 server.requirepass = zstrdup(argv[1]);
1289 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1290 server.pidfile = zstrdup(argv[1]);
1291 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1292 server.dbfilename = zstrdup(argv[1]);
1293 } else {
1294 err = "Bad directive or wrong number of arguments"; goto loaderr;
1295 }
1296 for (j = 0; j < argc; j++)
1297 sdsfree(argv[j]);
1298 zfree(argv);
1299 sdsfree(line);
1300 }
1301 if (fp != stdin) fclose(fp);
1302 return;
1303
1304 loaderr:
1305 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1306 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1307 fprintf(stderr, ">>> '%s'\n", line);
1308 fprintf(stderr, "%s\n", err);
1309 exit(1);
1310 }
1311
1312 static void freeClientArgv(redisClient *c) {
1313 int j;
1314
1315 for (j = 0; j < c->argc; j++)
1316 decrRefCount(c->argv[j]);
1317 for (j = 0; j < c->mbargc; j++)
1318 decrRefCount(c->mbargv[j]);
1319 c->argc = 0;
1320 c->mbargc = 0;
1321 }
1322
1323 static void freeClient(redisClient *c) {
1324 listNode *ln;
1325
1326 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1327 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1328 sdsfree(c->querybuf);
1329 listRelease(c->reply);
1330 freeClientArgv(c);
1331 close(c->fd);
1332 ln = listSearchKey(server.clients,c);
1333 assert(ln != NULL);
1334 listDelNode(server.clients,ln);
1335 if (c->flags & REDIS_SLAVE) {
1336 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1337 close(c->repldbfd);
1338 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1339 ln = listSearchKey(l,c);
1340 assert(ln != NULL);
1341 listDelNode(l,ln);
1342 }
1343 if (c->flags & REDIS_MASTER) {
1344 server.master = NULL;
1345 server.replstate = REDIS_REPL_CONNECT;
1346 }
1347 zfree(c->argv);
1348 zfree(c->mbargv);
1349 zfree(c);
1350 }
1351
1352 #define GLUEREPLY_UP_TO (1024)
1353 static void glueReplyBuffersIfNeeded(redisClient *c) {
1354 int copylen = 0;
1355 char buf[GLUEREPLY_UP_TO];
1356 listNode *ln;
1357 robj *o;
1358
1359 listRewind(c->reply);
1360 while((ln = listYield(c->reply))) {
1361 int objlen;
1362
1363 o = ln->value;
1364 objlen = sdslen(o->ptr);
1365 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1366 memcpy(buf+copylen,o->ptr,objlen);
1367 copylen += objlen;
1368 listDelNode(c->reply,ln);
1369 } else {
1370 if (copylen == 0) return;
1371 break;
1372 }
1373 }
1374 /* Now the output buffer is empty, add the new single element */
1375 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1376 listAddNodeHead(c->reply,o);
1377 }
1378
1379 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1380 redisClient *c = privdata;
1381 int nwritten = 0, totwritten = 0, objlen;
1382 robj *o;
1383 REDIS_NOTUSED(el);
1384 REDIS_NOTUSED(mask);
1385
1386
1387 /* Use writev() if we have enough buffers to send */
1388 #if 0
1389 if (!server.glueoutputbuf &&
1390 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1391 !(c->flags & REDIS_MASTER))
1392 {
1393 sendReplyToClientWritev(el, fd, privdata, mask);
1394 return;
1395 }
1396 #endif
1397
1398 while(listLength(c->reply)) {
1399 if (server.glueoutputbuf && listLength(c->reply) > 1)
1400 glueReplyBuffersIfNeeded(c);
1401
1402 o = listNodeValue(listFirst(c->reply));
1403 objlen = sdslen(o->ptr);
1404
1405 if (objlen == 0) {
1406 listDelNode(c->reply,listFirst(c->reply));
1407 continue;
1408 }
1409
1410 if (c->flags & REDIS_MASTER) {
1411 /* Don't reply to a master */
1412 nwritten = objlen - c->sentlen;
1413 } else {
1414 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1415 if (nwritten <= 0) break;
1416 }
1417 c->sentlen += nwritten;
1418 totwritten += nwritten;
1419 /* If we fully sent the object on head go to the next one */
1420 if (c->sentlen == objlen) {
1421 listDelNode(c->reply,listFirst(c->reply));
1422 c->sentlen = 0;
1423 }
1424 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1425 * bytes, in a single threaded server it's a good idea to serve
1426 * other clients as well, even if a very large request comes from
1427 * super fast link that is always able to accept data (in real world
1428 * scenario think about 'KEYS *' against the loopback interfae) */
1429 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1430 }
1431 if (nwritten == -1) {
1432 if (errno == EAGAIN) {
1433 nwritten = 0;
1434 } else {
1435 redisLog(REDIS_DEBUG,
1436 "Error writing to client: %s", strerror(errno));
1437 freeClient(c);
1438 return;
1439 }
1440 }
1441 if (totwritten > 0) c->lastinteraction = time(NULL);
1442 if (listLength(c->reply) == 0) {
1443 c->sentlen = 0;
1444 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1445 }
1446 }
1447
1448 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1449 {
1450 redisClient *c = privdata;
1451 int nwritten = 0, totwritten = 0, objlen, willwrite;
1452 robj *o;
1453 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1454 int offset, ion = 0;
1455 REDIS_NOTUSED(el);
1456 REDIS_NOTUSED(mask);
1457
1458 listNode *node;
1459 while (listLength(c->reply)) {
1460 offset = c->sentlen;
1461 ion = 0;
1462 willwrite = 0;
1463
1464 /* fill-in the iov[] array */
1465 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1466 o = listNodeValue(node);
1467 objlen = sdslen(o->ptr);
1468
1469 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1470 break;
1471
1472 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1473 break; /* no more iovecs */
1474
1475 iov[ion].iov_base = ((char*)o->ptr) + offset;
1476 iov[ion].iov_len = objlen - offset;
1477 willwrite += objlen - offset;
1478 offset = 0; /* just for the first item */
1479 ion++;
1480 }
1481
1482 if(willwrite == 0)
1483 break;
1484
1485 /* write all collected blocks at once */
1486 if((nwritten = writev(fd, iov, ion)) < 0) {
1487 if (errno != EAGAIN) {
1488 redisLog(REDIS_DEBUG,
1489 "Error writing to client: %s", strerror(errno));
1490 freeClient(c);
1491 return;
1492 }
1493 break;
1494 }
1495
1496 totwritten += nwritten;
1497 offset = c->sentlen;
1498
1499 /* remove written robjs from c->reply */
1500 while (nwritten && listLength(c->reply)) {
1501 o = listNodeValue(listFirst(c->reply));
1502 objlen = sdslen(o->ptr);
1503
1504 if(nwritten >= objlen - offset) {
1505 listDelNode(c->reply, listFirst(c->reply));
1506 nwritten -= objlen - offset;
1507 c->sentlen = 0;
1508 } else {
1509 /* partial write */
1510 c->sentlen += nwritten;
1511 break;
1512 }
1513 offset = 0;
1514 }
1515 }
1516
1517 if (totwritten > 0)
1518 c->lastinteraction = time(NULL);
1519
1520 if (listLength(c->reply) == 0) {
1521 c->sentlen = 0;
1522 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1523 }
1524 }
1525
1526 static struct redisCommand *lookupCommand(char *name) {
1527 int j = 0;
1528 while(cmdTable[j].name != NULL) {
1529 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1530 j++;
1531 }
1532 return NULL;
1533 }
1534
1535 /* resetClient prepare the client to process the next command */
1536 static void resetClient(redisClient *c) {
1537 freeClientArgv(c);
1538 c->bulklen = -1;
1539 c->multibulk = 0;
1540 }
1541
1542 /* If this function gets called we already read a whole
1543 * command, argments are in the client argv/argc fields.
1544 * processCommand() execute the command or prepare the
1545 * server for a bulk read from the client.
1546 *
1547 * If 1 is returned the client is still alive and valid and
1548 * and other operations can be performed by the caller. Otherwise
1549 * if 0 is returned the client was destroied (i.e. after QUIT). */
1550 static int processCommand(redisClient *c) {
1551 struct redisCommand *cmd;
1552 long long dirty;
1553
1554 /* Free some memory if needed (maxmemory setting) */
1555 if (server.maxmemory) freeMemoryIfNeeded();
1556
1557 /* Handle the multi bulk command type. This is an alternative protocol
1558 * supported by Redis in order to receive commands that are composed of
1559 * multiple binary-safe "bulk" arguments. The latency of processing is
1560 * a bit higher but this allows things like multi-sets, so if this
1561 * protocol is used only for MSET and similar commands this is a big win. */
1562 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1563 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1564 if (c->multibulk <= 0) {
1565 resetClient(c);
1566 return 1;
1567 } else {
1568 decrRefCount(c->argv[c->argc-1]);
1569 c->argc--;
1570 return 1;
1571 }
1572 } else if (c->multibulk) {
1573 if (c->bulklen == -1) {
1574 if (((char*)c->argv[0]->ptr)[0] != '$') {
1575 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1576 resetClient(c);
1577 return 1;
1578 } else {
1579 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1580 decrRefCount(c->argv[0]);
1581 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1582 c->argc--;
1583 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1584 resetClient(c);
1585 return 1;
1586 }
1587 c->argc--;
1588 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1589 return 1;
1590 }
1591 } else {
1592 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1593 c->mbargv[c->mbargc] = c->argv[0];
1594 c->mbargc++;
1595 c->argc--;
1596 c->multibulk--;
1597 if (c->multibulk == 0) {
1598 robj **auxargv;
1599 int auxargc;
1600
1601 /* Here we need to swap the multi-bulk argc/argv with the
1602 * normal argc/argv of the client structure. */
1603 auxargv = c->argv;
1604 c->argv = c->mbargv;
1605 c->mbargv = auxargv;
1606
1607 auxargc = c->argc;
1608 c->argc = c->mbargc;
1609 c->mbargc = auxargc;
1610
1611 /* We need to set bulklen to something different than -1
1612 * in order for the code below to process the command without
1613 * to try to read the last argument of a bulk command as
1614 * a special argument. */
1615 c->bulklen = 0;
1616 /* continue below and process the command */
1617 } else {
1618 c->bulklen = -1;
1619 return 1;
1620 }
1621 }
1622 }
1623 /* -- end of multi bulk commands processing -- */
1624
1625 /* The QUIT command is handled as a special case. Normal command
1626 * procs are unable to close the client connection safely */
1627 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1628 freeClient(c);
1629 return 0;
1630 }
1631 cmd = lookupCommand(c->argv[0]->ptr);
1632 if (!cmd) {
1633 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1634 resetClient(c);
1635 return 1;
1636 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1637 (c->argc < -cmd->arity)) {
1638 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1639 resetClient(c);
1640 return 1;
1641 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1642 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1643 resetClient(c);
1644 return 1;
1645 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1646 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1647
1648 decrRefCount(c->argv[c->argc-1]);
1649 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1650 c->argc--;
1651 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1652 resetClient(c);
1653 return 1;
1654 }
1655 c->argc--;
1656 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1657 /* It is possible that the bulk read is already in the
1658 * buffer. Check this condition and handle it accordingly.
1659 * This is just a fast path, alternative to call processInputBuffer().
1660 * It's a good idea since the code is small and this condition
1661 * happens most of the times. */
1662 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1663 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1664 c->argc++;
1665 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1666 } else {
1667 return 1;
1668 }
1669 }
1670 /* Let's try to share objects on the command arguments vector */
1671 if (server.shareobjects) {
1672 int j;
1673 for(j = 1; j < c->argc; j++)
1674 c->argv[j] = tryObjectSharing(c->argv[j]);
1675 }
1676 /* Let's try to encode the bulk object to save space. */
1677 if (cmd->flags & REDIS_CMD_BULK)
1678 tryObjectEncoding(c->argv[c->argc-1]);
1679
1680 /* Check if the user is authenticated */
1681 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1682 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1683 resetClient(c);
1684 return 1;
1685 }
1686
1687 /* Exec the command */
1688 dirty = server.dirty;
1689 cmd->proc(c);
1690 if (server.appendonly && server.dirty-dirty)
1691 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1692 if (server.dirty-dirty && listLength(server.slaves))
1693 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1694 if (listLength(server.monitors))
1695 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1696 server.stat_numcommands++;
1697
1698 /* Prepare the client for the next command */
1699 if (c->flags & REDIS_CLOSE) {
1700 freeClient(c);
1701 return 0;
1702 }
1703 resetClient(c);
1704 return 1;
1705 }
1706
1707 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1708 listNode *ln;
1709 int outc = 0, j;
1710 robj **outv;
1711 /* (args*2)+1 is enough room for args, spaces, newlines */
1712 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1713
1714 if (argc <= REDIS_STATIC_ARGS) {
1715 outv = static_outv;
1716 } else {
1717 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1718 }
1719
1720 for (j = 0; j < argc; j++) {
1721 if (j != 0) outv[outc++] = shared.space;
1722 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1723 robj *lenobj;
1724
1725 lenobj = createObject(REDIS_STRING,
1726 sdscatprintf(sdsempty(),"%d\r\n",
1727 stringObjectLen(argv[j])));
1728 lenobj->refcount = 0;
1729 outv[outc++] = lenobj;
1730 }
1731 outv[outc++] = argv[j];
1732 }
1733 outv[outc++] = shared.crlf;
1734
1735 /* Increment all the refcounts at start and decrement at end in order to
1736 * be sure to free objects if there is no slave in a replication state
1737 * able to be feed with commands */
1738 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1739 listRewind(slaves);
1740 while((ln = listYield(slaves))) {
1741 redisClient *slave = ln->value;
1742
1743 /* Don't feed slaves that are still waiting for BGSAVE to start */
1744 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1745
1746 /* Feed all the other slaves, MONITORs and so on */
1747 if (slave->slaveseldb != dictid) {
1748 robj *selectcmd;
1749
1750 switch(dictid) {
1751 case 0: selectcmd = shared.select0; break;
1752 case 1: selectcmd = shared.select1; break;
1753 case 2: selectcmd = shared.select2; break;
1754 case 3: selectcmd = shared.select3; break;
1755 case 4: selectcmd = shared.select4; break;
1756 case 5: selectcmd = shared.select5; break;
1757 case 6: selectcmd = shared.select6; break;
1758 case 7: selectcmd = shared.select7; break;
1759 case 8: selectcmd = shared.select8; break;
1760 case 9: selectcmd = shared.select9; break;
1761 default:
1762 selectcmd = createObject(REDIS_STRING,
1763 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1764 selectcmd->refcount = 0;
1765 break;
1766 }
1767 addReply(slave,selectcmd);
1768 slave->slaveseldb = dictid;
1769 }
1770 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1771 }
1772 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1773 if (outv != static_outv) zfree(outv);
1774 }
1775
1776 static void processInputBuffer(redisClient *c) {
1777 again:
1778 if (c->bulklen == -1) {
1779 /* Read the first line of the query */
1780 char *p = strchr(c->querybuf,'\n');
1781 size_t querylen;
1782
1783 if (p) {
1784 sds query, *argv;
1785 int argc, j;
1786
1787 query = c->querybuf;
1788 c->querybuf = sdsempty();
1789 querylen = 1+(p-(query));
1790 if (sdslen(query) > querylen) {
1791 /* leave data after the first line of the query in the buffer */
1792 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1793 }
1794 *p = '\0'; /* remove "\n" */
1795 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1796 sdsupdatelen(query);
1797
1798 /* Now we can split the query in arguments */
1799 if (sdslen(query) == 0) {
1800 /* Ignore empty query */
1801 sdsfree(query);
1802 return;
1803 }
1804 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1805 sdsfree(query);
1806
1807 if (c->argv) zfree(c->argv);
1808 c->argv = zmalloc(sizeof(robj*)*argc);
1809
1810 for (j = 0; j < argc; j++) {
1811 if (sdslen(argv[j])) {
1812 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1813 c->argc++;
1814 } else {
1815 sdsfree(argv[j]);
1816 }
1817 }
1818 zfree(argv);
1819 /* Execute the command. If the client is still valid
1820 * after processCommand() return and there is something
1821 * on the query buffer try to process the next command. */
1822 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1823 return;
1824 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1825 redisLog(REDIS_DEBUG, "Client protocol error");
1826 freeClient(c);
1827 return;
1828 }
1829 } else {
1830 /* Bulk read handling. Note that if we are at this point
1831 the client already sent a command terminated with a newline,
1832 we are reading the bulk data that is actually the last
1833 argument of the command. */
1834 int qbl = sdslen(c->querybuf);
1835
1836 if (c->bulklen <= qbl) {
1837 /* Copy everything but the final CRLF as final argument */
1838 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1839 c->argc++;
1840 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1841 /* Process the command. If the client is still valid after
1842 * the processing and there is more data in the buffer
1843 * try to parse it. */
1844 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1845 return;
1846 }
1847 }
1848 }
1849
1850 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1851 redisClient *c = (redisClient*) privdata;
1852 char buf[REDIS_IOBUF_LEN];
1853 int nread;
1854 REDIS_NOTUSED(el);
1855 REDIS_NOTUSED(mask);
1856
1857 nread = read(fd, buf, REDIS_IOBUF_LEN);
1858 if (nread == -1) {
1859 if (errno == EAGAIN) {
1860 nread = 0;
1861 } else {
1862 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1863 freeClient(c);
1864 return;
1865 }
1866 } else if (nread == 0) {
1867 redisLog(REDIS_DEBUG, "Client closed connection");
1868 freeClient(c);
1869 return;
1870 }
1871 if (nread) {
1872 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1873 c->lastinteraction = time(NULL);
1874 } else {
1875 return;
1876 }
1877 processInputBuffer(c);
1878 }
1879
1880 static int selectDb(redisClient *c, int id) {
1881 if (id < 0 || id >= server.dbnum)
1882 return REDIS_ERR;
1883 c->db = &server.db[id];
1884 return REDIS_OK;
1885 }
1886
1887 static void *dupClientReplyValue(void *o) {
1888 incrRefCount((robj*)o);
1889 return 0;
1890 }
1891
1892 static redisClient *createClient(int fd) {
1893 redisClient *c = zmalloc(sizeof(*c));
1894
1895 anetNonBlock(NULL,fd);
1896 anetTcpNoDelay(NULL,fd);
1897 if (!c) return NULL;
1898 selectDb(c,0);
1899 c->fd = fd;
1900 c->querybuf = sdsempty();
1901 c->argc = 0;
1902 c->argv = NULL;
1903 c->bulklen = -1;
1904 c->multibulk = 0;
1905 c->mbargc = 0;
1906 c->mbargv = NULL;
1907 c->sentlen = 0;
1908 c->flags = 0;
1909 c->lastinteraction = time(NULL);
1910 c->authenticated = 0;
1911 c->replstate = REDIS_REPL_NONE;
1912 c->reply = listCreate();
1913 listSetFreeMethod(c->reply,decrRefCount);
1914 listSetDupMethod(c->reply,dupClientReplyValue);
1915 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1916 readQueryFromClient, c, NULL) == AE_ERR) {
1917 freeClient(c);
1918 return NULL;
1919 }
1920 listAddNodeTail(server.clients,c);
1921 return c;
1922 }
1923
1924 static void addReply(redisClient *c, robj *obj) {
1925 if (listLength(c->reply) == 0 &&
1926 (c->replstate == REDIS_REPL_NONE ||
1927 c->replstate == REDIS_REPL_ONLINE) &&
1928 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1929 sendReplyToClient, c, NULL) == AE_ERR) return;
1930 if (obj->encoding != REDIS_ENCODING_RAW) {
1931 obj = getDecodedObject(obj);
1932 } else {
1933 incrRefCount(obj);
1934 }
1935 listAddNodeTail(c->reply,obj);
1936 }
1937
1938 static void addReplySds(redisClient *c, sds s) {
1939 robj *o = createObject(REDIS_STRING,s);
1940 addReply(c,o);
1941 decrRefCount(o);
1942 }
1943
1944 static void addReplyBulkLen(redisClient *c, robj *obj) {
1945 size_t len;
1946
1947 if (obj->encoding == REDIS_ENCODING_RAW) {
1948 len = sdslen(obj->ptr);
1949 } else {
1950 long n = (long)obj->ptr;
1951
1952 len = 1;
1953 if (n < 0) {
1954 len++;
1955 n = -n;
1956 }
1957 while((n = n/10) != 0) {
1958 len++;
1959 }
1960 }
1961 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
1962 }
1963
1964 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1965 int cport, cfd;
1966 char cip[128];
1967 redisClient *c;
1968 REDIS_NOTUSED(el);
1969 REDIS_NOTUSED(mask);
1970 REDIS_NOTUSED(privdata);
1971
1972 cfd = anetAccept(server.neterr, fd, cip, &cport);
1973 if (cfd == AE_ERR) {
1974 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1975 return;
1976 }
1977 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1978 if ((c = createClient(cfd)) == NULL) {
1979 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1980 close(cfd); /* May be already closed, just ingore errors */
1981 return;
1982 }
1983 /* If maxclient directive is set and this is one client more... close the
1984 * connection. Note that we create the client instead to check before
1985 * for this condition, since now the socket is already set in nonblocking
1986 * mode and we can send an error for free using the Kernel I/O */
1987 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1988 char *err = "-ERR max number of clients reached\r\n";
1989
1990 /* That's a best effort error message, don't check write errors */
1991 if (write(c->fd,err,strlen(err)) == -1) {
1992 /* Nothing to do, Just to avoid the warning... */
1993 }
1994 freeClient(c);
1995 return;
1996 }
1997 server.stat_numconnections++;
1998 }
1999
2000 /* ======================= Redis objects implementation ===================== */
2001
2002 static robj *createObject(int type, void *ptr) {
2003 robj *o;
2004
2005 if (listLength(server.objfreelist)) {
2006 listNode *head = listFirst(server.objfreelist);
2007 o = listNodeValue(head);
2008 listDelNode(server.objfreelist,head);
2009 } else {
2010 o = zmalloc(sizeof(*o));
2011 }
2012 o->type = type;
2013 o->encoding = REDIS_ENCODING_RAW;
2014 o->ptr = ptr;
2015 o->refcount = 1;
2016 return o;
2017 }
2018
2019 static robj *createStringObject(char *ptr, size_t len) {
2020 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2021 }
2022
2023 static robj *createListObject(void) {
2024 list *l = listCreate();
2025
2026 listSetFreeMethod(l,decrRefCount);
2027 return createObject(REDIS_LIST,l);
2028 }
2029
2030 static robj *createSetObject(void) {
2031 dict *d = dictCreate(&setDictType,NULL);
2032 return createObject(REDIS_SET,d);
2033 }
2034
2035 static robj *createZsetObject(void) {
2036 zset *zs = zmalloc(sizeof(*zs));
2037
2038 zs->dict = dictCreate(&zsetDictType,NULL);
2039 zs->zsl = zslCreate();
2040 return createObject(REDIS_ZSET,zs);
2041 }
2042
2043 static void freeStringObject(robj *o) {
2044 if (o->encoding == REDIS_ENCODING_RAW) {
2045 sdsfree(o->ptr);
2046 }
2047 }
2048
2049 static void freeListObject(robj *o) {
2050 listRelease((list*) o->ptr);
2051 }
2052
2053 static void freeSetObject(robj *o) {
2054 dictRelease((dict*) o->ptr);
2055 }
2056
2057 static void freeZsetObject(robj *o) {
2058 zset *zs = o->ptr;
2059
2060 dictRelease(zs->dict);
2061 zslFree(zs->zsl);
2062 zfree(zs);
2063 }
2064
2065 static void freeHashObject(robj *o) {
2066 dictRelease((dict*) o->ptr);
2067 }
2068
2069 static void incrRefCount(robj *o) {
2070 o->refcount++;
2071 #ifdef DEBUG_REFCOUNT
2072 if (o->type == REDIS_STRING)
2073 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2074 #endif
2075 }
2076
2077 static void decrRefCount(void *obj) {
2078 robj *o = obj;
2079
2080 #ifdef DEBUG_REFCOUNT
2081 if (o->type == REDIS_STRING)
2082 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2083 #endif
2084 if (--(o->refcount) == 0) {
2085 switch(o->type) {
2086 case REDIS_STRING: freeStringObject(o); break;
2087 case REDIS_LIST: freeListObject(o); break;
2088 case REDIS_SET: freeSetObject(o); break;
2089 case REDIS_ZSET: freeZsetObject(o); break;
2090 case REDIS_HASH: freeHashObject(o); break;
2091 default: assert(0 != 0); break;
2092 }
2093 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2094 !listAddNodeHead(server.objfreelist,o))
2095 zfree(o);
2096 }
2097 }
2098
2099 static robj *lookupKey(redisDb *db, robj *key) {
2100 dictEntry *de = dictFind(db->dict,key);
2101 return de ? dictGetEntryVal(de) : NULL;
2102 }
2103
2104 static robj *lookupKeyRead(redisDb *db, robj *key) {
2105 expireIfNeeded(db,key);
2106 return lookupKey(db,key);
2107 }
2108
2109 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2110 deleteIfVolatile(db,key);
2111 return lookupKey(db,key);
2112 }
2113
2114 static int deleteKey(redisDb *db, robj *key) {
2115 int retval;
2116
2117 /* We need to protect key from destruction: after the first dictDelete()
2118 * it may happen that 'key' is no longer valid if we don't increment
2119 * it's count. This may happen when we get the object reference directly
2120 * from the hash table with dictRandomKey() or dict iterators */
2121 incrRefCount(key);
2122 if (dictSize(db->expires)) dictDelete(db->expires,key);
2123 retval = dictDelete(db->dict,key);
2124 decrRefCount(key);
2125
2126 return retval == DICT_OK;
2127 }
2128
2129 /* Try to share an object against the shared objects pool */
2130 static robj *tryObjectSharing(robj *o) {
2131 struct dictEntry *de;
2132 unsigned long c;
2133
2134 if (o == NULL || server.shareobjects == 0) return o;
2135
2136 assert(o->type == REDIS_STRING);
2137 de = dictFind(server.sharingpool,o);
2138 if (de) {
2139 robj *shared = dictGetEntryKey(de);
2140
2141 c = ((unsigned long) dictGetEntryVal(de))+1;
2142 dictGetEntryVal(de) = (void*) c;
2143 incrRefCount(shared);
2144 decrRefCount(o);
2145 return shared;
2146 } else {
2147 /* Here we are using a stream algorihtm: Every time an object is
2148 * shared we increment its count, everytime there is a miss we
2149 * recrement the counter of a random object. If this object reaches
2150 * zero we remove the object and put the current object instead. */
2151 if (dictSize(server.sharingpool) >=
2152 server.sharingpoolsize) {
2153 de = dictGetRandomKey(server.sharingpool);
2154 assert(de != NULL);
2155 c = ((unsigned long) dictGetEntryVal(de))-1;
2156 dictGetEntryVal(de) = (void*) c;
2157 if (c == 0) {
2158 dictDelete(server.sharingpool,de->key);
2159 }
2160 } else {
2161 c = 0; /* If the pool is empty we want to add this object */
2162 }
2163 if (c == 0) {
2164 int retval;
2165
2166 retval = dictAdd(server.sharingpool,o,(void*)1);
2167 assert(retval == DICT_OK);
2168 incrRefCount(o);
2169 }
2170 return o;
2171 }
2172 }
2173
2174 /* Check if the nul-terminated string 's' can be represented by a long
2175 * (that is, is a number that fits into long without any other space or
2176 * character before or after the digits).
2177 *
2178 * If so, the function returns REDIS_OK and *longval is set to the value
2179 * of the number. Otherwise REDIS_ERR is returned */
2180 static int isStringRepresentableAsLong(sds s, long *longval) {
2181 char buf[32], *endptr;
2182 long value;
2183 int slen;
2184
2185 value = strtol(s, &endptr, 10);
2186 if (endptr[0] != '\0') return REDIS_ERR;
2187 slen = snprintf(buf,32,"%ld",value);
2188
2189 /* If the number converted back into a string is not identical
2190 * then it's not possible to encode the string as integer */
2191 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2192 if (longval) *longval = value;
2193 return REDIS_OK;
2194 }
2195
2196 /* Try to encode a string object in order to save space */
2197 static int tryObjectEncoding(robj *o) {
2198 long value;
2199 sds s = o->ptr;
2200
2201 if (o->encoding != REDIS_ENCODING_RAW)
2202 return REDIS_ERR; /* Already encoded */
2203
2204 /* It's not save to encode shared objects: shared objects can be shared
2205 * everywhere in the "object space" of Redis. Encoded objects can only
2206 * appear as "values" (and not, for instance, as keys) */
2207 if (o->refcount > 1) return REDIS_ERR;
2208
2209 /* Currently we try to encode only strings */
2210 assert(o->type == REDIS_STRING);
2211
2212 /* Check if we can represent this string as a long integer */
2213 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2214
2215 /* Ok, this object can be encoded */
2216 o->encoding = REDIS_ENCODING_INT;
2217 sdsfree(o->ptr);
2218 o->ptr = (void*) value;
2219 return REDIS_OK;
2220 }
2221
2222 /* Get a decoded version of an encoded object (returned as a new object) */
2223 static robj *getDecodedObject(const robj *o) {
2224 robj *dec;
2225
2226 assert(o->encoding != REDIS_ENCODING_RAW);
2227 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2228 char buf[32];
2229
2230 snprintf(buf,32,"%ld",(long)o->ptr);
2231 dec = createStringObject(buf,strlen(buf));
2232 return dec;
2233 } else {
2234 assert(1 != 1);
2235 }
2236 }
2237
2238 /* Compare two string objects via strcmp() or alike.
2239 * Note that the objects may be integer-encoded. In such a case we
2240 * use snprintf() to get a string representation of the numbers on the stack
2241 * and compare the strings, it's much faster than calling getDecodedObject(). */
2242 static int compareStringObjects(robj *a, robj *b) {
2243 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2244 char bufa[128], bufb[128], *astr, *bstr;
2245 int bothsds = 1;
2246
2247 if (a == b) return 0;
2248 if (a->encoding != REDIS_ENCODING_RAW) {
2249 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2250 astr = bufa;
2251 bothsds = 0;
2252 } else {
2253 astr = a->ptr;
2254 }
2255 if (b->encoding != REDIS_ENCODING_RAW) {
2256 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2257 bstr = bufb;
2258 bothsds = 0;
2259 } else {
2260 bstr = b->ptr;
2261 }
2262 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2263 }
2264
2265 static size_t stringObjectLen(robj *o) {
2266 assert(o->type == REDIS_STRING);
2267 if (o->encoding == REDIS_ENCODING_RAW) {
2268 return sdslen(o->ptr);
2269 } else {
2270 char buf[32];
2271
2272 return snprintf(buf,32,"%ld",(long)o->ptr);
2273 }
2274 }
2275
2276 /*============================ DB saving/loading ============================ */
2277
/* Write the one byte 'type' to 'fp'.
 * Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2282
/* Write the time 't' to 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 if nothing could be written. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t seconds = (int32_t) t;

    return (fwrite(&seconds,4,1,fp) == 0) ? -1 : 0;
}
2288
2289 /* check rdbLoadLen() comments for more info */
2290 static int rdbSaveLen(FILE *fp, uint32_t len) {
2291 unsigned char buf[2];
2292
2293 if (len < (1<<6)) {
2294 /* Save a 6 bit len */
2295 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2296 if (fwrite(buf,1,1,fp) == 0) return -1;
2297 } else if (len < (1<<14)) {
2298 /* Save a 14 bit len */
2299 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2300 buf[1] = len&0xFF;
2301 if (fwrite(buf,2,1,fp) == 0) return -1;
2302 } else {
2303 /* Save a 32 bit len */
2304 buf[0] = (REDIS_RDB_32BITLEN<<6);
2305 if (fwrite(buf,1,1,fp) == 0) return -1;
2306 len = htonl(len);
2307 if (fwrite(&len,4,1,fp) == 0) return -1;
2308 }
2309 return 0;
2310 }
2311
2312 /* String objects in the form "2391" "-100" without any space and with a
2313 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2314 * encoded as integers to save space */
2315 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2316 long long value;
2317 char *endptr, buf[32];
2318
2319 /* Check if it's possible to encode this value as a number */
2320 value = strtoll(s, &endptr, 10);
2321 if (endptr[0] != '\0') return 0;
2322 snprintf(buf,32,"%lld",value);
2323
2324 /* If the number converted back into a string is not identical
2325 * then it's not possible to encode the string as integer */
2326 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2327
2328 /* Finally check if it fits in our ranges */
2329 if (value >= -(1<<7) && value <= (1<<7)-1) {
2330 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2331 enc[1] = value&0xFF;
2332 return 2;
2333 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2334 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2335 enc[1] = value&0xFF;
2336 enc[2] = (value>>8)&0xFF;
2337 return 3;
2338 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2339 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2340 enc[1] = value&0xFF;
2341 enc[2] = (value>>8)&0xFF;
2342 enc[3] = (value>>16)&0xFF;
2343 enc[4] = (value>>24)&0xFF;
2344 return 5;
2345 } else {
2346 return 0;
2347 }
2348 }
2349
2350 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2351 unsigned int comprlen, outlen;
2352 unsigned char byte;
2353 void *out;
2354
2355 /* We require at least four bytes compression for this to be worth it */
2356 outlen = sdslen(obj->ptr)-4;
2357 if (outlen <= 0) return 0;
2358 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2359 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2360 if (comprlen == 0) {
2361 zfree(out);
2362 return 0;
2363 }
2364 /* Data compressed! Let's save it on disk */
2365 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2366 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2367 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2368 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2369 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2370 zfree(out);
2371 return comprlen;
2372
2373 writeerr:
2374 zfree(out);
2375 return -1;
2376 }
2377
2378 /* Save a string objet as [len][data] on disk. If the object is a string
2379 * representation of an integer value we try to safe it in a special form */
2380 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2381 size_t len;
2382 int enclen;
2383
2384 len = sdslen(obj->ptr);
2385
2386 /* Try integer encoding */
2387 if (len <= 11) {
2388 unsigned char buf[5];
2389 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2390 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2391 return 0;
2392 }
2393 }
2394
2395 /* Try LZF compression - under 20 bytes it's unable to compress even
2396 * aaaaaaaaaaaaaaaaaa so skip it */
2397 if (len > 20) {
2398 int retval;
2399
2400 retval = rdbSaveLzfStringObject(fp,obj);
2401 if (retval == -1) return -1;
2402 if (retval > 0) return 0;
2403 /* retval == 0 means data can't be compressed, save the old way */
2404 }
2405
2406 /* Store verbatim */
2407 if (rdbSaveLen(fp,len) == -1) return -1;
2408 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2409 return 0;
2410 }
2411
2412 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2413 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2414 int retval;
2415 robj *dec;
2416
2417 if (obj->encoding != REDIS_ENCODING_RAW) {
2418 dec = getDecodedObject(obj);
2419 retval = rdbSaveStringObjectRaw(fp,dec);
2420 decrRefCount(dec);
2421 return retval;
2422 } else {
2423 return rdbSaveStringObjectRaw(fp,obj);
2424 }
2425 }
2426
/* Save a double value. Doubles are saved as strings prefixed by an unsigned
 * 8 bit integer specifying the length of the representation.
 * This 8 bit integer has special values in order to specify the following
 * conditions:
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Finite values are printed with "%.17g". Returns 0 on success, -1 on
 * write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len;

    if (isnan(val)) {
        buf[0] = 253;
        len = 1;
    } else if (!isfinite(val)) {
        len = 1;
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        snprintf((char*)buf+1,sizeof(buf)-1,"%.17g",val);
        /* The text starts at buf+1: buf[0] is the length prefix being
         * computed here and must not be part of the measured string. */
        buf[0] = strlen((char*)buf+1);
        len = buf[0]+1;
    }
    if (fwrite(buf,len,1,fp) == 0) return -1;
    return 0;
}
2453
2454 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2455 static int rdbSave(char *filename) {
2456 dictIterator *di = NULL;
2457 dictEntry *de;
2458 FILE *fp;
2459 char tmpfile[256];
2460 int j;
2461 time_t now = time(NULL);
2462
2463 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2464 fp = fopen(tmpfile,"w");
2465 if (!fp) {
2466 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2467 return REDIS_ERR;
2468 }
2469 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2470 for (j = 0; j < server.dbnum; j++) {
2471 redisDb *db = server.db+j;
2472 dict *d = db->dict;
2473 if (dictSize(d) == 0) continue;
2474 di = dictGetIterator(d);
2475 if (!di) {
2476 fclose(fp);
2477 return REDIS_ERR;
2478 }
2479
2480 /* Write the SELECT DB opcode */
2481 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2482 if (rdbSaveLen(fp,j) == -1) goto werr;
2483
2484 /* Iterate this DB writing every entry */
2485 while((de = dictNext(di)) != NULL) {
2486 robj *key = dictGetEntryKey(de);
2487 robj *o = dictGetEntryVal(de);
2488 time_t expiretime = getExpire(db,key);
2489
2490 /* Save the expire time */
2491 if (expiretime != -1) {
2492 /* If this key is already expired skip it */
2493 if (expiretime < now) continue;
2494 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2495 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2496 }
2497 /* Save the key and associated value */
2498 if (rdbSaveType(fp,o->type) == -1) goto werr;
2499 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2500 if (o->type == REDIS_STRING) {
2501 /* Save a string value */
2502 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2503 } else if (o->type == REDIS_LIST) {
2504 /* Save a list value */
2505 list *list = o->ptr;
2506 listNode *ln;
2507
2508 listRewind(list);
2509 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2510 while((ln = listYield(list))) {
2511 robj *eleobj = listNodeValue(ln);
2512
2513 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2514 }
2515 } else if (o->type == REDIS_SET) {
2516 /* Save a set value */
2517 dict *set = o->ptr;
2518 dictIterator *di = dictGetIterator(set);
2519 dictEntry *de;
2520
2521 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2522 while((de = dictNext(di)) != NULL) {
2523 robj *eleobj = dictGetEntryKey(de);
2524
2525 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2526 }
2527 dictReleaseIterator(di);
2528 } else if (o->type == REDIS_ZSET) {
2529 /* Save a set value */
2530 zset *zs = o->ptr;
2531 dictIterator *di = dictGetIterator(zs->dict);
2532 dictEntry *de;
2533
2534 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2535 while((de = dictNext(di)) != NULL) {
2536 robj *eleobj = dictGetEntryKey(de);
2537 double *score = dictGetEntryVal(de);
2538
2539 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2540 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2541 }
2542 dictReleaseIterator(di);
2543 } else {
2544 assert(0 != 0);
2545 }
2546 }
2547 dictReleaseIterator(di);
2548 }
2549 /* EOF opcode */
2550 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2551
2552 /* Make sure data will not remain on the OS's output buffers */
2553 fflush(fp);
2554 fsync(fileno(fp));
2555 fclose(fp);
2556
2557 /* Use RENAME to make sure the DB file is changed atomically only
2558 * if the generate DB file is ok. */
2559 if (rename(tmpfile,filename) == -1) {
2560 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2561 unlink(tmpfile);
2562 return REDIS_ERR;
2563 }
2564 redisLog(REDIS_NOTICE,"DB saved on disk");
2565 server.dirty = 0;
2566 server.lastsave = time(NULL);
2567 return REDIS_OK;
2568
2569 werr:
2570 fclose(fp);
2571 unlink(tmpfile);
2572 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2573 if (di) dictReleaseIterator(di);
2574 return REDIS_ERR;
2575 }
2576
2577 static int rdbSaveBackground(char *filename) {
2578 pid_t childpid;
2579
2580 if (server.bgsaveinprogress) return REDIS_ERR;
2581 if ((childpid = fork()) == 0) {
2582 /* Child */
2583 close(server.fd);
2584 if (rdbSave(filename) == REDIS_OK) {
2585 exit(0);
2586 } else {
2587 exit(1);
2588 }
2589 } else {
2590 /* Parent */
2591 if (childpid == -1) {
2592 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2593 strerror(errno));
2594 return REDIS_ERR;
2595 }
2596 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2597 server.bgsaveinprogress = 1;
2598 server.bgsavechildpid = childpid;
2599 return REDIS_OK;
2600 }
2601 return REDIS_OK; /* unreached */
2602 }
2603
/* Remove the file "temp-<childpid>.rdb" from the current directory, that
 * is the temp file name rdbSave() builds for a process with that pid.
 * The result of unlink() is ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2610
/* Read a one byte type from 'fp'.
 * Returns the byte (0-255), or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    return (fread(&type,1,1,fp) == 0) ? -1 : type;
}
2616
/* Read a time saved by rdbSaveTime(): a 32 bit signed integer in the byte
 * order of the host. Returns the time, or -1 if it could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t seconds;

    return (fread(&seconds,4,1,fp) == 0) ? (time_t) -1 : (time_t) seconds;
}
2622
2623 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2624 * of this file for a description of how this are stored on disk.
2625 *
2626 * isencoded is set to 1 if the readed length is not actually a length but
2627 * an "encoding type", check the above comments for more info */
2628 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2629 unsigned char buf[2];
2630 uint32_t len;
2631
2632 if (isencoded) *isencoded = 0;
2633 if (rdbver == 0) {
2634 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2635 return ntohl(len);
2636 } else {
2637 int type;
2638
2639 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2640 type = (buf[0]&0xC0)>>6;
2641 if (type == REDIS_RDB_6BITLEN) {
2642 /* Read a 6 bit len */
2643 return buf[0]&0x3F;
2644 } else if (type == REDIS_RDB_ENCVAL) {
2645 /* Read a 6 bit len encoding type */
2646 if (isencoded) *isencoded = 1;
2647 return buf[0]&0x3F;
2648 } else if (type == REDIS_RDB_14BITLEN) {
2649 /* Read a 14 bit len */
2650 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2651 return ((buf[0]&0x3F)<<8)|buf[1];
2652 } else {
2653 /* Read a 32 bit len */
2654 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2655 return ntohl(len);
2656 }
2657 }
2658 }
2659
2660 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2661 unsigned char enc[4];
2662 long long val;
2663
2664 if (enctype == REDIS_RDB_ENC_INT8) {
2665 if (fread(enc,1,1,fp) == 0) return NULL;
2666 val = (signed char)enc[0];
2667 } else if (enctype == REDIS_RDB_ENC_INT16) {
2668 uint16_t v;
2669 if (fread(enc,2,1,fp) == 0) return NULL;
2670 v = enc[0]|(enc[1]<<8);
2671 val = (int16_t)v;
2672 } else if (enctype == REDIS_RDB_ENC_INT32) {
2673 uint32_t v;
2674 if (fread(enc,4,1,fp) == 0) return NULL;
2675 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2676 val = (int32_t)v;
2677 } else {
2678 val = 0; /* anti-warning */
2679 assert(0!=0);
2680 }
2681 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2682 }
2683
2684 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2685 unsigned int len, clen;
2686 unsigned char *c = NULL;
2687 sds val = NULL;
2688
2689 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2690 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2691 if ((c = zmalloc(clen)) == NULL) goto err;
2692 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2693 if (fread(c,clen,1,fp) == 0) goto err;
2694 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2695 zfree(c);
2696 return createObject(REDIS_STRING,val);
2697 err:
2698 zfree(c);
2699 sdsfree(val);
2700 return NULL;
2701 }
2702
2703 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2704 int isencoded;
2705 uint32_t len;
2706 sds val;
2707
2708 len = rdbLoadLen(fp,rdbver,&isencoded);
2709 if (isencoded) {
2710 switch(len) {
2711 case REDIS_RDB_ENC_INT8:
2712 case REDIS_RDB_ENC_INT16:
2713 case REDIS_RDB_ENC_INT32:
2714 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2715 case REDIS_RDB_ENC_LZF:
2716 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2717 default:
2718 assert(0!=0);
2719 }
2720 }
2721
2722 if (len == REDIS_RDB_LENERR) return NULL;
2723 val = sdsnewlen(NULL,len);
2724 if (len && fread(val,len,1,fp) == 0) {
2725 sdsfree(val);
2726 return NULL;
2727 }
2728 return tryObjectSharing(createObject(REDIS_STRING,val));
2729 }
2730
2731 /* For information about double serialization check rdbSaveDoubleValue() */
2732 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2733 char buf[128];
2734 unsigned char len;
2735
2736 if (fread(&len,1,1,fp) == 0) return -1;
2737 switch(len) {
2738 case 255: *val = R_NegInf; return 0;
2739 case 254: *val = R_PosInf; return 0;
2740 case 253: *val = R_Nan; return 0;
2741 default:
2742 if (fread(buf,len,1,fp) == 0) return -1;
2743 sscanf(buf, "%lg", val);
2744 return 0;
2745 }
2746 }
2747
2748 static int rdbLoad(char *filename) {
2749 FILE *fp;
2750 robj *keyobj = NULL;
2751 uint32_t dbid;
2752 int type, retval, rdbver;
2753 dict *d = server.db[0].dict;
2754 redisDb *db = server.db+0;
2755 char buf[1024];
2756 time_t expiretime = -1, now = time(NULL);
2757
2758 fp = fopen(filename,"r");
2759 if (!fp) return REDIS_ERR;
2760 if (fread(buf,9,1,fp) == 0) goto eoferr;
2761 buf[9] = '\0';
2762 if (memcmp(buf,"REDIS",5) != 0) {
2763 fclose(fp);
2764 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2765 return REDIS_ERR;
2766 }
2767 rdbver = atoi(buf+5);
2768 if (rdbver > 1) {
2769 fclose(fp);
2770 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2771 return REDIS_ERR;
2772 }
2773 while(1) {
2774 robj *o;
2775
2776 /* Read type. */
2777 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2778 if (type == REDIS_EXPIRETIME) {
2779 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2780 /* We read the time so we need to read the object type again */
2781 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2782 }
2783 if (type == REDIS_EOF) break;
2784 /* Handle SELECT DB opcode as a special case */
2785 if (type == REDIS_SELECTDB) {
2786 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2787 goto eoferr;
2788 if (dbid >= (unsigned)server.dbnum) {
2789 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2790 exit(1);
2791 }
2792 db = server.db+dbid;
2793 d = db->dict;
2794 continue;
2795 }
2796 /* Read key */
2797 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2798
2799 if (type == REDIS_STRING) {
2800 /* Read string value */
2801 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2802 tryObjectEncoding(o);
2803 } else if (type == REDIS_LIST || type == REDIS_SET) {
2804 /* Read list/set value */
2805 uint32_t listlen;
2806
2807 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2808 goto eoferr;
2809 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2810 /* Load every single element of the list/set */
2811 while(listlen--) {
2812 robj *ele;
2813
2814 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2815 tryObjectEncoding(ele);
2816 if (type == REDIS_LIST) {
2817 listAddNodeTail((list*)o->ptr,ele);
2818 } else {
2819 dictAdd((dict*)o->ptr,ele,NULL);
2820 }
2821 }
2822 } else if (type == REDIS_ZSET) {
2823 /* Read list/set value */
2824 uint32_t zsetlen;
2825 zset *zs;
2826
2827 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2828 goto eoferr;
2829 o = createZsetObject();
2830 zs = o->ptr;
2831 /* Load every single element of the list/set */
2832 while(zsetlen--) {
2833 robj *ele;
2834 double *score = zmalloc(sizeof(double));
2835
2836 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2837 tryObjectEncoding(ele);
2838 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2839 dictAdd(zs->dict,ele,score);
2840 zslInsert(zs->zsl,*score,ele);
2841 incrRefCount(ele); /* added to skiplist */
2842 }
2843 } else {
2844 assert(0 != 0);
2845 }
2846 /* Add the new object in the hash table */
2847 retval = dictAdd(d,keyobj,o);
2848 if (retval == DICT_ERR) {
2849 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2850 exit(1);
2851 }
2852 /* Set the expire time if needed */
2853 if (expiretime != -1) {
2854 setExpire(db,keyobj,expiretime);
2855 /* Delete this key if already expired */
2856 if (expiretime < now) deleteKey(db,keyobj);
2857 expiretime = -1;
2858 }
2859 keyobj = o = NULL;
2860 }
2861 fclose(fp);
2862 return REDIS_OK;
2863
2864 eoferr: /* unexpected end of file is handled here with a fatal exit */
2865 if (keyobj) decrRefCount(keyobj);
2866 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2867 exit(1);
2868 return REDIS_ERR; /* Just to avoid warning */
2869 }
2870
2871 /*================================== Commands =============================== */
2872
2873 static void authCommand(redisClient *c) {
2874 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2875 c->authenticated = 1;
2876 addReply(c,shared.ok);
2877 } else {
2878 c->authenticated = 0;
2879 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2880 }
2881 }
2882
2883 static void pingCommand(redisClient *c) {
2884 addReply(c,shared.pong);
2885 }
2886
2887 static void echoCommand(redisClient *c) {
2888 addReplyBulkLen(c,c->argv[1]);
2889 addReply(c,c->argv[1]);
2890 addReply(c,shared.crlf);
2891 }
2892
2893 /*=================================== Strings =============================== */
2894
2895 static void setGenericCommand(redisClient *c, int nx) {
2896 int retval;
2897
2898 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2899 if (retval == DICT_ERR) {
2900 if (!nx) {
2901 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2902 incrRefCount(c->argv[2]);
2903 } else {
2904 addReply(c,shared.czero);
2905 return;
2906 }
2907 } else {
2908 incrRefCount(c->argv[1]);
2909 incrRefCount(c->argv[2]);
2910 }
2911 server.dirty++;
2912 removeExpire(c->db,c->argv[1]);
2913 addReply(c, nx ? shared.cone : shared.ok);
2914 }
2915
2916 static void setCommand(redisClient *c) {
2917 setGenericCommand(c,0);
2918 }
2919
2920 static void setnxCommand(redisClient *c) {
2921 setGenericCommand(c,1);
2922 }
2923
2924 static void getCommand(redisClient *c) {
2925 robj *o = lookupKeyRead(c->db,c->argv[1]);
2926
2927 if (o == NULL) {
2928 addReply(c,shared.nullbulk);
2929 } else {
2930 if (o->type != REDIS_STRING) {
2931 addReply(c,shared.wrongtypeerr);
2932 } else {
2933 addReplyBulkLen(c,o);
2934 addReply(c,o);
2935 addReply(c,shared.crlf);
2936 }
2937 }
2938 }
2939
2940 static void getsetCommand(redisClient *c) {
2941 getCommand(c);
2942 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2943 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2944 } else {
2945 incrRefCount(c->argv[1]);
2946 }
2947 incrRefCount(c->argv[2]);
2948 server.dirty++;
2949 removeExpire(c->db,c->argv[1]);
2950 }
2951
2952 static void mgetCommand(redisClient *c) {
2953 int j;
2954
2955 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2956 for (j = 1; j < c->argc; j++) {
2957 robj *o = lookupKeyRead(c->db,c->argv[j]);
2958 if (o == NULL) {
2959 addReply(c,shared.nullbulk);
2960 } else {
2961 if (o->type != REDIS_STRING) {
2962 addReply(c,shared.nullbulk);
2963 } else {
2964 addReplyBulkLen(c,o);
2965 addReply(c,o);
2966 addReply(c,shared.crlf);
2967 }
2968 }
2969 }
2970 }
2971
2972 static void incrDecrCommand(redisClient *c, long long incr) {
2973 long long value;
2974 int retval;
2975 robj *o;
2976
2977 o = lookupKeyWrite(c->db,c->argv[1]);
2978 if (o == NULL) {
2979 value = 0;
2980 } else {
2981 if (o->type != REDIS_STRING) {
2982 value = 0;
2983 } else {
2984 char *eptr;
2985
2986 if (o->encoding == REDIS_ENCODING_RAW)
2987 value = strtoll(o->ptr, &eptr, 10);
2988 else if (o->encoding == REDIS_ENCODING_INT)
2989 value = (long)o->ptr;
2990 else
2991 assert(1 != 1);
2992 }
2993 }
2994
2995 value += incr;
2996 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2997 tryObjectEncoding(o);
2998 retval = dictAdd(c->db->dict,c->argv[1],o);
2999 if (retval == DICT_ERR) {
3000 dictReplace(c->db->dict,c->argv[1],o);
3001 removeExpire(c->db,c->argv[1]);
3002 } else {
3003 incrRefCount(c->argv[1]);
3004 }
3005 server.dirty++;
3006 addReply(c,shared.colon);
3007 addReply(c,o);
3008 addReply(c,shared.crlf);
3009 }
3010
3011 static void incrCommand(redisClient *c) {
3012 incrDecrCommand(c,1);
3013 }
3014
3015 static void decrCommand(redisClient *c) {
3016 incrDecrCommand(c,-1);
3017 }
3018
3019 static void incrbyCommand(redisClient *c) {
3020 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3021 incrDecrCommand(c,incr);
3022 }
3023
3024 static void decrbyCommand(redisClient *c) {
3025 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3026 incrDecrCommand(c,-incr);
3027 }
3028
3029 /* ========================= Type agnostic commands ========================= */
3030
3031 static void delCommand(redisClient *c) {
3032 int deleted = 0, j;
3033
3034 for (j = 1; j < c->argc; j++) {
3035 if (deleteKey(c->db,c->argv[j])) {
3036 server.dirty++;
3037 deleted++;
3038 }
3039 }
3040 switch(deleted) {
3041 case 0:
3042 addReply(c,shared.czero);
3043 break;
3044 case 1:
3045 addReply(c,shared.cone);
3046 break;
3047 default:
3048 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3049 break;
3050 }
3051 }
3052
3053 static void existsCommand(redisClient *c) {
3054 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3055 }
3056
3057 static void selectCommand(redisClient *c) {
3058 int id = atoi(c->argv[1]->ptr);
3059
3060 if (selectDb(c,id) == REDIS_ERR) {
3061 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3062 } else {
3063 addReply(c,shared.ok);
3064 }
3065 }
3066
3067 static void randomkeyCommand(redisClient *c) {
3068 dictEntry *de;
3069
3070 while(1) {
3071 de = dictGetRandomKey(c->db->dict);
3072 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3073 }
3074 if (de == NULL) {
3075 addReply(c,shared.plus);
3076 addReply(c,shared.crlf);
3077 } else {
3078 addReply(c,shared.plus);
3079 addReply(c,dictGetEntryKey(de));
3080 addReply(c,shared.crlf);
3081 }
3082 }
3083
3084 static void keysCommand(redisClient *c) {
3085 dictIterator *di;
3086 dictEntry *de;
3087 sds pattern = c->argv[1]->ptr;
3088 int plen = sdslen(pattern);
3089 int numkeys = 0, keyslen = 0;
3090 robj *lenobj = createObject(REDIS_STRING,NULL);
3091
3092 di = dictGetIterator(c->db->dict);
3093 addReply(c,lenobj);
3094 decrRefCount(lenobj);
3095 while((de = dictNext(di)) != NULL) {
3096 robj *keyobj = dictGetEntryKey(de);
3097
3098 sds key = keyobj->ptr;
3099 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3100 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3101 if (expireIfNeeded(c->db,keyobj) == 0) {
3102 if (numkeys != 0)
3103 addReply(c,shared.space);
3104 addReply(c,keyobj);
3105 numkeys++;
3106 keyslen += sdslen(key);
3107 }
3108 }
3109 }
3110 dictReleaseIterator(di);
3111 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3112 addReply(c,shared.crlf);
3113 }
3114
3115 static void dbsizeCommand(redisClient *c) {
3116 addReplySds(c,
3117 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3118 }
3119
3120 static void lastsaveCommand(redisClient *c) {
3121 addReplySds(c,
3122 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3123 }
3124
3125 static void typeCommand(redisClient *c) {
3126 robj *o;
3127 char *type;
3128
3129 o = lookupKeyRead(c->db,c->argv[1]);
3130 if (o == NULL) {
3131 type = "+none";
3132 } else {
3133 switch(o->type) {
3134 case REDIS_STRING: type = "+string"; break;
3135 case REDIS_LIST: type = "+list"; break;
3136 case REDIS_SET: type = "+set"; break;
3137 case REDIS_ZSET: type = "+zset"; break;
3138 default: type = "unknown"; break;
3139 }
3140 }
3141 addReplySds(c,sdsnew(type));
3142 addReply(c,shared.crlf);
3143 }
3144
3145 static void saveCommand(redisClient *c) {
3146 if (server.bgsaveinprogress) {
3147 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3148 return;
3149 }
3150 if (rdbSave(server.dbfilename) == REDIS_OK) {
3151 addReply(c,shared.ok);
3152 } else {
3153 addReply(c,shared.err);
3154 }
3155 }
3156
3157 static void bgsaveCommand(redisClient *c) {
3158 if (server.bgsaveinprogress) {
3159 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3160 return;
3161 }
3162 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3163 addReply(c,shared.ok);
3164 } else {
3165 addReply(c,shared.err);
3166 }
3167 }
3168
3169 static void shutdownCommand(redisClient *c) {
3170 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3171 /* Kill the saving child if there is a background saving in progress.
3172 We want to avoid race conditions, for instance our saving child may
3173 overwrite the synchronous saving did by SHUTDOWN. */
3174 if (server.bgsaveinprogress) {
3175 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3176 kill(server.bgsavechildpid,SIGKILL);
3177 rdbRemoveTempFile(server.bgsavechildpid);
3178 }
3179 /* SYNC SAVE */
3180 if (rdbSave(server.dbfilename) == REDIS_OK) {
3181 if (server.daemonize)
3182 unlink(server.pidfile);
3183 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3184 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3185 exit(1);
3186 } else {
3187 /* Ooops.. error saving! The best we can do is to continue operating.
3188 * Note that if there was a background saving process, in the next
3189 * cron() Redis will be notified that the background saving aborted,
3190 * handling special stuff like slaves pending for synchronization... */
3191 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3192 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3193 }
3194 }
3195
3196 static void renameGenericCommand(redisClient *c, int nx) {
3197 robj *o;
3198
3199 /* To use the same key as src and dst is probably an error */
3200 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3201 addReply(c,shared.sameobjecterr);
3202 return;
3203 }
3204
3205 o = lookupKeyWrite(c->db,c->argv[1]);
3206 if (o == NULL) {
3207 addReply(c,shared.nokeyerr);
3208 return;
3209 }
3210 incrRefCount(o);
3211 deleteIfVolatile(c->db,c->argv[2]);
3212 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3213 if (nx) {
3214 decrRefCount(o);
3215 addReply(c,shared.czero);
3216 return;
3217 }
3218 dictReplace(c->db->dict,c->argv[2],o);
3219 } else {
3220 incrRefCount(c->argv[2]);
3221 }
3222 deleteKey(c->db,c->argv[1]);
3223 server.dirty++;
3224 addReply(c,nx ? shared.cone : shared.ok);
3225 }
3226
3227 static void renameCommand(redisClient *c) {
3228 renameGenericCommand(c,0);
3229 }
3230
3231 static void renamenxCommand(redisClient *c) {
3232 renameGenericCommand(c,1);
3233 }
3234
3235 static void moveCommand(redisClient *c) {
3236 robj *o;
3237 redisDb *src, *dst;
3238 int srcid;
3239
3240 /* Obtain source and target DB pointers */
3241 src = c->db;
3242 srcid = c->db->id;
3243 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3244 addReply(c,shared.outofrangeerr);
3245 return;
3246 }
3247 dst = c->db;
3248 selectDb(c,srcid); /* Back to the source DB */
3249
3250 /* If the user is moving using as target the same
3251 * DB as the source DB it is probably an error. */
3252 if (src == dst) {
3253 addReply(c,shared.sameobjecterr);
3254 return;
3255 }
3256
3257 /* Check if the element exists and get a reference */
3258 o = lookupKeyWrite(c->db,c->argv[1]);
3259 if (!o) {
3260 addReply(c,shared.czero);
3261 return;
3262 }
3263
3264 /* Try to add the element to the target DB */
3265 deleteIfVolatile(dst,c->argv[1]);
3266 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3267 addReply(c,shared.czero);
3268 return;
3269 }
3270 incrRefCount(c->argv[1]);
3271 incrRefCount(o);
3272
3273 /* OK! key moved, free the entry in the source DB */
3274 deleteKey(src,c->argv[1]);
3275 server.dirty++;
3276 addReply(c,shared.cone);
3277 }
3278
3279 /* =================================== Lists ================================ */
3280 static void pushGenericCommand(redisClient *c, int where) {
3281 robj *lobj;
3282 list *list;
3283
3284 lobj = lookupKeyWrite(c->db,c->argv[1]);
3285 if (lobj == NULL) {
3286 lobj = createListObject();
3287 list = lobj->ptr;
3288 if (where == REDIS_HEAD) {
3289 listAddNodeHead(list,c->argv[2]);
3290 } else {
3291 listAddNodeTail(list,c->argv[2]);
3292 }
3293 dictAdd(c->db->dict,c->argv[1],lobj);
3294 incrRefCount(c->argv[1]);
3295 incrRefCount(c->argv[2]);
3296 } else {
3297 if (lobj->type != REDIS_LIST) {
3298 addReply(c,shared.wrongtypeerr);
3299 return;
3300 }
3301 list = lobj->ptr;
3302 if (where == REDIS_HEAD) {
3303 listAddNodeHead(list,c->argv[2]);
3304 } else {
3305 listAddNodeTail(list,c->argv[2]);
3306 }
3307 incrRefCount(c->argv[2]);
3308 }
3309 server.dirty++;
3310 addReply(c,shared.ok);
3311 }
3312
3313 static void lpushCommand(redisClient *c) {
3314 pushGenericCommand(c,REDIS_HEAD);
3315 }
3316
3317 static void rpushCommand(redisClient *c) {
3318 pushGenericCommand(c,REDIS_TAIL);
3319 }
3320
3321 static void llenCommand(redisClient *c) {
3322 robj *o;
3323 list *l;
3324
3325 o = lookupKeyRead(c->db,c->argv[1]);
3326 if (o == NULL) {
3327 addReply(c,shared.czero);
3328 return;
3329 } else {
3330 if (o->type != REDIS_LIST) {
3331 addReply(c,shared.wrongtypeerr);
3332 } else {
3333 l = o->ptr;
3334 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3335 }
3336 }
3337 }
3338
3339 static void lindexCommand(redisClient *c) {
3340 robj *o;
3341 int index = atoi(c->argv[2]->ptr);
3342
3343 o = lookupKeyRead(c->db,c->argv[1]);
3344 if (o == NULL) {
3345 addReply(c,shared.nullbulk);
3346 } else {
3347 if (o->type != REDIS_LIST) {
3348 addReply(c,shared.wrongtypeerr);
3349 } else {
3350 list *list = o->ptr;
3351 listNode *ln;
3352
3353 ln = listIndex(list, index);
3354 if (ln == NULL) {
3355 addReply(c,shared.nullbulk);
3356 } else {
3357 robj *ele = listNodeValue(ln);
3358 addReplyBulkLen(c,ele);
3359 addReply(c,ele);
3360 addReply(c,shared.crlf);
3361 }
3362 }
3363 }
3364 }
3365
3366 static void lsetCommand(redisClient *c) {
3367 robj *o;
3368 int index = atoi(c->argv[2]->ptr);
3369
3370 o = lookupKeyWrite(c->db,c->argv[1]);
3371 if (o == NULL) {
3372 addReply(c,shared.nokeyerr);
3373 } else {
3374 if (o->type != REDIS_LIST) {
3375 addReply(c,shared.wrongtypeerr);
3376 } else {
3377 list *list = o->ptr;
3378 listNode *ln;
3379
3380 ln = listIndex(list, index);
3381 if (ln == NULL) {
3382 addReply(c,shared.outofrangeerr);
3383 } else {
3384 robj *ele = listNodeValue(ln);
3385
3386 decrRefCount(ele);
3387 listNodeValue(ln) = c->argv[3];
3388 incrRefCount(c->argv[3]);
3389 addReply(c,shared.ok);
3390 server.dirty++;
3391 }
3392 }
3393 }
3394 }
3395
3396 static void popGenericCommand(redisClient *c, int where) {
3397 robj *o;
3398
3399 o = lookupKeyWrite(c->db,c->argv[1]);
3400 if (o == NULL) {
3401 addReply(c,shared.nullbulk);
3402 } else {
3403 if (o->type != REDIS_LIST) {
3404 addReply(c,shared.wrongtypeerr);
3405 } else {
3406 list *list = o->ptr;
3407 listNode *ln;
3408
3409 if (where == REDIS_HEAD)
3410 ln = listFirst(list);
3411 else
3412 ln = listLast(list);
3413
3414 if (ln == NULL) {
3415 addReply(c,shared.nullbulk);
3416 } else {
3417 robj *ele = listNodeValue(ln);
3418 addReplyBulkLen(c,ele);
3419 addReply(c,ele);
3420 addReply(c,shared.crlf);
3421 listDelNode(list,ln);
3422 server.dirty++;
3423 }
3424 }
3425 }
3426 }
3427
3428 static void lpopCommand(redisClient *c) {
3429 popGenericCommand(c,REDIS_HEAD);
3430 }
3431
3432 static void rpopCommand(redisClient *c) {
3433 popGenericCommand(c,REDIS_TAIL);
3434 }
3435
3436 static void lrangeCommand(redisClient *c) {
3437 robj *o;
3438 int start = atoi(c->argv[2]->ptr);
3439 int end = atoi(c->argv[3]->ptr);
3440
3441 o = lookupKeyRead(c->db,c->argv[1]);
3442 if (o == NULL) {
3443 addReply(c,shared.nullmultibulk);
3444 } else {
3445 if (o->type != REDIS_LIST) {
3446 addReply(c,shared.wrongtypeerr);
3447 } else {
3448 list *list = o->ptr;
3449 listNode *ln;
3450 int llen = listLength(list);
3451 int rangelen, j;
3452 robj *ele;
3453
3454 /* convert negative indexes */
3455 if (start < 0) start = llen+start;
3456 if (end < 0) end = llen+end;
3457 if (start < 0) start = 0;
3458 if (end < 0) end = 0;
3459
3460 /* indexes sanity checks */
3461 if (start > end || start >= llen) {
3462 /* Out of range start or start > end result in empty list */
3463 addReply(c,shared.emptymultibulk);
3464 return;
3465 }
3466 if (end >= llen) end = llen-1;
3467 rangelen = (end-start)+1;
3468
3469 /* Return the result in form of a multi-bulk reply */
3470 ln = listIndex(list, start);
3471 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3472 for (j = 0; j < rangelen; j++) {
3473 ele = listNodeValue(ln);
3474 addReplyBulkLen(c,ele);
3475 addReply(c,ele);
3476 addReply(c,shared.crlf);
3477 ln = ln->next;
3478 }
3479 }
3480 }
3481 }
3482
3483 static void ltrimCommand(redisClient *c) {
3484 robj *o;
3485 int start = atoi(c->argv[2]->ptr);
3486 int end = atoi(c->argv[3]->ptr);
3487
3488 o = lookupKeyWrite(c->db,c->argv[1]);
3489 if (o == NULL) {
3490 addReply(c,shared.nokeyerr);
3491 } else {
3492 if (o->type != REDIS_LIST) {
3493 addReply(c,shared.wrongtypeerr);
3494 } else {
3495 list *list = o->ptr;
3496 listNode *ln;
3497 int llen = listLength(list);
3498 int j, ltrim, rtrim;
3499
3500 /* convert negative indexes */
3501 if (start < 0) start = llen+start;
3502 if (end < 0) end = llen+end;
3503 if (start < 0) start = 0;
3504 if (end < 0) end = 0;
3505
3506 /* indexes sanity checks */
3507 if (start > end || start >= llen) {
3508 /* Out of range start or start > end result in empty list */
3509 ltrim = llen;
3510 rtrim = 0;
3511 } else {
3512 if (end >= llen) end = llen-1;
3513 ltrim = start;
3514 rtrim = llen-end-1;
3515 }
3516
3517 /* Remove list elements to perform the trim */
3518 for (j = 0; j < ltrim; j++) {
3519 ln = listFirst(list);
3520 listDelNode(list,ln);
3521 }
3522 for (j = 0; j < rtrim; j++) {
3523 ln = listLast(list);
3524 listDelNode(list,ln);
3525 }
3526 server.dirty++;
3527 addReply(c,shared.ok);
3528 }
3529 }
3530 }
3531
3532 static void lremCommand(redisClient *c) {
3533 robj *o;
3534
3535 o = lookupKeyWrite(c->db,c->argv[1]);
3536 if (o == NULL) {
3537 addReply(c,shared.czero);
3538 } else {
3539 if (o->type != REDIS_LIST) {
3540 addReply(c,shared.wrongtypeerr);
3541 } else {
3542 list *list = o->ptr;
3543 listNode *ln, *next;
3544 int toremove = atoi(c->argv[2]->ptr);
3545 int removed = 0;
3546 int fromtail = 0;
3547
3548 if (toremove < 0) {
3549 toremove = -toremove;
3550 fromtail = 1;
3551 }
3552 ln = fromtail ? list->tail : list->head;
3553 while (ln) {
3554 robj *ele = listNodeValue(ln);
3555
3556 next = fromtail ? ln->prev : ln->next;
3557 if (compareStringObjects(ele,c->argv[3]) == 0) {
3558 listDelNode(list,ln);
3559 server.dirty++;
3560 removed++;
3561 if (toremove && removed == toremove) break;
3562 }
3563 ln = next;
3564 }
3565 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3566 }
3567 }
3568 }
3569
3570 /* This is the semantic of this command:
3571 * RPOPLPUSH srclist dstlist:
3572 * IF LLEN(srclist) > 0
3573 * element = RPOP srclist
3574 * LPUSH dstlist element
3575 * RETURN element
3576 * ELSE
3577 * RETURN nil
3578 * END
3579 * END
3580 *
3581 * The idea is to be able to get an element from a list in a reliable way
3582 * since the element is not just returned but pushed against another list
3583 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3584 */
3585 static void rpoplpushcommand(redisClient *c) {
3586 robj *sobj;
3587
3588 sobj = lookupKeyWrite(c->db,c->argv[1]);
3589 if (sobj == NULL) {
3590 addReply(c,shared.nullbulk);
3591 } else {
3592 if (sobj->type != REDIS_LIST) {
3593 addReply(c,shared.wrongtypeerr);
3594 } else {
3595 list *srclist = sobj->ptr;
3596 listNode *ln = listLast(srclist);
3597
3598 if (ln == NULL) {
3599 addReply(c,shared.nullbulk);
3600 } else {
3601 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3602 robj *ele = listNodeValue(ln);
3603 list *dstlist;
3604
3605 if (dobj == NULL) {
3606
3607 /* Create the list if the key does not exist */
3608 dobj = createListObject();
3609 dictAdd(c->db->dict,c->argv[2],dobj);
3610 incrRefCount(c->argv[2]);
3611 } else if (dobj->type != REDIS_LIST) {
3612 addReply(c,shared.wrongtypeerr);
3613 return;
3614 }
3615 /* Add the element to the target list */
3616 dstlist = dobj->ptr;
3617 listAddNodeHead(dstlist,ele);
3618 incrRefCount(ele);
3619
3620 /* Send the element to the client as reply as well */
3621 addReplyBulkLen(c,ele);
3622 addReply(c,ele);
3623 addReply(c,shared.crlf);
3624
3625 /* Finally remove the element from the source list */
3626 listDelNode(srclist,ln);
3627 server.dirty++;
3628 }
3629 }
3630 }
3631 }
3632
3633
3634 /* ==================================== Sets ================================ */
3635
3636 static void saddCommand(redisClient *c) {
3637 robj *set;
3638
3639 set = lookupKeyWrite(c->db,c->argv[1]);
3640 if (set == NULL) {
3641 set = createSetObject();
3642 dictAdd(c->db->dict,c->argv[1],set);
3643 incrRefCount(c->argv[1]);
3644 } else {
3645 if (set->type != REDIS_SET) {
3646 addReply(c,shared.wrongtypeerr);
3647 return;
3648 }
3649 }
3650 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3651 incrRefCount(c->argv[2]);
3652 server.dirty++;
3653 addReply(c,shared.cone);
3654 } else {
3655 addReply(c,shared.czero);
3656 }
3657 }
3658
3659 static void sremCommand(redisClient *c) {
3660 robj *set;
3661
3662 set = lookupKeyWrite(c->db,c->argv[1]);
3663 if (set == NULL) {
3664 addReply(c,shared.czero);
3665 } else {
3666 if (set->type != REDIS_SET) {
3667 addReply(c,shared.wrongtypeerr);
3668 return;
3669 }
3670 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3671 server.dirty++;
3672 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3673 addReply(c,shared.cone);
3674 } else {
3675 addReply(c,shared.czero);
3676 }
3677 }
3678 }
3679
3680 static void smoveCommand(redisClient *c) {
3681 robj *srcset, *dstset;
3682
3683 srcset = lookupKeyWrite(c->db,c->argv[1]);
3684 dstset = lookupKeyWrite(c->db,c->argv[2]);
3685
3686 /* If the source key does not exist return 0, if it's of the wrong type
3687 * raise an error */
3688 if (srcset == NULL || srcset->type != REDIS_SET) {
3689 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3690 return;
3691 }
3692 /* Error if the destination key is not a set as well */
3693 if (dstset && dstset->type != REDIS_SET) {
3694 addReply(c,shared.wrongtypeerr);
3695 return;
3696 }
3697 /* Remove the element from the source set */
3698 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3699 /* Key not found in the src set! return zero */
3700 addReply(c,shared.czero);
3701 return;
3702 }
3703 server.dirty++;
3704 /* Add the element to the destination set */
3705 if (!dstset) {
3706 dstset = createSetObject();
3707 dictAdd(c->db->dict,c->argv[2],dstset);
3708 incrRefCount(c->argv[2]);
3709 }
3710 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3711 incrRefCount(c->argv[3]);
3712 addReply(c,shared.cone);
3713 }
3714
3715 static void sismemberCommand(redisClient *c) {
3716 robj *set;
3717
3718 set = lookupKeyRead(c->db,c->argv[1]);
3719 if (set == NULL) {
3720 addReply(c,shared.czero);
3721 } else {
3722 if (set->type != REDIS_SET) {
3723 addReply(c,shared.wrongtypeerr);
3724 return;
3725 }
3726 if (dictFind(set->ptr,c->argv[2]))
3727 addReply(c,shared.cone);
3728 else
3729 addReply(c,shared.czero);
3730 }
3731 }
3732
3733 static void scardCommand(redisClient *c) {
3734 robj *o;
3735 dict *s;
3736
3737 o = lookupKeyRead(c->db,c->argv[1]);
3738 if (o == NULL) {
3739 addReply(c,shared.czero);
3740 return;
3741 } else {
3742 if (o->type != REDIS_SET) {
3743 addReply(c,shared.wrongtypeerr);
3744 } else {
3745 s = o->ptr;
3746 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3747 dictSize(s)));
3748 }
3749 }
3750 }
3751
3752 static void spopCommand(redisClient *c) {
3753 robj *set;
3754 dictEntry *de;
3755
3756 set = lookupKeyWrite(c->db,c->argv[1]);
3757 if (set == NULL) {
3758 addReply(c,shared.nullbulk);
3759 } else {
3760 if (set->type != REDIS_SET) {
3761 addReply(c,shared.wrongtypeerr);
3762 return;
3763 }
3764 de = dictGetRandomKey(set->ptr);
3765 if (de == NULL) {
3766 addReply(c,shared.nullbulk);
3767 } else {
3768 robj *ele = dictGetEntryKey(de);
3769
3770 addReplyBulkLen(c,ele);
3771 addReply(c,ele);
3772 addReply(c,shared.crlf);
3773 dictDelete(set->ptr,ele);
3774 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3775 server.dirty++;
3776 }
3777 }
3778 }
3779
3780 static void srandmemberCommand(redisClient *c) {
3781 robj *set;
3782 dictEntry *de;
3783
3784 set = lookupKeyRead(c->db,c->argv[1]);
3785 if (set == NULL) {
3786 addReply(c,shared.nullbulk);
3787 } else {
3788 if (set->type != REDIS_SET) {
3789 addReply(c,shared.wrongtypeerr);
3790 return;
3791 }
3792 de = dictGetRandomKey(set->ptr);
3793 if (de == NULL) {
3794 addReply(c,shared.nullbulk);
3795 } else {
3796 robj *ele = dictGetEntryKey(de);
3797
3798 addReplyBulkLen(c,ele);
3799 addReply(c,ele);
3800 addReply(c,shared.crlf);
3801 }
3802 }
3803 }
3804
3805 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3806 dict **d1 = (void*) s1, **d2 = (void*) s2;
3807
3808 return dictSize(*d1)-dictSize(*d2);
3809 }
3810
3811 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3812 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3813 dictIterator *di;
3814 dictEntry *de;
3815 robj *lenobj = NULL, *dstset = NULL;
3816 int j, cardinality = 0;
3817
3818 for (j = 0; j < setsnum; j++) {
3819 robj *setobj;
3820
3821 setobj = dstkey ?
3822 lookupKeyWrite(c->db,setskeys[j]) :
3823 lookupKeyRead(c->db,setskeys[j]);
3824 if (!setobj) {
3825 zfree(dv);
3826 if (dstkey) {
3827 deleteKey(c->db,dstkey);
3828 addReply(c,shared.ok);
3829 } else {
3830 addReply(c,shared.nullmultibulk);
3831 }
3832 return;
3833 }
3834 if (setobj->type != REDIS_SET) {
3835 zfree(dv);
3836 addReply(c,shared.wrongtypeerr);
3837 return;
3838 }
3839 dv[j] = setobj->ptr;
3840 }
3841 /* Sort sets from the smallest to largest, this will improve our
3842 * algorithm's performace */
3843 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3844
3845 /* The first thing we should output is the total number of elements...
3846 * since this is a multi-bulk write, but at this stage we don't know
3847 * the intersection set size, so we use a trick, append an empty object
3848 * to the output list and save the pointer to later modify it with the
3849 * right length */
3850 if (!dstkey) {
3851 lenobj = createObject(REDIS_STRING,NULL);
3852 addReply(c,lenobj);
3853 decrRefCount(lenobj);
3854 } else {
3855 /* If we have a target key where to store the resulting set
3856 * create this key with an empty set inside */
3857 dstset = createSetObject();
3858 }
3859
3860 /* Iterate all the elements of the first (smallest) set, and test
3861 * the element against all the other sets, if at least one set does
3862 * not include the element it is discarded */
3863 di = dictGetIterator(dv[0]);
3864
3865 while((de = dictNext(di)) != NULL) {
3866 robj *ele;
3867
3868 for (j = 1; j < setsnum; j++)
3869 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3870 if (j != setsnum)
3871 continue; /* at least one set does not contain the member */
3872 ele = dictGetEntryKey(de);
3873 if (!dstkey) {
3874 addReplyBulkLen(c,ele);
3875 addReply(c,ele);
3876 addReply(c,shared.crlf);
3877 cardinality++;
3878 } else {
3879 dictAdd(dstset->ptr,ele,NULL);
3880 incrRefCount(ele);
3881 }
3882 }
3883 dictReleaseIterator(di);
3884
3885 if (dstkey) {
3886 /* Store the resulting set into the target */
3887 deleteKey(c->db,dstkey);
3888 dictAdd(c->db->dict,dstkey,dstset);
3889 incrRefCount(dstkey);
3890 }
3891
3892 if (!dstkey) {
3893 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3894 } else {
3895 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3896 dictSize((dict*)dstset->ptr)));
3897 server.dirty++;
3898 }
3899 zfree(dv);
3900 }
3901
3902 static void sinterCommand(redisClient *c) {
3903 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3904 }
3905
3906 static void sinterstoreCommand(redisClient *c) {
3907 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3908 }
3909
3910 #define REDIS_OP_UNION 0
3911 #define REDIS_OP_DIFF 1
3912
3913 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3914 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3915 dictIterator *di;
3916 dictEntry *de;
3917 robj *dstset = NULL;
3918 int j, cardinality = 0;
3919
3920 for (j = 0; j < setsnum; j++) {
3921 robj *setobj;
3922
3923 setobj = dstkey ?
3924 lookupKeyWrite(c->db,setskeys[j]) :
3925 lookupKeyRead(c->db,setskeys[j]);
3926 if (!setobj) {
3927 dv[j] = NULL;
3928 continue;
3929 }
3930 if (setobj->type != REDIS_SET) {
3931 zfree(dv);
3932 addReply(c,shared.wrongtypeerr);
3933 return;
3934 }
3935 dv[j] = setobj->ptr;
3936 }
3937
3938 /* We need a temp set object to store our union. If the dstkey
3939 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3940 * this set object will be the resulting object to set into the target key*/
3941 dstset = createSetObject();
3942
3943 /* Iterate all the elements of all the sets, add every element a single
3944 * time to the result set */
3945 for (j = 0; j < setsnum; j++) {
3946 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3947 if (!dv[j]) continue; /* non existing keys are like empty sets */
3948
3949 di = dictGetIterator(dv[j]);
3950
3951 while((de = dictNext(di)) != NULL) {
3952 robj *ele;
3953
3954 /* dictAdd will not add the same element multiple times */
3955 ele = dictGetEntryKey(de);
3956 if (op == REDIS_OP_UNION || j == 0) {
3957 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3958 incrRefCount(ele);
3959 cardinality++;
3960 }
3961 } else if (op == REDIS_OP_DIFF) {
3962 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3963 cardinality--;
3964 }
3965 }
3966 }
3967 dictReleaseIterator(di);
3968
3969 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3970 }
3971
3972 /* Output the content of the resulting set, if not in STORE mode */
3973 if (!dstkey) {
3974 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3975 di = dictGetIterator(dstset->ptr);
3976 while((de = dictNext(di)) != NULL) {
3977 robj *ele;
3978
3979 ele = dictGetEntryKey(de);
3980 addReplyBulkLen(c,ele);
3981 addReply(c,ele);
3982 addReply(c,shared.crlf);
3983 }
3984 dictReleaseIterator(di);
3985 } else {
3986 /* If we have a target key where to store the resulting set
3987 * create this key with the result set inside */
3988 deleteKey(c->db,dstkey);
3989 dictAdd(c->db->dict,dstkey,dstset);
3990 incrRefCount(dstkey);
3991 }
3992
3993 /* Cleanup */
3994 if (!dstkey) {
3995 decrRefCount(dstset);
3996 } else {
3997 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3998 dictSize((dict*)dstset->ptr)));
3999 server.dirty++;
4000 }
4001 zfree(dv);
4002 }
4003
4004 static void sunionCommand(redisClient *c) {
4005 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4006 }
4007
4008 static void sunionstoreCommand(redisClient *c) {
4009 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4010 }
4011
4012 static void sdiffCommand(redisClient *c) {
4013 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4014 }
4015
4016 static void sdiffstoreCommand(redisClient *c) {
4017 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4018 }
4019
4020 /* ==================================== ZSets =============================== */
4021
4022 /* ZSETs are ordered sets using two data structures to hold the same elements
4023 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4024 * data structure.
4025 *
 * The elements are added to a hash table mapping Redis objects to scores.
4027 * At the same time the elements are added to a skip list mapping scores
4028 * to Redis objects (so objects are sorted by scores in this "view"). */
4029
4030 /* This skiplist implementation is almost a C translation of the original
4031 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4032 * Alternative to Balanced Trees", modified in three ways:
4033 * a) this implementation allows for repeated values.
4034 * b) the comparison is not just by key (our 'score') but by satellite data.
4035 * c) there is a back pointer, so it's a doubly linked list with the back
4036 * pointers being only at "level 1". This allows to traverse the list
4037 * from tail to head, useful for ZREVRANGE. */
4038
4039 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4040 zskiplistNode *zn = zmalloc(sizeof(*zn));
4041
4042 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4043 zn->score = score;
4044 zn->obj = obj;
4045 return zn;
4046 }
4047
4048 static zskiplist *zslCreate(void) {
4049 int j;
4050 zskiplist *zsl;
4051
4052 zsl = zmalloc(sizeof(*zsl));
4053 zsl->level = 1;
4054 zsl->length = 0;
4055 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4056 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4057 zsl->header->forward[j] = NULL;
4058 zsl->header->backward = NULL;
4059 zsl->tail = NULL;
4060 return zsl;
4061 }
4062
4063 static void zslFreeNode(zskiplistNode *node) {
4064 decrRefCount(node->obj);
4065 zfree(node->forward);
4066 zfree(node);
4067 }
4068
4069 static void zslFree(zskiplist *zsl) {
4070 zskiplistNode *node = zsl->header->forward[0], *next;
4071
4072 zfree(zsl->header->forward);
4073 zfree(zsl->header);
4074 while(node) {
4075 next = node->forward[0];
4076 zslFreeNode(node);
4077 node = next;
4078 }
4079 zfree(zsl);
4080 }
4081
4082 static int zslRandomLevel(void) {
4083 int level = 1;
4084 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4085 level += 1;
4086 return level;
4087 }
4088
4089 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4090 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4091 int i, level;
4092
4093 x = zsl->header;
4094 for (i = zsl->level-1; i >= 0; i--) {
4095 while (x->forward[i] &&
4096 (x->forward[i]->score < score ||
4097 (x->forward[i]->score == score &&
4098 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4099 x = x->forward[i];
4100 update[i] = x;
4101 }
4102 /* we assume the key is not already inside, since we allow duplicated
4103 * scores, and the re-insertion of score and redis object should never
4104 * happpen since the caller of zslInsert() should test in the hash table
4105 * if the element is already inside or not. */
4106 level = zslRandomLevel();
4107 if (level > zsl->level) {
4108 for (i = zsl->level; i < level; i++)
4109 update[i] = zsl->header;
4110 zsl->level = level;
4111 }
4112 x = zslCreateNode(level,score,obj);
4113 for (i = 0; i < level; i++) {
4114 x->forward[i] = update[i]->forward[i];
4115 update[i]->forward[i] = x;
4116 }
4117 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4118 if (x->forward[0])
4119 x->forward[0]->backward = x;
4120 else
4121 zsl->tail = x;
4122 zsl->length++;
4123 }
4124
4125 /* Delete an element with matching score/object from the skiplist. */
4126 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4127 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4128 int i;
4129
4130 x = zsl->header;
4131 for (i = zsl->level-1; i >= 0; i--) {
4132 while (x->forward[i] &&
4133 (x->forward[i]->score < score ||
4134 (x->forward[i]->score == score &&
4135 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4136 x = x->forward[i];
4137 update[i] = x;
4138 }
4139 /* We may have multiple elements with the same score, what we need
4140 * is to find the element with both the right score and object. */
4141 x = x->forward[0];
4142 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4143 for (i = 0; i < zsl->level; i++) {
4144 if (update[i]->forward[i] != x) break;
4145 update[i]->forward[i] = x->forward[i];
4146 }
4147 if (x->forward[0]) {
4148 x->forward[0]->backward = (x->backward == zsl->header) ?
4149 NULL : x->backward;
4150 } else {
4151 zsl->tail = x->backward;
4152 }
4153 zslFreeNode(x);
4154 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4155 zsl->level--;
4156 zsl->length--;
4157 return 1;
4158 } else {
4159 return 0; /* not found */
4160 }
4161 return 0; /* not found */
4162 }
4163
4164 /* Delete all the elements with score between min and max from the skiplist.
4165 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4166 * Note that this function takes the reference to the hash table view of the
4167 * sorted set, in order to remove the elements from the hash table too. */
4168 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4169 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4170 unsigned long removed = 0;
4171 int i;
4172
4173 x = zsl->header;
4174 for (i = zsl->level-1; i >= 0; i--) {
4175 while (x->forward[i] && x->forward[i]->score < min)
4176 x = x->forward[i];
4177 update[i] = x;
4178 }
4179 /* We may have multiple elements with the same score, what we need
4180 * is to find the element with both the right score and object. */
4181 x = x->forward[0];
4182 while (x && x->score <= max) {
4183 zskiplistNode *next;
4184
4185 for (i = 0; i < zsl->level; i++) {
4186 if (update[i]->forward[i] != x) break;
4187 update[i]->forward[i] = x->forward[i];
4188 }
4189 if (x->forward[0]) {
4190 x->forward[0]->backward = (x->backward == zsl->header) ?
4191 NULL : x->backward;
4192 } else {
4193 zsl->tail = x->backward;
4194 }
4195 next = x->forward[0];
4196 dictDelete(dict,x->obj);
4197 zslFreeNode(x);
4198 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4199 zsl->level--;
4200 zsl->length--;
4201 removed++;
4202 x = next;
4203 }
4204 return removed; /* not found */
4205 }
4206
4207 /* Find the first node having a score equal or greater than the specified one.
4208 * Returns NULL if there is no match. */
4209 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4210 zskiplistNode *x;
4211 int i;
4212
4213 x = zsl->header;
4214 for (i = zsl->level-1; i >= 0; i--) {
4215 while (x->forward[i] && x->forward[i]->score < score)
4216 x = x->forward[i];
4217 }
4218 /* We may have multiple elements with the same score, what we need
4219 * is to find the element with both the right score and object. */
4220 return x->forward[0];
4221 }
4222
4223 /* The actual Z-commands implementations */
4224
4225 static void zaddCommand(redisClient *c) {
4226 robj *zsetobj;
4227 zset *zs;
4228 double *score;
4229
4230 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4231 if (zsetobj == NULL) {
4232 zsetobj = createZsetObject();
4233 dictAdd(c->db->dict,c->argv[1],zsetobj);
4234 incrRefCount(c->argv[1]);
4235 } else {
4236 if (zsetobj->type != REDIS_ZSET) {
4237 addReply(c,shared.wrongtypeerr);
4238 return;
4239 }
4240 }
4241 score = zmalloc(sizeof(double));
4242 *score = strtod(c->argv[2]->ptr,NULL);
4243 zs = zsetobj->ptr;
4244 if (dictAdd(zs->dict,c->argv[3],score) == DICT_OK) {
4245 /* case 1: New element */
4246 incrRefCount(c->argv[3]); /* added to hash */
4247 zslInsert(zs->zsl,*score,c->argv[3]);
4248 incrRefCount(c->argv[3]); /* added to skiplist */
4249 server.dirty++;
4250 addReply(c,shared.cone);
4251 } else {
4252 dictEntry *de;
4253 double *oldscore;
4254
4255 /* case 2: Score update operation */
4256 de = dictFind(zs->dict,c->argv[3]);
4257 assert(de != NULL);
4258 oldscore = dictGetEntryVal(de);
4259 if (*score != *oldscore) {
4260 int deleted;
4261
4262 deleted = zslDelete(zs->zsl,*oldscore,c->argv[3]);
4263 assert(deleted != 0);
4264 zslInsert(zs->zsl,*score,c->argv[3]);
4265 incrRefCount(c->argv[3]);
4266 dictReplace(zs->dict,c->argv[3],score);
4267 server.dirty++;
4268 } else {
4269 zfree(score);
4270 }
4271 addReply(c,shared.czero);
4272 }
4273 }
4274
4275 static void zremCommand(redisClient *c) {
4276 robj *zsetobj;
4277 zset *zs;
4278
4279 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4280 if (zsetobj == NULL) {
4281 addReply(c,shared.czero);
4282 } else {
4283 dictEntry *de;
4284 double *oldscore;
4285 int deleted;
4286
4287 if (zsetobj->type != REDIS_ZSET) {
4288 addReply(c,shared.wrongtypeerr);
4289 return;
4290 }
4291 zs = zsetobj->ptr;
4292 de = dictFind(zs->dict,c->argv[2]);
4293 if (de == NULL) {
4294 addReply(c,shared.czero);
4295 return;
4296 }
4297 /* Delete from the skiplist */
4298 oldscore = dictGetEntryVal(de);
4299 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4300 assert(deleted != 0);
4301
4302 /* Delete from the hash table */
4303 dictDelete(zs->dict,c->argv[2]);
4304 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4305 server.dirty++;
4306 addReply(c,shared.cone);
4307 }
4308 }
4309
4310 static void zremrangebyscoreCommand(redisClient *c) {
4311 double min = strtod(c->argv[2]->ptr,NULL);
4312 double max = strtod(c->argv[3]->ptr,NULL);
4313 robj *zsetobj;
4314 zset *zs;
4315
4316 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4317 if (zsetobj == NULL) {
4318 addReply(c,shared.czero);
4319 } else {
4320 long deleted;
4321
4322 if (zsetobj->type != REDIS_ZSET) {
4323 addReply(c,shared.wrongtypeerr);
4324 return;
4325 }
4326 zs = zsetobj->ptr;
4327 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4328 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4329 server.dirty += deleted;
4330 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4331 }
4332 }
4333
4334 static void zrangeGenericCommand(redisClient *c, int reverse) {
4335 robj *o;
4336 int start = atoi(c->argv[2]->ptr);
4337 int end = atoi(c->argv[3]->ptr);
4338
4339 o = lookupKeyRead(c->db,c->argv[1]);
4340 if (o == NULL) {
4341 addReply(c,shared.nullmultibulk);
4342 } else {
4343 if (o->type != REDIS_ZSET) {
4344 addReply(c,shared.wrongtypeerr);
4345 } else {
4346 zset *zsetobj = o->ptr;
4347 zskiplist *zsl = zsetobj->zsl;
4348 zskiplistNode *ln;
4349
4350 int llen = zsl->length;
4351 int rangelen, j;
4352 robj *ele;
4353
4354 /* convert negative indexes */
4355 if (start < 0) start = llen+start;
4356 if (end < 0) end = llen+end;
4357 if (start < 0) start = 0;
4358 if (end < 0) end = 0;
4359
4360 /* indexes sanity checks */
4361 if (start > end || start >= llen) {
4362 /* Out of range start or start > end result in empty list */
4363 addReply(c,shared.emptymultibulk);
4364 return;
4365 }
4366 if (end >= llen) end = llen-1;
4367 rangelen = (end-start)+1;
4368
4369 /* Return the result in form of a multi-bulk reply */
4370 if (reverse) {
4371 ln = zsl->tail;
4372 while (start--)
4373 ln = ln->backward;
4374 } else {
4375 ln = zsl->header->forward[0];
4376 while (start--)
4377 ln = ln->forward[0];
4378 }
4379
4380 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4381 for (j = 0; j < rangelen; j++) {
4382 ele = ln->obj;
4383 addReplyBulkLen(c,ele);
4384 addReply(c,ele);
4385 addReply(c,shared.crlf);
4386 ln = reverse ? ln->backward : ln->forward[0];
4387 }
4388 }
4389 }
4390 }
4391
4392 static void zrangeCommand(redisClient *c) {
4393 zrangeGenericCommand(c,0);
4394 }
4395
4396 static void zrevrangeCommand(redisClient *c) {
4397 zrangeGenericCommand(c,1);
4398 }
4399
4400 static void zrangebyscoreCommand(redisClient *c) {
4401 robj *o;
4402 double min = strtod(c->argv[2]->ptr,NULL);
4403 double max = strtod(c->argv[3]->ptr,NULL);
4404
4405 o = lookupKeyRead(c->db,c->argv[1]);
4406 if (o == NULL) {
4407 addReply(c,shared.nullmultibulk);
4408 } else {
4409 if (o->type != REDIS_ZSET) {
4410 addReply(c,shared.wrongtypeerr);
4411 } else {
4412 zset *zsetobj = o->ptr;
4413 zskiplist *zsl = zsetobj->zsl;
4414 zskiplistNode *ln;
4415 robj *ele, *lenobj;
4416 unsigned int rangelen = 0;
4417
4418 /* Get the first node with the score >= min */
4419 ln = zslFirstWithScore(zsl,min);
4420 if (ln == NULL) {
4421 /* No element matching the speciifed interval */
4422 addReply(c,shared.emptymultibulk);
4423 return;
4424 }
4425
4426 /* We don't know in advance how many matching elements there
4427 * are in the list, so we push this object that will represent
4428 * the multi-bulk length in the output buffer, and will "fix"
4429 * it later */
4430 lenobj = createObject(REDIS_STRING,NULL);
4431 addReply(c,lenobj);
4432
4433 while(ln && ln->score <= max) {
4434 ele = ln->obj;
4435 addReplyBulkLen(c,ele);
4436 addReply(c,ele);
4437 addReply(c,shared.crlf);
4438 ln = ln->forward[0];
4439 rangelen++;
4440 }
4441 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4442 }
4443 }
4444 }
4445
4446 static void zcardCommand(redisClient *c) {
4447 robj *o;
4448 zset *zs;
4449
4450 o = lookupKeyRead(c->db,c->argv[1]);
4451 if (o == NULL) {
4452 addReply(c,shared.czero);
4453 return;
4454 } else {
4455 if (o->type != REDIS_ZSET) {
4456 addReply(c,shared.wrongtypeerr);
4457 } else {
4458 zs = o->ptr;
4459 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4460 }
4461 }
4462 }
4463
4464 static void zscoreCommand(redisClient *c) {
4465 robj *o;
4466 zset *zs;
4467
4468 o = lookupKeyRead(c->db,c->argv[1]);
4469 if (o == NULL) {
4470 addReply(c,shared.nullbulk);
4471 return;
4472 } else {
4473 if (o->type != REDIS_ZSET) {
4474 addReply(c,shared.wrongtypeerr);
4475 } else {
4476 dictEntry *de;
4477
4478 zs = o->ptr;
4479 de = dictFind(zs->dict,c->argv[2]);
4480 if (!de) {
4481 addReply(c,shared.nullbulk);
4482 } else {
4483 char buf[128];
4484 double *score = dictGetEntryVal(de);
4485
4486 snprintf(buf,sizeof(buf),"%.17g",*score);
4487 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
4488 strlen(buf),buf));
4489 }
4490 }
4491 }
4492 }
4493
4494 /* ========================= Non type-specific commands ==================== */
4495
4496 static void flushdbCommand(redisClient *c) {
4497 server.dirty += dictSize(c->db->dict);
4498 dictEmpty(c->db->dict);
4499 dictEmpty(c->db->expires);
4500 addReply(c,shared.ok);
4501 }
4502
4503 static void flushallCommand(redisClient *c) {
4504 server.dirty += emptyDb();
4505 addReply(c,shared.ok);
4506 rdbSave(server.dbfilename);
4507 server.dirty++;
4508 }
4509
4510 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4511 redisSortOperation *so = zmalloc(sizeof(*so));
4512 so->type = type;
4513 so->pattern = pattern;
4514 return so;
4515 }
4516
4517 /* Return the value associated to the key with a name obtained
4518 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4519 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4520 char *p;
4521 sds spat, ssub;
4522 robj keyobj;
4523 int prefixlen, sublen, postfixlen;
4524 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4525 struct {
4526 long len;
4527 long free;
4528 char buf[REDIS_SORTKEY_MAX+1];
4529 } keyname;
4530
4531 if (subst->encoding == REDIS_ENCODING_RAW)
4532 incrRefCount(subst);
4533 else {
4534 subst = getDecodedObject(subst);
4535 }
4536
4537 spat = pattern->ptr;
4538 ssub = subst->ptr;
4539 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4540 p = strchr(spat,'*');
4541 if (!p) return NULL;
4542
4543 prefixlen = p-spat;
4544 sublen = sdslen(ssub);
4545 postfixlen = sdslen(spat)-(prefixlen+1);
4546 memcpy(keyname.buf,spat,prefixlen);
4547 memcpy(keyname.buf+prefixlen,ssub,sublen);
4548 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4549 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4550 keyname.len = prefixlen+sublen+postfixlen;
4551
4552 keyobj.refcount = 1;
4553 keyobj.type = REDIS_STRING;
4554 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4555
4556 decrRefCount(subst);
4557
4558 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4559 return lookupKeyRead(db,&keyobj);
4560 }
4561
4562 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4563 * the additional parameter is not standard but a BSD-specific we have to
4564 * pass sorting parameters via the global 'server' structure */
4565 static int sortCompare(const void *s1, const void *s2) {
4566 const redisSortObject *so1 = s1, *so2 = s2;
4567 int cmp;
4568
4569 if (!server.sort_alpha) {
4570 /* Numeric sorting. Here it's trivial as we precomputed scores */
4571 if (so1->u.score > so2->u.score) {
4572 cmp = 1;
4573 } else if (so1->u.score < so2->u.score) {
4574 cmp = -1;
4575 } else {
4576 cmp = 0;
4577 }
4578 } else {
4579 /* Alphanumeric sorting */
4580 if (server.sort_bypattern) {
4581 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4582 /* At least one compare object is NULL */
4583 if (so1->u.cmpobj == so2->u.cmpobj)
4584 cmp = 0;
4585 else if (so1->u.cmpobj == NULL)
4586 cmp = -1;
4587 else
4588 cmp = 1;
4589 } else {
4590 /* We have both the objects, use strcoll */
4591 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4592 }
4593 } else {
4594 /* Compare elements directly */
4595 if (so1->obj->encoding == REDIS_ENCODING_RAW &&
4596 so2->obj->encoding == REDIS_ENCODING_RAW) {
4597 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
4598 } else {
4599 robj *dec1, *dec2;
4600
4601 dec1 = so1->obj->encoding == REDIS_ENCODING_RAW ?
4602 so1->obj : getDecodedObject(so1->obj);
4603 dec2 = so2->obj->encoding == REDIS_ENCODING_RAW ?
4604 so2->obj : getDecodedObject(so2->obj);
4605 cmp = strcoll(dec1->ptr,dec2->ptr);
4606 if (dec1 != so1->obj) decrRefCount(dec1);
4607 if (dec2 != so2->obj) decrRefCount(dec2);
4608 }
4609 }
4610 }
4611 return server.sort_desc ? -cmp : cmp;
4612 }
4613
/* The SORT command is the most complex command in Redis. Warning: this code
 * is optimized for speed and a bit less for readability.
 *
 * argv[1] is the key to sort, that must hold a list or a set. The remaining
 * arguments are options parsed case-insensitively:
 *
 *   ASC | DESC          sort direction (ascending is the default).
 *   ALPHA               compare elements as strings instead of numbers.
 *   LIMIT start count   only return 'count' elements starting at 'start'.
 *   BY pattern          sort by the values of the keys obtained replacing
 *                       the '*' of the pattern with every element. If the
 *                       pattern has no '*' no sorting is performed at all.
 *   GET pattern         instead of the element, return the value of the key
 *                       obtained from the pattern. May be repeated.
 *   STORE key           store the result as a list at 'key' instead of
 *                       sending it to the client.
 *
 * Without STORE the reply is a multi bulk with the resulting items. With
 * STORE the reply is an integer: the number of items in the output. */
static void sortCommand(redisClient *c) {
    list *operations;
    int outputlen = 0;
    int desc = 0, alpha = 0;
    int limit_start = 0, limit_count = -1, start, end;
    int j, dontsort = 0, vectorlen;
    int getop = 0; /* GET operation counter */
    robj *sortval, *sortby = NULL, *storekey = NULL;
    redisSortObject *vector; /* Resulting vector to sort */

    /* Lookup the key to sort. It must be of the right types */
    sortval = lookupKeyRead(c->db,c->argv[1]);
    if (sortval == NULL) {
        addReply(c,shared.nokeyerr);
        return;
    }
    if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
        addReply(c,shared.wrongtypeerr);
        return;
    }

    /* Create a list of operations to perform for every sorted element.
     * NOTE(review): only GET operations are created by the parser below. */
    operations = listCreate();
    listSetFreeMethod(operations,zfree);
    j = 2;

    /* Now we need to protect sortval incrementing its count, in the future
     * SORT may have options able to overwrite/delete keys during the sorting
     * and the sorted key itself may get destroyed */
    incrRefCount(sortval);

    /* The SORT command has an SQL-alike syntax, parse it */
    while(j < c->argc) {
        /* Number of arguments available after the current one */
        int leftargs = c->argc-j-1;
        if (!strcasecmp(c->argv[j]->ptr,"asc")) {
            desc = 0;
        } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
            desc = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
            alpha = 1;
        } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
            limit_start = atoi(c->argv[j+1]->ptr);
            limit_count = atoi(c->argv[j+2]->ptr);
            j+=2;
        } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
            storekey = c->argv[j+1];
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
            sortby = c->argv[j+1];
            /* If the BY pattern does not contain '*', i.e. it is constant,
             * we don't need to sort nor to lookup the weight keys. */
            if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
            j++;
        } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
            listAddNodeTail(operations,createSortOperation(
                REDIS_SORT_GET,c->argv[j+1]));
            getop++;
            j++;
        } else {
            /* Unknown option, or option with missing arguments: undo what
             * was done so far and report a syntax error. */
            decrRefCount(sortval);
            listRelease(operations);
            addReply(c,shared.syntaxerr);
            return;
        }
        j++;
    }

    /* Load the sorting vector with all the objects to sort */
    vectorlen = (sortval->type == REDIS_LIST) ?
        listLength((list*)sortval->ptr) :
        dictSize((dict*)sortval->ptr);
    vector = zmalloc(sizeof(redisSortObject)*vectorlen);
    j = 0;
    if (sortval->type == REDIS_LIST) {
        list *list = sortval->ptr;
        listNode *ln;

        listRewind(list);
        while((ln = listYield(list))) {
            robj *ele = ln->value;
            vector[j].obj = ele;
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
    } else {
        dict *set = sortval->ptr;
        dictIterator *di;
        dictEntry *setele;

        di = dictGetIterator(set);
        while((setele = dictNext(di)) != NULL) {
            vector[j].obj = dictGetEntryKey(setele);
            vector[j].u.score = 0;
            vector[j].u.cmpobj = NULL;
            j++;
        }
        dictReleaseIterator(di);
    }
    assert(j == vectorlen);

    /* Now it's time to load the right scores in the sorting vector */
    if (dontsort == 0) {
        for (j = 0; j < vectorlen; j++) {
            if (sortby) {
                robj *byval;

                /* Elements without a string value at the weight key keep
                 * the zero score / NULL compare object set above. */
                byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
                if (!byval || byval->type != REDIS_STRING) continue;
                if (alpha) {
                    /* In both branches a reference is taken on the compare
                     * object: it is released in the cleanup stage. */
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.cmpobj = byval;
                        incrRefCount(byval);
                    } else {
                        vector[j].u.cmpobj = getDecodedObject(byval);
                    }
                } else {
                    if (byval->encoding == REDIS_ENCODING_RAW) {
                        vector[j].u.score = strtod(byval->ptr,NULL);
                    } else {
                        if (byval->encoding == REDIS_ENCODING_INT) {
                            /* The integer is stored in the pointer itself */
                            vector[j].u.score = (long)byval->ptr;
                        } else
                            assert(1 != 1);
                    }
                }
            } else {
                /* No BY option: in numeric mode the score is the element
                 * itself; in alpha mode sortCompare() reads the elements
                 * directly so there is nothing to precompute. */
                if (!alpha) {
                    if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
                        vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
                    else {
                        if (vector[j].obj->encoding == REDIS_ENCODING_INT)
                            vector[j].u.score = (long) vector[j].obj->ptr;
                        else
                            assert(1 != 1);
                    }
                }
            }
        }
    }

    /* We are ready to sort the vector... perform a bit of sanity check
     * on the LIMIT option too. We'll use a partial version of quicksort. */
    start = (limit_start < 0) ? 0 : limit_start;
    end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
    if (start >= vectorlen) {
        /* Out of range start: make the range empty (end == start-1) */
        start = vectorlen-1;
        end = vectorlen-2;
    }
    if (end >= vectorlen) end = vectorlen-1;

    if (dontsort == 0) {
        /* Parameters for sortCompare() are passed via the server struct */
        server.sort_desc = desc;
        server.sort_alpha = alpha;
        server.sort_bypattern = sortby ? 1 : 0;
        /* pqsort() is used only when sorting by pattern and the range to
         * output is not the whole vector. */
        if (sortby && (start != 0 || end != vectorlen-1))
            pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
        else
            qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
    }

    /* Send command output to the output buffer, performing the specified
     * GET operations if any. With GET options every element in the range
     * produces one output item per GET, otherwise one item per element. */
    outputlen = getop ? getop*(end-start+1) : end-start+1;
    if (storekey == NULL) {
        /* STORE option not specified, send the sorting result to client */
        addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                addReplyBulkLen(c,vector[j].obj);
                addReply(c,vector[j].obj);
                addReply(c,shared.crlf);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string values produce a null bulk */
                    if (!val || val->type != REDIS_STRING) {
                        addReply(c,shared.nullbulk);
                    } else {
                        addReplyBulkLen(c,val);
                        addReply(c,val);
                        addReply(c,shared.crlf);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
    } else {
        robj *listObject = createListObject();
        list *listPtr = (list*) listObject->ptr;

        /* STORE option specified, set the sorting result as a List object */
        for (j = start; j <= end; j++) {
            listNode *ln;
            if (!getop) {
                listAddNodeTail(listPtr,vector[j].obj);
                incrRefCount(vector[j].obj);
            }
            listRewind(operations);
            while((ln = listYield(operations))) {
                redisSortOperation *sop = ln->value;
                robj *val = lookupKeyByPattern(c->db,sop->pattern,
                    vector[j].obj);

                if (sop->type == REDIS_SORT_GET) {
                    /* Missing or non-string values are stored as empty
                     * strings */
                    if (!val || val->type != REDIS_STRING) {
                        listAddNodeTail(listPtr,createStringObject("",0));
                    } else {
                        listAddNodeTail(listPtr,val);
                        incrRefCount(val);
                    }
                } else {
                    assert(sop->type == REDIS_SORT_GET); /* always fails */
                }
            }
        }
        /* NOTE(review): presumably dictReplace() returns non zero when the
         * key was newly added, in which case the dictionary now references
         * 'storekey' -- confirm against the dict implementation. */
        if (dictReplace(c->db->dict,storekey,listObject)) {
            incrRefCount(storekey);
        }
        /* Note: we add 1 because the DB is dirty anyway since even if the
         * SORT result is empty a new key is set and maybe the old content
         * replaced. */
        server.dirty += 1+outputlen;
        addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
    }

    /* Cleanup: release the reference taken on the sorted value, the list
     * of operations, and the compare objects loaded for BY + ALPHA. */
    decrRefCount(sortval);
    listRelease(operations);
    for (j = 0; j < vectorlen; j++) {
        if (sortby && alpha && vector[j].u.cmpobj)
            decrRefCount(vector[j].u.cmpobj);
    }
    zfree(vector);
}
4858
4859 static void infoCommand(redisClient *c) {
4860 sds info;
4861 time_t uptime = time(NULL)-server.stat_starttime;
4862 int j;
4863
4864 info = sdscatprintf(sdsempty(),
4865 "redis_version:%s\r\n"
4866 "arch_bits:%s\r\n"
4867 "uptime_in_seconds:%d\r\n"
4868 "uptime_in_days:%d\r\n"
4869 "connected_clients:%d\r\n"
4870 "connected_slaves:%d\r\n"
4871 "used_memory:%zu\r\n"
4872 "changes_since_last_save:%lld\r\n"
4873 "bgsave_in_progress:%d\r\n"
4874 "last_save_time:%d\r\n"
4875 "total_connections_received:%lld\r\n"
4876 "total_commands_processed:%lld\r\n"
4877 "role:%s\r\n"
4878 ,REDIS_VERSION,
4879 (sizeof(long) == 8) ? "64" : "32",
4880 uptime,
4881 uptime/(3600*24),
4882 listLength(server.clients)-listLength(server.slaves),
4883 listLength(server.slaves),
4884 server.usedmemory,
4885 server.dirty,
4886 server.bgsaveinprogress,
4887 server.lastsave,
4888 server.stat_numconnections,
4889 server.stat_numcommands,
4890 server.masterhost == NULL ? "master" : "slave"
4891 );
4892 if (server.masterhost) {
4893 info = sdscatprintf(info,
4894 "master_host:%s\r\n"
4895 "master_port:%d\r\n"
4896 "master_link_status:%s\r\n"
4897 "master_last_io_seconds_ago:%d\r\n"
4898 ,server.masterhost,
4899 server.masterport,
4900 (server.replstate == REDIS_REPL_CONNECTED) ?
4901 "up" : "down",
4902 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
4903 );
4904 }
4905 for (j = 0; j < server.dbnum; j++) {
4906 long long keys, vkeys;
4907
4908 keys = dictSize(server.db[j].dict);
4909 vkeys = dictSize(server.db[j].expires);
4910 if (keys || vkeys) {
4911 info = sdscatprintf(info, "db%d: keys=%lld,expires=%lld\r\n",
4912 j, keys, vkeys);
4913 }
4914 }
4915 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
4916 addReplySds(c,info);
4917 addReply(c,shared.crlf);
4918 }
4919
4920 static void monitorCommand(redisClient *c) {
4921 /* ignore MONITOR if aleady slave or in monitor mode */
4922 if (c->flags & REDIS_SLAVE) return;
4923
4924 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
4925 c->slaveseldb = 0;
4926 listAddNodeTail(server.monitors,c);
4927 addReply(c,shared.ok);
4928 }
4929
4930 /* ================================= Expire ================================= */
4931 static int removeExpire(redisDb *db, robj *key) {
4932 if (dictDelete(db->expires,key) == DICT_OK) {
4933 return 1;
4934 } else {
4935 return 0;
4936 }
4937 }
4938
4939 static int setExpire(redisDb *db, robj *key, time_t when) {
4940 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
4941 return 0;
4942 } else {
4943 incrRefCount(key);
4944 return 1;
4945 }
4946 }
4947
4948 /* Return the expire time of the specified key, or -1 if no expire
4949 * is associated with this key (i.e. the key is non volatile) */
4950 static time_t getExpire(redisDb *db, robj *key) {
4951 dictEntry *de;
4952
4953 /* No expire? return ASAP */
4954 if (dictSize(db->expires) == 0 ||
4955 (de = dictFind(db->expires,key)) == NULL) return -1;
4956
4957 return (time_t) dictGetEntryVal(de);
4958 }
4959
4960 static int expireIfNeeded(redisDb *db, robj *key) {
4961 time_t when;
4962 dictEntry *de;
4963
4964 /* No expire? return ASAP */
4965 if (dictSize(db->expires) == 0 ||
4966 (de = dictFind(db->expires,key)) == NULL) return 0;
4967
4968 /* Lookup the expire */
4969 when = (time_t) dictGetEntryVal(de);
4970 if (time(NULL) <= when) return 0;
4971
4972 /* Delete the key */
4973 dictDelete(db->expires,key);
4974 return dictDelete(db->dict,key) == DICT_OK;
4975 }
4976
4977 static int deleteIfVolatile(redisDb *db, robj *key) {
4978 dictEntry *de;
4979
4980 /* No expire? return ASAP */
4981 if (dictSize(db->expires) == 0 ||
4982 (de = dictFind(db->expires,key)) == NULL) return 0;
4983
4984 /* Delete the key */
4985 server.dirty++;
4986 dictDelete(db->expires,key);
4987 return dictDelete(db->dict,key) == DICT_OK;
4988 }
4989
4990 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
4991 dictEntry *de;
4992
4993 de = dictFind(c->db->dict,key);
4994 if (de == NULL) {
4995 addReply(c,shared.czero);
4996 return;
4997 }
4998 if (seconds < 0) {
4999 if (deleteKey(c->db,key)) server.dirty++;
5000 addReply(c, shared.cone);
5001 return;
5002 } else {
5003 time_t when = time(NULL)+seconds;
5004 if (setExpire(c->db,key,when)) {
5005 addReply(c,shared.cone);
5006 server.dirty++;
5007 } else {
5008 addReply(c,shared.czero);
5009 }
5010 return;
5011 }
5012 }
5013
5014 static void expireCommand(redisClient *c) {
5015 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5016 }
5017
5018 static void expireatCommand(redisClient *c) {
5019 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5020 }
5021
5022 static void ttlCommand(redisClient *c) {
5023 time_t expire;
5024 int ttl = -1;
5025
5026 expire = getExpire(c->db,c->argv[1]);
5027 if (expire != -1) {
5028 ttl = (int) (expire-time(NULL));
5029 if (ttl < 0) ttl = -1;
5030 }
5031 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5032 }
5033
5034 static void msetGenericCommand(redisClient *c, int nx) {
5035 int j;
5036
5037 if ((c->argc % 2) == 0) {
5038 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
5039 return;
5040 }
5041 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
5042 * set nothing at all if at least one already key exists. */
5043 if (nx) {
5044 for (j = 1; j < c->argc; j += 2) {
5045 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
5046 addReply(c, shared.czero);
5047 return;
5048 }
5049 }
5050 }
5051
5052 for (j = 1; j < c->argc; j += 2) {
5053 int retval;
5054
5055 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
5056 if (retval == DICT_ERR) {
5057 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
5058 incrRefCount(c->argv[j+1]);
5059 } else {
5060 incrRefCount(c->argv[j]);
5061 incrRefCount(c->argv[j+1]);
5062 }
5063 removeExpire(c->db,c->argv[j]);
5064 }
5065 server.dirty += (c->argc-1)/2;
5066 addReply(c, nx ? shared.cone : shared.ok);
5067 }
5068
5069 static void msetCommand(redisClient *c) {
5070 msetGenericCommand(c,0);
5071 }
5072
5073 static void msetnxCommand(redisClient *c) {
5074 msetGenericCommand(c,1);
5075 }
5076
5077 /* =============================== Replication ============================= */
5078
5079 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5080 ssize_t nwritten, ret = size;
5081 time_t start = time(NULL);
5082
5083 timeout++;
5084 while(size) {
5085 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5086 nwritten = write(fd,ptr,size);
5087 if (nwritten == -1) return -1;
5088 ptr += nwritten;
5089 size -= nwritten;
5090 }
5091 if ((time(NULL)-start) > timeout) {
5092 errno = ETIMEDOUT;
5093 return -1;
5094 }
5095 }
5096 return ret;
5097 }
5098
5099 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5100 ssize_t nread, totread = 0;
5101 time_t start = time(NULL);
5102
5103 timeout++;
5104 while(size) {
5105 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5106 nread = read(fd,ptr,size);
5107 if (nread == -1) return -1;
5108 ptr += nread;
5109 size -= nread;
5110 totread += nread;
5111 }
5112 if ((time(NULL)-start) > timeout) {
5113 errno = ETIMEDOUT;
5114 return -1;
5115 }
5116 }
5117 return totread;
5118 }
5119
/* Read a line from 'fd' into the buffer 'ptr' of 'size' bytes, one byte at
 * a time using syncRead() with the given timeout for every byte.
 *
 * Reading stops at the first newline, that is not stored, or when the
 * buffer is full: at most size-1 bytes are stored and the buffer is always
 * null terminated. A carriage return just before the newline is stripped
 * from the buffer.
 *
 * Returns the number of bytes stored before the newline (a stripped
 * carriage return is included in the count), or -1 if syncRead() fails.
 * If 'size' is zero or negative nothing is read and 0 is returned. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;
    *ptr = '\0';
    size--; /* reserve room for the null term */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* One byte less available: without this the loop would keep
         * writing past the end of the buffer on long lines. */
        size--;
    }
    return nread;
}
5140
5141 static void syncCommand(redisClient *c) {
5142 /* ignore SYNC if aleady slave or in monitor mode */
5143 if (c->flags & REDIS_SLAVE) return;
5144
5145 /* SYNC can't be issued when the server has pending data to send to
5146 * the client about already issued commands. We need a fresh reply
5147 * buffer registering the differences between the BGSAVE and the current
5148 * dataset, so that we can copy to other slaves if needed. */
5149 if (listLength(c->reply) != 0) {
5150 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5151 return;
5152 }
5153
5154 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5155 /* Here we need to check if there is a background saving operation
5156 * in progress, or if it is required to start one */
5157 if (server.bgsaveinprogress) {
5158 /* Ok a background save is in progress. Let's check if it is a good
5159 * one for replication, i.e. if there is another slave that is
5160 * registering differences since the server forked to save */
5161 redisClient *slave;
5162 listNode *ln;
5163
5164 listRewind(server.slaves);
5165 while((ln = listYield(server.slaves))) {
5166 slave = ln->value;
5167 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5168 }
5169 if (ln) {
5170 /* Perfect, the server is already registering differences for
5171 * another slave. Set the right state, and copy the buffer. */
5172 listRelease(c->reply);
5173 c->reply = listDup(slave->reply);
5174 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5175 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5176 } else {
5177 /* No way, we need to wait for the next BGSAVE in order to
5178 * register differences */
5179 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5180 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5181 }
5182 } else {
5183 /* Ok we don't have a BGSAVE in progress, let's start one */
5184 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5185 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5186 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5187 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5188 return;
5189 }
5190 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5191 }
5192 c->repldbfd = -1;
5193 c->flags |= REDIS_SLAVE;
5194 c->slaveseldb = 0;
5195 listAddNodeTail(server.slaves,c);
5196 return;
5197 }
5198
5199 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5200 redisClient *slave = privdata;
5201 REDIS_NOTUSED(el);
5202 REDIS_NOTUSED(mask);
5203 char buf[REDIS_IOBUF_LEN];
5204 ssize_t nwritten, buflen;
5205
5206 if (slave->repldboff == 0) {
5207 /* Write the bulk write count before to transfer the DB. In theory here
5208 * we don't know how much room there is in the output buffer of the
5209 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5210 * operations) will never be smaller than the few bytes we need. */
5211 sds bulkcount;
5212
5213 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5214 slave->repldbsize);
5215 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5216 {
5217 sdsfree(bulkcount);
5218 freeClient(slave);
5219 return;
5220 }
5221 sdsfree(bulkcount);
5222 }
5223 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5224 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5225 if (buflen <= 0) {
5226 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5227 (buflen == 0) ? "premature EOF" : strerror(errno));
5228 freeClient(slave);
5229 return;
5230 }
5231 if ((nwritten = write(fd,buf,buflen)) == -1) {
5232 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5233 strerror(errno));
5234 freeClient(slave);
5235 return;
5236 }
5237 slave->repldboff += nwritten;
5238 if (slave->repldboff == slave->repldbsize) {
5239 close(slave->repldbfd);
5240 slave->repldbfd = -1;
5241 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5242 slave->replstate = REDIS_REPL_ONLINE;
5243 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5244 sendReplyToClient, slave, NULL) == AE_ERR) {
5245 freeClient(slave);
5246 return;
5247 }
5248 addReplySds(slave,sdsempty());
5249 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5250 }
5251 }
5252
5253 /* This function is called at the end of every backgrond saving.
5254 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5255 * otherwise REDIS_ERR is passed to the function.
5256 *
5257 * The goal of this function is to handle slaves waiting for a successful
5258 * background saving in order to perform non-blocking synchronization. */
5259 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5260 listNode *ln;
5261 int startbgsave = 0;
5262
5263 listRewind(server.slaves);
5264 while((ln = listYield(server.slaves))) {
5265 redisClient *slave = ln->value;
5266
5267 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5268 startbgsave = 1;
5269 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5270 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5271 struct redis_stat buf;
5272
5273 if (bgsaveerr != REDIS_OK) {
5274 freeClient(slave);
5275 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5276 continue;
5277 }
5278 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5279 redis_fstat(slave->repldbfd,&buf) == -1) {
5280 freeClient(slave);
5281 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5282 continue;
5283 }
5284 slave->repldboff = 0;
5285 slave->repldbsize = buf.st_size;
5286 slave->replstate = REDIS_REPL_SEND_BULK;
5287 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5288 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
5289 freeClient(slave);
5290 continue;
5291 }
5292 }
5293 }
5294 if (startbgsave) {
5295 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5296 listRewind(server.slaves);
5297 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5298 while((ln = listYield(server.slaves))) {
5299 redisClient *slave = ln->value;
5300
5301 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5302 freeClient(slave);
5303 }
5304 }
5305 }
5306 }
5307
5308 static int syncWithMaster(void) {
5309 char buf[1024], tmpfile[256], authcmd[1024];
5310 int dumpsize;
5311 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5312 int dfd;
5313
5314 if (fd == -1) {
5315 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5316 strerror(errno));
5317 return REDIS_ERR;
5318 }
5319
5320 /* AUTH with the master if required. */
5321 if(server.masterauth) {
5322 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5323 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5324 close(fd);
5325 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5326 strerror(errno));
5327 return REDIS_ERR;
5328 }
5329 /* Read the AUTH result. */
5330 if (syncReadLine(fd,buf,1024,3600) == -1) {
5331 close(fd);
5332 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5333 strerror(errno));
5334 return REDIS_ERR;
5335 }
5336 if (buf[0] != '+') {
5337 close(fd);
5338 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5339 return REDIS_ERR;
5340 }
5341 }
5342
5343 /* Issue the SYNC command */
5344 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5345 close(fd);
5346 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5347 strerror(errno));
5348 return REDIS_ERR;
5349 }
5350 /* Read the bulk write count */
5351 if (syncReadLine(fd,buf,1024,3600) == -1) {
5352 close(fd);
5353 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5354 strerror(errno));
5355 return REDIS_ERR;
5356 }
5357 if (buf[0] != '$') {
5358 close(fd);
5359 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5360 return REDIS_ERR;
5361 }
5362 dumpsize = atoi(buf+1);
5363 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5364 /* Read the bulk write data on a temp file */
5365 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5366 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5367 if (dfd == -1) {
5368 close(fd);
5369 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5370 return REDIS_ERR;
5371 }
5372 while(dumpsize) {
5373 int nread, nwritten;
5374
5375 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5376 if (nread == -1) {
5377 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5378 strerror(errno));
5379 close(fd);
5380 close(dfd);
5381 return REDIS_ERR;
5382 }
5383 nwritten = write(dfd,buf,nread);
5384 if (nwritten == -1) {
5385 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5386 close(fd);
5387 close(dfd);
5388 return REDIS_ERR;
5389 }
5390 dumpsize -= nread;
5391 }
5392 close(dfd);
5393 if (rename(tmpfile,server.dbfilename) == -1) {
5394 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5395 unlink(tmpfile);
5396 close(fd);
5397 return REDIS_ERR;
5398 }
5399 emptyDb();
5400 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5401 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5402 close(fd);
5403 return REDIS_ERR;
5404 }
5405 server.master = createClient(fd);
5406 server.master->flags |= REDIS_MASTER;
5407 server.replstate = REDIS_REPL_CONNECTED;
5408 return REDIS_OK;
5409 }
5410
5411 static void slaveofCommand(redisClient *c) {
5412 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5413 !strcasecmp(c->argv[2]->ptr,"one")) {
5414 if (server.masterhost) {
5415 sdsfree(server.masterhost);
5416 server.masterhost = NULL;
5417 if (server.master) freeClient(server.master);
5418 server.replstate = REDIS_REPL_NONE;
5419 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5420 }
5421 } else {
5422 sdsfree(server.masterhost);
5423 server.masterhost = sdsdup(c->argv[1]->ptr);
5424 server.masterport = atoi(c->argv[2]->ptr);
5425 if (server.master) freeClient(server.master);
5426 server.replstate = REDIS_REPL_CONNECT;
5427 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5428 server.masterhost, server.masterport);
5429 }
5430 addReply(c,shared.ok);
5431 }
5432
5433 /* ============================ Maxmemory directive ======================== */
5434
5435 /* This function gets called when 'maxmemory' is set on the config file to limit
5436 * the max memory used by the server, and we are out of memory.
5437 * This function will try to, in order:
5438 *
5439 * - Free objects from the free list
5440 * - Try to remove keys with an EXPIRE set
5441 *
5442 * It is not possible to free enough memory to reach used-memory < maxmemory
5443 * the server will start refusing commands that will enlarge even more the
5444 * memory usage.
5445 */
5446 static void freeMemoryIfNeeded(void) {
5447 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5448 if (listLength(server.objfreelist)) {
5449 robj *o;
5450
5451 listNode *head = listFirst(server.objfreelist);
5452 o = listNodeValue(head);
5453 listDelNode(server.objfreelist,head);
5454 zfree(o);
5455 } else {
5456 int j, k, freed = 0;
5457
5458 for (j = 0; j < server.dbnum; j++) {
5459 int minttl = -1;
5460 robj *minkey = NULL;
5461 struct dictEntry *de;
5462
5463 if (dictSize(server.db[j].expires)) {
5464 freed = 1;
5465 /* From a sample of three keys drop the one nearest to
5466 * the natural expire */
5467 for (k = 0; k < 3; k++) {
5468 time_t t;
5469
5470 de = dictGetRandomKey(server.db[j].expires);
5471 t = (time_t) dictGetEntryVal(de);
5472 if (minttl == -1 || t < minttl) {
5473 minkey = dictGetEntryKey(de);
5474 minttl = t;
5475 }
5476 }
5477 deleteKey(server.db+j,minkey);
5478 }
5479 }
5480 if (!freed) return; /* nothing to free... */
5481 }
5482 }
5483 }
5484
5485 /* ============================== Append Only file ========================== */
5486
5487 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5488 sds buf = sdsempty();
5489 int j;
5490 ssize_t nwritten;
5491 time_t now;
5492 robj *tmpargv[3];
5493
5494 /* The DB this command was targetting is not the same as the last command
5495 * we appendend. To issue a SELECT command is needed. */
5496 if (dictid != server.appendseldb) {
5497 char seldb[64];
5498
5499 snprintf(seldb,sizeof(seldb),"%d",dictid);
5500 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5501 strlen(seldb),seldb);
5502 server.appendseldb = dictid;
5503 }
5504
5505 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5506 * EXPIREs into EXPIREATs calls */
5507 if (cmd->proc == expireCommand) {
5508 long when;
5509
5510 tmpargv[0] = createStringObject("EXPIREAT",8);
5511 tmpargv[1] = argv[1];
5512 incrRefCount(argv[1]);
5513 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5514 tmpargv[2] = createObject(REDIS_STRING,
5515 sdscatprintf(sdsempty(),"%ld",when));
5516 argv = tmpargv;
5517 }
5518
5519 /* Append the actual command */
5520 buf = sdscatprintf(buf,"*%d\r\n",argc);
5521 for (j = 0; j < argc; j++) {
5522 robj *o = argv[j];
5523
5524 if (o->encoding != REDIS_ENCODING_RAW)
5525 o = getDecodedObject(o);
5526 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5527 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5528 buf = sdscatlen(buf,"\r\n",2);
5529 if (o != argv[j])
5530 decrRefCount(o);
5531 }
5532
5533 /* Free the objects from the modified argv for EXPIREAT */
5534 if (cmd->proc == expireCommand) {
5535 for (j = 0; j < 3; j++)
5536 decrRefCount(argv[j]);
5537 }
5538
5539 /* We want to perform a single write. This should be guaranteed atomic
5540 * at least if the filesystem we are writing is a real physical one.
5541 * While this will save us against the server being killed I don't think
5542 * there is much to do about the whole server stopping for power problems
5543 * or alike */
5544 nwritten = write(server.appendfd,buf,sdslen(buf));
5545 if (nwritten != (signed)sdslen(buf)) {
5546 /* Ooops, we are in troubles. The best thing to do for now is
5547 * to simply exit instead to give the illusion that everything is
5548 * working as expected. */
5549 if (nwritten == -1) {
5550 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5551 } else {
5552 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5553 }
5554 exit(1);
5555 }
5556 now = time(NULL);
5557 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5558 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5559 now-server.lastfsync > 1))
5560 {
5561 fsync(server.appendfd); /* Let's try to get this data on the disk */
5562 server.lastfsync = now;
5563 }
5564 }
5565
5566 /* In Redis commands are always executed in the context of a client, so in
5567 * order to load the append only file we need to create a fake client. */
5568 static struct redisClient *createFakeClient(void) {
5569 struct redisClient *c = zmalloc(sizeof(*c));
5570
5571 selectDb(c,0);
5572 c->fd = -1;
5573 c->querybuf = sdsempty();
5574 c->argc = 0;
5575 c->argv = NULL;
5576 c->flags = 0;
5577 /* We set the fake client as a slave waiting for the synchronization
5578 * so that Redis will not try to send replies to this client. */
5579 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5580 c->reply = listCreate();
5581 listSetFreeMethod(c->reply,decrRefCount);
5582 listSetDupMethod(c->reply,dupClientReplyValue);
5583 return c;
5584 }
5585
5586 static void freeFakeClient(struct redisClient *c) {
5587 sdsfree(c->querybuf);
5588 listRelease(c->reply);
5589 zfree(c);
5590 }
5591
5592 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5593 * error (the append only file is zero-length) REDIS_ERR is returned. On
5594 * fatal error an error message is logged and the program exists. */
5595 int loadAppendOnlyFile(char *filename) {
5596 struct redisClient *fakeClient;
5597 FILE *fp = fopen(filename,"r");
5598 struct redis_stat sb;
5599
5600 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5601 return REDIS_ERR;
5602
5603 if (fp == NULL) {
5604 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5605 exit(1);
5606 }
5607
5608 fakeClient = createFakeClient();
5609 while(1) {
5610 int argc, j;
5611 unsigned long len;
5612 robj **argv;
5613 char buf[128];
5614 sds argsds;
5615 struct redisCommand *cmd;
5616
5617 if (fgets(buf,sizeof(buf),fp) == NULL) {
5618 if (feof(fp))
5619 break;
5620 else
5621 goto readerr;
5622 }
5623 if (buf[0] != '*') goto fmterr;
5624 argc = atoi(buf+1);
5625 argv = zmalloc(sizeof(robj*)*argc);
5626 for (j = 0; j < argc; j++) {
5627 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5628 if (buf[0] != '$') goto fmterr;
5629 len = strtol(buf+1,NULL,10);
5630 argsds = sdsnewlen(NULL,len);
5631 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5632 argv[j] = createObject(REDIS_STRING,argsds);
5633 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5634 }
5635
5636 /* Command lookup */
5637 cmd = lookupCommand(argv[0]->ptr);
5638 if (!cmd) {
5639 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5640 exit(1);
5641 }
5642 /* Try object sharing and encoding */
5643 if (server.shareobjects) {
5644 int j;
5645 for(j = 1; j < argc; j++)
5646 argv[j] = tryObjectSharing(argv[j]);
5647 }
5648 if (cmd->flags & REDIS_CMD_BULK)
5649 tryObjectEncoding(argv[argc-1]);
5650 /* Run the command in the context of a fake client */
5651 fakeClient->argc = argc;
5652 fakeClient->argv = argv;
5653 cmd->proc(fakeClient);
5654 /* Discard the reply objects list from the fake client */
5655 while(listLength(fakeClient->reply))
5656 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5657 /* Clean up, ready for the next command */
5658 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5659 zfree(argv);
5660 }
5661 fclose(fp);
5662 freeFakeClient(fakeClient);
5663 return REDIS_OK;
5664
5665 readerr:
5666 if (feof(fp)) {
5667 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5668 } else {
5669 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5670 }
5671 exit(1);
5672 fmterr:
5673 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5674 exit(1);
5675 }
5676
5677 /* ================================= Debugging ============================== */
5678
5679 static void debugCommand(redisClient *c) {
5680 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
5681 *((char*)-1) = 'x';
5682 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
5683 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
5684 robj *key, *val;
5685
5686 if (!de) {
5687 addReply(c,shared.nokeyerr);
5688 return;
5689 }
5690 key = dictGetEntryKey(de);
5691 val = dictGetEntryVal(de);
5692 addReplySds(c,sdscatprintf(sdsempty(),
5693 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
5694 key, key->refcount, val, val->refcount, val->encoding));
5695 } else {
5696 addReplySds(c,sdsnew(
5697 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
5698 }
5699 }
5700
5701 /* =================================== Main! ================================ */
5702
5703 #ifdef __linux__
/* Return the current value of the vm.overcommit_memory kernel setting, as
 * read from /proc/sys/vm/overcommit_memory and converted with atoi().
 * Returns -1 when the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return value;
    if (fgets(line,sizeof(line),procfile) != NULL)
        value = atoi(line);
    fclose(procfile);
    return value;
}
5717
5718 void linuxOvercommitMemoryWarning(void) {
5719 if (linuxOvercommitMemoryValue() == 0) {
5720 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
5721 }
5722 }
5723 #endif /* __linux__ */
5724
5725 static void daemonize(void) {
5726 int fd;
5727 FILE *fp;
5728
5729 if (fork() != 0) exit(0); /* parent exits */
5730 setsid(); /* create a new session */
5731
5732 /* Every output goes to /dev/null. If Redis is daemonized but
5733 * the 'logfile' is set to 'stdout' in the configuration file
5734 * it will not log at all. */
5735 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
5736 dup2(fd, STDIN_FILENO);
5737 dup2(fd, STDOUT_FILENO);
5738 dup2(fd, STDERR_FILENO);
5739 if (fd > STDERR_FILENO) close(fd);
5740 }
5741 /* Try to write the pid file */
5742 fp = fopen(server.pidfile,"w");
5743 if (fp) {
5744 fprintf(fp,"%d\n",getpid());
5745 fclose(fp);
5746 }
5747 }
5748
5749 int main(int argc, char **argv) {
5750 initServerConfig();
5751 if (argc == 2) {
5752 resetServerSaveParams();
5753 loadServerConfig(argv[1]);
5754 } else if (argc > 2) {
5755 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
5756 exit(1);
5757 } else {
5758 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
5759 }
5760 initServer();
5761 if (server.daemonize) daemonize();
5762 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
5763 #ifdef __linux__
5764 linuxOvercommitMemoryWarning();
5765 #endif
5766 if (server.appendonly) {
5767 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
5768 redisLog(REDIS_NOTICE,"DB loaded from append only file");
5769 } else {
5770 if (rdbLoad(server.dbfilename) == REDIS_OK)
5771 redisLog(REDIS_NOTICE,"DB loaded from disk");
5772 }
5773 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
5774 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
5775 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
5776 aeMain(server.el);
5777 aeDeleteEventLoop(server.el);
5778 return 0;
5779 }
5780
5781 /* ============================= Backtrace support ========================= */
5782
5783 #ifdef HAVE_BACKTRACE
5784 static char *findFuncName(void *pointer, unsigned long *offset);
5785
/* Return the instruction pointer saved in the signal context 'uc', using
 * the field where each supported platform stores it. NULL is returned on
 * platforms that match none of the cases below. */
static void *getMcontextEip(ucontext_t *uc) {
    void *ip = NULL;

#if defined(__FreeBSD__)
    ip = (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    ip = (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__)
    /* The 64 bit register is only read when building against the 10.6 SDK
     * for a non i386 target exposing the 64 bit thread state. */
  #if defined(MAC_OS_X_VERSION_10_6) && \
      defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    ip = (void*) uc->uc_mcontext->__ss.__rip;
  #else
    ip = (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) || defined(__X86_64__) /* Linux x86 */
    ip = (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif defined(__ia64__) /* Linux IA64 */
    ip = (void*) uc->uc_mcontext.sc_ip;
#endif
    return ip;
}
5807
5808 static void segvHandler(int sig, siginfo_t *info, void *secret) {
5809 void *trace[100];
5810 char **messages = NULL;
5811 int i, trace_size = 0;
5812 unsigned long offset=0;
5813 time_t uptime = time(NULL)-server.stat_starttime;
5814 ucontext_t *uc = (ucontext_t*) secret;
5815 REDIS_NOTUSED(info);
5816
5817 redisLog(REDIS_WARNING,
5818 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
5819 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
5820 "redis_version:%s; "
5821 "uptime_in_seconds:%d; "
5822 "connected_clients:%d; "
5823 "connected_slaves:%d; "
5824 "used_memory:%zu; "
5825 "changes_since_last_save:%lld; "
5826 "bgsave_in_progress:%d; "
5827 "last_save_time:%d; "
5828 "total_connections_received:%lld; "
5829 "total_commands_processed:%lld; "
5830 "role:%s;"
5831 ,REDIS_VERSION,
5832 uptime,
5833 listLength(server.clients)-listLength(server.slaves),
5834 listLength(server.slaves),
5835 server.usedmemory,
5836 server.dirty,
5837 server.bgsaveinprogress,
5838 server.lastsave,
5839 server.stat_numconnections,
5840 server.stat_numcommands,
5841 server.masterhost == NULL ? "master" : "slave"
5842 ));
5843
5844 trace_size = backtrace(trace, 100);
5845 /* overwrite sigaction with caller's address */
5846 if (getMcontextEip(uc) != NULL) {
5847 trace[1] = getMcontextEip(uc);
5848 }
5849 messages = backtrace_symbols(trace, trace_size);
5850
5851 for (i=1; i<trace_size; ++i) {
5852 char *fn = findFuncName(trace[i], &offset), *p;
5853
5854 p = strchr(messages[i],'+');
5855 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
5856 redisLog(REDIS_WARNING,"%s", messages[i]);
5857 } else {
5858 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
5859 }
5860 }
5861 free(messages);
5862 exit(0);
5863 }
5864
5865 static void setupSigSegvAction(void) {
5866 struct sigaction act;
5867
5868 sigemptyset (&act.sa_mask);
5869 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
5870 * is used. Otherwise, sa_handler is used */
5871 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
5872 act.sa_sigaction = segvHandler;
5873 sigaction (SIGSEGV, &act, NULL);
5874 sigaction (SIGBUS, &act, NULL);
5875 sigaction (SIGFPE, &act, NULL);
5876 sigaction (SIGILL, &act, NULL);
5877 sigaction (SIGBUS, &act, NULL);
5878 return;
5879 }
5880
5881 #include "staticsymbols.h"
5882 /* This function try to convert a pointer into a function name. It's used in
5883 * oreder to provide a backtrace under segmentation fault that's able to
5884 * display functions declared as static (otherwise the backtrace is useless). */
5885 static char *findFuncName(void *pointer, unsigned long *offset){
5886 int i, ret = -1;
5887 unsigned long off, minoff = 0;
5888
5889 /* Try to match against the Symbol with the smallest offset */
5890 for (i=0; symsTable[i].pointer; i++) {
5891 unsigned long lp = (unsigned long) pointer;
5892
5893 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
5894 off=lp-symsTable[i].pointer;
5895 if (ret < 0 || off < minoff) {
5896 minoff=off;
5897 ret=i;
5898 }
5899 }
5900 }
5901 if (ret == -1) return NULL;
5902 *offset = minoff;
5903 return symsTable[ret].name;
5904 }
5905 #else /* HAVE_BACKTRACE */
/* Version used when backtrace support is not compiled in: no signal
 * handler is installed. */
static void setupSigSegvAction(void) {
    return;
}
5908 #endif /* HAVE_BACKTRACE */
5909
5910
5911
5912 /* The End */
5913
5914
5915