]> git.saurik.com Git - redis.git/blob - redis.c
append only file fixes
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #define REDIS_VERSION "1.07"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
77 /* Error codes */
78 #define REDIS_OK 0
79 #define REDIS_ERR -1
80
81 /* Static server configuration */
82 #define REDIS_SERVERPORT 6379 /* TCP port */
83 #define REDIS_MAXIDLETIME (60*5) /* default client timeout */
84 #define REDIS_IOBUF_LEN 1024
85 #define REDIS_LOADBUF_LEN 1024
86 #define REDIS_STATIC_ARGS 4
87 #define REDIS_DEFAULT_DBNUM 16
88 #define REDIS_CONFIGLINE_MAX 1024
89 #define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
90 #define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
91 #define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
92 #define REDIS_MAX_WRITE_PER_EVENT (1024*64)
93 #define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
94
95 /* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
96 #define REDIS_WRITEV_THRESHOLD 3
97 /* Max number of iovecs used for each writev call */
98 #define REDIS_WRITEV_IOVEC_COUNT 256
99
100 /* Hash table parameters */
101 #define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
103 /* Command flags */
104 #define REDIS_CMD_BULK 1 /* Bulk write command */
105 #define REDIS_CMD_INLINE 2 /* Inline command */
106 /* REDIS_CMD_DENYOOM deserves a longer comment: all the commands marked with
107 this flag will return an error when the 'maxmemory' option is set in the
108 config file and the server is using more than maxmemory bytes of memory.
109 In short these commands are denied on low memory conditions. */
110 #define REDIS_CMD_DENYOOM 4
111
112 /* Object types */
113 #define REDIS_STRING 0
114 #define REDIS_LIST 1
115 #define REDIS_SET 2
116 #define REDIS_ZSET 3
117 #define REDIS_HASH 4
118
119 /* Objects encoding */
120 #define REDIS_ENCODING_RAW 0 /* Raw representation */
121 #define REDIS_ENCODING_INT 1 /* Encoded as integer */
122
123 /* Object types only used for dumping to disk */
124 #define REDIS_EXPIRETIME 253
125 #define REDIS_SELECTDB 254
126 #define REDIS_EOF 255
127
128 /* Defines related to the dump file format. To store 32 bits lengths for short
129 * keys requires a lot of space, so we check the most significant 2 bits of
130 * the first byte to interpret the length:
131 *
132 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
133 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
134 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
135 * 11|000000 this means: specially encoded object will follow. The six bits
136 * number specify the kind of object that follows.
137 * See the REDIS_RDB_ENC_* defines.
138 *
139 * Lengths up to 63 are stored using a single byte, most DB keys, and many
140 * values, will fit inside. */
141 #define REDIS_RDB_6BITLEN 0
142 #define REDIS_RDB_14BITLEN 1
143 #define REDIS_RDB_32BITLEN 2
144 #define REDIS_RDB_ENCVAL 3
145 #define REDIS_RDB_LENERR UINT_MAX
146
147 /* When a length of a string object stored on disk has the first two bits
148 * set, the remaining six bits specify a special encoding for the object
149 * accordingly to the following defines: */
150 #define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
151 #define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
152 #define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
153 #define REDIS_RDB_ENC_LZF 3 /* string compressed with FASTLZ */
154
155 /* Client flags */
156 #define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
157 #define REDIS_SLAVE 2 /* This client is a slave server */
158 #define REDIS_MASTER 4 /* This client is a master server */
159 #define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
160
161 /* Slave replication state - slave side */
162 #define REDIS_REPL_NONE 0 /* No active replication */
163 #define REDIS_REPL_CONNECT 1 /* Must connect to master */
164 #define REDIS_REPL_CONNECTED 2 /* Connected to master */
165
166 /* Slave replication state - from the point of view of master
167 * Note that in SEND_BULK and ONLINE state the slave receives new updates
168 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
169 * to start the next background saving in order to send updates to it. */
170 #define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
171 #define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
172 #define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
173 #define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
174
175 /* List related stuff */
176 #define REDIS_HEAD 0
177 #define REDIS_TAIL 1
178
179 /* Sort operations */
180 #define REDIS_SORT_GET 0
181 #define REDIS_SORT_ASC 1
182 #define REDIS_SORT_DESC 2
183 #define REDIS_SORTKEY_MAX 1024
184
185 /* Log levels */
186 #define REDIS_DEBUG 0
187 #define REDIS_NOTICE 1
188 #define REDIS_WARNING 2
189
190 /* Anti-warning macro... */
191 #define REDIS_NOTUSED(V) ((void) V)
192
193 #define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
194 #define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
195
196 /* Append only defines */
197 #define APPENDFSYNC_NO 0
198 #define APPENDFSYNC_ALWAYS 1
199 #define APPENDFSYNC_EVERYSEC 2
200
201 /*================================= Data types ============================== */
202
/* Generic Redis value object: a container able to hold a string, a list,
 * a set, and so on. 'type' takes one of the "Object types" constants
 * (REDIS_STRING, REDIS_LIST, ...) and 'encoding' one of the
 * REDIS_ENCODING_* constants. */
typedef struct redisObject {
    void *ptr;                  /* payload; interpretation depends on type/encoding */
    unsigned char type;         /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding;     /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char notused[2];   /* not used */
    int refcount;               /* reference count (see incrRefCount/decrRefCount) */
} robj;
211
212 typedef struct redisDb {
213 dict *dict;
214 dict *expires;
215 int id;
216 } redisDb;
217
218 /* With multiplexing we need to take per-clinet state.
219 * Clients are taken in a liked list. */
220 typedef struct redisClient {
221 int fd;
222 redisDb *db;
223 int dictid;
224 sds querybuf;
225 robj **argv, **mbargv;
226 int argc, mbargc;
227 int bulklen; /* bulk read len. -1 if not in bulk read mode */
228 int multibulk; /* multi bulk command format active */
229 list *reply;
230 int sentlen;
231 time_t lastinteraction; /* time of the last interaction, used for timeout */
232 int flags; /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MONITOR */
233 int slaveseldb; /* slave selected db, if this client is a slave */
234 int authenticated; /* when requirepass is non-NULL */
235 int replstate; /* replication state if this is a slave */
236 int repldbfd; /* replication DB file descriptor */
237 long repldboff; /* replication DB file offset */
238 off_t repldbsize; /* replication DB file size */
239 } redisClient;
240
/* One save point: a (seconds, changes) pair. The server keeps an array of
 * these in redisServer.saveparams.
 * NOTE(review): presumably "save after 'seconds' if at least 'changes'
 * modifications happened" -- confirm where saveparams is consumed. */
struct saveparam {
    time_t seconds;
    int changes;
};
245
246 /* Global server state structure */
247 struct redisServer {
248 int port;
249 int fd;
250 redisDb *db;
251 dict *sharingpool;
252 unsigned int sharingpoolsize;
253 long long dirty; /* changes to DB from the last save */
254 list *clients;
255 list *slaves, *monitors;
256 char neterr[ANET_ERR_LEN];
257 aeEventLoop *el;
258 int cronloops; /* number of times the cron function run */
259 list *objfreelist; /* A list of freed objects to avoid malloc() */
260 time_t lastsave; /* Unix time of last save succeeede */
261 size_t usedmemory; /* Used memory in megabytes */
262 /* Fields used only for stats */
263 time_t stat_starttime; /* server start time */
264 long long stat_numcommands; /* number of processed commands */
265 long long stat_numconnections; /* number of connections received */
266 /* Configuration */
267 int verbosity;
268 int glueoutputbuf;
269 int maxidletime;
270 int dbnum;
271 int daemonize;
272 int appendonly;
273 int appendfsync;
274 time_t lastfsync;
275 int appendfd;
276 int appendseldb;
277 char *pidfile;
278 pid_t bgsavechildpid;
279 pid_t bgrewritechildpid;
280 sds bgrewritebuf; /* buffer taken by parent during oppend only rewrite */
281 struct saveparam *saveparams;
282 int saveparamslen;
283 char *logfile;
284 char *bindaddr;
285 char *dbfilename;
286 char *appendfilename;
287 char *requirepass;
288 int shareobjects;
289 /* Replication related */
290 int isslave;
291 char *masterauth;
292 char *masterhost;
293 int masterport;
294 redisClient *master; /* client that is master for this slave */
295 int replstate;
296 unsigned int maxclients;
297 unsigned long maxmemory;
298 /* Sort parameters - qsort_r() is only available under BSD so we
299 * have to take this state global, in order to pass it to sortCompare() */
300 int sort_desc;
301 int sort_alpha;
302 int sort_bypattern;
303 };
304
305 typedef void redisCommandProc(redisClient *c);
306 struct redisCommand {
307 char *name;
308 redisCommandProc *proc;
309 int arity;
310 int flags;
311 };
312
/* Pairs a function name with an address stored as an integer.
 * NOTE(review): presumably used to symbolize stack traces; confirm at the
 * place where the table of these is defined. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
317
318 typedef struct _redisSortObject {
319 robj *obj;
320 union {
321 double score;
322 robj *cmpobj;
323 } u;
324 } redisSortObject;
325
326 typedef struct _redisSortOperation {
327 int type;
328 robj *pattern;
329 } redisSortOperation;
330
331 /* ZSETs use a specialized version of Skiplists */
332
333 typedef struct zskiplistNode {
334 struct zskiplistNode **forward;
335 struct zskiplistNode *backward;
336 double score;
337 robj *obj;
338 } zskiplistNode;
339
/* Skip list container: both end nodes, element count and level. */
typedef struct zskiplist {
    struct zskiplistNode *header;
    struct zskiplistNode *tail;
    unsigned long length;
    int level;
} zskiplist;
345
346 typedef struct zset {
347 dict *dict;
348 zskiplist *zsl;
349 } zset;
350
351 /* Our shared "common" objects */
352
353 struct sharedObjectsStruct {
354 robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
355 *colon, *nullbulk, *nullmultibulk,
356 *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
357 *outofrangeerr, *plus,
358 *select0, *select1, *select2, *select3, *select4,
359 *select5, *select6, *select7, *select8, *select9;
360 } shared;
361
362 /* Global vars that are actually used as constants. The following double
363 * values are used for double on-disk serialization, and are initialized
364 * at runtime to avoid strange compiler optimizations. */
365
366 static double R_Zero, R_PosInf, R_NegInf, R_Nan;
367
368 /*================================ Prototypes =============================== */
369
370 static void freeStringObject(robj *o);
371 static void freeListObject(robj *o);
372 static void freeSetObject(robj *o);
373 static void decrRefCount(void *o);
374 static robj *createObject(int type, void *ptr);
375 static void freeClient(redisClient *c);
376 static int rdbLoad(char *filename);
377 static void addReply(redisClient *c, robj *obj);
378 static void addReplySds(redisClient *c, sds s);
379 static void incrRefCount(robj *o);
380 static int rdbSaveBackground(char *filename);
381 static robj *createStringObject(char *ptr, size_t len);
382 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
383 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
384 static int syncWithMaster(void);
385 static robj *tryObjectSharing(robj *o);
386 static int tryObjectEncoding(robj *o);
387 static robj *getDecodedObject(robj *o);
388 static int removeExpire(redisDb *db, robj *key);
389 static int expireIfNeeded(redisDb *db, robj *key);
390 static int deleteIfVolatile(redisDb *db, robj *key);
391 static int deleteKey(redisDb *db, robj *key);
392 static time_t getExpire(redisDb *db, robj *key);
393 static int setExpire(redisDb *db, robj *key, time_t when);
394 static void updateSlavesWaitingBgsave(int bgsaveerr);
395 static void freeMemoryIfNeeded(void);
396 static int processCommand(redisClient *c);
397 static void setupSigSegvAction(void);
398 static void rdbRemoveTempFile(pid_t childpid);
399 static void aofRemoveTempFile(pid_t childpid);
400 static size_t stringObjectLen(robj *o);
401 static void processInputBuffer(redisClient *c);
402 static zskiplist *zslCreate(void);
403 static void zslFree(zskiplist *zsl);
404 static void zslInsert(zskiplist *zsl, double score, robj *obj);
405 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
406
407 static void authCommand(redisClient *c);
408 static void pingCommand(redisClient *c);
409 static void echoCommand(redisClient *c);
410 static void setCommand(redisClient *c);
411 static void setnxCommand(redisClient *c);
412 static void getCommand(redisClient *c);
413 static void delCommand(redisClient *c);
414 static void existsCommand(redisClient *c);
415 static void incrCommand(redisClient *c);
416 static void decrCommand(redisClient *c);
417 static void incrbyCommand(redisClient *c);
418 static void decrbyCommand(redisClient *c);
419 static void selectCommand(redisClient *c);
420 static void randomkeyCommand(redisClient *c);
421 static void keysCommand(redisClient *c);
422 static void dbsizeCommand(redisClient *c);
423 static void lastsaveCommand(redisClient *c);
424 static void saveCommand(redisClient *c);
425 static void bgsaveCommand(redisClient *c);
426 static void bgrewriteaofCommand(redisClient *c);
427 static void shutdownCommand(redisClient *c);
428 static void moveCommand(redisClient *c);
429 static void renameCommand(redisClient *c);
430 static void renamenxCommand(redisClient *c);
431 static void lpushCommand(redisClient *c);
432 static void rpushCommand(redisClient *c);
433 static void lpopCommand(redisClient *c);
434 static void rpopCommand(redisClient *c);
435 static void llenCommand(redisClient *c);
436 static void lindexCommand(redisClient *c);
437 static void lrangeCommand(redisClient *c);
438 static void ltrimCommand(redisClient *c);
439 static void typeCommand(redisClient *c);
440 static void lsetCommand(redisClient *c);
441 static void saddCommand(redisClient *c);
442 static void sremCommand(redisClient *c);
443 static void smoveCommand(redisClient *c);
444 static void sismemberCommand(redisClient *c);
445 static void scardCommand(redisClient *c);
446 static void spopCommand(redisClient *c);
447 static void srandmemberCommand(redisClient *c);
448 static void sinterCommand(redisClient *c);
449 static void sinterstoreCommand(redisClient *c);
450 static void sunionCommand(redisClient *c);
451 static void sunionstoreCommand(redisClient *c);
452 static void sdiffCommand(redisClient *c);
453 static void sdiffstoreCommand(redisClient *c);
454 static void syncCommand(redisClient *c);
455 static void flushdbCommand(redisClient *c);
456 static void flushallCommand(redisClient *c);
457 static void sortCommand(redisClient *c);
458 static void lremCommand(redisClient *c);
459 static void rpoplpushcommand(redisClient *c);
460 static void infoCommand(redisClient *c);
461 static void mgetCommand(redisClient *c);
462 static void monitorCommand(redisClient *c);
463 static void expireCommand(redisClient *c);
464 static void expireatCommand(redisClient *c);
465 static void getsetCommand(redisClient *c);
466 static void ttlCommand(redisClient *c);
467 static void slaveofCommand(redisClient *c);
468 static void debugCommand(redisClient *c);
469 static void msetCommand(redisClient *c);
470 static void msetnxCommand(redisClient *c);
471 static void zaddCommand(redisClient *c);
472 static void zincrbyCommand(redisClient *c);
473 static void zrangeCommand(redisClient *c);
474 static void zrangebyscoreCommand(redisClient *c);
475 static void zrevrangeCommand(redisClient *c);
476 static void zcardCommand(redisClient *c);
477 static void zremCommand(redisClient *c);
478 static void zscoreCommand(redisClient *c);
479 static void zremrangebyscoreCommand(redisClient *c);
480
481 /*================================= Globals ================================= */
482
483 /* Global vars */
484 static struct redisServer server; /* server global state */
/* Table of all the commands known to the server. Every row follows the
 * layout of struct redisCommand: {name, implementation, arity, flags},
 * where flags is a combination of REDIS_CMD_BULK / REDIS_CMD_INLINE and
 * optionally REDIS_CMD_DENYOOM. The table is terminated by an all
 * NULL/zero row.
 *
 * NOTE(review): a negative arity presumably means "at least that many
 * arguments" -- confirm against the code that validates argc.
 * Note that "smembers" is served by sinterCommand. */
485 static struct redisCommand cmdTable[] = {
486 {"get",getCommand,2,REDIS_CMD_INLINE},
487 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
488 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
489 {"del",delCommand,-2,REDIS_CMD_INLINE},
490 {"exists",existsCommand,2,REDIS_CMD_INLINE},
491 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
492 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
493 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
494 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
495 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
496 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
497 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
498 {"llen",llenCommand,2,REDIS_CMD_INLINE},
499 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
500 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
501 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
502 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
503 {"lrem",lremCommand,4,REDIS_CMD_BULK},
504 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK},
505 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
506 {"srem",sremCommand,3,REDIS_CMD_BULK},
507 {"smove",smoveCommand,4,REDIS_CMD_BULK},
508 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
509 {"scard",scardCommand,2,REDIS_CMD_INLINE},
510 {"spop",spopCommand,2,REDIS_CMD_INLINE},
511 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
512 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
513 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
514 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
515 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
516 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
517 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
518 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
519 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
520 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
521 {"zrem",zremCommand,3,REDIS_CMD_BULK},
522 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
523 {"zrange",zrangeCommand,4,REDIS_CMD_INLINE},
524 {"zrangebyscore",zrangebyscoreCommand,4,REDIS_CMD_INLINE},
525 {"zrevrange",zrevrangeCommand,4,REDIS_CMD_INLINE},
526 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
527 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
528 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
529 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
530 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
531 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
532 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
533 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
534 {"select",selectCommand,2,REDIS_CMD_INLINE},
535 {"move",moveCommand,3,REDIS_CMD_INLINE},
536 {"rename",renameCommand,3,REDIS_CMD_INLINE},
537 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
538 {"expire",expireCommand,3,REDIS_CMD_INLINE},
539 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
540 {"keys",keysCommand,2,REDIS_CMD_INLINE},
541 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
542 {"auth",authCommand,2,REDIS_CMD_INLINE},
543 {"ping",pingCommand,1,REDIS_CMD_INLINE},
544 {"echo",echoCommand,2,REDIS_CMD_BULK},
545 {"save",saveCommand,1,REDIS_CMD_INLINE},
546 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
547 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
548 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
549 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
550 {"type",typeCommand,2,REDIS_CMD_INLINE},
551 {"sync",syncCommand,1,REDIS_CMD_INLINE},
552 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
553 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
554 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
555 {"info",infoCommand,1,REDIS_CMD_INLINE},
556 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
557 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
558 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
559 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
560 {NULL,NULL,0,0}
561 };
562
563 /*============================ Utility functions ============================ */
564
/* Glob-style pattern matching over length-delimited buffers.
 *
 * Supported syntax: '*' (any run of characters, possibly empty), '?' (any
 * single character), '[...]' character classes with '^' negation, 'a-z'
 * ranges and '\' escapes, and '\' to take the next pattern character
 * literally. An unterminated class ("[ab") is accepted as if it were closed.
 *
 * pattern/patternLen: the pattern and its length in bytes.
 * string/stringLen:   the subject and its length in bytes.
 * nocase:             non-zero for case-insensitive matching.
 *
 * Returns 1 if the whole string matches the whole pattern, 0 otherwise.
 *
 * Neither buffer needs to be NUL terminated: every access is bounded by the
 * corresponding length, and characters are converted to unsigned char
 * before being passed to tolower() or compared as range endpoints. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while (patternLen > 0) {
        switch (pattern[0]) {
        case '*':
            /* Collapse runs of '*' without looking past the pattern end. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match */
            while (stringLen > 0) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A class always consumes exactly one subject character. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while (1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the common
                     * advance after the switch lands exactly on the end. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char) pattern[0];
                    int end = (unsigned char) pattern[2];
                    int c = (unsigned char) string[0];

                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char) pattern[0]) ==
                            tolower((unsigned char) string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char) pattern[0]) !=
                    tolower((unsigned char) string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* Subject exhausted: only trailing '*' may remain. */
            while (patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    return (patternLen == 0 && stringLen == 0) ? 1 : 0;
}
687
688 static void redisLog(int level, const char *fmt, ...) {
689 va_list ap;
690 FILE *fp;
691
692 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
693 if (!fp) return;
694
695 va_start(ap, fmt);
696 if (level >= server.verbosity) {
697 char *c = ".-*";
698 char buf[64];
699 time_t now;
700
701 now = time(NULL);
702 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
703 fprintf(fp,"%s %c ",buf,c[level]);
704 vfprintf(fp, fmt, ap);
705 fprintf(fp,"\n");
706 fflush(fp);
707 }
708 va_end(ap);
709
710 if (server.logfile) fclose(fp);
711 }
712
713 /*====================== Hash table type implementation ==================== */
714
715 /* This is a hash table type that uses the SDS dynamic strings library as
716 * keys and redis objects as values (objects can hold SDS strings,
717 * lists, sets). */
718
/* Dict value destructor for values that are plain zmalloc-style
 * allocations: the value is simply released with zfree(). */
static void dictVanillaFree(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    zfree(val);
}
724
725 static int sdsDictKeyCompare(void *privdata, const void *key1,
726 const void *key2)
727 {
728 int l1,l2;
729 DICT_NOTUSED(privdata);
730
731 l1 = sdslen((sds)key1);
732 l2 = sdslen((sds)key2);
733 if (l1 != l2) return 0;
734 return memcmp(key1, key2, l1) == 0;
735 }
736
/* Dict key/value destructor for entries that are redis objects: drops one
 * reference to the object. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val);
}
743
744 static int dictObjKeyCompare(void *privdata, const void *key1,
745 const void *key2)
746 {
747 const robj *o1 = key1, *o2 = key2;
748 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
749 }
750
751 static unsigned int dictObjHash(const void *key) {
752 const robj *o = key;
753 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
754 }
755
756 static int dictEncObjKeyCompare(void *privdata, const void *key1,
757 const void *key2)
758 {
759 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
760 int cmp;
761
762 o1 = getDecodedObject(o1);
763 o2 = getDecodedObject(o2);
764 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
765 decrRefCount(o1);
766 decrRefCount(o2);
767 return cmp;
768 }
769
770 static unsigned int dictEncObjHash(const void *key) {
771 robj *o = (robj*) key;
772
773 o = getDecodedObject(o);
774 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
775 decrRefCount(o);
776 return hash;
777 }
778
779 static dictType setDictType = {
780 dictEncObjHash, /* hash function */
781 NULL, /* key dup */
782 NULL, /* val dup */
783 dictEncObjKeyCompare, /* key compare */
784 dictRedisObjectDestructor, /* key destructor */
785 NULL /* val destructor */
786 };
787
788 static dictType zsetDictType = {
789 dictEncObjHash, /* hash function */
790 NULL, /* key dup */
791 NULL, /* val dup */
792 dictEncObjKeyCompare, /* key compare */
793 dictRedisObjectDestructor, /* key destructor */
794 dictVanillaFree /* val destructor */
795 };
796
797 static dictType hashDictType = {
798 dictObjHash, /* hash function */
799 NULL, /* key dup */
800 NULL, /* val dup */
801 dictObjKeyCompare, /* key compare */
802 dictRedisObjectDestructor, /* key destructor */
803 dictRedisObjectDestructor /* val destructor */
804 };
805
806 /* ========================= Random utility functions ======================= */
807
/* Out of memory handler. Redis does not try to recover from allocation
 * failures: even reporting the problem to a client would need heap memory
 * for the send buffers, so the simplest option is to abort.
 *
 * Prints "<msg>: Out of memory" on stderr, waits one second and then
 * terminates the process with abort(). Never returns. */
static void oom(const char *msg) {
    fprintf(stderr, "%s: Out of memory\n",msg);
    fflush(stderr);
    sleep(1);
    abort();
}
819
820 /* ====================== Redis server networking stuff ===================== */
821 static void closeTimedoutClients(void) {
822 redisClient *c;
823 listNode *ln;
824 time_t now = time(NULL);
825
826 listRewind(server.clients);
827 while ((ln = listYield(server.clients)) != NULL) {
828 c = listNodeValue(ln);
829 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
830 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
831 (now - c->lastinteraction > server.maxidletime)) {
832 redisLog(REDIS_DEBUG,"Closing idle client");
833 freeClient(c);
834 }
835 }
836 }
837
838 static int htNeedsResize(dict *dict) {
839 long long size, used;
840
841 size = dictSlots(dict);
842 used = dictSize(dict);
843 return (size && used && size > DICT_HT_INITIAL_SIZE &&
844 (used*100/size < REDIS_HT_MINFILL));
845 }
846
847 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
848 * we resize the hash table to save memory */
849 static void tryResizeHashTables(void) {
850 int j;
851
852 for (j = 0; j < server.dbnum; j++) {
853 if (htNeedsResize(server.db[j].dict)) {
854 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
855 dictResize(server.db[j].dict);
856 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
857 }
858 if (htNeedsResize(server.db[j].expires))
859 dictResize(server.db[j].expires);
860 }
861 }
862
863 /* A background saving child (BGSAVE) terminated its work. Handle this. */
864 void backgroundSaveDoneHandler(int statloc) {
865 int exitcode = WEXITSTATUS(statloc);
866 int bysignal = WIFSIGNALED(statloc);
867
868 if (!bysignal && exitcode == 0) {
869 redisLog(REDIS_NOTICE,
870 "Background saving terminated with success");
871 server.dirty = 0;
872 server.lastsave = time(NULL);
873 } else if (!bysignal && exitcode != 0) {
874 redisLog(REDIS_WARNING, "Background saving error");
875 } else {
876 redisLog(REDIS_WARNING,
877 "Background saving terminated by signal");
878 rdbRemoveTempFile(server.bgsavechildpid);
879 }
880 server.bgsavechildpid = -1;
881 /* Possibly there are slaves waiting for a BGSAVE in order to be served
882 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
883 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
884 }
885
886 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
887 * Handle this. */
888 void backgroundRewriteDoneHandler(int statloc) {
889 int exitcode = WEXITSTATUS(statloc);
890 int bysignal = WIFSIGNALED(statloc);
891
892 if (!bysignal && exitcode == 0) {
893 int fd;
894 char tmpfile[256];
895
896 redisLog(REDIS_NOTICE,
897 "Background append only file rewriting terminated with success");
898 /* Now it's time to flush the differences accumulated by the parent */
899 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
900 fd = open(tmpfile,O_WRONLY|O_APPEND);
901 if (fd == -1) {
902 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
903 goto cleanup;
904 }
905 /* Flush our data... */
906 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
907 (signed) sdslen(server.bgrewritebuf)) {
908 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
909 close(fd);
910 goto cleanup;
911 }
912 redisLog(REDIS_WARNING,"Parent diff flushed into the new append log file with success");
913 /* Now our work is to rename the temp file into the stable file. And
914 * switch the file descriptor used by the server for append only. */
915 if (rename(tmpfile,server.appendfilename) == -1) {
916 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
917 close(fd);
918 goto cleanup;
919 }
920 /* Mission completed... almost */
921 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
922 if (server.appendfd != -1) {
923 /* If append only is actually enabled... */
924 close(server.appendfd);
925 server.appendfd = fd;
926 fsync(fd);
927 server.appendseldb = -1; /* Make sure it will issue SELECT */
928 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
929 } else {
930 /* If append only is disabled we just generate a dump in this
931 * format. Why not? */
932 close(fd);
933 }
934 } else if (!bysignal && exitcode != 0) {
935 redisLog(REDIS_WARNING, "Background append only file rewriting error");
936 } else {
937 redisLog(REDIS_WARNING,
938 "Background append only file rewriting terminated by signal");
939 }
940 cleanup:
941 sdsfree(server.bgrewritebuf);
942 server.bgrewritebuf = sdsempty();
943 aofRemoveTempFile(server.bgrewritechildpid);
944 server.bgrewritechildpid = -1;
945 }
946
/* Periodic housekeeping routine, registered as a time event by initServer().
 *
 * On every call it: refreshes the used memory counter, logs some statistics
 * (every 5 calls), resizes sparse hash tables when no BGSAVE child is
 * running, closes idle clients (every 10 calls, if a max idle time is set),
 * reaps a terminated background child or starts a background save when a
 * save point is reached, expires a sample of volatile keys, and tries to
 * connect to the master if replication requires it.
 *
 * The three arguments are unused. Returns 1000.
 * NOTE(review): the return value is presumably the number of milliseconds
 * before the event loop calls this function again -- confirm against the
 * ae time event API. */
static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    int j, loops = server.cronloops++;
    REDIS_NOTUSED(eventLoop);
    REDIS_NOTUSED(id);
    REDIS_NOTUSED(clientData);

    /* Update the global state with the amount of used memory */
    server.usedmemory = zmalloc_used_memory();

    /* Show some info about non-empty databases */
    for (j = 0; j < server.dbnum; j++) {
        long long size, used, vkeys;

        size = dictSlots(server.db[j].dict);
        used = dictSize(server.db[j].dict);
        vkeys = dictSize(server.db[j].expires);
        /* Only log once every 5 calls, and only for databases holding data */
        if (!(loops % 5) && (used || vkeys)) {
            redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
            /* dictPrintStats(server.dict); */
        }
    }

    /* We don't want to resize the hash tables while a background saving
     * is in progress: the saving child is created using fork() that is
     * implemented with a copy-on-write semantic in most modern systems, so
     * if we resize the HT while there is the saving child at work actually
     * a lot of memory movements in the parent will cause a lot of pages
     * copied. */
    if (server.bgsavechildpid == -1) tryResizeHashTables();

    /* Show information about connected clients */
    if (!(loops % 5)) {
        redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
            listLength(server.clients)-listLength(server.slaves),
            listLength(server.slaves),
            server.usedmemory,
            dictSize(server.sharingpool));
    }

    /* Close connections of timedout clients */
    if (server.maxidletime && !(loops % 10))
        closeTimedoutClients();

    /* Check if a background saving or AOF rewrite in progress terminated */
    if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
        int statloc;
        pid_t pid;

        /* WNOHANG: don't block, just collect a child if one has terminated.
         * NOTE(review): every non-zero pid that is not the BGSAVE child
         * (this includes -1, returned by wait3() on error) is handled as
         * the AOF rewrite child. */
        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            if (pid == server.bgsavechildpid) {
                backgroundSaveDoneHandler(statloc);
            } else {
                backgroundRewriteDoneHandler(statloc);
            }
        }
    } else {
        /* If there is not a background saving in progress check if
         * we have to save now */
        time_t now = time(NULL);
        for (j = 0; j < server.saveparamslen; j++) {
            struct saveparam *sp = server.saveparams+j;

            /* A save point triggers when at least sp->changes changes were
             * performed and more than sp->seconds elapsed since last save. */
            if (server.dirty >= sp->changes &&
                now-server.lastsave > sp->seconds) {
                redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
                    sp->changes, sp->seconds);
                rdbSaveBackground(server.dbfilename);
                break;
            }
        }
    }

    /* Try to expire a few timed out keys. The algorithm used is adaptive and
     * will use few CPU cycles if there are few expiring keys, otherwise
     * it will get more aggressive to avoid that too much memory is used by
     * keys that can be removed from the keyspace. */
    for (j = 0; j < server.dbnum; j++) {
        int expired;
        redisDb *db = server.db+j;

        /* Continue to expire if at the end of the cycle more than 25%
         * of the keys were expired. */
        do {
            int num = dictSize(db->expires);
            time_t now = time(NULL);

            expired = 0;
            /* Sample at most REDIS_EXPIRELOOKUPS_PER_CRON random keys */
            if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
                num = REDIS_EXPIRELOOKUPS_PER_CRON;
            while (num--) {
                dictEntry *de;
                time_t t;

                if ((de = dictGetRandomKey(db->expires)) == NULL) break;
                /* The expire time is stored directly as the entry value */
                t = (time_t) dictGetEntryVal(de);
                if (now > t) {
                    deleteKey(db,dictGetEntryKey(de));
                    expired++;
                }
            }
        } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
    }

    /* Check if we should connect to a MASTER */
    if (server.replstate == REDIS_REPL_CONNECT) {
        redisLog(REDIS_NOTICE,"Connecting to MASTER...");
        if (syncWithMaster() == REDIS_OK) {
            redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
        }
    }
    return 1000;
}
1059
1060 static void createSharedObjects(void) {
1061 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1062 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1063 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1064 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1065 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1066 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1067 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1068 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1069 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1070 /* no such key */
1071 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1072 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1073 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1074 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1075 "-ERR no such key\r\n"));
1076 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1077 "-ERR syntax error\r\n"));
1078 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1079 "-ERR source and destination objects are the same\r\n"));
1080 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1081 "-ERR index out of range\r\n"));
1082 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1083 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1084 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1085 shared.select0 = createStringObject("select 0\r\n",10);
1086 shared.select1 = createStringObject("select 1\r\n",10);
1087 shared.select2 = createStringObject("select 2\r\n",10);
1088 shared.select3 = createStringObject("select 3\r\n",10);
1089 shared.select4 = createStringObject("select 4\r\n",10);
1090 shared.select5 = createStringObject("select 5\r\n",10);
1091 shared.select6 = createStringObject("select 6\r\n",10);
1092 shared.select7 = createStringObject("select 7\r\n",10);
1093 shared.select8 = createStringObject("select 8\r\n",10);
1094 shared.select9 = createStringObject("select 9\r\n",10);
1095 }
1096
1097 static void appendServerSaveParams(time_t seconds, int changes) {
1098 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1099 server.saveparams[server.saveparamslen].seconds = seconds;
1100 server.saveparams[server.saveparamslen].changes = changes;
1101 server.saveparamslen++;
1102 }
1103
1104 static void resetServerSaveParams() {
1105 zfree(server.saveparams);
1106 server.saveparams = NULL;
1107 server.saveparamslen = 0;
1108 }
1109
1110 static void initServerConfig() {
1111 server.dbnum = REDIS_DEFAULT_DBNUM;
1112 server.port = REDIS_SERVERPORT;
1113 server.verbosity = REDIS_DEBUG;
1114 server.maxidletime = REDIS_MAXIDLETIME;
1115 server.saveparams = NULL;
1116 server.logfile = NULL; /* NULL = log on standard output */
1117 server.bindaddr = NULL;
1118 server.glueoutputbuf = 1;
1119 server.daemonize = 0;
1120 server.appendonly = 0;
1121 server.appendfsync = APPENDFSYNC_ALWAYS;
1122 server.lastfsync = time(NULL);
1123 server.appendfd = -1;
1124 server.appendseldb = -1; /* Make sure the first time will not match */
1125 server.pidfile = "/var/run/redis.pid";
1126 server.dbfilename = "dump.rdb";
1127 server.appendfilename = "appendonly.aof";
1128 server.requirepass = NULL;
1129 server.shareobjects = 0;
1130 server.sharingpoolsize = 1024;
1131 server.maxclients = 0;
1132 server.maxmemory = 0;
1133 resetServerSaveParams();
1134
1135 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1136 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1137 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1138 /* Replication related */
1139 server.isslave = 0;
1140 server.masterauth = NULL;
1141 server.masterhost = NULL;
1142 server.masterport = 6379;
1143 server.master = NULL;
1144 server.replstate = REDIS_REPL_NONE;
1145
1146 /* Double constants initialization */
1147 R_Zero = 0.0;
1148 R_PosInf = 1.0/R_Zero;
1149 R_NegInf = -1.0/R_Zero;
1150 R_Nan = R_Zero/R_Zero;
1151 }
1152
1153 static void initServer() {
1154 int j;
1155
1156 signal(SIGHUP, SIG_IGN);
1157 signal(SIGPIPE, SIG_IGN);
1158 setupSigSegvAction();
1159
1160 server.clients = listCreate();
1161 server.slaves = listCreate();
1162 server.monitors = listCreate();
1163 server.objfreelist = listCreate();
1164 createSharedObjects();
1165 server.el = aeCreateEventLoop();
1166 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
1167 server.sharingpool = dictCreate(&setDictType,NULL);
1168 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
1169 if (server.fd == -1) {
1170 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
1171 exit(1);
1172 }
1173 for (j = 0; j < server.dbnum; j++) {
1174 server.db[j].dict = dictCreate(&hashDictType,NULL);
1175 server.db[j].expires = dictCreate(&setDictType,NULL);
1176 server.db[j].id = j;
1177 }
1178 server.cronloops = 0;
1179 server.bgsavechildpid = -1;
1180 server.bgrewritechildpid = -1;
1181 server.bgrewritebuf = sdsempty();
1182 server.lastsave = time(NULL);
1183 server.dirty = 0;
1184 server.usedmemory = 0;
1185 server.stat_numcommands = 0;
1186 server.stat_numconnections = 0;
1187 server.stat_starttime = time(NULL);
1188 aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);
1189
1190 if (server.appendonly) {
1191 server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
1192 if (server.appendfd == -1) {
1193 redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
1194 strerror(errno));
1195 exit(1);
1196 }
1197 }
1198 }
1199
1200 /* Empty the whole database */
1201 static long long emptyDb() {
1202 int j;
1203 long long removed = 0;
1204
1205 for (j = 0; j < server.dbnum; j++) {
1206 removed += dictSize(server.db[j].dict);
1207 dictEmpty(server.db[j].dict);
1208 dictEmpty(server.db[j].expires);
1209 }
1210 return removed;
1211 }
1212
/* Convert a "yes" / "no" string (case insensitive) into an integer.
 * Returns 1 for "yes", 0 for "no" and -1 for any other string. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1218
/* I agree, this is a very rudimental way to load a configuration...
   will improve later if the config gets more complex */
/* Load the configuration from 'filename', or from standard input when
 * 'filename' is "-". The file is read line by line: blank lines and lines
 * starting with '#' are skipped, the others are split on single spaces and
 * the first token selects the directive (compared case insensitively).
 *
 * There is no error return: if the file can't be opened, a chdir() fails,
 * or a directive is invalid, a message is printed and the process exits
 * with status 1. */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines*/
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* save <seconds> <changes>: the new save point is appended to
             * the ones already configured. */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* The working directory is changed right now, while parsing */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" (any case) means: log on standard output, that is
             * represented by a NULL logfile. */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            /* NOTE(review): the value is not validated here */
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* slaveof <host> <port>: REDIS_REPL_CONNECT makes serverCron()
             * try to sync with the master. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else {
            /* Unknown directive, or known directive with a wrong arity */
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Directive processed: release the tokens and the line */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* 'line' and 'linenum' still refer to the offending line. Nothing is
     * freed on this path since the process terminates right away. */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1378
1379 static void freeClientArgv(redisClient *c) {
1380 int j;
1381
1382 for (j = 0; j < c->argc; j++)
1383 decrRefCount(c->argv[j]);
1384 for (j = 0; j < c->mbargc; j++)
1385 decrRefCount(c->mbargv[j]);
1386 c->argc = 0;
1387 c->mbargc = 0;
1388 }
1389
1390 static void freeClient(redisClient *c) {
1391 listNode *ln;
1392
1393 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1394 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1395 sdsfree(c->querybuf);
1396 listRelease(c->reply);
1397 freeClientArgv(c);
1398 close(c->fd);
1399 ln = listSearchKey(server.clients,c);
1400 assert(ln != NULL);
1401 listDelNode(server.clients,ln);
1402 if (c->flags & REDIS_SLAVE) {
1403 if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
1404 close(c->repldbfd);
1405 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1406 ln = listSearchKey(l,c);
1407 assert(ln != NULL);
1408 listDelNode(l,ln);
1409 }
1410 if (c->flags & REDIS_MASTER) {
1411 server.master = NULL;
1412 server.replstate = REDIS_REPL_CONNECT;
1413 }
1414 zfree(c->argv);
1415 zfree(c->mbargv);
1416 zfree(c);
1417 }
1418
1419 #define GLUEREPLY_UP_TO (1024)
1420 static void glueReplyBuffersIfNeeded(redisClient *c) {
1421 int copylen = 0;
1422 char buf[GLUEREPLY_UP_TO];
1423 listNode *ln;
1424 robj *o;
1425
1426 listRewind(c->reply);
1427 while((ln = listYield(c->reply))) {
1428 int objlen;
1429
1430 o = ln->value;
1431 objlen = sdslen(o->ptr);
1432 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1433 memcpy(buf+copylen,o->ptr,objlen);
1434 copylen += objlen;
1435 listDelNode(c->reply,ln);
1436 } else {
1437 if (copylen == 0) return;
1438 break;
1439 }
1440 }
1441 /* Now the output buffer is empty, add the new single element */
1442 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1443 listAddNodeHead(c->reply,o);
1444 }
1445
/* Writable event handler: send the pending replies of the client 'privdata'
 * on the socket 'fd'.
 *
 * The reply list is consumed from the head; c->sentlen is the number of
 * bytes of the head object already sent. When the list becomes empty the
 * writable event is removed. On a write error different from EAGAIN the
 * client is freed. 'el' and 'mask' are unused here, but are forwarded to
 * sendReplyToClientWritev() when that path is selected. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        /* Merge small reply objects so that fewer write() calls are needed */
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects are just discarded */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            /* Pretend the rest of the object was written, so that it is
             * removed from the list below without touching the socket. */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* Error (-1) or nothing written: stop, -1 is handled below */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid to send more than REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            /* The socket can't accept more data now: not a real error */
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    /* Data was sent: refresh the last interaction time of the client */
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        /* Nothing left to send: stop listening for writable events */
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1511
1512 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1513 {
1514 redisClient *c = privdata;
1515 int nwritten = 0, totwritten = 0, objlen, willwrite;
1516 robj *o;
1517 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1518 int offset, ion = 0;
1519 REDIS_NOTUSED(el);
1520 REDIS_NOTUSED(mask);
1521
1522 listNode *node;
1523 while (listLength(c->reply)) {
1524 offset = c->sentlen;
1525 ion = 0;
1526 willwrite = 0;
1527
1528 /* fill-in the iov[] array */
1529 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1530 o = listNodeValue(node);
1531 objlen = sdslen(o->ptr);
1532
1533 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1534 break;
1535
1536 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1537 break; /* no more iovecs */
1538
1539 iov[ion].iov_base = ((char*)o->ptr) + offset;
1540 iov[ion].iov_len = objlen - offset;
1541 willwrite += objlen - offset;
1542 offset = 0; /* just for the first item */
1543 ion++;
1544 }
1545
1546 if(willwrite == 0)
1547 break;
1548
1549 /* write all collected blocks at once */
1550 if((nwritten = writev(fd, iov, ion)) < 0) {
1551 if (errno != EAGAIN) {
1552 redisLog(REDIS_DEBUG,
1553 "Error writing to client: %s", strerror(errno));
1554 freeClient(c);
1555 return;
1556 }
1557 break;
1558 }
1559
1560 totwritten += nwritten;
1561 offset = c->sentlen;
1562
1563 /* remove written robjs from c->reply */
1564 while (nwritten && listLength(c->reply)) {
1565 o = listNodeValue(listFirst(c->reply));
1566 objlen = sdslen(o->ptr);
1567
1568 if(nwritten >= objlen - offset) {
1569 listDelNode(c->reply, listFirst(c->reply));
1570 nwritten -= objlen - offset;
1571 c->sentlen = 0;
1572 } else {
1573 /* partial write */
1574 c->sentlen += nwritten;
1575 break;
1576 }
1577 offset = 0;
1578 }
1579 }
1580
1581 if (totwritten > 0)
1582 c->lastinteraction = time(NULL);
1583
1584 if (listLength(c->reply) == 0) {
1585 c->sentlen = 0;
1586 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1587 }
1588 }
1589
1590 static struct redisCommand *lookupCommand(char *name) {
1591 int j = 0;
1592 while(cmdTable[j].name != NULL) {
1593 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1594 j++;
1595 }
1596 return NULL;
1597 }
1598
1599 /* resetClient prepare the client to process the next command */
1600 static void resetClient(redisClient *c) {
1601 freeClientArgv(c);
1602 c->bulklen = -1;
1603 c->multibulk = 0;
1604 }
1605
/* If this function gets called we already read a whole
 * command, arguments are in the client argv/argc fields.
 * processCommand() execute the command or prepare the
 * server for a bulk read from the client.
 *
 * If 1 is returned the client is still alive and valid and
 * other operations can be performed by the caller. Otherwise
 * if 0 is returned the client was destroyed (i.e. after QUIT). */
static int processCommand(redisClient *c) {
    struct redisCommand *cmd;
    long long dirty;

    /* Free some memory if needed (maxmemory setting) */
    if (server.maxmemory) freeMemoryIfNeeded();

    /* Handle the multi bulk command type. This is an alternative protocol
     * supported by Redis in order to receive commands that are composed of
     * multiple binary-safe "bulk" arguments. The latency of processing is
     * a bit higher but this allows things like multi-sets, so if this
     * protocol is used only for MSET and similar commands this is a big win. */
    if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
        /* "*<count>" line: <count> is the number of bulk arguments that
         * will follow. */
        c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
        if (c->multibulk <= 0) {
            resetClient(c);
            return 1;
        } else {
            /* Discard the "*<count>" argument and wait for the bulks */
            decrRefCount(c->argv[c->argc-1]);
            c->argc--;
            return 1;
        }
    } else if (c->multibulk) {
        if (c->bulklen == -1) {
            /* We expect the "$<len>" header of the next argument */
            if (((char*)c->argv[0]->ptr)[0] != '$') {
                addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
                resetClient(c);
                return 1;
            } else {
                int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
                decrRefCount(c->argv[0]);
                if (bulklen < 0 || bulklen > 1024*1024*1024) {
                    c->argc--;
                    addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
                    resetClient(c);
                    return 1;
                }
                c->argc--;
                c->bulklen = bulklen+2; /* add two bytes for CR+LF */
                return 1;
            }
        } else {
            /* A bulk argument was read: move it into the multi bulk
             * arguments vector. */
            c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
            c->mbargv[c->mbargc] = c->argv[0];
            c->mbargc++;
            c->argc--;
            c->multibulk--;
            if (c->multibulk == 0) {
                robj **auxargv;
                int auxargc;

                /* Here we need to swap the multi-bulk argc/argv with the
                 * normal argc/argv of the client structure. */
                auxargv = c->argv;
                c->argv = c->mbargv;
                c->mbargv = auxargv;

                auxargc = c->argc;
                c->argc = c->mbargc;
                c->mbargc = auxargc;

                /* We need to set bulklen to something different than -1
                 * in order for the code below to process the command without
                 * to try to read the last argument of a bulk command as
                 * a special argument. */
                c->bulklen = 0;
                /* continue below and process the command */
            } else {
                /* More arguments to read: wait for the next "$<len>" */
                c->bulklen = -1;
                return 1;
            }
        }
    }
    /* -- end of multi bulk commands processing -- */

    /* The QUIT command is handled as a special case. Normal command
     * procs are unable to close the client connection safely */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        freeClient(c);
        return 0;
    }
    cmd = lookupCommand(c->argv[0]->ptr);
    if (!cmd) {
        addReplySds(c,sdsnew("-ERR unknown command\r\n"));
        resetClient(c);
        return 1;
    } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
               (c->argc < -cmd->arity)) {
        /* A positive arity is the exact number of arguments required, a
         * negative arity -N means that at least N arguments are required. */
        addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
        resetClient(c);
        return 1;
    } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
        addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
        resetClient(c);
        return 1;
    } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
        /* Bulk command: the last argument received so far is the length
         * in bytes of the payload that follows. */
        int bulklen = atoi(c->argv[c->argc-1]->ptr);

        decrRefCount(c->argv[c->argc-1]);
        if (bulklen < 0 || bulklen > 1024*1024*1024) {
            c->argc--;
            addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
            resetClient(c);
            return 1;
        }
        c->argc--;
        c->bulklen = bulklen+2; /* add two bytes for CR+LF */
        /* It is possible that the bulk read is already in the
         * buffer. Check this condition and handle it accordingly.
         * This is just a fast path, alternative to call processInputBuffer().
         * It's a good idea since the code is small and this condition
         * happens most of the times. */
        if ((signed)sdslen(c->querybuf) >= c->bulklen) {
            /* The payload becomes the last argument (CR+LF excluded) and
             * is removed from the query buffer. */
            c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
            c->argc++;
            c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
        } else {
            /* Payload not fully received yet: wait for more data */
            return 1;
        }
    }
    /* Let's try to share objects on the command arguments vector */
    if (server.shareobjects) {
        int j;
        for(j = 1; j < c->argc; j++)
            c->argv[j] = tryObjectSharing(c->argv[j]);
    }
    /* Let's try to encode the bulk object to save space. */
    if (cmd->flags & REDIS_CMD_BULK)
        tryObjectEncoding(c->argv[c->argc-1]);

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
        addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
        resetClient(c);
        return 1;
    }

    /* Exec the command */
    /* server.dirty is sampled before the call: if it changed, the command
     * is propagated to the append only file and to the slaves. Monitors
     * receive every executed command. */
    dirty = server.dirty;
    cmd->proc(c);
    if (server.appendonly && server.dirty-dirty)
        feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
    if (server.dirty-dirty && listLength(server.slaves))
        replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
    if (listLength(server.monitors))
        replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
    server.stat_numcommands++;

    /* Prepare the client for the next command */
    if (c->flags & REDIS_CLOSE) {
        freeClient(c);
        return 0;
    }
    resetClient(c);
    return 1;
}
1770
1771 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1772 listNode *ln;
1773 int outc = 0, j;
1774 robj **outv;
1775 /* (args*2)+1 is enough room for args, spaces, newlines */
1776 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1777
1778 if (argc <= REDIS_STATIC_ARGS) {
1779 outv = static_outv;
1780 } else {
1781 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1782 }
1783
1784 for (j = 0; j < argc; j++) {
1785 if (j != 0) outv[outc++] = shared.space;
1786 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1787 robj *lenobj;
1788
1789 lenobj = createObject(REDIS_STRING,
1790 sdscatprintf(sdsempty(),"%d\r\n",
1791 stringObjectLen(argv[j])));
1792 lenobj->refcount = 0;
1793 outv[outc++] = lenobj;
1794 }
1795 outv[outc++] = argv[j];
1796 }
1797 outv[outc++] = shared.crlf;
1798
1799 /* Increment all the refcounts at start and decrement at end in order to
1800 * be sure to free objects if there is no slave in a replication state
1801 * able to be feed with commands */
1802 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1803 listRewind(slaves);
1804 while((ln = listYield(slaves))) {
1805 redisClient *slave = ln->value;
1806
1807 /* Don't feed slaves that are still waiting for BGSAVE to start */
1808 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1809
1810 /* Feed all the other slaves, MONITORs and so on */
1811 if (slave->slaveseldb != dictid) {
1812 robj *selectcmd;
1813
1814 switch(dictid) {
1815 case 0: selectcmd = shared.select0; break;
1816 case 1: selectcmd = shared.select1; break;
1817 case 2: selectcmd = shared.select2; break;
1818 case 3: selectcmd = shared.select3; break;
1819 case 4: selectcmd = shared.select4; break;
1820 case 5: selectcmd = shared.select5; break;
1821 case 6: selectcmd = shared.select6; break;
1822 case 7: selectcmd = shared.select7; break;
1823 case 8: selectcmd = shared.select8; break;
1824 case 9: selectcmd = shared.select9; break;
1825 default:
1826 selectcmd = createObject(REDIS_STRING,
1827 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1828 selectcmd->refcount = 0;
1829 break;
1830 }
1831 addReply(slave,selectcmd);
1832 slave->slaveseldb = dictid;
1833 }
1834 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1835 }
1836 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1837 if (outv != static_outv) zfree(outv);
1838 }
1839
1840 static void processInputBuffer(redisClient *c) {
1841 again:
1842 if (c->bulklen == -1) {
1843 /* Read the first line of the query */
1844 char *p = strchr(c->querybuf,'\n');
1845 size_t querylen;
1846
1847 if (p) {
1848 sds query, *argv;
1849 int argc, j;
1850
1851 query = c->querybuf;
1852 c->querybuf = sdsempty();
1853 querylen = 1+(p-(query));
1854 if (sdslen(query) > querylen) {
1855 /* leave data after the first line of the query in the buffer */
1856 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1857 }
1858 *p = '\0'; /* remove "\n" */
1859 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1860 sdsupdatelen(query);
1861
1862 /* Now we can split the query in arguments */
1863 if (sdslen(query) == 0) {
1864 /* Ignore empty query */
1865 sdsfree(query);
1866 return;
1867 }
1868 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1869 sdsfree(query);
1870
1871 if (c->argv) zfree(c->argv);
1872 c->argv = zmalloc(sizeof(robj*)*argc);
1873
1874 for (j = 0; j < argc; j++) {
1875 if (sdslen(argv[j])) {
1876 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1877 c->argc++;
1878 } else {
1879 sdsfree(argv[j]);
1880 }
1881 }
1882 zfree(argv);
1883 /* Execute the command. If the client is still valid
1884 * after processCommand() return and there is something
1885 * on the query buffer try to process the next command. */
1886 if (c->argc && processCommand(c) && sdslen(c->querybuf)) goto again;
1887 return;
1888 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
1889 redisLog(REDIS_DEBUG, "Client protocol error");
1890 freeClient(c);
1891 return;
1892 }
1893 } else {
1894 /* Bulk read handling. Note that if we are at this point
1895 the client already sent a command terminated with a newline,
1896 we are reading the bulk data that is actually the last
1897 argument of the command. */
1898 int qbl = sdslen(c->querybuf);
1899
1900 if (c->bulklen <= qbl) {
1901 /* Copy everything but the final CRLF as final argument */
1902 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1903 c->argc++;
1904 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1905 /* Process the command. If the client is still valid after
1906 * the processing and there is more data in the buffer
1907 * try to parse it. */
1908 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1909 return;
1910 }
1911 }
1912 }
1913
1914 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1915 redisClient *c = (redisClient*) privdata;
1916 char buf[REDIS_IOBUF_LEN];
1917 int nread;
1918 REDIS_NOTUSED(el);
1919 REDIS_NOTUSED(mask);
1920
1921 nread = read(fd, buf, REDIS_IOBUF_LEN);
1922 if (nread == -1) {
1923 if (errno == EAGAIN) {
1924 nread = 0;
1925 } else {
1926 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1927 freeClient(c);
1928 return;
1929 }
1930 } else if (nread == 0) {
1931 redisLog(REDIS_DEBUG, "Client closed connection");
1932 freeClient(c);
1933 return;
1934 }
1935 if (nread) {
1936 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1937 c->lastinteraction = time(NULL);
1938 } else {
1939 return;
1940 }
1941 processInputBuffer(c);
1942 }
1943
1944 static int selectDb(redisClient *c, int id) {
1945 if (id < 0 || id >= server.dbnum)
1946 return REDIS_ERR;
1947 c->db = &server.db[id];
1948 return REDIS_OK;
1949 }
1950
1951 static void *dupClientReplyValue(void *o) {
1952 incrRefCount((robj*)o);
1953 return 0;
1954 }
1955
1956 static redisClient *createClient(int fd) {
1957 redisClient *c = zmalloc(sizeof(*c));
1958
1959 anetNonBlock(NULL,fd);
1960 anetTcpNoDelay(NULL,fd);
1961 if (!c) return NULL;
1962 selectDb(c,0);
1963 c->fd = fd;
1964 c->querybuf = sdsempty();
1965 c->argc = 0;
1966 c->argv = NULL;
1967 c->bulklen = -1;
1968 c->multibulk = 0;
1969 c->mbargc = 0;
1970 c->mbargv = NULL;
1971 c->sentlen = 0;
1972 c->flags = 0;
1973 c->lastinteraction = time(NULL);
1974 c->authenticated = 0;
1975 c->replstate = REDIS_REPL_NONE;
1976 c->reply = listCreate();
1977 listSetFreeMethod(c->reply,decrRefCount);
1978 listSetDupMethod(c->reply,dupClientReplyValue);
1979 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1980 readQueryFromClient, c) == AE_ERR) {
1981 freeClient(c);
1982 return NULL;
1983 }
1984 listAddNodeTail(server.clients,c);
1985 return c;
1986 }
1987
1988 static void addReply(redisClient *c, robj *obj) {
1989 if (listLength(c->reply) == 0 &&
1990 (c->replstate == REDIS_REPL_NONE ||
1991 c->replstate == REDIS_REPL_ONLINE) &&
1992 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1993 sendReplyToClient, c) == AE_ERR) return;
1994 listAddNodeTail(c->reply,getDecodedObject(obj));
1995 }
1996
1997 static void addReplySds(redisClient *c, sds s) {
1998 robj *o = createObject(REDIS_STRING,s);
1999 addReply(c,o);
2000 decrRefCount(o);
2001 }
2002
2003 static void addReplyDouble(redisClient *c, double d) {
2004 char buf[128];
2005
2006 snprintf(buf,sizeof(buf),"%.17g",d);
2007 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n%s\r\n",
2008 strlen(buf),buf));
2009 }
2010
2011 static void addReplyBulkLen(redisClient *c, robj *obj) {
2012 size_t len;
2013
2014 if (obj->encoding == REDIS_ENCODING_RAW) {
2015 len = sdslen(obj->ptr);
2016 } else {
2017 long n = (long)obj->ptr;
2018
2019 len = 1;
2020 if (n < 0) {
2021 len++;
2022 n = -n;
2023 }
2024 while((n = n/10) != 0) {
2025 len++;
2026 }
2027 }
2028 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",len));
2029 }
2030
2031 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2032 int cport, cfd;
2033 char cip[128];
2034 redisClient *c;
2035 REDIS_NOTUSED(el);
2036 REDIS_NOTUSED(mask);
2037 REDIS_NOTUSED(privdata);
2038
2039 cfd = anetAccept(server.neterr, fd, cip, &cport);
2040 if (cfd == AE_ERR) {
2041 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2042 return;
2043 }
2044 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2045 if ((c = createClient(cfd)) == NULL) {
2046 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2047 close(cfd); /* May be already closed, just ingore errors */
2048 return;
2049 }
2050 /* If maxclient directive is set and this is one client more... close the
2051 * connection. Note that we create the client instead to check before
2052 * for this condition, since now the socket is already set in nonblocking
2053 * mode and we can send an error for free using the Kernel I/O */
2054 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2055 char *err = "-ERR max number of clients reached\r\n";
2056
2057 /* That's a best effort error message, don't check write errors */
2058 if (write(c->fd,err,strlen(err)) == -1) {
2059 /* Nothing to do, Just to avoid the warning... */
2060 }
2061 freeClient(c);
2062 return;
2063 }
2064 server.stat_numconnections++;
2065 }
2066
2067 /* ======================= Redis objects implementation ===================== */
2068
2069 static robj *createObject(int type, void *ptr) {
2070 robj *o;
2071
2072 if (listLength(server.objfreelist)) {
2073 listNode *head = listFirst(server.objfreelist);
2074 o = listNodeValue(head);
2075 listDelNode(server.objfreelist,head);
2076 } else {
2077 o = zmalloc(sizeof(*o));
2078 }
2079 o->type = type;
2080 o->encoding = REDIS_ENCODING_RAW;
2081 o->ptr = ptr;
2082 o->refcount = 1;
2083 return o;
2084 }
2085
2086 static robj *createStringObject(char *ptr, size_t len) {
2087 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2088 }
2089
2090 static robj *createListObject(void) {
2091 list *l = listCreate();
2092
2093 listSetFreeMethod(l,decrRefCount);
2094 return createObject(REDIS_LIST,l);
2095 }
2096
2097 static robj *createSetObject(void) {
2098 dict *d = dictCreate(&setDictType,NULL);
2099 return createObject(REDIS_SET,d);
2100 }
2101
2102 static robj *createZsetObject(void) {
2103 zset *zs = zmalloc(sizeof(*zs));
2104
2105 zs->dict = dictCreate(&zsetDictType,NULL);
2106 zs->zsl = zslCreate();
2107 return createObject(REDIS_ZSET,zs);
2108 }
2109
2110 static void freeStringObject(robj *o) {
2111 if (o->encoding == REDIS_ENCODING_RAW) {
2112 sdsfree(o->ptr);
2113 }
2114 }
2115
2116 static void freeListObject(robj *o) {
2117 listRelease((list*) o->ptr);
2118 }
2119
2120 static void freeSetObject(robj *o) {
2121 dictRelease((dict*) o->ptr);
2122 }
2123
2124 static void freeZsetObject(robj *o) {
2125 zset *zs = o->ptr;
2126
2127 dictRelease(zs->dict);
2128 zslFree(zs->zsl);
2129 zfree(zs);
2130 }
2131
2132 static void freeHashObject(robj *o) {
2133 dictRelease((dict*) o->ptr);
2134 }
2135
2136 static void incrRefCount(robj *o) {
2137 o->refcount++;
2138 #ifdef DEBUG_REFCOUNT
2139 if (o->type == REDIS_STRING)
2140 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2141 #endif
2142 }
2143
2144 static void decrRefCount(void *obj) {
2145 robj *o = obj;
2146
2147 #ifdef DEBUG_REFCOUNT
2148 if (o->type == REDIS_STRING)
2149 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2150 #endif
2151 if (--(o->refcount) == 0) {
2152 switch(o->type) {
2153 case REDIS_STRING: freeStringObject(o); break;
2154 case REDIS_LIST: freeListObject(o); break;
2155 case REDIS_SET: freeSetObject(o); break;
2156 case REDIS_ZSET: freeZsetObject(o); break;
2157 case REDIS_HASH: freeHashObject(o); break;
2158 default: assert(0 != 0); break;
2159 }
2160 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2161 !listAddNodeHead(server.objfreelist,o))
2162 zfree(o);
2163 }
2164 }
2165
2166 static robj *lookupKey(redisDb *db, robj *key) {
2167 dictEntry *de = dictFind(db->dict,key);
2168 return de ? dictGetEntryVal(de) : NULL;
2169 }
2170
2171 static robj *lookupKeyRead(redisDb *db, robj *key) {
2172 expireIfNeeded(db,key);
2173 return lookupKey(db,key);
2174 }
2175
2176 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2177 deleteIfVolatile(db,key);
2178 return lookupKey(db,key);
2179 }
2180
2181 static int deleteKey(redisDb *db, robj *key) {
2182 int retval;
2183
2184 /* We need to protect key from destruction: after the first dictDelete()
2185 * it may happen that 'key' is no longer valid if we don't increment
2186 * it's count. This may happen when we get the object reference directly
2187 * from the hash table with dictRandomKey() or dict iterators */
2188 incrRefCount(key);
2189 if (dictSize(db->expires)) dictDelete(db->expires,key);
2190 retval = dictDelete(db->dict,key);
2191 decrRefCount(key);
2192
2193 return retval == DICT_OK;
2194 }
2195
2196 /* Try to share an object against the shared objects pool */
2197 static robj *tryObjectSharing(robj *o) {
2198 struct dictEntry *de;
2199 unsigned long c;
2200
2201 if (o == NULL || server.shareobjects == 0) return o;
2202
2203 assert(o->type == REDIS_STRING);
2204 de = dictFind(server.sharingpool,o);
2205 if (de) {
2206 robj *shared = dictGetEntryKey(de);
2207
2208 c = ((unsigned long) dictGetEntryVal(de))+1;
2209 dictGetEntryVal(de) = (void*) c;
2210 incrRefCount(shared);
2211 decrRefCount(o);
2212 return shared;
2213 } else {
2214 /* Here we are using a stream algorihtm: Every time an object is
2215 * shared we increment its count, everytime there is a miss we
2216 * recrement the counter of a random object. If this object reaches
2217 * zero we remove the object and put the current object instead. */
2218 if (dictSize(server.sharingpool) >=
2219 server.sharingpoolsize) {
2220 de = dictGetRandomKey(server.sharingpool);
2221 assert(de != NULL);
2222 c = ((unsigned long) dictGetEntryVal(de))-1;
2223 dictGetEntryVal(de) = (void*) c;
2224 if (c == 0) {
2225 dictDelete(server.sharingpool,de->key);
2226 }
2227 } else {
2228 c = 0; /* If the pool is empty we want to add this object */
2229 }
2230 if (c == 0) {
2231 int retval;
2232
2233 retval = dictAdd(server.sharingpool,o,(void*)1);
2234 assert(retval == DICT_OK);
2235 incrRefCount(o);
2236 }
2237 return o;
2238 }
2239 }
2240
2241 /* Check if the nul-terminated string 's' can be represented by a long
2242 * (that is, is a number that fits into long without any other space or
2243 * character before or after the digits).
2244 *
2245 * If so, the function returns REDIS_OK and *longval is set to the value
2246 * of the number. Otherwise REDIS_ERR is returned */
2247 static int isStringRepresentableAsLong(sds s, long *longval) {
2248 char buf[32], *endptr;
2249 long value;
2250 int slen;
2251
2252 value = strtol(s, &endptr, 10);
2253 if (endptr[0] != '\0') return REDIS_ERR;
2254 slen = snprintf(buf,32,"%ld",value);
2255
2256 /* If the number converted back into a string is not identical
2257 * then it's not possible to encode the string as integer */
2258 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2259 if (longval) *longval = value;
2260 return REDIS_OK;
2261 }
2262
2263 /* Try to encode a string object in order to save space */
2264 static int tryObjectEncoding(robj *o) {
2265 long value;
2266 sds s = o->ptr;
2267
2268 if (o->encoding != REDIS_ENCODING_RAW)
2269 return REDIS_ERR; /* Already encoded */
2270
2271 /* It's not save to encode shared objects: shared objects can be shared
2272 * everywhere in the "object space" of Redis. Encoded objects can only
2273 * appear as "values" (and not, for instance, as keys) */
2274 if (o->refcount > 1) return REDIS_ERR;
2275
2276 /* Currently we try to encode only strings */
2277 assert(o->type == REDIS_STRING);
2278
2279 /* Check if we can represent this string as a long integer */
2280 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2281
2282 /* Ok, this object can be encoded */
2283 o->encoding = REDIS_ENCODING_INT;
2284 sdsfree(o->ptr);
2285 o->ptr = (void*) value;
2286 return REDIS_OK;
2287 }
2288
2289 /* Get a decoded version of an encoded object (returned as a new object).
2290 * If the object is already raw-encoded just increment the ref count. */
2291 static robj *getDecodedObject(robj *o) {
2292 robj *dec;
2293
2294 if (o->encoding == REDIS_ENCODING_RAW) {
2295 incrRefCount(o);
2296 return o;
2297 }
2298 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2299 char buf[32];
2300
2301 snprintf(buf,32,"%ld",(long)o->ptr);
2302 dec = createStringObject(buf,strlen(buf));
2303 return dec;
2304 } else {
2305 assert(1 != 1);
2306 }
2307 }
2308
2309 /* Compare two string objects via strcmp() or alike.
2310 * Note that the objects may be integer-encoded. In such a case we
2311 * use snprintf() to get a string representation of the numbers on the stack
2312 * and compare the strings, it's much faster than calling getDecodedObject(). */
2313 static int compareStringObjects(robj *a, robj *b) {
2314 assert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2315 char bufa[128], bufb[128], *astr, *bstr;
2316 int bothsds = 1;
2317
2318 if (a == b) return 0;
2319 if (a->encoding != REDIS_ENCODING_RAW) {
2320 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2321 astr = bufa;
2322 bothsds = 0;
2323 } else {
2324 astr = a->ptr;
2325 }
2326 if (b->encoding != REDIS_ENCODING_RAW) {
2327 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2328 bstr = bufb;
2329 bothsds = 0;
2330 } else {
2331 bstr = b->ptr;
2332 }
2333 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2334 }
2335
2336 static size_t stringObjectLen(robj *o) {
2337 assert(o->type == REDIS_STRING);
2338 if (o->encoding == REDIS_ENCODING_RAW) {
2339 return sdslen(o->ptr);
2340 } else {
2341 char buf[32];
2342
2343 return snprintf(buf,32,"%ld",(long)o->ptr);
2344 }
2345 }
2346
2347 /*============================ DB saving/loading ============================ */
2348
/* Write the one byte 'type' on 'fp'. Returns 0 on success, -1 if the byte
 * could not be written. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 0) ? -1 : 0;
}
2353
/* Write the time 't' on 'fp' as a 32 bit signed integer, in the byte order
 * of the host. Returns 0 on success, -1 on write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t stored = (int32_t) t;

    return (fwrite(&stored,4,1,fp) == 0) ? -1 : 0;
}
2359
2360 /* check rdbLoadLen() comments for more info */
2361 static int rdbSaveLen(FILE *fp, uint32_t len) {
2362 unsigned char buf[2];
2363
2364 if (len < (1<<6)) {
2365 /* Save a 6 bit len */
2366 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2367 if (fwrite(buf,1,1,fp) == 0) return -1;
2368 } else if (len < (1<<14)) {
2369 /* Save a 14 bit len */
2370 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2371 buf[1] = len&0xFF;
2372 if (fwrite(buf,2,1,fp) == 0) return -1;
2373 } else {
2374 /* Save a 32 bit len */
2375 buf[0] = (REDIS_RDB_32BITLEN<<6);
2376 if (fwrite(buf,1,1,fp) == 0) return -1;
2377 len = htonl(len);
2378 if (fwrite(&len,4,1,fp) == 0) return -1;
2379 }
2380 return 0;
2381 }
2382
2383 /* String objects in the form "2391" "-100" without any space and with a
2384 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2385 * encoded as integers to save space */
2386 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2387 long long value;
2388 char *endptr, buf[32];
2389
2390 /* Check if it's possible to encode this value as a number */
2391 value = strtoll(s, &endptr, 10);
2392 if (endptr[0] != '\0') return 0;
2393 snprintf(buf,32,"%lld",value);
2394
2395 /* If the number converted back into a string is not identical
2396 * then it's not possible to encode the string as integer */
2397 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2398
2399 /* Finally check if it fits in our ranges */
2400 if (value >= -(1<<7) && value <= (1<<7)-1) {
2401 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2402 enc[1] = value&0xFF;
2403 return 2;
2404 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2405 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2406 enc[1] = value&0xFF;
2407 enc[2] = (value>>8)&0xFF;
2408 return 3;
2409 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2410 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2411 enc[1] = value&0xFF;
2412 enc[2] = (value>>8)&0xFF;
2413 enc[3] = (value>>16)&0xFF;
2414 enc[4] = (value>>24)&0xFF;
2415 return 5;
2416 } else {
2417 return 0;
2418 }
2419 }
2420
2421 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2422 unsigned int comprlen, outlen;
2423 unsigned char byte;
2424 void *out;
2425
2426 /* We require at least four bytes compression for this to be worth it */
2427 outlen = sdslen(obj->ptr)-4;
2428 if (outlen <= 0) return 0;
2429 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2430 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2431 if (comprlen == 0) {
2432 zfree(out);
2433 return 0;
2434 }
2435 /* Data compressed! Let's save it on disk */
2436 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2437 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2438 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2439 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2440 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2441 zfree(out);
2442 return comprlen;
2443
2444 writeerr:
2445 zfree(out);
2446 return -1;
2447 }
2448
2449 /* Save a string objet as [len][data] on disk. If the object is a string
2450 * representation of an integer value we try to safe it in a special form */
2451 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2452 size_t len;
2453 int enclen;
2454
2455 len = sdslen(obj->ptr);
2456
2457 /* Try integer encoding */
2458 if (len <= 11) {
2459 unsigned char buf[5];
2460 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2461 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2462 return 0;
2463 }
2464 }
2465
2466 /* Try LZF compression - under 20 bytes it's unable to compress even
2467 * aaaaaaaaaaaaaaaaaa so skip it */
2468 if (len > 20) {
2469 int retval;
2470
2471 retval = rdbSaveLzfStringObject(fp,obj);
2472 if (retval == -1) return -1;
2473 if (retval > 0) return 0;
2474 /* retval == 0 means data can't be compressed, save the old way */
2475 }
2476
2477 /* Store verbatim */
2478 if (rdbSaveLen(fp,len) == -1) return -1;
2479 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2480 return 0;
2481 }
2482
2483 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2484 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2485 int retval;
2486
2487 obj = getDecodedObject(obj);
2488 retval = rdbSaveStringObjectRaw(fp,obj);
2489 decrRefCount(obj);
2490 return retval;
2491 }
2492
/* Save a double on 'fp'. The value is written as its "%.17g" string
 * representation, prefixed by an unsigned 8 bit integer holding the length
 * of the string. Three values of the prefix are reserved and are not
 * followed by any string:
 *
 *   253: not a number
 *   254: + inf
 *   255: - inf
 *
 * Returns 0 on success, -1 on write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char buf[128];
    int len = 1; /* The special values take the prefix byte only */

    if (isnan(val)) {
        buf[0] = 253;
    } else if (!isfinite(val)) {
        buf[0] = (val < 0) ? 255 : 254;
    } else {
        char *text = (char*)buf+1;

        snprintf(text,sizeof(buf)-1,"%.17g",val);
        buf[0] = strlen(text);
        len = buf[0]+1;
    }
    return (fwrite(buf,len,1,fp) == 0) ? -1 : 0;
}
2519
2520 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2521 static int rdbSave(char *filename) {
2522 dictIterator *di = NULL;
2523 dictEntry *de;
2524 FILE *fp;
2525 char tmpfile[256];
2526 int j;
2527 time_t now = time(NULL);
2528
2529 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2530 fp = fopen(tmpfile,"w");
2531 if (!fp) {
2532 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2533 return REDIS_ERR;
2534 }
2535 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2536 for (j = 0; j < server.dbnum; j++) {
2537 redisDb *db = server.db+j;
2538 dict *d = db->dict;
2539 if (dictSize(d) == 0) continue;
2540 di = dictGetIterator(d);
2541 if (!di) {
2542 fclose(fp);
2543 return REDIS_ERR;
2544 }
2545
2546 /* Write the SELECT DB opcode */
2547 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2548 if (rdbSaveLen(fp,j) == -1) goto werr;
2549
2550 /* Iterate this DB writing every entry */
2551 while((de = dictNext(di)) != NULL) {
2552 robj *key = dictGetEntryKey(de);
2553 robj *o = dictGetEntryVal(de);
2554 time_t expiretime = getExpire(db,key);
2555
2556 /* Save the expire time */
2557 if (expiretime != -1) {
2558 /* If this key is already expired skip it */
2559 if (expiretime < now) continue;
2560 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2561 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2562 }
2563 /* Save the key and associated value */
2564 if (rdbSaveType(fp,o->type) == -1) goto werr;
2565 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2566 if (o->type == REDIS_STRING) {
2567 /* Save a string value */
2568 if (rdbSaveStringObject(fp,o) == -1) goto werr;
2569 } else if (o->type == REDIS_LIST) {
2570 /* Save a list value */
2571 list *list = o->ptr;
2572 listNode *ln;
2573
2574 listRewind(list);
2575 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
2576 while((ln = listYield(list))) {
2577 robj *eleobj = listNodeValue(ln);
2578
2579 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2580 }
2581 } else if (o->type == REDIS_SET) {
2582 /* Save a set value */
2583 dict *set = o->ptr;
2584 dictIterator *di = dictGetIterator(set);
2585 dictEntry *de;
2586
2587 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
2588 while((de = dictNext(di)) != NULL) {
2589 robj *eleobj = dictGetEntryKey(de);
2590
2591 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2592 }
2593 dictReleaseIterator(di);
2594 } else if (o->type == REDIS_ZSET) {
2595 /* Save a set value */
2596 zset *zs = o->ptr;
2597 dictIterator *di = dictGetIterator(zs->dict);
2598 dictEntry *de;
2599
2600 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) goto werr;
2601 while((de = dictNext(di)) != NULL) {
2602 robj *eleobj = dictGetEntryKey(de);
2603 double *score = dictGetEntryVal(de);
2604
2605 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
2606 if (rdbSaveDoubleValue(fp,*score) == -1) goto werr;
2607 }
2608 dictReleaseIterator(di);
2609 } else {
2610 assert(0 != 0);
2611 }
2612 }
2613 dictReleaseIterator(di);
2614 }
2615 /* EOF opcode */
2616 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2617
2618 /* Make sure data will not remain on the OS's output buffers */
2619 fflush(fp);
2620 fsync(fileno(fp));
2621 fclose(fp);
2622
2623 /* Use RENAME to make sure the DB file is changed atomically only
2624 * if the generate DB file is ok. */
2625 if (rename(tmpfile,filename) == -1) {
2626 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2627 unlink(tmpfile);
2628 return REDIS_ERR;
2629 }
2630 redisLog(REDIS_NOTICE,"DB saved on disk");
2631 server.dirty = 0;
2632 server.lastsave = time(NULL);
2633 return REDIS_OK;
2634
2635 werr:
2636 fclose(fp);
2637 unlink(tmpfile);
2638 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2639 if (di) dictReleaseIterator(di);
2640 return REDIS_ERR;
2641 }
2642
2643 static int rdbSaveBackground(char *filename) {
2644 pid_t childpid;
2645
2646 if (server.bgsavechildpid != -1) return REDIS_ERR;
2647 if ((childpid = fork()) == 0) {
2648 /* Child */
2649 close(server.fd);
2650 if (rdbSave(filename) == REDIS_OK) {
2651 exit(0);
2652 } else {
2653 exit(1);
2654 }
2655 } else {
2656 /* Parent */
2657 if (childpid == -1) {
2658 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2659 strerror(errno));
2660 return REDIS_ERR;
2661 }
2662 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2663 server.bgsavechildpid = childpid;
2664 return REDIS_OK;
2665 }
2666 return REDIS_OK; /* unreached */
2667 }
2668
/* Remove the temp file "temp-<childpid>.rdb", that is the name rdbSave()
 * uses when running in the process 'childpid'. The result of unlink() is
 * not checked. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb", (int) childpid);
    unlink(path);
}
2675
/* Read a one byte type from 'fp'. Returns the byte as a value in the range
 * 0-255, or -1 if it could not be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char opcode;

    if (fread(&opcode,1,1,fp) != 0) return opcode;
    return -1;
}
2681
/* Read a time stored as a 32 bit signed integer in host byte order.
 * Returns the time, or -1 if the four bytes could not be read. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stored;

    if (fread(&stored,4,1,fp) != 0) return (time_t) stored;
    return -1;
}
2687
2688 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2689 * of this file for a description of how this are stored on disk.
2690 *
2691 * isencoded is set to 1 if the readed length is not actually a length but
2692 * an "encoding type", check the above comments for more info */
2693 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2694 unsigned char buf[2];
2695 uint32_t len;
2696
2697 if (isencoded) *isencoded = 0;
2698 if (rdbver == 0) {
2699 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2700 return ntohl(len);
2701 } else {
2702 int type;
2703
2704 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2705 type = (buf[0]&0xC0)>>6;
2706 if (type == REDIS_RDB_6BITLEN) {
2707 /* Read a 6 bit len */
2708 return buf[0]&0x3F;
2709 } else if (type == REDIS_RDB_ENCVAL) {
2710 /* Read a 6 bit len encoding type */
2711 if (isencoded) *isencoded = 1;
2712 return buf[0]&0x3F;
2713 } else if (type == REDIS_RDB_14BITLEN) {
2714 /* Read a 14 bit len */
2715 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2716 return ((buf[0]&0x3F)<<8)|buf[1];
2717 } else {
2718 /* Read a 32 bit len */
2719 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2720 return ntohl(len);
2721 }
2722 }
2723 }
2724
2725 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2726 unsigned char enc[4];
2727 long long val;
2728
2729 if (enctype == REDIS_RDB_ENC_INT8) {
2730 if (fread(enc,1,1,fp) == 0) return NULL;
2731 val = (signed char)enc[0];
2732 } else if (enctype == REDIS_RDB_ENC_INT16) {
2733 uint16_t v;
2734 if (fread(enc,2,1,fp) == 0) return NULL;
2735 v = enc[0]|(enc[1]<<8);
2736 val = (int16_t)v;
2737 } else if (enctype == REDIS_RDB_ENC_INT32) {
2738 uint32_t v;
2739 if (fread(enc,4,1,fp) == 0) return NULL;
2740 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2741 val = (int32_t)v;
2742 } else {
2743 val = 0; /* anti-warning */
2744 assert(0!=0);
2745 }
2746 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2747 }
2748
2749 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2750 unsigned int len, clen;
2751 unsigned char *c = NULL;
2752 sds val = NULL;
2753
2754 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2755 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2756 if ((c = zmalloc(clen)) == NULL) goto err;
2757 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2758 if (fread(c,clen,1,fp) == 0) goto err;
2759 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2760 zfree(c);
2761 return createObject(REDIS_STRING,val);
2762 err:
2763 zfree(c);
2764 sdsfree(val);
2765 return NULL;
2766 }
2767
2768 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2769 int isencoded;
2770 uint32_t len;
2771 sds val;
2772
2773 len = rdbLoadLen(fp,rdbver,&isencoded);
2774 if (isencoded) {
2775 switch(len) {
2776 case REDIS_RDB_ENC_INT8:
2777 case REDIS_RDB_ENC_INT16:
2778 case REDIS_RDB_ENC_INT32:
2779 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2780 case REDIS_RDB_ENC_LZF:
2781 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2782 default:
2783 assert(0!=0);
2784 }
2785 }
2786
2787 if (len == REDIS_RDB_LENERR) return NULL;
2788 val = sdsnewlen(NULL,len);
2789 if (len && fread(val,len,1,fp) == 0) {
2790 sdsfree(val);
2791 return NULL;
2792 }
2793 return tryObjectSharing(createObject(REDIS_STRING,val));
2794 }
2795
2796 /* For information about double serialization check rdbSaveDoubleValue() */
2797 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2798 char buf[128];
2799 unsigned char len;
2800
2801 if (fread(&len,1,1,fp) == 0) return -1;
2802 switch(len) {
2803 case 255: *val = R_NegInf; return 0;
2804 case 254: *val = R_PosInf; return 0;
2805 case 253: *val = R_Nan; return 0;
2806 default:
2807 if (fread(buf,len,1,fp) == 0) return -1;
2808 sscanf(buf, "%lg", val);
2809 return 0;
2810 }
2811 }
2812
2813 static int rdbLoad(char *filename) {
2814 FILE *fp;
2815 robj *keyobj = NULL;
2816 uint32_t dbid;
2817 int type, retval, rdbver;
2818 dict *d = server.db[0].dict;
2819 redisDb *db = server.db+0;
2820 char buf[1024];
2821 time_t expiretime = -1, now = time(NULL);
2822
2823 fp = fopen(filename,"r");
2824 if (!fp) return REDIS_ERR;
2825 if (fread(buf,9,1,fp) == 0) goto eoferr;
2826 buf[9] = '\0';
2827 if (memcmp(buf,"REDIS",5) != 0) {
2828 fclose(fp);
2829 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
2830 return REDIS_ERR;
2831 }
2832 rdbver = atoi(buf+5);
2833 if (rdbver > 1) {
2834 fclose(fp);
2835 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
2836 return REDIS_ERR;
2837 }
2838 while(1) {
2839 robj *o;
2840
2841 /* Read type. */
2842 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2843 if (type == REDIS_EXPIRETIME) {
2844 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
2845 /* We read the time so we need to read the object type again */
2846 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
2847 }
2848 if (type == REDIS_EOF) break;
2849 /* Handle SELECT DB opcode as a special case */
2850 if (type == REDIS_SELECTDB) {
2851 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2852 goto eoferr;
2853 if (dbid >= (unsigned)server.dbnum) {
2854 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
2855 exit(1);
2856 }
2857 db = server.db+dbid;
2858 d = db->dict;
2859 continue;
2860 }
2861 /* Read key */
2862 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2863
2864 if (type == REDIS_STRING) {
2865 /* Read string value */
2866 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2867 tryObjectEncoding(o);
2868 } else if (type == REDIS_LIST || type == REDIS_SET) {
2869 /* Read list/set value */
2870 uint32_t listlen;
2871
2872 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2873 goto eoferr;
2874 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
2875 /* Load every single element of the list/set */
2876 while(listlen--) {
2877 robj *ele;
2878
2879 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2880 tryObjectEncoding(ele);
2881 if (type == REDIS_LIST) {
2882 listAddNodeTail((list*)o->ptr,ele);
2883 } else {
2884 dictAdd((dict*)o->ptr,ele,NULL);
2885 }
2886 }
2887 } else if (type == REDIS_ZSET) {
2888 /* Read list/set value */
2889 uint32_t zsetlen;
2890 zset *zs;
2891
2892 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
2893 goto eoferr;
2894 o = createZsetObject();
2895 zs = o->ptr;
2896 /* Load every single element of the list/set */
2897 while(zsetlen--) {
2898 robj *ele;
2899 double *score = zmalloc(sizeof(double));
2900
2901 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
2902 tryObjectEncoding(ele);
2903 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
2904 dictAdd(zs->dict,ele,score);
2905 zslInsert(zs->zsl,*score,ele);
2906 incrRefCount(ele); /* added to skiplist */
2907 }
2908 } else {
2909 assert(0 != 0);
2910 }
2911 /* Add the new object in the hash table */
2912 retval = dictAdd(d,keyobj,o);
2913 if (retval == DICT_ERR) {
2914 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
2915 exit(1);
2916 }
2917 /* Set the expire time if needed */
2918 if (expiretime != -1) {
2919 setExpire(db,keyobj,expiretime);
2920 /* Delete this key if already expired */
2921 if (expiretime < now) deleteKey(db,keyobj);
2922 expiretime = -1;
2923 }
2924 keyobj = o = NULL;
2925 }
2926 fclose(fp);
2927 return REDIS_OK;
2928
2929 eoferr: /* unexpected end of file is handled here with a fatal exit */
2930 if (keyobj) decrRefCount(keyobj);
2931 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
2932 exit(1);
2933 return REDIS_ERR; /* Just to avoid warning */
2934 }
2935
2936 /*================================== Commands =============================== */
2937
2938 static void authCommand(redisClient *c) {
2939 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2940 c->authenticated = 1;
2941 addReply(c,shared.ok);
2942 } else {
2943 c->authenticated = 0;
2944 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
2945 }
2946 }
2947
2948 static void pingCommand(redisClient *c) {
2949 addReply(c,shared.pong);
2950 }
2951
2952 static void echoCommand(redisClient *c) {
2953 addReplyBulkLen(c,c->argv[1]);
2954 addReply(c,c->argv[1]);
2955 addReply(c,shared.crlf);
2956 }
2957
2958 /*=================================== Strings =============================== */
2959
2960 static void setGenericCommand(redisClient *c, int nx) {
2961 int retval;
2962
2963 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2964 if (retval == DICT_ERR) {
2965 if (!nx) {
2966 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2967 incrRefCount(c->argv[2]);
2968 } else {
2969 addReply(c,shared.czero);
2970 return;
2971 }
2972 } else {
2973 incrRefCount(c->argv[1]);
2974 incrRefCount(c->argv[2]);
2975 }
2976 server.dirty++;
2977 removeExpire(c->db,c->argv[1]);
2978 addReply(c, nx ? shared.cone : shared.ok);
2979 }
2980
2981 static void setCommand(redisClient *c) {
2982 setGenericCommand(c,0);
2983 }
2984
2985 static void setnxCommand(redisClient *c) {
2986 setGenericCommand(c,1);
2987 }
2988
2989 static void getCommand(redisClient *c) {
2990 robj *o = lookupKeyRead(c->db,c->argv[1]);
2991
2992 if (o == NULL) {
2993 addReply(c,shared.nullbulk);
2994 } else {
2995 if (o->type != REDIS_STRING) {
2996 addReply(c,shared.wrongtypeerr);
2997 } else {
2998 addReplyBulkLen(c,o);
2999 addReply(c,o);
3000 addReply(c,shared.crlf);
3001 }
3002 }
3003 }
3004
3005 static void getsetCommand(redisClient *c) {
3006 getCommand(c);
3007 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3008 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3009 } else {
3010 incrRefCount(c->argv[1]);
3011 }
3012 incrRefCount(c->argv[2]);
3013 server.dirty++;
3014 removeExpire(c->db,c->argv[1]);
3015 }
3016
3017 static void mgetCommand(redisClient *c) {
3018 int j;
3019
3020 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3021 for (j = 1; j < c->argc; j++) {
3022 robj *o = lookupKeyRead(c->db,c->argv[j]);
3023 if (o == NULL) {
3024 addReply(c,shared.nullbulk);
3025 } else {
3026 if (o->type != REDIS_STRING) {
3027 addReply(c,shared.nullbulk);
3028 } else {
3029 addReplyBulkLen(c,o);
3030 addReply(c,o);
3031 addReply(c,shared.crlf);
3032 }
3033 }
3034 }
3035 }
3036
3037 static void msetGenericCommand(redisClient *c, int nx) {
3038 int j;
3039
3040 if ((c->argc % 2) == 0) {
3041 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
3042 return;
3043 }
3044 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3045 * set nothing at all if at least one already key exists. */
3046 if (nx) {
3047 for (j = 1; j < c->argc; j += 2) {
3048 if (dictFind(c->db->dict,c->argv[j]) != NULL) {
3049 addReply(c, shared.czero);
3050 return;
3051 }
3052 }
3053 }
3054
3055 for (j = 1; j < c->argc; j += 2) {
3056 int retval;
3057
3058 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3059 if (retval == DICT_ERR) {
3060 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3061 incrRefCount(c->argv[j+1]);
3062 } else {
3063 incrRefCount(c->argv[j]);
3064 incrRefCount(c->argv[j+1]);
3065 }
3066 removeExpire(c->db,c->argv[j]);
3067 }
3068 server.dirty += (c->argc-1)/2;
3069 addReply(c, nx ? shared.cone : shared.ok);
3070 }
3071
3072 static void msetCommand(redisClient *c) {
3073 msetGenericCommand(c,0);
3074 }
3075
3076 static void msetnxCommand(redisClient *c) {
3077 msetGenericCommand(c,1);
3078 }
3079
3080 static void incrDecrCommand(redisClient *c, long long incr) {
3081 long long value;
3082 int retval;
3083 robj *o;
3084
3085 o = lookupKeyWrite(c->db,c->argv[1]);
3086 if (o == NULL) {
3087 value = 0;
3088 } else {
3089 if (o->type != REDIS_STRING) {
3090 value = 0;
3091 } else {
3092 char *eptr;
3093
3094 if (o->encoding == REDIS_ENCODING_RAW)
3095 value = strtoll(o->ptr, &eptr, 10);
3096 else if (o->encoding == REDIS_ENCODING_INT)
3097 value = (long)o->ptr;
3098 else
3099 assert(1 != 1);
3100 }
3101 }
3102
3103 value += incr;
3104 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3105 tryObjectEncoding(o);
3106 retval = dictAdd(c->db->dict,c->argv[1],o);
3107 if (retval == DICT_ERR) {
3108 dictReplace(c->db->dict,c->argv[1],o);
3109 removeExpire(c->db,c->argv[1]);
3110 } else {
3111 incrRefCount(c->argv[1]);
3112 }
3113 server.dirty++;
3114 addReply(c,shared.colon);
3115 addReply(c,o);
3116 addReply(c,shared.crlf);
3117 }
3118
3119 static void incrCommand(redisClient *c) {
3120 incrDecrCommand(c,1);
3121 }
3122
3123 static void decrCommand(redisClient *c) {
3124 incrDecrCommand(c,-1);
3125 }
3126
3127 static void incrbyCommand(redisClient *c) {
3128 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3129 incrDecrCommand(c,incr);
3130 }
3131
3132 static void decrbyCommand(redisClient *c) {
3133 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3134 incrDecrCommand(c,-incr);
3135 }
3136
3137 /* ========================= Type agnostic commands ========================= */
3138
3139 static void delCommand(redisClient *c) {
3140 int deleted = 0, j;
3141
3142 for (j = 1; j < c->argc; j++) {
3143 if (deleteKey(c->db,c->argv[j])) {
3144 server.dirty++;
3145 deleted++;
3146 }
3147 }
3148 switch(deleted) {
3149 case 0:
3150 addReply(c,shared.czero);
3151 break;
3152 case 1:
3153 addReply(c,shared.cone);
3154 break;
3155 default:
3156 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3157 break;
3158 }
3159 }
3160
3161 static void existsCommand(redisClient *c) {
3162 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3163 }
3164
3165 static void selectCommand(redisClient *c) {
3166 int id = atoi(c->argv[1]->ptr);
3167
3168 if (selectDb(c,id) == REDIS_ERR) {
3169 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3170 } else {
3171 addReply(c,shared.ok);
3172 }
3173 }
3174
3175 static void randomkeyCommand(redisClient *c) {
3176 dictEntry *de;
3177
3178 while(1) {
3179 de = dictGetRandomKey(c->db->dict);
3180 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3181 }
3182 if (de == NULL) {
3183 addReply(c,shared.plus);
3184 addReply(c,shared.crlf);
3185 } else {
3186 addReply(c,shared.plus);
3187 addReply(c,dictGetEntryKey(de));
3188 addReply(c,shared.crlf);
3189 }
3190 }
3191
3192 static void keysCommand(redisClient *c) {
3193 dictIterator *di;
3194 dictEntry *de;
3195 sds pattern = c->argv[1]->ptr;
3196 int plen = sdslen(pattern);
3197 int numkeys = 0, keyslen = 0;
3198 robj *lenobj = createObject(REDIS_STRING,NULL);
3199
3200 di = dictGetIterator(c->db->dict);
3201 addReply(c,lenobj);
3202 decrRefCount(lenobj);
3203 while((de = dictNext(di)) != NULL) {
3204 robj *keyobj = dictGetEntryKey(de);
3205
3206 sds key = keyobj->ptr;
3207 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3208 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3209 if (expireIfNeeded(c->db,keyobj) == 0) {
3210 if (numkeys != 0)
3211 addReply(c,shared.space);
3212 addReply(c,keyobj);
3213 numkeys++;
3214 keyslen += sdslen(key);
3215 }
3216 }
3217 }
3218 dictReleaseIterator(di);
3219 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3220 addReply(c,shared.crlf);
3221 }
3222
3223 static void dbsizeCommand(redisClient *c) {
3224 addReplySds(c,
3225 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3226 }
3227
3228 static void lastsaveCommand(redisClient *c) {
3229 addReplySds(c,
3230 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3231 }
3232
3233 static void typeCommand(redisClient *c) {
3234 robj *o;
3235 char *type;
3236
3237 o = lookupKeyRead(c->db,c->argv[1]);
3238 if (o == NULL) {
3239 type = "+none";
3240 } else {
3241 switch(o->type) {
3242 case REDIS_STRING: type = "+string"; break;
3243 case REDIS_LIST: type = "+list"; break;
3244 case REDIS_SET: type = "+set"; break;
3245 case REDIS_ZSET: type = "+zset"; break;
3246 default: type = "unknown"; break;
3247 }
3248 }
3249 addReplySds(c,sdsnew(type));
3250 addReply(c,shared.crlf);
3251 }
3252
3253 static void saveCommand(redisClient *c) {
3254 if (server.bgsavechildpid != -1) {
3255 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3256 return;
3257 }
3258 if (rdbSave(server.dbfilename) == REDIS_OK) {
3259 addReply(c,shared.ok);
3260 } else {
3261 addReply(c,shared.err);
3262 }
3263 }
3264
3265 static void bgsaveCommand(redisClient *c) {
3266 if (server.bgsavechildpid != -1) {
3267 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3268 return;
3269 }
3270 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3271 addReply(c,shared.ok);
3272 } else {
3273 addReply(c,shared.err);
3274 }
3275 }
3276
3277 static void shutdownCommand(redisClient *c) {
3278 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3279 /* Kill the saving child if there is a background saving in progress.
3280 We want to avoid race conditions, for instance our saving child may
3281 overwrite the synchronous saving did by SHUTDOWN. */
3282 if (server.bgsavechildpid != -1) {
3283 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3284 kill(server.bgsavechildpid,SIGKILL);
3285 rdbRemoveTempFile(server.bgsavechildpid);
3286 }
3287 /* SYNC SAVE */
3288 if (rdbSave(server.dbfilename) == REDIS_OK) {
3289 if (server.daemonize)
3290 unlink(server.pidfile);
3291 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3292 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3293 exit(1);
3294 } else {
3295 /* Ooops.. error saving! The best we can do is to continue operating.
3296 * Note that if there was a background saving process, in the next
3297 * cron() Redis will be notified that the background saving aborted,
3298 * handling special stuff like slaves pending for synchronization... */
3299 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3300 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3301 }
3302 }
3303
3304 static void renameGenericCommand(redisClient *c, int nx) {
3305 robj *o;
3306
3307 /* To use the same key as src and dst is probably an error */
3308 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3309 addReply(c,shared.sameobjecterr);
3310 return;
3311 }
3312
3313 o = lookupKeyWrite(c->db,c->argv[1]);
3314 if (o == NULL) {
3315 addReply(c,shared.nokeyerr);
3316 return;
3317 }
3318 incrRefCount(o);
3319 deleteIfVolatile(c->db,c->argv[2]);
3320 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3321 if (nx) {
3322 decrRefCount(o);
3323 addReply(c,shared.czero);
3324 return;
3325 }
3326 dictReplace(c->db->dict,c->argv[2],o);
3327 } else {
3328 incrRefCount(c->argv[2]);
3329 }
3330 deleteKey(c->db,c->argv[1]);
3331 server.dirty++;
3332 addReply(c,nx ? shared.cone : shared.ok);
3333 }
3334
3335 static void renameCommand(redisClient *c) {
3336 renameGenericCommand(c,0);
3337 }
3338
3339 static void renamenxCommand(redisClient *c) {
3340 renameGenericCommand(c,1);
3341 }
3342
3343 static void moveCommand(redisClient *c) {
3344 robj *o;
3345 redisDb *src, *dst;
3346 int srcid;
3347
3348 /* Obtain source and target DB pointers */
3349 src = c->db;
3350 srcid = c->db->id;
3351 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3352 addReply(c,shared.outofrangeerr);
3353 return;
3354 }
3355 dst = c->db;
3356 selectDb(c,srcid); /* Back to the source DB */
3357
3358 /* If the user is moving using as target the same
3359 * DB as the source DB it is probably an error. */
3360 if (src == dst) {
3361 addReply(c,shared.sameobjecterr);
3362 return;
3363 }
3364
3365 /* Check if the element exists and get a reference */
3366 o = lookupKeyWrite(c->db,c->argv[1]);
3367 if (!o) {
3368 addReply(c,shared.czero);
3369 return;
3370 }
3371
3372 /* Try to add the element to the target DB */
3373 deleteIfVolatile(dst,c->argv[1]);
3374 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3375 addReply(c,shared.czero);
3376 return;
3377 }
3378 incrRefCount(c->argv[1]);
3379 incrRefCount(o);
3380
3381 /* OK! key moved, free the entry in the source DB */
3382 deleteKey(src,c->argv[1]);
3383 server.dirty++;
3384 addReply(c,shared.cone);
3385 }
3386
3387 /* =================================== Lists ================================ */
3388 static void pushGenericCommand(redisClient *c, int where) {
3389 robj *lobj;
3390 list *list;
3391
3392 lobj = lookupKeyWrite(c->db,c->argv[1]);
3393 if (lobj == NULL) {
3394 lobj = createListObject();
3395 list = lobj->ptr;
3396 if (where == REDIS_HEAD) {
3397 listAddNodeHead(list,c->argv[2]);
3398 } else {
3399 listAddNodeTail(list,c->argv[2]);
3400 }
3401 dictAdd(c->db->dict,c->argv[1],lobj);
3402 incrRefCount(c->argv[1]);
3403 incrRefCount(c->argv[2]);
3404 } else {
3405 if (lobj->type != REDIS_LIST) {
3406 addReply(c,shared.wrongtypeerr);
3407 return;
3408 }
3409 list = lobj->ptr;
3410 if (where == REDIS_HEAD) {
3411 listAddNodeHead(list,c->argv[2]);
3412 } else {
3413 listAddNodeTail(list,c->argv[2]);
3414 }
3415 incrRefCount(c->argv[2]);
3416 }
3417 server.dirty++;
3418 addReply(c,shared.ok);
3419 }
3420
3421 static void lpushCommand(redisClient *c) {
3422 pushGenericCommand(c,REDIS_HEAD);
3423 }
3424
3425 static void rpushCommand(redisClient *c) {
3426 pushGenericCommand(c,REDIS_TAIL);
3427 }
3428
3429 static void llenCommand(redisClient *c) {
3430 robj *o;
3431 list *l;
3432
3433 o = lookupKeyRead(c->db,c->argv[1]);
3434 if (o == NULL) {
3435 addReply(c,shared.czero);
3436 return;
3437 } else {
3438 if (o->type != REDIS_LIST) {
3439 addReply(c,shared.wrongtypeerr);
3440 } else {
3441 l = o->ptr;
3442 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3443 }
3444 }
3445 }
3446
3447 static void lindexCommand(redisClient *c) {
3448 robj *o;
3449 int index = atoi(c->argv[2]->ptr);
3450
3451 o = lookupKeyRead(c->db,c->argv[1]);
3452 if (o == NULL) {
3453 addReply(c,shared.nullbulk);
3454 } else {
3455 if (o->type != REDIS_LIST) {
3456 addReply(c,shared.wrongtypeerr);
3457 } else {
3458 list *list = o->ptr;
3459 listNode *ln;
3460
3461 ln = listIndex(list, index);
3462 if (ln == NULL) {
3463 addReply(c,shared.nullbulk);
3464 } else {
3465 robj *ele = listNodeValue(ln);
3466 addReplyBulkLen(c,ele);
3467 addReply(c,ele);
3468 addReply(c,shared.crlf);
3469 }
3470 }
3471 }
3472 }
3473
3474 static void lsetCommand(redisClient *c) {
3475 robj *o;
3476 int index = atoi(c->argv[2]->ptr);
3477
3478 o = lookupKeyWrite(c->db,c->argv[1]);
3479 if (o == NULL) {
3480 addReply(c,shared.nokeyerr);
3481 } else {
3482 if (o->type != REDIS_LIST) {
3483 addReply(c,shared.wrongtypeerr);
3484 } else {
3485 list *list = o->ptr;
3486 listNode *ln;
3487
3488 ln = listIndex(list, index);
3489 if (ln == NULL) {
3490 addReply(c,shared.outofrangeerr);
3491 } else {
3492 robj *ele = listNodeValue(ln);
3493
3494 decrRefCount(ele);
3495 listNodeValue(ln) = c->argv[3];
3496 incrRefCount(c->argv[3]);
3497 addReply(c,shared.ok);
3498 server.dirty++;
3499 }
3500 }
3501 }
3502 }
3503
3504 static void popGenericCommand(redisClient *c, int where) {
3505 robj *o;
3506
3507 o = lookupKeyWrite(c->db,c->argv[1]);
3508 if (o == NULL) {
3509 addReply(c,shared.nullbulk);
3510 } else {
3511 if (o->type != REDIS_LIST) {
3512 addReply(c,shared.wrongtypeerr);
3513 } else {
3514 list *list = o->ptr;
3515 listNode *ln;
3516
3517 if (where == REDIS_HEAD)
3518 ln = listFirst(list);
3519 else
3520 ln = listLast(list);
3521
3522 if (ln == NULL) {
3523 addReply(c,shared.nullbulk);
3524 } else {
3525 robj *ele = listNodeValue(ln);
3526 addReplyBulkLen(c,ele);
3527 addReply(c,ele);
3528 addReply(c,shared.crlf);
3529 listDelNode(list,ln);
3530 server.dirty++;
3531 }
3532 }
3533 }
3534 }
3535
3536 static void lpopCommand(redisClient *c) {
3537 popGenericCommand(c,REDIS_HEAD);
3538 }
3539
3540 static void rpopCommand(redisClient *c) {
3541 popGenericCommand(c,REDIS_TAIL);
3542 }
3543
3544 static void lrangeCommand(redisClient *c) {
3545 robj *o;
3546 int start = atoi(c->argv[2]->ptr);
3547 int end = atoi(c->argv[3]->ptr);
3548
3549 o = lookupKeyRead(c->db,c->argv[1]);
3550 if (o == NULL) {
3551 addReply(c,shared.nullmultibulk);
3552 } else {
3553 if (o->type != REDIS_LIST) {
3554 addReply(c,shared.wrongtypeerr);
3555 } else {
3556 list *list = o->ptr;
3557 listNode *ln;
3558 int llen = listLength(list);
3559 int rangelen, j;
3560 robj *ele;
3561
3562 /* convert negative indexes */
3563 if (start < 0) start = llen+start;
3564 if (end < 0) end = llen+end;
3565 if (start < 0) start = 0;
3566 if (end < 0) end = 0;
3567
3568 /* indexes sanity checks */
3569 if (start > end || start >= llen) {
3570 /* Out of range start or start > end result in empty list */
3571 addReply(c,shared.emptymultibulk);
3572 return;
3573 }
3574 if (end >= llen) end = llen-1;
3575 rangelen = (end-start)+1;
3576
3577 /* Return the result in form of a multi-bulk reply */
3578 ln = listIndex(list, start);
3579 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3580 for (j = 0; j < rangelen; j++) {
3581 ele = listNodeValue(ln);
3582 addReplyBulkLen(c,ele);
3583 addReply(c,ele);
3584 addReply(c,shared.crlf);
3585 ln = ln->next;
3586 }
3587 }
3588 }
3589 }
3590
3591 static void ltrimCommand(redisClient *c) {
3592 robj *o;
3593 int start = atoi(c->argv[2]->ptr);
3594 int end = atoi(c->argv[3]->ptr);
3595
3596 o = lookupKeyWrite(c->db,c->argv[1]);
3597 if (o == NULL) {
3598 addReply(c,shared.nokeyerr);
3599 } else {
3600 if (o->type != REDIS_LIST) {
3601 addReply(c,shared.wrongtypeerr);
3602 } else {
3603 list *list = o->ptr;
3604 listNode *ln;
3605 int llen = listLength(list);
3606 int j, ltrim, rtrim;
3607
3608 /* convert negative indexes */
3609 if (start < 0) start = llen+start;
3610 if (end < 0) end = llen+end;
3611 if (start < 0) start = 0;
3612 if (end < 0) end = 0;
3613
3614 /* indexes sanity checks */
3615 if (start > end || start >= llen) {
3616 /* Out of range start or start > end result in empty list */
3617 ltrim = llen;
3618 rtrim = 0;
3619 } else {
3620 if (end >= llen) end = llen-1;
3621 ltrim = start;
3622 rtrim = llen-end-1;
3623 }
3624
3625 /* Remove list elements to perform the trim */
3626 for (j = 0; j < ltrim; j++) {
3627 ln = listFirst(list);
3628 listDelNode(list,ln);
3629 }
3630 for (j = 0; j < rtrim; j++) {
3631 ln = listLast(list);
3632 listDelNode(list,ln);
3633 }
3634 server.dirty++;
3635 addReply(c,shared.ok);
3636 }
3637 }
3638 }
3639
3640 static void lremCommand(redisClient *c) {
3641 robj *o;
3642
3643 o = lookupKeyWrite(c->db,c->argv[1]);
3644 if (o == NULL) {
3645 addReply(c,shared.czero);
3646 } else {
3647 if (o->type != REDIS_LIST) {
3648 addReply(c,shared.wrongtypeerr);
3649 } else {
3650 list *list = o->ptr;
3651 listNode *ln, *next;
3652 int toremove = atoi(c->argv[2]->ptr);
3653 int removed = 0;
3654 int fromtail = 0;
3655
3656 if (toremove < 0) {
3657 toremove = -toremove;
3658 fromtail = 1;
3659 }
3660 ln = fromtail ? list->tail : list->head;
3661 while (ln) {
3662 robj *ele = listNodeValue(ln);
3663
3664 next = fromtail ? ln->prev : ln->next;
3665 if (compareStringObjects(ele,c->argv[3]) == 0) {
3666 listDelNode(list,ln);
3667 server.dirty++;
3668 removed++;
3669 if (toremove && removed == toremove) break;
3670 }
3671 ln = next;
3672 }
3673 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3674 }
3675 }
3676 }
3677
3678 /* This is the semantic of this command:
3679 * RPOPLPUSH srclist dstlist:
3680 * IF LLEN(srclist) > 0
3681 * element = RPOP srclist
3682 * LPUSH dstlist element
3683 * RETURN element
3684 * ELSE
3685 * RETURN nil
3686 * END
3687 * END
3688 *
3689 * The idea is to be able to get an element from a list in a reliable way
3690 * since the element is not just returned but pushed against another list
3691 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3692 */
3693 static void rpoplpushcommand(redisClient *c) {
3694 robj *sobj;
3695
3696 sobj = lookupKeyWrite(c->db,c->argv[1]);
3697 if (sobj == NULL) {
3698 addReply(c,shared.nullbulk);
3699 } else {
3700 if (sobj->type != REDIS_LIST) {
3701 addReply(c,shared.wrongtypeerr);
3702 } else {
3703 list *srclist = sobj->ptr;
3704 listNode *ln = listLast(srclist);
3705
3706 if (ln == NULL) {
3707 addReply(c,shared.nullbulk);
3708 } else {
3709 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3710 robj *ele = listNodeValue(ln);
3711 list *dstlist;
3712
3713 if (dobj == NULL) {
3714
3715 /* Create the list if the key does not exist */
3716 dobj = createListObject();
3717 dictAdd(c->db->dict,c->argv[2],dobj);
3718 incrRefCount(c->argv[2]);
3719 } else if (dobj->type != REDIS_LIST) {
3720 addReply(c,shared.wrongtypeerr);
3721 return;
3722 }
3723 /* Add the element to the target list */
3724 dstlist = dobj->ptr;
3725 listAddNodeHead(dstlist,ele);
3726 incrRefCount(ele);
3727
3728 /* Send the element to the client as reply as well */
3729 addReplyBulkLen(c,ele);
3730 addReply(c,ele);
3731 addReply(c,shared.crlf);
3732
3733 /* Finally remove the element from the source list */
3734 listDelNode(srclist,ln);
3735 server.dirty++;
3736 }
3737 }
3738 }
3739 }
3740
3741
3742 /* ==================================== Sets ================================ */
3743
3744 static void saddCommand(redisClient *c) {
3745 robj *set;
3746
3747 set = lookupKeyWrite(c->db,c->argv[1]);
3748 if (set == NULL) {
3749 set = createSetObject();
3750 dictAdd(c->db->dict,c->argv[1],set);
3751 incrRefCount(c->argv[1]);
3752 } else {
3753 if (set->type != REDIS_SET) {
3754 addReply(c,shared.wrongtypeerr);
3755 return;
3756 }
3757 }
3758 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3759 incrRefCount(c->argv[2]);
3760 server.dirty++;
3761 addReply(c,shared.cone);
3762 } else {
3763 addReply(c,shared.czero);
3764 }
3765 }
3766
3767 static void sremCommand(redisClient *c) {
3768 robj *set;
3769
3770 set = lookupKeyWrite(c->db,c->argv[1]);
3771 if (set == NULL) {
3772 addReply(c,shared.czero);
3773 } else {
3774 if (set->type != REDIS_SET) {
3775 addReply(c,shared.wrongtypeerr);
3776 return;
3777 }
3778 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
3779 server.dirty++;
3780 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3781 addReply(c,shared.cone);
3782 } else {
3783 addReply(c,shared.czero);
3784 }
3785 }
3786 }
3787
3788 static void smoveCommand(redisClient *c) {
3789 robj *srcset, *dstset;
3790
3791 srcset = lookupKeyWrite(c->db,c->argv[1]);
3792 dstset = lookupKeyWrite(c->db,c->argv[2]);
3793
3794 /* If the source key does not exist return 0, if it's of the wrong type
3795 * raise an error */
3796 if (srcset == NULL || srcset->type != REDIS_SET) {
3797 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
3798 return;
3799 }
3800 /* Error if the destination key is not a set as well */
3801 if (dstset && dstset->type != REDIS_SET) {
3802 addReply(c,shared.wrongtypeerr);
3803 return;
3804 }
3805 /* Remove the element from the source set */
3806 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
3807 /* Key not found in the src set! return zero */
3808 addReply(c,shared.czero);
3809 return;
3810 }
3811 server.dirty++;
3812 /* Add the element to the destination set */
3813 if (!dstset) {
3814 dstset = createSetObject();
3815 dictAdd(c->db->dict,c->argv[2],dstset);
3816 incrRefCount(c->argv[2]);
3817 }
3818 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
3819 incrRefCount(c->argv[3]);
3820 addReply(c,shared.cone);
3821 }
3822
3823 static void sismemberCommand(redisClient *c) {
3824 robj *set;
3825
3826 set = lookupKeyRead(c->db,c->argv[1]);
3827 if (set == NULL) {
3828 addReply(c,shared.czero);
3829 } else {
3830 if (set->type != REDIS_SET) {
3831 addReply(c,shared.wrongtypeerr);
3832 return;
3833 }
3834 if (dictFind(set->ptr,c->argv[2]))
3835 addReply(c,shared.cone);
3836 else
3837 addReply(c,shared.czero);
3838 }
3839 }
3840
3841 static void scardCommand(redisClient *c) {
3842 robj *o;
3843 dict *s;
3844
3845 o = lookupKeyRead(c->db,c->argv[1]);
3846 if (o == NULL) {
3847 addReply(c,shared.czero);
3848 return;
3849 } else {
3850 if (o->type != REDIS_SET) {
3851 addReply(c,shared.wrongtypeerr);
3852 } else {
3853 s = o->ptr;
3854 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
3855 dictSize(s)));
3856 }
3857 }
3858 }
3859
3860 static void spopCommand(redisClient *c) {
3861 robj *set;
3862 dictEntry *de;
3863
3864 set = lookupKeyWrite(c->db,c->argv[1]);
3865 if (set == NULL) {
3866 addReply(c,shared.nullbulk);
3867 } else {
3868 if (set->type != REDIS_SET) {
3869 addReply(c,shared.wrongtypeerr);
3870 return;
3871 }
3872 de = dictGetRandomKey(set->ptr);
3873 if (de == NULL) {
3874 addReply(c,shared.nullbulk);
3875 } else {
3876 robj *ele = dictGetEntryKey(de);
3877
3878 addReplyBulkLen(c,ele);
3879 addReply(c,ele);
3880 addReply(c,shared.crlf);
3881 dictDelete(set->ptr,ele);
3882 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
3883 server.dirty++;
3884 }
3885 }
3886 }
3887
3888 static void srandmemberCommand(redisClient *c) {
3889 robj *set;
3890 dictEntry *de;
3891
3892 set = lookupKeyRead(c->db,c->argv[1]);
3893 if (set == NULL) {
3894 addReply(c,shared.nullbulk);
3895 } else {
3896 if (set->type != REDIS_SET) {
3897 addReply(c,shared.wrongtypeerr);
3898 return;
3899 }
3900 de = dictGetRandomKey(set->ptr);
3901 if (de == NULL) {
3902 addReply(c,shared.nullbulk);
3903 } else {
3904 robj *ele = dictGetEntryKey(de);
3905
3906 addReplyBulkLen(c,ele);
3907 addReply(c,ele);
3908 addReply(c,shared.crlf);
3909 }
3910 }
3911 }
3912
3913 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
3914 dict **d1 = (void*) s1, **d2 = (void*) s2;
3915
3916 return dictSize(*d1)-dictSize(*d2);
3917 }
3918
3919 static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
3920 dict **dv = zmalloc(sizeof(dict*)*setsnum);
3921 dictIterator *di;
3922 dictEntry *de;
3923 robj *lenobj = NULL, *dstset = NULL;
3924 int j, cardinality = 0;
3925
3926 for (j = 0; j < setsnum; j++) {
3927 robj *setobj;
3928
3929 setobj = dstkey ?
3930 lookupKeyWrite(c->db,setskeys[j]) :
3931 lookupKeyRead(c->db,setskeys[j]);
3932 if (!setobj) {
3933 zfree(dv);
3934 if (dstkey) {
3935 deleteKey(c->db,dstkey);
3936 addReply(c,shared.ok);
3937 } else {
3938 addReply(c,shared.nullmultibulk);
3939 }
3940 return;
3941 }
3942 if (setobj->type != REDIS_SET) {
3943 zfree(dv);
3944 addReply(c,shared.wrongtypeerr);
3945 return;
3946 }
3947 dv[j] = setobj->ptr;
3948 }
3949 /* Sort sets from the smallest to largest, this will improve our
3950 * algorithm's performace */
3951 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
3952
3953 /* The first thing we should output is the total number of elements...
3954 * since this is a multi-bulk write, but at this stage we don't know
3955 * the intersection set size, so we use a trick, append an empty object
3956 * to the output list and save the pointer to later modify it with the
3957 * right length */
3958 if (!dstkey) {
3959 lenobj = createObject(REDIS_STRING,NULL);
3960 addReply(c,lenobj);
3961 decrRefCount(lenobj);
3962 } else {
3963 /* If we have a target key where to store the resulting set
3964 * create this key with an empty set inside */
3965 dstset = createSetObject();
3966 }
3967
3968 /* Iterate all the elements of the first (smallest) set, and test
3969 * the element against all the other sets, if at least one set does
3970 * not include the element it is discarded */
3971 di = dictGetIterator(dv[0]);
3972
3973 while((de = dictNext(di)) != NULL) {
3974 robj *ele;
3975
3976 for (j = 1; j < setsnum; j++)
3977 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
3978 if (j != setsnum)
3979 continue; /* at least one set does not contain the member */
3980 ele = dictGetEntryKey(de);
3981 if (!dstkey) {
3982 addReplyBulkLen(c,ele);
3983 addReply(c,ele);
3984 addReply(c,shared.crlf);
3985 cardinality++;
3986 } else {
3987 dictAdd(dstset->ptr,ele,NULL);
3988 incrRefCount(ele);
3989 }
3990 }
3991 dictReleaseIterator(di);
3992
3993 if (dstkey) {
3994 /* Store the resulting set into the target */
3995 deleteKey(c->db,dstkey);
3996 dictAdd(c->db->dict,dstkey,dstset);
3997 incrRefCount(dstkey);
3998 }
3999
4000 if (!dstkey) {
4001 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
4002 } else {
4003 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4004 dictSize((dict*)dstset->ptr)));
4005 server.dirty++;
4006 }
4007 zfree(dv);
4008 }
4009
4010 static void sinterCommand(redisClient *c) {
4011 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4012 }
4013
4014 static void sinterstoreCommand(redisClient *c) {
4015 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4016 }
4017
4018 #define REDIS_OP_UNION 0
4019 #define REDIS_OP_DIFF 1
4020
4021 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4022 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4023 dictIterator *di;
4024 dictEntry *de;
4025 robj *dstset = NULL;
4026 int j, cardinality = 0;
4027
4028 for (j = 0; j < setsnum; j++) {
4029 robj *setobj;
4030
4031 setobj = dstkey ?
4032 lookupKeyWrite(c->db,setskeys[j]) :
4033 lookupKeyRead(c->db,setskeys[j]);
4034 if (!setobj) {
4035 dv[j] = NULL;
4036 continue;
4037 }
4038 if (setobj->type != REDIS_SET) {
4039 zfree(dv);
4040 addReply(c,shared.wrongtypeerr);
4041 return;
4042 }
4043 dv[j] = setobj->ptr;
4044 }
4045
4046 /* We need a temp set object to store our union. If the dstkey
4047 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4048 * this set object will be the resulting object to set into the target key*/
4049 dstset = createSetObject();
4050
4051 /* Iterate all the elements of all the sets, add every element a single
4052 * time to the result set */
4053 for (j = 0; j < setsnum; j++) {
4054 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4055 if (!dv[j]) continue; /* non existing keys are like empty sets */
4056
4057 di = dictGetIterator(dv[j]);
4058
4059 while((de = dictNext(di)) != NULL) {
4060 robj *ele;
4061
4062 /* dictAdd will not add the same element multiple times */
4063 ele = dictGetEntryKey(de);
4064 if (op == REDIS_OP_UNION || j == 0) {
4065 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4066 incrRefCount(ele);
4067 cardinality++;
4068 }
4069 } else if (op == REDIS_OP_DIFF) {
4070 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4071 cardinality--;
4072 }
4073 }
4074 }
4075 dictReleaseIterator(di);
4076
4077 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4078 }
4079
4080 /* Output the content of the resulting set, if not in STORE mode */
4081 if (!dstkey) {
4082 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4083 di = dictGetIterator(dstset->ptr);
4084 while((de = dictNext(di)) != NULL) {
4085 robj *ele;
4086
4087 ele = dictGetEntryKey(de);
4088 addReplyBulkLen(c,ele);
4089 addReply(c,ele);
4090 addReply(c,shared.crlf);
4091 }
4092 dictReleaseIterator(di);
4093 } else {
4094 /* If we have a target key where to store the resulting set
4095 * create this key with the result set inside */
4096 deleteKey(c->db,dstkey);
4097 dictAdd(c->db->dict,dstkey,dstset);
4098 incrRefCount(dstkey);
4099 }
4100
4101 /* Cleanup */
4102 if (!dstkey) {
4103 decrRefCount(dstset);
4104 } else {
4105 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
4106 dictSize((dict*)dstset->ptr)));
4107 server.dirty++;
4108 }
4109 zfree(dv);
4110 }
4111
4112 static void sunionCommand(redisClient *c) {
4113 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4114 }
4115
4116 static void sunionstoreCommand(redisClient *c) {
4117 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4118 }
4119
4120 static void sdiffCommand(redisClient *c) {
4121 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4122 }
4123
4124 static void sdiffstoreCommand(redisClient *c) {
4125 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4126 }
4127
4128 /* ==================================== ZSets =============================== */
4129
4130 /* ZSETs are ordered sets using two data structures to hold the same elements
4131 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4132 * data structure.
4133 *
 * The elements are added to a hash table mapping Redis objects to scores.
4135 * At the same time the elements are added to a skip list mapping scores
4136 * to Redis objects (so objects are sorted by scores in this "view"). */
4137
4138 /* This skiplist implementation is almost a C translation of the original
4139 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4140 * Alternative to Balanced Trees", modified in three ways:
4141 * a) this implementation allows for repeated values.
4142 * b) the comparison is not just by key (our 'score') but by satellite data.
4143 * c) there is a back pointer, so it's a doubly linked list with the back
4144 * pointers being only at "level 1". This allows to traverse the list
4145 * from tail to head, useful for ZREVRANGE. */
4146
4147 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4148 zskiplistNode *zn = zmalloc(sizeof(*zn));
4149
4150 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4151 zn->score = score;
4152 zn->obj = obj;
4153 return zn;
4154 }
4155
4156 static zskiplist *zslCreate(void) {
4157 int j;
4158 zskiplist *zsl;
4159
4160 zsl = zmalloc(sizeof(*zsl));
4161 zsl->level = 1;
4162 zsl->length = 0;
4163 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4164 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4165 zsl->header->forward[j] = NULL;
4166 zsl->header->backward = NULL;
4167 zsl->tail = NULL;
4168 return zsl;
4169 }
4170
4171 static void zslFreeNode(zskiplistNode *node) {
4172 decrRefCount(node->obj);
4173 zfree(node->forward);
4174 zfree(node);
4175 }
4176
4177 static void zslFree(zskiplist *zsl) {
4178 zskiplistNode *node = zsl->header->forward[0], *next;
4179
4180 zfree(zsl->header->forward);
4181 zfree(zsl->header);
4182 while(node) {
4183 next = node->forward[0];
4184 zslFreeNode(node);
4185 node = next;
4186 }
4187 zfree(zsl);
4188 }
4189
4190 static int zslRandomLevel(void) {
4191 int level = 1;
4192 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4193 level += 1;
4194 return level;
4195 }
4196
4197 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4198 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4199 int i, level;
4200
4201 x = zsl->header;
4202 for (i = zsl->level-1; i >= 0; i--) {
4203 while (x->forward[i] &&
4204 (x->forward[i]->score < score ||
4205 (x->forward[i]->score == score &&
4206 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4207 x = x->forward[i];
4208 update[i] = x;
4209 }
4210 /* we assume the key is not already inside, since we allow duplicated
4211 * scores, and the re-insertion of score and redis object should never
4212 * happpen since the caller of zslInsert() should test in the hash table
4213 * if the element is already inside or not. */
4214 level = zslRandomLevel();
4215 if (level > zsl->level) {
4216 for (i = zsl->level; i < level; i++)
4217 update[i] = zsl->header;
4218 zsl->level = level;
4219 }
4220 x = zslCreateNode(level,score,obj);
4221 for (i = 0; i < level; i++) {
4222 x->forward[i] = update[i]->forward[i];
4223 update[i]->forward[i] = x;
4224 }
4225 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4226 if (x->forward[0])
4227 x->forward[0]->backward = x;
4228 else
4229 zsl->tail = x;
4230 zsl->length++;
4231 }
4232
4233 /* Delete an element with matching score/object from the skiplist. */
4234 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4235 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4236 int i;
4237
4238 x = zsl->header;
4239 for (i = zsl->level-1; i >= 0; i--) {
4240 while (x->forward[i] &&
4241 (x->forward[i]->score < score ||
4242 (x->forward[i]->score == score &&
4243 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4244 x = x->forward[i];
4245 update[i] = x;
4246 }
4247 /* We may have multiple elements with the same score, what we need
4248 * is to find the element with both the right score and object. */
4249 x = x->forward[0];
4250 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4251 for (i = 0; i < zsl->level; i++) {
4252 if (update[i]->forward[i] != x) break;
4253 update[i]->forward[i] = x->forward[i];
4254 }
4255 if (x->forward[0]) {
4256 x->forward[0]->backward = (x->backward == zsl->header) ?
4257 NULL : x->backward;
4258 } else {
4259 zsl->tail = x->backward;
4260 }
4261 zslFreeNode(x);
4262 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4263 zsl->level--;
4264 zsl->length--;
4265 return 1;
4266 } else {
4267 return 0; /* not found */
4268 }
4269 return 0; /* not found */
4270 }
4271
4272 /* Delete all the elements with score between min and max from the skiplist.
4273 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4274 * Note that this function takes the reference to the hash table view of the
4275 * sorted set, in order to remove the elements from the hash table too. */
4276 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4277 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4278 unsigned long removed = 0;
4279 int i;
4280
4281 x = zsl->header;
4282 for (i = zsl->level-1; i >= 0; i--) {
4283 while (x->forward[i] && x->forward[i]->score < min)
4284 x = x->forward[i];
4285 update[i] = x;
4286 }
4287 /* We may have multiple elements with the same score, what we need
4288 * is to find the element with both the right score and object. */
4289 x = x->forward[0];
4290 while (x && x->score <= max) {
4291 zskiplistNode *next;
4292
4293 for (i = 0; i < zsl->level; i++) {
4294 if (update[i]->forward[i] != x) break;
4295 update[i]->forward[i] = x->forward[i];
4296 }
4297 if (x->forward[0]) {
4298 x->forward[0]->backward = (x->backward == zsl->header) ?
4299 NULL : x->backward;
4300 } else {
4301 zsl->tail = x->backward;
4302 }
4303 next = x->forward[0];
4304 dictDelete(dict,x->obj);
4305 zslFreeNode(x);
4306 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4307 zsl->level--;
4308 zsl->length--;
4309 removed++;
4310 x = next;
4311 }
4312 return removed; /* not found */
4313 }
4314
4315 /* Find the first node having a score equal or greater than the specified one.
4316 * Returns NULL if there is no match. */
4317 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4318 zskiplistNode *x;
4319 int i;
4320
4321 x = zsl->header;
4322 for (i = zsl->level-1; i >= 0; i--) {
4323 while (x->forward[i] && x->forward[i]->score < score)
4324 x = x->forward[i];
4325 }
4326 /* We may have multiple elements with the same score, what we need
4327 * is to find the element with both the right score and object. */
4328 return x->forward[0];
4329 }
4330
4331 /* The actual Z-commands implementations */
4332
4333 /* This generic command implements both ZADD and ZINCRBY.
4334 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4335 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4336 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4337 robj *zsetobj;
4338 zset *zs;
4339 double *score;
4340
4341 zsetobj = lookupKeyWrite(c->db,key);
4342 if (zsetobj == NULL) {
4343 zsetobj = createZsetObject();
4344 dictAdd(c->db->dict,key,zsetobj);
4345 incrRefCount(key);
4346 } else {
4347 if (zsetobj->type != REDIS_ZSET) {
4348 addReply(c,shared.wrongtypeerr);
4349 return;
4350 }
4351 }
4352 zs = zsetobj->ptr;
4353
4354 /* Ok now since we implement both ZADD and ZINCRBY here the code
4355 * needs to handle the two different conditions. It's all about setting
4356 * '*score', that is, the new score to set, to the right value. */
4357 score = zmalloc(sizeof(double));
4358 if (doincrement) {
4359 dictEntry *de;
4360
4361 /* Read the old score. If the element was not present starts from 0 */
4362 de = dictFind(zs->dict,ele);
4363 if (de) {
4364 double *oldscore = dictGetEntryVal(de);
4365 *score = *oldscore + scoreval;
4366 } else {
4367 *score = scoreval;
4368 }
4369 } else {
4370 *score = scoreval;
4371 }
4372
4373 /* What follows is a simple remove and re-insert operation that is common
4374 * to both ZADD and ZINCRBY... */
4375 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4376 /* case 1: New element */
4377 incrRefCount(ele); /* added to hash */
4378 zslInsert(zs->zsl,*score,ele);
4379 incrRefCount(ele); /* added to skiplist */
4380 server.dirty++;
4381 if (doincrement)
4382 addReplyDouble(c,*score);
4383 else
4384 addReply(c,shared.cone);
4385 } else {
4386 dictEntry *de;
4387 double *oldscore;
4388
4389 /* case 2: Score update operation */
4390 de = dictFind(zs->dict,ele);
4391 assert(de != NULL);
4392 oldscore = dictGetEntryVal(de);
4393 if (*score != *oldscore) {
4394 int deleted;
4395
4396 /* Remove and insert the element in the skip list with new score */
4397 deleted = zslDelete(zs->zsl,*oldscore,ele);
4398 assert(deleted != 0);
4399 zslInsert(zs->zsl,*score,ele);
4400 incrRefCount(ele);
4401 /* Update the score in the hash table */
4402 dictReplace(zs->dict,ele,score);
4403 server.dirty++;
4404 } else {
4405 zfree(score);
4406 }
4407 if (doincrement)
4408 addReplyDouble(c,*score);
4409 else
4410 addReply(c,shared.czero);
4411 }
4412 }
4413
4414 static void zaddCommand(redisClient *c) {
4415 double scoreval;
4416
4417 scoreval = strtod(c->argv[2]->ptr,NULL);
4418 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4419 }
4420
4421 static void zincrbyCommand(redisClient *c) {
4422 double scoreval;
4423
4424 scoreval = strtod(c->argv[2]->ptr,NULL);
4425 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4426 }
4427
4428 static void zremCommand(redisClient *c) {
4429 robj *zsetobj;
4430 zset *zs;
4431
4432 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4433 if (zsetobj == NULL) {
4434 addReply(c,shared.czero);
4435 } else {
4436 dictEntry *de;
4437 double *oldscore;
4438 int deleted;
4439
4440 if (zsetobj->type != REDIS_ZSET) {
4441 addReply(c,shared.wrongtypeerr);
4442 return;
4443 }
4444 zs = zsetobj->ptr;
4445 de = dictFind(zs->dict,c->argv[2]);
4446 if (de == NULL) {
4447 addReply(c,shared.czero);
4448 return;
4449 }
4450 /* Delete from the skiplist */
4451 oldscore = dictGetEntryVal(de);
4452 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4453 assert(deleted != 0);
4454
4455 /* Delete from the hash table */
4456 dictDelete(zs->dict,c->argv[2]);
4457 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4458 server.dirty++;
4459 addReply(c,shared.cone);
4460 }
4461 }
4462
4463 static void zremrangebyscoreCommand(redisClient *c) {
4464 double min = strtod(c->argv[2]->ptr,NULL);
4465 double max = strtod(c->argv[3]->ptr,NULL);
4466 robj *zsetobj;
4467 zset *zs;
4468
4469 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4470 if (zsetobj == NULL) {
4471 addReply(c,shared.czero);
4472 } else {
4473 long deleted;
4474
4475 if (zsetobj->type != REDIS_ZSET) {
4476 addReply(c,shared.wrongtypeerr);
4477 return;
4478 }
4479 zs = zsetobj->ptr;
4480 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4481 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4482 server.dirty += deleted;
4483 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4484 }
4485 }
4486
4487 static void zrangeGenericCommand(redisClient *c, int reverse) {
4488 robj *o;
4489 int start = atoi(c->argv[2]->ptr);
4490 int end = atoi(c->argv[3]->ptr);
4491
4492 o = lookupKeyRead(c->db,c->argv[1]);
4493 if (o == NULL) {
4494 addReply(c,shared.nullmultibulk);
4495 } else {
4496 if (o->type != REDIS_ZSET) {
4497 addReply(c,shared.wrongtypeerr);
4498 } else {
4499 zset *zsetobj = o->ptr;
4500 zskiplist *zsl = zsetobj->zsl;
4501 zskiplistNode *ln;
4502
4503 int llen = zsl->length;
4504 int rangelen, j;
4505 robj *ele;
4506
4507 /* convert negative indexes */
4508 if (start < 0) start = llen+start;
4509 if (end < 0) end = llen+end;
4510 if (start < 0) start = 0;
4511 if (end < 0) end = 0;
4512
4513 /* indexes sanity checks */
4514 if (start > end || start >= llen) {
4515 /* Out of range start or start > end result in empty list */
4516 addReply(c,shared.emptymultibulk);
4517 return;
4518 }
4519 if (end >= llen) end = llen-1;
4520 rangelen = (end-start)+1;
4521
4522 /* Return the result in form of a multi-bulk reply */
4523 if (reverse) {
4524 ln = zsl->tail;
4525 while (start--)
4526 ln = ln->backward;
4527 } else {
4528 ln = zsl->header->forward[0];
4529 while (start--)
4530 ln = ln->forward[0];
4531 }
4532
4533 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
4534 for (j = 0; j < rangelen; j++) {
4535 ele = ln->obj;
4536 addReplyBulkLen(c,ele);
4537 addReply(c,ele);
4538 addReply(c,shared.crlf);
4539 ln = reverse ? ln->backward : ln->forward[0];
4540 }
4541 }
4542 }
4543 }
4544
4545 static void zrangeCommand(redisClient *c) {
4546 zrangeGenericCommand(c,0);
4547 }
4548
4549 static void zrevrangeCommand(redisClient *c) {
4550 zrangeGenericCommand(c,1);
4551 }
4552
4553 static void zrangebyscoreCommand(redisClient *c) {
4554 robj *o;
4555 double min = strtod(c->argv[2]->ptr,NULL);
4556 double max = strtod(c->argv[3]->ptr,NULL);
4557
4558 o = lookupKeyRead(c->db,c->argv[1]);
4559 if (o == NULL) {
4560 addReply(c,shared.nullmultibulk);
4561 } else {
4562 if (o->type != REDIS_ZSET) {
4563 addReply(c,shared.wrongtypeerr);
4564 } else {
4565 zset *zsetobj = o->ptr;
4566 zskiplist *zsl = zsetobj->zsl;
4567 zskiplistNode *ln;
4568 robj *ele, *lenobj;
4569 unsigned int rangelen = 0;
4570
4571 /* Get the first node with the score >= min */
4572 ln = zslFirstWithScore(zsl,min);
4573 if (ln == NULL) {
4574 /* No element matching the speciifed interval */
4575 addReply(c,shared.emptymultibulk);
4576 return;
4577 }
4578
4579 /* We don't know in advance how many matching elements there
4580 * are in the list, so we push this object that will represent
4581 * the multi-bulk length in the output buffer, and will "fix"
4582 * it later */
4583 lenobj = createObject(REDIS_STRING,NULL);
4584 addReply(c,lenobj);
4585
4586 while(ln && ln->score <= max) {
4587 ele = ln->obj;
4588 addReplyBulkLen(c,ele);
4589 addReply(c,ele);
4590 addReply(c,shared.crlf);
4591 ln = ln->forward[0];
4592 rangelen++;
4593 }
4594 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4595 }
4596 }
4597 }
4598
4599 static void zcardCommand(redisClient *c) {
4600 robj *o;
4601 zset *zs;
4602
4603 o = lookupKeyRead(c->db,c->argv[1]);
4604 if (o == NULL) {
4605 addReply(c,shared.czero);
4606 return;
4607 } else {
4608 if (o->type != REDIS_ZSET) {
4609 addReply(c,shared.wrongtypeerr);
4610 } else {
4611 zs = o->ptr;
4612 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",zs->zsl->length));
4613 }
4614 }
4615 }
4616
4617 static void zscoreCommand(redisClient *c) {
4618 robj *o;
4619 zset *zs;
4620
4621 o = lookupKeyRead(c->db,c->argv[1]);
4622 if (o == NULL) {
4623 addReply(c,shared.nullbulk);
4624 return;
4625 } else {
4626 if (o->type != REDIS_ZSET) {
4627 addReply(c,shared.wrongtypeerr);
4628 } else {
4629 dictEntry *de;
4630
4631 zs = o->ptr;
4632 de = dictFind(zs->dict,c->argv[2]);
4633 if (!de) {
4634 addReply(c,shared.nullbulk);
4635 } else {
4636 double *score = dictGetEntryVal(de);
4637
4638 addReplyDouble(c,*score);
4639 }
4640 }
4641 }
4642 }
4643
4644 /* ========================= Non type-specific commands ==================== */
4645
4646 static void flushdbCommand(redisClient *c) {
4647 server.dirty += dictSize(c->db->dict);
4648 dictEmpty(c->db->dict);
4649 dictEmpty(c->db->expires);
4650 addReply(c,shared.ok);
4651 }
4652
4653 static void flushallCommand(redisClient *c) {
4654 server.dirty += emptyDb();
4655 addReply(c,shared.ok);
4656 rdbSave(server.dbfilename);
4657 server.dirty++;
4658 }
4659
4660 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4661 redisSortOperation *so = zmalloc(sizeof(*so));
4662 so->type = type;
4663 so->pattern = pattern;
4664 return so;
4665 }
4666
4667 /* Return the value associated to the key with a name obtained
4668 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4669 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4670 char *p;
4671 sds spat, ssub;
4672 robj keyobj;
4673 int prefixlen, sublen, postfixlen;
4674 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4675 struct {
4676 long len;
4677 long free;
4678 char buf[REDIS_SORTKEY_MAX+1];
4679 } keyname;
4680
4681 /* If the pattern is "#" return the substitution object itself in order
4682 * to implement the "SORT ... GET #" feature. */
4683 spat = pattern->ptr;
4684 if (spat[0] == '#' && spat[1] == '\0') {
4685 return subst;
4686 }
4687
4688 /* The substitution object may be specially encoded. If so we create
4689 * a decoded object on the fly. Otherwise getDecodedObject will just
4690 * increment the ref count, that we'll decrement later. */
4691 subst = getDecodedObject(subst);
4692
4693 ssub = subst->ptr;
4694 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4695 p = strchr(spat,'*');
4696 if (!p) {
4697 decrRefCount(subst);
4698 return NULL;
4699 }
4700
4701 prefixlen = p-spat;
4702 sublen = sdslen(ssub);
4703 postfixlen = sdslen(spat)-(prefixlen+1);
4704 memcpy(keyname.buf,spat,prefixlen);
4705 memcpy(keyname.buf+prefixlen,ssub,sublen);
4706 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4707 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4708 keyname.len = prefixlen+sublen+postfixlen;
4709
4710 keyobj.refcount = 1;
4711 keyobj.type = REDIS_STRING;
4712 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
4713
4714 decrRefCount(subst);
4715
4716 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4717 return lookupKeyRead(db,&keyobj);
4718 }
4719
4720 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4721 * the additional parameter is not standard but a BSD-specific we have to
4722 * pass sorting parameters via the global 'server' structure */
4723 static int sortCompare(const void *s1, const void *s2) {
4724 const redisSortObject *so1 = s1, *so2 = s2;
4725 int cmp;
4726
4727 if (!server.sort_alpha) {
4728 /* Numeric sorting. Here it's trivial as we precomputed scores */
4729 if (so1->u.score > so2->u.score) {
4730 cmp = 1;
4731 } else if (so1->u.score < so2->u.score) {
4732 cmp = -1;
4733 } else {
4734 cmp = 0;
4735 }
4736 } else {
4737 /* Alphanumeric sorting */
4738 if (server.sort_bypattern) {
4739 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4740 /* At least one compare object is NULL */
4741 if (so1->u.cmpobj == so2->u.cmpobj)
4742 cmp = 0;
4743 else if (so1->u.cmpobj == NULL)
4744 cmp = -1;
4745 else
4746 cmp = 1;
4747 } else {
4748 /* We have both the objects, use strcoll */
4749 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
4750 }
4751 } else {
4752 /* Compare elements directly */
4753 robj *dec1, *dec2;
4754
4755 dec1 = getDecodedObject(so1->obj);
4756 dec2 = getDecodedObject(so2->obj);
4757 cmp = strcoll(dec1->ptr,dec2->ptr);
4758 decrRefCount(dec1);
4759 decrRefCount(dec2);
4760 }
4761 }
4762 return server.sort_desc ? -cmp : cmp;
4763 }
4764
4765 /* The SORT command is the most complex command in Redis. Warning: this code
4766 * is optimized for speed and a bit less for readability */
4767 static void sortCommand(redisClient *c) {
4768 list *operations;
4769 int outputlen = 0;
4770 int desc = 0, alpha = 0;
4771 int limit_start = 0, limit_count = -1, start, end;
4772 int j, dontsort = 0, vectorlen;
4773 int getop = 0; /* GET operation counter */
4774 robj *sortval, *sortby = NULL, *storekey = NULL;
4775 redisSortObject *vector; /* Resulting vector to sort */
4776
4777 /* Lookup the key to sort. It must be of the right types */
4778 sortval = lookupKeyRead(c->db,c->argv[1]);
4779 if (sortval == NULL) {
4780 addReply(c,shared.nokeyerr);
4781 return;
4782 }
4783 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
4784 addReply(c,shared.wrongtypeerr);
4785 return;
4786 }
4787
4788 /* Create a list of operations to perform for every sorted element.
4789 * Operations can be GET/DEL/INCR/DECR */
4790 operations = listCreate();
4791 listSetFreeMethod(operations,zfree);
4792 j = 2;
4793
4794 /* Now we need to protect sortval incrementing its count, in the future
4795 * SORT may have options able to overwrite/delete keys during the sorting
4796 * and the sorted key itself may get destroied */
4797 incrRefCount(sortval);
4798
4799 /* The SORT command has an SQL-alike syntax, parse it */
4800 while(j < c->argc) {
4801 int leftargs = c->argc-j-1;
4802 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
4803 desc = 0;
4804 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
4805 desc = 1;
4806 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
4807 alpha = 1;
4808 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
4809 limit_start = atoi(c->argv[j+1]->ptr);
4810 limit_count = atoi(c->argv[j+2]->ptr);
4811 j+=2;
4812 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
4813 storekey = c->argv[j+1];
4814 j++;
4815 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
4816 sortby = c->argv[j+1];
4817 /* If the BY pattern does not contain '*', i.e. it is constant,
4818 * we don't need to sort nor to lookup the weight keys. */
4819 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
4820 j++;
4821 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
4822 listAddNodeTail(operations,createSortOperation(
4823 REDIS_SORT_GET,c->argv[j+1]));
4824 getop++;
4825 j++;
4826 } else {
4827 decrRefCount(sortval);
4828 listRelease(operations);
4829 addReply(c,shared.syntaxerr);
4830 return;
4831 }
4832 j++;
4833 }
4834
4835 /* Load the sorting vector with all the objects to sort */
4836 vectorlen = (sortval->type == REDIS_LIST) ?
4837 listLength((list*)sortval->ptr) :
4838 dictSize((dict*)sortval->ptr);
4839 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
4840 j = 0;
4841 if (sortval->type == REDIS_LIST) {
4842 list *list = sortval->ptr;
4843 listNode *ln;
4844
4845 listRewind(list);
4846 while((ln = listYield(list))) {
4847 robj *ele = ln->value;
4848 vector[j].obj = ele;
4849 vector[j].u.score = 0;
4850 vector[j].u.cmpobj = NULL;
4851 j++;
4852 }
4853 } else {
4854 dict *set = sortval->ptr;
4855 dictIterator *di;
4856 dictEntry *setele;
4857
4858 di = dictGetIterator(set);
4859 while((setele = dictNext(di)) != NULL) {
4860 vector[j].obj = dictGetEntryKey(setele);
4861 vector[j].u.score = 0;
4862 vector[j].u.cmpobj = NULL;
4863 j++;
4864 }
4865 dictReleaseIterator(di);
4866 }
4867 assert(j == vectorlen);
4868
4869 /* Now it's time to load the right scores in the sorting vector */
4870 if (dontsort == 0) {
4871 for (j = 0; j < vectorlen; j++) {
4872 if (sortby) {
4873 robj *byval;
4874
4875 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
4876 if (!byval || byval->type != REDIS_STRING) continue;
4877 if (alpha) {
4878 vector[j].u.cmpobj = getDecodedObject(byval);
4879 } else {
4880 if (byval->encoding == REDIS_ENCODING_RAW) {
4881 vector[j].u.score = strtod(byval->ptr,NULL);
4882 } else {
4883 /* Don't need to decode the object if it's
4884 * integer-encoded (the only encoding supported) so
4885 * far. We can just cast it */
4886 if (byval->encoding == REDIS_ENCODING_INT) {
4887 vector[j].u.score = (long)byval->ptr;
4888 } else
4889 assert(1 != 1);
4890 }
4891 }
4892 } else {
4893 if (!alpha) {
4894 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
4895 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
4896 else {
4897 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
4898 vector[j].u.score = (long) vector[j].obj->ptr;
4899 else
4900 assert(1 != 1);
4901 }
4902 }
4903 }
4904 }
4905 }
4906
4907 /* We are ready to sort the vector... perform a bit of sanity check
4908 * on the LIMIT option too. We'll use a partial version of quicksort. */
4909 start = (limit_start < 0) ? 0 : limit_start;
4910 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
4911 if (start >= vectorlen) {
4912 start = vectorlen-1;
4913 end = vectorlen-2;
4914 }
4915 if (end >= vectorlen) end = vectorlen-1;
4916
4917 if (dontsort == 0) {
4918 server.sort_desc = desc;
4919 server.sort_alpha = alpha;
4920 server.sort_bypattern = sortby ? 1 : 0;
4921 if (sortby && (start != 0 || end != vectorlen-1))
4922 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
4923 else
4924 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
4925 }
4926
4927 /* Send command output to the output buffer, performing the specified
4928 * GET/DEL/INCR/DECR operations if any. */
4929 outputlen = getop ? getop*(end-start+1) : end-start+1;
4930 if (storekey == NULL) {
4931 /* STORE option not specified, sent the sorting result to client */
4932 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
4933 for (j = start; j <= end; j++) {
4934 listNode *ln;
4935 if (!getop) {
4936 addReplyBulkLen(c,vector[j].obj);
4937 addReply(c,vector[j].obj);
4938 addReply(c,shared.crlf);
4939 }
4940 listRewind(operations);
4941 while((ln = listYield(operations))) {
4942 redisSortOperation *sop = ln->value;
4943 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4944 vector[j].obj);
4945
4946 if (sop->type == REDIS_SORT_GET) {
4947 if (!val || val->type != REDIS_STRING) {
4948 addReply(c,shared.nullbulk);
4949 } else {
4950 addReplyBulkLen(c,val);
4951 addReply(c,val);
4952 addReply(c,shared.crlf);
4953 }
4954 } else {
4955 assert(sop->type == REDIS_SORT_GET); /* always fails */
4956 }
4957 }
4958 }
4959 } else {
4960 robj *listObject = createListObject();
4961 list *listPtr = (list*) listObject->ptr;
4962
4963 /* STORE option specified, set the sorting result as a List object */
4964 for (j = start; j <= end; j++) {
4965 listNode *ln;
4966 if (!getop) {
4967 listAddNodeTail(listPtr,vector[j].obj);
4968 incrRefCount(vector[j].obj);
4969 }
4970 listRewind(operations);
4971 while((ln = listYield(operations))) {
4972 redisSortOperation *sop = ln->value;
4973 robj *val = lookupKeyByPattern(c->db,sop->pattern,
4974 vector[j].obj);
4975
4976 if (sop->type == REDIS_SORT_GET) {
4977 if (!val || val->type != REDIS_STRING) {
4978 listAddNodeTail(listPtr,createStringObject("",0));
4979 } else {
4980 listAddNodeTail(listPtr,val);
4981 incrRefCount(val);
4982 }
4983 } else {
4984 assert(sop->type == REDIS_SORT_GET); /* always fails */
4985 }
4986 }
4987 }
4988 if (dictReplace(c->db->dict,storekey,listObject)) {
4989 incrRefCount(storekey);
4990 }
4991 /* Note: we add 1 because the DB is dirty anyway since even if the
4992 * SORT result is empty a new key is set and maybe the old content
4993 * replaced. */
4994 server.dirty += 1+outputlen;
4995 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
4996 }
4997
4998 /* Cleanup */
4999 decrRefCount(sortval);
5000 listRelease(operations);
5001 for (j = 0; j < vectorlen; j++) {
5002 if (sortby && alpha && vector[j].u.cmpobj)
5003 decrRefCount(vector[j].u.cmpobj);
5004 }
5005 zfree(vector);
5006 }
5007
5008 static void infoCommand(redisClient *c) {
5009 sds info;
5010 time_t uptime = time(NULL)-server.stat_starttime;
5011 int j;
5012
5013 info = sdscatprintf(sdsempty(),
5014 "redis_version:%s\r\n"
5015 "arch_bits:%s\r\n"
5016 "uptime_in_seconds:%d\r\n"
5017 "uptime_in_days:%d\r\n"
5018 "connected_clients:%d\r\n"
5019 "connected_slaves:%d\r\n"
5020 "used_memory:%zu\r\n"
5021 "changes_since_last_save:%lld\r\n"
5022 "bgsave_in_progress:%d\r\n"
5023 "last_save_time:%d\r\n"
5024 "total_connections_received:%lld\r\n"
5025 "total_commands_processed:%lld\r\n"
5026 "role:%s\r\n"
5027 ,REDIS_VERSION,
5028 (sizeof(long) == 8) ? "64" : "32",
5029 uptime,
5030 uptime/(3600*24),
5031 listLength(server.clients)-listLength(server.slaves),
5032 listLength(server.slaves),
5033 server.usedmemory,
5034 server.dirty,
5035 server.bgsavechildpid != -1,
5036 server.lastsave,
5037 server.stat_numconnections,
5038 server.stat_numcommands,
5039 server.masterhost == NULL ? "master" : "slave"
5040 );
5041 if (server.masterhost) {
5042 info = sdscatprintf(info,
5043 "master_host:%s\r\n"
5044 "master_port:%d\r\n"
5045 "master_link_status:%s\r\n"
5046 "master_last_io_seconds_ago:%d\r\n"
5047 ,server.masterhost,
5048 server.masterport,
5049 (server.replstate == REDIS_REPL_CONNECTED) ?
5050 "up" : "down",
5051 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5052 );
5053 }
5054 for (j = 0; j < server.dbnum; j++) {
5055 long long keys, vkeys;
5056
5057 keys = dictSize(server.db[j].dict);
5058 vkeys = dictSize(server.db[j].expires);
5059 if (keys || vkeys) {
5060 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5061 j, keys, vkeys);
5062 }
5063 }
5064 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
5065 addReplySds(c,info);
5066 addReply(c,shared.crlf);
5067 }
5068
5069 static void monitorCommand(redisClient *c) {
5070 /* ignore MONITOR if aleady slave or in monitor mode */
5071 if (c->flags & REDIS_SLAVE) return;
5072
5073 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5074 c->slaveseldb = 0;
5075 listAddNodeTail(server.monitors,c);
5076 addReply(c,shared.ok);
5077 }
5078
5079 /* ================================= Expire ================================= */
5080 static int removeExpire(redisDb *db, robj *key) {
5081 if (dictDelete(db->expires,key) == DICT_OK) {
5082 return 1;
5083 } else {
5084 return 0;
5085 }
5086 }
5087
5088 static int setExpire(redisDb *db, robj *key, time_t when) {
5089 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5090 return 0;
5091 } else {
5092 incrRefCount(key);
5093 return 1;
5094 }
5095 }
5096
5097 /* Return the expire time of the specified key, or -1 if no expire
5098 * is associated with this key (i.e. the key is non volatile) */
5099 static time_t getExpire(redisDb *db, robj *key) {
5100 dictEntry *de;
5101
5102 /* No expire? return ASAP */
5103 if (dictSize(db->expires) == 0 ||
5104 (de = dictFind(db->expires,key)) == NULL) return -1;
5105
5106 return (time_t) dictGetEntryVal(de);
5107 }
5108
5109 static int expireIfNeeded(redisDb *db, robj *key) {
5110 time_t when;
5111 dictEntry *de;
5112
5113 /* No expire? return ASAP */
5114 if (dictSize(db->expires) == 0 ||
5115 (de = dictFind(db->expires,key)) == NULL) return 0;
5116
5117 /* Lookup the expire */
5118 when = (time_t) dictGetEntryVal(de);
5119 if (time(NULL) <= when) return 0;
5120
5121 /* Delete the key */
5122 dictDelete(db->expires,key);
5123 return dictDelete(db->dict,key) == DICT_OK;
5124 }
5125
5126 static int deleteIfVolatile(redisDb *db, robj *key) {
5127 dictEntry *de;
5128
5129 /* No expire? return ASAP */
5130 if (dictSize(db->expires) == 0 ||
5131 (de = dictFind(db->expires,key)) == NULL) return 0;
5132
5133 /* Delete the key */
5134 server.dirty++;
5135 dictDelete(db->expires,key);
5136 return dictDelete(db->dict,key) == DICT_OK;
5137 }
5138
5139 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5140 dictEntry *de;
5141
5142 de = dictFind(c->db->dict,key);
5143 if (de == NULL) {
5144 addReply(c,shared.czero);
5145 return;
5146 }
5147 if (seconds < 0) {
5148 if (deleteKey(c->db,key)) server.dirty++;
5149 addReply(c, shared.cone);
5150 return;
5151 } else {
5152 time_t when = time(NULL)+seconds;
5153 if (setExpire(c->db,key,when)) {
5154 addReply(c,shared.cone);
5155 server.dirty++;
5156 } else {
5157 addReply(c,shared.czero);
5158 }
5159 return;
5160 }
5161 }
5162
5163 static void expireCommand(redisClient *c) {
5164 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5165 }
5166
5167 static void expireatCommand(redisClient *c) {
5168 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5169 }
5170
5171 static void ttlCommand(redisClient *c) {
5172 time_t expire;
5173 int ttl = -1;
5174
5175 expire = getExpire(c->db,c->argv[1]);
5176 if (expire != -1) {
5177 ttl = (int) (expire-time(NULL));
5178 if (ttl < 0) ttl = -1;
5179 }
5180 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5181 }
5182
5183 /* =============================== Replication ============================= */
5184
5185 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5186 ssize_t nwritten, ret = size;
5187 time_t start = time(NULL);
5188
5189 timeout++;
5190 while(size) {
5191 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5192 nwritten = write(fd,ptr,size);
5193 if (nwritten == -1) return -1;
5194 ptr += nwritten;
5195 size -= nwritten;
5196 }
5197 if ((time(NULL)-start) > timeout) {
5198 errno = ETIMEDOUT;
5199 return -1;
5200 }
5201 }
5202 return ret;
5203 }
5204
5205 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5206 ssize_t nread, totread = 0;
5207 time_t start = time(NULL);
5208
5209 timeout++;
5210 while(size) {
5211 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5212 nread = read(fd,ptr,size);
5213 if (nread == -1) return -1;
5214 ptr += nread;
5215 size -= nread;
5216 totread += nread;
5217 }
5218 if ((time(NULL)-start) > timeout) {
5219 errno = ETIMEDOUT;
5220 return -1;
5221 }
5222 }
5223 return totread;
5224 }
5225
/* Read a line from 'fd' one byte at a time into the buffer 'ptr' of
 * 'size' bytes. Reading stops at the first '\n'; the newline is not
 * stored and a '\r' immediately before it is stripped.
 *
 * At most size-1 characters are stored, so the buffer always has room for
 * the nul terminator written after every stored character. If the buffer
 * fills up before a newline is seen the truncated line is returned.
 *
 * Returns the number of characters read before the newline (the stripped
 * '\r', if any, is still counted), or -1 if 'size' is not positive or if
 * syncRead() fails; 'timeout' is passed to syncRead() for every byte. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return -1;
    size--; /* reserve one byte for the nul terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* One more slot of the buffer is used: without this the loop
         * would write past the end of the buffer on over-long lines. */
        size--;
    }
    return nread;
}
5246
5247 static void syncCommand(redisClient *c) {
5248 /* ignore SYNC if aleady slave or in monitor mode */
5249 if (c->flags & REDIS_SLAVE) return;
5250
5251 /* SYNC can't be issued when the server has pending data to send to
5252 * the client about already issued commands. We need a fresh reply
5253 * buffer registering the differences between the BGSAVE and the current
5254 * dataset, so that we can copy to other slaves if needed. */
5255 if (listLength(c->reply) != 0) {
5256 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5257 return;
5258 }
5259
5260 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5261 /* Here we need to check if there is a background saving operation
5262 * in progress, or if it is required to start one */
5263 if (server.bgsavechildpid != -1) {
5264 /* Ok a background save is in progress. Let's check if it is a good
5265 * one for replication, i.e. if there is another slave that is
5266 * registering differences since the server forked to save */
5267 redisClient *slave;
5268 listNode *ln;
5269
5270 listRewind(server.slaves);
5271 while((ln = listYield(server.slaves))) {
5272 slave = ln->value;
5273 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5274 }
5275 if (ln) {
5276 /* Perfect, the server is already registering differences for
5277 * another slave. Set the right state, and copy the buffer. */
5278 listRelease(c->reply);
5279 c->reply = listDup(slave->reply);
5280 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5281 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5282 } else {
5283 /* No way, we need to wait for the next BGSAVE in order to
5284 * register differences */
5285 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5286 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5287 }
5288 } else {
5289 /* Ok we don't have a BGSAVE in progress, let's start one */
5290 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5291 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5292 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5293 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5294 return;
5295 }
5296 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5297 }
5298 c->repldbfd = -1;
5299 c->flags |= REDIS_SLAVE;
5300 c->slaveseldb = 0;
5301 listAddNodeTail(server.slaves,c);
5302 return;
5303 }
5304
5305 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5306 redisClient *slave = privdata;
5307 REDIS_NOTUSED(el);
5308 REDIS_NOTUSED(mask);
5309 char buf[REDIS_IOBUF_LEN];
5310 ssize_t nwritten, buflen;
5311
5312 if (slave->repldboff == 0) {
5313 /* Write the bulk write count before to transfer the DB. In theory here
5314 * we don't know how much room there is in the output buffer of the
5315 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5316 * operations) will never be smaller than the few bytes we need. */
5317 sds bulkcount;
5318
5319 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5320 slave->repldbsize);
5321 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5322 {
5323 sdsfree(bulkcount);
5324 freeClient(slave);
5325 return;
5326 }
5327 sdsfree(bulkcount);
5328 }
5329 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5330 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5331 if (buflen <= 0) {
5332 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5333 (buflen == 0) ? "premature EOF" : strerror(errno));
5334 freeClient(slave);
5335 return;
5336 }
5337 if ((nwritten = write(fd,buf,buflen)) == -1) {
5338 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5339 strerror(errno));
5340 freeClient(slave);
5341 return;
5342 }
5343 slave->repldboff += nwritten;
5344 if (slave->repldboff == slave->repldbsize) {
5345 close(slave->repldbfd);
5346 slave->repldbfd = -1;
5347 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5348 slave->replstate = REDIS_REPL_ONLINE;
5349 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5350 sendReplyToClient, slave) == AE_ERR) {
5351 freeClient(slave);
5352 return;
5353 }
5354 addReplySds(slave,sdsempty());
5355 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5356 }
5357 }
5358
5359 /* This function is called at the end of every backgrond saving.
5360 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5361 * otherwise REDIS_ERR is passed to the function.
5362 *
5363 * The goal of this function is to handle slaves waiting for a successful
5364 * background saving in order to perform non-blocking synchronization. */
5365 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5366 listNode *ln;
5367 int startbgsave = 0;
5368
5369 listRewind(server.slaves);
5370 while((ln = listYield(server.slaves))) {
5371 redisClient *slave = ln->value;
5372
5373 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5374 startbgsave = 1;
5375 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5376 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5377 struct redis_stat buf;
5378
5379 if (bgsaveerr != REDIS_OK) {
5380 freeClient(slave);
5381 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5382 continue;
5383 }
5384 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5385 redis_fstat(slave->repldbfd,&buf) == -1) {
5386 freeClient(slave);
5387 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5388 continue;
5389 }
5390 slave->repldboff = 0;
5391 slave->repldbsize = buf.st_size;
5392 slave->replstate = REDIS_REPL_SEND_BULK;
5393 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5394 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5395 freeClient(slave);
5396 continue;
5397 }
5398 }
5399 }
5400 if (startbgsave) {
5401 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5402 listRewind(server.slaves);
5403 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5404 while((ln = listYield(server.slaves))) {
5405 redisClient *slave = ln->value;
5406
5407 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5408 freeClient(slave);
5409 }
5410 }
5411 }
5412 }
5413
5414 static int syncWithMaster(void) {
5415 char buf[1024], tmpfile[256], authcmd[1024];
5416 int dumpsize;
5417 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5418 int dfd;
5419
5420 if (fd == -1) {
5421 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5422 strerror(errno));
5423 return REDIS_ERR;
5424 }
5425
5426 /* AUTH with the master if required. */
5427 if(server.masterauth) {
5428 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5429 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5430 close(fd);
5431 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5432 strerror(errno));
5433 return REDIS_ERR;
5434 }
5435 /* Read the AUTH result. */
5436 if (syncReadLine(fd,buf,1024,3600) == -1) {
5437 close(fd);
5438 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5439 strerror(errno));
5440 return REDIS_ERR;
5441 }
5442 if (buf[0] != '+') {
5443 close(fd);
5444 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5445 return REDIS_ERR;
5446 }
5447 }
5448
5449 /* Issue the SYNC command */
5450 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
5451 close(fd);
5452 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
5453 strerror(errno));
5454 return REDIS_ERR;
5455 }
5456 /* Read the bulk write count */
5457 if (syncReadLine(fd,buf,1024,3600) == -1) {
5458 close(fd);
5459 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
5460 strerror(errno));
5461 return REDIS_ERR;
5462 }
5463 if (buf[0] != '$') {
5464 close(fd);
5465 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
5466 return REDIS_ERR;
5467 }
5468 dumpsize = atoi(buf+1);
5469 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
5470 /* Read the bulk write data on a temp file */
5471 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
5472 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
5473 if (dfd == -1) {
5474 close(fd);
5475 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
5476 return REDIS_ERR;
5477 }
5478 while(dumpsize) {
5479 int nread, nwritten;
5480
5481 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
5482 if (nread == -1) {
5483 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
5484 strerror(errno));
5485 close(fd);
5486 close(dfd);
5487 return REDIS_ERR;
5488 }
5489 nwritten = write(dfd,buf,nread);
5490 if (nwritten == -1) {
5491 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
5492 close(fd);
5493 close(dfd);
5494 return REDIS_ERR;
5495 }
5496 dumpsize -= nread;
5497 }
5498 close(dfd);
5499 if (rename(tmpfile,server.dbfilename) == -1) {
5500 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
5501 unlink(tmpfile);
5502 close(fd);
5503 return REDIS_ERR;
5504 }
5505 emptyDb();
5506 if (rdbLoad(server.dbfilename) != REDIS_OK) {
5507 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
5508 close(fd);
5509 return REDIS_ERR;
5510 }
5511 server.master = createClient(fd);
5512 server.master->flags |= REDIS_MASTER;
5513 server.replstate = REDIS_REPL_CONNECTED;
5514 return REDIS_OK;
5515 }
5516
5517 static void slaveofCommand(redisClient *c) {
5518 if (!strcasecmp(c->argv[1]->ptr,"no") &&
5519 !strcasecmp(c->argv[2]->ptr,"one")) {
5520 if (server.masterhost) {
5521 sdsfree(server.masterhost);
5522 server.masterhost = NULL;
5523 if (server.master) freeClient(server.master);
5524 server.replstate = REDIS_REPL_NONE;
5525 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
5526 }
5527 } else {
5528 sdsfree(server.masterhost);
5529 server.masterhost = sdsdup(c->argv[1]->ptr);
5530 server.masterport = atoi(c->argv[2]->ptr);
5531 if (server.master) freeClient(server.master);
5532 server.replstate = REDIS_REPL_CONNECT;
5533 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
5534 server.masterhost, server.masterport);
5535 }
5536 addReply(c,shared.ok);
5537 }
5538
5539 /* ============================ Maxmemory directive ======================== */
5540
5541 /* This function gets called when 'maxmemory' is set on the config file to limit
5542 * the max memory used by the server, and we are out of memory.
5543 * This function will try to, in order:
5544 *
5545 * - Free objects from the free list
5546 * - Try to remove keys with an EXPIRE set
5547 *
5548 * It is not possible to free enough memory to reach used-memory < maxmemory
5549 * the server will start refusing commands that will enlarge even more the
5550 * memory usage.
5551 */
5552 static void freeMemoryIfNeeded(void) {
5553 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
5554 if (listLength(server.objfreelist)) {
5555 robj *o;
5556
5557 listNode *head = listFirst(server.objfreelist);
5558 o = listNodeValue(head);
5559 listDelNode(server.objfreelist,head);
5560 zfree(o);
5561 } else {
5562 int j, k, freed = 0;
5563
5564 for (j = 0; j < server.dbnum; j++) {
5565 int minttl = -1;
5566 robj *minkey = NULL;
5567 struct dictEntry *de;
5568
5569 if (dictSize(server.db[j].expires)) {
5570 freed = 1;
5571 /* From a sample of three keys drop the one nearest to
5572 * the natural expire */
5573 for (k = 0; k < 3; k++) {
5574 time_t t;
5575
5576 de = dictGetRandomKey(server.db[j].expires);
5577 t = (time_t) dictGetEntryVal(de);
5578 if (minttl == -1 || t < minttl) {
5579 minkey = dictGetEntryKey(de);
5580 minttl = t;
5581 }
5582 }
5583 deleteKey(server.db+j,minkey);
5584 }
5585 }
5586 if (!freed) return; /* nothing to free... */
5587 }
5588 }
5589 }
5590
5591 /* ============================== Append Only file ========================== */
5592
5593 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
5594 sds buf = sdsempty();
5595 int j;
5596 ssize_t nwritten;
5597 time_t now;
5598 robj *tmpargv[3];
5599
5600 /* The DB this command was targetting is not the same as the last command
5601 * we appendend. To issue a SELECT command is needed. */
5602 if (dictid != server.appendseldb) {
5603 char seldb[64];
5604
5605 snprintf(seldb,sizeof(seldb),"%d",dictid);
5606 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
5607 strlen(seldb),seldb);
5608 server.appendseldb = dictid;
5609 }
5610
5611 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
5612 * EXPIREs into EXPIREATs calls */
5613 if (cmd->proc == expireCommand) {
5614 long when;
5615
5616 tmpargv[0] = createStringObject("EXPIREAT",8);
5617 tmpargv[1] = argv[1];
5618 incrRefCount(argv[1]);
5619 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
5620 tmpargv[2] = createObject(REDIS_STRING,
5621 sdscatprintf(sdsempty(),"%ld",when));
5622 argv = tmpargv;
5623 }
5624
5625 /* Append the actual command */
5626 buf = sdscatprintf(buf,"*%d\r\n",argc);
5627 for (j = 0; j < argc; j++) {
5628 robj *o = argv[j];
5629
5630 o = getDecodedObject(o);
5631 buf = sdscatprintf(buf,"$%d\r\n",sdslen(o->ptr));
5632 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
5633 buf = sdscatlen(buf,"\r\n",2);
5634 decrRefCount(o);
5635 }
5636
5637 /* Free the objects from the modified argv for EXPIREAT */
5638 if (cmd->proc == expireCommand) {
5639 for (j = 0; j < 3; j++)
5640 decrRefCount(argv[j]);
5641 }
5642
5643 /* We want to perform a single write. This should be guaranteed atomic
5644 * at least if the filesystem we are writing is a real physical one.
5645 * While this will save us against the server being killed I don't think
5646 * there is much to do about the whole server stopping for power problems
5647 * or alike */
5648 nwritten = write(server.appendfd,buf,sdslen(buf));
5649 if (nwritten != (signed)sdslen(buf)) {
5650 /* Ooops, we are in troubles. The best thing to do for now is
5651 * to simply exit instead to give the illusion that everything is
5652 * working as expected. */
5653 if (nwritten == -1) {
5654 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
5655 } else {
5656 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
5657 }
5658 exit(1);
5659 }
5660 /* If a background append only file rewriting is in progress we want to
5661 * accumulate the differences between the child DB and the current one
5662 * in a buffer, so that when the child process will do its work we
5663 * can append the differences to the new append only file. */
5664 if (server.bgrewritechildpid != -1)
5665 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
5666
5667 sdsfree(buf);
5668 now = time(NULL);
5669 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
5670 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
5671 now-server.lastfsync > 1))
5672 {
5673 fsync(server.appendfd); /* Let's try to get this data on the disk */
5674 server.lastfsync = now;
5675 }
5676 }
5677
5678 /* In Redis commands are always executed in the context of a client, so in
5679 * order to load the append only file we need to create a fake client. */
5680 static struct redisClient *createFakeClient(void) {
5681 struct redisClient *c = zmalloc(sizeof(*c));
5682
5683 selectDb(c,0);
5684 c->fd = -1;
5685 c->querybuf = sdsempty();
5686 c->argc = 0;
5687 c->argv = NULL;
5688 c->flags = 0;
5689 /* We set the fake client as a slave waiting for the synchronization
5690 * so that Redis will not try to send replies to this client. */
5691 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5692 c->reply = listCreate();
5693 listSetFreeMethod(c->reply,decrRefCount);
5694 listSetDupMethod(c->reply,dupClientReplyValue);
5695 return c;
5696 }
5697
5698 static void freeFakeClient(struct redisClient *c) {
5699 sdsfree(c->querybuf);
5700 listRelease(c->reply);
5701 zfree(c);
5702 }
5703
5704 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
5705 * error (the append only file is zero-length) REDIS_ERR is returned. On
5706 * fatal error an error message is logged and the program exists. */
5707 int loadAppendOnlyFile(char *filename) {
5708 struct redisClient *fakeClient;
5709 FILE *fp = fopen(filename,"r");
5710 struct redis_stat sb;
5711
5712 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
5713 return REDIS_ERR;
5714
5715 if (fp == NULL) {
5716 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
5717 exit(1);
5718 }
5719
5720 fakeClient = createFakeClient();
5721 while(1) {
5722 int argc, j;
5723 unsigned long len;
5724 robj **argv;
5725 char buf[128];
5726 sds argsds;
5727 struct redisCommand *cmd;
5728
5729 if (fgets(buf,sizeof(buf),fp) == NULL) {
5730 if (feof(fp))
5731 break;
5732 else
5733 goto readerr;
5734 }
5735 if (buf[0] != '*') goto fmterr;
5736 argc = atoi(buf+1);
5737 argv = zmalloc(sizeof(robj*)*argc);
5738 for (j = 0; j < argc; j++) {
5739 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
5740 if (buf[0] != '$') goto fmterr;
5741 len = strtol(buf+1,NULL,10);
5742 argsds = sdsnewlen(NULL,len);
5743 if (fread(argsds,len,1,fp) == 0) goto fmterr;
5744 argv[j] = createObject(REDIS_STRING,argsds);
5745 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
5746 }
5747
5748 /* Command lookup */
5749 cmd = lookupCommand(argv[0]->ptr);
5750 if (!cmd) {
5751 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
5752 exit(1);
5753 }
5754 /* Try object sharing and encoding */
5755 if (server.shareobjects) {
5756 int j;
5757 for(j = 1; j < argc; j++)
5758 argv[j] = tryObjectSharing(argv[j]);
5759 }
5760 if (cmd->flags & REDIS_CMD_BULK)
5761 tryObjectEncoding(argv[argc-1]);
5762 /* Run the command in the context of a fake client */
5763 fakeClient->argc = argc;
5764 fakeClient->argv = argv;
5765 cmd->proc(fakeClient);
5766 /* Discard the reply objects list from the fake client */
5767 while(listLength(fakeClient->reply))
5768 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
5769 /* Clean up, ready for the next command */
5770 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
5771 zfree(argv);
5772 }
5773 fclose(fp);
5774 freeFakeClient(fakeClient);
5775 return REDIS_OK;
5776
5777 readerr:
5778 if (feof(fp)) {
5779 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
5780 } else {
5781 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
5782 }
5783 exit(1);
5784 fmterr:
5785 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
5786 exit(1);
5787 }
5788
5789 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
5790 static int fwriteBulk(FILE *fp, robj *obj) {
5791 char buf[128];
5792 obj = getDecodedObject(obj);
5793 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
5794 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
5795 if (fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0) goto err;
5796 if (fwrite("\r\n",2,1,fp) == 0) goto err;
5797 decrRefCount(obj);
5798 return 1;
5799 err:
5800 decrRefCount(obj);
5801 return 0;
5802 }
5803
/* Write a double into a file in the bulk format
 * $<count>\r\n<payload>\r\n
 * where the payload is the value formatted with "%.17g".
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char header[128], payload[128];
    unsigned long count;

    /* The payload already carries its trailing CRLF, which is not part
     * of the count announced by the header. */
    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    count = (unsigned long)strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",count);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,strlen(payload),1,fp) == 0) return 0;
    return 1;
}
5814
/* Write a long into a file in the bulk format
 * $<count>\r\n<payload>\r\n
 * where the payload is the value in decimal.
 * Returns 1 on success, 0 on write error. */
static int fwriteBulkLong(FILE *fp, long l) {
    char header[128], payload[128];
    unsigned long count;

    /* The payload already carries its trailing CRLF, which is not part
     * of the count announced by the header. */
    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    count = (unsigned long)strlen(payload)-2;
    snprintf(header,sizeof(header),"$%lu\r\n",count);

    if (fwrite(header,strlen(header),1,fp) == 0 ||
        fwrite(payload,strlen(payload),1,fp) == 0) return 0;
    return 1;
}
5825
5826 /* Write a sequence of commands able to fully rebuild the dataset into
5827 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
5828 static int rewriteAppendOnlyFile(char *filename) {
5829 dictIterator *di = NULL;
5830 dictEntry *de;
5831 FILE *fp;
5832 char tmpfile[256];
5833 int j;
5834 time_t now = time(NULL);
5835
5836 /* Note that we have to use a different temp name here compared to the
5837 * one used by rewriteAppendOnlyFileBackground() function. */
5838 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
5839 fp = fopen(tmpfile,"w");
5840 if (!fp) {
5841 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
5842 return REDIS_ERR;
5843 }
5844 for (j = 0; j < server.dbnum; j++) {
5845 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
5846 redisDb *db = server.db+j;
5847 dict *d = db->dict;
5848 if (dictSize(d) == 0) continue;
5849 di = dictGetIterator(d);
5850 if (!di) {
5851 fclose(fp);
5852 return REDIS_ERR;
5853 }
5854
5855 /* SELECT the new DB */
5856 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
5857 if (fwriteBulkLong(fp,j) == 0) goto werr;
5858
5859 /* Iterate this DB writing every entry */
5860 while((de = dictNext(di)) != NULL) {
5861 robj *key = dictGetEntryKey(de);
5862 robj *o = dictGetEntryVal(de);
5863 time_t expiretime = getExpire(db,key);
5864
5865 /* Save the key and associated value */
5866 if (o->type == REDIS_STRING) {
5867 /* Emit a SET command */
5868 char cmd[]="*3\r\n$3\r\nSET\r\n";
5869 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5870 /* Key and value */
5871 if (fwriteBulk(fp,key) == 0) goto werr;
5872 if (fwriteBulk(fp,o) == 0) goto werr;
5873 } else if (o->type == REDIS_LIST) {
5874 /* Emit the RPUSHes needed to rebuild the list */
5875 list *list = o->ptr;
5876 listNode *ln;
5877
5878 listRewind(list);
5879 while((ln = listYield(list))) {
5880 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
5881 robj *eleobj = listNodeValue(ln);
5882
5883 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5884 if (fwriteBulk(fp,key) == 0) goto werr;
5885 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5886 }
5887 } else if (o->type == REDIS_SET) {
5888 /* Emit the SADDs needed to rebuild the set */
5889 dict *set = o->ptr;
5890 dictIterator *di = dictGetIterator(set);
5891 dictEntry *de;
5892
5893 while((de = dictNext(di)) != NULL) {
5894 char cmd[]="*3\r\n$4\r\nSADD\r\n";
5895 robj *eleobj = dictGetEntryKey(de);
5896
5897 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5898 if (fwriteBulk(fp,key) == 0) goto werr;
5899 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5900 }
5901 dictReleaseIterator(di);
5902 } else if (o->type == REDIS_ZSET) {
5903 /* Emit the ZADDs needed to rebuild the sorted set */
5904 zset *zs = o->ptr;
5905 dictIterator *di = dictGetIterator(zs->dict);
5906 dictEntry *de;
5907
5908 while((de = dictNext(di)) != NULL) {
5909 char cmd[]="*4\r\n$4\r\nZADD\r\n";
5910 robj *eleobj = dictGetEntryKey(de);
5911 double *score = dictGetEntryVal(de);
5912
5913 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5914 if (fwriteBulk(fp,key) == 0) goto werr;
5915 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
5916 if (fwriteBulk(fp,eleobj) == 0) goto werr;
5917 }
5918 dictReleaseIterator(di);
5919 } else {
5920 assert(0 != 0);
5921 }
5922 /* Save the expire time */
5923 if (expiretime != -1) {
5924 char cmd[]="*3\r\n$6\r\nEXPIRE\r\n";
5925 /* If this key is already expired skip it */
5926 if (expiretime < now) continue;
5927 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
5928 if (fwriteBulk(fp,key) == 0) goto werr;
5929 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
5930 }
5931 }
5932 dictReleaseIterator(di);
5933 }
5934
5935 /* Make sure data will not remain on the OS's output buffers */
5936 fflush(fp);
5937 fsync(fileno(fp));
5938 fclose(fp);
5939
5940 /* Use RENAME to make sure the DB file is changed atomically only
5941 * if the generate DB file is ok. */
5942 if (rename(tmpfile,filename) == -1) {
5943 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
5944 unlink(tmpfile);
5945 return REDIS_ERR;
5946 }
5947 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
5948 return REDIS_OK;
5949
5950 werr:
5951 fclose(fp);
5952 unlink(tmpfile);
5953 redisLog(REDIS_WARNING,"Write error writing append only fileon disk: %s", strerror(errno));
5954 if (di) dictReleaseIterator(di);
5955 return REDIS_ERR;
5956 }
5957
5958 /* This is how rewriting of the append only file in background works:
5959 *
5960 * 1) The user calls BGREWRITEAOF
5961 * 2) Redis calls this function, that forks():
5962 * 2a) the child rewrite the append only file in a temp file.
5963 * 2b) the parent accumulates differences in server.bgrewritebuf.
5964 * 3) When the child finished '2a' exists.
5965 * 4) The parent will trap the exit code, if it's OK, will append the
5966 * data accumulated into server.bgrewritebuf into the temp file, and
5967 * finally will rename(2) the temp file in the actual file name.
5968 * The the new file is reopened as the new append only file. Profit!
5969 */
5970 static int rewriteAppendOnlyFileBackground(void) {
5971 pid_t childpid;
5972
5973 if (server.bgrewritechildpid != -1) return REDIS_ERR;
5974 if ((childpid = fork()) == 0) {
5975 /* Child */
5976 char tmpfile[256];
5977 close(server.fd);
5978
5979 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
5980 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
5981 exit(0);
5982 } else {
5983 exit(1);
5984 }
5985 } else {
5986 /* Parent */
5987 if (childpid == -1) {
5988 redisLog(REDIS_WARNING,
5989 "Can't rewrite append only file in background: fork: %s",
5990 strerror(errno));
5991 return REDIS_ERR;
5992 }
5993 redisLog(REDIS_NOTICE,
5994 "Background append only file rewriting started by pid %d",childpid);
5995 server.bgrewritechildpid = childpid;
5996 /* We set appendseldb to -1 in order to force the next call to the
5997 * feedAppendOnlyFile() to issue a SELECT command, so the differences
5998 * accumulated by the parent into server.bgrewritebuf will start
5999 * with a SELECT statement and it will be safe to merge. */
6000 server.appendseldb = -1;
6001 return REDIS_OK;
6002 }
6003 return REDIS_OK; /* unreached */
6004 }
6005
6006 static void bgrewriteaofCommand(redisClient *c) {
6007 if (server.bgrewritechildpid != -1) {
6008 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6009 return;
6010 }
6011 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6012 addReply(c,shared.ok);
6013 } else {
6014 addReply(c,shared.err);
6015 }
6016 }
6017
/* Remove the temp file used by the background rewrite performed by the
 * child with pid 'childpid'. The name is built with the same pattern
 * used by rewriteAppendOnlyFileBackground(). */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof", (int) childpid);
    unlink(path);
}
6024
6025 /* ================================= Debugging ============================== */
6026
6027 static void debugCommand(redisClient *c) {
6028 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6029 *((char*)-1) = 'x';
6030 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6031 if (rdbSave(server.dbfilename) != REDIS_OK) {
6032 addReply(c,shared.err);
6033 return;
6034 }
6035 emptyDb();
6036 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6037 addReply(c,shared.err);
6038 return;
6039 }
6040 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6041 addReply(c,shared.ok);
6042 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6043 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6044 robj *key, *val;
6045
6046 if (!de) {
6047 addReply(c,shared.nokeyerr);
6048 return;
6049 }
6050 key = dictGetEntryKey(de);
6051 val = dictGetEntryVal(de);
6052 addReplySds(c,sdscatprintf(sdsempty(),
6053 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d\r\n",
6054 key, key->refcount, val, val->refcount, val->encoding));
6055 } else {
6056 addReplySds(c,sdsnew(
6057 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6058 }
6059 }
6060
6061 /* =================================== Main! ================================ */
6062
6063 #ifdef __linux__
/* Read /proc/sys/vm/overcommit_memory and return its first line converted
 * with atoi(). Returns -1 if the file can't be opened or read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    int value = -1;
    FILE *fp = fopen("/proc/sys/vm/overcommit_memory","r");

    if (fp == NULL) return -1;
    if (fgets(line,64,fp) != NULL) value = atoi(line);
    fclose(fp);
    return value;
}
6077
6078 void linuxOvercommitMemoryWarning(void) {
6079 if (linuxOvercommitMemoryValue() == 0) {
6080 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6081 }
6082 }
6083 #endif /* __linux__ */
6084
6085 static void daemonize(void) {
6086 int fd;
6087 FILE *fp;
6088
6089 if (fork() != 0) exit(0); /* parent exits */
6090 setsid(); /* create a new session */
6091
6092 /* Every output goes to /dev/null. If Redis is daemonized but
6093 * the 'logfile' is set to 'stdout' in the configuration file
6094 * it will not log at all. */
6095 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6096 dup2(fd, STDIN_FILENO);
6097 dup2(fd, STDOUT_FILENO);
6098 dup2(fd, STDERR_FILENO);
6099 if (fd > STDERR_FILENO) close(fd);
6100 }
6101 /* Try to write the pid file */
6102 fp = fopen(server.pidfile,"w");
6103 if (fp) {
6104 fprintf(fp,"%d\n",getpid());
6105 fclose(fp);
6106 }
6107 }
6108
6109 int main(int argc, char **argv) {
6110 initServerConfig();
6111 if (argc == 2) {
6112 resetServerSaveParams();
6113 loadServerConfig(argv[1]);
6114 } else if (argc > 2) {
6115 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6116 exit(1);
6117 } else {
6118 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6119 }
6120 initServer();
6121 if (server.daemonize) daemonize();
6122 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6123 #ifdef __linux__
6124 linuxOvercommitMemoryWarning();
6125 #endif
6126 if (server.appendonly) {
6127 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6128 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6129 } else {
6130 if (rdbLoad(server.dbfilename) == REDIS_OK)
6131 redisLog(REDIS_NOTICE,"DB loaded from disk");
6132 }
6133 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6134 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6135 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6136 aeMain(server.el);
6137 aeDeleteEventLoop(server.el);
6138 return 0;
6139 }
6140
6141 /* ============================= Backtrace support ========================= */
6142
6143 #ifdef HAVE_BACKTRACE
6144 static char *findFuncName(void *pointer, unsigned long *offset);
6145
/* Return the instruction pointer saved in the signal context 'uc', or NULL
 * when the platform is not one of those handled below.
 *
 * The Linux x86-64 case is handled on its own: compilers predefine
 * __x86_64__ (lowercase), and the saved program counter lives in the RIP
 * slot of gregs[], not in an EIP one. The slot is addressed by its numeric
 * index (16, the value of REG_RIP) because the REG_* names are only exposed
 * by the C library under _GNU_SOURCE. The branch is further guarded by
 * NGREG, which the C library defines together with the 'gregs' member name,
 * so builds that do not expose that member fall back to returning NULL. */
static void *getMcontextEip(ucontext_t *uc) {
#if defined(__FreeBSD__)
    return (void*) uc->uc_mcontext.mc_eip;
#elif defined(__dietlibc__)
    return (void*) uc->uc_mcontext.eip;
#elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
    return (void*) uc->uc_mcontext->__ss.__eip;
#elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
  #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
    return (void*) uc->uc_mcontext->__ss.__rip;
  #else
    return (void*) uc->uc_mcontext->__ss.__eip;
  #endif
#elif defined(__i386__) /* Linux x86 */
    return (void*) uc->uc_mcontext.gregs[REG_EIP];
#elif (defined(__x86_64__) || defined(__X86_64__)) && defined(NGREG) /* Linux x86-64 */
    return (void*) uc->uc_mcontext.gregs[16]; /* 16 == REG_RIP */
#elif defined(__ia64__) /* Linux IA64 */
    return (void*) uc->uc_mcontext.sc_ip;
#else
    return NULL;
#endif
}
6167
6168 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6169 void *trace[100];
6170 char **messages = NULL;
6171 int i, trace_size = 0;
6172 unsigned long offset=0;
6173 time_t uptime = time(NULL)-server.stat_starttime;
6174 ucontext_t *uc = (ucontext_t*) secret;
6175 REDIS_NOTUSED(info);
6176
6177 redisLog(REDIS_WARNING,
6178 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6179 redisLog(REDIS_WARNING, "%s", sdscatprintf(sdsempty(),
6180 "redis_version:%s; "
6181 "uptime_in_seconds:%d; "
6182 "connected_clients:%d; "
6183 "connected_slaves:%d; "
6184 "used_memory:%zu; "
6185 "changes_since_last_save:%lld; "
6186 "bgsave_in_progress:%d; "
6187 "last_save_time:%d; "
6188 "total_connections_received:%lld; "
6189 "total_commands_processed:%lld; "
6190 "role:%s;"
6191 ,REDIS_VERSION,
6192 uptime,
6193 listLength(server.clients)-listLength(server.slaves),
6194 listLength(server.slaves),
6195 server.usedmemory,
6196 server.dirty,
6197 server.bgsavechildpid != -1,
6198 server.lastsave,
6199 server.stat_numconnections,
6200 server.stat_numcommands,
6201 server.masterhost == NULL ? "master" : "slave"
6202 ));
6203
6204 trace_size = backtrace(trace, 100);
6205 /* overwrite sigaction with caller's address */
6206 if (getMcontextEip(uc) != NULL) {
6207 trace[1] = getMcontextEip(uc);
6208 }
6209 messages = backtrace_symbols(trace, trace_size);
6210
6211 for (i=1; i<trace_size; ++i) {
6212 char *fn = findFuncName(trace[i], &offset), *p;
6213
6214 p = strchr(messages[i],'+');
6215 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6216 redisLog(REDIS_WARNING,"%s", messages[i]);
6217 } else {
6218 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6219 }
6220 }
6221 free(messages);
6222 exit(0);
6223 }
6224
6225 static void setupSigSegvAction(void) {
6226 struct sigaction act;
6227
6228 sigemptyset (&act.sa_mask);
6229 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6230 * is used. Otherwise, sa_handler is used */
6231 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6232 act.sa_sigaction = segvHandler;
6233 sigaction (SIGSEGV, &act, NULL);
6234 sigaction (SIGBUS, &act, NULL);
6235 sigaction (SIGFPE, &act, NULL);
6236 sigaction (SIGILL, &act, NULL);
6237 sigaction (SIGBUS, &act, NULL);
6238 return;
6239 }
6240
6241 #include "staticsymbols.h"
6242 /* This function try to convert a pointer into a function name. It's used in
6243 * oreder to provide a backtrace under segmentation fault that's able to
6244 * display functions declared as static (otherwise the backtrace is useless). */
6245 static char *findFuncName(void *pointer, unsigned long *offset){
6246 int i, ret = -1;
6247 unsigned long off, minoff = 0;
6248
6249 /* Try to match against the Symbol with the smallest offset */
6250 for (i=0; symsTable[i].pointer; i++) {
6251 unsigned long lp = (unsigned long) pointer;
6252
6253 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6254 off=lp-symsTable[i].pointer;
6255 if (ret < 0 || off < minoff) {
6256 minoff=off;
6257 ret=i;
6258 }
6259 }
6260 }
6261 if (ret == -1) return NULL;
6262 *offset = minoff;
6263 return symsTable[ret].name;
6264 }
6265 #else /* HAVE_BACKTRACE */
/* Variant used when HAVE_BACKTRACE is not defined: there is no crash
 * handler to install, so this is a no-op. */
static void setupSigSegvAction(void) {
    /* Intentionally empty. */
}
6268 #endif /* HAVE_BACKTRACE */
6269
6270
6271
6272 /* The End */
6273
6274
6275