]> git.saurik.com Git - redis.git/blob - redis.c
a988392265478c4a32efa26977f01e0cd3aea395
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Server version string. */
#define REDIS_VERSION "0.101"
31
32 #include "fmacros.h"
33
34 #include <stdio.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 #define __USE_POSIX199309
40 #include <signal.h>
41 #include <execinfo.h>
42 #include <ucontext.h>
43 #include <sys/wait.h>
44 #include <errno.h>
45 #include <assert.h>
46 #include <ctype.h>
47 #include <stdarg.h>
48 #include <inttypes.h>
49 #include <arpa/inet.h>
50 #include <sys/stat.h>
51 #include <fcntl.h>
52 #include <sys/time.h>
53 #include <sys/resource.h>
54 #include <limits.h>
55 #include <execinfo.h>
56
57 #include "redis.h"
58 #include "ae.h" /* Event driven programming library */
59 #include "sds.h" /* Dynamic safe strings */
60 #include "anet.h" /* Networking the easy way */
61 #include "dict.h" /* Hash tables */
62 #include "adlist.h" /* Linked lists */
63 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
64 #include "lzf.h" /* LZF compression library */
65 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
66
/* Error codes returned by internal functions. REDIS_ERR is parenthesized so
 * the negative value expands safely inside any surrounding expression. */
#define REDIS_OK 0
#define REDIS_ERR (-1)
70
/* ----- Static server configuration ----- */
#define REDIS_SERVERPORT             6379    /* default TCP port */
#define REDIS_MAXIDLETIME            (60*5)  /* default client idle timeout (presumably seconds: 5 minutes) */
#define REDIS_IOBUF_LEN              1024
#define REDIS_LOADBUF_LEN            1024
#define REDIS_STATIC_ARGS            4
#define REDIS_DEFAULT_DBNUM          16
#define REDIS_CONFIGLINE_MAX         1024
#define REDIS_OBJFREELIST_MAX        1000000 /* max number of freed objects kept for reuse */
#define REDIS_MAX_SYNC_TIME          60      /* a slave may not take longer than this to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100     /* max volatile keys sampled per DB on each serverCron() run */

/* ----- Hash table parameters ----- */
#define REDIS_HT_MINFILL             10      /* shrink a table whose fill drops below 10% */
#define REDIS_HT_MINSLOTS            16384   /* never shrink a table of this many slots or fewer */
86
/* ----- Command flags (bitmask stored in redisCommand.flags) ----- */
#define REDIS_CMD_BULK    1 /* bulk write command */
#define REDIS_CMD_INLINE  2 /* inline command */
/* REDIS_CMD_DENYOOM: every command marked with this flag returns an error
 * when the 'maxmemory' option is set in the config file and the server is
 * using more than maxmemory bytes of memory. In short, these commands are
 * denied in low memory conditions. */
#define REDIS_CMD_DENYOOM 4
95
/* ----- Object types (robj.type) ----- */
#define REDIS_STRING     0
#define REDIS_LIST       1
#define REDIS_SET        2
#define REDIS_HASH       3

/* ----- Object types only used when dumping to disk ----- */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB   254
#define REDIS_EOF        255
106
/* Defines related to the dump file format. Storing a 32 bit length for every
 * short key would waste a lot of space, so the two most significant bits of
 * the first byte select how the length is encoded:
 *
 * 00|000000 => the length is the remaining 6 bits of this byte
 * 01|000000 00000000 => the length is 14 bits: 6 bits + the 8 bits of the
 *                       next byte
 * 10|000000 [32 bit integer] => a full 32 bit length follows
 * 11|000000 => a specially encoded object follows; the remaining six bits
 *              specify the kind of object. See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, fit inside. */
#define REDIS_RDB_6BITLEN  0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL   3
#define REDIS_RDB_LENERR   UINT_MAX
125
/* When the length of a string object stored on disk has its first two bits
 * set (REDIS_RDB_ENCVAL), the remaining six bits specify a special encoding
 * for the object, according to the following defines: */
#define REDIS_RDB_ENC_INT8  0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF   3 /* string compressed with LZF */
133
/* ----- Client flags (redisClient.flags bitmask) ----- */
#define REDIS_CLOSE   1 /* this client connection should be closed ASAP */
#define REDIS_SLAVE   2 /* this client is a slave server */
#define REDIS_MASTER  4 /* this client is a master server */
#define REDIS_MONITOR 8 /* this client is a slave monitor, see MONITOR */

/* ----- Replication state, slave side (server.replstate) ----- */
#define REDIS_REPL_NONE      0 /* no active replication */
#define REDIS_REPL_CONNECT   1 /* must connect to master */
#define REDIS_REPL_CONNECTED 2 /* connected to master */

/* ----- Replication state, master side (redisClient.replstate) -----
 * In the SEND_BULK and ONLINE states the slave receives new updates in its
 * output queue. In the WAIT_BGSAVE states the server is instead waiting for
 * the next background save before it can send updates to the slave. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits for a bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END   4 /* master waits for the bgsave to end to start the bulk DB transfer */
#define REDIS_REPL_SEND_BULK         5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE            6 /* bulk DB already transmitted, receive updates */
153
/* ----- List ends ----- */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* ----- SORT operations and options ----- */
#define REDIS_SORT_GET    0
#define REDIS_SORT_DEL    1
#define REDIS_SORT_INCR   2
#define REDIS_SORT_DECR   3
#define REDIS_SORT_ASC    4
#define REDIS_SORT_DESC   5
#define REDIS_SORTKEY_MAX 1024

/* ----- Log levels, by increasing severity: redisLog() prints a message
 * when its level is >= server.verbosity ----- */
#define REDIS_DEBUG   0
#define REDIS_NOTICE  1
#define REDIS_WARNING 2
171
/* Anti-warning macro: marks a parameter or variable as intentionally unused.
 * The argument is parenthesized so that any expression, not only a plain
 * identifier, can be passed safely. */
#define REDIS_NOTUSED(V) ((void) (V))
174
175
176 /*================================= Data types ============================== */
177
/* A Redis object: a type tag plus a pointer to the payload (string, list,
 * set, ...), shared between owners through reference counting
 * (see incrRefCount()/decrRefCount()). */
typedef struct redisObject {
    void *ptr;     /* payload; what it points to depends on 'type' */
    int type;      /* REDIS_STRING, REDIS_LIST, REDIS_SET or REDIS_HASH */
    int refcount;  /* number of references held to this object */
} robj;
184
185 typedef struct redisDb {
186 dict *dict;
187 dict *expires;
188 int id;
189 } redisDb;
190
191 /* With multiplexing we need to take per-clinet state.
192 * Clients are taken in a liked list. */
193 typedef struct redisClient {
194 int fd;
195 redisDb *db;
196 int dictid;
197 sds querybuf;
198 robj **argv;
199 int argc;
200 int bulklen; /* bulk read len. -1 if not in bulk read mode */
201 list *reply;
202 int sentlen;
203 time_t lastinteraction; /* time of the last interaction, used for timeout */
204 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
205 int slaveseldb; /* slave selected db, if this client is a slave */
206 int authenticated; /* when requirepass is non-NULL */
207 int replstate; /* replication state if this is a slave */
208 int repldbfd; /* replication DB file descriptor */
209 long repldboff; /* replication DB file offset */
210 off_t repldbsize; /* replication DB file size */
211 } redisClient;
212
/* A save point: a background save starts once at least 'changes' writes
 * happened and more than 'seconds' seconds passed since the last save. */
struct saveparam {
    time_t seconds;  /* minimum elapsed time since the last save */
    int changes;     /* minimum number of changes (server.dirty) */
};
217
218 /* Global server state structure */
219 struct redisServer {
220 int port;
221 int fd;
222 redisDb *db;
223 dict *sharingpool;
224 unsigned int sharingpoolsize;
225 long long dirty; /* changes to DB from the last save */
226 list *clients;
227 list *slaves, *monitors;
228 char neterr[ANET_ERR_LEN];
229 aeEventLoop *el;
230 int cronloops; /* number of times the cron function run */
231 list *objfreelist; /* A list of freed objects to avoid malloc() */
232 time_t lastsave; /* Unix time of last save succeeede */
233 size_t usedmemory; /* Used memory in megabytes */
234 /* Fields used only for stats */
235 time_t stat_starttime; /* server start time */
236 long long stat_numcommands; /* number of processed commands */
237 long long stat_numconnections; /* number of connections received */
238 /* Configuration */
239 int verbosity;
240 int glueoutputbuf;
241 int maxidletime;
242 int dbnum;
243 int daemonize;
244 char *pidfile;
245 int bgsaveinprogress;
246 struct saveparam *saveparams;
247 int saveparamslen;
248 char *logfile;
249 char *bindaddr;
250 char *dbfilename;
251 char *requirepass;
252 int shareobjects;
253 /* Replication related */
254 int isslave;
255 char *masterhost;
256 int masterport;
257 redisClient *master; /* client that is master for this slave */
258 int replstate;
259 unsigned int maxclients;
260 unsigned int maxmemory;
261 /* Sort parameters - qsort_r() is only available under BSD so we
262 * have to take this state global, in order to pass it to sortCompare() */
263 int sort_desc;
264 int sort_alpha;
265 int sort_bypattern;
266 };
267
268 typedef void redisCommandProc(redisClient *c);
269 struct redisCommand {
270 char *name;
271 redisCommandProc *proc;
272 int arity;
273 int flags;
274 };
275
/* A (function name, function address) pair; see symsTable. Presumably used
 * to symbolize addresses in crash reports — confirm at the use site. */
struct redisFunctionSym {
    char *name;    /* function name */
    long pointer;  /* function address, stored as a long */
};
280
281 typedef struct _redisSortObject {
282 robj *obj;
283 union {
284 double score;
285 robj *cmpobj;
286 } u;
287 } redisSortObject;
288
289 typedef struct _redisSortOperation {
290 int type;
291 robj *pattern;
292 } redisSortOperation;
293
294 struct sharedObjectsStruct {
295 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
296 *colon, *nullbulk, *nullmultibulk,
297 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
298 *outofrangeerr, *plus,
299 *select0, *select1, *select2, *select3, *select4,
300 *select5, *select6, *select7, *select8, *select9;
301 } shared;
302
303 /*================================ Prototypes =============================== */
304
305 static void freeStringObject(robj *o);
306 static void freeListObject(robj *o);
307 static void freeSetObject(robj *o);
308 static void decrRefCount(void *o);
309 static robj *createObject(int type, void *ptr);
310 static void freeClient(redisClient *c);
311 static int rdbLoad(char *filename);
312 static void addReply(redisClient *c, robj *obj);
313 static void addReplySds(redisClient *c, sds s);
314 static void incrRefCount(robj *o);
315 static int rdbSaveBackground(char *filename);
316 static robj *createStringObject(char *ptr, size_t len);
317 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
318 static int syncWithMaster(void);
319 static robj *tryObjectSharing(robj *o);
320 static int removeExpire(redisDb *db, robj *key);
321 static int expireIfNeeded(redisDb *db, robj *key);
322 static int deleteIfVolatile(redisDb *db, robj *key);
323 static int deleteKey(redisDb *db, robj *key);
324 static time_t getExpire(redisDb *db, robj *key);
325 static int setExpire(redisDb *db, robj *key, time_t when);
326 static void updateSalvesWaitingBgsave(int bgsaveerr);
327 static void freeMemoryIfNeeded(void);
328 static int processCommand(redisClient *c);
329
330 static void authCommand(redisClient *c);
331 static void pingCommand(redisClient *c);
332 static void echoCommand(redisClient *c);
333 static void setCommand(redisClient *c);
334 static void setnxCommand(redisClient *c);
335 static void getCommand(redisClient *c);
336 static void delCommand(redisClient *c);
337 static void existsCommand(redisClient *c);
338 static void incrCommand(redisClient *c);
339 static void decrCommand(redisClient *c);
340 static void incrbyCommand(redisClient *c);
341 static void decrbyCommand(redisClient *c);
342 static void selectCommand(redisClient *c);
343 static void randomkeyCommand(redisClient *c);
344 static void keysCommand(redisClient *c);
345 static void dbsizeCommand(redisClient *c);
346 static void lastsaveCommand(redisClient *c);
347 static void saveCommand(redisClient *c);
348 static void bgsaveCommand(redisClient *c);
349 static void shutdownCommand(redisClient *c);
350 static void moveCommand(redisClient *c);
351 static void renameCommand(redisClient *c);
352 static void renamenxCommand(redisClient *c);
353 static void lpushCommand(redisClient *c);
354 static void rpushCommand(redisClient *c);
355 static void lpopCommand(redisClient *c);
356 static void rpopCommand(redisClient *c);
357 static void llenCommand(redisClient *c);
358 static void lindexCommand(redisClient *c);
359 static void lrangeCommand(redisClient *c);
360 static void ltrimCommand(redisClient *c);
361 static void typeCommand(redisClient *c);
362 static void lsetCommand(redisClient *c);
363 static void saddCommand(redisClient *c);
364 static void sremCommand(redisClient *c);
365 static void smoveCommand(redisClient *c);
366 static void sismemberCommand(redisClient *c);
367 static void scardCommand(redisClient *c);
368 static void sinterCommand(redisClient *c);
369 static void sinterstoreCommand(redisClient *c);
370 static void sunionCommand(redisClient *c);
371 static void sunionstoreCommand(redisClient *c);
372 static void sdiffCommand(redisClient *c);
373 static void sdiffstoreCommand(redisClient *c);
374 static void syncCommand(redisClient *c);
375 static void flushdbCommand(redisClient *c);
376 static void flushallCommand(redisClient *c);
377 static void sortCommand(redisClient *c);
378 static void lremCommand(redisClient *c);
379 static void infoCommand(redisClient *c);
380 static void mgetCommand(redisClient *c);
381 static void monitorCommand(redisClient *c);
382 static void expireCommand(redisClient *c);
383 static void getSetCommand(redisClient *c);
384 static void ttlCommand(redisClient *c);
385 static void slaveofCommand(redisClient *c);
386 static void debugCommand(redisClient *c);
387 static void setupSigSegvAction();
388 /*================================= Globals ================================= */
389
390 /* Global vars */
391 static struct redisServer server; /* server global state */
392 static struct redisCommand cmdTable[] = {
393 {"get",getCommand,2,REDIS_CMD_INLINE},
394 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
395 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
396 {"del",delCommand,-2,REDIS_CMD_INLINE},
397 {"exists",existsCommand,2,REDIS_CMD_INLINE},
398 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
399 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
400 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
401 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
402 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
403 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
404 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
405 {"llen",llenCommand,2,REDIS_CMD_INLINE},
406 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
407 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
408 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
409 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
410 {"lrem",lremCommand,4,REDIS_CMD_BULK},
411 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
412 {"srem",sremCommand,3,REDIS_CMD_BULK},
413 {"smove",smoveCommand,4,REDIS_CMD_BULK},
414 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
415 {"scard",scardCommand,2,REDIS_CMD_INLINE},
416 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
417 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
418 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
419 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
420 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
421 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
422 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
423 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
424 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
425 {"getset",getSetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
426 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
427 {"select",selectCommand,2,REDIS_CMD_INLINE},
428 {"move",moveCommand,3,REDIS_CMD_INLINE},
429 {"rename",renameCommand,3,REDIS_CMD_INLINE},
430 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
431 {"expire",expireCommand,3,REDIS_CMD_INLINE},
432 {"keys",keysCommand,2,REDIS_CMD_INLINE},
433 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
434 {"auth",authCommand,2,REDIS_CMD_INLINE},
435 {"ping",pingCommand,1,REDIS_CMD_INLINE},
436 {"echo",echoCommand,2,REDIS_CMD_BULK},
437 {"save",saveCommand,1,REDIS_CMD_INLINE},
438 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
439 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
440 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
441 {"type",typeCommand,2,REDIS_CMD_INLINE},
442 {"sync",syncCommand,1,REDIS_CMD_INLINE},
443 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
444 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
445 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
446 {"info",infoCommand,1,REDIS_CMD_INLINE},
447 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
448 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
449 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
450 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
451 {NULL,NULL,0,0}
452 };
453 static struct redisFunctionSym symsTable[] = {
454 {"freeStringObject", (long)freeStringObject},
455 {"freeListObject", (long)freeListObject},
456 {"freeSetObject", (long)freeSetObject},
457 {"decrRefCount", (long)decrRefCount},
458 {"createObject", (long)createObject},
459 {"freeClient", (long)freeClient},
460 {"rdbLoad", (long)rdbLoad},
461 {"addReply", (long)addReply},
462 {"addReplySds", (long)addReplySds},
463 {"incrRefCount", (long)incrRefCount},
464 {"rdbSaveBackground", (long)rdbSaveBackground},
465 {"createStringObject", (long)createStringObject},
466 {"replicationFeedSlaves", (long)replicationFeedSlaves},
467 {"syncWithMaster", (long)syncWithMaster},
468 {"tryObjectSharing", (long)tryObjectSharing},
469 {"removeExpire", (long)removeExpire},
470 {"expireIfNeeded", (long)expireIfNeeded},
471 {"deleteIfVolatile", (long)deleteIfVolatile},
472 {"deleteKey", (long)deleteKey},
473 {"getExpire", (long)getExpire},
474 {"setExpire", (long)setExpire},
475 {"updateSalvesWaitingBgsave", (long)updateSalvesWaitingBgsave},
476 {"freeMemoryIfNeeded", (long)freeMemoryIfNeeded},
477 {"authCommand", (long)authCommand},
478 {"pingCommand", (long)pingCommand},
479 {"echoCommand", (long)echoCommand},
480 {"setCommand", (long)setCommand},
481 {"setnxCommand", (long)setnxCommand},
482 {"getCommand", (long)getCommand},
483 {"delCommand", (long)delCommand},
484 {"existsCommand", (long)existsCommand},
485 {"incrCommand", (long)incrCommand},
486 {"decrCommand", (long)decrCommand},
487 {"incrbyCommand", (long)incrbyCommand},
488 {"decrbyCommand", (long)decrbyCommand},
489 {"selectCommand", (long)selectCommand},
490 {"randomkeyCommand", (long)randomkeyCommand},
491 {"keysCommand", (long)keysCommand},
492 {"dbsizeCommand", (long)dbsizeCommand},
493 {"lastsaveCommand", (long)lastsaveCommand},
494 {"saveCommand", (long)saveCommand},
495 {"bgsaveCommand", (long)bgsaveCommand},
496 {"shutdownCommand", (long)shutdownCommand},
497 {"moveCommand", (long)moveCommand},
498 {"renameCommand", (long)renameCommand},
499 {"renamenxCommand", (long)renamenxCommand},
500 {"lpushCommand", (long)lpushCommand},
501 {"rpushCommand", (long)rpushCommand},
502 {"lpopCommand", (long)lpopCommand},
503 {"rpopCommand", (long)rpopCommand},
504 {"llenCommand", (long)llenCommand},
505 {"lindexCommand", (long)lindexCommand},
506 {"lrangeCommand", (long)lrangeCommand},
507 {"ltrimCommand", (long)ltrimCommand},
508 {"typeCommand", (long)typeCommand},
509 {"lsetCommand", (long)lsetCommand},
510 {"saddCommand", (long)saddCommand},
511 {"sremCommand", (long)sremCommand},
512 {"smoveCommand", (long)smoveCommand},
513 {"sismemberCommand", (long)sismemberCommand},
514 {"scardCommand", (long)scardCommand},
515 {"sinterCommand", (long)sinterCommand},
516 {"sinterstoreCommand", (long)sinterstoreCommand},
517 {"sunionCommand", (long)sunionCommand},
518 {"sunionstoreCommand", (long)sunionstoreCommand},
519 {"sdiffCommand", (long)sdiffCommand},
520 {"sdiffstoreCommand", (long)sdiffstoreCommand},
521 {"syncCommand", (long)syncCommand},
522 {"flushdbCommand", (long)flushdbCommand},
523 {"flushallCommand", (long)flushallCommand},
524 {"sortCommand", (long)sortCommand},
525 {"lremCommand", (long)lremCommand},
526 {"infoCommand", (long)infoCommand},
527 {"mgetCommand", (long)mgetCommand},
528 {"monitorCommand", (long)monitorCommand},
529 {"expireCommand", (long)expireCommand},
530 {"getSetCommand", (long)getSetCommand},
531 {"ttlCommand", (long)ttlCommand},
532 {"slaveofCommand", (long)slaveofCommand},
533 {"debugCommand", (long)debugCommand},
534 {"processCommand", (long)processCommand},
535 {"setupSigSegvAction", (long)setupSigSegvAction},
536 {NULL,0}
537 };
538 /*============================ Utility functions ============================ */
539
/* Glob-style pattern matching.
 *
 * Returns 1 if the stringLen bytes at 'string' match the patternLen bytes at
 * 'pattern', 0 otherwise. Neither buffer needs to be NUL terminated: no byte
 * past the given lengths is ever read. Supported syntax:
 *
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [...]   one byte from the set; [^...] negates the set, x-y is a byte
 *           range (endpoints in either order) and \x is the literal byte x.
 *           An unterminated set extends to the end of the pattern.
 *   \x      the literal byte x
 *
 * When 'nocase' is non-zero, bytes are compared after tolower(). Bytes are
 * handled as unsigned char, so ranges over bytes >= 0x80 use byte order and
 * tolower() never receives a negative value. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* A run of stars matches exactly what a single star matches. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing star matches the rest of the string */
            /* Try the rest of the pattern against every non-empty suffix. */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                                   string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match = 0;
            int c;

            /* A set always consumes one byte: with the string exhausted
             * even a negated set cannot match. */
            if (stringLen == 0)
                return 0; /* no match */
            c = (unsigned char)string[0];
            if (nocase)
                c = tolower(c);

            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back onto the last byte so the
                     * common pattern++ after the switch lands at the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == ']') {
                    break;
                } else if (pattern[0] != '\\' && patternLen >= 3 &&
                           pattern[1] == '-') {
                    /* Byte range x-y. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    /* Single byte, optionally escaped with a backslash
                     * (a backslash that is the last byte is literal). */
                    int p;

                    if (pattern[0] == '\\' && patternLen >= 2) {
                        pattern++;
                        patternLen--;
                    }
                    p = (unsigned char)pattern[0];
                    if (nocase)
                        p = tolower(p);
                    if (p == c)
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (nocase) {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            } else if (pattern[0] != string[0]) {
                return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* The string is consumed: only trailing stars may remain. */
            while (patternLen > 0 && pattern[0] == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return patternLen == 0 && stringLen == 0;
}
662
663 void redisLog(int level, const char *fmt, ...)
664 {
665 va_list ap;
666 FILE *fp;
667
668 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
669 if (!fp) return;
670
671 va_start(ap, fmt);
672 if (level >= server.verbosity) {
673 char *c = ".-*";
674 char buf[64];
675 time_t now;
676
677 now = time(NULL);
678 strftime(buf,64,"%d %b %H:%M:%S",gmtime(&now));
679 fprintf(fp,"%s %c ",buf,c[level]);
680 vfprintf(fp, fmt, ap);
681 fprintf(fp,"\n");
682 fflush(fp);
683 }
684 va_end(ap);
685
686 if (server.logfile) fclose(fp);
687 }
688
689 /*====================== Hash table type implementation ==================== */
690
/* These are hash table types that use Redis objects holding SDS dynamic
 * strings as keys, and Redis objects as values (objects can hold SDS
 * strings, lists, sets). */
694
695 static int sdsDictKeyCompare(void *privdata, const void *key1,
696 const void *key2)
697 {
698 int l1,l2;
699 DICT_NOTUSED(privdata);
700
701 l1 = sdslen((sds)key1);
702 l2 = sdslen((sds)key2);
703 if (l1 != l2) return 0;
704 return memcmp(key1, key2, l1) == 0;
705 }
706
/* dict destructor callback: release the dict's reference to a Redis object
 * by calling decrRefCount() on it. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata;
    decrRefCount(val);
}
713
714 static int dictSdsKeyCompare(void *privdata, const void *key1,
715 const void *key2)
716 {
717 const robj *o1 = key1, *o2 = key2;
718 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
719 }
720
721 static unsigned int dictSdsHash(const void *key) {
722 const robj *o = key;
723 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
724 }
725
726 static dictType setDictType = {
727 dictSdsHash, /* hash function */
728 NULL, /* key dup */
729 NULL, /* val dup */
730 dictSdsKeyCompare, /* key compare */
731 dictRedisObjectDestructor, /* key destructor */
732 NULL /* val destructor */
733 };
734
735 static dictType hashDictType = {
736 dictSdsHash, /* hash function */
737 NULL, /* key dup */
738 NULL, /* val dup */
739 dictSdsKeyCompare, /* key compare */
740 dictRedisObjectDestructor, /* key destructor */
741 dictRedisObjectDestructor /* val destructor */
742 };
743
744 /* ========================= Random utility functions ======================= */
745
/* Out-of-memory handler: report which allocation failed and abort.
 *
 * Redis generally does not try to recover from out of memory conditions when
 * allocating objects or strings: it is not clear the condition could even be
 * reported to the client, since the networking layer itself allocates its
 * send buffers on the heap. Aborting keeps the rest of the code simple.
 * The one-second pause before abort() presumably gives the message time to
 * reach its destination. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n", msg);
    fflush(stderr);
    sleep(1);
    abort();
}
757
758 /* ====================== Redis server networking stuff ===================== */
759 void closeTimedoutClients(void) {
760 redisClient *c;
761 listNode *ln;
762 time_t now = time(NULL);
763
764 listRewind(server.clients);
765 while ((ln = listYield(server.clients)) != NULL) {
766 c = listNodeValue(ln);
767 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
768 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
769 (now - c->lastinteraction > server.maxidletime)) {
770 redisLog(REDIS_DEBUG,"Closing idle client");
771 freeClient(c);
772 }
773 }
774 }
775
776 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
777 * we resize the hash table to save memory */
778 void tryResizeHashTables(void) {
779 int j;
780
781 for (j = 0; j < server.dbnum; j++) {
782 long long size, used;
783
784 size = dictSlots(server.db[j].dict);
785 used = dictSize(server.db[j].dict);
786 if (size && used && size > REDIS_HT_MINSLOTS &&
787 (used*100/size < REDIS_HT_MINFILL)) {
788 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
789 dictResize(server.db[j].dict);
790 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
791 }
792 }
793 }
794
795 int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
796 int j, loops = server.cronloops++;
797 REDIS_NOTUSED(eventLoop);
798 REDIS_NOTUSED(id);
799 REDIS_NOTUSED(clientData);
800
801 /* Update the global state with the amount of used memory */
802 server.usedmemory = zmalloc_used_memory();
803
804 /* Show some info about non-empty databases */
805 for (j = 0; j < server.dbnum; j++) {
806 long long size, used, vkeys;
807
808 size = dictSlots(server.db[j].dict);
809 used = dictSize(server.db[j].dict);
810 vkeys = dictSize(server.db[j].expires);
811 if (!(loops % 5) && used > 0) {
812 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
813 /* dictPrintStats(server.dict); */
814 }
815 }
816
817 /* We don't want to resize the hash tables while a bacground saving
818 * is in progress: the saving child is created using fork() that is
819 * implemented with a copy-on-write semantic in most modern systems, so
820 * if we resize the HT while there is the saving child at work actually
821 * a lot of memory movements in the parent will cause a lot of pages
822 * copied. */
823 if (!server.bgsaveinprogress) tryResizeHashTables();
824
825 /* Show information about connected clients */
826 if (!(loops % 5)) {
827 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
828 listLength(server.clients)-listLength(server.slaves),
829 listLength(server.slaves),
830 server.usedmemory,
831 dictSize(server.sharingpool));
832 }
833
834 /* Close connections of timedout clients */
835 if (server.maxidletime && !(loops % 10))
836 closeTimedoutClients();
837
838 /* Check if a background saving in progress terminated */
839 if (server.bgsaveinprogress) {
840 int statloc;
841 /* XXX: TODO handle the case of the saving child killed */
842 if (wait4(-1,&statloc,WNOHANG,NULL)) {
843 int exitcode = WEXITSTATUS(statloc);
844 if (exitcode == 0) {
845 redisLog(REDIS_NOTICE,
846 "Background saving terminated with success");
847 server.dirty = 0;
848 server.lastsave = time(NULL);
849 } else {
850 redisLog(REDIS_WARNING,
851 "Background saving error");
852 }
853 server.bgsaveinprogress = 0;
854 updateSalvesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
855 }
856 } else {
857 /* If there is not a background saving in progress check if
858 * we have to save now */
859 time_t now = time(NULL);
860 for (j = 0; j < server.saveparamslen; j++) {
861 struct saveparam *sp = server.saveparams+j;
862
863 if (server.dirty >= sp->changes &&
864 now-server.lastsave > sp->seconds) {
865 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
866 sp->changes, sp->seconds);
867 rdbSaveBackground(server.dbfilename);
868 break;
869 }
870 }
871 }
872
873 /* Try to expire a few timed out keys */
874 for (j = 0; j < server.dbnum; j++) {
875 redisDb *db = server.db+j;
876 int num = dictSize(db->expires);
877
878 if (num) {
879 time_t now = time(NULL);
880
881 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
882 num = REDIS_EXPIRELOOKUPS_PER_CRON;
883 while (num--) {
884 dictEntry *de;
885 time_t t;
886
887 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
888 t = (time_t) dictGetEntryVal(de);
889 if (now > t) {
890 deleteKey(db,dictGetEntryKey(de));
891 }
892 }
893 }
894 }
895
896 /* Check if we should connect to a MASTER */
897 if (server.replstate == REDIS_REPL_CONNECT) {
898 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
899 if (syncWithMaster() == REDIS_OK) {
900 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
901 }
902 }
903 return 1000;
904 }
905
906 static void createSharedObjects(void) {
907 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
908 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
909 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
910 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
911 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
912 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
913 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
914 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
915 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
916 /* no such key */
917 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
918 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
919 "-ERR Operation against a key holding the wrong kind of value\r\n"));
920 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
921 "-ERR no such key\r\n"));
922 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
923 "-ERR syntax error\r\n"));
924 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
925 "-ERR source and destination objects are the same\r\n"));
926 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
927 "-ERR index out of range\r\n"));
928 shared.space = createObject(REDIS_STRING,sdsnew(" "));
929 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
930 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
931 shared.select0 = createStringObject("select 0\r\n",10);
932 shared.select1 = createStringObject("select 1\r\n",10);
933 shared.select2 = createStringObject("select 2\r\n",10);
934 shared.select3 = createStringObject("select 3\r\n",10);
935 shared.select4 = createStringObject("select 4\r\n",10);
936 shared.select5 = createStringObject("select 5\r\n",10);
937 shared.select6 = createStringObject("select 6\r\n",10);
938 shared.select7 = createStringObject("select 7\r\n",10);
939 shared.select8 = createStringObject("select 8\r\n",10);
940 shared.select9 = createStringObject("select 9\r\n",10);
941 }
942
943 static void appendServerSaveParams(time_t seconds, int changes) {
944 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
945 if (server.saveparams == NULL) oom("appendServerSaveParams");
946 server.saveparams[server.saveparamslen].seconds = seconds;
947 server.saveparams[server.saveparamslen].changes = changes;
948 server.saveparamslen++;
949 }
950
951 static void ResetServerSaveParams() {
952 zfree(server.saveparams);
953 server.saveparams = NULL;
954 server.saveparamslen = 0;
955 }
956
957 static void initServerConfig() {
958 server.dbnum = REDIS_DEFAULT_DBNUM;
959 server.port = REDIS_SERVERPORT;
960 server.verbosity = REDIS_DEBUG;
961 server.maxidletime = REDIS_MAXIDLETIME;
962 server.saveparams = NULL;
963 server.logfile = NULL; /* NULL = log on standard output */
964 server.bindaddr = NULL;
965 server.glueoutputbuf = 1;
966 server.daemonize = 0;
967 server.pidfile = "/var/run/redis.pid";
968 server.dbfilename = "dump.rdb";
969 server.requirepass = NULL;
970 server.shareobjects = 0;
971 server.maxclients = 0;
972 server.maxmemory = 0;
973 ResetServerSaveParams();
974
975 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
976 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
977 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
978 /* Replication related */
979 server.isslave = 0;
980 server.masterhost = NULL;
981 server.masterport = 6379;
982 server.master = NULL;
983 server.replstate = REDIS_REPL_NONE;
984 }
985
986 static void initServer() {
987 int j;
988
989 signal(SIGHUP, SIG_IGN);
990 signal(SIGPIPE, SIG_IGN);
991 setupSigSegvAction();
992
993 server.clients = listCreate();
994 server.slaves = listCreate();
995 server.monitors = listCreate();
996 server.objfreelist = listCreate();
997 createSharedObjects();
998 server.el = aeCreateEventLoop();
999 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1000 server.sharingpool = dictCreate(&setDictType,NULL);
1001 server.sharingpoolsize = 1024;
1002 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
1003 oom("server initialization"); /* Fatal OOM */
1004 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1005 if (server.fd == -1) {
1006 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1007 exit(1);
1008 }
1009 for (j = 0; j < server.dbnum; j++) {
1010 server.db[j].dict = dictCreate(&hashDictType,NULL);
1011 server.db[j].expires = dictCreate(&setDictType,NULL);
1012 server.db[j].id = j;
1013 }
1014 server.cronloops = 0;
1015 server.bgsaveinprogress = 0;
1016 server.lastsave = time(NULL);
1017 server.dirty = 0;
1018 server.usedmemory = 0;
1019 server.stat_numcommands = 0;
1020 server.stat_numconnections = 0;
1021 server.stat_starttime = time(NULL);
1022 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
1023 }
1024
1025 /* Empty the whole database */
1026 static long long emptyDb() {
1027 int j;
1028 long long removed = 0;
1029
1030 for (j = 0; j < server.dbnum; j++) {
1031 removed += dictSize(server.db[j].dict);
1032 dictEmpty(server.db[j].dict);
1033 dictEmpty(server.db[j].expires);
1034 }
1035 return removed;
1036 }
1037
/* Parse a boolean configuration argument, ignoring case.
 * Returns 1 for "yes", 0 for "no", and -1 for anything else. */
static int yesnotoi(char *s) {
    int result = -1;

    if (strcasecmp(s,"yes") == 0)
        result = 1;
    else if (strcasecmp(s,"no") == 0)
        result = 0;
    return result;
}
1043
1044 /* I agree, this is a very rudimental way to load a configuration...
1045 will improve later if the config gets more complex */
1046 static void loadServerConfig(char *filename) {
1047 FILE *fp = fopen(filename,"r");
1048 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1049 int linenum = 0;
1050 sds line = NULL;
1051
1052 if (!fp) {
1053 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1054 exit(1);
1055 }
1056 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1057 sds *argv;
1058 int argc, j;
1059
1060 linenum++;
1061 line = sdsnew(buf);
1062 line = sdstrim(line," \t\r\n");
1063
1064 /* Skip comments and blank lines*/
1065 if (line[0] == '#' || line[0] == '\0') {
1066 sdsfree(line);
1067 continue;
1068 }
1069
1070 /* Split into arguments */
1071 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1072 sdstolower(argv[0]);
1073
1074 /* Execute config directives */
1075 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1076 server.maxidletime = atoi(argv[1]);
1077 if (server.maxidletime < 0) {
1078 err = "Invalid timeout value"; goto loaderr;
1079 }
1080 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1081 server.port = atoi(argv[1]);
1082 if (server.port < 1 || server.port > 65535) {
1083 err = "Invalid port"; goto loaderr;
1084 }
1085 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1086 server.bindaddr = zstrdup(argv[1]);
1087 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1088 int seconds = atoi(argv[1]);
1089 int changes = atoi(argv[2]);
1090 if (seconds < 1 || changes < 0) {
1091 err = "Invalid save parameters"; goto loaderr;
1092 }
1093 appendServerSaveParams(seconds,changes);
1094 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1095 if (chdir(argv[1]) == -1) {
1096 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1097 argv[1], strerror(errno));
1098 exit(1);
1099 }
1100 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1101 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1102 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1103 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1104 else {
1105 err = "Invalid log level. Must be one of debug, notice, warning";
1106 goto loaderr;
1107 }
1108 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1109 FILE *fp;
1110
1111 server.logfile = zstrdup(argv[1]);
1112 if (!strcasecmp(server.logfile,"stdout")) {
1113 zfree(server.logfile);
1114 server.logfile = NULL;
1115 }
1116 if (server.logfile) {
1117 /* Test if we are able to open the file. The server will not
1118 * be able to abort just for this problem later... */
1119 fp = fopen(server.logfile,"a");
1120 if (fp == NULL) {
1121 err = sdscatprintf(sdsempty(),
1122 "Can't open the log file: %s", strerror(errno));
1123 goto loaderr;
1124 }
1125 fclose(fp);
1126 }
1127 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1128 server.dbnum = atoi(argv[1]);
1129 if (server.dbnum < 1) {
1130 err = "Invalid number of databases"; goto loaderr;
1131 }
1132 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1133 server.maxclients = atoi(argv[1]);
1134 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1135 server.maxmemory = atoi(argv[1]);
1136 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1137 server.masterhost = sdsnew(argv[1]);
1138 server.masterport = atoi(argv[2]);
1139 server.replstate = REDIS_REPL_CONNECT;
1140 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1141 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1142 err = "argument must be 'yes' or 'no'"; goto loaderr;
1143 }
1144 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1145 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1146 err = "argument must be 'yes' or 'no'"; goto loaderr;
1147 }
1148 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1149 server.sharingpoolsize = atoi(argv[1]);
1150 if (server.sharingpoolsize < 1) {
1151 err = "invalid object sharing pool size"; goto loaderr;
1152 }
1153 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1154 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1155 err = "argument must be 'yes' or 'no'"; goto loaderr;
1156 }
1157 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1158 server.requirepass = zstrdup(argv[1]);
1159 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1160 server.pidfile = zstrdup(argv[1]);
1161 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1162 server.dbfilename = zstrdup(argv[1]);
1163 } else {
1164 err = "Bad directive or wrong number of arguments"; goto loaderr;
1165 }
1166 for (j = 0; j < argc; j++)
1167 sdsfree(argv[j]);
1168 zfree(argv);
1169 sdsfree(line);
1170 }
1171 fclose(fp);
1172 return;
1173
1174 loaderr:
1175 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1176 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1177 fprintf(stderr, ">>> '%s'\n", line);
1178 fprintf(stderr, "%s\n", err);
1179 exit(1);
1180 }
1181
1182 static void freeClientArgv(redisClient *c) {
1183 int j;
1184
1185 for (j = 0; j < c->argc; j++)
1186 decrRefCount(c->argv[j]);
1187 c->argc = 0;
1188 }
1189
1190 static void freeClient(redisClient *c) {
1191 listNode *ln;
1192
1193 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1194 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1195 sdsfree(c->querybuf);
1196 listRelease(c->reply);
1197 freeClientArgv(c);
1198 close(c->fd);
1199 ln = listSearchKey(server.clients,c);
1200 assert(ln != NULL);
1201 listDelNode(server.clients,ln);
1202 if (c->flags & REDIS_SLAVE) {
1203 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1204 close(c->repldbfd);
1205 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1206 ln = listSearchKey(l,c);
1207 assert(ln != NULL);
1208 listDelNode(l,ln);
1209 }
1210 if (c->flags & REDIS_MASTER) {
1211 server.master = NULL;
1212 server.replstate = REDIS_REPL_CONNECT;
1213 }
1214 zfree(c->argv);
1215 zfree(c);
1216 }
1217
/* Coalesce all queued reply objects of 'c' into one string object, so that
 * sendReplyToClient() can flush them with a single write() instead of one
 * per object.
 *
 * This only happens when the total size is at most 1024 bytes. Otherwise
 * the reply list is left untouched, as it is when the total size is 0.
 * If the head object was partially sent, its bytes still come first in the
 * glued buffer, so c->sentlen keeps pointing at the right offset. */
static void glueReplyBuffersIfNeeded(redisClient *c) {
    int totlen = 0;
    listNode *ln;
    robj *o;

    /* First pass: measure the queued data, bailing out past 1024 bytes */
    listRewind(c->reply);
    while((ln = listYield(c->reply))) {
        o = ln->value;
        totlen += sdslen(o->ptr);
        /* This optimization makes more sense if we don't have to copy
         * too much data */
        if (totlen > 1024) return;
    }
    if (totlen > 0) {
        char buf[1024]; /* big enough: totlen <= 1024 was checked above */
        int copylen = 0;

        /* Second pass: append each object to buf and unlink it. Objects are
         * released through the reply list's free method (decrRefCount, set
         * in createClient()). NOTE(review): deleting the node that was just
         * yielded assumes listYield() already advanced its cursor — confirm
         * in adlist.c. */
        listRewind(c->reply);
        while((ln = listYield(c->reply))) {
            o = ln->value;
            memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
            copylen += sdslen(o->ptr);
            listDelNode(c->reply,ln);
        }
        /* Now the output buffer is empty, add the new single element.
         * Its initial reference is owned by the reply list. */
        o = createObject(REDIS_STRING,sdsnewlen(buf,totlen));
        if (!listAddNodeTail(c->reply,o)) oom("listAddNodeTail");
    }
}
1247
/* Writable-event handler: write as much of the client's reply list as the
 * socket accepts.
 *
 * c->sentlen counts the bytes of the head object already written, so a
 * partially sent object is resumed on the next call. Once the list is empty
 * the writable event is removed; addReply() installs it again when new
 * output is queued. A write error other than EAGAIN frees the client.
 * Output addressed to our master (REDIS_MASTER) is never written: it is
 * treated as fully sent and discarded. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Optionally merge several small replies into a single buffer */
    if (server.glueoutputbuf && listLength(c->reply) > 1)
        glueReplyBuffersIfNeeded(c);
    while(listLength(c->reply)) {
        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Nothing to send: just drop empty objects */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Pretend the remainder was written so it gets discarded */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
    }
    /* nwritten holds the result of the last write() attempt */
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1296
1297 static struct redisCommand *lookupCommand(char *name) {
1298 int j = 0;
1299 while(cmdTable[j].name != NULL) {
1300 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1301 j++;
1302 }
1303 return NULL;
1304 }
1305
1306 /* resetClient prepare the client to process the next command */
1307 static void resetClient(redisClient *c) {
1308 freeClientArgv(c);
1309 c->bulklen = -1;
1310 }
1311
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() executes the command or prepares the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* A positive arity is an exact argc (command name included);
         * a negative arity -N means "at least N". */
        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        /* Commands flagged DENYOOM are refused while over the memory limit */
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Bulk command, first pass: the last argument on the command line is
         * the byte count of the payload that follows it. Drop that argument;
         * the payload itself will take its place. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Payload not complete yet: readQueryFromClient() appends it as
             * the last argument and calls us again. bulklen != -1 then skips
             * this branch. */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command. Only commands that changed server.dirty are sent
     * to slaves, while MONITORs receive every command. */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.dirty-dirty != 0 && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1399
1400 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1401 listNode *ln;
1402 int outc = 0, j;
1403 robj **outv;
1404 /* (args*2)+1 is enough room for args, spaces, newlines */
1405 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1406
1407 if (argc <= REDIS_STATIC_ARGS) {
1408 outv = static_outv;
1409 } else {
1410 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1411 if (!outv) oom("replicationFeedSlaves");
1412 }
1413
1414 for (j = 0; j < argc; j++) {
1415 if (j != 0) outv[outc++] = shared.space;
1416 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1417 robj *lenobj;
1418
1419 lenobj = createObject(REDIS_STRING,
1420 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1421 lenobj->refcount = 0;
1422 outv[outc++] = lenobj;
1423 }
1424 outv[outc++] = argv[j];
1425 }
1426 outv[outc++] = shared.crlf;
1427
1428 /* Increment all the refcounts at start and decrement at end in order to
1429 * be sure to free objects if there is no slave in a replication state
1430 * able to be feed with commands */
1431 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1432 listRewind(slaves);
1433 while((ln = listYield(slaves))) {
1434 redisClient *slave = ln->value;
1435
1436 /* Don't feed slaves that are still waiting for BGSAVE to start */
1437 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1438
1439 /* Feed all the other slaves, MONITORs and so on */
1440 if (slave->slaveseldb != dictid) {
1441 robj *selectcmd;
1442
1443 switch(dictid) {
1444 case 0: selectcmd = shared.select0; break;
1445 case 1: selectcmd = shared.select1; break;
1446 case 2: selectcmd = shared.select2; break;
1447 case 3: selectcmd = shared.select3; break;
1448 case 4: selectcmd = shared.select4; break;
1449 case 5: selectcmd = shared.select5; break;
1450 case 6: selectcmd = shared.select6; break;
1451 case 7: selectcmd = shared.select7; break;
1452 case 8: selectcmd = shared.select8; break;
1453 case 9: selectcmd = shared.select9; break;
1454 default:
1455 selectcmd = createObject(REDIS_STRING,
1456 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1457 selectcmd->refcount = 0;
1458 break;
1459 }
1460 addReply(slave,selectcmd);
1461 slave->slaveseldb = dictid;
1462 }
1463 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1464 }
1465 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1466 if (outv != static_outv) zfree(outv);
1467 }
1468
1469 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1470 redisClient *c = (redisClient*) privdata;
1471 char buf[REDIS_IOBUF_LEN];
1472 int nread;
1473 REDIS_NOTUSED(el);
1474 REDIS_NOTUSED(mask);
1475
1476 nread = read(fd, buf, REDIS_IOBUF_LEN);
1477 if (nread == -1) {
1478 if (errno == EAGAIN) {
1479 nread = 0;
1480 } else {
1481 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1482 freeClient(c);
1483 return;
1484 }
1485 } else if (nread == 0) {
1486 redisLog(REDIS_DEBUG, "Client closed connection");
1487 freeClient(c);
1488 return;
1489 }
1490 if (nread) {
1491 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1492 c->lastinteraction = time(NULL);
1493 } else {
1494 return;
1495 }
1496
1497 again:
1498 if (c->bulklen == -1) {
1499 /* Read the first line of the query */
1500 char *p = strchr(c->querybuf,'\n');
1501 size_t querylen;
1502 if (p) {
1503 sds query, *argv;
1504 int argc, j;
1505
1506 query = c->querybuf;
1507 c->querybuf = sdsempty();
1508 querylen = 1+(p-(query));
1509 if (sdslen(query) > querylen) {
1510 /* leave data after the first line of the query in the buffer */
1511 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1512 }
1513 *p = '\0'; /* remove "\n" */
1514 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1515 sdsupdatelen(query);
1516
1517 /* Now we can split the query in arguments */
1518 if (sdslen(query) == 0) {
1519 /* Ignore empty query */
1520 sdsfree(query);
1521 return;
1522 }
1523 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1524 if (argv == NULL) oom("sdssplitlen");
1525 sdsfree(query);
1526
1527 if (c->argv) zfree(c->argv);
1528 c->argv = zmalloc(sizeof(robj*)*argc);
1529 if (c->argv == NULL) oom("allocating arguments list for client");
1530
1531 for (j = 0; j < argc; j++) {
1532 if (sdslen(argv[j])) {
1533 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1534 c->argc++;
1535 } else {
1536 sdsfree(argv[j]);
1537 }
1538 }
1539 zfree(argv);
1540 /* Execute the command. If the client is still valid
1541 * after processCommand() return and there is something
1542 * on the query buffer try to process the next command. */
1543 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1544 return;
1545 } else if (sdslen(c->querybuf) >= 1024*32) {
1546 redisLog(REDIS_DEBUG, "Client protocol error");
1547 freeClient(c);
1548 return;
1549 }
1550 } else {
1551 /* Bulk read handling. Note that if we are at this point
1552 the client already sent a command terminated with a newline,
1553 we are reading the bulk data that is actually the last
1554 argument of the command. */
1555 int qbl = sdslen(c->querybuf);
1556
1557 if (c->bulklen <= qbl) {
1558 /* Copy everything but the final CRLF as final argument */
1559 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1560 c->argc++;
1561 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1562 processCommand(c);
1563 return;
1564 }
1565 }
1566 }
1567
1568 static int selectDb(redisClient *c, int id) {
1569 if (id < 0 || id >= server.dbnum)
1570 return REDIS_ERR;
1571 c->db = &server.db[id];
1572 return REDIS_OK;
1573 }
1574
1575 static void *dupClientReplyValue(void *o) {
1576 incrRefCount((robj*)o);
1577 return 0;
1578 }
1579
1580 static redisClient *createClient(int fd) {
1581 redisClient *c = zmalloc(sizeof(*c));
1582
1583 anetNonBlock(NULL,fd);
1584 anetTcpNoDelay(NULL,fd);
1585 if (!c) return NULL;
1586 selectDb(c,0);
1587 c->fd = fd;
1588 c->querybuf = sdsempty();
1589 c->argc = 0;
1590 c->argv = NULL;
1591 c->bulklen = -1;
1592 c->sentlen = 0;
1593 c->flags = 0;
1594 c->lastinteraction = time(NULL);
1595 c->authenticated = 0;
1596 c->replstate = REDIS_REPL_NONE;
1597 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1598 listSetFreeMethod(c->reply,decrRefCount);
1599 listSetDupMethod(c->reply,dupClientReplyValue);
1600 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1601 readQueryFromClient, c, NULL) == AE_ERR) {
1602 freeClient(c);
1603 return NULL;
1604 }
1605 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1606 return c;
1607 }
1608
1609 static void addReply(redisClient *c, robj *obj) {
1610 if (listLength(c->reply) == 0 &&
1611 (c->replstate == REDIS_REPL_NONE ||
1612 c->replstate == REDIS_REPL_ONLINE) &&
1613 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1614 sendReplyToClient, c, NULL) == AE_ERR) return;
1615 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1616 incrRefCount(obj);
1617 }
1618
1619 static void addReplySds(redisClient *c, sds s) {
1620 robj *o = createObject(REDIS_STRING,s);
1621 addReply(c,o);
1622 decrRefCount(o);
1623 }
1624
1625 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1626 int cport, cfd;
1627 char cip[128];
1628 redisClient *c;
1629 REDIS_NOTUSED(el);
1630 REDIS_NOTUSED(mask);
1631 REDIS_NOTUSED(privdata);
1632
1633 cfd = anetAccept(server.neterr, fd, cip, &cport);
1634 if (cfd == AE_ERR) {
1635 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1636 return;
1637 }
1638 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1639 if ((c = createClient(cfd)) == NULL) {
1640 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1641 close(cfd); /* May be already closed, just ingore errors */
1642 return;
1643 }
1644 /* If maxclient directive is set and this is one client more... close the
1645 * connection. Note that we create the client instead to check before
1646 * for this condition, since now the socket is already set in nonblocking
1647 * mode and we can send an error for free using the Kernel I/O */
1648 if (server.maxclients && listLength(server.clients) > server.maxclients) {
1649 char *err = "-ERR max number of clients reached\r\n";
1650
1651 /* That's a best effort error message, don't check write errors */
1652 (void) write(c->fd,err,strlen(err));
1653 freeClient(c);
1654 return;
1655 }
1656 server.stat_numconnections++;
1657 }
1658
1659 /* ======================= Redis objects implementation ===================== */
1660
1661 static robj *createObject(int type, void *ptr) {
1662 robj *o;
1663
1664 if (listLength(server.objfreelist)) {
1665 listNode *head = listFirst(server.objfreelist);
1666 o = listNodeValue(head);
1667 listDelNode(server.objfreelist,head);
1668 } else {
1669 o = zmalloc(sizeof(*o));
1670 }
1671 if (!o) oom("createObject");
1672 o->type = type;
1673 o->ptr = ptr;
1674 o->refcount = 1;
1675 return o;
1676 }
1677
1678 static robj *createStringObject(char *ptr, size_t len) {
1679 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1680 }
1681
1682 static robj *createListObject(void) {
1683 list *l = listCreate();
1684
1685 if (!l) oom("listCreate");
1686 listSetFreeMethod(l,decrRefCount);
1687 return createObject(REDIS_LIST,l);
1688 }
1689
1690 static robj *createSetObject(void) {
1691 dict *d = dictCreate(&setDictType,NULL);
1692 if (!d) oom("dictCreate");
1693 return createObject(REDIS_SET,d);
1694 }
1695
1696 static void freeStringObject(robj *o) {
1697 sdsfree(o->ptr);
1698 }
1699
1700 static void freeListObject(robj *o) {
1701 listRelease((list*) o->ptr);
1702 }
1703
1704 static void freeSetObject(robj *o) {
1705 dictRelease((dict*) o->ptr);
1706 }
1707
1708 static void freeHashObject(robj *o) {
1709 dictRelease((dict*) o->ptr);
1710 }
1711
1712 static void incrRefCount(robj *o) {
1713 o->refcount++;
1714 #ifdef DEBUG_REFCOUNT
1715 if (o->type == REDIS_STRING)
1716 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1717 #endif
1718 }
1719
1720 static void decrRefCount(void *obj) {
1721 robj *o = obj;
1722
1723 #ifdef DEBUG_REFCOUNT
1724 if (o->type == REDIS_STRING)
1725 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1726 #endif
1727 if (--(o->refcount) == 0) {
1728 switch(o->type) {
1729 case REDIS_STRING: freeStringObject(o); break;
1730 case REDIS_LIST: freeListObject(o); break;
1731 case REDIS_SET: freeSetObject(o); break;
1732 case REDIS_HASH: freeHashObject(o); break;
1733 default: assert(0 != 0); break;
1734 }
1735 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1736 !listAddNodeHead(server.objfreelist,o))
1737 zfree(o);
1738 }
1739 }
1740
/* Try to share an object against the shared objects pool.
 *
 * If server.sharingpool already holds an object that matches 'o' under
 * setDictType, the pooled object's hit counter (stored as the dict value) is
 * incremented, 'o' is released, and the pooled object is returned with an
 * extra reference. On a miss, 'o' itself is returned and may be inserted
 * into the pool (see below). Returns 'o' untouched when it is NULL or object
 * sharing is disabled. Only string objects are supported (asserted). */
static robj *tryObjectSharing(robj *o) {
    struct dictEntry *de;
    unsigned long c;

    if (o == NULL || server.shareobjects == 0) return o;

    assert(o->type == REDIS_STRING);
    de = dictFind(server.sharingpool,o);
    if (de) {
        robj *shared = dictGetEntryKey(de);

        c = ((unsigned long) dictGetEntryVal(de))+1;
        dictGetEntryVal(de) = (void*) c;
        incrRefCount(shared);
        decrRefCount(o);
        return shared;
    } else {
        /* Here we are using a stream algorithm: every time an object is
         * shared we increment its count, every time there is a miss we
         * decrement the counter of a random object. If this object reaches
         * zero we remove the object and put the current object instead. */
        if (dictSize(server.sharingpool) >=
                server.sharingpoolsize) {
            de = dictGetRandomKey(server.sharingpool);
            assert(de != NULL);
            c = ((unsigned long) dictGetEntryVal(de))-1;
            dictGetEntryVal(de) = (void*) c;
            if (c == 0) {
                /* NOTE(review): presumably the dict's key destructor drops
                 * the pool's reference to the evicted object — confirm. */
                dictDelete(server.sharingpool,de->key);
            }
        } else {
            c = 0; /* If the pool is not full we want to add this object */
        }
        if (c == 0) {
            int retval;

            /* Insert with a hit count of 1; the pool keeps its own reference */
            retval = dictAdd(server.sharingpool,o,(void*)1);
            assert(retval == DICT_OK);
            incrRefCount(o);
        }
        return o;
    }
}
1785
1786 static robj *lookupKey(redisDb *db, robj *key) {
1787 dictEntry *de = dictFind(db->dict,key);
1788 return de ? dictGetEntryVal(de) : NULL;
1789 }
1790
1791 static robj *lookupKeyRead(redisDb *db, robj *key) {
1792 expireIfNeeded(db,key);
1793 return lookupKey(db,key);
1794 }
1795
1796 static robj *lookupKeyWrite(redisDb *db, robj *key) {
1797 deleteIfVolatile(db,key);
1798 return lookupKey(db,key);
1799 }
1800
1801 static int deleteKey(redisDb *db, robj *key) {
1802 int retval;
1803
1804 /* We need to protect key from destruction: after the first dictDelete()
1805 * it may happen that 'key' is no longer valid if we don't increment
1806 * it's count. This may happen when we get the object reference directly
1807 * from the hash table with dictRandomKey() or dict iterators */
1808 incrRefCount(key);
1809 if (dictSize(db->expires)) dictDelete(db->expires,key);
1810 retval = dictDelete(db->dict,key);
1811 decrRefCount(key);
1812
1813 return retval == DICT_OK;
1814 }
1815
1816 /*============================ DB saving/loading ============================ */
1817
/* Write the one-byte RDB type tag 'type'. Returns 0 on success, -1 if
 * fwrite() failed. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
1822
/* Write 't' as a 4-byte int32_t in host byte order. Returns 0 on success,
 * -1 if fwrite() failed. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return fwrite(&t32,4,1,fp) == 0 ? -1 : 0;
}
1828
1829 /* check rdbLoadLen() comments for more info */
1830 static int rdbSaveLen(FILE *fp, uint32_t len) {
1831 unsigned char buf[2];
1832
1833 if (len < (1<<6)) {
1834 /* Save a 6 bit len */
1835 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1836 if (fwrite(buf,1,1,fp) == 0) return -1;
1837 } else if (len < (1<<14)) {
1838 /* Save a 14 bit len */
1839 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1840 buf[1] = len&0xFF;
1841 if (fwrite(buf,2,1,fp) == 0) return -1;
1842 } else {
1843 /* Save a 32 bit len */
1844 buf[0] = (REDIS_RDB_32BITLEN<<6);
1845 if (fwrite(buf,1,1,fp) == 0) return -1;
1846 len = htonl(len);
1847 if (fwrite(&len,4,1,fp) == 0) return -1;
1848 }
1849 return 0;
1850 }
1851
1852 /* String objects in the form "2391" "-100" without any space and with a
1853 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1854 * encoded as integers to save space */
1855 int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1856 long long value;
1857 char *endptr, buf[32];
1858
1859 /* Check if it's possible to encode this value as a number */
1860 value = strtoll(s, &endptr, 10);
1861 if (endptr[0] != '\0') return 0;
1862 snprintf(buf,32,"%lld",value);
1863
1864 /* If the number converted back into a string is not identical
1865 * then it's not possible to encode the string as integer */
1866 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1867
1868 /* Finally check if it fits in our ranges */
1869 if (value >= -(1<<7) && value <= (1<<7)-1) {
1870 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1871 enc[1] = value&0xFF;
1872 return 2;
1873 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1874 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1875 enc[1] = value&0xFF;
1876 enc[2] = (value>>8)&0xFF;
1877 return 3;
1878 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1879 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1880 enc[1] = value&0xFF;
1881 enc[2] = (value>>8)&0xFF;
1882 enc[3] = (value>>16)&0xFF;
1883 enc[4] = (value>>24)&0xFF;
1884 return 5;
1885 } else {
1886 return 0;
1887 }
1888 }
1889
1890 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1891 unsigned int comprlen, outlen;
1892 unsigned char byte;
1893 void *out;
1894
1895 /* We require at least four bytes compression for this to be worth it */
1896 outlen = sdslen(obj->ptr)-4;
1897 if (outlen <= 0) return 0;
1898 if ((out = zmalloc(outlen+1)) == NULL) return 0;
1899 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1900 if (comprlen == 0) {
1901 zfree(out);
1902 return 0;
1903 }
1904 /* Data compressed! Let's save it on disk */
1905 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1906 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1907 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1908 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1909 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1910 zfree(out);
1911 return comprlen;
1912
1913 writeerr:
1914 zfree(out);
1915 return -1;
1916 }
1917
1918 /* Save a string objet as [len][data] on disk. If the object is a string
1919 * representation of an integer value we try to safe it in a special form */
1920 static int rdbSaveStringObject(FILE *fp, robj *obj) {
1921 size_t len = sdslen(obj->ptr);
1922 int enclen;
1923
1924 /* Try integer encoding */
1925 if (len <= 11) {
1926 unsigned char buf[5];
1927 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1928 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1929 return 0;
1930 }
1931 }
1932
1933 /* Try LZF compression - under 20 bytes it's unable to compress even
1934 * aaaaaaaaaaaaaaaaaa so skip it */
1935 if (1 && len > 20) {
1936 int retval;
1937
1938 retval = rdbSaveLzfStringObject(fp,obj);
1939 if (retval == -1) return -1;
1940 if (retval > 0) return 0;
1941 /* retval == 0 means data can't be compressed, save the old way */
1942 }
1943
1944 /* Store verbatim */
1945 if (rdbSaveLen(fp,len) == -1) return -1;
1946 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1947 return 0;
1948 }
1949
1950 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1951 static int rdbSave(char *filename) {
1952 dictIterator *di = NULL;
1953 dictEntry *de;
1954 FILE *fp;
1955 char tmpfile[256];
1956 int j;
1957 time_t now = time(NULL);
1958
1959 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1960 fp = fopen(tmpfile,"w");
1961 if (!fp) {
1962 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1963 return REDIS_ERR;
1964 }
1965 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1966 for (j = 0; j < server.dbnum; j++) {
1967 redisDb *db = server.db+j;
1968 dict *d = db->dict;
1969 if (dictSize(d) == 0) continue;
1970 di = dictGetIterator(d);
1971 if (!di) {
1972 fclose(fp);
1973 return REDIS_ERR;
1974 }
1975
1976 /* Write the SELECT DB opcode */
1977 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1978 if (rdbSaveLen(fp,j) == -1) goto werr;
1979
1980 /* Iterate this DB writing every entry */
1981 while((de = dictNext(di)) != NULL) {
1982 robj *key = dictGetEntryKey(de);
1983 robj *o = dictGetEntryVal(de);
1984 time_t expiretime = getExpire(db,key);
1985
1986 /* Save the expire time */
1987 if (expiretime != -1) {
1988 /* If this key is already expired skip it */
1989 if (expiretime < now) continue;
1990 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1991 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1992 }
1993 /* Save the key and associated value */
1994 if (rdbSaveType(fp,o->type) == -1) goto werr;
1995 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1996 if (o->type == REDIS_STRING) {
1997 /* Save a string value */
1998 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1999 } else if (o->type == REDIS_LIST) {
2000 /* Save a list value */
2001 list *list = o->ptr;
2002 listNode *ln;
2003
2004 listRewind(list);
2005 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2006 while((ln = listYield(list))) {
2007 robj *eleobj = listNodeValue(ln);
2008
2009 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2010 }
2011 } else if (o->type == REDIS_SET) {
2012 /* Save a set value */
2013 dict *set = o->ptr;
2014 dictIterator *di = dictGetIterator(set);
2015 dictEntry *de;
2016
2017 if (!set) oom("dictGetIteraotr");
2018 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2019 while((de = dictNext(di)) != NULL) {
2020 robj *eleobj = dictGetEntryKey(de);
2021
2022 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2023 }
2024 dictReleaseIterator(di);
2025 } else {
2026 assert(0 != 0);
2027 }
2028 }
2029 dictReleaseIterator(di);
2030 }
2031 /* EOF opcode */
2032 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2033
2034 /* Make sure data will not remain on the OS's output buffers */
2035 fflush(fp);
2036 fsync(fileno(fp));
2037 fclose(fp);
2038
2039 /* Use RENAME to make sure the DB file is changed atomically only
2040 * if the generate DB file is ok. */
2041 if (rename(tmpfile,filename) == -1) {
2042 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
2043 unlink(tmpfile);
2044 return REDIS_ERR;
2045 }
2046 redisLog(REDIS_NOTICE,"DB saved on disk");
2047 server.dirty = 0;
2048 server.lastsave = time(NULL);
2049 return REDIS_OK;
2050
2051 werr:
2052 fclose(fp);
2053 unlink(tmpfile);
2054 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2055 if (di) dictReleaseIterator(di);
2056 return REDIS_ERR;
2057 }
2058
2059 static int rdbSaveBackground(char *filename) {
2060 pid_t childpid;
2061
2062 if (server.bgsaveinprogress) return REDIS_ERR;
2063 if ((childpid = fork()) == 0) {
2064 /* Child */
2065 close(server.fd);
2066 if (rdbSave(filename) == REDIS_OK) {
2067 exit(0);
2068 } else {
2069 exit(1);
2070 }
2071 } else {
2072 /* Parent */
2073 if (childpid == -1) {
2074 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2075 strerror(errno));
2076 return REDIS_ERR;
2077 }
2078 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2079 server.bgsaveinprogress = 1;
2080 return REDIS_OK;
2081 }
2082 return REDIS_OK; /* unreached */
2083 }
2084
/* Read one type/opcode byte from 'fp'.
 * Returns its value (0-255), or -1 if no byte could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char t;

    return fread(&t,1,1,fp) ? t : -1;
}
2090
/* Read a 4 byte time value as written by rdbSaveTime().
 * Returns -1 on short read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 1) return -1;
    return (time_t) stamp;
}
2096
2097 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2098 * of this file for a description of how this are stored on disk.
2099 *
2100 * isencoded is set to 1 if the readed length is not actually a length but
2101 * an "encoding type", check the above comments for more info */
2102 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2103 unsigned char buf[2];
2104 uint32_t len;
2105
2106 if (isencoded) *isencoded = 0;
2107 if (rdbver == 0) {
2108 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2109 return ntohl(len);
2110 } else {
2111 int type;
2112
2113 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2114 type = (buf[0]&0xC0)>>6;
2115 if (type == REDIS_RDB_6BITLEN) {
2116 /* Read a 6 bit len */
2117 return buf[0]&0x3F;
2118 } else if (type == REDIS_RDB_ENCVAL) {
2119 /* Read a 6 bit len encoding type */
2120 if (isencoded) *isencoded = 1;
2121 return buf[0]&0x3F;
2122 } else if (type == REDIS_RDB_14BITLEN) {
2123 /* Read a 14 bit len */
2124 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2125 return ((buf[0]&0x3F)<<8)|buf[1];
2126 } else {
2127 /* Read a 32 bit len */
2128 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2129 return ntohl(len);
2130 }
2131 }
2132 }
2133
2134 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2135 unsigned char enc[4];
2136 long long val;
2137
2138 if (enctype == REDIS_RDB_ENC_INT8) {
2139 if (fread(enc,1,1,fp) == 0) return NULL;
2140 val = (signed char)enc[0];
2141 } else if (enctype == REDIS_RDB_ENC_INT16) {
2142 uint16_t v;
2143 if (fread(enc,2,1,fp) == 0) return NULL;
2144 v = enc[0]|(enc[1]<<8);
2145 val = (int16_t)v;
2146 } else if (enctype == REDIS_RDB_ENC_INT32) {
2147 uint32_t v;
2148 if (fread(enc,4,1,fp) == 0) return NULL;
2149 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2150 val = (int32_t)v;
2151 } else {
2152 val = 0; /* anti-warning */
2153 assert(0!=0);
2154 }
2155 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2156 }
2157
2158 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2159 unsigned int len, clen;
2160 unsigned char *c = NULL;
2161 sds val = NULL;
2162
2163 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2164 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2165 if ((c = zmalloc(clen)) == NULL) goto err;
2166 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2167 if (fread(c,clen,1,fp) == 0) goto err;
2168 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2169 zfree(c);
2170 return createObject(REDIS_STRING,val);
2171 err:
2172 zfree(c);
2173 sdsfree(val);
2174 return NULL;
2175 }
2176
2177 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2178 int isencoded;
2179 uint32_t len;
2180 sds val;
2181
2182 len = rdbLoadLen(fp,rdbver,&isencoded);
2183 if (isencoded) {
2184 switch(len) {
2185 case REDIS_RDB_ENC_INT8:
2186 case REDIS_RDB_ENC_INT16:
2187 case REDIS_RDB_ENC_INT32:
2188 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2189 case REDIS_RDB_ENC_LZF:
2190 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2191 default:
2192 assert(0!=0);
2193 }
2194 }
2195
2196 if (len == REDIS_RDB_LENERR) return NULL;
2197 val = sdsnewlen(NULL,len);
2198 if (len && fread(val,len,1,fp) == 0) {
2199 sdsfree(val);
2200 return NULL;
2201 }
2202 return tryObjectSharing(createObject(REDIS_STRING,val));
2203 }
2204
2205 static int rdbLoad(char *filename) {
2206 FILE *fp;
2207 robj *keyobj = NULL;
2208 uint32_t dbid;
2209 int type, retval, rdbver;
2210 dict *d = server.db[0].dict;
2211 redisDb *db = server.db+0;
2212 char buf[1024];
2213 time_t expiretime = -1, now = time(NULL);
2214
2215 fp = fopen(filename,"r");
2216 if (!fp) return REDIS_ERR;
2217 if (fread(buf,9,1,fp) == 0) goto eoferr;
2218 buf[9] = '\0';
2219 if (memcmp(buf,"REDIS",5) != 0) {
2220 fclose(fp);
2221 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2222 return REDIS_ERR;
2223 }
2224 rdbver = atoi(buf+5);
2225 if (rdbver > 1) {
2226 fclose(fp);
2227 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2228 return REDIS_ERR;
2229 }
2230 while(1) {
2231 robj *o;
2232
2233 /* Read type. */
2234 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2235 if (type == REDIS_EXPIRETIME) {
2236 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2237 /* We read the time so we need to read the object type again */
2238 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2239 }
2240 if (type == REDIS_EOF) break;
2241 /* Handle SELECT DB opcode as a special case */
2242 if (type == REDIS_SELECTDB) {
2243 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2244 goto eoferr;
2245 if (dbid >= (unsigned)server.dbnum) {
2246 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2247 exit(1);
2248 }
2249 db = server.db+dbid;
2250 d = db->dict;
2251 continue;
2252 }
2253 /* Read key */
2254 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2255
2256 if (type == REDIS_STRING) {
2257 /* Read string value */
2258 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2259 } else if (type == REDIS_LIST || type == REDIS_SET) {
2260 /* Read list/set value */
2261 uint32_t listlen;
2262
2263 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2264 goto eoferr;
2265 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2266 /* Load every single element of the list/set */
2267 while(listlen--) {
2268 robj *ele;
2269
2270 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2271 if (type == REDIS_LIST) {
2272 if (!listAddNodeTail((list*)o->ptr,ele))
2273 oom("listAddNodeTail");
2274 } else {
2275 if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
2276 oom("dictAdd");
2277 }
2278 }
2279 } else {
2280 assert(0 != 0);
2281 }
2282 /* Add the new object in the hash table */
2283 retval = dictAdd(d,keyobj,o);
2284 if (retval == DICT_ERR) {
2285 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2286 exit(1);
2287 }
2288 /* Set the expire time if needed */
2289 if (expiretime != -1) {
2290 setExpire(db,keyobj,expiretime);
2291 /* Delete this key if already expired */
2292 if (expiretime < now) deleteKey(db,keyobj);
2293 expiretime = -1;
2294 }
2295 keyobj = o = NULL;
2296 }
2297 fclose(fp);
2298 return REDIS_OK;
2299
2300 eoferr: /* unexpected end of file is handled here with a fatal exit */
2301 if (keyobj) decrRefCount(keyobj);
2302 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
2303 exit(1);
2304 return REDIS_ERR; /* Just to avoid warning */
2305 }
2306
2307 /*================================== Commands =============================== */
2308
2309 static void authCommand(redisClient *c) {
2310 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2311 c->authenticated = 1;
2312 addReply(c,shared.ok);
2313 } else {
2314 c->authenticated = 0;
2315 addReply(c,shared.err);
2316 }
2317 }
2318
2319 static void pingCommand(redisClient *c) {
2320 addReply(c,shared.pong);
2321 }
2322
2323 static void echoCommand(redisClient *c) {
2324 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2325 (int)sdslen(c->argv[1]->ptr)));
2326 addReply(c,c->argv[1]);
2327 addReply(c,shared.crlf);
2328 }
2329
2330 /*=================================== Strings =============================== */
2331
2332 static void setGenericCommand(redisClient *c, int nx) {
2333 int retval;
2334
2335 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2336 if (retval == DICT_ERR) {
2337 if (!nx) {
2338 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2339 incrRefCount(c->argv[2]);
2340 } else {
2341 addReply(c,shared.czero);
2342 return;
2343 }
2344 } else {
2345 incrRefCount(c->argv[1]);
2346 incrRefCount(c->argv[2]);
2347 }
2348 server.dirty++;
2349 removeExpire(c->db,c->argv[1]);
2350 addReply(c, nx ? shared.cone : shared.ok);
2351 }
2352
2353 static void setCommand(redisClient *c) {
2354 setGenericCommand(c,0);
2355 }
2356
2357 static void setnxCommand(redisClient *c) {
2358 setGenericCommand(c,1);
2359 }
2360
2361 static void getCommand(redisClient *c) {
2362 robj *o = lookupKeyRead(c->db,c->argv[1]);
2363
2364 if (o == NULL) {
2365 addReply(c,shared.nullbulk);
2366 } else {
2367 if (o->type != REDIS_STRING) {
2368 addReply(c,shared.wrongtypeerr);
2369 } else {
2370 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2371 addReply(c,o);
2372 addReply(c,shared.crlf);
2373 }
2374 }
2375 }
2376
2377 static void getSetCommand(redisClient *c) {
2378 getCommand(c);
2379 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
2380 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2381 } else {
2382 incrRefCount(c->argv[1]);
2383 }
2384 incrRefCount(c->argv[2]);
2385 server.dirty++;
2386 removeExpire(c->db,c->argv[1]);
2387 }
2388
2389 static void mgetCommand(redisClient *c) {
2390 int j;
2391
2392 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2393 for (j = 1; j < c->argc; j++) {
2394 robj *o = lookupKeyRead(c->db,c->argv[j]);
2395 if (o == NULL) {
2396 addReply(c,shared.nullbulk);
2397 } else {
2398 if (o->type != REDIS_STRING) {
2399 addReply(c,shared.nullbulk);
2400 } else {
2401 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2402 addReply(c,o);
2403 addReply(c,shared.crlf);
2404 }
2405 }
2406 }
2407 }
2408
2409 static void incrDecrCommand(redisClient *c, long long incr) {
2410 long long value;
2411 int retval;
2412 robj *o;
2413
2414 o = lookupKeyWrite(c->db,c->argv[1]);
2415 if (o == NULL) {
2416 value = 0;
2417 } else {
2418 if (o->type != REDIS_STRING) {
2419 value = 0;
2420 } else {
2421 char *eptr;
2422
2423 value = strtoll(o->ptr, &eptr, 10);
2424 }
2425 }
2426
2427 value += incr;
2428 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2429 retval = dictAdd(c->db->dict,c->argv[1],o);
2430 if (retval == DICT_ERR) {
2431 dictReplace(c->db->dict,c->argv[1],o);
2432 removeExpire(c->db,c->argv[1]);
2433 } else {
2434 incrRefCount(c->argv[1]);
2435 }
2436 server.dirty++;
2437 addReply(c,shared.colon);
2438 addReply(c,o);
2439 addReply(c,shared.crlf);
2440 }
2441
2442 static void incrCommand(redisClient *c) {
2443 incrDecrCommand(c,1);
2444 }
2445
2446 static void decrCommand(redisClient *c) {
2447 incrDecrCommand(c,-1);
2448 }
2449
2450 static void incrbyCommand(redisClient *c) {
2451 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2452 incrDecrCommand(c,incr);
2453 }
2454
2455 static void decrbyCommand(redisClient *c) {
2456 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
2457 incrDecrCommand(c,-incr);
2458 }
2459
2460 /* ========================= Type agnostic commands ========================= */
2461
2462 static void delCommand(redisClient *c) {
2463 int deleted = 0, j;
2464
2465 for (j = 1; j < c->argc; j++) {
2466 if (deleteKey(c->db,c->argv[j])) {
2467 server.dirty++;
2468 deleted++;
2469 }
2470 }
2471 switch(deleted) {
2472 case 0:
2473 addReply(c,shared.czero);
2474 break;
2475 case 1:
2476 addReply(c,shared.cone);
2477 break;
2478 default:
2479 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
2480 break;
2481 }
2482 }
2483
2484 static void existsCommand(redisClient *c) {
2485 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2486 }
2487
2488 static void selectCommand(redisClient *c) {
2489 int id = atoi(c->argv[1]->ptr);
2490
2491 if (selectDb(c,id) == REDIS_ERR) {
2492 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2493 } else {
2494 addReply(c,shared.ok);
2495 }
2496 }
2497
2498 static void randomkeyCommand(redisClient *c) {
2499 dictEntry *de;
2500
2501 while(1) {
2502 de = dictGetRandomKey(c->db->dict);
2503 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2504 }
2505 if (de == NULL) {
2506 addReply(c,shared.plus);
2507 addReply(c,shared.crlf);
2508 } else {
2509 addReply(c,shared.plus);
2510 addReply(c,dictGetEntryKey(de));
2511 addReply(c,shared.crlf);
2512 }
2513 }
2514
2515 static void keysCommand(redisClient *c) {
2516 dictIterator *di;
2517 dictEntry *de;
2518 sds pattern = c->argv[1]->ptr;
2519 int plen = sdslen(pattern);
2520 int numkeys = 0, keyslen = 0;
2521 robj *lenobj = createObject(REDIS_STRING,NULL);
2522
2523 di = dictGetIterator(c->db->dict);
2524 if (!di) oom("dictGetIterator");
2525 addReply(c,lenobj);
2526 decrRefCount(lenobj);
2527 while((de = dictNext(di)) != NULL) {
2528 robj *keyobj = dictGetEntryKey(de);
2529
2530 sds key = keyobj->ptr;
2531 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2532 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2533 if (expireIfNeeded(c->db,keyobj) == 0) {
2534 if (numkeys != 0)
2535 addReply(c,shared.space);
2536 addReply(c,keyobj);
2537 numkeys++;
2538 keyslen += sdslen(key);
2539 }
2540 }
2541 }
2542 dictReleaseIterator(di);
2543 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2544 addReply(c,shared.crlf);
2545 }
2546
2547 static void dbsizeCommand(redisClient *c) {
2548 addReplySds(c,
2549 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2550 }
2551
2552 static void lastsaveCommand(redisClient *c) {
2553 addReplySds(c,
2554 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2555 }
2556
2557 static void typeCommand(redisClient *c) {
2558 robj *o;
2559 char *type;
2560
2561 o = lookupKeyRead(c->db,c->argv[1]);
2562 if (o == NULL) {
2563 type = "+none";
2564 } else {
2565 switch(o->type) {
2566 case REDIS_STRING: type = "+string"; break;
2567 case REDIS_LIST: type = "+list"; break;
2568 case REDIS_SET: type = "+set"; break;
2569 default: type = "unknown"; break;
2570 }
2571 }
2572 addReplySds(c,sdsnew(type));
2573 addReply(c,shared.crlf);
2574 }
2575
2576 static void saveCommand(redisClient *c) {
2577 if (server.bgsaveinprogress) {
2578 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2579 return;
2580 }
2581 if (rdbSave(server.dbfilename) == REDIS_OK) {
2582 addReply(c,shared.ok);
2583 } else {
2584 addReply(c,shared.err);
2585 }
2586 }
2587
2588 static void bgsaveCommand(redisClient *c) {
2589 if (server.bgsaveinprogress) {
2590 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2591 return;
2592 }
2593 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2594 addReply(c,shared.ok);
2595 } else {
2596 addReply(c,shared.err);
2597 }
2598 }
2599
2600 static void shutdownCommand(redisClient *c) {
2601 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2602 /* XXX: TODO kill the child if there is a bgsave in progress */
2603 if (rdbSave(server.dbfilename) == REDIS_OK) {
2604 if (server.daemonize) {
2605 unlink(server.pidfile);
2606 }
2607 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
2608 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2609 exit(1);
2610 } else {
2611 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2612 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2613 }
2614 }
2615
2616 static void renameGenericCommand(redisClient *c, int nx) {
2617 robj *o;
2618
2619 /* To use the same key as src and dst is probably an error */
2620 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2621 addReply(c,shared.sameobjecterr);
2622 return;
2623 }
2624
2625 o = lookupKeyWrite(c->db,c->argv[1]);
2626 if (o == NULL) {
2627 addReply(c,shared.nokeyerr);
2628 return;
2629 }
2630 incrRefCount(o);
2631 deleteIfVolatile(c->db,c->argv[2]);
2632 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2633 if (nx) {
2634 decrRefCount(o);
2635 addReply(c,shared.czero);
2636 return;
2637 }
2638 dictReplace(c->db->dict,c->argv[2],o);
2639 } else {
2640 incrRefCount(c->argv[2]);
2641 }
2642 deleteKey(c->db,c->argv[1]);
2643 server.dirty++;
2644 addReply(c,nx ? shared.cone : shared.ok);
2645 }
2646
2647 static void renameCommand(redisClient *c) {
2648 renameGenericCommand(c,0);
2649 }
2650
2651 static void renamenxCommand(redisClient *c) {
2652 renameGenericCommand(c,1);
2653 }
2654
2655 static void moveCommand(redisClient *c) {
2656 robj *o;
2657 redisDb *src, *dst;
2658 int srcid;
2659
2660 /* Obtain source and target DB pointers */
2661 src = c->db;
2662 srcid = c->db->id;
2663 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2664 addReply(c,shared.outofrangeerr);
2665 return;
2666 }
2667 dst = c->db;
2668 selectDb(c,srcid); /* Back to the source DB */
2669
2670 /* If the user is moving using as target the same
2671 * DB as the source DB it is probably an error. */
2672 if (src == dst) {
2673 addReply(c,shared.sameobjecterr);
2674 return;
2675 }
2676
2677 /* Check if the element exists and get a reference */
2678 o = lookupKeyWrite(c->db,c->argv[1]);
2679 if (!o) {
2680 addReply(c,shared.czero);
2681 return;
2682 }
2683
2684 /* Try to add the element to the target DB */
2685 deleteIfVolatile(dst,c->argv[1]);
2686 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2687 addReply(c,shared.czero);
2688 return;
2689 }
2690 incrRefCount(c->argv[1]);
2691 incrRefCount(o);
2692
2693 /* OK! key moved, free the entry in the source DB */
2694 deleteKey(src,c->argv[1]);
2695 server.dirty++;
2696 addReply(c,shared.cone);
2697 }
2698
2699 /* =================================== Lists ================================ */
2700 static void pushGenericCommand(redisClient *c, int where) {
2701 robj *lobj;
2702 list *list;
2703
2704 lobj = lookupKeyWrite(c->db,c->argv[1]);
2705 if (lobj == NULL) {
2706 lobj = createListObject();
2707 list = lobj->ptr;
2708 if (where == REDIS_HEAD) {
2709 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2710 } else {
2711 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2712 }
2713 dictAdd(c->db->dict,c->argv[1],lobj);
2714 incrRefCount(c->argv[1]);
2715 incrRefCount(c->argv[2]);
2716 } else {
2717 if (lobj->type != REDIS_LIST) {
2718 addReply(c,shared.wrongtypeerr);
2719 return;
2720 }
2721 list = lobj->ptr;
2722 if (where == REDIS_HEAD) {
2723 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2724 } else {
2725 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2726 }
2727 incrRefCount(c->argv[2]);
2728 }
2729 server.dirty++;
2730 addReply(c,shared.ok);
2731 }
2732
2733 static void lpushCommand(redisClient *c) {
2734 pushGenericCommand(c,REDIS_HEAD);
2735 }
2736
2737 static void rpushCommand(redisClient *c) {
2738 pushGenericCommand(c,REDIS_TAIL);
2739 }
2740
2741 static void llenCommand(redisClient *c) {
2742 robj *o;
2743 list *l;
2744
2745 o = lookupKeyRead(c->db,c->argv[1]);
2746 if (o == NULL) {
2747 addReply(c,shared.czero);
2748 return;
2749 } else {
2750 if (o->type != REDIS_LIST) {
2751 addReply(c,shared.wrongtypeerr);
2752 } else {
2753 l = o->ptr;
2754 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2755 }
2756 }
2757 }
2758
2759 static void lindexCommand(redisClient *c) {
2760 robj *o;
2761 int index = atoi(c->argv[2]->ptr);
2762
2763 o = lookupKeyRead(c->db,c->argv[1]);
2764 if (o == NULL) {
2765 addReply(c,shared.nullbulk);
2766 } else {
2767 if (o->type != REDIS_LIST) {
2768 addReply(c,shared.wrongtypeerr);
2769 } else {
2770 list *list = o->ptr;
2771 listNode *ln;
2772
2773 ln = listIndex(list, index);
2774 if (ln == NULL) {
2775 addReply(c,shared.nullbulk);
2776 } else {
2777 robj *ele = listNodeValue(ln);
2778 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2779 addReply(c,ele);
2780 addReply(c,shared.crlf);
2781 }
2782 }
2783 }
2784 }
2785
2786 static void lsetCommand(redisClient *c) {
2787 robj *o;
2788 int index = atoi(c->argv[2]->ptr);
2789
2790 o = lookupKeyWrite(c->db,c->argv[1]);
2791 if (o == NULL) {
2792 addReply(c,shared.nokeyerr);
2793 } else {
2794 if (o->type != REDIS_LIST) {
2795 addReply(c,shared.wrongtypeerr);
2796 } else {
2797 list *list = o->ptr;
2798 listNode *ln;
2799
2800 ln = listIndex(list, index);
2801 if (ln == NULL) {
2802 addReply(c,shared.outofrangeerr);
2803 } else {
2804 robj *ele = listNodeValue(ln);
2805
2806 decrRefCount(ele);
2807 listNodeValue(ln) = c->argv[3];
2808 incrRefCount(c->argv[3]);
2809 addReply(c,shared.ok);
2810 server.dirty++;
2811 }
2812 }
2813 }
2814 }
2815
2816 static void popGenericCommand(redisClient *c, int where) {
2817 robj *o;
2818
2819 o = lookupKeyWrite(c->db,c->argv[1]);
2820 if (o == NULL) {
2821 addReply(c,shared.nullbulk);
2822 } else {
2823 if (o->type != REDIS_LIST) {
2824 addReply(c,shared.wrongtypeerr);
2825 } else {
2826 list *list = o->ptr;
2827 listNode *ln;
2828
2829 if (where == REDIS_HEAD)
2830 ln = listFirst(list);
2831 else
2832 ln = listLast(list);
2833
2834 if (ln == NULL) {
2835 addReply(c,shared.nullbulk);
2836 } else {
2837 robj *ele = listNodeValue(ln);
2838 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2839 addReply(c,ele);
2840 addReply(c,shared.crlf);
2841 listDelNode(list,ln);
2842 server.dirty++;
2843 }
2844 }
2845 }
2846 }
2847
2848 static void lpopCommand(redisClient *c) {
2849 popGenericCommand(c,REDIS_HEAD);
2850 }
2851
2852 static void rpopCommand(redisClient *c) {
2853 popGenericCommand(c,REDIS_TAIL);
2854 }
2855
2856 static void lrangeCommand(redisClient *c) {
2857 robj *o;
2858 int start = atoi(c->argv[2]->ptr);
2859 int end = atoi(c->argv[3]->ptr);
2860
2861 o = lookupKeyRead(c->db,c->argv[1]);
2862 if (o == NULL) {
2863 addReply(c,shared.nullmultibulk);
2864 } else {
2865 if (o->type != REDIS_LIST) {
2866 addReply(c,shared.wrongtypeerr);
2867 } else {
2868 list *list = o->ptr;
2869 listNode *ln;
2870 int llen = listLength(list);
2871 int rangelen, j;
2872 robj *ele;
2873
2874 /* convert negative indexes */
2875 if (start < 0) start = llen+start;
2876 if (end < 0) end = llen+end;
2877 if (start < 0) start = 0;
2878 if (end < 0) end = 0;
2879
2880 /* indexes sanity checks */
2881 if (start > end || start >= llen) {
2882 /* Out of range start or start > end result in empty list */
2883 addReply(c,shared.emptymultibulk);
2884 return;
2885 }
2886 if (end >= llen) end = llen-1;
2887 rangelen = (end-start)+1;
2888
2889 /* Return the result in form of a multi-bulk reply */
2890 ln = listIndex(list, start);
2891 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
2892 for (j = 0; j < rangelen; j++) {
2893 ele = listNodeValue(ln);
2894 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2895 addReply(c,ele);
2896 addReply(c,shared.crlf);
2897 ln = ln->next;
2898 }
2899 }
2900 }
2901 }
2902
2903 static void ltrimCommand(redisClient *c) {
2904 robj *o;
2905 int start = atoi(c->argv[2]->ptr);
2906 int end = atoi(c->argv[3]->ptr);
2907
2908 o = lookupKeyWrite(c->db,c->argv[1]);
2909 if (o == NULL) {
2910 addReply(c,shared.nokeyerr);
2911 } else {
2912 if (o->type != REDIS_LIST) {
2913 addReply(c,shared.wrongtypeerr);
2914 } else {
2915 list *list = o->ptr;
2916 listNode *ln;
2917 int llen = listLength(list);
2918 int j, ltrim, rtrim;
2919
2920 /* convert negative indexes */
2921 if (start < 0) start = llen+start;
2922 if (end < 0) end = llen+end;
2923 if (start < 0) start = 0;
2924 if (end < 0) end = 0;
2925
2926 /* indexes sanity checks */
2927 if (start > end || start >= llen) {
2928 /* Out of range start or start > end result in empty list */
2929 ltrim = llen;
2930 rtrim = 0;
2931 } else {
2932 if (end >= llen) end = llen-1;
2933 ltrim = start;
2934 rtrim = llen-end-1;
2935 }
2936
2937 /* Remove list elements to perform the trim */
2938 for (j = 0; j < ltrim; j++) {
2939 ln = listFirst(list);
2940 listDelNode(list,ln);
2941 }
2942 for (j = 0; j < rtrim; j++) {
2943 ln = listLast(list);
2944 listDelNode(list,ln);
2945 }
2946 addReply(c,shared.ok);
2947 server.dirty++;
2948 }
2949 }
2950 }
2951
2952 static void lremCommand(redisClient *c) {
2953 robj *o;
2954
2955 o = lookupKeyWrite(c->db,c->argv[1]);
2956 if (o == NULL) {
2957 addReply(c,shared.nokeyerr);
2958 } else {
2959 if (o->type != REDIS_LIST) {
2960 addReply(c,shared.wrongtypeerr);
2961 } else {
2962 list *list = o->ptr;
2963 listNode *ln, *next;
2964 int toremove = atoi(c->argv[2]->ptr);
2965 int removed = 0;
2966 int fromtail = 0;
2967
2968 if (toremove < 0) {
2969 toremove = -toremove;
2970 fromtail = 1;
2971 }
2972 ln = fromtail ? list->tail : list->head;
2973 while (ln) {
2974 robj *ele = listNodeValue(ln);
2975
2976 next = fromtail ? ln->prev : ln->next;
2977 if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
2978 listDelNode(list,ln);
2979 server.dirty++;
2980 removed++;
2981 if (toremove && removed == toremove) break;
2982 }
2983 ln = next;
2984 }
2985 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
2986 }
2987 }
2988 }
2989
2990 /* ==================================== Sets ================================ */
2991
2992 static void saddCommand(redisClient *c) {
2993 robj *set;
2994
2995 set = lookupKeyWrite(c->db,c->argv[1]);
2996 if (set == NULL) {
2997 set = createSetObject();
2998 dictAdd(c->db->dict,c->argv[1],set);
2999 incrRefCount(c->argv[1]);
3000 } else {
3001 if (set->type != REDIS_SET) {
3002 addReply(c,shared.wrongtypeerr);
3003 return;
3004 }
3005 }
3006 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3007 incrRefCount(c->argv[2]);
3008 server.dirty++;
3009 addReply(c,shared.cone);
3010 } else {
3011 addReply(c,shared.czero);
3012 }
3013 }
3014
3015 static void sremCommand(redisClient *c) {
3016 robj *set;
3017
3018 set = lookupKeyWrite(c->db,c->argv[1]);
3019 if (set == NULL) {
3020 addReply(c,shared.czero);
3021 } else {
3022 if (set->type != REDIS_SET) {
3023 addReply(c,shared.wrongtypeerr);
3024 return;
3025 }
3026 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3027 server.dirty++;
3028 addReply(c,shared.cone);
3029 } else {
3030 addReply(c,shared.czero);
3031 }
3032 }
3033 }
3034
3035 static void smoveCommand(redisClient *c) {
3036 robj *srcset, *dstset;
3037
3038 srcset = lookupKeyWrite(c->db,c->argv[1]);
3039 dstset = lookupKeyWrite(c->db,c->argv[2]);
3040
3041 /* If the source key does not exist return 0, if it's of the wrong type
3042 * raise an error */
3043 if (srcset == NULL || srcset->type != REDIS_SET) {
3044 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3045 return;
3046 }
3047 /* Error if the destination key is not a set as well */
3048 if (dstset && dstset->type != REDIS_SET) {
3049 addReply(c,shared.wrongtypeerr);
3050 return;
3051 }
3052 /* Remove the element from the source set */
3053 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3054 /* Key not found in the src set! return zero */
3055 addReply(c,shared.czero);
3056 return;
3057 }
3058 server.dirty++;
3059 /* Add the element to the destination set */
3060 if (!dstset) {
3061 dstset = createSetObject();
3062 dictAdd(c->db->dict,c->argv[2],dstset);
3063 incrRefCount(c->argv[2]);
3064 }
3065 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3066 incrRefCount(c->argv[3]);
3067 addReply(c,shared.cone);
3068 }
3069
3070 static void sismemberCommand(redisClient *c) {
3071 robj *set;
3072
3073 set = lookupKeyRead(c->db,c->argv[1]);
3074 if (set == NULL) {
3075 addReply(c,shared.czero);
3076 } else {
3077 if (set->type != REDIS_SET) {
3078 addReply(c,shared.wrongtypeerr);
3079 return;
3080 }
3081 if (dictFind(set->ptr,c->argv[2]))
3082 addReply(c,shared.cone);
3083 else
3084 addReply(c,shared.czero);
3085 }
3086 }
3087
3088 static void scardCommand(redisClient *c) {
3089 robj *o;
3090 dict *s;
3091
3092 o = lookupKeyRead(c->db,c->argv[1]);
3093 if (o == NULL) {
3094 addReply(c,shared.czero);
3095 return;
3096 } else {
3097 if (o->type != REDIS_SET) {
3098 addReply(c,shared.wrongtypeerr);
3099 } else {
3100 s = o->ptr;
3101 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3102 dictSize(s)));
3103 }
3104 }
3105 }
3106
3107 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3108 dict **d1 = (void*) s1, **d2 = (void*) s2;
3109
3110 return dictSize(*d1)-dictSize(*d2);
3111 }
3112
3113 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3114 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3115 dictIterator *di;
3116 dictEntry *de;
3117 robj *lenobj = NULL, *dstset = NULL;
3118 int j, cardinality = 0;
3119
3120 if (!dv) oom("sinterGenericCommand");
3121 for (j = 0; j < setsnum; j++) {
3122 robj *setobj;
3123
3124 setobj = dstkey ?
3125 lookupKeyWrite(c->db,setskeys[j]) :
3126 lookupKeyRead(c->db,setskeys[j]);
3127 if (!setobj) {
3128 zfree(dv);
3129 if (dstkey) {
3130 deleteKey(c->db,dstkey);
3131 addReply(c,shared.ok);
3132 } else {
3133 addReply(c,shared.nullmultibulk);
3134 }
3135 return;
3136 }
3137 if (setobj->type != REDIS_SET) {
3138 zfree(dv);
3139 addReply(c,shared.wrongtypeerr);
3140 return;
3141 }
3142 dv[j] = setobj->ptr;
3143 }
3144 /* Sort sets from the smallest to largest, this will improve our
3145 * algorithm's performace */
3146 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3147
3148 /* The first thing we should output is the total number of elements...
3149 * since this is a multi-bulk write, but at this stage we don't know
3150 * the intersection set size, so we use a trick, append an empty object
3151 * to the output list and save the pointer to later modify it with the
3152 * right length */
3153 if (!dstkey) {
3154 lenobj = createObject(REDIS_STRING,NULL);
3155 addReply(c,lenobj);
3156 decrRefCount(lenobj);
3157 } else {
3158 /* If we have a target key where to store the resulting set
3159 * create this key with an empty set inside */
3160 dstset = createSetObject();
3161 }
3162
3163 /* Iterate all the elements of the first (smallest) set, and test
3164 * the element against all the other sets, if at least one set does
3165 * not include the element it is discarded */
3166 di = dictGetIterator(dv[0]);
3167 if (!di) oom("dictGetIterator");
3168
3169 while((de = dictNext(di)) != NULL) {
3170 robj *ele;
3171
3172 for (j = 1; j < setsnum; j++)
3173 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3174 if (j != setsnum)
3175 continue; /* at least one set does not contain the member */
3176 ele = dictGetEntryKey(de);
3177 if (!dstkey) {
3178 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
3179 addReply(c,ele);
3180 addReply(c,shared.crlf);
3181 cardinality++;
3182 } else {
3183 dictAdd(dstset->ptr,ele,NULL);
3184 incrRefCount(ele);
3185 }
3186 }
3187 dictReleaseIterator(di);
3188
3189 if (dstkey) {
3190 /* Store the resulting set into the target */
3191 deleteKey(c->db,dstkey);
3192 dictAdd(c->db->dict,dstkey,dstset);
3193 incrRefCount(dstkey);
3194 }
3195
3196 if (!dstkey) {
3197 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3198 } else {
3199 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3200 dictSize((dict*)dstset->ptr)));
3201 server.dirty++;
3202 }
3203 zfree(dv);
3204 }
3205
3206 static void sinterCommand(redisClient *c) {
3207 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
3208 }
3209
3210 static void sinterstoreCommand(redisClient *c) {
3211 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3212 }
3213
/* Operation selector passed to sunionDiffGenericCommand(). */
enum {
    REDIS_OP_UNION = 0,
    REDIS_OP_DIFF = 1
};
3216
3217 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
3218 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3219 dictIterator *di;
3220 dictEntry *de;
3221 robj *dstset = NULL;
3222 int j, cardinality = 0;
3223
3224 if (!dv) oom("sunionDiffGenericCommand");
3225 for (j = 0; j < setsnum; j++) {
3226 robj *setobj;
3227
3228 setobj = dstkey ?
3229 lookupKeyWrite(c->db,setskeys[j]) :
3230 lookupKeyRead(c->db,setskeys[j]);
3231 if (!setobj) {
3232 dv[j] = NULL;
3233 continue;
3234 }
3235 if (setobj->type != REDIS_SET) {
3236 zfree(dv);
3237 addReply(c,shared.wrongtypeerr);
3238 return;
3239 }
3240 dv[j] = setobj->ptr;
3241 }
3242
3243 /* We need a temp set object to store our union. If the dstkey
3244 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
3245 * this set object will be the resulting object to set into the target key*/
3246 dstset = createSetObject();
3247
3248 /* Iterate all the elements of all the sets, add every element a single
3249 * time to the result set */
3250 for (j = 0; j < setsnum; j++) {
3251 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
3252 if (!dv[j]) continue; /* non existing keys are like empty sets */
3253
3254 di = dictGetIterator(dv[j]);
3255 if (!di) oom("dictGetIterator");
3256
3257 while((de = dictNext(di)) != NULL) {
3258 robj *ele;
3259
3260 /* dictAdd will not add the same element multiple times */
3261 ele = dictGetEntryKey(de);
3262 if (op == REDIS_OP_UNION || j == 0) {
3263 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3264 incrRefCount(ele);
3265 cardinality++;
3266 }
3267 } else if (op == REDIS_OP_DIFF) {
3268 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
3269 cardinality--;
3270 }
3271 }
3272 }
3273 dictReleaseIterator(di);
3274
3275 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
3276 }
3277
3278 /* Output the content of the resulting set, if not in STORE mode */
3279 if (!dstkey) {
3280 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
3281 di = dictGetIterator(dstset->ptr);
3282 if (!di) oom("dictGetIterator");
3283 while((de = dictNext(di)) != NULL) {
3284 robj *ele;
3285
3286 ele = dictGetEntryKey(de);
3287 addReplySds(c,sdscatprintf(sdsempty(),
3288 "$%d\r\n",sdslen(ele->ptr)));
3289 addReply(c,ele);
3290 addReply(c,shared.crlf);
3291 }
3292 dictReleaseIterator(di);
3293 } else {
3294 /* If we have a target key where to store the resulting set
3295 * create this key with the result set inside */
3296 deleteKey(c->db,dstkey);
3297 dictAdd(c->db->dict,dstkey,dstset);
3298 incrRefCount(dstkey);
3299 }
3300
3301 /* Cleanup */
3302 if (!dstkey) {
3303 decrRefCount(dstset);
3304 } else {
3305 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3306 dictSize((dict*)dstset->ptr)));
3307 server.dirty++;
3308 }
3309 zfree(dv);
3310 }
3311
3312 static void sunionCommand(redisClient *c) {
3313 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
3314 }
3315
3316 static void sunionstoreCommand(redisClient *c) {
3317 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
3318 }
3319
3320 static void sdiffCommand(redisClient *c) {
3321 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
3322 }
3323
3324 static void sdiffstoreCommand(redisClient *c) {
3325 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
3326 }
3327
3328 static void flushdbCommand(redisClient *c) {
3329 server.dirty += dictSize(c->db->dict);
3330 dictEmpty(c->db->dict);
3331 dictEmpty(c->db->expires);
3332 addReply(c,shared.ok);
3333 }
3334
3335 static void flushallCommand(redisClient *c) {
3336 server.dirty += emptyDb();
3337 addReply(c,shared.ok);
3338 rdbSave(server.dbfilename);
3339 server.dirty++;
3340 }
3341
3342 redisSortOperation *createSortOperation(int type, robj *pattern) {
3343 redisSortOperation *so = zmalloc(sizeof(*so));
3344 if (!so) oom("createSortOperation");
3345 so->type = type;
3346 so->pattern = pattern;
3347 return so;
3348 }
3349
3350 /* Return the value associated to the key with a name obtained
3351 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3352 robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3353 char *p;
3354 sds spat, ssub;
3355 robj keyobj;
3356 int prefixlen, sublen, postfixlen;
3357 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3358 struct {
3359 long len;
3360 long free;
3361 char buf[REDIS_SORTKEY_MAX+1];
3362 } keyname;
3363
3364 spat = pattern->ptr;
3365 ssub = subst->ptr;
3366 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3367 p = strchr(spat,'*');
3368 if (!p) return NULL;
3369
3370 prefixlen = p-spat;
3371 sublen = sdslen(ssub);
3372 postfixlen = sdslen(spat)-(prefixlen+1);
3373 memcpy(keyname.buf,spat,prefixlen);
3374 memcpy(keyname.buf+prefixlen,ssub,sublen);
3375 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3376 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3377 keyname.len = prefixlen+sublen+postfixlen;
3378
3379 keyobj.refcount = 1;
3380 keyobj.type = REDIS_STRING;
3381 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3382
3383 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3384 return lookupKeyRead(db,&keyobj);
3385 }
3386
3387 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3388 * the additional parameter is not standard but a BSD-specific we have to
3389 * pass sorting parameters via the global 'server' structure */
3390 static int sortCompare(const void *s1, const void *s2) {
3391 const redisSortObject *so1 = s1, *so2 = s2;
3392 int cmp;
3393
3394 if (!server.sort_alpha) {
3395 /* Numeric sorting. Here it's trivial as we precomputed scores */
3396 if (so1->u.score > so2->u.score) {
3397 cmp = 1;
3398 } else if (so1->u.score < so2->u.score) {
3399 cmp = -1;
3400 } else {
3401 cmp = 0;
3402 }
3403 } else {
3404 /* Alphanumeric sorting */
3405 if (server.sort_bypattern) {
3406 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3407 /* At least one compare object is NULL */
3408 if (so1->u.cmpobj == so2->u.cmpobj)
3409 cmp = 0;
3410 else if (so1->u.cmpobj == NULL)
3411 cmp = -1;
3412 else
3413 cmp = 1;
3414 } else {
3415 /* We have both the objects, use strcoll */
3416 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3417 }
3418 } else {
3419 /* Compare elements directly */
3420 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3421 }
3422 }
3423 return server.sort_desc ? -cmp : cmp;
3424 }
3425
3426 /* The SORT command is the most complex command in Redis. Warning: this code
3427 * is optimized for speed and a bit less for readability */
3428 static void sortCommand(redisClient *c) {
3429 list *operations;
3430 int outputlen = 0;
3431 int desc = 0, alpha = 0;
3432 int limit_start = 0, limit_count = -1, start, end;
3433 int j, dontsort = 0, vectorlen;
3434 int getop = 0; /* GET operation counter */
3435 robj *sortval, *sortby = NULL;
3436 redisSortObject *vector; /* Resulting vector to sort */
3437
3438 /* Lookup the key to sort. It must be of the right types */
3439 sortval = lookupKeyRead(c->db,c->argv[1]);
3440 if (sortval == NULL) {
3441 addReply(c,shared.nokeyerr);
3442 return;
3443 }
3444 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3445 addReply(c,shared.wrongtypeerr);
3446 return;
3447 }
3448
3449 /* Create a list of operations to perform for every sorted element.
3450 * Operations can be GET/DEL/INCR/DECR */
3451 operations = listCreate();
3452 listSetFreeMethod(operations,zfree);
3453 j = 2;
3454
3455 /* Now we need to protect sortval incrementing its count, in the future
3456 * SORT may have options able to overwrite/delete keys during the sorting
3457 * and the sorted key itself may get destroied */
3458 incrRefCount(sortval);
3459
3460 /* The SORT command has an SQL-alike syntax, parse it */
3461 while(j < c->argc) {
3462 int leftargs = c->argc-j-1;
3463 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3464 desc = 0;
3465 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3466 desc = 1;
3467 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3468 alpha = 1;
3469 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3470 limit_start = atoi(c->argv[j+1]->ptr);
3471 limit_count = atoi(c->argv[j+2]->ptr);
3472 j+=2;
3473 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3474 sortby = c->argv[j+1];
3475 /* If the BY pattern does not contain '*', i.e. it is constant,
3476 * we don't need to sort nor to lookup the weight keys. */
3477 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3478 j++;
3479 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3480 listAddNodeTail(operations,createSortOperation(
3481 REDIS_SORT_GET,c->argv[j+1]));
3482 getop++;
3483 j++;
3484 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3485 listAddNodeTail(operations,createSortOperation(
3486 REDIS_SORT_DEL,c->argv[j+1]));
3487 j++;
3488 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3489 listAddNodeTail(operations,createSortOperation(
3490 REDIS_SORT_INCR,c->argv[j+1]));
3491 j++;
3492 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3493 listAddNodeTail(operations,createSortOperation(
3494 REDIS_SORT_DECR,c->argv[j+1]));
3495 j++;
3496 } else {
3497 decrRefCount(sortval);
3498 listRelease(operations);
3499 addReply(c,shared.syntaxerr);
3500 return;
3501 }
3502 j++;
3503 }
3504
3505 /* Load the sorting vector with all the objects to sort */
3506 vectorlen = (sortval->type == REDIS_LIST) ?
3507 listLength((list*)sortval->ptr) :
3508 dictSize((dict*)sortval->ptr);
3509 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3510 if (!vector) oom("allocating objects vector for SORT");
3511 j = 0;
3512 if (sortval->type == REDIS_LIST) {
3513 list *list = sortval->ptr;
3514 listNode *ln;
3515
3516 listRewind(list);
3517 while((ln = listYield(list))) {
3518 robj *ele = ln->value;
3519 vector[j].obj = ele;
3520 vector[j].u.score = 0;
3521 vector[j].u.cmpobj = NULL;
3522 j++;
3523 }
3524 } else {
3525 dict *set = sortval->ptr;
3526 dictIterator *di;
3527 dictEntry *setele;
3528
3529 di = dictGetIterator(set);
3530 if (!di) oom("dictGetIterator");
3531 while((setele = dictNext(di)) != NULL) {
3532 vector[j].obj = dictGetEntryKey(setele);
3533 vector[j].u.score = 0;
3534 vector[j].u.cmpobj = NULL;
3535 j++;
3536 }
3537 dictReleaseIterator(di);
3538 }
3539 assert(j == vectorlen);
3540
3541 /* Now it's time to load the right scores in the sorting vector */
3542 if (dontsort == 0) {
3543 for (j = 0; j < vectorlen; j++) {
3544 if (sortby) {
3545 robj *byval;
3546
3547 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3548 if (!byval || byval->type != REDIS_STRING) continue;
3549 if (alpha) {
3550 vector[j].u.cmpobj = byval;
3551 incrRefCount(byval);
3552 } else {
3553 vector[j].u.score = strtod(byval->ptr,NULL);
3554 }
3555 } else {
3556 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3557 }
3558 }
3559 }
3560
3561 /* We are ready to sort the vector... perform a bit of sanity check
3562 * on the LIMIT option too. We'll use a partial version of quicksort. */
3563 start = (limit_start < 0) ? 0 : limit_start;
3564 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3565 if (start >= vectorlen) {
3566 start = vectorlen-1;
3567 end = vectorlen-2;
3568 }
3569 if (end >= vectorlen) end = vectorlen-1;
3570
3571 if (dontsort == 0) {
3572 server.sort_desc = desc;
3573 server.sort_alpha = alpha;
3574 server.sort_bypattern = sortby ? 1 : 0;
3575 if (sortby && (start != 0 || end != vectorlen-1))
3576 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
3577 else
3578 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3579 }
3580
3581 /* Send command output to the output buffer, performing the specified
3582 * GET/DEL/INCR/DECR operations if any. */
3583 outputlen = getop ? getop*(end-start+1) : end-start+1;
3584 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3585 for (j = start; j <= end; j++) {
3586 listNode *ln;
3587 if (!getop) {
3588 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3589 sdslen(vector[j].obj->ptr)));
3590 addReply(c,vector[j].obj);
3591 addReply(c,shared.crlf);
3592 }
3593 listRewind(operations);
3594 while((ln = listYield(operations))) {
3595 redisSortOperation *sop = ln->value;
3596 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3597 vector[j].obj);
3598
3599 if (sop->type == REDIS_SORT_GET) {
3600 if (!val || val->type != REDIS_STRING) {
3601 addReply(c,shared.nullbulk);
3602 } else {
3603 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3604 sdslen(val->ptr)));
3605 addReply(c,val);
3606 addReply(c,shared.crlf);
3607 }
3608 } else if (sop->type == REDIS_SORT_DEL) {
3609 /* TODO */
3610 }
3611 }
3612 }
3613
3614 /* Cleanup */
3615 decrRefCount(sortval);
3616 listRelease(operations);
3617 for (j = 0; j < vectorlen; j++) {
3618 if (sortby && alpha && vector[j].u.cmpobj)
3619 decrRefCount(vector[j].u.cmpobj);
3620 }
3621 zfree(vector);
3622 }
3623
3624 static void infoCommand(redisClient *c) {
3625 sds info;
3626 time_t uptime = time(NULL)-server.stat_starttime;
3627
3628 info = sdscatprintf(sdsempty(),
3629 "redis_version:%s\r\n"
3630 "uptime_in_seconds:%d\r\n"
3631 "uptime_in_days:%d\r\n"
3632 "connected_clients:%d\r\n"
3633 "connected_slaves:%d\r\n"
3634 "used_memory:%zu\r\n"
3635 "changes_since_last_save:%lld\r\n"
3636 "bgsave_in_progress:%d\r\n"
3637 "last_save_time:%d\r\n"
3638 "total_connections_received:%lld\r\n"
3639 "total_commands_processed:%lld\r\n"
3640 "role:%s\r\n"
3641 ,REDIS_VERSION,
3642 uptime,
3643 uptime/(3600*24),
3644 listLength(server.clients)-listLength(server.slaves),
3645 listLength(server.slaves),
3646 server.usedmemory,
3647 server.dirty,
3648 server.bgsaveinprogress,
3649 server.lastsave,
3650 server.stat_numconnections,
3651 server.stat_numcommands,
3652 server.masterhost == NULL ? "master" : "slave"
3653 );
3654 if (server.masterhost) {
3655 info = sdscatprintf(info,
3656 "master_host:%s\r\n"
3657 "master_port:%d\r\n"
3658 "master_link_status:%s\r\n"
3659 "master_last_io_seconds_ago:%d\r\n"
3660 ,server.masterhost,
3661 server.masterport,
3662 (server.replstate == REDIS_REPL_CONNECTED) ?
3663 "up" : "down",
3664 (int)(time(NULL)-server.master->lastinteraction)
3665 );
3666 }
3667 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3668 addReplySds(c,info);
3669 addReply(c,shared.crlf);
3670 }
3671
3672 static void monitorCommand(redisClient *c) {
3673 /* ignore MONITOR if aleady slave or in monitor mode */
3674 if (c->flags & REDIS_SLAVE) return;
3675
3676 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3677 c->slaveseldb = 0;
3678 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3679 addReply(c,shared.ok);
3680 }
3681
3682 /* ================================= Expire ================================= */
3683 static int removeExpire(redisDb *db, robj *key) {
3684 if (dictDelete(db->expires,key) == DICT_OK) {
3685 return 1;
3686 } else {
3687 return 0;
3688 }
3689 }
3690
3691 static int setExpire(redisDb *db, robj *key, time_t when) {
3692 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3693 return 0;
3694 } else {
3695 incrRefCount(key);
3696 return 1;
3697 }
3698 }
3699
3700 /* Return the expire time of the specified key, or -1 if no expire
3701 * is associated with this key (i.e. the key is non volatile) */
3702 static time_t getExpire(redisDb *db, robj *key) {
3703 dictEntry *de;
3704
3705 /* No expire? return ASAP */
3706 if (dictSize(db->expires) == 0 ||
3707 (de = dictFind(db->expires,key)) == NULL) return -1;
3708
3709 return (time_t) dictGetEntryVal(de);
3710 }
3711
3712 static int expireIfNeeded(redisDb *db, robj *key) {
3713 time_t when;
3714 dictEntry *de;
3715
3716 /* No expire? return ASAP */
3717 if (dictSize(db->expires) == 0 ||
3718 (de = dictFind(db->expires,key)) == NULL) return 0;
3719
3720 /* Lookup the expire */
3721 when = (time_t) dictGetEntryVal(de);
3722 if (time(NULL) <= when) return 0;
3723
3724 /* Delete the key */
3725 dictDelete(db->expires,key);
3726 return dictDelete(db->dict,key) == DICT_OK;
3727 }
3728
3729 static int deleteIfVolatile(redisDb *db, robj *key) {
3730 dictEntry *de;
3731
3732 /* No expire? return ASAP */
3733 if (dictSize(db->expires) == 0 ||
3734 (de = dictFind(db->expires,key)) == NULL) return 0;
3735
3736 /* Delete the key */
3737 server.dirty++;
3738 dictDelete(db->expires,key);
3739 return dictDelete(db->dict,key) == DICT_OK;
3740 }
3741
3742 static void expireCommand(redisClient *c) {
3743 dictEntry *de;
3744 int seconds = atoi(c->argv[2]->ptr);
3745
3746 de = dictFind(c->db->dict,c->argv[1]);
3747 if (de == NULL) {
3748 addReply(c,shared.czero);
3749 return;
3750 }
3751 if (seconds <= 0) {
3752 addReply(c, shared.czero);
3753 return;
3754 } else {
3755 time_t when = time(NULL)+seconds;
3756 if (setExpire(c->db,c->argv[1],when))
3757 addReply(c,shared.cone);
3758 else
3759 addReply(c,shared.czero);
3760 return;
3761 }
3762 }
3763
3764 static void ttlCommand(redisClient *c) {
3765 time_t expire;
3766 int ttl = -1;
3767
3768 expire = getExpire(c->db,c->argv[1]);
3769 if (expire != -1) {
3770 ttl = (int) (expire-time(NULL));
3771 if (ttl < 0) ttl = -1;
3772 }
3773 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
3774 }
3775
3776 /* =============================== Replication ============================= */
3777
3778 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3779 ssize_t nwritten, ret = size;
3780 time_t start = time(NULL);
3781
3782 timeout++;
3783 while(size) {
3784 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3785 nwritten = write(fd,ptr,size);
3786 if (nwritten == -1) return -1;
3787 ptr += nwritten;
3788 size -= nwritten;
3789 }
3790 if ((time(NULL)-start) > timeout) {
3791 errno = ETIMEDOUT;
3792 return -1;
3793 }
3794 }
3795 return ret;
3796 }
3797
3798 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3799 ssize_t nread, totread = 0;
3800 time_t start = time(NULL);
3801
3802 timeout++;
3803 while(size) {
3804 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3805 nread = read(fd,ptr,size);
3806 if (nread == -1) return -1;
3807 ptr += nread;
3808 size -= nread;
3809 totread += nread;
3810 }
3811 if ((time(NULL)-start) > timeout) {
3812 errno = ETIMEDOUT;
3813 return -1;
3814 }
3815 }
3816 return totread;
3817 }
3818
/* Read a '\n'-terminated line from fd into 'ptr' (a buffer of 'size'
 * bytes), one byte at a time via syncRead(). The newline, and a preceding
 * '\r' if any, are replaced by '\0'. At most size-1 characters are stored
 * and the buffer is always NUL-terminated when size >= 1; a longer line is
 * truncated (the remaining bytes stay unread). Returns the number of stored
 * characters, or -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0;    /* no room even for the terminator */
    *ptr = '\0';
    size--;                     /* reserve one byte for the terminator */
    while(size > 0) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        }
        *ptr++ = c;
        *ptr = '\0';
        nread++;
        /* Account for the stored byte: without this the loop never stops
         * on a long line and writes past the end of the caller's buffer. */
        size--;
    }
    return nread;
}
3839
3840 static void syncCommand(redisClient *c) {
3841 /* ignore SYNC if aleady slave or in monitor mode */
3842 if (c->flags & REDIS_SLAVE) return;
3843
3844 /* SYNC can't be issued when the server has pending data to send to
3845 * the client about already issued commands. We need a fresh reply
3846 * buffer registering the differences between the BGSAVE and the current
3847 * dataset, so that we can copy to other slaves if needed. */
3848 if (listLength(c->reply) != 0) {
3849 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3850 return;
3851 }
3852
3853 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3854 /* Here we need to check if there is a background saving operation
3855 * in progress, or if it is required to start one */
3856 if (server.bgsaveinprogress) {
3857 /* Ok a background save is in progress. Let's check if it is a good
3858 * one for replication, i.e. if there is another slave that is
3859 * registering differences since the server forked to save */
3860 redisClient *slave;
3861 listNode *ln;
3862
3863 listRewind(server.slaves);
3864 while((ln = listYield(server.slaves))) {
3865 slave = ln->value;
3866 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3867 }
3868 if (ln) {
3869 /* Perfect, the server is already registering differences for
3870 * another slave. Set the right state, and copy the buffer. */
3871 listRelease(c->reply);
3872 c->reply = listDup(slave->reply);
3873 if (!c->reply) oom("listDup copying slave reply list");
3874 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3875 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3876 } else {
3877 /* No way, we need to wait for the next BGSAVE in order to
3878 * register differences */
3879 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3880 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3881 }
3882 } else {
3883 /* Ok we don't have a BGSAVE in progress, let's start one */
3884 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3885 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3886 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3887 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3888 return;
3889 }
3890 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3891 }
3892 c->repldbfd = -1;
3893 c->flags |= REDIS_SLAVE;
3894 c->slaveseldb = 0;
3895 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3896 return;
3897 }
3898
3899 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
3900 redisClient *slave = privdata;
3901 REDIS_NOTUSED(el);
3902 REDIS_NOTUSED(mask);
3903 char buf[REDIS_IOBUF_LEN];
3904 ssize_t nwritten, buflen;
3905
3906 if (slave->repldboff == 0) {
3907 /* Write the bulk write count before to transfer the DB. In theory here
3908 * we don't know how much room there is in the output buffer of the
3909 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
3910 * operations) will never be smaller than the few bytes we need. */
3911 sds bulkcount;
3912
3913 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
3914 slave->repldbsize);
3915 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
3916 {
3917 sdsfree(bulkcount);
3918 freeClient(slave);
3919 return;
3920 }
3921 sdsfree(bulkcount);
3922 }
3923 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
3924 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
3925 if (buflen <= 0) {
3926 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
3927 (buflen == 0) ? "premature EOF" : strerror(errno));
3928 freeClient(slave);
3929 return;
3930 }
3931 if ((nwritten = write(fd,buf,buflen)) == -1) {
3932 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
3933 strerror(errno));
3934 freeClient(slave);
3935 return;
3936 }
3937 slave->repldboff += nwritten;
3938 if (slave->repldboff == slave->repldbsize) {
3939 close(slave->repldbfd);
3940 slave->repldbfd = -1;
3941 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3942 slave->replstate = REDIS_REPL_ONLINE;
3943 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
3944 sendReplyToClient, slave, NULL) == AE_ERR) {
3945 freeClient(slave);
3946 return;
3947 }
3948 addReplySds(slave,sdsempty());
3949 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3950 }
3951 }
3952
3953 static void updateSalvesWaitingBgsave(int bgsaveerr) {
3954 listNode *ln;
3955 int startbgsave = 0;
3956
3957 listRewind(server.slaves);
3958 while((ln = listYield(server.slaves))) {
3959 redisClient *slave = ln->value;
3960
3961 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
3962 startbgsave = 1;
3963 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3964 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
3965 struct stat buf;
3966
3967 if (bgsaveerr != REDIS_OK) {
3968 freeClient(slave);
3969 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
3970 continue;
3971 }
3972 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
3973 fstat(slave->repldbfd,&buf) == -1) {
3974 freeClient(slave);
3975 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
3976 continue;
3977 }
3978 slave->repldboff = 0;
3979 slave->repldbsize = buf.st_size;
3980 slave->replstate = REDIS_REPL_SEND_BULK;
3981 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
3982 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave, NULL) == AE_ERR) {
3983 freeClient(slave);
3984 continue;
3985 }
3986 }
3987 }
3988 if (startbgsave) {
3989 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3990 listRewind(server.slaves);
3991 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
3992 while((ln = listYield(server.slaves))) {
3993 redisClient *slave = ln->value;
3994
3995 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
3996 freeClient(slave);
3997 }
3998 }
3999 }
4000 }
4001
4002 static int syncWithMaster(void) {
4003 char buf[1024], tmpfile[256];
4004 int dumpsize;
4005 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
4006 int dfd;
4007
4008 if (fd == -1) {
4009 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
4010 strerror(errno));
4011 return REDIS_ERR;
4012 }
4013 /* Issue the SYNC command */
4014 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
4015 close(fd);
4016 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
4017 strerror(errno));
4018 return REDIS_ERR;
4019 }
4020 /* Read the bulk write count */
4021 if (syncReadLine(fd,buf,1024,3600) == -1) {
4022 close(fd);
4023 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
4024 strerror(errno));
4025 return REDIS_ERR;
4026 }
4027 dumpsize = atoi(buf+1);
4028 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
4029 /* Read the bulk write data on a temp file */
4030 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
4031 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
4032 if (dfd == -1) {
4033 close(fd);
4034 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
4035 return REDIS_ERR;
4036 }
4037 while(dumpsize) {
4038 int nread, nwritten;
4039
4040 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
4041 if (nread == -1) {
4042 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
4043 strerror(errno));
4044 close(fd);
4045 close(dfd);
4046 return REDIS_ERR;
4047 }
4048 nwritten = write(dfd,buf,nread);
4049 if (nwritten == -1) {
4050 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
4051 close(fd);
4052 close(dfd);
4053 return REDIS_ERR;
4054 }
4055 dumpsize -= nread;
4056 }
4057 close(dfd);
4058 if (rename(tmpfile,server.dbfilename) == -1) {
4059 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
4060 unlink(tmpfile);
4061 close(fd);
4062 return REDIS_ERR;
4063 }
4064 emptyDb();
4065 if (rdbLoad(server.dbfilename) != REDIS_OK) {
4066 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
4067 close(fd);
4068 return REDIS_ERR;
4069 }
4070 server.master = createClient(fd);
4071 server.master->flags |= REDIS_MASTER;
4072 server.replstate = REDIS_REPL_CONNECTED;
4073 return REDIS_OK;
4074 }
4075
4076 static void slaveofCommand(redisClient *c) {
4077 if (!strcasecmp(c->argv[1]->ptr,"no") &&
4078 !strcasecmp(c->argv[2]->ptr,"one")) {
4079 if (server.masterhost) {
4080 sdsfree(server.masterhost);
4081 server.masterhost = NULL;
4082 if (server.master) freeClient(server.master);
4083 server.replstate = REDIS_REPL_NONE;
4084 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
4085 }
4086 } else {
4087 sdsfree(server.masterhost);
4088 server.masterhost = sdsdup(c->argv[1]->ptr);
4089 server.masterport = atoi(c->argv[2]->ptr);
4090 if (server.master) freeClient(server.master);
4091 server.replstate = REDIS_REPL_CONNECT;
4092 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
4093 server.masterhost, server.masterport);
4094 }
4095 addReply(c,shared.ok);
4096 }
4097
4098 /* ============================ Maxmemory directive ======================== */
4099
4100 /* This function gets called when 'maxmemory' is set on the config file to limit
4101 * the max memory used by the server, and we are out of memory.
4102 * This function will try to, in order:
4103 *
4104 * - Free objects from the free list
4105 * - Try to remove keys with an EXPIRE set
4106 *
4107 * It is not possible to free enough memory to reach used-memory < maxmemory
4108 * the server will start refusing commands that will enlarge even more the
4109 * memory usage.
4110 */
4111 static void freeMemoryIfNeeded(void) {
4112 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
4113 if (listLength(server.objfreelist)) {
4114 robj *o;
4115
4116 listNode *head = listFirst(server.objfreelist);
4117 o = listNodeValue(head);
4118 listDelNode(server.objfreelist,head);
4119 zfree(o);
4120 } else {
4121 int j, k, freed = 0;
4122
4123 for (j = 0; j < server.dbnum; j++) {
4124 int minttl = -1;
4125 robj *minkey = NULL;
4126 struct dictEntry *de;
4127
4128 if (dictSize(server.db[j].expires)) {
4129 freed = 1;
4130 /* From a sample of three keys drop the one nearest to
4131 * the natural expire */
4132 for (k = 0; k < 3; k++) {
4133 time_t t;
4134
4135 de = dictGetRandomKey(server.db[j].expires);
4136 t = (time_t) dictGetEntryVal(de);
4137 if (minttl == -1 || t < minttl) {
4138 minkey = dictGetEntryKey(de);
4139 minttl = t;
4140 }
4141 }
4142 deleteKey(server.db+j,minkey);
4143 }
4144 }
4145 if (!freed) return; /* nothing to free... */
4146 }
4147 }
4148 }
4149
4150 /* ================================= Debugging ============================== */
4151
4152 static void debugCommand(redisClient *c) {
4153 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
4154 *((char*)-1) = 'x';
4155 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
4156 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
4157 robj *key, *val;
4158
4159 if (!de) {
4160 addReply(c,shared.nokeyerr);
4161 return;
4162 }
4163 key = dictGetEntryKey(de);
4164 val = dictGetEntryVal(de);
4165 addReplySds(c,sdscatprintf(sdsempty(),
4166 "+Key at:%p refcount:%d, value at:%p refcount:%d\r\n",
4167 key, key->refcount, val, val->refcount));
4168 } else {
4169 addReplySds(c,sdsnew(
4170 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>]\r\n"));
4171 }
4172 }
4173 char *findFuncName(void *pointer, long *offset){
4174 int i, ret=-1;
4175 long val, off;
4176 for(i=0; symsTable[i].pointer!=0; i++){
4177 val=(long)pointer-symsTable[i].pointer;
4178 if(val>=0 && (off<0 || val <= off)){
4179 off=val;
4180 ret=i;
4181 }
4182 }
4183 if(ret<0)
4184 *offset=0;
4185 else
4186 *offset=off;
4187 return ret>=0?symsTable[ret].name:"unknown";
4188 }
4189
4190 static void segvHandler (int sig, siginfo_t *info, void *secret) {
4191
4192 void *trace[100];
4193 char **messages = (char **)NULL;
4194 char *tmp;
4195 int i, trace_size = 0;
4196 long offset=0;
4197 ucontext_t *uc = (ucontext_t *)secret;
4198
4199 redisLog(REDIS_DEBUG, "Segmentation fault -%d- Redis %s", sig, REDIS_VERSION );
4200 redisLog(REDIS_DEBUG,"EIP %p", (void *)uc->uc_mcontext.gregs[REG_EIP]);
4201 redisLog(REDIS_DEBUG,"EAX %p, EBX %p, ECX %p, EDX %p", (void *)uc->uc_mcontext.gregs[REG_EAX], (void *)uc->uc_mcontext.gregs[REG_EBX], (void *)uc->uc_mcontext.gregs[REG_ECX], (void *)uc->uc_mcontext.gregs[REG_EDX]);
4202
4203
4204 trace_size = backtrace(trace, 100);
4205 char pointer[trace_size][11];
4206 /* overwrite sigaction with caller's address */
4207 trace[1] = (void *) uc->uc_mcontext.gregs[REG_EIP];
4208 for (i=1; i<trace_size; ++i)
4209 snprintf(pointer[i],11,"[%p]", trace[i]);
4210
4211 messages = backtrace_symbols(trace, trace_size);
4212
4213 for (i=1; i<trace_size; ++i){
4214 tmp=strstr(messages[i],pointer[i]);
4215 if((tmp-2)[0]!=')'){
4216 char *a=findFuncName(trace[i], &offset);
4217 redisLog(REDIS_DEBUG,"[bt] (%s+%x) %s", a, (unsigned int)offset, tmp);
4218 }
4219 else
4220 redisLog(REDIS_DEBUG,"[bt] %s", messages[i]);
4221 }
4222
4223 free(messages);
4224 exit(0);
4225 }
4226
4227 void setupSigSegvAction(){
4228 struct sigaction act;
4229 sigemptyset (&act.sa_mask);
4230 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used. Otherwise, sa_handler is used */
4231 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
4232 act.sa_sigaction = segvHandler;
4233 sigaction (SIGSEGV, &act, NULL);
4234 }
4235 /* =================================== Main! ================================ */
4236
4237 #ifdef __linux__
/* Read the kernel's vm.overcommit_memory setting.
 *
 * Returns the integer parsed from the first line of
 * /proc/sys/vm/overcommit_memory, or -1 if the file cannot be opened or
 * read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    got = fgets(line,sizeof(line),fp);
    fclose(fp);
    return (got != NULL) ? atoi(line) : -1;
}
4251
4252 void linuxOvercommitMemoryWarning(void) {
4253 if (linuxOvercommitMemoryValue() == 0) {
4254 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'echo 1 > /proc/sys/vm/overcommit_memory' in your init scripts.");
4255 }
4256 }
4257 #endif /* __linux__ */
4258
4259 static void daemonize(void) {
4260 int fd;
4261 FILE *fp;
4262
4263 if (fork() != 0) exit(0); /* parent exits */
4264 setsid(); /* create a new session */
4265
4266 /* Every output goes to /dev/null. If Redis is daemonized but
4267 * the 'logfile' is set to 'stdout' in the configuration file
4268 * it will not log at all. */
4269 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
4270 dup2(fd, STDIN_FILENO);
4271 dup2(fd, STDOUT_FILENO);
4272 dup2(fd, STDERR_FILENO);
4273 if (fd > STDERR_FILENO) close(fd);
4274 }
4275 /* Try to write the pid file */
4276 fp = fopen(server.pidfile,"w");
4277 if (fp) {
4278 fprintf(fp,"%d\n",getpid());
4279 fclose(fp);
4280 }
4281 }
4282
4283 int main(int argc, char **argv) {
4284 #ifdef __linux__
4285 linuxOvercommitMemoryWarning();
4286 #endif
4287
4288 initServerConfig();
4289 if (argc == 2) {
4290 ResetServerSaveParams();
4291 loadServerConfig(argv[1]);
4292 } else if (argc > 2) {
4293 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
4294 exit(1);
4295 } else {
4296 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
4297 }
4298 initServer();
4299 if (server.daemonize) daemonize();
4300 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
4301 if (rdbLoad(server.dbfilename) == REDIS_OK)
4302 redisLog(REDIS_NOTICE,"DB loaded from disk");
4303 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
4304 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
4305 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
4306 aeMain(server.el);
4307 aeDeleteEventLoop(server.el);
4308 return 0;
4309 }