]> git.saurik.com Git - redis.git/blob - redis.c
1236d9b5bff0c4e4bdde13692cfd73a1758a79e2
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.07"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * according to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175 /* List related stuff */
176 #define REDIS_HEAD 0
177 #define REDIS_TAIL 1
178
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
184
185 /* Log levels */
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
189
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
192
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
200
201 /*================================= Data types ============================== */
202
203 /* A redis object: a typed, reference-counted container for a value
 * (string / list / set / sorted set / hash, see the "Object types" defines). */
204 typedef struct redisObject {
205 void *ptr; /* the value payload; NOTE(review): its layout presumably depends on type and encoding */
206 unsigned char type; /* one of the REDIS_STRING ... REDIS_HASH defines */
207 unsigned char encoding; /* one of the REDIS_ENCODING_* defines */
208 unsigned char notused[2]; /* not used, as the name says (NOTE(review): presumably padding) */
209 int refcount; /* reference count, see incrRefCount() / decrRefCount() */
210 } robj;
211
/* One logical database: the server keeps an array of these (server.db). */
212 typedef struct redisDb {
213 dict *dict; /* main dictionary of this DB (NOTE(review): presumably key -> value object) */
214 dict *expires; /* NOTE(review): presumably expire times of volatile keys, see getExpire()/setExpire() */
215 int id; /* numeric identifier of this DB */
216 } redisDb;
217
218 /* With multiplexing we need to keep per-client state.
219 * Clients are kept in a linked list. */
220 typedef struct redisClient {
221 int fd; /* file descriptor of the client connection */
222 redisDb *db; /* database the client is operating on */
223 int dictid;
224 sds querybuf; /* NOTE(review): presumably the pending input, see processInputBuffer() */
225 robj **argv, **mbargv; /* argument vectors (mb* presumably used by the multi bulk format) */
226 int argc, mbargc; /* number of elements in argv / mbargv */
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply; /* list of reply objects queued for this client */
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset */
238 off_t repldbsize; /* replication DB file size */
239 } redisClient;
240
/* One save point; the server holds an array of these in server.saveparams.
 * NOTE(review): presumably "save after `changes` changes in `seconds`
 * seconds" -- confirm against the configuration parsing code. */
241 struct saveparam {
242 time_t seconds;
243 int changes;
244 };
245
246 /* Global server state structure */
247 struct redisServer {
248 int port;
249 int fd;
250 redisDb *db; /* array of databases, indexed 0 .. dbnum-1 */
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients; /* list of redisClient, walked by closeTimedoutClients() */
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function ran */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of the last successful save */
261 size_t usedmemory; /* Used memory as reported by zmalloc_used_memory(); NOTE(review): presumably bytes, not megabytes -- confirm */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity; /* minimum level a message needs in order to be logged, see redisLog() */
268 int glueoutputbuf;
269 int maxidletime; /* idle seconds after which closeTimedoutClients() closes a client */
270 int dbnum; /* number of entries in the db array */
271 int daemonize;
272 int appendonly;
273 int appendfsync; /* one of the APPENDFSYNC_* defines */
274 time_t lastfsync;
275 int appendfd; /* append only file descriptor, -1 when append only is disabled */
276 int appendseldb; /* set to -1 to force a SELECT to be issued on the next append */
277 char *pidfile;
278 pid_t bgsavechildpid; /* pid of the BGSAVE child, -1 when none is running */
279 pid_t bgrewritechildpid; /* pid of the BGREWRITEAOF child, -1 when none is running */
280 sds bgrewritebuf; /* buffer taken by parent during append only rewrite */
281 struct saveparam *saveparams;
282 int saveparamslen;
283 char *logfile; /* log file path, NULL means log to stdout */
284 char *bindaddr;
285 char *dbfilename;
286 char *appendfilename;
287 char *requirepass;
288 int shareobjects;
289 /* Replication related */
290 int isslave;
291 char *masterauth;
292 char *masterhost;
293 int masterport;
294 redisClient *master; /* client that is master for this slave */
295 int replstate;
296 unsigned int maxclients;
297 unsigned long maxmemory;
298 /* Sort parameters - qsort_r() is only available under BSD so we
299 * have to take this state global, in order to pass it to sortCompare() */
300 int sort_desc;
301 int sort_alpha;
302 int sort_bypattern;
303 };
304
/* Signature shared by every command implementation: it receives the client
 * that issued the command. */
305 typedef void redisCommandProc(redisClient *c);
/* One entry of the command table (see cmdTable). */
306 struct redisCommand {
307 char *name; /* command name as written in the table, e.g. "get" */
308 redisCommandProc *proc; /* function implementing the command */
309 int arity; /* NOTE(review): presumably the argument count, command name included; negative presumably means "at least" -- confirm in processCommand() */
310 int flags; /* bitwise OR of the REDIS_CMD_* defines */
311 };
312
/* A (name, address) pair for a function.
 * NOTE(review): presumably used to resolve symbols when printing a
 * backtrace (see HAVE_BACKTRACE) -- confirm against its users. */
313 struct redisFunctionSym {
314 char *name;
315 unsigned long pointer; /* function address stored as an integer */
316 };
317
/* One element handled by SORT: the object plus the key used to compare it.
 * NOTE(review): which union member is valid presumably depends on the
 * server.sort_alpha / sort_bypattern settings -- confirm in sortCompare(). */
318 typedef struct _redisSortObject {
319 robj *obj;
320 union {
321 double score; /* numeric sort key */
322 robj *cmpobj; /* object used as sort key */
323 } u;
324 } redisSortObject;
325
/* An operation requested to SORT, together with its pattern argument. */
326 typedef struct _redisSortOperation {
327 int type; /* NOTE(review): presumably one of the REDIS_SORT_* defines */
328 robj *pattern;
329 } redisSortOperation;
330
331 /* ZSETs use a specialized version of Skiplists */
332
/* A node of the skiplist used by sorted sets. */
333 typedef struct zskiplistNode {
334 struct zskiplistNode **forward; /* NOTE(review): presumably one forward pointer per level of this node */
335 struct zskiplistNode *backward; /* single backward pointer */
336 double score; /* score associated with obj */
337 robj *obj; /* the element stored in this node */
338 } zskiplistNode;
339
/* The skiplist itself; created by zslCreate() and released by zslFree(). */
340 typedef struct zskiplist {
341 struct zskiplistNode *header, *tail;
342 unsigned long length; /* NOTE(review): presumably the number of elements */
343 int level; /* NOTE(review): presumably the highest level in use, at most ZSKIPLIST_MAXLEVEL */
344 } zskiplist;
345
/* A sorted set: a hash table paired with a skiplist.
 * NOTE(review): presumably the dict maps element -> score while the
 * skiplist keeps the elements ordered by score -- confirm in zaddCommand(). */
346 typedef struct zset {
347 dict *dict;
348 zskiplist *zsl;
349 } zset;
350
351 /* Our shared "common" objects */
352
/* Pre-built objects reachable through the global `shared`, one field per
 * object.
 * NOTE(review): presumably common protocol replies plus SELECT commands for
 * databases 0-9, created once at startup -- the initialization is not
 * visible here. */
353 struct sharedObjectsStruct {
354 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
355 *colon, *nullbulk, *nullmultibulk,
356 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
357 *outofrangeerr, *plus,
358 *select0, *select1, *select2, *select3, *select4,
359 *select5, *select6, *select7, *select8, *select9;
360 } shared;
361
362 /* Global vars that are actually used as constants. The following double
363 * values are used for double on-disk serialization, and are initialized
364 * at runtime to avoid strange compiler optimizations. */
365
366 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
367
368 /*================================ Prototypes =============================== */
369
370 static void freeStringObject(robj *o);
371 static void freeListObject(robj *o);
372 static void freeSetObject(robj *o);
373 static void decrRefCount(void *o);
374 static robj *createObject(int type, void *ptr);
375 static void freeClient(redisClient *c);
376 static int rdbLoad(char *filename);
377 static void addReply(redisClient *c, robj *obj);
378 static void addReplySds(redisClient *c, sds s);
379 static void incrRefCount(robj *o);
380 static int rdbSaveBackground(char *filename);
381 static robj *createStringObject(char *ptr, size_t len);
382 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
383 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
384 static int syncWithMaster(void);
385 static robj *tryObjectSharing(robj *o);
386 static int tryObjectEncoding(robj *o);
387 static robj *getDecodedObject(robj *o);
388 static int removeExpire(redisDb *db, robj *key);
389 static int expireIfNeeded(redisDb *db, robj *key);
390 static int deleteIfVolatile(redisDb *db, robj *key);
391 static int deleteKey(redisDb *db, robj *key);
392 static time_t getExpire(redisDb *db, robj *key);
393 static int setExpire(redisDb *db, robj *key, time_t when);
394 static void updateSlavesWaitingBgsave(int bgsaveerr);
395 static void freeMemoryIfNeeded(void);
396 static int processCommand(redisClient *c);
397 static void setupSigSegvAction(void);
398 static void rdbRemoveTempFile(pid_t childpid);
399 static void aofRemoveTempFile(pid_t childpid);
400 static size_t stringObjectLen(robj *o);
401 static void processInputBuffer(redisClient *c);
402 static zskiplist *zslCreate(void);
403 static void zslFree(zskiplist *zsl);
404 static void zslInsert(zskiplist *zsl, double score, robj *obj);
405 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
406
407 static void authCommand(redisClient *c);
408 static void pingCommand(redisClient *c);
409 static void echoCommand(redisClient *c);
410 static void setCommand(redisClient *c);
411 static void setnxCommand(redisClient *c);
412 static void getCommand(redisClient *c);
413 static void delCommand(redisClient *c);
414 static void existsCommand(redisClient *c);
415 static void incrCommand(redisClient *c);
416 static void decrCommand(redisClient *c);
417 static void incrbyCommand(redisClient *c);
418 static void decrbyCommand(redisClient *c);
419 static void selectCommand(redisClient *c);
420 static void randomkeyCommand(redisClient *c);
421 static void keysCommand(redisClient *c);
422 static void dbsizeCommand(redisClient *c);
423 static void lastsaveCommand(redisClient *c);
424 static void saveCommand(redisClient *c);
425 static void bgsaveCommand(redisClient *c);
426 static void bgrewriteaofCommand(redisClient *c);
427 static void shutdownCommand(redisClient *c);
428 static void moveCommand(redisClient *c);
429 static void renameCommand(redisClient *c);
430 static void renamenxCommand(redisClient *c);
431 static void lpushCommand(redisClient *c);
432 static void rpushCommand(redisClient *c);
433 static void lpopCommand(redisClient *c);
434 static void rpopCommand(redisClient *c);
435 static void llenCommand(redisClient *c);
436 static void lindexCommand(redisClient *c);
437 static void lrangeCommand(redisClient *c);
438 static void ltrimCommand(redisClient *c);
439 static void typeCommand(redisClient *c);
440 static void lsetCommand(redisClient *c);
441 static void saddCommand(redisClient *c);
442 static void sremCommand(redisClient *c);
443 static void smoveCommand(redisClient *c);
444 static void sismemberCommand(redisClient *c);
445 static void scardCommand(redisClient *c);
446 static void spopCommand(redisClient *c);
447 static void srandmemberCommand(redisClient *c);
448 static void sinterCommand(redisClient *c);
449 static void sinterstoreCommand(redisClient *c);
450 static void sunionCommand(redisClient *c);
451 static void sunionstoreCommand(redisClient *c);
452 static void sdiffCommand(redisClient *c);
453 static void sdiffstoreCommand(redisClient *c);
454 static void syncCommand(redisClient *c);
455 static void flushdbCommand(redisClient *c);
456 static void flushallCommand(redisClient *c);
457 static void sortCommand(redisClient *c);
458 static void lremCommand(redisClient *c);
459 static void rpoplpushcommand(redisClient *c);
460 static void infoCommand(redisClient *c);
461 static void mgetCommand(redisClient *c);
462 static void monitorCommand(redisClient *c);
463 static void expireCommand(redisClient *c);
464 static void expireatCommand(redisClient *c);
465 static void getsetCommand(redisClient *c);
466 static void ttlCommand(redisClient *c);
467 static void slaveofCommand(redisClient *c);
468 static void debugCommand(redisClient *c);
469 static void msetCommand(redisClient *c);
470 static void msetnxCommand(redisClient *c);
471 static void zaddCommand(redisClient *c);
472 static void zincrbyCommand(redisClient *c);
473 static void zrangeCommand(redisClient *c);
474 static void zrangebyscoreCommand(redisClient *c);
475 static void zrevrangeCommand(redisClient *c);
476 static void zcardCommand(redisClient *c);
477 static void zremCommand(redisClient *c);
478 static void zscoreCommand(redisClient *c);
479 static void zremrangebyscoreCommand(redisClient *c);
480
481 /*================================= Globals ================================= */
482
483 /* Global vars */
484 static struct redisServer server; /* server global state */
/* Table of the commands known to the server. Every entry is a
 * struct redisCommand: {name, implementation, arity, flags}, where flags is
 * a combination of REDIS_CMD_BULK / REDIS_CMD_INLINE / REDIS_CMD_DENYOOM.
 * The table is terminated by an entry with a NULL name.
 * NOTE(review): a negative arity presumably means "at least that many
 * arguments" -- confirm in processCommand().
 * Note that "smembers" is served by sinterCommand, and that the "rpoplpush"
 * handler is spelled rpoplpushcommand (lowercase 'c'), matching its
 * prototype. */
485 static struct redisCommand cmdTable[] = {
486 {"get",getCommand,2,REDIS_CMD_INLINE},
487 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"del",delCommand,-2,REDIS_CMD_INLINE},
490 {"exists",existsCommand,2,REDIS_CMD_INLINE},
491 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
494 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
496 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
497 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
498 {"llen",llenCommand,2,REDIS_CMD_INLINE},
499 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
500 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
501 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
502 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
503 {"lrem",lremCommand,4,REDIS_CMD_BULK},
504 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
505 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
506 {"srem",sremCommand,3,REDIS_CMD_BULK},
507 {"smove",smoveCommand,4,REDIS_CMD_BULK},
508 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
509 {"scard",scardCommand,2,REDIS_CMD_INLINE},
510 {"spop",spopCommand,2,REDIS_CMD_INLINE},
511 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
512 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
519 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"zrem",zremCommand,3,REDIS_CMD_BULK},
522 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
523 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
524 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
525 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
526 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
527 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
533 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
534 {"select",selectCommand,2,REDIS_CMD_INLINE},
535 {"move",moveCommand,3,REDIS_CMD_INLINE},
536 {"rename",renameCommand,3,REDIS_CMD_INLINE},
537 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
538 {"expire",expireCommand,3,REDIS_CMD_INLINE},
539 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
540 {"keys",keysCommand,2,REDIS_CMD_INLINE},
541 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
542 {"auth",authCommand,2,REDIS_CMD_INLINE},
543 {"ping",pingCommand,1,REDIS_CMD_INLINE},
544 {"echo",echoCommand,2,REDIS_CMD_BULK},
545 {"save",saveCommand,1,REDIS_CMD_INLINE},
546 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
547 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
548 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
549 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
550 {"type",typeCommand,2,REDIS_CMD_INLINE},
551 {"sync",syncCommand,1,REDIS_CMD_INLINE},
552 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
553 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
554 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
555 {"info",infoCommand,1,REDIS_CMD_INLINE},
556 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
557 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
558 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
559 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
560 {NULL,NULL,0,0}
561 };
562
563 /*============================ Utility functions ============================ */
564
565 /* Glob-style pattern matching. */
566 int stringmatchlen(const char *pattern, int patternLen,
567 const char *string, int stringLen, int nocase)
568 {
569 while(patternLen) {
570 switch(pattern[0]) {
571 case '*':
572 while (pattern[1] == '*') {
573 pattern++;
574 patternLen--;
575 }
576 if (patternLen == 1)
577 return 1; /* match */
578 while(stringLen) {
579 if (stringmatchlen(pattern+1, patternLen-1,
580 string, stringLen, nocase))
581 return 1; /* match */
582 string++;
583 stringLen--;
584 }
585 return 0; /* no match */
586 break;
587 case '?':
588 if (stringLen == 0)
589 return 0; /* no match */
590 string++;
591 stringLen--;
592 break;
593 case '[':
594 {
595 int not, match;
596
597 pattern++;
598 patternLen--;
599 not = pattern[0] == '^';
600 if (not) {
601 pattern++;
602 patternLen--;
603 }
604 match = 0;
605 while(1) {
606 if (pattern[0] == '\\') {
607 pattern++;
608 patternLen--;
609 if (pattern[0] == string[0])
610 match = 1;
611 } else if (pattern[0] == ']') {
612 break;
613 } else if (patternLen == 0) {
614 pattern--;
615 patternLen++;
616 break;
617 } else if (pattern[1] == '-' && patternLen >= 3) {
618 int start = pattern[0];
619 int end = pattern[2];
620 int c = string[0];
621 if (start > end) {
622 int t = start;
623 start = end;
624 end = t;
625 }
626 if (nocase) {
627 start = tolower(start);
628 end = tolower(end);
629 c = tolower(c);
630 }
631 pattern += 2;
632 patternLen -= 2;
633 if (c >= start && c <= end)
634 match = 1;
635 } else {
636 if (!nocase) {
637 if (pattern[0] == string[0])
638 match = 1;
639 } else {
640 if (tolower((int)pattern[0]) == tolower((int)string[0]))
641 match = 1;
642 }
643 }
644 pattern++;
645 patternLen--;
646 }
647 if (not)
648 match = !match;
649 if (!match)
650 return 0; /* no match */
651 string++;
652 stringLen--;
653 break;
654 }
655 case '\\':
656 if (patternLen >= 2) {
657 pattern++;
658 patternLen--;
659 }
660 /* fall through */
661 default:
662 if (!nocase) {
663 if (pattern[0] != string[0])
664 return 0; /* no match */
665 } else {
666 if (tolower((int)pattern[0]) != tolower((int)string[0]))
667 return 0; /* no match */
668 }
669 string++;
670 stringLen--;
671 break;
672 }
673 pattern++;
674 patternLen--;
675 if (stringLen == 0) {
676 while(*pattern == '*') {
677 pattern++;
678 patternLen--;
679 }
680 break;
681 }
682 }
683 if (patternLen == 0 && stringLen == 0)
684 return 1;
685 return 0;
686 }
687
688 static void redisLog(int level, const char *fmt, ...) {
689 va_list ap;
690 FILE *fp;
691
692 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
693 if (!fp) return;
694
695 va_start(ap, fmt);
696 if (level >= server.verbosity) {
697 char *c = ".-*";
698 char buf[64];
699 time_t now;
700
701 now = time(NULL);
702 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
703 fprintf(fp,"%s %c ",buf,c[level]);
704 vfprintf(fp, fmt, ap);
705 fprintf(fp,"\n");
706 fflush(fp);
707 }
708 va_end(ap);
709
710 if (server.logfile) fclose(fp);
711 }
712
713 /*====================== Hash table type implementation ==================== */
714
715 /* This is a hash table type that uses the SDS dynamic strings library as
716 * keys and redis objects as values (objects can hold SDS strings,
717 * lists, sets). */
718
/* Dict value destructor that simply releases the value with zfree().
 * The dict private data is not used. */
static void dictVanillaFree(void *privdata, void *val)
{
    zfree(val);
    DICT_NOTUSED(privdata);
}
724
725 static int sdsDictKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727 {
728 int l1,l2;
729 DICT_NOTUSED(privdata);
730
731 l1 = sdslen((sds)key1);
732 l2 = sdslen((sds)key2);
733 if (l1 != l2) return 0;
734 return memcmp(key1, key2, l1) == 0;
735 }
736
/* Dict destructor for keys or values that are redis objects: drops one
 * reference with decrRefCount(). The dict private data is not used. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    decrRefCount(val);
    DICT_NOTUSED(privdata);
}
743
744 static int dictObjKeyCompare(void *privdata, const void *key1,
745 const void *key2)
746 {
747 const robj *o1 = key1, *o2 = key2;
748 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
749 }
750
751 static unsigned int dictObjHash(const void *key) {
752 const robj *o = key;
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754 }
755
756 static int dictEncObjKeyCompare(void *privdata, const void *key1,
757 const void *key2)
758 {
759 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
760 int cmp;
761
762 o1 = getDecodedObject(o1);
763 o2 = getDecodedObject(o2);
764 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 decrRefCount(o1);
766 decrRefCount(o2);
767 return cmp;
768 }
769
770 static unsigned int dictEncObjHash(const void *key) {
771 robj *o = (robj*) key;
772
773 o = getDecodedObject(o);
774 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
775 decrRefCount(o);
776 return hash;
777 }
778
/* Dict type whose keys are (possibly encoded) redis objects and whose values
 * get no destructor. NOTE(review): going by the name, used for sets. */
779 static dictType setDictType = {
780 dictEncObjHash, /* hash function */
781 NULL, /* key dup */
782 NULL, /* val dup */
783 dictEncObjKeyCompare, /* key compare */
784 dictRedisObjectDestructor, /* key destructor */
785 NULL /* val destructor */
786 };
787
/* Dict type whose keys are (possibly encoded) redis objects and whose values
 * are released with zfree(). NOTE(review): going by the name, used for the
 * dict side of sorted sets; the values are presumably heap allocated scores. */
788 static dictType zsetDictType = {
789 dictEncObjHash, /* hash function */
790 NULL, /* key dup */
791 NULL, /* val dup */
792 dictEncObjKeyCompare, /* key compare */
793 dictRedisObjectDestructor, /* key destructor */
794 dictVanillaFree /* val destructor */
795 };
796
/* Dict type where both keys and values are redis objects: keys are hashed
 * and compared through their sds payload without decoding, and both keys and
 * values are released with decrRefCount(). */
797 static dictType hashDictType = {
798 dictObjHash, /* hash function */
799 NULL, /* key dup */
800 NULL, /* val dup */
801 dictObjKeyCompare, /* key compare */
802 dictRedisObjectDestructor, /* key destructor */
803 dictRedisObjectDestructor /* val destructor */
804 };
805
806 /* ========================= Random utility functions ======================= */
807
/* Out of memory handler.
 *
 * Redis does not try to recover from out of memory conditions while
 * allocating objects or strings: the networking layer itself needs heap
 * memory for its send buffers, so it is not even clear the error could be
 * reported to the client. We print 'msg' on stderr, wait one second and
 * abort, which also keeps the rest of the code simpler. */
static void oom(const char *msg) {
    static const char report[] = "%s: Out of memory\n";

    fprintf(stderr, report, msg);
    fflush(stderr);
    sleep(1);
    abort();
}
819
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
822 redisClient *c;
823 listNode *ln;
824 time_t now = time(NULL);
825
826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
836 }
837
838 static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845 }
846
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
855 dictResize(server.db[j].dict);
856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
857 }
858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
860 }
861 }
862
863 /* A background saving child (BGSAVE) terminated its work. Handle this. */
864 void backgroundSaveDoneHandler(int statloc) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsavechildpid = -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
884 }
885
886 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
887 * Handle this. */
888 void backgroundRewriteDoneHandler(int statloc) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 int fd;
894 char tmpfile[256];
895
896 redisLog(REDIS_NOTICE,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
900 fd = open(tmpfile,O_WRONLY|O_APPEND);
901 if (fd == -1) {
902 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
903 goto cleanup;
904 }
905 /* Flush our data... */
906 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
907 (signed) sdslen(server.bgrewritebuf)) {
908 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
909 close(fd);
910 goto cleanup;
911 }
912 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile,server.appendfilename) == -1) {
916 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
917 close(fd);
918 goto cleanup;
919 }
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
922 if (server.appendfd != -1) {
923 /* If append only is actually enabled... */
924 close(server.appendfd);
925 server.appendfd = fd;
926 fsync(fd);
927 server.appendseldb = -1; /* Make sure it will issue SELECT */
928 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
929 } else {
930 /* If append only is disabled we just generate a dump in this
931 * format. Why not? */
932 close(fd);
933 }
934 } else if (!bysignal && exitcode != 0) {
935 redisLog(REDIS_WARNING, "Background append only file rewriting error");
936 } else {
937 redisLog(REDIS_WARNING,
938 "Background append only file rewriting terminated by signal");
939 }
940 cleanup:
941 sdsfree(server.bgrewritebuf);
942 server.bgrewritebuf = sdsempty();
943 aofRemoveTempFile(server.bgrewritechildpid);
944 server.bgrewritechildpid = -1;
945 }
946
947 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
948 int j, loops = server.cronloops++;
949 REDIS_NOTUSED(eventLoop);
950 REDIS_NOTUSED(id);
951 REDIS_NOTUSED(clientData);
952
953 /* Update the global state with the amount of used memory */
954 server.usedmemory = zmalloc_used_memory();
955
956 /* Show some info about non-empty databases */
957 for (j = 0; j < server.dbnum; j++) {
958 long long size, used, vkeys;
959
960 size = dictSlots(server.db[j].dict);
961 used = dictSize(server.db[j].dict);
962 vkeys = dictSize(server.db[j].expires);
963 if (!(loops % 5) && (used || vkeys)) {
964 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
965 /* dictPrintStats(server.dict); */
966 }
967 }
968
969 /* We don't want to resize the hash tables while a bacground saving
970 * is in progress: the saving child is created using fork() that is
971 * implemented with a copy-on-write semantic in most modern systems, so
972 * if we resize the HT while there is the saving child at work actually
973 * a lot of memory movements in the parent will cause a lot of pages
974 * copied. */
975 if (server.bgsavechildpid == -1) tryResizeHashTables();
976
977 /* Show information about connected clients */
978 if (!(loops % 5)) {
979 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
980 listLength(server.clients)-listLength(server.slaves),
981 listLength(server.slaves),
982 server.usedmemory,
983 dictSize(server.sharingpool));
984 }
985
986 /* Close connections of timedout clients */
987 if (server.maxidletime && !(loops % 10))
988 closeTimedoutClients();
989
990 /* Check if a background saving or AOF rewrite in progress terminated */
991 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
992 int statloc;
993 pid_t pid;
994
995 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
996 if (pid == server.bgsavechildpid) {
997 backgroundSaveDoneHandler(statloc);
998 } else {
999 backgroundRewriteDoneHandler(statloc);
1000 }
1001 }
1002 } else {
1003 /* If there is not a background saving in progress check if
1004 * we have to save now */
1005 time_t now = time(NULL);
1006 for (j = 0; j < server.saveparamslen; j++) {
1007 struct saveparam *sp = server.saveparams+j;
1008
1009 if (server.dirty >= sp->changes &&
1010 now-server.lastsave > sp->seconds) {
1011 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1012 sp->changes, sp->seconds);
1013 rdbSaveBackground(server.dbfilename);
1014 break;
1015 }
1016 }
1017 }
1018
1019 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1020 * will use few CPU cycles if there are few expiring keys, otherwise
1021 * it will get more aggressive to avoid that too much memory is used by
1022 * keys that can be removed from the keyspace. */
1023 for (j = 0; j < server.dbnum; j++) {
1024 int expired;
1025 redisDb *db = server.db+j;
1026
1027 /* Continue to expire if at the end of the cycle more than 25%
1028 * of the keys were expired. */
1029 do {
1030 int num = dictSize(db->expires);
1031 time_t now = time(NULL);
1032
1033 expired = 0;
1034 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1035 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1036 while (num--) {
1037 dictEntry *de;
1038 time_t t;
1039
1040 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1041 t = (time_t) dictGetEntryVal(de);
1042 if (now > t) {
1043 deleteKey(db,dictGetEntryKey(de));
1044 expired++;
1045 }
1046 }
1047 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1048 }
1049
1050 /* Check if we should connect to a MASTER */
1051 if (server.replstate == REDIS_REPL_CONNECT) {
1052 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1053 if (syncWithMaster() == REDIS_OK) {
1054 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1055 }
1056 }
1057 return 1000;
1058 }
1059
1060 static void createSharedObjects(void) {
1061 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1062 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1063 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1064 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1065 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1066 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1067 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1068 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1069 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1070 /* no such key */
1071 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1072 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1073 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1074 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1075 "-ERR no such key\r\n"));
1076 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1077 "-ERR syntax error\r\n"));
1078 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1079 "-ERR source and destination objects are the same\r\n"));
1080 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1081 "-ERR index out of range\r\n"));
1082 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1083 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1084 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1085 shared.select0 = createStringObject("select 0\r\n",10);
1086 shared.select1 = createStringObject("select 1\r\n",10);
1087 shared.select2 = createStringObject("select 2\r\n",10);
1088 shared.select3 = createStringObject("select 3\r\n",10);
1089 shared.select4 = createStringObject("select 4\r\n",10);
1090 shared.select5 = createStringObject("select 5\r\n",10);
1091 shared.select6 = createStringObject("select 6\r\n",10);
1092 shared.select7 = createStringObject("select 7\r\n",10);
1093 shared.select8 = createStringObject("select 8\r\n",10);
1094 shared.select9 = createStringObject("select 9\r\n",10);
1095 }
1096
1097 static void appendServerSaveParams(time_t seconds, int changes) {
1098 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1099 server.saveparams[server.saveparamslen].seconds = seconds;
1100 server.saveparams[server.saveparamslen].changes = changes;
1101 server.saveparamslen++;
1102 }
1103
1104 static void resetServerSaveParams() {
1105 zfree(server.saveparams);
1106 server.saveparams = NULL;
1107 server.saveparamslen = 0;
1108 }
1109
1110 static void initServerConfig() {
1111 server.dbnum = REDIS_DEFAULT_DBNUM;
1112 server.port = REDIS_SERVERPORT;
1113 server.verbosity = REDIS_DEBUG;
1114 server.maxidletime = REDIS_MAXIDLETIME;
1115 server.saveparams = NULL;
1116 server.logfile = NULL; /* NULL = log on standard output */
1117 server.bindaddr = NULL;
1118 server.glueoutputbuf = 1;
1119 server.daemonize = 0;
1120 server.appendonly = 0;
1121 server.appendfsync = APPENDFSYNC_ALWAYS;
1122 server.lastfsync = time(NULL);
1123 server.appendfd = -1;
1124 server.appendseldb = -1; /* Make sure the first time will not match */
1125 server.pidfile = "/var/run/redis.pid";
1126 server.dbfilename = "dump.rdb";
1127 server.appendfilename = "appendonly.aof";
1128 server.requirepass = NULL;
1129 server.shareobjects = 0;
1130 server.sharingpoolsize = 1024;
1131 server.maxclients = 0;
1132 server.maxmemory = 0;
1133 resetServerSaveParams();
1134
1135 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1136 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1137 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1138 /* Replication related */
1139 server.isslave = 0;
1140 server.masterauth = NULL;
1141 server.masterhost = NULL;
1142 server.masterport = 6379;
1143 server.master = NULL;
1144 server.replstate = REDIS_REPL_NONE;
1145
1146 /* Double constants initialization */
1147 R_Zero = 0.0;
1148 R_PosInf = 1.0/R_Zero;
1149 R_NegInf = -1.0/R_Zero;
1150 R_Nan = R_Zero/R_Zero;
1151 }
1152
1153 static void initServer() {
1154 int j;
1155
1156 signal(SIGHUP, SIG_IGN);
1157 signal(SIGPIPE, SIG_IGN);
1158 setupSigSegvAction();
1159
1160 server.clients = listCreate();
1161 server.slaves = listCreate();
1162 server.monitors = listCreate();
1163 server.objfreelist = listCreate();
1164 createSharedObjects();
1165 server.el = aeCreateEventLoop();
1166 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1167 server.sharingpool = dictCreate(&setDictType,NULL);
1168 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1169 if (server.fd == -1) {
1170 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1171 exit(1);
1172 }
1173 for (j = 0; j < server.dbnum; j++) {
1174 server.db[j].dict = dictCreate(&hashDictType,NULL);
1175 server.db[j].expires = dictCreate(&setDictType,NULL);
1176 server.db[j].id = j;
1177 }
1178 server.cronloops = 0;
1179 server.bgsavechildpid = -1;
1180 server.bgrewritechildpid = -1;
1181 server.bgrewritebuf = sdsempty();
1182 server.lastsave = time(NULL);
1183 server.dirty = 0;
1184 server.usedmemory = 0;
1185 server.stat_numcommands = 0;
1186 server.stat_numconnections = 0;
1187 server.stat_starttime = time(NULL);
1188 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1189
1190 if (server.appendonly) {
1191 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1192 if (server.appendfd == -1) {
1193 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1194 strerror(errno));
1195 exit(1);
1196 }
1197 }
1198 }
1199
1200 /* Empty the whole database */
1201 static long long emptyDb() {
1202 int j;
1203 long long removed = 0;
1204
1205 for (j = 0; j < server.dbnum; j++) {
1206 removed += dictSize(server.db[j].dict);
1207 dictEmpty(server.db[j].dict);
1208 dictEmpty(server.db[j].expires);
1209 }
1210 return removed;
1211 }
1212
/* Convert a "yes"/"no" string (compared ignoring case) into 1/0.
 * Any other string yields -1. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1218
1219 /* I agree, this is a very rudimental way to load a configuration...
1220 will improve later if the config gets more complex */
1221 static void loadServerConfig(char *filename) {
1222 FILE *fp;
1223 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
1224 int linenum = 0;
1225 sds line = NULL;
1226
1227 if (filename[0] == '-' && filename[1] == '\0')
1228 fp = stdin;
1229 else {
1230 if ((fp = fopen(filename,"r")) == NULL) {
1231 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
1232 exit(1);
1233 }
1234 }
1235
1236 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
1237 sds *argv;
1238 int argc, j;
1239
1240 linenum++;
1241 line = sdsnew(buf);
1242 line = sdstrim(line," \t\r\n");
1243
1244 /* Skip comments and blank lines*/
1245 if (line[0] == '#' || line[0] == '\0') {
1246 sdsfree(line);
1247 continue;
1248 }
1249
1250 /* Split into arguments */
1251 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
1252 sdstolower(argv[0]);
1253
1254 /* Execute config directives */
1255 if (!strcasecmp(argv[0],"timeout") && argc == 2) {
1256 server.maxidletime = atoi(argv[1]);
1257 if (server.maxidletime < 0) {
1258 err = "Invalid timeout value"; goto loaderr;
1259 }
1260 } else if (!strcasecmp(argv[0],"port") && argc == 2) {
1261 server.port = atoi(argv[1]);
1262 if (server.port < 1 || server.port > 65535) {
1263 err = "Invalid port"; goto loaderr;
1264 }
1265 } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
1266 server.bindaddr = zstrdup(argv[1]);
1267 } else if (!strcasecmp(argv[0],"save") && argc == 3) {
1268 int seconds = atoi(argv[1]);
1269 int changes = atoi(argv[2]);
1270 if (seconds < 1 || changes < 0) {
1271 err = "Invalid save parameters"; goto loaderr;
1272 }
1273 appendServerSaveParams(seconds,changes);
1274 } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
1275 if (chdir(argv[1]) == -1) {
1276 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
1277 argv[1], strerror(errno));
1278 exit(1);
1279 }
1280 } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
1281 if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
1282 else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
1283 else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
1284 else {
1285 err = "Invalid log level. Must be one of debug, notice, warning";
1286 goto loaderr;
1287 }
1288 } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
1289 FILE *logfp;
1290
1291 server.logfile = zstrdup(argv[1]);
1292 if (!strcasecmp(server.logfile,"stdout")) {
1293 zfree(server.logfile);
1294 server.logfile = NULL;
1295 }
1296 if (server.logfile) {
1297 /* Test if we are able to open the file. The server will not
1298 * be able to abort just for this problem later... */
1299 logfp = fopen(server.logfile,"a");
1300 if (logfp == NULL) {
1301 err = sdscatprintf(sdsempty(),
1302 "Can't open the log file: %s", strerror(errno));
1303 goto loaderr;
1304 }
1305 fclose(logfp);
1306 }
1307 } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
1308 server.dbnum = atoi(argv[1]);
1309 if (server.dbnum < 1) {
1310 err = "Invalid number of databases"; goto loaderr;
1311 }
1312 } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
1313 server.maxclients = atoi(argv[1]);
1314 } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
1315 server.maxmemory = strtoll(argv[1], NULL, 10);
1316 } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
1317 server.masterhost = sdsnew(argv[1]);
1318 server.masterport = atoi(argv[2]);
1319 server.replstate = REDIS_REPL_CONNECT;
1320 } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
1321 server.masterauth = zstrdup(argv[1]);
1322 } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
1323 if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
1324 err = "argument must be 'yes' or 'no'"; goto loaderr;
1325 }
1326 } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
1327 if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
1328 err = "argument must be 'yes' or 'no'"; goto loaderr;
1329 }
1330 } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
1331 server.sharingpoolsize = atoi(argv[1]);
1332 if (server.sharingpoolsize < 1) {
1333 err = "invalid object sharing pool size"; goto loaderr;
1334 }
1335 } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
1336 if ((server.daemonize = yesnotoi(argv[1])) == -1) {
1337 err = "argument must be 'yes' or 'no'"; goto loaderr;
1338 }
1339 } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
1340 if ((server.appendonly = yesnotoi(argv[1])) == -1) {
1341 err = "argument must be 'yes' or 'no'"; goto loaderr;
1342 }
1343 } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
1344 if (!strcasecmp(argv[1],"no")) {
1345 server.appendfsync = APPENDFSYNC_NO;
1346 } else if (!strcasecmp(argv[1],"always")) {
1347 server.appendfsync = APPENDFSYNC_ALWAYS;
1348 } else if (!strcasecmp(argv[1],"everysec")) {
1349 server.appendfsync = APPENDFSYNC_EVERYSEC;
1350 } else {
1351 err = "argument must be 'no', 'always' or 'everysec'";
1352 goto loaderr;
1353 }
1354 } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
1355 server.requirepass = zstrdup(argv[1]);
1356 } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
1357 server.pidfile = zstrdup(argv[1]);
1358 } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
1359 server.dbfilename = zstrdup(argv[1]);
1360 } else {
1361 err = "Bad directive or wrong number of arguments"; goto loaderr;
1362 }
1363 for (j = 0; j < argc; j++)
1364 sdsfree(argv[j]);
1365 zfree(argv);
1366 sdsfree(line);
1367 }
1368 if (fp != stdin) fclose(fp);
1369 return;
1370
1371 loaderr:
1372 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1373 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1374 fprintf(stderr, ">>> '%s'\n", line);
1375 fprintf(stderr, "%s\n", err);
1376 exit(1);
1377 }
1378
1379 static void freeClientArgv(redisClient *c) {
1380 int j;
1381
1382 for (j = 0; j < c->argc; j++)
1383 decrRefCount(c->argv[j]);
1384 for (j = 0; j < c->mbargc; j++)
1385 decrRefCount(c->mbargv[j]);
1386 c->argc = 0;
1387 c->mbargc = 0;
1388 }
1389
1390 static void freeClient(redisClient *c) {
1391 listNode *ln;
1392
1393 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1394 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1395 sdsfree(c->querybuf);
1396 listRelease(c->reply);
1397 freeClientArgv(c);
1398 close(c->fd);
1399 ln = listSearchKey(server.clients,c);
1400 assert(ln != NULL);
1401 listDelNode(server.clients,ln);
1402 if (c->flags & REDIS_SLAVE) {
1403 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1404 close(c->repldbfd);
1405 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1406 ln = listSearchKey(l,c);
1407 assert(ln != NULL);
1408 listDelNode(l,ln);
1409 }
1410 if (c->flags & REDIS_MASTER) {
1411 server.master = NULL;
1412 server.replstate = REDIS_REPL_CONNECT;
1413 }
1414 zfree(c->argv);
1415 zfree(c->mbargv);
1416 zfree(c);
1417 }
1418
1419 #define GLUEREPLY_UP_TO (1024)
1420 static void glueReplyBuffersIfNeeded(redisClient *c) {
1421 int copylen = 0;
1422 char buf[GLUEREPLY_UP_TO];
1423 listNode *ln;
1424 robj *o;
1425
1426 listRewind(c->reply);
1427 while((ln = listYield(c->reply))) {
1428 int objlen;
1429
1430 o = ln->value;
1431 objlen = sdslen(o->ptr);
1432 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1433 memcpy(buf+copylen,o->ptr,objlen);
1434 copylen += objlen;
1435 listDelNode(c->reply,ln);
1436 } else {
1437 if (copylen == 0) return;
1438 break;
1439 }
1440 }
1441 /* Now the output buffer is empty, add the new single element */
1442 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1443 listAddNodeHead(c->reply,o);
1444 }
1445
1446 static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1447 redisClient *c = privdata;
1448 int nwritten = 0, totwritten = 0, objlen;
1449 robj *o;
1450 REDIS_NOTUSED(el);
1451 REDIS_NOTUSED(mask);
1452
1453 /* Use writev() if we have enough buffers to send */
1454 if (!server.glueoutputbuf &&
1455 listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
1456 !(c->flags & REDIS_MASTER))
1457 {
1458 sendReplyToClientWritev(el, fd, privdata, mask);
1459 return;
1460 }
1461
1462 while(listLength(c->reply)) {
1463 if (server.glueoutputbuf && listLength(c->reply) > 1)
1464 glueReplyBuffersIfNeeded(c);
1465
1466 o = listNodeValue(listFirst(c->reply));
1467 objlen = sdslen(o->ptr);
1468
1469 if (objlen == 0) {
1470 listDelNode(c->reply,listFirst(c->reply));
1471 continue;
1472 }
1473
1474 if (c->flags & REDIS_MASTER) {
1475 /* Don't reply to a master */
1476 nwritten = objlen - c->sentlen;
1477 } else {
1478 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1479 if (nwritten <= 0) break;
1480 }
1481 c->sentlen += nwritten;
1482 totwritten += nwritten;
1483 /* If we fully sent the object on head go to the next one */
1484 if (c->sentlen == objlen) {
1485 listDelNode(c->reply,listFirst(c->reply));
1486 c->sentlen = 0;
1487 }
1488 /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
1489 * bytes, in a single threaded server it's a good idea to serve
1490 * other clients as well, even if a very large request comes from
1491 * super fast link that is always able to accept data (in real world
1492 * scenario think about 'KEYS *' against the loopback interfae) */
1493 if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
1494 }
1495 if (nwritten == -1) {
1496 if (errno == EAGAIN) {
1497 nwritten = 0;
1498 } else {
1499 redisLog(REDIS_DEBUG,
1500 "Error writing to client: %s", strerror(errno));
1501 freeClient(c);
1502 return;
1503 }
1504 }
1505 if (totwritten > 0) c->lastinteraction = time(NULL);
1506 if (listLength(c->reply) == 0) {
1507 c->sentlen = 0;
1508 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1509 }
1510 }
1511
1512 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1513 {
1514 redisClient *c = privdata;
1515 int nwritten = 0, totwritten = 0, objlen, willwrite;
1516 robj *o;
1517 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1518 int offset, ion = 0;
1519 REDIS_NOTUSED(el);
1520 REDIS_NOTUSED(mask);
1521
1522 listNode *node;
1523 while (listLength(c->reply)) {
1524 offset = c->sentlen;
1525 ion = 0;
1526 willwrite = 0;
1527
1528 /* fill-in the iov[] array */
1529 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1530 o = listNodeValue(node);
1531 objlen = sdslen(o->ptr);
1532
1533 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1534 break;
1535
1536 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1537 break; /* no more iovecs */
1538
1539 iov[ion].iov_base = ((char*)o->ptr) + offset;
1540 iov[ion].iov_len = objlen - offset;
1541 willwrite += objlen - offset;
1542 offset = 0; /* just for the first item */
1543 ion++;
1544 }
1545
1546 if(willwrite == 0)
1547 break;
1548
1549 /* write all collected blocks at once */
1550 if((nwritten = writev(fd, iov, ion)) < 0) {
1551 if (errno != EAGAIN) {
1552 redisLog(REDIS_DEBUG,
1553 "Error writing to client: %s", strerror(errno));
1554 freeClient(c);
1555 return;
1556 }
1557 break;
1558 }
1559
1560 totwritten += nwritten;
1561 offset = c->sentlen;
1562
1563 /* remove written robjs from c->reply */
1564 while (nwritten && listLength(c->reply)) {
1565 o = listNodeValue(listFirst(c->reply));
1566 objlen = sdslen(o->ptr);
1567
1568 if(nwritten >= objlen - offset) {
1569 listDelNode(c->reply, listFirst(c->reply));
1570 nwritten -= objlen - offset;
1571 c->sentlen = 0;
1572 } else {
1573 /* partial write */
1574 c->sentlen += nwritten;
1575 break;
1576 }
1577 offset = 0;
1578 }
1579 }
1580
1581 if (totwritten > 0)
1582 c->lastinteraction = time(NULL);
1583
1584 if (listLength(c->reply) == 0) {
1585 c->sentlen = 0;
1586 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1587 }
1588 }
1589
1590 static struct redisCommand *lookupCommand(char *name) {
1591 int j = 0;
1592 while(cmdTable[j].name != NULL) {
1593 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1594 j++;
1595 }
1596 return NULL;
1597 }
1598
1599 /* resetClient prepare the client to process the next command */
1600 static void resetClient(redisClient *c) {
1601 freeClientArgv(c);
1602 c->bulklen = -1;
1603 c->multibulk = 0;
1604 }
1605
1606 /* If this function gets called we already read a whole
1607 * command, argments are in the client argv/argc fields.
1608 * processCommand() execute the command or prepare the
1609 * server for a bulk read from the client.
1610 *
1611 * If 1 is returned the client is still alive and valid and
1612 * and other operations can be performed by the caller. Otherwise
1613 * if 0 is returned the client was destroied (i.e. after QUIT). */
1614 static int processCommand(redisClient *c) {
1615 struct redisCommand *cmd;
1616 long long dirty;
1617
1618 /* Free some memory if needed (maxmemory setting) */
1619 if (server.maxmemory) freeMemoryIfNeeded();
1620
1621 /* Handle the multi bulk command type. This is an alternative protocol
1622 * supported by Redis in order to receive commands that are composed of
1623 * multiple binary-safe "bulk" arguments. The latency of processing is
1624 * a bit higher but this allows things like multi-sets, so if this
1625 * protocol is used only for MSET and similar commands this is a big win. */
1626 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1627 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1628 if (c->multibulk <= 0) {
1629 resetClient(c);
1630 return 1;
1631 } else {
1632 decrRefCount(c->argv[c->argc-1]);
1633 c->argc--;
1634 return 1;
1635 }
1636 } else if (c->multibulk) {
1637 if (c->bulklen == -1) {
1638 if (((char*)c->argv[0]->ptr)[0] != '$') {
1639 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1640 resetClient(c);
1641 return 1;
1642 } else {
1643 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1644 decrRefCount(c->argv[0]);
1645 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1646 c->argc--;
1647 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1648 resetClient(c);
1649 return 1;
1650 }
1651 c->argc--;
1652 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1653 return 1;
1654 }
1655 } else {
1656 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1657 c->mbargv[c->mbargc] = c->argv[0];
1658 c->mbargc++;
1659 c->argc--;
1660 c->multibulk--;
1661 if (c->multibulk == 0) {
1662 robj **auxargv;
1663 int auxargc;
1664
1665 /* Here we need to swap the multi-bulk argc/argv with the
1666 * normal argc/argv of the client structure. */
1667 auxargv = c->argv;
1668 c->argv = c->mbargv;
1669 c->mbargv = auxargv;
1670
1671 auxargc = c->argc;
1672 c->argc = c->mbargc;
1673 c->mbargc = auxargc;
1674
1675 /* We need to set bulklen to something different than -1
1676 * in order for the code below to process the command without
1677 * to try to read the last argument of a bulk command as
1678 * a special argument. */
1679 c->bulklen = 0;
1680 /* continue below and process the command */
1681 } else {
1682 c->bulklen = -1;
1683 return 1;
1684 }
1685 }
1686 }
1687 /* -- end of multi bulk commands processing -- */
1688
1689 /* The QUIT command is handled as a special case. Normal command
1690 * procs are unable to close the client connection safely */
1691 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1692 freeClient(c);
1693 return 0;
1694 }
1695 cmd = lookupCommand(c->argv[0]->ptr);
1696 if (!cmd) {
1697 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1698 resetClient(c);
1699 return 1;
1700 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1701 (c->argc < -cmd->arity)) {
1702 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1703 resetClient(c);
1704 return 1;
1705 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1706 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1707 resetClient(c);
1708 return 1;
1709 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1710 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1711
1712 decrRefCount(c->argv[c->argc-1]);
1713 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1714 c->argc--;
1715 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1716 resetClient(c);
1717 return 1;
1718 }
1719 c->argc--;
1720 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1721 /* It is possible that the bulk read is already in the
1722 * buffer. Check this condition and handle it accordingly.
1723 * This is just a fast path, alternative to call processInputBuffer().
1724 * It's a good idea since the code is small and this condition
1725 * happens most of the times. */
1726 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1727 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1728 c->argc++;
1729 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1730 } else {
1731 return 1;
1732 }
1733 }
1734 /* Let's try to share objects on the command arguments vector */
1735 if (server.shareobjects) {
1736 int j;
1737 for(j = 1; j < c->argc; j++)
1738 c->argv[j] = tryObjectSharing(c->argv[j]);
1739 }
1740 /* Let's try to encode the bulk object to save space. */
1741 if (cmd->flags & REDIS_CMD_BULK)
1742 tryObjectEncoding(c->argv[c->argc-1]);
1743
1744 /* Check if the user is authenticated */
1745 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1746 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1747 resetClient(c);
1748 return 1;
1749 }
1750
1751 /* Exec the command */
1752 dirty = server.dirty;
1753 cmd->proc(c);
1754 if (server.appendonly && server.dirty-dirty)
1755 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1756 if (server.dirty-dirty && listLength(server.slaves))
1757 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1758 if (listLength(server.monitors))
1759 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1760 server.stat_numcommands++;
1761
1762 /* Prepare the client for the next command */
1763 if (c->flags & REDIS_CLOSE) {
1764 freeClient(c);
1765 return 0;
1766 }
1767 resetClient(c);
1768 return 1;
1769 }
1770
1771 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1772 listNode *ln;
1773 int outc = 0, j;
1774 robj **outv;
1775 /* (args*2)+1 is enough room for args, spaces, newlines */
1776 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1777
1778 if (argc <= REDIS_STATIC_ARGS) {
1779 outv = static_outv;
1780 } else {
1781 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1782 }
1783
1784 for (j = 0; j < argc; j++) {
1785 if (j != 0) outv[outc++] = shared.space;
1786 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1787 robj *lenobj;
1788
1789 lenobj = createObject(REDIS_STRING,
1790 sdscatprintf(sdsempty(),"%d\r\n",
1791 stringObjectLen(argv[j])));
1792 lenobj->refcount = 0;
1793 outv[outc++] = lenobj;
1794 }
1795 outv[outc++] = argv[j];
1796 }
1797 outv[outc++] = shared.crlf;
1798
1799 /* Increment all the refcounts at start and decrement at end in order to
1800 * be sure to free objects if there is no slave in a replication state
1801 * able to be feed with commands */
1802 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1803 listRewind(slaves);
1804 while((ln = listYield(slaves))) {
1805 redisClient *slave = ln->value;
1806
1807 /* Don't feed slaves that are still waiting for BGSAVE to start */
1808 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1809
1810 /* Feed all the other slaves, MONITORs and so on */
1811 if (slave->slaveseldb != dictid) {
1812 robj *selectcmd;
1813
1814 switch(dictid) {
1815 case 0: selectcmd = shared.select0; break;
1816 case 1: selectcmd = shared.select1; break;
1817 case 2: selectcmd = shared.select2; break;
1818 case 3: selectcmd = shared.select3; break;
1819 case 4: selectcmd = shared.select4; break;
1820 case 5: selectcmd = shared.select5; break;
1821 case 6: selectcmd = shared.select6; break;
1822 case 7: selectcmd = shared.select7; break;
1823 case 8: selectcmd = shared.select8; break;
1824 case 9: selectcmd = shared.select9; break;
1825 default:
1826 selectcmd = createObject(REDIS_STRING,
1827 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1828 selectcmd->refcount = 0;
1829 break;
1830 }
1831 addReply(slave,selectcmd);
1832 slave->slaveseldb = dictid;
1833 }
1834 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1835 }
1836 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1837 if (outv != static_outv) zfree(outv);
1838 }
1839
1840 static void processInputBuffer(redisClient *c) {
1841 again:
1842 if (c->bulklen == -1) {
1843 /* Read the first line of the query */
1844 char *p = strchr(c->querybuf,'\n');
1845 size_t querylen;
1846
1847 if (p) {
1848 sds query, *argv;
1849 int argc, j;
1850
1851 query = c->querybuf;
1852 c->querybuf = sdsempty();
1853 querylen = 1+(p-(query));
1854 if (sdslen(query) > querylen) {
1855 /* leave data after the first line of the query in the buffer */
1856 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1857 }
1858 *p = '\0'; /* remove "\n" */
1859 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1860 sdsupdatelen(query);
1861
1862 /* Now we can split the query in arguments */
1863 if (sdslen(query) == 0) {
1864 /* Ignore empty query */
1865 sdsfree(query);
1866 return;
1867 }
1868 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1869 sdsfree(query);
1870
1871 if (c->argv) zfree(c->argv);
1872 c->argv = zmalloc(sizeof(robj*)*argc);
1873
1874 for (j = 0; j < argc; j++) {
1875 if (sdslen(argv[j])) {
1876 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1877 c->argc++;
1878 } else {
1879 sdsfree(argv[j]);
1880 }
1881 }
1882 zfree(argv);
1883 /* Execute the command. If the client is still valid
1884 * after processCommand() return and there is something
1885 * on the query buffer try to process the next command. */
1886 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1887 return;
1888 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1889 redisLog(REDIS_DEBUG, "Client protocol error");
1890 freeClient(c);
1891 return;
1892 }
1893 } else {
1894 /* Bulk read handling. Note that if we are at this point
1895 the client already sent a command terminated with a newline,
1896 we are reading the bulk data that is actually the last
1897 argument of the command. */
1898 int qbl = sdslen(c->querybuf);
1899
1900 if (c->bulklen <= qbl) {
1901 /* Copy everything but the final CRLF as final argument */
1902 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1903 c->argc++;
1904 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1905 /* Process the command. If the client is still valid after
1906 * the processing and there is more data in the buffer
1907 * try to parse it. */
1908 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1909 return;
1910 }
1911 }
1912 }
1913
1914 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1915 redisClient *c = (redisClient*) privdata;
1916 char buf[REDIS_IOBUF_LEN];
1917 int nread;
1918 REDIS_NOTUSED(el);
1919 REDIS_NOTUSED(mask);
1920
1921 nread = read(fd, buf, REDIS_IOBUF_LEN);
1922 if (nread == -1) {
1923 if (errno == EAGAIN) {
1924 nread = 0;
1925 } else {
1926 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1927 freeClient(c);
1928 return;
1929 }
1930 } else if (nread == 0) {
1931 redisLog(REDIS_DEBUG, "Client closed connection");
1932 freeClient(c);
1933 return;
1934 }
1935 if (nread) {
1936 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1937 c->lastinteraction = time(NULL);
1938 } else {
1939 return;
1940 }
1941 processInputBuffer(c);
1942 }
1943
1944 static int selectDb(redisClient *c, int id) {
1945 if (id < 0 || id >= server.dbnum)
1946 return REDIS_ERR;
1947 c->db = &server.db[id];
1948 return REDIS_OK;
1949 }
1950
1951 static void *dupClientReplyValue(void *o) {
1952 incrRefCount((robj*)o);
1953 return 0;
1954 }
1955
1956 static redisClient *createClient(int fd) {
1957 redisClient *c = zmalloc(sizeof(*c));
1958
1959 anetNonBlock(NULL,fd);
1960 anetTcpNoDelay(NULL,fd);
1961 if (!c) return NULL;
1962 selectDb(c,0);
1963 c->fd = fd;
1964 c->querybuf = sdsempty();
1965 c->argc = 0;
1966 c->argv = NULL;
1967 c->bulklen = -1;
1968 c->multibulk = 0;
1969 c->mbargc = 0;
1970 c->mbargv = NULL;
1971 c->sentlen = 0;
1972 c->flags = 0;
1973 c->lastinteraction = time(NULL);
1974 c->authenticated = 0;
1975 c->replstate = REDIS_REPL_NONE;
1976 c->reply = listCreate();
1977 listSetFreeMethod(c->reply,decrRefCount);
1978 listSetDupMethod(c->reply,dupClientReplyValue);
1979 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1980 readQueryFromClient, c) == AE_ERR) {
1981 freeClient(c);
1982 return NULL;
1983 }
1984 listAddNodeTail(server.clients,c);
1985 return c;
1986 }
1987
1988 static void addReply(redisClient *c, robj *obj) {
1989 if (listLength(c->reply) == 0 &&
1990 (c->replstate == REDIS_REPL_NONE ||
1991 c->replstate == REDIS_REPL_ONLINE) &&
1992 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1993 sendReplyToClient, c) == AE_ERR) return;
1994 listAddNodeTail(c->reply,getDecodedObject(obj));
1995 }
1996
1997 static void addReplySds(redisClient *c, sds s) {
1998 robj *o = createObject(REDIS_STRING,s);
1999 addReply(c,o);
2000 decrRefCount(o);
2001 }
2002
2003 static void addReplyDouble(redisClient *c, double d) {
2004 char buf[128];
2005
2006 snprintf(buf,sizeof(buf),"%.17g",d);
2007 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2008 strlen(buf),buf));
2009 }
2010
2011 static void addReplyBulkLen(redisClient *c, robj *obj) {
2012 size_t len;
2013
2014 if (obj->encoding == REDIS_ENCODING_RAW) {
2015 len = sdslen(obj->ptr);
2016 } else {
2017 long n = (long)obj->ptr;
2018
2019 len = 1;
2020 if (n < 0) {
2021 len++;
2022 n = -n;
2023 }
2024 while((n = n/10) != 0) {
2025 len++;
2026 }
2027 }
2028 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
2029 }
2030
2031 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2032 int cport, cfd;
2033 char cip[128];
2034 redisClient *c;
2035 REDIS_NOTUSED(el);
2036 REDIS_NOTUSED(mask);
2037 REDIS_NOTUSED(privdata);
2038
2039 cfd = anetAccept(server.neterr, fd, cip, &cport);
2040 if (cfd == AE_ERR) {
2041 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2042 return;
2043 }
2044 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2045 if ((c = createClient(cfd)) == NULL) {
2046 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2047 close(cfd); /* May be already closed, just ingore errors */
2048 return;
2049 }
2050 /* If maxclient directive is set and this is one client more... close the
2051 * connection. Note that we create the client instead to check before
2052 * for this condition, since now the socket is already set in nonblocking
2053 * mode and we can send an error for free using the Kernel I/O */
2054 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2055 char *err = "-ERR max number of clients reached\r\n";
2056
2057 /* That's a best effort error message, don't check write errors */
2058 if (write(c->fd,err,strlen(err)) == -1) {
2059 /* Nothing to do, Just to avoid the warning... */
2060 }
2061 freeClient(c);
2062 return;
2063 }
2064 server.stat_numconnections++;
2065 }
2066
2067 /* ======================= Redis objects implementation ===================== */
2068
2069 static robj *createObject(int type, void *ptr) {
2070 robj *o;
2071
2072 if (listLength(server.objfreelist)) {
2073 listNode *head = listFirst(server.objfreelist);
2074 o = listNodeValue(head);
2075 listDelNode(server.objfreelist,head);
2076 } else {
2077 o = zmalloc(sizeof(*o));
2078 }
2079 o->type = type;
2080 o->encoding = REDIS_ENCODING_RAW;
2081 o->ptr = ptr;
2082 o->refcount = 1;
2083 return o;
2084 }
2085
2086 static robj *createStringObject(char *ptr, size_t len) {
2087 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2088 }
2089
2090 static robj *createListObject(void) {
2091 list *l = listCreate();
2092
2093 listSetFreeMethod(l,decrRefCount);
2094 return createObject(REDIS_LIST,l);
2095 }
2096
2097 static robj *createSetObject(void) {
2098 dict *d = dictCreate(&setDictType,NULL);
2099 return createObject(REDIS_SET,d);
2100 }
2101
2102 static robj *createZsetObject(void) {
2103 zset *zs = zmalloc(sizeof(*zs));
2104
2105 zs->dict = dictCreate(&zsetDictType,NULL);
2106 zs->zsl = zslCreate();
2107 return createObject(REDIS_ZSET,zs);
2108 }
2109
2110 static void freeStringObject(robj *o) {
2111 if (o->encoding == REDIS_ENCODING_RAW) {
2112 sdsfree(o->ptr);
2113 }
2114 }
2115
2116 static void freeListObject(robj *o) {
2117 listRelease((list*) o->ptr);
2118 }
2119
2120 static void freeSetObject(robj *o) {
2121 dictRelease((dict*) o->ptr);
2122 }
2123
2124 static void freeZsetObject(robj *o) {
2125 zset *zs = o->ptr;
2126
2127 dictRelease(zs->dict);
2128 zslFree(zs->zsl);
2129 zfree(zs);
2130 }
2131
2132 static void freeHashObject(robj *o) {
2133 dictRelease((dict*) o->ptr);
2134 }
2135
2136 static void incrRefCount(robj *o) {
2137 o->refcount++;
2138 #ifdef DEBUG_REFCOUNT
2139 if (o->type == REDIS_STRING)
2140 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2141 #endif
2142 }
2143
2144 static void decrRefCount(void *obj) {
2145 robj *o = obj;
2146
2147 #ifdef DEBUG_REFCOUNT
2148 if (o->type == REDIS_STRING)
2149 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2150 #endif
2151 if (--(o->refcount) == 0) {
2152 switch(o->type) {
2153 case REDIS_STRING: freeStringObject(o); break;
2154 case REDIS_LIST: freeListObject(o); break;
2155 case REDIS_SET: freeSetObject(o); break;
2156 case REDIS_ZSET: freeZsetObject(o); break;
2157 case REDIS_HASH: freeHashObject(o); break;
2158 default: assert(0 != 0); break;
2159 }
2160 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2161 !listAddNodeHead(server.objfreelist,o))
2162 zfree(o);
2163 }
2164 }
2165
2166 static robj *lookupKey(redisDb *db, robj *key) {
2167 dictEntry *de = dictFind(db->dict,key);
2168 return de ? dictGetEntryVal(de) : NULL;
2169 }
2170
2171 static robj *lookupKeyRead(redisDb *db, robj *key) {
2172 expireIfNeeded(db,key);
2173 return lookupKey(db,key);
2174 }
2175
2176 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2177 deleteIfVolatile(db,key);
2178 return lookupKey(db,key);
2179 }
2180
2181 static int deleteKey(redisDb *db, robj *key) {
2182 int retval;
2183
2184 /* We need to protect key from destruction: after the first dictDelete()
2185 * it may happen that 'key' is no longer valid if we don't increment
2186 * it's count. This may happen when we get the object reference directly
2187 * from the hash table with dictRandomKey() or dict iterators */
2188 incrRefCount(key);
2189 if (dictSize(db->expires)) dictDelete(db->expires,key);
2190 retval = dictDelete(db->dict,key);
2191 decrRefCount(key);
2192
2193 return retval == DICT_OK;
2194 }
2195
2196 /* Try to share an object against the shared objects pool */
2197 static robj *tryObjectSharing(robj *o) {
2198 struct dictEntry *de;
2199 unsigned long c;
2200
2201 if (o == NULL || server.shareobjects == 0) return o;
2202
2203 assert(o->type == REDIS_STRING);
2204 de = dictFind(server.sharingpool,o);
2205 if (de) {
2206 robj *shared = dictGetEntryKey(de);
2207
2208 c = ((unsigned long) dictGetEntryVal(de))+1;
2209 dictGetEntryVal(de) = (void*) c;
2210 incrRefCount(shared);
2211 decrRefCount(o);
2212 return shared;
2213 } else {
2214 /* Here we are using a stream algorihtm: Every time an object is
2215 * shared we increment its count, everytime there is a miss we
2216 * recrement the counter of a random object. If this object reaches
2217 * zero we remove the object and put the current object instead. */
2218 if (dictSize(server.sharingpool) >=
2219 server.sharingpoolsize) {
2220 de = dictGetRandomKey(server.sharingpool);
2221 assert(de != NULL);
2222 c = ((unsigned long) dictGetEntryVal(de))-1;
2223 dictGetEntryVal(de) = (void*) c;
2224 if (c == 0) {
2225 dictDelete(server.sharingpool,de->key);
2226 }
2227 } else {
2228 c = 0; /* If the pool is empty we want to add this object */
2229 }
2230 if (c == 0) {
2231 int retval;
2232
2233 retval = dictAdd(server.sharingpool,o,(void*)1);
2234 assert(retval == DICT_OK);
2235 incrRefCount(o);
2236 }
2237 return o;
2238 }
2239 }
2240
2241 /* Check if the nul-terminated string 's' can be represented by a long
2242 * (that is, is a number that fits into long without any other space or
2243 * character before or after the digits).
2244 *
2245 * If so, the function returns REDIS_OK and *longval is set to the value
2246 * of the number. Otherwise REDIS_ERR is returned */
2247 static int isStringRepresentableAsLong(sds s, long *longval) {
2248 char buf[32], *endptr;
2249 long value;
2250 int slen;
2251
2252 value = strtol(s, &endptr, 10);
2253 if (endptr[0] != '\0') return REDIS_ERR;
2254 slen = snprintf(buf,32,"%ld",value);
2255
2256 /* If the number converted back into a string is not identical
2257 * then it's not possible to encode the string as integer */
2258 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2259 if (longval) *longval = value;
2260 return REDIS_OK;
2261 }
2262
2263 /* Try to encode a string object in order to save space */
2264 static int tryObjectEncoding(robj *o) {
2265 long value;
2266 sds s = o->ptr;
2267
2268 if (o->encoding != REDIS_ENCODING_RAW)
2269 return REDIS_ERR; /* Already encoded */
2270
2271 /* It's not save to encode shared objects: shared objects can be shared
2272 * everywhere in the "object space" of Redis. Encoded objects can only
2273 * appear as "values" (and not, for instance, as keys) */
2274 if (o->refcount > 1) return REDIS_ERR;
2275
2276 /* Currently we try to encode only strings */
2277 assert(o->type == REDIS_STRING);
2278
2279 /* Check if we can represent this string as a long integer */
2280 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2281
2282 /* Ok, this object can be encoded */
2283 o->encoding = REDIS_ENCODING_INT;
2284 sdsfree(o->ptr);
2285 o->ptr = (void*) value;
2286 return REDIS_OK;
2287 }
2288
2289 /* Get a decoded version of an encoded object (returned as a new object).
2290 * If the object is already raw-encoded just increment the ref count. */
2291 static robj *getDecodedObject(robj *o) {
2292 robj *dec;
2293
2294 if (o->encoding == REDIS_ENCODING_RAW) {
2295 incrRefCount(o);
2296 return o;
2297 }
2298 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2299 char buf[32];
2300
2301 snprintf(buf,32,"%ld",(long)o->ptr);
2302 dec = createStringObject(buf,strlen(buf));
2303 return dec;
2304 } else {
2305 assert(1 != 1);
2306 }
2307 }
2308
2309 /* Compare two string objects via strcmp() or alike.
2310 * Note that the objects may be integer-encoded. In such a case we
2311 * use snprintf() to get a string representation of the numbers on the stack
2312 * and compare the strings, it's much faster than calling getDecodedObject().
2313 *
2314 * Important note: if objects are not integer encoded, but binary-safe strings,
2315 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2316 * binary safe. */
2317 static int compareStringObjects(robj *a, robj *b) {
2318 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2319 char bufa[128], bufb[128], *astr, *bstr;
2320 int bothsds = 1;
2321
2322 if (a == b) return 0;
2323 if (a->encoding != REDIS_ENCODING_RAW) {
2324 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2325 astr = bufa;
2326 bothsds = 0;
2327 } else {
2328 astr = a->ptr;
2329 }
2330 if (b->encoding != REDIS_ENCODING_RAW) {
2331 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2332 bstr = bufb;
2333 bothsds = 0;
2334 } else {
2335 bstr = b->ptr;
2336 }
2337 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2338 }
2339
2340 static size_t stringObjectLen(robj *o) {
2341 assert(o->type == REDIS_STRING);
2342 if (o->encoding == REDIS_ENCODING_RAW) {
2343 return sdslen(o->ptr);
2344 } else {
2345 char buf[32];
2346
2347 return snprintf(buf,32,"%ld",(long)o->ptr);
2348 }
2349 }
2350
2351 /*============================ DB saving/loading ============================ */
2352
/* Write the one byte 'type' to 'fp'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    size_t written = fwrite(&type,1,1,fp);

    return (written == 0) ? -1 : 0;
}
2357
/* Write the time 't' to 'fp' as a 32 bit signed integer, dumping the four
 * bytes as they are laid out in memory. Returns 0 on success, -1 on
 * error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stamp = (int32_t) t;

    return (fwrite(&stamp,4,1,fp) == 0) ? -1 : 0;
}
2363
2364 /* check rdbLoadLen() comments for more info */
2365 static int rdbSaveLen(FILE *fp, uint32_t len) {
2366 unsigned char buf[2];
2367
2368 if (len < (1<<6)) {
2369 /* Save a 6 bit len */
2370 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2371 if (fwrite(buf,1,1,fp) == 0) return -1;
2372 } else if (len < (1<<14)) {
2373 /* Save a 14 bit len */
2374 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2375 buf[1] = len&0xFF;
2376 if (fwrite(buf,2,1,fp) == 0) return -1;
2377 } else {
2378 /* Save a 32 bit len */
2379 buf[0] = (REDIS_RDB_32BITLEN<<6);
2380 if (fwrite(buf,1,1,fp) == 0) return -1;
2381 len = htonl(len);
2382 if (fwrite(&len,4,1,fp) == 0) return -1;
2383 }
2384 return 0;
2385 }
2386
2387 /* String objects in the form "2391" "-100" without any space and with a
2388 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2389 * encoded as integers to save space */
2390 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2391 long long value;
2392 char *endptr, buf[32];
2393
2394 /* Check if it's possible to encode this value as a number */
2395 value = strtoll(s, &endptr, 10);
2396 if (endptr[0] != '\0') return 0;
2397 snprintf(buf,32,"%lld",value);
2398
2399 /* If the number converted back into a string is not identical
2400 * then it's not possible to encode the string as integer */
2401 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2402
2403 /* Finally check if it fits in our ranges */
2404 if (value >= -(1<<7) && value <= (1<<7)-1) {
2405 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2406 enc[1] = value&0xFF;
2407 return 2;
2408 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2409 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2410 enc[1] = value&0xFF;
2411 enc[2] = (value>>8)&0xFF;
2412 return 3;
2413 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2414 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2415 enc[1] = value&0xFF;
2416 enc[2] = (value>>8)&0xFF;
2417 enc[3] = (value>>16)&0xFF;
2418 enc[4] = (value>>24)&0xFF;
2419 return 5;
2420 } else {
2421 return 0;
2422 }
2423 }
2424
2425 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2426 unsigned int comprlen, outlen;
2427 unsigned char byte;
2428 void *out;
2429
2430 /* We require at least four bytes compression for this to be worth it */
2431 outlen = sdslen(obj->ptr)-4;
2432 if (outlen <= 0) return 0;
2433 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2434 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2435 if (comprlen == 0) {
2436 zfree(out);
2437 return 0;
2438 }
2439 /* Data compressed! Let's save it on disk */
2440 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2441 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2442 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2443 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2444 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2445 zfree(out);
2446 return comprlen;
2447
2448 writeerr:
2449 zfree(out);
2450 return -1;
2451 }
2452
2453 /* Save a string objet as [len][data] on disk. If the object is a string
2454 * representation of an integer value we try to safe it in a special form */
2455 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2456 size_t len;
2457 int enclen;
2458
2459 len = sdslen(obj->ptr);
2460
2461 /* Try integer encoding */
2462 if (len <= 11) {
2463 unsigned char buf[5];
2464 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2465 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2466 return 0;
2467 }
2468 }
2469
2470 /* Try LZF compression - under 20 bytes it's unable to compress even
2471 * aaaaaaaaaaaaaaaaaa so skip it */
2472 if (len > 20) {
2473 int retval;
2474
2475 retval = rdbSaveLzfStringObject(fp,obj);
2476 if (retval == -1) return -1;
2477 if (retval > 0) return 0;
2478 /* retval == 0 means data can't be compressed, save the old way */
2479 }
2480
2481 /* Store verbatim */
2482 if (rdbSaveLen(fp,len) == -1) return -1;
2483 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2484 return 0;
2485 }
2486
2487 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2488 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2489 int retval;
2490
2491 obj = getDecodedObject(obj);
2492 retval = rdbSaveStringObjectRaw(fp,obj);
2493 decrRefCount(obj);
2494 return retval;
2495 }
2496
/* Save a double value. Doubles are saved as strings ("%.17g" format)
 * prefixed by an unsigned 8 bit integer specifying the length of the
 * representation. Some value of this prefix is special and is not followed
 * by any string:
 *
 * 253: not a number
 * 254: + inf
 * 255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int total = 1; /* the prefix byte is always there */

    if (isnan(val)) {
        out[0] = 253;
    } else if (!isfinite(val)) {
        out[0] = (val < 0) ? 255 : 254;
    } else {
        char *digits = (char*)out+1;

        snprintf(digits,sizeof(out)-1,"%.17g",val);
        out[0] = strlen(digits);
        total += out[0];
    }
    return (fwrite(out,total,1,fp) == 0) ? -1 : 0;
}
2523
2524 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2525 static int rdbSave(char *filename) {
2526 dictIterator *di = NULL;
2527 dictEntry *de;
2528 FILE *fp;
2529 char tmpfile[256];
2530 int j;
2531 time_t now = time(NULL);
2532
2533 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2534 fp = fopen(tmpfile,"w");
2535 if (!fp) {
2536 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2537 return REDIS_ERR;
2538 }
2539 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2540 for (j = 0; j < server.dbnum; j++) {
2541 redisDb *db = server.db+j;
2542 dict *d = db->dict;
2543 if (dictSize(d) == 0) continue;
2544 di = dictGetIterator(d);
2545 if (!di) {
2546 fclose(fp);
2547 return REDIS_ERR;
2548 }
2549
2550 /* Write the SELECT DB opcode */
2551 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2552 if (rdbSaveLen(fp,j) == -1) goto werr;
2553
2554 /* Iterate this DB writing every entry */
2555 while((de = dictNext(di)) != NULL) {
2556 robj *key = dictGetEntryKey(de);
2557 robj *o = dictGetEntryVal(de);
2558 time_t expiretime = getExpire(db,key);
2559
2560 /* Save the expire time */
2561 if (expiretime != -1) {
2562 /* If this key is already expired skip it */
2563 if (expiretime < now) continue;
2564 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2565 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2566 }
2567 /* Save the key and associated value */
2568 if (rdbSaveType(fp,o->type) == -1) goto werr;
2569 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2570 if (o->type == REDIS_STRING) {
2571 /* Save a string value */
2572 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2573 } else if (o->type == REDIS_LIST) {
2574 /* Save a list value */
2575 list *list = o->ptr;
2576 listNode *ln;
2577
2578 listRewind(list);
2579 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2580 while((ln = listYield(list))) {
2581 robj *eleobj = listNodeValue(ln);
2582
2583 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2584 }
2585 } else if (o->type == REDIS_SET) {
2586 /* Save a set value */
2587 dict *set = o->ptr;
2588 dictIterator *di = dictGetIterator(set);
2589 dictEntry *de;
2590
2591 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2592 while((de = dictNext(di)) != NULL) {
2593 robj *eleobj = dictGetEntryKey(de);
2594
2595 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2596 }
2597 dictReleaseIterator(di);
2598 } else if (o->type == REDIS_ZSET) {
2599 /* Save a set value */
2600 zset *zs = o->ptr;
2601 dictIterator *di = dictGetIterator(zs->dict);
2602 dictEntry *de;
2603
2604 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2605 while((de = dictNext(di)) != NULL) {
2606 robj *eleobj = dictGetEntryKey(de);
2607 double *score = dictGetEntryVal(de);
2608
2609 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2610 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2611 }
2612 dictReleaseIterator(di);
2613 } else {
2614 assert(0 != 0);
2615 }
2616 }
2617 dictReleaseIterator(di);
2618 }
2619 /* EOF opcode */
2620 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2621
2622 /* Make sure data will not remain on the OS's output buffers */
2623 fflush(fp);
2624 fsync(fileno(fp));
2625 fclose(fp);
2626
2627 /* Use RENAME to make sure the DB file is changed atomically only
2628 * if the generate DB file is ok. */
2629 if (rename(tmpfile,filename) == -1) {
2630 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2631 unlink(tmpfile);
2632 return REDIS_ERR;
2633 }
2634 redisLog(REDIS_NOTICE,"DB saved on disk");
2635 server.dirty = 0;
2636 server.lastsave = time(NULL);
2637 return REDIS_OK;
2638
2639 werr:
2640 fclose(fp);
2641 unlink(tmpfile);
2642 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2643 if (di) dictReleaseIterator(di);
2644 return REDIS_ERR;
2645 }
2646
2647 static int rdbSaveBackground(char *filename) {
2648 pid_t childpid;
2649
2650 if (server.bgsavechildpid != -1) return REDIS_ERR;
2651 if ((childpid = fork()) == 0) {
2652 /* Child */
2653 close(server.fd);
2654 if (rdbSave(filename) == REDIS_OK) {
2655 exit(0);
2656 } else {
2657 exit(1);
2658 }
2659 } else {
2660 /* Parent */
2661 if (childpid == -1) {
2662 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2663 strerror(errno));
2664 return REDIS_ERR;
2665 }
2666 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2667 server.bgsavechildpid = childpid;
2668 return REDIS_OK;
2669 }
2670 return REDIS_OK; /* unreached */
2671 }
2672
/* Remove the temporary dump file named after the process id 'childpid'
 * (the same "temp-<pid>.rdb" name rdbSave() writes to). */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2679
/* Read a one byte type from 'fp'. Returns the byte as a value in the range
 * 0-255, or -1 if it can't be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    if (fread(&byte,1,1,fp) != 0) return byte;
    return -1;
}
2685
/* Read a time saved by rdbSaveTime(): four bytes loaded as they are into a
 * 32 bit signed integer. Returns the time, or -1 if the bytes can't be
 * read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 0) return (time_t) stamp;
    return -1;
}
2691
2692 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2693 * of this file for a description of how this are stored on disk.
2694 *
2695 * isencoded is set to 1 if the readed length is not actually a length but
2696 * an "encoding type", check the above comments for more info */
2697 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2698 unsigned char buf[2];
2699 uint32_t len;
2700
2701 if (isencoded) *isencoded = 0;
2702 if (rdbver == 0) {
2703 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2704 return ntohl(len);
2705 } else {
2706 int type;
2707
2708 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2709 type = (buf[0]&0xC0)>>6;
2710 if (type == REDIS_RDB_6BITLEN) {
2711 /* Read a 6 bit len */
2712 return buf[0]&0x3F;
2713 } else if (type == REDIS_RDB_ENCVAL) {
2714 /* Read a 6 bit len encoding type */
2715 if (isencoded) *isencoded = 1;
2716 return buf[0]&0x3F;
2717 } else if (type == REDIS_RDB_14BITLEN) {
2718 /* Read a 14 bit len */
2719 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2720 return ((buf[0]&0x3F)<<8)|buf[1];
2721 } else {
2722 /* Read a 32 bit len */
2723 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2724 return ntohl(len);
2725 }
2726 }
2727 }
2728
2729 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2730 unsigned char enc[4];
2731 long long val;
2732
2733 if (enctype == REDIS_RDB_ENC_INT8) {
2734 if (fread(enc,1,1,fp) == 0) return NULL;
2735 val = (signed char)enc[0];
2736 } else if (enctype == REDIS_RDB_ENC_INT16) {
2737 uint16_t v;
2738 if (fread(enc,2,1,fp) == 0) return NULL;
2739 v = enc[0]|(enc[1]<<8);
2740 val = (int16_t)v;
2741 } else if (enctype == REDIS_RDB_ENC_INT32) {
2742 uint32_t v;
2743 if (fread(enc,4,1,fp) == 0) return NULL;
2744 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2745 val = (int32_t)v;
2746 } else {
2747 val = 0; /* anti-warning */
2748 assert(0!=0);
2749 }
2750 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2751 }
2752
2753 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2754 unsigned int len, clen;
2755 unsigned char *c = NULL;
2756 sds val = NULL;
2757
2758 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2759 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2760 if ((c = zmalloc(clen)) == NULL) goto err;
2761 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2762 if (fread(c,clen,1,fp) == 0) goto err;
2763 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2764 zfree(c);
2765 return createObject(REDIS_STRING,val);
2766 err:
2767 zfree(c);
2768 sdsfree(val);
2769 return NULL;
2770 }
2771
2772 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2773 int isencoded;
2774 uint32_t len;
2775 sds val;
2776
2777 len = rdbLoadLen(fp,rdbver,&isencoded);
2778 if (isencoded) {
2779 switch(len) {
2780 case REDIS_RDB_ENC_INT8:
2781 case REDIS_RDB_ENC_INT16:
2782 case REDIS_RDB_ENC_INT32:
2783 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2784 case REDIS_RDB_ENC_LZF:
2785 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2786 default:
2787 assert(0!=0);
2788 }
2789 }
2790
2791 if (len == REDIS_RDB_LENERR) return NULL;
2792 val = sdsnewlen(NULL,len);
2793 if (len && fread(val,len,1,fp) == 0) {
2794 sdsfree(val);
2795 return NULL;
2796 }
2797 return tryObjectSharing(createObject(REDIS_STRING,val));
2798 }
2799
2800 /* For information about double serialization check rdbSaveDoubleValue() */
2801 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2802 char buf[128];
2803 unsigned char len;
2804
2805 if (fread(&len,1,1,fp) == 0) return -1;
2806 switch(len) {
2807 case 255: *val = R_NegInf; return 0;
2808 case 254: *val = R_PosInf; return 0;
2809 case 253: *val = R_Nan; return 0;
2810 default:
2811 if (fread(buf,len,1,fp) == 0) return -1;
2812 sscanf(buf, "%lg", val);
2813 return 0;
2814 }
2815 }
2816
2817 static int rdbLoad(char *filename) {
2818 FILE *fp;
2819 robj *keyobj = NULL;
2820 uint32_t dbid;
2821 int type, retval, rdbver;
2822 dict *d = server.db[0].dict;
2823 redisDb *db = server.db+0;
2824 char buf[1024];
2825 time_t expiretime = -1, now = time(NULL);
2826
2827 fp = fopen(filename,"r");
2828 if (!fp) return REDIS_ERR;
2829 if (fread(buf,9,1,fp) == 0) goto eoferr;
2830 buf[9] = '\0';
2831 if (memcmp(buf,"REDIS",5) != 0) {
2832 fclose(fp);
2833 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2834 return REDIS_ERR;
2835 }
2836 rdbver = atoi(buf+5);
2837 if (rdbver > 1) {
2838 fclose(fp);
2839 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2840 return REDIS_ERR;
2841 }
2842 while(1) {
2843 robj *o;
2844
2845 /* Read type. */
2846 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2847 if (type == REDIS_EXPIRETIME) {
2848 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2849 /* We read the time so we need to read the object type again */
2850 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2851 }
2852 if (type == REDIS_EOF) break;
2853 /* Handle SELECT DB opcode as a special case */
2854 if (type == REDIS_SELECTDB) {
2855 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2856 goto eoferr;
2857 if (dbid >= (unsigned)server.dbnum) {
2858 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2859 exit(1);
2860 }
2861 db = server.db+dbid;
2862 d = db->dict;
2863 continue;
2864 }
2865 /* Read key */
2866 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2867
2868 if (type == REDIS_STRING) {
2869 /* Read string value */
2870 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2871 tryObjectEncoding(o);
2872 } else if (type == REDIS_LIST || type == REDIS_SET) {
2873 /* Read list/set value */
2874 uint32_t listlen;
2875
2876 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2877 goto eoferr;
2878 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2879 /* Load every single element of the list/set */
2880 while(listlen--) {
2881 robj *ele;
2882
2883 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2884 tryObjectEncoding(ele);
2885 if (type == REDIS_LIST) {
2886 listAddNodeTail((list*)o->ptr,ele);
2887 } else {
2888 dictAdd((dict*)o->ptr,ele,NULL);
2889 }
2890 }
2891 } else if (type == REDIS_ZSET) {
2892 /* Read list/set value */
2893 uint32_t zsetlen;
2894 zset *zs;
2895
2896 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2897 goto eoferr;
2898 o = createZsetObject();
2899 zs = o->ptr;
2900 /* Load every single element of the list/set */
2901 while(zsetlen--) {
2902 robj *ele;
2903 double *score = zmalloc(sizeof(double));
2904
2905 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2906 tryObjectEncoding(ele);
2907 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2908 dictAdd(zs->dict,ele,score);
2909 zslInsert(zs->zsl,*score,ele);
2910 incrRefCount(ele); /* added to skiplist */
2911 }
2912 } else {
2913 assert(0 != 0);
2914 }
2915 /* Add the new object in the hash table */
2916 retval = dictAdd(d,keyobj,o);
2917 if (retval == DICT_ERR) {
2918 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2919 exit(1);
2920 }
2921 /* Set the expire time if needed */
2922 if (expiretime != -1) {
2923 setExpire(db,keyobj,expiretime);
2924 /* Delete this key if already expired */
2925 if (expiretime < now) deleteKey(db,keyobj);
2926 expiretime = -1;
2927 }
2928 keyobj = o = NULL;
2929 }
2930 fclose(fp);
2931 return REDIS_OK;
2932
2933 eoferr: /* unexpected end of file is handled here with a fatal exit */
2934 if (keyobj) decrRefCount(keyobj);
2935 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2936 exit(1);
2937 return REDIS_ERR; /* Just to avoid warning */
2938 }
2939
2940 /*================================== Commands =============================== */
2941
2942 static void authCommand(redisClient *c) {
2943 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2944 c->authenticated = 1;
2945 addReply(c,shared.ok);
2946 } else {
2947 c->authenticated = 0;
2948 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2949 }
2950 }
2951
2952 static void pingCommand(redisClient *c) {
2953 addReply(c,shared.pong);
2954 }
2955
2956 static void echoCommand(redisClient *c) {
2957 addReplyBulkLen(c,c->argv[1]);
2958 addReply(c,c->argv[1]);
2959 addReply(c,shared.crlf);
2960 }
2961
2962 /*=================================== Strings =============================== */
2963
2964 static void setGenericCommand(redisClient *c, int nx) {
2965 int retval;
2966
2967 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2968 if (retval == DICT_ERR) {
2969 if (!nx) {
2970 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2971 incrRefCount(c->argv[2]);
2972 } else {
2973 addReply(c,shared.czero);
2974 return;
2975 }
2976 } else {
2977 incrRefCount(c->argv[1]);
2978 incrRefCount(c->argv[2]);
2979 }
2980 server.dirty++;
2981 removeExpire(c->db,c->argv[1]);
2982 addReply(c, nx ? shared.cone : shared.ok);
2983 }
2984
2985 static void setCommand(redisClient *c) {
2986 setGenericCommand(c,0);
2987 }
2988
2989 static void setnxCommand(redisClient *c) {
2990 setGenericCommand(c,1);
2991 }
2992
2993 static void getCommand(redisClient *c) {
2994 robj *o = lookupKeyRead(c->db,c->argv[1]);
2995
2996 if (o == NULL) {
2997 addReply(c,shared.nullbulk);
2998 } else {
2999 if (o->type != REDIS_STRING) {
3000 addReply(c,shared.wrongtypeerr);
3001 } else {
3002 addReplyBulkLen(c,o);
3003 addReply(c,o);
3004 addReply(c,shared.crlf);
3005 }
3006 }
3007 }
3008
3009 static void getsetCommand(redisClient *c) {
3010 getCommand(c);
3011 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3012 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3013 } else {
3014 incrRefCount(c->argv[1]);
3015 }
3016 incrRefCount(c->argv[2]);
3017 server.dirty++;
3018 removeExpire(c->db,c->argv[1]);
3019 }
3020
3021 static void mgetCommand(redisClient *c) {
3022 int j;
3023
3024 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3025 for (j = 1; j < c->argc; j++) {
3026 robj *o = lookupKeyRead(c->db,c->argv[j]);
3027 if (o == NULL) {
3028 addReply(c,shared.nullbulk);
3029 } else {
3030 if (o->type != REDIS_STRING) {
3031 addReply(c,shared.nullbulk);
3032 } else {
3033 addReplyBulkLen(c,o);
3034 addReply(c,o);
3035 addReply(c,shared.crlf);
3036 }
3037 }
3038 }
3039 }
3040
3041 static void msetGenericCommand(redisClient *c, int nx) {
3042 int j;
3043
3044 if ((c->argc % 2) == 0) {
3045 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3046 return;
3047 }
3048 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3049 * set nothing at all if at least one already key exists. */
3050 if (nx) {
3051 for (j = 1; j < c->argc; j += 2) {
3052 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3053 addReply(c, shared.czero);
3054 return;
3055 }
3056 }
3057 }
3058
3059 for (j = 1; j < c->argc; j += 2) {
3060 int retval;
3061
3062 tryObjectEncoding(c->argv[j+1]);
3063 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3064 if (retval == DICT_ERR) {
3065 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3066 incrRefCount(c->argv[j+1]);
3067 } else {
3068 incrRefCount(c->argv[j]);
3069 incrRefCount(c->argv[j+1]);
3070 }
3071 removeExpire(c->db,c->argv[j]);
3072 }
3073 server.dirty += (c->argc-1)/2;
3074 addReply(c, nx ? shared.cone : shared.ok);
3075 }
3076
3077 static void msetCommand(redisClient *c) {
3078 msetGenericCommand(c,0);
3079 }
3080
3081 static void msetnxCommand(redisClient *c) {
3082 msetGenericCommand(c,1);
3083 }
3084
3085 static void incrDecrCommand(redisClient *c, long long incr) {
3086 long long value;
3087 int retval;
3088 robj *o;
3089
3090 o = lookupKeyWrite(c->db,c->argv[1]);
3091 if (o == NULL) {
3092 value = 0;
3093 } else {
3094 if (o->type != REDIS_STRING) {
3095 value = 0;
3096 } else {
3097 char *eptr;
3098
3099 if (o->encoding == REDIS_ENCODING_RAW)
3100 value = strtoll(o->ptr, &eptr, 10);
3101 else if (o->encoding == REDIS_ENCODING_INT)
3102 value = (long)o->ptr;
3103 else
3104 assert(1 != 1);
3105 }
3106 }
3107
3108 value += incr;
3109 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3110 tryObjectEncoding(o);
3111 retval = dictAdd(c->db->dict,c->argv[1],o);
3112 if (retval == DICT_ERR) {
3113 dictReplace(c->db->dict,c->argv[1],o);
3114 removeExpire(c->db,c->argv[1]);
3115 } else {
3116 incrRefCount(c->argv[1]);
3117 }
3118 server.dirty++;
3119 addReply(c,shared.colon);
3120 addReply(c,o);
3121 addReply(c,shared.crlf);
3122 }
3123
3124 static void incrCommand(redisClient *c) {
3125 incrDecrCommand(c,1);
3126 }
3127
3128 static void decrCommand(redisClient *c) {
3129 incrDecrCommand(c,-1);
3130 }
3131
3132 static void incrbyCommand(redisClient *c) {
3133 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3134 incrDecrCommand(c,incr);
3135 }
3136
3137 static void decrbyCommand(redisClient *c) {
3138 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3139 incrDecrCommand(c,-incr);
3140 }
3141
3142 /* ========================= Type agnostic commands ========================= */
3143
3144 static void delCommand(redisClient *c) {
3145 int deleted = 0, j;
3146
3147 for (j = 1; j < c->argc; j++) {
3148 if (deleteKey(c->db,c->argv[j])) {
3149 server.dirty++;
3150 deleted++;
3151 }
3152 }
3153 switch(deleted) {
3154 case 0:
3155 addReply(c,shared.czero);
3156 break;
3157 case 1:
3158 addReply(c,shared.cone);
3159 break;
3160 default:
3161 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3162 break;
3163 }
3164 }
3165
3166 static void existsCommand(redisClient *c) {
3167 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3168 }
3169
3170 static void selectCommand(redisClient *c) {
3171 int id = atoi(c->argv[1]->ptr);
3172
3173 if (selectDb(c,id) == REDIS_ERR) {
3174 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3175 } else {
3176 addReply(c,shared.ok);
3177 }
3178 }
3179
3180 static void randomkeyCommand(redisClient *c) {
3181 dictEntry *de;
3182
3183 while(1) {
3184 de = dictGetRandomKey(c->db->dict);
3185 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3186 }
3187 if (de == NULL) {
3188 addReply(c,shared.plus);
3189 addReply(c,shared.crlf);
3190 } else {
3191 addReply(c,shared.plus);
3192 addReply(c,dictGetEntryKey(de));
3193 addReply(c,shared.crlf);
3194 }
3195 }
3196
3197 static void keysCommand(redisClient *c) {
3198 dictIterator *di;
3199 dictEntry *de;
3200 sds pattern = c->argv[1]->ptr;
3201 int plen = sdslen(pattern);
3202 int numkeys = 0, keyslen = 0;
3203 robj *lenobj = createObject(REDIS_STRING,NULL);
3204
3205 di = dictGetIterator(c->db->dict);
3206 addReply(c,lenobj);
3207 decrRefCount(lenobj);
3208 while((de = dictNext(di)) != NULL) {
3209 robj *keyobj = dictGetEntryKey(de);
3210
3211 sds key = keyobj->ptr;
3212 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3213 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3214 if (expireIfNeeded(c->db,keyobj) == 0) {
3215 if (numkeys != 0)
3216 addReply(c,shared.space);
3217 addReply(c,keyobj);
3218 numkeys++;
3219 keyslen += sdslen(key);
3220 }
3221 }
3222 }
3223 dictReleaseIterator(di);
3224 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3225 addReply(c,shared.crlf);
3226 }
3227
3228 static void dbsizeCommand(redisClient *c) {
3229 addReplySds(c,
3230 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3231 }
3232
3233 static void lastsaveCommand(redisClient *c) {
3234 addReplySds(c,
3235 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3236 }
3237
3238 static void typeCommand(redisClient *c) {
3239 robj *o;
3240 char *type;
3241
3242 o = lookupKeyRead(c->db,c->argv[1]);
3243 if (o == NULL) {
3244 type = "+none";
3245 } else {
3246 switch(o->type) {
3247 case REDIS_STRING: type = "+string"; break;
3248 case REDIS_LIST: type = "+list"; break;
3249 case REDIS_SET: type = "+set"; break;
3250 case REDIS_ZSET: type = "+zset"; break;
3251 default: type = "unknown"; break;
3252 }
3253 }
3254 addReplySds(c,sdsnew(type));
3255 addReply(c,shared.crlf);
3256 }
3257
3258 static void saveCommand(redisClient *c) {
3259 if (server.bgsavechildpid != -1) {
3260 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3261 return;
3262 }
3263 if (rdbSave(server.dbfilename) == REDIS_OK) {
3264 addReply(c,shared.ok);
3265 } else {
3266 addReply(c,shared.err);
3267 }
3268 }
3269
3270 static void bgsaveCommand(redisClient *c) {
3271 if (server.bgsavechildpid != -1) {
3272 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3273 return;
3274 }
3275 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3276 addReply(c,shared.ok);
3277 } else {
3278 addReply(c,shared.err);
3279 }
3280 }
3281
3282 static void shutdownCommand(redisClient *c) {
3283 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3284 /* Kill the saving child if there is a background saving in progress.
3285 We want to avoid race conditions, for instance our saving child may
3286 overwrite the synchronous saving did by SHUTDOWN. */
3287 if (server.bgsavechildpid != -1) {
3288 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3289 kill(server.bgsavechildpid,SIGKILL);
3290 rdbRemoveTempFile(server.bgsavechildpid);
3291 }
3292 /* SYNC SAVE */
3293 if (rdbSave(server.dbfilename) == REDIS_OK) {
3294 if (server.daemonize)
3295 unlink(server.pidfile);
3296 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3297 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3298 exit(1);
3299 } else {
3300 /* Ooops.. error saving! The best we can do is to continue operating.
3301 * Note that if there was a background saving process, in the next
3302 * cron() Redis will be notified that the background saving aborted,
3303 * handling special stuff like slaves pending for synchronization... */
3304 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3305 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3306 }
3307 }
3308
3309 static void renameGenericCommand(redisClient *c, int nx) {
3310 robj *o;
3311
3312 /* To use the same key as src and dst is probably an error */
3313 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3314 addReply(c,shared.sameobjecterr);
3315 return;
3316 }
3317
3318 o = lookupKeyWrite(c->db,c->argv[1]);
3319 if (o == NULL) {
3320 addReply(c,shared.nokeyerr);
3321 return;
3322 }
3323 incrRefCount(o);
3324 deleteIfVolatile(c->db,c->argv[2]);
3325 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3326 if (nx) {
3327 decrRefCount(o);
3328 addReply(c,shared.czero);
3329 return;
3330 }
3331 dictReplace(c->db->dict,c->argv[2],o);
3332 } else {
3333 incrRefCount(c->argv[2]);
3334 }
3335 deleteKey(c->db,c->argv[1]);
3336 server.dirty++;
3337 addReply(c,nx ? shared.cone : shared.ok);
3338 }
3339
3340 static void renameCommand(redisClient *c) {
3341 renameGenericCommand(c,0);
3342 }
3343
3344 static void renamenxCommand(redisClient *c) {
3345 renameGenericCommand(c,1);
3346 }
3347
3348 static void moveCommand(redisClient *c) {
3349 robj *o;
3350 redisDb *src, *dst;
3351 int srcid;
3352
3353 /* Obtain source and target DB pointers */
3354 src = c->db;
3355 srcid = c->db->id;
3356 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3357 addReply(c,shared.outofrangeerr);
3358 return;
3359 }
3360 dst = c->db;
3361 selectDb(c,srcid); /* Back to the source DB */
3362
3363 /* If the user is moving using as target the same
3364 * DB as the source DB it is probably an error. */
3365 if (src == dst) {
3366 addReply(c,shared.sameobjecterr);
3367 return;
3368 }
3369
3370 /* Check if the element exists and get a reference */
3371 o = lookupKeyWrite(c->db,c->argv[1]);
3372 if (!o) {
3373 addReply(c,shared.czero);
3374 return;
3375 }
3376
3377 /* Try to add the element to the target DB */
3378 deleteIfVolatile(dst,c->argv[1]);
3379 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3380 addReply(c,shared.czero);
3381 return;
3382 }
3383 incrRefCount(c->argv[1]);
3384 incrRefCount(o);
3385
3386 /* OK! key moved, free the entry in the source DB */
3387 deleteKey(src,c->argv[1]);
3388 server.dirty++;
3389 addReply(c,shared.cone);
3390 }
3391
3392 /* =================================== Lists ================================ */
3393 static void pushGenericCommand(redisClient *c, int where) {
3394 robj *lobj;
3395 list *list;
3396
3397 lobj = lookupKeyWrite(c->db,c->argv[1]);
3398 if (lobj == NULL) {
3399 lobj = createListObject();
3400 list = lobj->ptr;
3401 if (where == REDIS_HEAD) {
3402 listAddNodeHead(list,c->argv[2]);
3403 } else {
3404 listAddNodeTail(list,c->argv[2]);
3405 }
3406 dictAdd(c->db->dict,c->argv[1],lobj);
3407 incrRefCount(c->argv[1]);
3408 incrRefCount(c->argv[2]);
3409 } else {
3410 if (lobj->type != REDIS_LIST) {
3411 addReply(c,shared.wrongtypeerr);
3412 return;
3413 }
3414 list = lobj->ptr;
3415 if (where == REDIS_HEAD) {
3416 listAddNodeHead(list,c->argv[2]);
3417 } else {
3418 listAddNodeTail(list,c->argv[2]);
3419 }
3420 incrRefCount(c->argv[2]);
3421 }
3422 server.dirty++;
3423 addReply(c,shared.ok);
3424 }
3425
3426 static void lpushCommand(redisClient *c) {
3427 pushGenericCommand(c,REDIS_HEAD);
3428 }
3429
3430 static void rpushCommand(redisClient *c) {
3431 pushGenericCommand(c,REDIS_TAIL);
3432 }
3433
3434 static void llenCommand(redisClient *c) {
3435 robj *o;
3436 list *l;
3437
3438 o = lookupKeyRead(c->db,c->argv[1]);
3439 if (o == NULL) {
3440 addReply(c,shared.czero);
3441 return;
3442 } else {
3443 if (o->type != REDIS_LIST) {
3444 addReply(c,shared.wrongtypeerr);
3445 } else {
3446 l = o->ptr;
3447 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3448 }
3449 }
3450 }
3451
3452 static void lindexCommand(redisClient *c) {
3453 robj *o;
3454 int index = atoi(c->argv[2]->ptr);
3455
3456 o = lookupKeyRead(c->db,c->argv[1]);
3457 if (o == NULL) {
3458 addReply(c,shared.nullbulk);
3459 } else {
3460 if (o->type != REDIS_LIST) {
3461 addReply(c,shared.wrongtypeerr);
3462 } else {
3463 list *list = o->ptr;
3464 listNode *ln;
3465
3466 ln = listIndex(list, index);
3467 if (ln == NULL) {
3468 addReply(c,shared.nullbulk);
3469 } else {
3470 robj *ele = listNodeValue(ln);
3471 addReplyBulkLen(c,ele);
3472 addReply(c,ele);
3473 addReply(c,shared.crlf);
3474 }
3475 }
3476 }
3477 }
3478
3479 static void lsetCommand(redisClient *c) {
3480 robj *o;
3481 int index = atoi(c->argv[2]->ptr);
3482
3483 o = lookupKeyWrite(c->db,c->argv[1]);
3484 if (o == NULL) {
3485 addReply(c,shared.nokeyerr);
3486 } else {
3487 if (o->type != REDIS_LIST) {
3488 addReply(c,shared.wrongtypeerr);
3489 } else {
3490 list *list = o->ptr;
3491 listNode *ln;
3492
3493 ln = listIndex(list, index);
3494 if (ln == NULL) {
3495 addReply(c,shared.outofrangeerr);
3496 } else {
3497 robj *ele = listNodeValue(ln);
3498
3499 decrRefCount(ele);
3500 listNodeValue(ln) = c->argv[3];
3501 incrRefCount(c->argv[3]);
3502 addReply(c,shared.ok);
3503 server.dirty++;
3504 }
3505 }
3506 }
3507 }
3508
3509 static void popGenericCommand(redisClient *c, int where) {
3510 robj *o;
3511
3512 o = lookupKeyWrite(c->db,c->argv[1]);
3513 if (o == NULL) {
3514 addReply(c,shared.nullbulk);
3515 } else {
3516 if (o->type != REDIS_LIST) {
3517 addReply(c,shared.wrongtypeerr);
3518 } else {
3519 list *list = o->ptr;
3520 listNode *ln;
3521
3522 if (where == REDIS_HEAD)
3523 ln = listFirst(list);
3524 else
3525 ln = listLast(list);
3526
3527 if (ln == NULL) {
3528 addReply(c,shared.nullbulk);
3529 } else {
3530 robj *ele = listNodeValue(ln);
3531 addReplyBulkLen(c,ele);
3532 addReply(c,ele);
3533 addReply(c,shared.crlf);
3534 listDelNode(list,ln);
3535 server.dirty++;
3536 }
3537 }
3538 }
3539 }
3540
3541 static void lpopCommand(redisClient *c) {
3542 popGenericCommand(c,REDIS_HEAD);
3543 }
3544
3545 static void rpopCommand(redisClient *c) {
3546 popGenericCommand(c,REDIS_TAIL);
3547 }
3548
3549 static void lrangeCommand(redisClient *c) {
3550 robj *o;
3551 int start = atoi(c->argv[2]->ptr);
3552 int end = atoi(c->argv[3]->ptr);
3553
3554 o = lookupKeyRead(c->db,c->argv[1]);
3555 if (o == NULL) {
3556 addReply(c,shared.nullmultibulk);
3557 } else {
3558 if (o->type != REDIS_LIST) {
3559 addReply(c,shared.wrongtypeerr);
3560 } else {
3561 list *list = o->ptr;
3562 listNode *ln;
3563 int llen = listLength(list);
3564 int rangelen, j;
3565 robj *ele;
3566
3567 /* convert negative indexes */
3568 if (start < 0) start = llen+start;
3569 if (end < 0) end = llen+end;
3570 if (start < 0) start = 0;
3571 if (end < 0) end = 0;
3572
3573 /* indexes sanity checks */
3574 if (start > end || start >= llen) {
3575 /* Out of range start or start > end result in empty list */
3576 addReply(c,shared.emptymultibulk);
3577 return;
3578 }
3579 if (end >= llen) end = llen-1;
3580 rangelen = (end-start)+1;
3581
3582 /* Return the result in form of a multi-bulk reply */
3583 ln = listIndex(list, start);
3584 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3585 for (j = 0; j < rangelen; j++) {
3586 ele = listNodeValue(ln);
3587 addReplyBulkLen(c,ele);
3588 addReply(c,ele);
3589 addReply(c,shared.crlf);
3590 ln = ln->next;
3591 }
3592 }
3593 }
3594 }
3595
3596 static void ltrimCommand(redisClient *c) {
3597 robj *o;
3598 int start = atoi(c->argv[2]->ptr);
3599 int end = atoi(c->argv[3]->ptr);
3600
3601 o = lookupKeyWrite(c->db,c->argv[1]);
3602 if (o == NULL) {
3603 addReply(c,shared.nokeyerr);
3604 } else {
3605 if (o->type != REDIS_LIST) {
3606 addReply(c,shared.wrongtypeerr);
3607 } else {
3608 list *list = o->ptr;
3609 listNode *ln;
3610 int llen = listLength(list);
3611 int j, ltrim, rtrim;
3612
3613 /* convert negative indexes */
3614 if (start < 0) start = llen+start;
3615 if (end < 0) end = llen+end;
3616 if (start < 0) start = 0;
3617 if (end < 0) end = 0;
3618
3619 /* indexes sanity checks */
3620 if (start > end || start >= llen) {
3621 /* Out of range start or start > end result in empty list */
3622 ltrim = llen;
3623 rtrim = 0;
3624 } else {
3625 if (end >= llen) end = llen-1;
3626 ltrim = start;
3627 rtrim = llen-end-1;
3628 }
3629
3630 /* Remove list elements to perform the trim */
3631 for (j = 0; j < ltrim; j++) {
3632 ln = listFirst(list);
3633 listDelNode(list,ln);
3634 }
3635 for (j = 0; j < rtrim; j++) {
3636 ln = listLast(list);
3637 listDelNode(list,ln);
3638 }
3639 server.dirty++;
3640 addReply(c,shared.ok);
3641 }
3642 }
3643 }
3644
3645 static void lremCommand(redisClient *c) {
3646 robj *o;
3647
3648 o = lookupKeyWrite(c->db,c->argv[1]);
3649 if (o == NULL) {
3650 addReply(c,shared.czero);
3651 } else {
3652 if (o->type != REDIS_LIST) {
3653 addReply(c,shared.wrongtypeerr);
3654 } else {
3655 list *list = o->ptr;
3656 listNode *ln, *next;
3657 int toremove = atoi(c->argv[2]->ptr);
3658 int removed = 0;
3659 int fromtail = 0;
3660
3661 if (toremove < 0) {
3662 toremove = -toremove;
3663 fromtail = 1;
3664 }
3665 ln = fromtail ? list->tail : list->head;
3666 while (ln) {
3667 robj *ele = listNodeValue(ln);
3668
3669 next = fromtail ? ln->prev : ln->next;
3670 if (compareStringObjects(ele,c->argv[3]) == 0) {
3671 listDelNode(list,ln);
3672 server.dirty++;
3673 removed++;
3674 if (toremove && removed == toremove) break;
3675 }
3676 ln = next;
3677 }
3678 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3679 }
3680 }
3681 }
3682
3683 /* This is the semantic of this command:
3684 * RPOPLPUSH srclist dstlist:
3685 * IF LLEN(srclist) > 0
3686 * element = RPOP srclist
3687 * LPUSH dstlist element
3688 * RETURN element
3689 * ELSE
3690 * RETURN nil
3691 * END
3692 * END
3693 *
3694 * The idea is to be able to get an element from a list in a reliable way
3695 * since the element is not just returned but pushed against another list
3696 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3697 */
3698 static void rpoplpushcommand(redisClient *c) {
3699 robj *sobj;
3700
3701 sobj = lookupKeyWrite(c->db,c->argv[1]);
3702 if (sobj == NULL) {
3703 addReply(c,shared.nullbulk);
3704 } else {
3705 if (sobj->type != REDIS_LIST) {
3706 addReply(c,shared.wrongtypeerr);
3707 } else {
3708 list *srclist = sobj->ptr;
3709 listNode *ln = listLast(srclist);
3710
3711 if (ln == NULL) {
3712 addReply(c,shared.nullbulk);
3713 } else {
3714 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3715 robj *ele = listNodeValue(ln);
3716 list *dstlist;
3717
3718 if (dobj == NULL) {
3719
3720 /* Create the list if the key does not exist */
3721 dobj = createListObject();
3722 dictAdd(c->db->dict,c->argv[2],dobj);
3723 incrRefCount(c->argv[2]);
3724 } else if (dobj->type != REDIS_LIST) {
3725 addReply(c,shared.wrongtypeerr);
3726 return;
3727 }
3728 /* Add the element to the target list */
3729 dstlist = dobj->ptr;
3730 listAddNodeHead(dstlist,ele);
3731 incrRefCount(ele);
3732
3733 /* Send the element to the client as reply as well */
3734 addReplyBulkLen(c,ele);
3735 addReply(c,ele);
3736 addReply(c,shared.crlf);
3737
3738 /* Finally remove the element from the source list */
3739 listDelNode(srclist,ln);
3740 server.dirty++;
3741 }
3742 }
3743 }
3744 }
3745
3746
3747 /* ==================================== Sets ================================ */
3748
3749 static void saddCommand(redisClient *c) {
3750 robj *set;
3751
3752 set = lookupKeyWrite(c->db,c->argv[1]);
3753 if (set == NULL) {
3754 set = createSetObject();
3755 dictAdd(c->db->dict,c->argv[1],set);
3756 incrRefCount(c->argv[1]);
3757 } else {
3758 if (set->type != REDIS_SET) {
3759 addReply(c,shared.wrongtypeerr);
3760 return;
3761 }
3762 }
3763 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3764 incrRefCount(c->argv[2]);
3765 server.dirty++;
3766 addReply(c,shared.cone);
3767 } else {
3768 addReply(c,shared.czero);
3769 }
3770 }
3771
3772 static void sremCommand(redisClient *c) {
3773 robj *set;
3774
3775 set = lookupKeyWrite(c->db,c->argv[1]);
3776 if (set == NULL) {
3777 addReply(c,shared.czero);
3778 } else {
3779 if (set->type != REDIS_SET) {
3780 addReply(c,shared.wrongtypeerr);
3781 return;
3782 }
3783 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3784 server.dirty++;
3785 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3786 addReply(c,shared.cone);
3787 } else {
3788 addReply(c,shared.czero);
3789 }
3790 }
3791 }
3792
3793 static void smoveCommand(redisClient *c) {
3794 robj *srcset, *dstset;
3795
3796 srcset = lookupKeyWrite(c->db,c->argv[1]);
3797 dstset = lookupKeyWrite(c->db,c->argv[2]);
3798
3799 /* If the source key does not exist return 0, if it's of the wrong type
3800 * raise an error */
3801 if (srcset == NULL || srcset->type != REDIS_SET) {
3802 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3803 return;
3804 }
3805 /* Error if the destination key is not a set as well */
3806 if (dstset && dstset->type != REDIS_SET) {
3807 addReply(c,shared.wrongtypeerr);
3808 return;
3809 }
3810 /* Remove the element from the source set */
3811 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3812 /* Key not found in the src set! return zero */
3813 addReply(c,shared.czero);
3814 return;
3815 }
3816 server.dirty++;
3817 /* Add the element to the destination set */
3818 if (!dstset) {
3819 dstset = createSetObject();
3820 dictAdd(c->db->dict,c->argv[2],dstset);
3821 incrRefCount(c->argv[2]);
3822 }
3823 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3824 incrRefCount(c->argv[3]);
3825 addReply(c,shared.cone);
3826 }
3827
3828 static void sismemberCommand(redisClient *c) {
3829 robj *set;
3830
3831 set = lookupKeyRead(c->db,c->argv[1]);
3832 if (set == NULL) {
3833 addReply(c,shared.czero);
3834 } else {
3835 if (set->type != REDIS_SET) {
3836 addReply(c,shared.wrongtypeerr);
3837 return;
3838 }
3839 if (dictFind(set->ptr,c->argv[2]))
3840 addReply(c,shared.cone);
3841 else
3842 addReply(c,shared.czero);
3843 }
3844 }
3845
3846 static void scardCommand(redisClient *c) {
3847 robj *o;
3848 dict *s;
3849
3850 o = lookupKeyRead(c->db,c->argv[1]);
3851 if (o == NULL) {
3852 addReply(c,shared.czero);
3853 return;
3854 } else {
3855 if (o->type != REDIS_SET) {
3856 addReply(c,shared.wrongtypeerr);
3857 } else {
3858 s = o->ptr;
3859 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3860 dictSize(s)));
3861 }
3862 }
3863 }
3864
3865 static void spopCommand(redisClient *c) {
3866 robj *set;
3867 dictEntry *de;
3868
3869 set = lookupKeyWrite(c->db,c->argv[1]);
3870 if (set == NULL) {
3871 addReply(c,shared.nullbulk);
3872 } else {
3873 if (set->type != REDIS_SET) {
3874 addReply(c,shared.wrongtypeerr);
3875 return;
3876 }
3877 de = dictGetRandomKey(set->ptr);
3878 if (de == NULL) {
3879 addReply(c,shared.nullbulk);
3880 } else {
3881 robj *ele = dictGetEntryKey(de);
3882
3883 addReplyBulkLen(c,ele);
3884 addReply(c,ele);
3885 addReply(c,shared.crlf);
3886 dictDelete(set->ptr,ele);
3887 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3888 server.dirty++;
3889 }
3890 }
3891 }
3892
3893 static void srandmemberCommand(redisClient *c) {
3894 robj *set;
3895 dictEntry *de;
3896
3897 set = lookupKeyRead(c->db,c->argv[1]);
3898 if (set == NULL) {
3899 addReply(c,shared.nullbulk);
3900 } else {
3901 if (set->type != REDIS_SET) {
3902 addReply(c,shared.wrongtypeerr);
3903 return;
3904 }
3905 de = dictGetRandomKey(set->ptr);
3906 if (de == NULL) {
3907 addReply(c,shared.nullbulk);
3908 } else {
3909 robj *ele = dictGetEntryKey(de);
3910
3911 addReplyBulkLen(c,ele);
3912 addReply(c,ele);
3913 addReply(c,shared.crlf);
3914 }
3915 }
3916 }
3917
3918 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3919 dict **d1 = (void*) s1, **d2 = (void*) s2;
3920
3921 return dictSize(*d1)-dictSize(*d2);
3922 }
3923
3924 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3925 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3926 dictIterator *di;
3927 dictEntry *de;
3928 robj *lenobj = NULL, *dstset = NULL;
3929 int j, cardinality = 0;
3930
3931 for (j = 0; j < setsnum; j++) {
3932 robj *setobj;
3933
3934 setobj = dstkey ?
3935 lookupKeyWrite(c->db,setskeys[j]) :
3936 lookupKeyRead(c->db,setskeys[j]);
3937 if (!setobj) {
3938 zfree(dv);
3939 if (dstkey) {
3940 deleteKey(c->db,dstkey);
3941 addReply(c,shared.ok);
3942 } else {
3943 addReply(c,shared.nullmultibulk);
3944 }
3945 return;
3946 }
3947 if (setobj->type != REDIS_SET) {
3948 zfree(dv);
3949 addReply(c,shared.wrongtypeerr);
3950 return;
3951 }
3952 dv[j] = setobj->ptr;
3953 }
3954 /* Sort sets from the smallest to largest, this will improve our
3955 * algorithm's performace */
3956 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3957
3958 /* The first thing we should output is the total number of elements...
3959 * since this is a multi-bulk write, but at this stage we don't know
3960 * the intersection set size, so we use a trick, append an empty object
3961 * to the output list and save the pointer to later modify it with the
3962 * right length */
3963 if (!dstkey) {
3964 lenobj = createObject(REDIS_STRING,NULL);
3965 addReply(c,lenobj);
3966 decrRefCount(lenobj);
3967 } else {
3968 /* If we have a target key where to store the resulting set
3969 * create this key with an empty set inside */
3970 dstset = createSetObject();
3971 }
3972
3973 /* Iterate all the elements of the first (smallest) set, and test
3974 * the element against all the other sets, if at least one set does
3975 * not include the element it is discarded */
3976 di = dictGetIterator(dv[0]);
3977
3978 while((de = dictNext(di)) != NULL) {
3979 robj *ele;
3980
3981 for (j = 1; j < setsnum; j++)
3982 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3983 if (j != setsnum)
3984 continue; /* at least one set does not contain the member */
3985 ele = dictGetEntryKey(de);
3986 if (!dstkey) {
3987 addReplyBulkLen(c,ele);
3988 addReply(c,ele);
3989 addReply(c,shared.crlf);
3990 cardinality++;
3991 } else {
3992 dictAdd(dstset->ptr,ele,NULL);
3993 incrRefCount(ele);
3994 }
3995 }
3996 dictReleaseIterator(di);
3997
3998 if (dstkey) {
3999 /* Store the resulting set into the target */
4000 deleteKey(c->db,dstkey);
4001 dictAdd(c->db->dict,dstkey,dstset);
4002 incrRefCount(dstkey);
4003 }
4004
4005 if (!dstkey) {
4006 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
4007 } else {
4008 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4009 dictSize((dict*)dstset->ptr)));
4010 server.dirty++;
4011 }
4012 zfree(dv);
4013 }
4014
4015 static void sinterCommand(redisClient *c) {
4016 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4017 }
4018
4019 static void sinterstoreCommand(redisClient *c) {
4020 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4021 }
4022
/* Values for the 'op' argument of sunionDiffGenericCommand(): compute the
 * union of all the given sets, or the difference between the first set
 * and all the following ones. */
#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
4025
4026 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4027 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4028 dictIterator *di;
4029 dictEntry *de;
4030 robj *dstset = NULL;
4031 int j, cardinality = 0;
4032
4033 for (j = 0; j < setsnum; j++) {
4034 robj *setobj;
4035
4036 setobj = dstkey ?
4037 lookupKeyWrite(c->db,setskeys[j]) :
4038 lookupKeyRead(c->db,setskeys[j]);
4039 if (!setobj) {
4040 dv[j] = NULL;
4041 continue;
4042 }
4043 if (setobj->type != REDIS_SET) {
4044 zfree(dv);
4045 addReply(c,shared.wrongtypeerr);
4046 return;
4047 }
4048 dv[j] = setobj->ptr;
4049 }
4050
4051 /* We need a temp set object to store our union. If the dstkey
4052 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4053 * this set object will be the resulting object to set into the target key*/
4054 dstset = createSetObject();
4055
4056 /* Iterate all the elements of all the sets, add every element a single
4057 * time to the result set */
4058 for (j = 0; j < setsnum; j++) {
4059 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4060 if (!dv[j]) continue; /* non existing keys are like empty sets */
4061
4062 di = dictGetIterator(dv[j]);
4063
4064 while((de = dictNext(di)) != NULL) {
4065 robj *ele;
4066
4067 /* dictAdd will not add the same element multiple times */
4068 ele = dictGetEntryKey(de);
4069 if (op == REDIS_OP_UNION || j == 0) {
4070 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4071 incrRefCount(ele);
4072 cardinality++;
4073 }
4074 } else if (op == REDIS_OP_DIFF) {
4075 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4076 cardinality--;
4077 }
4078 }
4079 }
4080 dictReleaseIterator(di);
4081
4082 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4083 }
4084
4085 /* Output the content of the resulting set, if not in STORE mode */
4086 if (!dstkey) {
4087 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4088 di = dictGetIterator(dstset->ptr);
4089 while((de = dictNext(di)) != NULL) {
4090 robj *ele;
4091
4092 ele = dictGetEntryKey(de);
4093 addReplyBulkLen(c,ele);
4094 addReply(c,ele);
4095 addReply(c,shared.crlf);
4096 }
4097 dictReleaseIterator(di);
4098 } else {
4099 /* If we have a target key where to store the resulting set
4100 * create this key with the result set inside */
4101 deleteKey(c->db,dstkey);
4102 dictAdd(c->db->dict,dstkey,dstset);
4103 incrRefCount(dstkey);
4104 }
4105
4106 /* Cleanup */
4107 if (!dstkey) {
4108 decrRefCount(dstset);
4109 } else {
4110 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4111 dictSize((dict*)dstset->ptr)));
4112 server.dirty++;
4113 }
4114 zfree(dv);
4115 }
4116
4117 static void sunionCommand(redisClient *c) {
4118 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4119 }
4120
4121 static void sunionstoreCommand(redisClient *c) {
4122 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4123 }
4124
4125 static void sdiffCommand(redisClient *c) {
4126 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4127 }
4128
4129 static void sdiffstoreCommand(redisClient *c) {
4130 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4131 }
4132
4133 /* ==================================== ZSets =============================== */
4134
4135 /* ZSETs are ordered sets using two data structures to hold the same elements
4136 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4137 * data structure.
4138 *
4139 * The elements are added to a hash table mapping Redis objects to scores.
4140 * At the same time the elements are added to a skip list mapping scores
4141 * to Redis objects (so objects are sorted by scores in this "view"). */
4142
4143 /* This skiplist implementation is almost a C translation of the original
4144 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4145 * Alternative to Balanced Trees", modified in three ways:
4146 * a) this implementation allows for repeated values.
4147 * b) the comparison is not just by key (our 'score') but by satellite data.
4148 * c) there is a back pointer, so it's a doubly linked list with the back
4149 * pointers being only at "level 1". This allows to traverse the list
4150 * from tail to head, useful for ZREVRANGE. */
4151
4152 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4153 zskiplistNode *zn = zmalloc(sizeof(*zn));
4154
4155 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4156 zn->score = score;
4157 zn->obj = obj;
4158 return zn;
4159 }
4160
4161 static zskiplist *zslCreate(void) {
4162 int j;
4163 zskiplist *zsl;
4164
4165 zsl = zmalloc(sizeof(*zsl));
4166 zsl->level = 1;
4167 zsl->length = 0;
4168 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4169 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4170 zsl->header->forward[j] = NULL;
4171 zsl->header->backward = NULL;
4172 zsl->tail = NULL;
4173 return zsl;
4174 }
4175
4176 static void zslFreeNode(zskiplistNode *node) {
4177 decrRefCount(node->obj);
4178 zfree(node->forward);
4179 zfree(node);
4180 }
4181
4182 static void zslFree(zskiplist *zsl) {
4183 zskiplistNode *node = zsl->header->forward[0], *next;
4184
4185 zfree(zsl->header->forward);
4186 zfree(zsl->header);
4187 while(node) {
4188 next = node->forward[0];
4189 zslFreeNode(node);
4190 node = next;
4191 }
4192 zfree(zsl);
4193 }
4194
4195 static int zslRandomLevel(void) {
4196 int level = 1;
4197 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4198 level += 1;
4199 return level;
4200 }
4201
4202 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4203 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4204 int i, level;
4205
4206 x = zsl->header;
4207 for (i = zsl->level-1; i >= 0; i--) {
4208 while (x->forward[i] &&
4209 (x->forward[i]->score < score ||
4210 (x->forward[i]->score == score &&
4211 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4212 x = x->forward[i];
4213 update[i] = x;
4214 }
4215 /* we assume the key is not already inside, since we allow duplicated
4216 * scores, and the re-insertion of score and redis object should never
4217 * happpen since the caller of zslInsert() should test in the hash table
4218 * if the element is already inside or not. */
4219 level = zslRandomLevel();
4220 if (level > zsl->level) {
4221 for (i = zsl->level; i < level; i++)
4222 update[i] = zsl->header;
4223 zsl->level = level;
4224 }
4225 x = zslCreateNode(level,score,obj);
4226 for (i = 0; i < level; i++) {
4227 x->forward[i] = update[i]->forward[i];
4228 update[i]->forward[i] = x;
4229 }
4230 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4231 if (x->forward[0])
4232 x->forward[0]->backward = x;
4233 else
4234 zsl->tail = x;
4235 zsl->length++;
4236 }
4237
4238 /* Delete an element with matching score/object from the skiplist. */
4239 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4240 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4241 int i;
4242
4243 x = zsl->header;
4244 for (i = zsl->level-1; i >= 0; i--) {
4245 while (x->forward[i] &&
4246 (x->forward[i]->score < score ||
4247 (x->forward[i]->score == score &&
4248 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4249 x = x->forward[i];
4250 update[i] = x;
4251 }
4252 /* We may have multiple elements with the same score, what we need
4253 * is to find the element with both the right score and object. */
4254 x = x->forward[0];
4255 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4256 for (i = 0; i < zsl->level; i++) {
4257 if (update[i]->forward[i] != x) break;
4258 update[i]->forward[i] = x->forward[i];
4259 }
4260 if (x->forward[0]) {
4261 x->forward[0]->backward = (x->backward == zsl->header) ?
4262 NULL : x->backward;
4263 } else {
4264 zsl->tail = x->backward;
4265 }
4266 zslFreeNode(x);
4267 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4268 zsl->level--;
4269 zsl->length--;
4270 return 1;
4271 } else {
4272 return 0; /* not found */
4273 }
4274 return 0; /* not found */
4275 }
4276
4277 /* Delete all the elements with score between min and max from the skiplist.
4278 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4279 * Note that this function takes the reference to the hash table view of the
4280 * sorted set, in order to remove the elements from the hash table too. */
4281 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4282 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4283 unsigned long removed = 0;
4284 int i;
4285
4286 x = zsl->header;
4287 for (i = zsl->level-1; i >= 0; i--) {
4288 while (x->forward[i] && x->forward[i]->score < min)
4289 x = x->forward[i];
4290 update[i] = x;
4291 }
4292 /* We may have multiple elements with the same score, what we need
4293 * is to find the element with both the right score and object. */
4294 x = x->forward[0];
4295 while (x && x->score <= max) {
4296 zskiplistNode *next;
4297
4298 for (i = 0; i < zsl->level; i++) {
4299 if (update[i]->forward[i] != x) break;
4300 update[i]->forward[i] = x->forward[i];
4301 }
4302 if (x->forward[0]) {
4303 x->forward[0]->backward = (x->backward == zsl->header) ?
4304 NULL : x->backward;
4305 } else {
4306 zsl->tail = x->backward;
4307 }
4308 next = x->forward[0];
4309 dictDelete(dict,x->obj);
4310 zslFreeNode(x);
4311 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4312 zsl->level--;
4313 zsl->length--;
4314 removed++;
4315 x = next;
4316 }
4317 return removed; /* not found */
4318 }
4319
4320 /* Find the first node having a score equal or greater than the specified one.
4321 * Returns NULL if there is no match. */
4322 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4323 zskiplistNode *x;
4324 int i;
4325
4326 x = zsl->header;
4327 for (i = zsl->level-1; i >= 0; i--) {
4328 while (x->forward[i] && x->forward[i]->score < score)
4329 x = x->forward[i];
4330 }
4331 /* We may have multiple elements with the same score, what we need
4332 * is to find the element with both the right score and object. */
4333 return x->forward[0];
4334 }
4335
4336 /* The actual Z-commands implementations */
4337
4338 /* This generic command implements both ZADD and ZINCRBY.
4339 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4340 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4341 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4342 robj *zsetobj;
4343 zset *zs;
4344 double *score;
4345
4346 zsetobj = lookupKeyWrite(c->db,key);
4347 if (zsetobj == NULL) {
4348 zsetobj = createZsetObject();
4349 dictAdd(c->db->dict,key,zsetobj);
4350 incrRefCount(key);
4351 } else {
4352 if (zsetobj->type != REDIS_ZSET) {
4353 addReply(c,shared.wrongtypeerr);
4354 return;
4355 }
4356 }
4357 zs = zsetobj->ptr;
4358
4359 /* Ok now since we implement both ZADD and ZINCRBY here the code
4360 * needs to handle the two different conditions. It's all about setting
4361 * '*score', that is, the new score to set, to the right value. */
4362 score = zmalloc(sizeof(double));
4363 if (doincrement) {
4364 dictEntry *de;
4365
4366 /* Read the old score. If the element was not present starts from 0 */
4367 de = dictFind(zs->dict,ele);
4368 if (de) {
4369 double *oldscore = dictGetEntryVal(de);
4370 *score = *oldscore + scoreval;
4371 } else {
4372 *score = scoreval;
4373 }
4374 } else {
4375 *score = scoreval;
4376 }
4377
4378 /* What follows is a simple remove and re-insert operation that is common
4379 * to both ZADD and ZINCRBY... */
4380 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4381 /* case 1: New element */
4382 incrRefCount(ele); /* added to hash */
4383 zslInsert(zs->zsl,*score,ele);
4384 incrRefCount(ele); /* added to skiplist */
4385 server.dirty++;
4386 if (doincrement)
4387 addReplyDouble(c,*score);
4388 else
4389 addReply(c,shared.cone);
4390 } else {
4391 dictEntry *de;
4392 double *oldscore;
4393
4394 /* case 2: Score update operation */
4395 de = dictFind(zs->dict,ele);
4396 assert(de != NULL);
4397 oldscore = dictGetEntryVal(de);
4398 if (*score != *oldscore) {
4399 int deleted;
4400
4401 /* Remove and insert the element in the skip list with new score */
4402 deleted = zslDelete(zs->zsl,*oldscore,ele);
4403 assert(deleted != 0);
4404 zslInsert(zs->zsl,*score,ele);
4405 incrRefCount(ele);
4406 /* Update the score in the hash table */
4407 dictReplace(zs->dict,ele,score);
4408 server.dirty++;
4409 } else {
4410 zfree(score);
4411 }
4412 if (doincrement)
4413 addReplyDouble(c,*score);
4414 else
4415 addReply(c,shared.czero);
4416 }
4417 }
4418
4419 static void zaddCommand(redisClient *c) {
4420 double scoreval;
4421
4422 scoreval = strtod(c->argv[2]->ptr,NULL);
4423 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4424 }
4425
4426 static void zincrbyCommand(redisClient *c) {
4427 double scoreval;
4428
4429 scoreval = strtod(c->argv[2]->ptr,NULL);
4430 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4431 }
4432
4433 static void zremCommand(redisClient *c) {
4434 robj *zsetobj;
4435 zset *zs;
4436
4437 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4438 if (zsetobj == NULL) {
4439 addReply(c,shared.czero);
4440 } else {
4441 dictEntry *de;
4442 double *oldscore;
4443 int deleted;
4444
4445 if (zsetobj->type != REDIS_ZSET) {
4446 addReply(c,shared.wrongtypeerr);
4447 return;
4448 }
4449 zs = zsetobj->ptr;
4450 de = dictFind(zs->dict,c->argv[2]);
4451 if (de == NULL) {
4452 addReply(c,shared.czero);
4453 return;
4454 }
4455 /* Delete from the skiplist */
4456 oldscore = dictGetEntryVal(de);
4457 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4458 assert(deleted != 0);
4459
4460 /* Delete from the hash table */
4461 dictDelete(zs->dict,c->argv[2]);
4462 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4463 server.dirty++;
4464 addReply(c,shared.cone);
4465 }
4466 }
4467
4468 static void zremrangebyscoreCommand(redisClient *c) {
4469 double min = strtod(c->argv[2]->ptr,NULL);
4470 double max = strtod(c->argv[3]->ptr,NULL);
4471 robj *zsetobj;
4472 zset *zs;
4473
4474 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4475 if (zsetobj == NULL) {
4476 addReply(c,shared.czero);
4477 } else {
4478 long deleted;
4479
4480 if (zsetobj->type != REDIS_ZSET) {
4481 addReply(c,shared.wrongtypeerr);
4482 return;
4483 }
4484 zs = zsetobj->ptr;
4485 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4486 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4487 server.dirty += deleted;
4488 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4489 }
4490 }
4491
4492 static void zrangeGenericCommand(redisClient *c, int reverse) {
4493 robj *o;
4494 int start = atoi(c->argv[2]->ptr);
4495 int end = atoi(c->argv[3]->ptr);
4496
4497 o = lookupKeyRead(c->db,c->argv[1]);
4498 if (o == NULL) {
4499 addReply(c,shared.nullmultibulk);
4500 } else {
4501 if (o->type != REDIS_ZSET) {
4502 addReply(c,shared.wrongtypeerr);
4503 } else {
4504 zset *zsetobj = o->ptr;
4505 zskiplist *zsl = zsetobj->zsl;
4506 zskiplistNode *ln;
4507
4508 int llen = zsl->length;
4509 int rangelen, j;
4510 robj *ele;
4511
4512 /* convert negative indexes */
4513 if (start < 0) start = llen+start;
4514 if (end < 0) end = llen+end;
4515 if (start < 0) start = 0;
4516 if (end < 0) end = 0;
4517
4518 /* indexes sanity checks */
4519 if (start > end || start >= llen) {
4520 /* Out of range start or start > end result in empty list */
4521 addReply(c,shared.emptymultibulk);
4522 return;
4523 }
4524 if (end >= llen) end = llen-1;
4525 rangelen = (end-start)+1;
4526
4527 /* Return the result in form of a multi-bulk reply */
4528 if (reverse) {
4529 ln = zsl->tail;
4530 while (start--)
4531 ln = ln->backward;
4532 } else {
4533 ln = zsl->header->forward[0];
4534 while (start--)
4535 ln = ln->forward[0];
4536 }
4537
4538 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4539 for (j = 0; j < rangelen; j++) {
4540 ele = ln->obj;
4541 addReplyBulkLen(c,ele);
4542 addReply(c,ele);
4543 addReply(c,shared.crlf);
4544 ln = reverse ? ln->backward : ln->forward[0];
4545 }
4546 }
4547 }
4548 }
4549
4550 static void zrangeCommand(redisClient *c) {
4551 zrangeGenericCommand(c,0);
4552 }
4553
4554 static void zrevrangeCommand(redisClient *c) {
4555 zrangeGenericCommand(c,1);
4556 }
4557
4558 static void zrangebyscoreCommand(redisClient *c) {
4559 robj *o;
4560 double min = strtod(c->argv[2]->ptr,NULL);
4561 double max = strtod(c->argv[3]->ptr,NULL);
4562
4563 o = lookupKeyRead(c->db,c->argv[1]);
4564 if (o == NULL) {
4565 addReply(c,shared.nullmultibulk);
4566 } else {
4567 if (o->type != REDIS_ZSET) {
4568 addReply(c,shared.wrongtypeerr);
4569 } else {
4570 zset *zsetobj = o->ptr;
4571 zskiplist *zsl = zsetobj->zsl;
4572 zskiplistNode *ln;
4573 robj *ele, *lenobj;
4574 unsigned int rangelen = 0;
4575
4576 /* Get the first node with the score >= min */
4577 ln = zslFirstWithScore(zsl,min);
4578 if (ln == NULL) {
4579 /* No element matching the speciifed interval */
4580 addReply(c,shared.emptymultibulk);
4581 return;
4582 }
4583
4584 /* We don't know in advance how many matching elements there
4585 * are in the list, so we push this object that will represent
4586 * the multi-bulk length in the output buffer, and will "fix"
4587 * it later */
4588 lenobj = createObject(REDIS_STRING,NULL);
4589 addReply(c,lenobj);
4590
4591 while(ln && ln->score <= max) {
4592 ele = ln->obj;
4593 addReplyBulkLen(c,ele);
4594 addReply(c,ele);
4595 addReply(c,shared.crlf);
4596 ln = ln->forward[0];
4597 rangelen++;
4598 }
4599 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4600 }
4601 }
4602 }
4603
4604 static void zcardCommand(redisClient *c) {
4605 robj *o;
4606 zset *zs;
4607
4608 o = lookupKeyRead(c->db,c->argv[1]);
4609 if (o == NULL) {
4610 addReply(c,shared.czero);
4611 return;
4612 } else {
4613 if (o->type != REDIS_ZSET) {
4614 addReply(c,shared.wrongtypeerr);
4615 } else {
4616 zs = o->ptr;
4617 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4618 }
4619 }
4620 }
4621
4622 static void zscoreCommand(redisClient *c) {
4623 robj *o;
4624 zset *zs;
4625
4626 o = lookupKeyRead(c->db,c->argv[1]);
4627 if (o == NULL) {
4628 addReply(c,shared.nullbulk);
4629 return;
4630 } else {
4631 if (o->type != REDIS_ZSET) {
4632 addReply(c,shared.wrongtypeerr);
4633 } else {
4634 dictEntry *de;
4635
4636 zs = o->ptr;
4637 de = dictFind(zs->dict,c->argv[2]);
4638 if (!de) {
4639 addReply(c,shared.nullbulk);
4640 } else {
4641 double *score = dictGetEntryVal(de);
4642
4643 addReplyDouble(c,*score);
4644 }
4645 }
4646 }
4647 }
4648
4649 /* ========================= Non type-specific commands ==================== */
4650
4651 static void flushdbCommand(redisClient *c) {
4652 server.dirty += dictSize(c->db->dict);
4653 dictEmpty(c->db->dict);
4654 dictEmpty(c->db->expires);
4655 addReply(c,shared.ok);
4656 }
4657
4658 static void flushallCommand(redisClient *c) {
4659 server.dirty += emptyDb();
4660 addReply(c,shared.ok);
4661 rdbSave(server.dbfilename);
4662 server.dirty++;
4663 }
4664
4665 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4666 redisSortOperation *so = zmalloc(sizeof(*so));
4667 so->type = type;
4668 so->pattern = pattern;
4669 return so;
4670 }
4671
4672 /* Return the value associated to the key with a name obtained
4673 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4674 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4675 char *p;
4676 sds spat, ssub;
4677 robj keyobj;
4678 int prefixlen, sublen, postfixlen;
4679 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4680 struct {
4681 long len;
4682 long free;
4683 char buf[REDIS_SORTKEY_MAX+1];
4684 } keyname;
4685
4686 /* If the pattern is "#" return the substitution object itself in order
4687 * to implement the "SORT ... GET #" feature. */
4688 spat = pattern->ptr;
4689 if (spat[0] == '#' && spat[1] == '\0') {
4690 return subst;
4691 }
4692
4693 /* The substitution object may be specially encoded. If so we create
4694 * a decoded object on the fly. Otherwise getDecodedObject will just
4695 * increment the ref count, that we'll decrement later. */
4696 subst = getDecodedObject(subst);
4697
4698 ssub = subst->ptr;
4699 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4700 p = strchr(spat,'*');
4701 if (!p) {
4702 decrRefCount(subst);
4703 return NULL;
4704 }
4705
4706 prefixlen = p-spat;
4707 sublen = sdslen(ssub);
4708 postfixlen = sdslen(spat)-(prefixlen+1);
4709 memcpy(keyname.buf,spat,prefixlen);
4710 memcpy(keyname.buf+prefixlen,ssub,sublen);
4711 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4712 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4713 keyname.len = prefixlen+sublen+postfixlen;
4714
4715 keyobj.refcount = 1;
4716 keyobj.type = REDIS_STRING;
4717 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4718
4719 decrRefCount(subst);
4720
4721 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4722 return lookupKeyRead(db,&keyobj);
4723 }
4724
4725 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4726 * the additional parameter is not standard but a BSD-specific we have to
4727 * pass sorting parameters via the global 'server' structure */
4728 static int sortCompare(const void *s1, const void *s2) {
4729 const redisSortObject *so1 = s1, *so2 = s2;
4730 int cmp;
4731
4732 if (!server.sort_alpha) {
4733 /* Numeric sorting. Here it's trivial as we precomputed scores */
4734 if (so1->u.score > so2->u.score) {
4735 cmp = 1;
4736 } else if (so1->u.score < so2->u.score) {
4737 cmp = -1;
4738 } else {
4739 cmp = 0;
4740 }
4741 } else {
4742 /* Alphanumeric sorting */
4743 if (server.sort_bypattern) {
4744 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4745 /* At least one compare object is NULL */
4746 if (so1->u.cmpobj == so2->u.cmpobj)
4747 cmp = 0;
4748 else if (so1->u.cmpobj == NULL)
4749 cmp = -1;
4750 else
4751 cmp = 1;
4752 } else {
4753 /* We have both the objects, use strcoll */
4754 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4755 }
4756 } else {
4757 /* Compare elements directly */
4758 robj *dec1, *dec2;
4759
4760 dec1 = getDecodedObject(so1->obj);
4761 dec2 = getDecodedObject(so2->obj);
4762 cmp = strcoll(dec1->ptr,dec2->ptr);
4763 decrRefCount(dec1);
4764 decrRefCount(dec2);
4765 }
4766 }
4767 return server.sort_desc ? -cmp : cmp;
4768 }
4769
4770 /* The SORT command is the most complex command in Redis. Warning: this code
4771 * is optimized for speed and a bit less for readability */
4772 static void sortCommand(redisClient *c) {
4773 list *operations;
4774 int outputlen = 0;
4775 int desc = 0, alpha = 0;
4776 int limit_start = 0, limit_count = -1, start, end;
4777 int j, dontsort = 0, vectorlen;
4778 int getop = 0; /* GET operation counter */
4779 robj *sortval, *sortby = NULL, *storekey = NULL;
4780 redisSortObject *vector; /* Resulting vector to sort */
4781
4782 /* Lookup the key to sort. It must be of the right types */
4783 sortval = lookupKeyRead(c->db,c->argv[1]);
4784 if (sortval == NULL) {
4785 addReply(c,shared.nokeyerr);
4786 return;
4787 }
4788 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4789 addReply(c,shared.wrongtypeerr);
4790 return;
4791 }
4792
4793 /* Create a list of operations to perform for every sorted element.
4794 * Operations can be GET/DEL/INCR/DECR */
4795 operations = listCreate();
4796 listSetFreeMethod(operations,zfree);
4797 j = 2;
4798
4799 /* Now we need to protect sortval incrementing its count, in the future
4800 * SORT may have options able to overwrite/delete keys during the sorting
4801 * and the sorted key itself may get destroied */
4802 incrRefCount(sortval);
4803
4804 /* The SORT command has an SQL-alike syntax, parse it */
4805 while(j < c->argc) {
4806 int leftargs = c->argc-j-1;
4807 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4808 desc = 0;
4809 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4810 desc = 1;
4811 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4812 alpha = 1;
4813 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4814 limit_start = atoi(c->argv[j+1]->ptr);
4815 limit_count = atoi(c->argv[j+2]->ptr);
4816 j+=2;
4817 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4818 storekey = c->argv[j+1];
4819 j++;
4820 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4821 sortby = c->argv[j+1];
4822 /* If the BY pattern does not contain '*', i.e. it is constant,
4823 * we don't need to sort nor to lookup the weight keys. */
4824 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4825 j++;
4826 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4827 listAddNodeTail(operations,createSortOperation(
4828 REDIS_SORT_GET,c->argv[j+1]));
4829 getop++;
4830 j++;
4831 } else {
4832 decrRefCount(sortval);
4833 listRelease(operations);
4834 addReply(c,shared.syntaxerr);
4835 return;
4836 }
4837 j++;
4838 }
4839
4840 /* Load the sorting vector with all the objects to sort */
4841 vectorlen = (sortval->type == REDIS_LIST) ?
4842 listLength((list*)sortval->ptr) :
4843 dictSize((dict*)sortval->ptr);
4844 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4845 j = 0;
4846 if (sortval->type == REDIS_LIST) {
4847 list *list = sortval->ptr;
4848 listNode *ln;
4849
4850 listRewind(list);
4851 while((ln = listYield(list))) {
4852 robj *ele = ln->value;
4853 vector[j].obj = ele;
4854 vector[j].u.score = 0;
4855 vector[j].u.cmpobj = NULL;
4856 j++;
4857 }
4858 } else {
4859 dict *set = sortval->ptr;
4860 dictIterator *di;
4861 dictEntry *setele;
4862
4863 di = dictGetIterator(set);
4864 while((setele = dictNext(di)) != NULL) {
4865 vector[j].obj = dictGetEntryKey(setele);
4866 vector[j].u.score = 0;
4867 vector[j].u.cmpobj = NULL;
4868 j++;
4869 }
4870 dictReleaseIterator(di);
4871 }
4872 assert(j == vectorlen);
4873
4874 /* Now it's time to load the right scores in the sorting vector */
4875 if (dontsort == 0) {
4876 for (j = 0; j < vectorlen; j++) {
4877 if (sortby) {
4878 robj *byval;
4879
4880 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4881 if (!byval || byval->type != REDIS_STRING) continue;
4882 if (alpha) {
4883 vector[j].u.cmpobj = getDecodedObject(byval);
4884 } else {
4885 if (byval->encoding == REDIS_ENCODING_RAW) {
4886 vector[j].u.score = strtod(byval->ptr,NULL);
4887 } else {
4888 /* Don't need to decode the object if it's
4889 * integer-encoded (the only encoding supported) so
4890 * far. We can just cast it */
4891 if (byval->encoding == REDIS_ENCODING_INT) {
4892 vector[j].u.score = (long)byval->ptr;
4893 } else
4894 assert(1 != 1);
4895 }
4896 }
4897 } else {
4898 if (!alpha) {
4899 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4900 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4901 else {
4902 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4903 vector[j].u.score = (long) vector[j].obj->ptr;
4904 else
4905 assert(1 != 1);
4906 }
4907 }
4908 }
4909 }
4910 }
4911
4912 /* We are ready to sort the vector... perform a bit of sanity check
4913 * on the LIMIT option too. We'll use a partial version of quicksort. */
4914 start = (limit_start < 0) ? 0 : limit_start;
4915 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4916 if (start >= vectorlen) {
4917 start = vectorlen-1;
4918 end = vectorlen-2;
4919 }
4920 if (end >= vectorlen) end = vectorlen-1;
4921
4922 if (dontsort == 0) {
4923 server.sort_desc = desc;
4924 server.sort_alpha = alpha;
4925 server.sort_bypattern = sortby ? 1 : 0;
4926 if (sortby && (start != 0 || end != vectorlen-1))
4927 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4928 else
4929 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4930 }
4931
4932 /* Send command output to the output buffer, performing the specified
4933 * GET/DEL/INCR/DECR operations if any. */
4934 outputlen = getop ? getop*(end-start+1) : end-start+1;
4935 if (storekey == NULL) {
4936 /* STORE option not specified, sent the sorting result to client */
4937 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4938 for (j = start; j <= end; j++) {
4939 listNode *ln;
4940 if (!getop) {
4941 addReplyBulkLen(c,vector[j].obj);
4942 addReply(c,vector[j].obj);
4943 addReply(c,shared.crlf);
4944 }
4945 listRewind(operations);
4946 while((ln = listYield(operations))) {
4947 redisSortOperation *sop = ln->value;
4948 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4949 vector[j].obj);
4950
4951 if (sop->type == REDIS_SORT_GET) {
4952 if (!val || val->type != REDIS_STRING) {
4953 addReply(c,shared.nullbulk);
4954 } else {
4955 addReplyBulkLen(c,val);
4956 addReply(c,val);
4957 addReply(c,shared.crlf);
4958 }
4959 } else {
4960 assert(sop->type == REDIS_SORT_GET); /* always fails */
4961 }
4962 }
4963 }
4964 } else {
4965 robj *listObject = createListObject();
4966 list *listPtr = (list*) listObject->ptr;
4967
4968 /* STORE option specified, set the sorting result as a List object */
4969 for (j = start; j <= end; j++) {
4970 listNode *ln;
4971 if (!getop) {
4972 listAddNodeTail(listPtr,vector[j].obj);
4973 incrRefCount(vector[j].obj);
4974 }
4975 listRewind(operations);
4976 while((ln = listYield(operations))) {
4977 redisSortOperation *sop = ln->value;
4978 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4979 vector[j].obj);
4980
4981 if (sop->type == REDIS_SORT_GET) {
4982 if (!val || val->type != REDIS_STRING) {
4983 listAddNodeTail(listPtr,createStringObject("",0));
4984 } else {
4985 listAddNodeTail(listPtr,val);
4986 incrRefCount(val);
4987 }
4988 } else {
4989 assert(sop->type == REDIS_SORT_GET); /* always fails */
4990 }
4991 }
4992 }
4993 if (dictReplace(c->db->dict,storekey,listObject)) {
4994 incrRefCount(storekey);
4995 }
4996 /* Note: we add 1 because the DB is dirty anyway since even if the
4997 * SORT result is empty a new key is set and maybe the old content
4998 * replaced. */
4999 server.dirty += 1+outputlen;
5000 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5001 }
5002
5003 /* Cleanup */
5004 decrRefCount(sortval);
5005 listRelease(operations);
5006 for (j = 0; j < vectorlen; j++) {
5007 if (sortby && alpha && vector[j].u.cmpobj)
5008 decrRefCount(vector[j].u.cmpobj);
5009 }
5010 zfree(vector);
5011 }
5012
5013 static void infoCommand(redisClient *c) {
5014 sds info;
5015 time_t uptime = time(NULL)-server.stat_starttime;
5016 int j;
5017
5018 info = sdscatprintf(sdsempty(),
5019 "redis_version:%s\r\n"
5020 "arch_bits:%s\r\n"
5021 "uptime_in_seconds:%d\r\n"
5022 "uptime_in_days:%d\r\n"
5023 "connected_clients:%d\r\n"
5024 "connected_slaves:%d\r\n"
5025 "used_memory:%zu\r\n"
5026 "changes_since_last_save:%lld\r\n"
5027 "bgsave_in_progress:%d\r\n"
5028 "last_save_time:%d\r\n"
5029 "total_connections_received:%lld\r\n"
5030 "total_commands_processed:%lld\r\n"
5031 "role:%s\r\n"
5032 ,REDIS_VERSION,
5033 (sizeof(long) == 8) ? "64" : "32",
5034 uptime,
5035 uptime/(3600*24),
5036 listLength(server.clients)-listLength(server.slaves),
5037 listLength(server.slaves),
5038 server.usedmemory,
5039 server.dirty,
5040 server.bgsavechildpid != -1,
5041 server.lastsave,
5042 server.stat_numconnections,
5043 server.stat_numcommands,
5044 server.masterhost == NULL ? "master" : "slave"
5045 );
5046 if (server.masterhost) {
5047 info = sdscatprintf(info,
5048 "master_host:%s\r\n"
5049 "master_port:%d\r\n"
5050 "master_link_status:%s\r\n"
5051 "master_last_io_seconds_ago:%d\r\n"
5052 ,server.masterhost,
5053 server.masterport,
5054 (server.replstate == REDIS_REPL_CONNECTED) ?
5055 "up" : "down",
5056 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5057 );
5058 }
5059 for (j = 0; j < server.dbnum; j++) {
5060 long long keys, vkeys;
5061
5062 keys = dictSize(server.db[j].dict);
5063 vkeys = dictSize(server.db[j].expires);
5064 if (keys || vkeys) {
5065 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5066 j, keys, vkeys);
5067 }
5068 }
5069 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
5070 addReplySds(c,info);
5071 addReply(c,shared.crlf);
5072 }
5073
5074 static void monitorCommand(redisClient *c) {
5075 /* ignore MONITOR if aleady slave or in monitor mode */
5076 if (c->flags & REDIS_SLAVE) return;
5077
5078 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5079 c->slaveseldb = 0;
5080 listAddNodeTail(server.monitors,c);
5081 addReply(c,shared.ok);
5082 }
5083
5084 /* ================================= Expire ================================= */
5085 static int removeExpire(redisDb *db, robj *key) {
5086 if (dictDelete(db->expires,key) == DICT_OK) {
5087 return 1;
5088 } else {
5089 return 0;
5090 }
5091 }
5092
5093 static int setExpire(redisDb *db, robj *key, time_t when) {
5094 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5095 return 0;
5096 } else {
5097 incrRefCount(key);
5098 return 1;
5099 }
5100 }
5101
5102 /* Return the expire time of the specified key, or -1 if no expire
5103 * is associated with this key (i.e. the key is non volatile) */
5104 static time_t getExpire(redisDb *db, robj *key) {
5105 dictEntry *de;
5106
5107 /* No expire? return ASAP */
5108 if (dictSize(db->expires) == 0 ||
5109 (de = dictFind(db->expires,key)) == NULL) return -1;
5110
5111 return (time_t) dictGetEntryVal(de);
5112 }
5113
5114 static int expireIfNeeded(redisDb *db, robj *key) {
5115 time_t when;
5116 dictEntry *de;
5117
5118 /* No expire? return ASAP */
5119 if (dictSize(db->expires) == 0 ||
5120 (de = dictFind(db->expires,key)) == NULL) return 0;
5121
5122 /* Lookup the expire */
5123 when = (time_t) dictGetEntryVal(de);
5124 if (time(NULL) <= when) return 0;
5125
5126 /* Delete the key */
5127 dictDelete(db->expires,key);
5128 return dictDelete(db->dict,key) == DICT_OK;
5129 }
5130
5131 static int deleteIfVolatile(redisDb *db, robj *key) {
5132 dictEntry *de;
5133
5134 /* No expire? return ASAP */
5135 if (dictSize(db->expires) == 0 ||
5136 (de = dictFind(db->expires,key)) == NULL) return 0;
5137
5138 /* Delete the key */
5139 server.dirty++;
5140 dictDelete(db->expires,key);
5141 return dictDelete(db->dict,key) == DICT_OK;
5142 }
5143
5144 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5145 dictEntry *de;
5146
5147 de = dictFind(c->db->dict,key);
5148 if (de == NULL) {
5149 addReply(c,shared.czero);
5150 return;
5151 }
5152 if (seconds < 0) {
5153 if (deleteKey(c->db,key)) server.dirty++;
5154 addReply(c, shared.cone);
5155 return;
5156 } else {
5157 time_t when = time(NULL)+seconds;
5158 if (setExpire(c->db,key,when)) {
5159 addReply(c,shared.cone);
5160 server.dirty++;
5161 } else {
5162 addReply(c,shared.czero);
5163 }
5164 return;
5165 }
5166 }
5167
5168 static void expireCommand(redisClient *c) {
5169 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5170 }
5171
5172 static void expireatCommand(redisClient *c) {
5173 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5174 }
5175
5176 static void ttlCommand(redisClient *c) {
5177 time_t expire;
5178 int ttl = -1;
5179
5180 expire = getExpire(c->db,c->argv[1]);
5181 if (expire != -1) {
5182 ttl = (int) (expire-time(NULL));
5183 if (ttl < 0) ttl = -1;
5184 }
5185 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5186 }
5187
5188 /* =============================== Replication ============================= */
5189
5190 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5191 ssize_t nwritten, ret = size;
5192 time_t start = time(NULL);
5193
5194 timeout++;
5195 while(size) {
5196 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5197 nwritten = write(fd,ptr,size);
5198 if (nwritten == -1) return -1;
5199 ptr += nwritten;
5200 size -= nwritten;
5201 }
5202 if ((time(NULL)-start) > timeout) {
5203 errno = ETIMEDOUT;
5204 return -1;
5205 }
5206 }
5207 return ret;
5208 }
5209
5210 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5211 ssize_t nread, totread = 0;
5212 time_t start = time(NULL);
5213
5214 timeout++;
5215 while(size) {
5216 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5217 nread = read(fd,ptr,size);
5218 if (nread == -1) return -1;
5219 ptr += nread;
5220 size -= nread;
5221 totread += nread;
5222 }
5223 if ((time(NULL)-start) > timeout) {
5224 errno = ETIMEDOUT;
5225 return -1;
5226 }
5227 }
5228 return totread;
5229 }
5230
/* Read a line from 'fd' one byte at a time using syncRead(), storing at most
 * size-1 characters plus a nul terminator into 'ptr'.
 *
 * The terminating '\n' is not stored, and a '\r' right before it is
 * stripped. Returns the number of characters stored before the newline was
 * seen (the optional '\r' is counted), or -1 if syncRead() fails. When the
 * buffer fills up before a newline arrives, reading stops and the number of
 * characters stored so far is returned. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    /* No room at all, not even for the terminator */
    if (size <= 0) return 0;

    size--; /* reserve one byte for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
            /* Account for the byte just stored, otherwise an overlong line
             * would be written past the end of the caller's buffer. */
            size--;
        }
    }
    return nread;
}
5251
5252 static void syncCommand(redisClient *c) {
5253 /* ignore SYNC if aleady slave or in monitor mode */
5254 if (c->flags & REDIS_SLAVE) return;
5255
5256 /* SYNC can't be issued when the server has pending data to send to
5257 * the client about already issued commands. We need a fresh reply
5258 * buffer registering the differences between the BGSAVE and the current
5259 * dataset, so that we can copy to other slaves if needed. */
5260 if (listLength(c->reply) != 0) {
5261 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5262 return;
5263 }
5264
5265 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5266 /* Here we need to check if there is a background saving operation
5267 * in progress, or if it is required to start one */
5268 if (server.bgsavechildpid != -1) {
5269 /* Ok a background save is in progress. Let's check if it is a good
5270 * one for replication, i.e. if there is another slave that is
5271 * registering differences since the server forked to save */
5272 redisClient *slave;
5273 listNode *ln;
5274
5275 listRewind(server.slaves);
5276 while((ln = listYield(server.slaves))) {
5277 slave = ln->value;
5278 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5279 }
5280 if (ln) {
5281 /* Perfect, the server is already registering differences for
5282 * another slave. Set the right state, and copy the buffer. */
5283 listRelease(c->reply);
5284 c->reply = listDup(slave->reply);
5285 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5286 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5287 } else {
5288 /* No way, we need to wait for the next BGSAVE in order to
5289 * register differences */
5290 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5291 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5292 }
5293 } else {
5294 /* Ok we don't have a BGSAVE in progress, let's start one */
5295 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5296 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5297 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5298 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5299 return;
5300 }
5301 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5302 }
5303 c->repldbfd = -1;
5304 c->flags |= REDIS_SLAVE;
5305 c->slaveseldb = 0;
5306 listAddNodeTail(server.slaves,c);
5307 return;
5308 }
5309
5310 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5311 redisClient *slave = privdata;
5312 REDIS_NOTUSED(el);
5313 REDIS_NOTUSED(mask);
5314 char buf[REDIS_IOBUF_LEN];
5315 ssize_t nwritten, buflen;
5316
5317 if (slave->repldboff == 0) {
5318 /* Write the bulk write count before to transfer the DB. In theory here
5319 * we don't know how much room there is in the output buffer of the
5320 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5321 * operations) will never be smaller than the few bytes we need. */
5322 sds bulkcount;
5323
5324 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5325 slave->repldbsize);
5326 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5327 {
5328 sdsfree(bulkcount);
5329 freeClient(slave);
5330 return;
5331 }
5332 sdsfree(bulkcount);
5333 }
5334 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5335 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5336 if (buflen <= 0) {
5337 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5338 (buflen == 0) ? "premature EOF" : strerror(errno));
5339 freeClient(slave);
5340 return;
5341 }
5342 if ((nwritten = write(fd,buf,buflen)) == -1) {
5343 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5344 strerror(errno));
5345 freeClient(slave);
5346 return;
5347 }
5348 slave->repldboff += nwritten;
5349 if (slave->repldboff == slave->repldbsize) {
5350 close(slave->repldbfd);
5351 slave->repldbfd = -1;
5352 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5353 slave->replstate = REDIS_REPL_ONLINE;
5354 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5355 sendReplyToClient, slave) == AE_ERR) {
5356 freeClient(slave);
5357 return;
5358 }
5359 addReplySds(slave,sdsempty());
5360 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5361 }
5362 }
5363
5364 /* This function is called at the end of every backgrond saving.
5365 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5366 * otherwise REDIS_ERR is passed to the function.
5367 *
5368 * The goal of this function is to handle slaves waiting for a successful
5369 * background saving in order to perform non-blocking synchronization. */
5370 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5371 listNode *ln;
5372 int startbgsave = 0;
5373
5374 listRewind(server.slaves);
5375 while((ln = listYield(server.slaves))) {
5376 redisClient *slave = ln->value;
5377
5378 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5379 startbgsave = 1;
5380 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5381 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5382 struct redis_stat buf;
5383
5384 if (bgsaveerr != REDIS_OK) {
5385 freeClient(slave);
5386 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5387 continue;
5388 }
5389 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5390 redis_fstat(slave->repldbfd,&buf) == -1) {
5391 freeClient(slave);
5392 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5393 continue;
5394 }
5395 slave->repldboff = 0;
5396 slave->repldbsize = buf.st_size;
5397 slave->replstate = REDIS_REPL_SEND_BULK;
5398 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5399 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5400 freeClient(slave);
5401 continue;
5402 }
5403 }
5404 }
5405 if (startbgsave) {
5406 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5407 listRewind(server.slaves);
5408 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5409 while((ln = listYield(server.slaves))) {
5410 redisClient *slave = ln->value;
5411
5412 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5413 freeClient(slave);
5414 }
5415 }
5416 }
5417 }
5418
5419 static int syncWithMaster(void) {
5420 char buf[1024], tmpfile[256], authcmd[1024];
5421 int dumpsize;
5422 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5423 int dfd;
5424
5425 if (fd == -1) {
5426 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5427 strerror(errno));
5428 return REDIS_ERR;
5429 }
5430
5431 /* AUTH with the master if required. */
5432 if(server.masterauth) {
5433 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5434 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5435 close(fd);
5436 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5437 strerror(errno));
5438 return REDIS_ERR;
5439 }
5440 /* Read the AUTH result. */
5441 if (syncReadLine(fd,buf,1024,3600) == -1) {
5442 close(fd);
5443 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5444 strerror(errno));
5445 return REDIS_ERR;
5446 }
5447 if (buf[0] != '+') {
5448 close(fd);
5449 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5450 return REDIS_ERR;
5451 }
5452 }
5453
5454 /* Issue the SYNC command */
5455 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5456 close(fd);
5457 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5458 strerror(errno));
5459 return REDIS_ERR;
5460 }
5461 /* Read the bulk write count */
5462 if (syncReadLine(fd,buf,1024,3600) == -1) {
5463 close(fd);
5464 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5465 strerror(errno));
5466 return REDIS_ERR;
5467 }
5468 if (buf[0] != '$') {
5469 close(fd);
5470 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5471 return REDIS_ERR;
5472 }
5473 dumpsize = atoi(buf+1);
5474 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5475 /* Read the bulk write data on a temp file */
5476 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5477 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5478 if (dfd == -1) {
5479 close(fd);
5480 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5481 return REDIS_ERR;
5482 }
5483 while(dumpsize) {
5484 int nread, nwritten;
5485
5486 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5487 if (nread == -1) {
5488 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5489 strerror(errno));
5490 close(fd);
5491 close(dfd);
5492 return REDIS_ERR;
5493 }
5494 nwritten = write(dfd,buf,nread);
5495 if (nwritten == -1) {
5496 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5497 close(fd);
5498 close(dfd);
5499 return REDIS_ERR;
5500 }
5501 dumpsize -= nread;
5502 }
5503 close(dfd);
5504 if (rename(tmpfile,server.dbfilename) == -1) {
5505 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5506 unlink(tmpfile);
5507 close(fd);
5508 return REDIS_ERR;
5509 }
5510 emptyDb();
5511 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5512 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5513 close(fd);
5514 return REDIS_ERR;
5515 }
5516 server.master = createClient(fd);
5517 server.master->flags |= REDIS_MASTER;
5518 server.replstate = REDIS_REPL_CONNECTED;
5519 return REDIS_OK;
5520 }
5521
5522 static void slaveofCommand(redisClient *c) {
5523 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5524 !strcasecmp(c->argv[2]->ptr,"one")) {
5525 if (server.masterhost) {
5526 sdsfree(server.masterhost);
5527 server.masterhost = NULL;
5528 if (server.master) freeClient(server.master);
5529 server.replstate = REDIS_REPL_NONE;
5530 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5531 }
5532 } else {
5533 sdsfree(server.masterhost);
5534 server.masterhost = sdsdup(c->argv[1]->ptr);
5535 server.masterport = atoi(c->argv[2]->ptr);
5536 if (server.master) freeClient(server.master);
5537 server.replstate = REDIS_REPL_CONNECT;
5538 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5539 server.masterhost, server.masterport);
5540 }
5541 addReply(c,shared.ok);
5542 }
5543
5544 /* ============================ Maxmemory directive ======================== */
5545
5546 /* This function gets called when 'maxmemory' is set on the config file to limit
5547 * the max memory used by the server, and we are out of memory.
5548 * This function will try to, in order:
5549 *
5550 * - Free objects from the free list
5551 * - Try to remove keys with an EXPIRE set
5552 *
5553 * It is not possible to free enough memory to reach used-memory < maxmemory
5554 * the server will start refusing commands that will enlarge even more the
5555 * memory usage.
5556 */
5557 static void freeMemoryIfNeeded(void) {
5558 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5559 if (listLength(server.objfreelist)) {
5560 robj *o;
5561
5562 listNode *head = listFirst(server.objfreelist);
5563 o = listNodeValue(head);
5564 listDelNode(server.objfreelist,head);
5565 zfree(o);
5566 } else {
5567 int j, k, freed = 0;
5568
5569 for (j = 0; j < server.dbnum; j++) {
5570 int minttl = -1;
5571 robj *minkey = NULL;
5572 struct dictEntry *de;
5573
5574 if (dictSize(server.db[j].expires)) {
5575 freed = 1;
5576 /* From a sample of three keys drop the one nearest to
5577 * the natural expire */
5578 for (k = 0; k < 3; k++) {
5579 time_t t;
5580
5581 de = dictGetRandomKey(server.db[j].expires);
5582 t = (time_t) dictGetEntryVal(de);
5583 if (minttl == -1 || t < minttl) {
5584 minkey = dictGetEntryKey(de);
5585 minttl = t;
5586 }
5587 }
5588 deleteKey(server.db+j,minkey);
5589 }
5590 }
5591 if (!freed) return; /* nothing to free... */
5592 }
5593 }
5594 }
5595
5596 /* ============================== Append Only file ========================== */
5597
5598 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5599 sds buf = sdsempty();
5600 int j;
5601 ssize_t nwritten;
5602 time_t now;
5603 robj *tmpargv[3];
5604
5605 /* The DB this command was targetting is not the same as the last command
5606 * we appendend. To issue a SELECT command is needed. */
5607 if (dictid != server.appendseldb) {
5608 char seldb[64];
5609
5610 snprintf(seldb,sizeof(seldb),"%d",dictid);
5611 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5612 strlen(seldb),seldb);
5613 server.appendseldb = dictid;
5614 }
5615
5616 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5617 * EXPIREs into EXPIREATs calls */
5618 if (cmd->proc == expireCommand) {
5619 long when;
5620
5621 tmpargv[0] = createStringObject("EXPIREAT",8);
5622 tmpargv[1] = argv[1];
5623 incrRefCount(argv[1]);
5624 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5625 tmpargv[2] = createObject(REDIS_STRING,
5626 sdscatprintf(sdsempty(),"%ld",when));
5627 argv = tmpargv;
5628 }
5629
5630 /* Append the actual command */
5631 buf = sdscatprintf(buf,"*%d\r\n",argc);
5632 for (j = 0; j < argc; j++) {
5633 robj *o = argv[j];
5634
5635 o = getDecodedObject(o);
5636 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5637 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5638 buf = sdscatlen(buf,"\r\n",2);
5639 decrRefCount(o);
5640 }
5641
5642 /* Free the objects from the modified argv for EXPIREAT */
5643 if (cmd->proc == expireCommand) {
5644 for (j = 0; j < 3; j++)
5645 decrRefCount(argv[j]);
5646 }
5647
5648 /* We want to perform a single write. This should be guaranteed atomic
5649 * at least if the filesystem we are writing is a real physical one.
5650 * While this will save us against the server being killed I don't think
5651 * there is much to do about the whole server stopping for power problems
5652 * or alike */
5653 nwritten = write(server.appendfd,buf,sdslen(buf));
5654 if (nwritten != (signed)sdslen(buf)) {
5655 /* Ooops, we are in troubles. The best thing to do for now is
5656 * to simply exit instead to give the illusion that everything is
5657 * working as expected. */
5658 if (nwritten == -1) {
5659 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5660 } else {
5661 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5662 }
5663 exit(1);
5664 }
5665 /* If a background append only file rewriting is in progress we want to
5666 * accumulate the differences between the child DB and the current one
5667 * in a buffer, so that when the child process will do its work we
5668 * can append the differences to the new append only file. */
5669 if (server.bgrewritechildpid != -1)
5670 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5671
5672 sdsfree(buf);
5673 now = time(NULL);
5674 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5675 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5676 now-server.lastfsync > 1))
5677 {
5678 fsync(server.appendfd); /* Let's try to get this data on the disk */
5679 server.lastfsync = now;
5680 }
5681 }
5682
5683 /* In Redis commands are always executed in the context of a client, so in
5684 * order to load the append only file we need to create a fake client. */
5685 static struct redisClient *createFakeClient(void) {
5686 struct redisClient *c = zmalloc(sizeof(*c));
5687
5688 selectDb(c,0);
5689 c->fd = -1;
5690 c->querybuf = sdsempty();
5691 c->argc = 0;
5692 c->argv = NULL;
5693 c->flags = 0;
5694 /* We set the fake client as a slave waiting for the synchronization
5695 * so that Redis will not try to send replies to this client. */
5696 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5697 c->reply = listCreate();
5698 listSetFreeMethod(c->reply,decrRefCount);
5699 listSetDupMethod(c->reply,dupClientReplyValue);
5700 return c;
5701 }
5702
5703 static void freeFakeClient(struct redisClient *c) {
5704 sdsfree(c->querybuf);
5705 listRelease(c->reply);
5706 zfree(c);
5707 }
5708
5709 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5710 * error (the append only file is zero-length) REDIS_ERR is returned. On
5711 * fatal error an error message is logged and the program exists. */
5712 int loadAppendOnlyFile(char *filename) {
5713 struct redisClient *fakeClient;
5714 FILE *fp = fopen(filename,"r");
5715 struct redis_stat sb;
5716
5717 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5718 return REDIS_ERR;
5719
5720 if (fp == NULL) {
5721 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5722 exit(1);
5723 }
5724
5725 fakeClient = createFakeClient();
5726 while(1) {
5727 int argc, j;
5728 unsigned long len;
5729 robj **argv;
5730 char buf[128];
5731 sds argsds;
5732 struct redisCommand *cmd;
5733
5734 if (fgets(buf,sizeof(buf),fp) == NULL) {
5735 if (feof(fp))
5736 break;
5737 else
5738 goto readerr;
5739 }
5740 if (buf[0] != '*') goto fmterr;
5741 argc = atoi(buf+1);
5742 argv = zmalloc(sizeof(robj*)*argc);
5743 for (j = 0; j < argc; j++) {
5744 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5745 if (buf[0] != '$') goto fmterr;
5746 len = strtol(buf+1,NULL,10);
5747 argsds = sdsnewlen(NULL,len);
5748 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
5749 argv[j] = createObject(REDIS_STRING,argsds);
5750 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5751 }
5752
5753 /* Command lookup */
5754 cmd = lookupCommand(argv[0]->ptr);
5755 if (!cmd) {
5756 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5757 exit(1);
5758 }
5759 /* Try object sharing and encoding */
5760 if (server.shareobjects) {
5761 int j;
5762 for(j = 1; j < argc; j++)
5763 argv[j] = tryObjectSharing(argv[j]);
5764 }
5765 if (cmd->flags & REDIS_CMD_BULK)
5766 tryObjectEncoding(argv[argc-1]);
5767 /* Run the command in the context of a fake client */
5768 fakeClient->argc = argc;
5769 fakeClient->argv = argv;
5770 cmd->proc(fakeClient);
5771 /* Discard the reply objects list from the fake client */
5772 while(listLength(fakeClient->reply))
5773 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5774 /* Clean up, ready for the next command */
5775 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5776 zfree(argv);
5777 }
5778 fclose(fp);
5779 freeFakeClient(fakeClient);
5780 return REDIS_OK;
5781
5782 readerr:
5783 if (feof(fp)) {
5784 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5785 } else {
5786 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5787 }
5788 exit(1);
5789 fmterr:
5790 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5791 exit(1);
5792 }
5793
5794 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5795 static int fwriteBulk(FILE *fp, robj *obj) {
5796 char buf[128];
5797 obj = getDecodedObject(obj);
5798 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5799 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5800 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5801 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5802 decrRefCount(obj);
5803 return 1;
5804 err:
5805 decrRefCount(obj);
5806 return 0;
5807 }
5808
/* Write a double in bulk format: $<count>\r\n<payload>\r\n
 * The payload is the value printed with "%.17g". Returns 1 on success,
 * 0 if either of the two writes fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    int paylen;

    /* The payload carries its own CRLF, which the count must not include */
    paylen = snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
5819
/* Write a long in bulk format: $<count>\r\n<payload>\r\n
 * The payload is the value printed in base 10. Returns 1 on success,
 * 0 if either of the two writes fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    int paylen;

    /* The payload carries its own CRLF, which the count must not include */
    paylen = snprintf(payload,sizeof(payload),"%ld\r\n",l);
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);

    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
5830
5831 /* Write a sequence of commands able to fully rebuild the dataset into
5832 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5833 static int rewriteAppendOnlyFile(char *filename) {
5834 dictIterator *di = NULL;
5835 dictEntry *de;
5836 FILE *fp;
5837 char tmpfile[256];
5838 int j;
5839 time_t now = time(NULL);
5840
5841 /* Note that we have to use a different temp name here compared to the
5842 * one used by rewriteAppendOnlyFileBackground() function. */
5843 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5844 fp = fopen(tmpfile,"w");
5845 if (!fp) {
5846 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5847 return REDIS_ERR;
5848 }
5849 for (j = 0; j < server.dbnum; j++) {
5850 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5851 redisDb *db = server.db+j;
5852 dict *d = db->dict;
5853 if (dictSize(d) == 0) continue;
5854 di = dictGetIterator(d);
5855 if (!di) {
5856 fclose(fp);
5857 return REDIS_ERR;
5858 }
5859
5860 /* SELECT the new DB */
5861 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5862 if (fwriteBulkLong(fp,j) == 0) goto werr;
5863
5864 /* Iterate this DB writing every entry */
5865 while((de = dictNext(di)) != NULL) {
5866 robj *key = dictGetEntryKey(de);
5867 robj *o = dictGetEntryVal(de);
5868 time_t expiretime = getExpire(db,key);
5869
5870 /* Save the key and associated value */
5871 if (o->type == REDIS_STRING) {
5872 /* Emit a SET command */
5873 char cmd[]="*3\r\n$3\r\nSET\r\n";
5874 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5875 /* Key and value */
5876 if (fwriteBulk(fp,key) == 0) goto werr;
5877 if (fwriteBulk(fp,o) == 0) goto werr;
5878 } else if (o->type == REDIS_LIST) {
5879 /* Emit the RPUSHes needed to rebuild the list */
5880 list *list = o->ptr;
5881 listNode *ln;
5882
5883 listRewind(list);
5884 while((ln = listYield(list))) {
5885 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5886 robj *eleobj = listNodeValue(ln);
5887
5888 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5889 if (fwriteBulk(fp,key) == 0) goto werr;
5890 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5891 }
5892 } else if (o->type == REDIS_SET) {
5893 /* Emit the SADDs needed to rebuild the set */
5894 dict *set = o->ptr;
5895 dictIterator *di = dictGetIterator(set);
5896 dictEntry *de;
5897
5898 while((de = dictNext(di)) != NULL) {
5899 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5900 robj *eleobj = dictGetEntryKey(de);
5901
5902 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5903 if (fwriteBulk(fp,key) == 0) goto werr;
5904 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5905 }
5906 dictReleaseIterator(di);
5907 } else if (o->type == REDIS_ZSET) {
5908 /* Emit the ZADDs needed to rebuild the sorted set */
5909 zset *zs = o->ptr;
5910 dictIterator *di = dictGetIterator(zs->dict);
5911 dictEntry *de;
5912
5913 while((de = dictNext(di)) != NULL) {
5914 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5915 robj *eleobj = dictGetEntryKey(de);
5916 double *score = dictGetEntryVal(de);
5917
5918 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5919 if (fwriteBulk(fp,key) == 0) goto werr;
5920 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5921 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5922 }
5923 dictReleaseIterator(di);
5924 } else {
5925 assert(0 != 0);
5926 }
5927 /* Save the expire time */
5928 if (expiretime != -1) {
5929 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5930 /* If this key is already expired skip it */
5931 if (expiretime < now) continue;
5932 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5933 if (fwriteBulk(fp,key) == 0) goto werr;
5934 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5935 }
5936 }
5937 dictReleaseIterator(di);
5938 }
5939
5940 /* Make sure data will not remain on the OS's output buffers */
5941 fflush(fp);
5942 fsync(fileno(fp));
5943 fclose(fp);
5944
5945 /* Use RENAME to make sure the DB file is changed atomically only
5946 * if the generate DB file is ok. */
5947 if (rename(tmpfile,filename) == -1) {
5948 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
5949 unlink(tmpfile);
5950 return REDIS_ERR;
5951 }
5952 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
5953 return REDIS_OK;
5954
5955 werr:
5956 fclose(fp);
5957 unlink(tmpfile);
5958 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
5959 if (di) dictReleaseIterator(di);
5960 return REDIS_ERR;
5961 }
5962
5963 /* This is how rewriting of the append only file in background works:
5964 *
5965 * 1) The user calls BGREWRITEAOF
5966 * 2) Redis calls this function, that forks():
5967 * 2a) the child rewrite the append only file in a temp file.
5968 * 2b) the parent accumulates differences in server.bgrewritebuf.
5969 * 3) When the child finished '2a' exists.
5970 * 4) The parent will trap the exit code, if it's OK, will append the
5971 * data accumulated into server.bgrewritebuf into the temp file, and
5972 * finally will rename(2) the temp file in the actual file name.
5973 * The the new file is reopened as the new append only file. Profit!
5974 */
5975 static int rewriteAppendOnlyFileBackground(void) {
5976 pid_t childpid;
5977
5978 if (server.bgrewritechildpid != -1) return REDIS_ERR;
5979 if ((childpid = fork()) == 0) {
5980 /* Child */
5981 char tmpfile[256];
5982 close(server.fd);
5983
5984 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5985 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
5986 exit(0);
5987 } else {
5988 exit(1);
5989 }
5990 } else {
5991 /* Parent */
5992 if (childpid == -1) {
5993 redisLog(REDIS_WARNING,
5994 "Can't rewrite append only file in background: fork: %s",
5995 strerror(errno));
5996 return REDIS_ERR;
5997 }
5998 redisLog(REDIS_NOTICE,
5999 "Background append only file rewriting started by pid %d",childpid);
6000 server.bgrewritechildpid = childpid;
6001 /* We set appendseldb to -1 in order to force the next call to the
6002 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6003 * accumulated by the parent into server.bgrewritebuf will start
6004 * with a SELECT statement and it will be safe to merge. */
6005 server.appendseldb = -1;
6006 return REDIS_OK;
6007 }
6008 return REDIS_OK; /* unreached */
6009 }
6010
6011 static void bgrewriteaofCommand(redisClient *c) {
6012 if (server.bgrewritechildpid != -1) {
6013 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6014 return;
6015 }
6016 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6017 addReply(c,shared.ok);
6018 } else {
6019 addReply(c,shared.err);
6020 }
6021 }
6022
/* Remove the temp file used by the background rewrite child 'childpid'.
 * The name is built the same way the child builds it. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6029
6030 /* ================================= Debugging ============================== */
6031
6032 static void debugCommand(redisClient *c) {
6033 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6034 *((char*)-1) = 'x';
6035 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6036 if (rdbSave(server.dbfilename) != REDIS_OK) {
6037 addReply(c,shared.err);
6038 return;
6039 }
6040 emptyDb();
6041 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6042 addReply(c,shared.err);
6043 return;
6044 }
6045 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6046 addReply(c,shared.ok);
6047 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6048 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6049 robj *key, *val;
6050
6051 if (!de) {
6052 addReply(c,shared.nokeyerr);
6053 return;
6054 }
6055 key = dictGetEntryKey(de);
6056 val = dictGetEntryVal(de);
6057 addReplySds(c,sdscatprintf(sdsempty(),
6058 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6059 key, key->refcount, val, val->refcount, val->encoding));
6060 } else {
6061 addReplySds(c,sdsnew(
6062 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6063 }
6064 }
6065
6066 /* =================================== Main! ================================ */
6067
6068 #ifdef __linux__
/* Read the first line of /proc/sys/vm/overcommit_memory and return it
 * converted with atoi(). Returns -1 when the file cannot be opened or no
 * line can be read from it. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    char *got;
    FILE *procfile = fopen("/proc/sys/vm/overcommit_memory","r");

    if (procfile == NULL) return -1;

    /* Read first, close unconditionally, then decide on the result. */
    got = fgets(line,sizeof(line),procfile);
    fclose(procfile);

    return (got != NULL) ? atoi(line) : -1;
}
6082
6083 void linuxOvercommitMemoryWarning(void) {
6084 if (linuxOvercommitMemoryValue() == 0) {
6085 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6086 }
6087 }
6088 #endif /* __linux__ */
6089
6090 static void daemonize(void) {
6091 int fd;
6092 FILE *fp;
6093
6094 if (fork() != 0) exit(0); /* parent exits */
6095 setsid(); /* create a new session */
6096
6097 /* Every output goes to /dev/null. If Redis is daemonized but
6098 * the 'logfile' is set to 'stdout' in the configuration file
6099 * it will not log at all. */
6100 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6101 dup2(fd, STDIN_FILENO);
6102 dup2(fd, STDOUT_FILENO);
6103 dup2(fd, STDERR_FILENO);
6104 if (fd > STDERR_FILENO) close(fd);
6105 }
6106 /* Try to write the pid file */
6107 fp = fopen(server.pidfile,"w");
6108 if (fp) {
6109 fprintf(fp,"%d\n",getpid());
6110 fclose(fp);
6111 }
6112 }
6113
6114 int main(int argc, char **argv) {
6115 initServerConfig();
6116 if (argc == 2) {
6117 resetServerSaveParams();
6118 loadServerConfig(argv[1]);
6119 } else if (argc > 2) {
6120 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6121 exit(1);
6122 } else {
6123 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6124 }
6125 initServer();
6126 if (server.daemonize) daemonize();
6127 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6128 #ifdef __linux__
6129 linuxOvercommitMemoryWarning();
6130 #endif
6131 if (server.appendonly) {
6132 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6133 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6134 } else {
6135 if (rdbLoad(server.dbfilename) == REDIS_OK)
6136 redisLog(REDIS_NOTICE,"DB loaded from disk");
6137 }
6138 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6139 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6140 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6141 aeMain(server.el);
6142 aeDeleteEventLoop(server.el);
6143 return 0;
6144 }
6145
6146 /* ============================= Backtrace support ========================= */
6147
6148 #ifdef HAVE_BACKTRACE
6149 static char *findFuncName(void *pointer, unsigned long *offset);
6150
/* Return the instruction pointer stored in the signal context 'uc', or
 * NULL on platforms this function does not know how to handle.
 *
 * The field holding the program counter differs per OS / architecture, so
 * the whole body is selected at compile time:
 *
 *   FreeBSD             uc_mcontext.mc_eip
 *   dietlibc            uc_mcontext.eip
 *   Mac OS X            uc_mcontext->__ss.__eip, or __rip when building
 *                       against 10.6+ headers for a 64 bit target
 *   i386 (Linux)        uc_mcontext.gregs[REG_EIP]
 *   x86-64 (Linux)      uc_mcontext.gregs[REG_RIP]
 *   IA64 (Linux)        uc_mcontext.sc_ip
 *
 * The x86-64 branch is only compiled when the C library exposes REG_RIP
 * as a macro; otherwise the function falls back to returning NULL, which
 * is what 64 bit builds got before: the old test used the spelling
 * __X86_64__, which compilers do not predefine, and REG_EIP does not
 * exist on x86-64 anyway. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(REG_RIP) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[REG_RIP];
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    (void) uc; /* unused on unsupported platforms */
    return NULL;
#endif
}
6172
6173 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6174 void *trace[100];
6175 char **messages = NULL;
6176 int i, trace_size = 0;
6177 unsigned long offset=0;
6178 time_t uptime = time(NULL)-server.stat_starttime;
6179 ucontext_t *uc = (ucontext_t*) secret;
6180 REDIS_NOTUSED(info);
6181
6182 redisLog(REDIS_WARNING,
6183 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6184 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
6185 "redis_version:%s; "
6186 "uptime_in_seconds:%d; "
6187 "connected_clients:%d; "
6188 "connected_slaves:%d; "
6189 "used_memory:%zu; "
6190 "changes_since_last_save:%lld; "
6191 "bgsave_in_progress:%d; "
6192 "last_save_time:%d; "
6193 "total_connections_received:%lld; "
6194 "total_commands_processed:%lld; "
6195 "role:%s;"
6196 ,REDIS_VERSION,
6197 uptime,
6198 listLength(server.clients)-listLength(server.slaves),
6199 listLength(server.slaves),
6200 server.usedmemory,
6201 server.dirty,
6202 server.bgsavechildpid != -1,
6203 server.lastsave,
6204 server.stat_numconnections,
6205 server.stat_numcommands,
6206 server.masterhost == NULL ? "master" : "slave"
6207 ));
6208
6209 trace_size = backtrace(trace, 100);
6210 /* overwrite sigaction with caller's address */
6211 if (getMcontextEip(uc) != NULL) {
6212 trace[1] = getMcontextEip(uc);
6213 }
6214 messages = backtrace_symbols(trace, trace_size);
6215
6216 for (i=1; i<trace_size; ++i) {
6217 char *fn = findFuncName(trace[i], &offset), *p;
6218
6219 p = strchr(messages[i],'+');
6220 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6221 redisLog(REDIS_WARNING,"%s", messages[i]);
6222 } else {
6223 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6224 }
6225 }
6226 free(messages);
6227 exit(0);
6228 }
6229
6230 static void setupSigSegvAction(void) {
6231 struct sigaction act;
6232
6233 sigemptyset (&act.sa_mask);
6234 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6235 * is used. Otherwise, sa_handler is used */
6236 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6237 act.sa_sigaction = segvHandler;
6238 sigaction (SIGSEGV, &act, NULL);
6239 sigaction (SIGBUS, &act, NULL);
6240 sigaction (SIGFPE, &act, NULL);
6241 sigaction (SIGILL, &act, NULL);
6242 sigaction (SIGBUS, &act, NULL);
6243 return;
6244 }
6245
6246 #include "staticsymbols.h"
6247 /* This function try to convert a pointer into a function name. It's used in
6248 * oreder to provide a backtrace under segmentation fault that's able to
6249 * display functions declared as static (otherwise the backtrace is useless). */
6250 static char *findFuncName(void *pointer, unsigned long *offset){
6251 int i, ret = -1;
6252 unsigned long off, minoff = 0;
6253
6254 /* Try to match against the Symbol with the smallest offset */
6255 for (i=0; symsTable[i].pointer; i++) {
6256 unsigned long lp = (unsigned long) pointer;
6257
6258 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6259 off=lp-symsTable[i].pointer;
6260 if (ret < 0 || off < minoff) {
6261 minoff=off;
6262 ret=i;
6263 }
6264 }
6265 }
6266 if (ret == -1) return NULL;
6267 *offset = minoff;
6268 return symsTable[ret].name;
6269 }
6270 #else /* HAVE_BACKTRACE */
/* Variant compiled when HAVE_BACKTRACE is not defined: no handler is
 * installed, the function only keeps the symbol defined in such builds. */
static void setupSigSegvAction(void)
{
    /* Intentionally empty. */
}
6273 #endif /* HAVE_BACKTRACE */
6274
6275
6276
6277 /* The End */
6278
6279
6280