]> git.saurik.com Git - redis.git/blob - redis.c
a8d9520a69340e9e67a34082b5347aed193a3569
[redis.git] / redis.c
1 /*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this Redis server build. */
#define REDIS_VERSION "1.3.2"
31
32 #include "fmacros.h"
33 #include "config.h"
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <time.h>
39 #include <unistd.h>
40 #define __USE_POSIX199309
41 #include <signal.h>
42
43 #ifdef HAVE_BACKTRACE
44 #include <execinfo.h>
45 #include <ucontext.h>
46 #endif /* HAVE_BACKTRACE */
47
48 #include <sys/wait.h>
49 #include <errno.h>
50 #include <assert.h>
51 #include <ctype.h>
52 #include <stdarg.h>
53 #include <inttypes.h>
54 #include <arpa/inet.h>
55 #include <sys/stat.h>
56 #include <fcntl.h>
57 #include <sys/time.h>
58 #include <sys/resource.h>
59 #include <sys/uio.h>
60 #include <limits.h>
61 #include <math.h>
62
63 #if defined(__sun)
64 #include "solarisfixes.h"
65 #endif
66
67 #include "redis.h"
68 #include "ae.h" /* Event driven programming library */
69 #include "sds.h" /* Dynamic safe strings */
70 #include "anet.h" /* Networking the easy way */
71 #include "dict.h" /* Hash tables */
72 #include "adlist.h" /* Linked lists */
73 #include "zmalloc.h" /* total memory usage aware version of malloc/free */
74 #include "lzf.h" /* LZF compression library */
75 #include "pqsort.h" /* Partial qsort for SORT+LIMIT */
76
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout */
#define REDIS_IOBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_STATIC_ARGS 4
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* try to expire 100 keys/second */
#define REDIS_MAX_WRITE_PER_EVENT (1024*64)
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */

/* If more than REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
/* Max number of iovecs used for each writev call */
#define REDIS_WRITEV_IOVEC_COUNT 256

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
102
/* Command flags: OR-ed together in the flags column of cmdTable
 * (redisCommand.flags). */
#define REDIS_CMD_BULK 1 /* Bulk write command */
#define REDIS_CMD_INLINE 2 /* Inline command */
/* REDIS_CMD_DENYOOM reserves a longer comment: all the commands marked with
   this flag will return an error when the 'maxmemory' option is set in the
   config file and the server is using more than maxmemory bytes of memory.
   In short, these commands are denied on low memory conditions. */
#define REDIS_CMD_DENYOOM 4
111
/* Object types (stored in robj.type) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_ZSET 3
#define REDIS_HASH 4

/* Objects encoding (stored in robj.encoding) */
#define REDIS_ENCODING_RAW 0 /* Raw representation */
#define REDIS_ENCODING_INT 1 /* Encoded as integer */

/* Object types only used for dumping to disk: these opcodes share the
 * type byte space with the object types above, hence the high values. */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
127
/* Defines related to the dump file format. To store 32 bits lengths for short
 * keys requires a lot of space, so we check the most significant 2 bits of
 * the first byte to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte, so most DB keys, and many
 * values, will fit inside. The defines below are the values of the two MSB. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
/* Presumably returned by the length loader to signal a read error
 * (UINT_MAX is never a valid stored length) -- TODO confirm in rdbLoad. */
#define REDIS_RDB_LENERR UINT_MAX

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
154
/* Virtual memory states, stored in the robj->storage field. */
#define REDIS_VM_MEMORY 0 /* The object is on memory */
#define REDIS_VM_SWAPPED 1 /* The object is on disk */
#define REDIS_VM_SWAPPING 2 /* Redis is swapping this object on disk */
#define REDIS_VM_LOADING 3 /* Redis is loading this object from disk */

/* Client flags: bits OR-ed into redisClient.flags */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */
#define REDIS_MULTI 16 /* This client is in a MULTI context */
#define REDIS_BLOCKED 32 /* The client is waiting in a blocking operation */

/* Slave replication state - slave side */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */
182
/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_ASC 1
#define REDIS_SORT_DESC 2
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() only writes messages whose level is >=
 * server.verbosity, and uses the level to index its ".-*" marker string,
 * so these values must stay in the range 0..2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: cast an unused parameter to void. */
#define REDIS_NOTUSED(V) ((void) V)

#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */

/* Append only defines */
#define APPENDFSYNC_NO 0
#define APPENDFSYNC_ALWAYS 1
#define APPENDFSYNC_EVERYSEC 2
208
/* We can print the stacktrace, so our assert is defined this way:
 * when the expression is false, _redisAssert() receives the stringified
 * expression and then the process exits with status 1. */
#define redisAssert(_e) ((_e)?(void)0 : (_redisAssert(#_e),exit(1)))
static void _redisAssert(char *estr);
212
213 /*================================= Data types ============================== */
214
215 /* A redis object, that is a type able to hold a string / list / set */
216
/* The VM object structure: where a swapped-out object lives on disk.
 * Only the type is declared here (no variable); it is used as the
 * trailing 'vm' member of robj. */
struct redisObjectVM {
    off_t offset; /* the page at which the object is stored on disk */
    int pages;    /* number of pages used on disk */
};
222
/* The actual Redis Object */
typedef struct redisObject {
    void *ptr;              /* the value; its representation depends on
                             * type and encoding */
    unsigned char type;     /* REDIS_STRING, REDIS_LIST, REDIS_SET, ... */
    unsigned char encoding; /* REDIS_ENCODING_RAW or REDIS_ENCODING_INT */
    unsigned char storage;  /* where? REDIS_VM_MEMORY, REDIS_VM_SWAPPED, ... */
    unsigned char notused;
    int refcount;
    /* VM fields, these are only allocated if VM is active, otherwise the
     * object allocation function will just allocate
     * sizeof(redisObject) minus sizeof(redisObjectVM), so using
     * Redis without VM active will not have any overhead. For that trick
     * to work 'vm' must remain the last member of the structure. */
    struct redisObjectVM vm;
} robj;
237
/* Macro used to initialize a Redis object allocated on the stack.
 * Note that this macro is kept near the structure definition to make sure
 * we'll update it when the structure is changed, to avoid bugs like
 * bug #85 introduced exactly in this way.
 *
 * Every header field other code may inspect is set, including 'storage',
 * so a stack object is always seen as living in memory. The macro has no
 * trailing semicolon: write "initStaticStringObject(o,p);" and it behaves
 * as a single statement, which keeps it safe inside an unbraced if/else. */
#define initStaticStringObject(_var,_ptr) do { \
    (_var).refcount = 1; \
    (_var).type = REDIS_STRING; \
    (_var).encoding = REDIS_ENCODING_RAW; \
    (_var).storage = REDIS_VM_MEMORY; \
    (_var).ptr = (_ptr); \
} while(0)
248
/* One keyspace. The server keeps these in server.db (presumably an array
 * of server.dbnum entries -- TODO confirm in initServer). */
typedef struct redisDb {
    dict *dict;         /* The keyspace for this DB */
    dict *expires;      /* Timeout of keys with a timeout set */
    dict *blockingkeys; /* Keys with clients waiting for data (BLPOP) */
    int id;             /* presumably this DB's index -- TODO confirm */
} redisDb;
255
/* Client MULTI/EXEC state */

/* One command queued between MULTI and EXEC: its arguments and the
 * command table entry it resolved to. */
typedef struct multiCmd {
    robj **argv;
    int argc;
    struct redisCommand *cmd;
} multiCmd;

typedef struct multiState {
    multiCmd *commands; /* Array of MULTI commands */
    int count;          /* Total number of MULTI commands */
} multiState;
267
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj **argv, **mbargv;
    int argc, mbargc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    int multibulk;          /* multi bulk command format active */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time(NULL) of the last interaction, compared
                             * against server.maxidletime for timeouts */
    int flags;              /* bitmask of REDIS_CLOSE, REDIS_SLAVE,
                             * REDIS_MASTER, REDIS_MONITOR, REDIS_MULTI,
                             * REDIS_BLOCKED */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    long repldboff;         /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
    multiState mstate;      /* MULTI/EXEC state */
    robj **blockingkeys;    /* The keys we are waiting on to terminate a
                             * blocking operation such as BLPOP.
                             * Otherwise NULL. */
    int blockingkeysnum;    /* Number of blocking keys */
    time_t blockingto;      /* Blocking operation timeout as a UNIX time.
                             * 0 means no timeout; otherwise the operation
                             * times out once the current time is greater
                             * than blockingto (see closeTimedoutClients). */
} redisClient;
297
/* One automatic-save rule, presumably "save after 'seconds' if at least
 * 'changes' writes happened" -- TODO confirm against the cron code. */
struct saveparam {
    time_t seconds;
    int changes;
};
302
/* Global server state structure */
struct redisServer {
    int port;
    int fd;
    redisDb *db;
    dict *sharingpool;          /* Pool used for object sharing */
    unsigned int sharingpoolsize;
    long long dirty;            /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;              /* number of times the cron function run */
    list *objfreelist;          /* A list of freed objects to avoid malloc() */
    time_t lastsave;            /* Unix time of last successful save */
    size_t usedmemory;          /* Used memory in megabytes */
    /* Fields used only for stats */
    time_t stat_starttime;          /* server start time */
    long long stat_numcommands;     /* number of processed commands */
    long long stat_numconnections;  /* number of connections received */
    /* Configuration */
    int verbosity;              /* minimum level written by redisLog() */
    int glueoutputbuf;
    int maxidletime;            /* idle client timeout in seconds, 0 = none */
    int dbnum;
    int daemonize;
    int appendonly;
    int appendfsync;            /* presumably one of APPENDFSYNC_* */
    time_t lastfsync;
    int appendfd;
    int appendseldb;
    char *pidfile;
    pid_t bgsavechildpid;
    pid_t bgrewritechildpid;
    sds bgrewritebuf;           /* buffer kept by parent during append only rewrite */
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;              /* NULL means redisLog() writes to stdout */
    char *bindaddr;
    char *dbfilename;
    char *appendfilename;
    char *requirepass;
    int shareobjects;
    int rdbcompression;
    /* Replication related */
    int isslave;
    char *masterauth;
    char *masterhost;
    int masterport;
    redisClient *master;        /* client that is master for this slave */
    int replstate;
    unsigned int maxclients;
    unsigned long maxmemory;
    unsigned int blockedclients;
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to keep this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
    /* Virtual memory configuration */
    int vm_enabled;
    off_t vm_page_size;
    off_t vm_pages;
    long vm_max_memory;
    /* Virtual memory state */
    FILE *vm_fp;
    int vm_fd;
    off_t vm_next_page;         /* Next probably empty page */
    off_t vm_near_pages;        /* Number of pages allocated sequentially */
};
373
/* Signature of every command implementation (xxxCommand functions). */
typedef void redisCommandProc(redisClient *c);

/* One row of cmdTable. */
struct redisCommand {
    char *name;
    redisCommandProc *proc;
    int arity;  /* variadic commands use negative values (e.g. "del" is -2),
                 * presumably meaning "at least -arity arguments" -- TODO
                 * confirm in processCommand */
    int flags;  /* REDIS_CMD_* bitmask */
};
381
/* A (name, address) pair. Presumably used to map code addresses back to
 * function names when printing stack traces -- TODO confirm at call site. */
struct redisFunctionSym {
    char *name;
    unsigned long pointer;
};
386
/* An element being sorted by SORT, plus the key it is ordered by. */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;   /* presumably the numeric sort key */
        robj *cmpobj;   /* presumably the ALPHA / BY-pattern sort key */
    } u;
} redisSortObject;

/* A SORT sub-operation such as GET <pattern>. */
typedef struct _redisSortOperation {
    int type;           /* presumably one of REDIS_SORT_* */
    robj *pattern;
} redisSortOperation;
399
/* ZSETs use a specialized version of Skiplists */

typedef struct zskiplistNode {
    struct zskiplistNode **forward;  /* next-node pointers, presumably one
                                      * per level of this node */
    struct zskiplistNode *backward;  /* previous node */
    double score;
    robj *obj;
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;   /* number of elements */
    int level;              /* current highest level in use */
} zskiplist;

/* A sorted set is indexed twice: a dict (see zsetDictType) and a
 * skiplist ordered by score. */
typedef struct zset {
    dict *dict;
    zskiplist *zsl;
} zset;
419
/* Our shared "common" objects */

/* Declares both the type and its single global instance 'shared'. The
 * members are passed directly to addReply() (e.g. shared.nullmultibulk),
 * so they are presumably preformatted protocol replies created once at
 * startup -- TODO confirm in the shared-object init code. */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk, *queued,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
430
/* Global vars that are actually used as constants. The following double
 * values are used for double on-disk serialization, and are initialized
 * at runtime to avoid strange compiler optimizations. */

static double R_Zero, R_PosInf, R_NegInf, R_Nan;
436
/*================================ Prototypes =============================== */

/* Internal helpers (all file-local). */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int tryObjectEncoding(robj *o);
static robj *getDecodedObject(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);
static void updateSlavesWaitingBgsave(int bgsaveerr);
static void freeMemoryIfNeeded(void);
static int processCommand(redisClient *c);
static void setupSigSegvAction(void);
static void rdbRemoveTempFile(pid_t childpid);
static void aofRemoveTempFile(pid_t childpid);
static size_t stringObjectLen(robj *o);
static void processInputBuffer(redisClient *c);
static zskiplist *zslCreate(void);
static void zslFree(zskiplist *zsl);
static void zslInsert(zskiplist *zsl, double score, robj *obj);
static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask);
static void initClientMultiState(redisClient *c);
static void freeClientMultiState(redisClient *c);
static void queueMultiCommand(redisClient *c, struct redisCommand *cmd);
static void unblockClient(redisClient *c);
static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
static void vmInit(void);

/* Command implementations, referenced from cmdTable. */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void bgrewriteaofCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void smoveCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void spopCommand(redisClient *c);
static void srandmemberCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void sdiffCommand(redisClient *c);
static void sdiffstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void rpoplpushcommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
static void expireatCommand(redisClient *c);
static void getsetCommand(redisClient *c);
static void ttlCommand(redisClient *c);
static void slaveofCommand(redisClient *c);
static void debugCommand(redisClient *c);
static void msetCommand(redisClient *c);
static void msetnxCommand(redisClient *c);
static void zaddCommand(redisClient *c);
static void zincrbyCommand(redisClient *c);
static void zrangeCommand(redisClient *c);
static void zrangebyscoreCommand(redisClient *c);
static void zrevrangeCommand(redisClient *c);
static void zcardCommand(redisClient *c);
static void zremCommand(redisClient *c);
static void zscoreCommand(redisClient *c);
static void zremrangebyscoreCommand(redisClient *c);
static void multiCommand(redisClient *c);
static void execCommand(redisClient *c);
static void blpopCommand(redisClient *c);
static void brpopCommand(redisClient *c);
559
560 /*================================= Globals ================================= */
561
562 /* Global vars */
563 static struct redisServer server; /* server global state */
564 static struct redisCommand cmdTable[] = {
565 {"get",getCommand,2,REDIS_CMD_INLINE},
566 {"set",setCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
567 {"setnx",setnxCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
568 {"del",delCommand,-2,REDIS_CMD_INLINE},
569 {"exists",existsCommand,2,REDIS_CMD_INLINE},
570 {"incr",incrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
571 {"decr",decrCommand,2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
572 {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
573 {"rpush",rpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
574 {"lpush",lpushCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
575 {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
576 {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
577 {"brpop",brpopCommand,-3,REDIS_CMD_INLINE},
578 {"blpop",blpopCommand,-3,REDIS_CMD_INLINE},
579 {"llen",llenCommand,2,REDIS_CMD_INLINE},
580 {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
581 {"lset",lsetCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
582 {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
583 {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
584 {"lrem",lremCommand,4,REDIS_CMD_BULK},
585 {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
586 {"sadd",saddCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
587 {"srem",sremCommand,3,REDIS_CMD_BULK},
588 {"smove",smoveCommand,4,REDIS_CMD_BULK},
589 {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
590 {"scard",scardCommand,2,REDIS_CMD_INLINE},
591 {"spop",spopCommand,2,REDIS_CMD_INLINE},
592 {"srandmember",srandmemberCommand,2,REDIS_CMD_INLINE},
593 {"sinter",sinterCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
594 {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
595 {"sunion",sunionCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
596 {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
597 {"sdiff",sdiffCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
598 {"sdiffstore",sdiffstoreCommand,-3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
599 {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
600 {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
601 {"zincrby",zincrbyCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
602 {"zrem",zremCommand,3,REDIS_CMD_BULK},
603 {"zremrangebyscore",zremrangebyscoreCommand,4,REDIS_CMD_INLINE},
604 {"zrange",zrangeCommand,-4,REDIS_CMD_INLINE},
605 {"zrangebyscore",zrangebyscoreCommand,-4,REDIS_CMD_INLINE},
606 {"zrevrange",zrevrangeCommand,-4,REDIS_CMD_INLINE},
607 {"zcard",zcardCommand,2,REDIS_CMD_INLINE},
608 {"zscore",zscoreCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
609 {"incrby",incrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
610 {"decrby",decrbyCommand,3,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
611 {"getset",getsetCommand,3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
612 {"mset",msetCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
613 {"msetnx",msetnxCommand,-3,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},
614 {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
615 {"select",selectCommand,2,REDIS_CMD_INLINE},
616 {"move",moveCommand,3,REDIS_CMD_INLINE},
617 {"rename",renameCommand,3,REDIS_CMD_INLINE},
618 {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
619 {"expire",expireCommand,3,REDIS_CMD_INLINE},
620 {"expireat",expireatCommand,3,REDIS_CMD_INLINE},
621 {"keys",keysCommand,2,REDIS_CMD_INLINE},
622 {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
623 {"auth",authCommand,2,REDIS_CMD_INLINE},
624 {"ping",pingCommand,1,REDIS_CMD_INLINE},
625 {"echo",echoCommand,2,REDIS_CMD_BULK},
626 {"save",saveCommand,1,REDIS_CMD_INLINE},
627 {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
628 {"bgrewriteaof",bgrewriteaofCommand,1,REDIS_CMD_INLINE},
629 {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
630 {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
631 {"type",typeCommand,2,REDIS_CMD_INLINE},
632 {"multi",multiCommand,1,REDIS_CMD_INLINE},
633 {"exec",execCommand,1,REDIS_CMD_INLINE},
634 {"sync",syncCommand,1,REDIS_CMD_INLINE},
635 {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
636 {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
637 {"sort",sortCommand,-2,REDIS_CMD_INLINE|REDIS_CMD_DENYOOM},
638 {"info",infoCommand,1,REDIS_CMD_INLINE},
639 {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
640 {"ttl",ttlCommand,2,REDIS_CMD_INLINE},
641 {"slaveof",slaveofCommand,3,REDIS_CMD_INLINE},
642 {"debug",debugCommand,-2,REDIS_CMD_INLINE},
643 {NULL,NULL,0,0}
644 };
645
646 /*============================ Utility functions ============================ */
647
/* Glob-style pattern matching.
 *
 * Returns 1 when the first 'stringLen' bytes of 'string' match the first
 * 'patternLen' bytes of 'pattern', 0 otherwise. Every read is bounded by
 * the two lengths, so neither buffer has to be NUL-terminated.
 *
 * Pattern syntax:
 *   *       any run of bytes, including the empty run
 *   ?       exactly one byte
 *   [...]   one byte from the set; a leading '^' negates the set, "a-z" is
 *           an inclusive byte range (reversed ranges are swapped, bytes are
 *           compared as unsigned values) and "\x" matches x literally; an
 *           unterminated set simply ends at the end of the pattern
 *   \x      matches x literally
 * When 'nocase' is non-zero, bytes are compared through tolower(). */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of stars: "**" behaves exactly like "*". */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* match: a trailing star eats the rest */
            /* Try every split point for the star. */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int not, match;

            /* A set always consumes one byte: nothing left, no match. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            not = patternLen > 0 && pattern[0] == '^';
            if (not) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated set: step back so the increment after
                     * the switch leaves patternLen at exactly 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (patternLen >= 2 && pattern[0] == '\\') {
                    pattern++;
                    patternLen--;
                    if (pattern[0] == string[0])
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    /* Compare as unsigned bytes so ranges behave the same
                     * whether plain char is signed or not, and so tolower()
                     * only ever sees values representable as unsigned char. */
                    int start = (unsigned char)pattern[0];
                    int end = (unsigned char)pattern[2];
                    int c = (unsigned char)string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (!nocase) {
                        if (pattern[0] == string[0])
                            match = 1;
                    } else {
                        if (tolower((unsigned char)pattern[0]) ==
                            tolower((unsigned char)string[0]))
                            match = 1;
                    }
                }
                pattern++;
                patternLen--;
            }
            if (not)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0)
                return 0; /* no match */
            if (!nocase) {
                if (pattern[0] != string[0])
                    return 0; /* no match */
            } else {
                if (tolower((unsigned char)pattern[0]) !=
                    tolower((unsigned char)string[0]))
                    return 0; /* no match */
            }
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String exhausted: only trailing stars may remain. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
770
771 static void redisLog(int level, const char *fmt, ...) {
772 va_list ap;
773 FILE *fp;
774
775 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
776 if (!fp) return;
777
778 va_start(ap, fmt);
779 if (level >= server.verbosity) {
780 char *c = ".-*";
781 char buf[64];
782 time_t now;
783
784 now = time(NULL);
785 strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
786 fprintf(fp,"%s %c ",buf,c[level]);
787 vfprintf(fp, fmt, ap);
788 fprintf(fp,"\n");
789 fflush(fp);
790 }
791 va_end(ap);
792
793 if (server.logfile) fclose(fp);
794 }
795
796 /*====================== Hash table type implementation ==================== */
797
/* The dict types below describe how each kind of hash table hashes,
 * compares and frees its keys and values. Keys are SDS strings or Redis
 * objects; values are Redis objects, lists or heap-allocated doubles,
 * depending on the type. */
801
/* Value destructor for dict values that are plain zmalloc()'d blocks
 * (no internal structure): simply hand the memory back to zfree(). */
static void dictVanillaFree(void *privdata, void *val)
{
    (void) privdata; /* unused */
    zfree(val);
}
807
808 static void dictListDestructor(void *privdata, void *val)
809 {
810 DICT_NOTUSED(privdata);
811 listRelease((list*)val);
812 }
813
814 static int sdsDictKeyCompare(void *privdata, const void *key1,
815 const void *key2)
816 {
817 int l1,l2;
818 DICT_NOTUSED(privdata);
819
820 l1 = sdslen((sds)key1);
821 l2 = sdslen((sds)key2);
822 if (l1 != l2) return 0;
823 return memcmp(key1, key2, l1) == 0;
824 }
825
/* Key/value destructor for dict entries that hold Redis objects:
 * drops the reference the dict owned via decrRefCount(). */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    (void) privdata; /* unused */
    decrRefCount(val);
}
832
833 static int dictObjKeyCompare(void *privdata, const void *key1,
834 const void *key2)
835 {
836 const robj *o1 = key1, *o2 = key2;
837 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
838 }
839
840 static unsigned int dictObjHash(const void *key) {
841 const robj *o = key;
842 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
843 }
844
845 static int dictEncObjKeyCompare(void *privdata, const void *key1,
846 const void *key2)
847 {
848 robj *o1 = (robj*) key1, *o2 = (robj*) key2;
849 int cmp;
850
851 o1 = getDecodedObject(o1);
852 o2 = getDecodedObject(o2);
853 cmp = sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
854 decrRefCount(o1);
855 decrRefCount(o2);
856 return cmp;
857 }
858
859 static unsigned int dictEncObjHash(const void *key) {
860 robj *o = (robj*) key;
861
862 o = getDecodedObject(o);
863 unsigned int hash = dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
864 decrRefCount(o);
865 return hash;
866 }
867
868 static dictType setDictType = {
869 dictEncObjHash, /* hash function */
870 NULL, /* key dup */
871 NULL, /* val dup */
872 dictEncObjKeyCompare, /* key compare */
873 dictRedisObjectDestructor, /* key destructor */
874 NULL /* val destructor */
875 };
876
877 static dictType zsetDictType = {
878 dictEncObjHash, /* hash function */
879 NULL, /* key dup */
880 NULL, /* val dup */
881 dictEncObjKeyCompare, /* key compare */
882 dictRedisObjectDestructor, /* key destructor */
883 dictVanillaFree /* val destructor of malloc(sizeof(double)) */
884 };
885
/* Dictionary type of each DB's main keyspace (see initServer()): keys are
 * unencoded Redis objects whose ptr is an sds string, hashed and compared by
 * those bytes (dictObjHash / dictObjKeyCompare). Keys and values are both
 * Redis objects; one reference to each is released when an entry goes. */
static dictType hashDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
894
/* Keylist hash table type has unencoded redis objects as keys and
 * lists as values. It's used for blocking operations (BLPOP): every DB
 * owns one such table (server.db[j].blockingkeys, created in initServer()).
 * Removing an entry releases one key reference and frees the value with
 * dictListDestructor(). */
static dictType keylistDictType = {
    dictObjHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictObjKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictListDestructor         /* val destructor */
};
905
906 /* ========================= Random utility functions ======================= */
907
908 /* Redis generally does not try to recover from out of memory conditions
909 * when allocating objects or strings, it is not clear if it will be possible
910 * to report this condition to the client since the networking layer itself
911 * is based on heap allocation for send buffers, so we simply abort.
912 * At least the code will be simpler to read... */
913 static void oom(const char *msg) {
914 redisLog(REDIS_WARNING, "%s: Out of memory\n",msg);
915 sleep(1);
916 abort();
917 }
918
919 /* ====================== Redis server networking stuff ===================== */
920 static void closeTimedoutClients(void) {
921 redisClient *c;
922 listNode *ln;
923 time_t now = time(NULL);
924
925 listRewind(server.clients);
926 while ((ln = listYield(server.clients)) != NULL) {
927 c = listNodeValue(ln);
928 if (server.maxidletime &&
929 !(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
930 !(c->flags & REDIS_MASTER) && /* no timeout for masters */
931 (now - c->lastinteraction > server.maxidletime))
932 {
933 redisLog(REDIS_DEBUG,"Closing idle client");
934 freeClient(c);
935 } else if (c->flags & REDIS_BLOCKED) {
936 if (c->blockingto != 0 && c->blockingto < now) {
937 addReply(c,shared.nullmultibulk);
938 unblockClient(c);
939 }
940 }
941 }
942 }
943
944 static int htNeedsResize(dict *dict) {
945 long long size, used;
946
947 size = dictSlots(dict);
948 used = dictSize(dict);
949 return (size && used && size > DICT_HT_INITIAL_SIZE &&
950 (used*100/size < REDIS_HT_MINFILL));
951 }
952
953 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
954 * we resize the hash table to save memory */
955 static void tryResizeHashTables(void) {
956 int j;
957
958 for (j = 0; j < server.dbnum; j++) {
959 if (htNeedsResize(server.db[j].dict)) {
960 redisLog(REDIS_DEBUG,"The hash table %d is too sparse, resize it...",j);
961 dictResize(server.db[j].dict);
962 redisLog(REDIS_DEBUG,"Hash table %d resized.",j);
963 }
964 if (htNeedsResize(server.db[j].expires))
965 dictResize(server.db[j].expires);
966 }
967 }
968
969 /* A background saving child (BGSAVE) terminated its work. Handle this. */
970 void backgroundSaveDoneHandler(int statloc) {
971 int exitcode = WEXITSTATUS(statloc);
972 int bysignal = WIFSIGNALED(statloc);
973
974 if (!bysignal && exitcode == 0) {
975 redisLog(REDIS_NOTICE,
976 "Background saving terminated with success");
977 server.dirty = 0;
978 server.lastsave = time(NULL);
979 } else if (!bysignal && exitcode != 0) {
980 redisLog(REDIS_WARNING, "Background saving error");
981 } else {
982 redisLog(REDIS_WARNING,
983 "Background saving terminated by signal");
984 rdbRemoveTempFile(server.bgsavechildpid);
985 }
986 server.bgsavechildpid = -1;
987 /* Possibly there are slaves waiting for a BGSAVE in order to be served
988 * (the first stage of SYNC is a bulk transfer of dump.rdb) */
989 updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
990 }
991
992 /* A background append only file rewriting (BGREWRITEAOF) terminated its work.
993 * Handle this. */
994 void backgroundRewriteDoneHandler(int statloc) {
995 int exitcode = WEXITSTATUS(statloc);
996 int bysignal = WIFSIGNALED(statloc);
997
998 if (!bysignal && exitcode == 0) {
999 int fd;
1000 char tmpfile[256];
1001
1002 redisLog(REDIS_NOTICE,
1003 "Background append only file rewriting terminated with success");
1004 /* Now it's time to flush the differences accumulated by the parent */
1005 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) server.bgrewritechildpid);
1006 fd = open(tmpfile,O_WRONLY|O_APPEND);
1007 if (fd == -1) {
1008 redisLog(REDIS_WARNING, "Not able to open the temp append only file produced by the child: %s", strerror(errno));
1009 goto cleanup;
1010 }
1011 /* Flush our data... */
1012 if (write(fd,server.bgrewritebuf,sdslen(server.bgrewritebuf)) !=
1013 (signed) sdslen(server.bgrewritebuf)) {
1014 redisLog(REDIS_WARNING, "Error or short write trying to flush the parent diff of the append log file in the child temp file: %s", strerror(errno));
1015 close(fd);
1016 goto cleanup;
1017 }
1018 redisLog(REDIS_NOTICE,"Parent diff flushed into the new append log file with success (%lu bytes)",sdslen(server.bgrewritebuf));
1019 /* Now our work is to rename the temp file into the stable file. And
1020 * switch the file descriptor used by the server for append only. */
1021 if (rename(tmpfile,server.appendfilename) == -1) {
1022 redisLog(REDIS_WARNING,"Can't rename the temp append only file into the stable one: %s", strerror(errno));
1023 close(fd);
1024 goto cleanup;
1025 }
1026 /* Mission completed... almost */
1027 redisLog(REDIS_NOTICE,"Append only file successfully rewritten.");
1028 if (server.appendfd != -1) {
1029 /* If append only is actually enabled... */
1030 close(server.appendfd);
1031 server.appendfd = fd;
1032 fsync(fd);
1033 server.appendseldb = -1; /* Make sure it will issue SELECT */
1034 redisLog(REDIS_NOTICE,"The new append only file was selected for future appends.");
1035 } else {
1036 /* If append only is disabled we just generate a dump in this
1037 * format. Why not? */
1038 close(fd);
1039 }
1040 } else if (!bysignal && exitcode != 0) {
1041 redisLog(REDIS_WARNING, "Background append only file rewriting error");
1042 } else {
1043 redisLog(REDIS_WARNING,
1044 "Background append only file rewriting terminated by signal");
1045 }
1046 cleanup:
1047 sdsfree(server.bgrewritebuf);
1048 server.bgrewritebuf = sdsempty();
1049 aofRemoveTempFile(server.bgrewritechildpid);
1050 server.bgrewritechildpid = -1;
1051 }
1052
1053 static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
1054 int j, loops = server.cronloops++;
1055 REDIS_NOTUSED(eventLoop);
1056 REDIS_NOTUSED(id);
1057 REDIS_NOTUSED(clientData);
1058
1059 /* Update the global state with the amount of used memory */
1060 server.usedmemory = zmalloc_used_memory();
1061
1062 /* Show some info about non-empty databases */
1063 for (j = 0; j < server.dbnum; j++) {
1064 long long size, used, vkeys;
1065
1066 size = dictSlots(server.db[j].dict);
1067 used = dictSize(server.db[j].dict);
1068 vkeys = dictSize(server.db[j].expires);
1069 if (!(loops % 5) && (used || vkeys)) {
1070 redisLog(REDIS_DEBUG,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
1071 /* dictPrintStats(server.dict); */
1072 }
1073 }
1074
1075 /* We don't want to resize the hash tables while a bacground saving
1076 * is in progress: the saving child is created using fork() that is
1077 * implemented with a copy-on-write semantic in most modern systems, so
1078 * if we resize the HT while there is the saving child at work actually
1079 * a lot of memory movements in the parent will cause a lot of pages
1080 * copied. */
1081 if (server.bgsavechildpid == -1) tryResizeHashTables();
1082
1083 /* Show information about connected clients */
1084 if (!(loops % 5)) {
1085 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use, %d shared objects",
1086 listLength(server.clients)-listLength(server.slaves),
1087 listLength(server.slaves),
1088 server.usedmemory,
1089 dictSize(server.sharingpool));
1090 }
1091
1092 /* Close connections of timedout clients */
1093 if ((server.maxidletime && !(loops % 10)) || server.blockedclients)
1094 closeTimedoutClients();
1095
1096 /* Check if a background saving or AOF rewrite in progress terminated */
1097 if (server.bgsavechildpid != -1 || server.bgrewritechildpid != -1) {
1098 int statloc;
1099 pid_t pid;
1100
1101 if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
1102 if (pid == server.bgsavechildpid) {
1103 backgroundSaveDoneHandler(statloc);
1104 } else {
1105 backgroundRewriteDoneHandler(statloc);
1106 }
1107 }
1108 } else {
1109 /* If there is not a background saving in progress check if
1110 * we have to save now */
1111 time_t now = time(NULL);
1112 for (j = 0; j < server.saveparamslen; j++) {
1113 struct saveparam *sp = server.saveparams+j;
1114
1115 if (server.dirty >= sp->changes &&
1116 now-server.lastsave > sp->seconds) {
1117 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
1118 sp->changes, sp->seconds);
1119 rdbSaveBackground(server.dbfilename);
1120 break;
1121 }
1122 }
1123 }
1124
1125 /* Try to expire a few timed out keys. The algorithm used is adaptive and
1126 * will use few CPU cycles if there are few expiring keys, otherwise
1127 * it will get more aggressive to avoid that too much memory is used by
1128 * keys that can be removed from the keyspace. */
1129 for (j = 0; j < server.dbnum; j++) {
1130 int expired;
1131 redisDb *db = server.db+j;
1132
1133 /* Continue to expire if at the end of the cycle more than 25%
1134 * of the keys were expired. */
1135 do {
1136 int num = dictSize(db->expires);
1137 time_t now = time(NULL);
1138
1139 expired = 0;
1140 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
1141 num = REDIS_EXPIRELOOKUPS_PER_CRON;
1142 while (num--) {
1143 dictEntry *de;
1144 time_t t;
1145
1146 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
1147 t = (time_t) dictGetEntryVal(de);
1148 if (now > t) {
1149 deleteKey(db,dictGetEntryKey(de));
1150 expired++;
1151 }
1152 }
1153 } while (expired > REDIS_EXPIRELOOKUPS_PER_CRON/4);
1154 }
1155
1156 /* Check if we should connect to a MASTER */
1157 if (server.replstate == REDIS_REPL_CONNECT) {
1158 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
1159 if (syncWithMaster() == REDIS_OK) {
1160 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
1161 }
1162 }
1163 return 1000;
1164 }
1165
1166 static void createSharedObjects(void) {
1167 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
1168 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
1169 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
1170 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
1171 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
1172 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
1173 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
1174 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
1175 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
1176 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
1177 shared.queued = createObject(REDIS_STRING,sdsnew("+QUEUED\r\n"));
1178 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
1179 "-ERR Operation against a key holding the wrong kind of value\r\n"));
1180 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
1181 "-ERR no such key\r\n"));
1182 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
1183 "-ERR syntax error\r\n"));
1184 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
1185 "-ERR source and destination objects are the same\r\n"));
1186 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
1187 "-ERR index out of range\r\n"));
1188 shared.space = createObject(REDIS_STRING,sdsnew(" "));
1189 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
1190 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
1191 shared.select0 = createStringObject("select 0\r\n",10);
1192 shared.select1 = createStringObject("select 1\r\n",10);
1193 shared.select2 = createStringObject("select 2\r\n",10);
1194 shared.select3 = createStringObject("select 3\r\n",10);
1195 shared.select4 = createStringObject("select 4\r\n",10);
1196 shared.select5 = createStringObject("select 5\r\n",10);
1197 shared.select6 = createStringObject("select 6\r\n",10);
1198 shared.select7 = createStringObject("select 7\r\n",10);
1199 shared.select8 = createStringObject("select 8\r\n",10);
1200 shared.select9 = createStringObject("select 9\r\n",10);
1201 }
1202
1203 static void appendServerSaveParams(time_t seconds, int changes) {
1204 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
1205 server.saveparams[server.saveparamslen].seconds = seconds;
1206 server.saveparams[server.saveparamslen].changes = changes;
1207 server.saveparamslen++;
1208 }
1209
1210 static void resetServerSaveParams() {
1211 zfree(server.saveparams);
1212 server.saveparams = NULL;
1213 server.saveparamslen = 0;
1214 }
1215
1216 static void initServerConfig() {
1217 server.dbnum = REDIS_DEFAULT_DBNUM;
1218 server.port = REDIS_SERVERPORT;
1219 server.verbosity = REDIS_DEBUG;
1220 server.maxidletime = REDIS_MAXIDLETIME;
1221 server.saveparams = NULL;
1222 server.logfile = NULL; /* NULL = log on standard output */
1223 server.bindaddr = NULL;
1224 server.glueoutputbuf = 1;
1225 server.daemonize = 0;
1226 server.appendonly = 0;
1227 server.appendfsync = APPENDFSYNC_ALWAYS;
1228 server.lastfsync = time(NULL);
1229 server.appendfd = -1;
1230 server.appendseldb = -1; /* Make sure the first time will not match */
1231 server.pidfile = "/var/run/redis.pid";
1232 server.dbfilename = "dump.rdb";
1233 server.appendfilename = "appendonly.aof";
1234 server.requirepass = NULL;
1235 server.shareobjects = 0;
1236 server.rdbcompression = 1;
1237 server.sharingpoolsize = 1024;
1238 server.maxclients = 0;
1239 server.blockedclients = 0;
1240 server.maxmemory = 0;
1241 server.vm_enabled = 0;
1242 server.vm_page_size = 256; /* 256 bytes per page */
1243 server.vm_pages = 1024*1024*100; /* 104 millions of pages */
1244 server.vm_max_memory = 1024LL*1024*1024*1; /* 1 GB of RAM */
1245
1246 resetServerSaveParams();
1247
1248 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
1249 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
1250 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
1251 /* Replication related */
1252 server.isslave = 0;
1253 server.masterauth = NULL;
1254 server.masterhost = NULL;
1255 server.masterport = 6379;
1256 server.master = NULL;
1257 server.replstate = REDIS_REPL_NONE;
1258
1259 /* Double constants initialization */
1260 R_Zero = 0.0;
1261 R_PosInf = 1.0/R_Zero;
1262 R_NegInf = -1.0/R_Zero;
1263 R_Nan = R_Zero/R_Zero;
1264 }
1265
/* Creates the runtime state of the server from the current configuration.
 *
 * Ignores SIGHUP and SIGPIPE, calls setupSigSegvAction(), creates the
 * client / slave / monitor / object-free lists, the shared objects, the
 * event loop, the DB array, the object sharing pool and the listening TCP
 * socket, then the three dictionaries of every DB. It resets the
 * persistence and statistics counters, schedules serverCron(), opens the
 * append only file when append only mode is on, and calls vmInit() when
 * virtual memory is enabled.
 *
 * Exits the process with status 1 if the TCP socket or the append only
 * file cannot be opened. */
static void initServer() {
    int j;

    signal(SIGHUP, SIG_IGN);
    /* With SIGPIPE ignored, writing to a socket closed by the peer fails
     * with an error instead of terminating the server. */
    signal(SIGPIPE, SIG_IGN);
    setupSigSegvAction();

    server.clients = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.objfreelist = listCreate();
    createSharedObjects();
    server.el = aeCreateEventLoop();
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);
    server.sharingpool = dictCreate(&setDictType,NULL);
    server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
    if (server.fd == -1) {
        redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
        exit(1);
    }
    for (j = 0; j < server.dbnum; j++) {
        /* Main keyspace */
        server.db[j].dict = dictCreate(&hashDictType,NULL);
        /* Key -> expire time (read back as time_t by serverCron()) */
        server.db[j].expires = dictCreate(&setDictType,NULL);
        /* Keys that clients are blocked on (BLPOP) */
        server.db[j].blockingkeys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
    }
    server.cronloops = 0;
    server.bgsavechildpid = -1;    /* -1 = no BGSAVE child running */
    server.bgrewritechildpid = -1; /* -1 = no BGREWRITEAOF child running */
    server.bgrewritebuf = sdsempty();
    server.lastsave = time(NULL);
    server.dirty = 0;
    server.usedmemory = 0;
    server.stat_numcommands = 0;
    server.stat_numconnections = 0;
    server.stat_starttime = time(NULL);
    aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL);

    if (server.appendonly) {
        server.appendfd = open(server.appendfilename,O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.appendfd == -1) {
            redisLog(REDIS_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    if (server.vm_enabled) vmInit();
}
1315
1316 /* Empty the whole database */
1317 static long long emptyDb() {
1318 int j;
1319 long long removed = 0;
1320
1321 for (j = 0; j < server.dbnum; j++) {
1322 removed += dictSize(server.db[j].dict);
1323 dictEmpty(server.db[j].dict);
1324 dictEmpty(server.db[j].expires);
1325 }
1326 return removed;
1327 }
1328
/* Parses a boolean configuration argument, ignoring case.
 * Returns 1 for "yes", 0 for "no" and -1 for anything else. */
static int yesnotoi(char *s) {
    if (strcasecmp(s,"yes") == 0) return 1;
    if (strcasecmp(s,"no") == 0) return 0;
    return -1;
}
1334
/* Loads the configuration from 'filename' ("-" means standard input) and
 * applies each directive to the global server structure.
 *
 * One directive per line: spaces, tabs and CR/LF are trimmed from both
 * ends, empty lines and lines starting with '#' are skipped, and the rest
 * is split on ' ' with sdssplitlen(). Directive names are matched
 * case-insensitively and each directive requires an exact argument count.
 * Note that fgets() reads at most REDIS_CONFIGLINE_MAX characters at a
 * time, so a longer line is processed as several lines.
 *
 * Every error is fatal and exits the process with status 1: a file that
 * can't be opened or a 'dir' that can't be entered is logged with
 * redisLog(); an invalid value or unknown directive is reported on stderr
 * together with the line number and text.
 *
 * I agree, this is a very rudimental way to load a configuration...
 * will improve later if the config gets more complex */
static void loadServerConfig(char *filename) {
    FILE *fp;
    char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
    int linenum = 0;
    sds line = NULL;

    if (filename[0] == '-' && filename[1] == '\0')
        fp = stdin;
    else {
        if ((fp = fopen(filename,"r")) == NULL) {
            redisLog(REDIS_WARNING,"Fatal error, can't open config file");
            exit(1);
        }
    }

    while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
        sds *argv;
        int argc, j;

        linenum++;
        line = sdsnew(buf);
        line = sdstrim(line," \t\r\n");

        /* Skip comments and blank lines*/
        if (line[0] == '#' || line[0] == '\0') {
            sdsfree(line);
            continue;
        }

        /* Split into arguments */
        argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
        sdstolower(argv[0]);

        /* Execute config directives */
        if (!strcasecmp(argv[0],"timeout") && argc == 2) {
            server.maxidletime = atoi(argv[1]);
            if (server.maxidletime < 0) {
                err = "Invalid timeout value"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"port") && argc == 2) {
            server.port = atoi(argv[1]);
            if (server.port < 1 || server.port > 65535) {
                err = "Invalid port"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"bind") && argc == 2) {
            server.bindaddr = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"save") && argc == 3) {
            /* save <seconds> <changes>: adds one save rule */
            int seconds = atoi(argv[1]);
            int changes = atoi(argv[2]);
            if (seconds < 1 || changes < 0) {
                err = "Invalid save parameters"; goto loaderr;
            }
            appendServerSaveParams(seconds,changes);
        } else if (!strcasecmp(argv[0],"dir") && argc == 2) {
            /* The working directory is changed right away, so relative
             * paths used afterwards resolve inside it. */
            if (chdir(argv[1]) == -1) {
                redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
                    argv[1], strerror(errno));
                exit(1);
            }
        } else if (!strcasecmp(argv[0],"loglevel") && argc == 2) {
            if (!strcasecmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
            else if (!strcasecmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
            else if (!strcasecmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
            else {
                err = "Invalid log level. Must be one of debug, notice, warning";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"logfile") && argc == 2) {
            FILE *logfp;

            /* "stdout" selects logging on standard output (NULL logfile) */
            server.logfile = zstrdup(argv[1]);
            if (!strcasecmp(server.logfile,"stdout")) {
                zfree(server.logfile);
                server.logfile = NULL;
            }
            if (server.logfile) {
                /* Test if we are able to open the file. The server will not
                 * be able to abort just for this problem later... */
                logfp = fopen(server.logfile,"a");
                if (logfp == NULL) {
                    err = sdscatprintf(sdsempty(),
                        "Can't open the log file: %s", strerror(errno));
                    goto loaderr;
                }
                fclose(logfp);
            }
        } else if (!strcasecmp(argv[0],"databases") && argc == 2) {
            server.dbnum = atoi(argv[1]);
            if (server.dbnum < 1) {
                err = "Invalid number of databases"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) {
            server.maxclients = atoi(argv[1]);
        } else if (!strcasecmp(argv[0],"maxmemory") && argc == 2) {
            server.maxmemory = strtoll(argv[1], NULL, 10);
        } else if (!strcasecmp(argv[0],"slaveof") && argc == 3) {
            /* serverCron() connects to the master while replstate is
             * REDIS_REPL_CONNECT. */
            server.masterhost = sdsnew(argv[1]);
            server.masterport = atoi(argv[2]);
            server.replstate = REDIS_REPL_CONNECT;
        } else if (!strcasecmp(argv[0],"masterauth") && argc == 2) {
            server.masterauth = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"glueoutputbuf") && argc == 2) {
            if ((server.glueoutputbuf = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjects") && argc == 2) {
            if ((server.shareobjects = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"rdbcompression") && argc == 2) {
            if ((server.rdbcompression = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"shareobjectspoolsize") && argc == 2) {
            server.sharingpoolsize = atoi(argv[1]);
            if (server.sharingpoolsize < 1) {
                err = "invalid object sharing pool size"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) {
            if ((server.daemonize = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) {
            if ((server.appendonly = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"appendfsync") && argc == 2) {
            if (!strcasecmp(argv[1],"no")) {
                server.appendfsync = APPENDFSYNC_NO;
            } else if (!strcasecmp(argv[1],"always")) {
                server.appendfsync = APPENDFSYNC_ALWAYS;
            } else if (!strcasecmp(argv[1],"everysec")) {
                server.appendfsync = APPENDFSYNC_EVERYSEC;
            } else {
                err = "argument must be 'no', 'always' or 'everysec'";
                goto loaderr;
            }
        } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) {
            server.requirepass = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"pidfile") && argc == 2) {
            server.pidfile = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"dbfilename") && argc == 2) {
            server.dbfilename = zstrdup(argv[1]);
        } else if (!strcasecmp(argv[0],"vm-enabled") && argc == 2) {
            if ((server.vm_enabled = yesnotoi(argv[1])) == -1) {
                err = "argument must be 'yes' or 'no'"; goto loaderr;
            }
        } else {
            err = "Bad directive or wrong number of arguments"; goto loaderr;
        }
        /* Release this line's arguments before reading the next one */
        for (j = 0; j < argc; j++)
            sdsfree(argv[j]);
        zfree(argv);
        sdsfree(line);
    }
    if (fp != stdin) fclose(fp);
    return;

loaderr:
    /* 'line' still holds the offending (trimmed) line here */
    fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
    fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
    fprintf(stderr, ">>> '%s'\n", line);
    fprintf(stderr, "%s\n", err);
    exit(1);
}
1502
1503 static void freeClientArgv(redisClient *c) {
1504 int j;
1505
1506 for (j = 0; j < c->argc; j++)
1507 decrRefCount(c->argv[j]);
1508 for (j = 0; j < c->mbargc; j++)
1509 decrRefCount(c->mbargv[j]);
1510 c->argc = 0;
1511 c->mbargc = 0;
1512 }
1513
/* Destroys a client and releases everything it owns: unblocks it if it is
 * blocked, removes its file events, frees its reply list and arguments,
 * closes its socket, unlinks it from server.clients (and from
 * server.slaves or server.monitors for slaves), resets the replication
 * state if it was our master, and finally frees the structure itself. */
static void freeClient(redisClient *c) {
    listNode *ln;

    /* Note that if the client we are freeing is blocked into a blocking
     * call, we have to set querybuf to NULL *before* to call unblockClient()
     * to avoid processInputBuffer() will get called. Also it is important
     * to remove the file events after this, because this call adds
     * the READABLE event. */
    sdsfree(c->querybuf);
    c->querybuf = NULL;
    if (c->flags & REDIS_BLOCKED)
        unblockClient(c);

    aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
    aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    listRelease(c->reply);
    freeClientArgv(c);
    close(c->fd);
    /* Every client must be in server.clients */
    ln = listSearchKey(server.clients,c);
    redisAssert(ln != NULL);
    listDelNode(server.clients,ln);
    if (c->flags & REDIS_SLAVE) {
        /* During the bulk transfer stage the slave holds an open
         * descriptor in repldbfd (presumably on the dump being sent). */
        if (c->replstate == REDIS_REPL_SEND_BULK && c->repldbfd != -1)
            close(c->repldbfd);
        /* Slaves flagged REDIS_MONITOR live in server.monitors */
        list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
        ln = listSearchKey(l,c);
        redisAssert(ln != NULL);
        listDelNode(l,ln);
    }
    if (c->flags & REDIS_MASTER) {
        /* Lost the master: serverCron() reconnects while replstate is
         * REDIS_REPL_CONNECT. */
        server.master = NULL;
        server.replstate = REDIS_REPL_CONNECT;
    }
    zfree(c->argv);
    zfree(c->mbargv);
    freeClientMultiState(c);
    zfree(c);
}
1552
1553 #define GLUEREPLY_UP_TO (1024)
1554 static void glueReplyBuffersIfNeeded(redisClient *c) {
1555 int copylen = 0;
1556 char buf[GLUEREPLY_UP_TO];
1557 listNode *ln;
1558 robj *o;
1559
1560 listRewind(c->reply);
1561 while((ln = listYield(c->reply))) {
1562 int objlen;
1563
1564 o = ln->value;
1565 objlen = sdslen(o->ptr);
1566 if (copylen + objlen <= GLUEREPLY_UP_TO) {
1567 memcpy(buf+copylen,o->ptr,objlen);
1568 copylen += objlen;
1569 listDelNode(c->reply,ln);
1570 } else {
1571 if (copylen == 0) return;
1572 break;
1573 }
1574 }
1575 /* Now the output buffer is empty, add the new single element */
1576 o = createObject(REDIS_STRING,sdsnewlen(buf,copylen));
1577 listAddNodeHead(c->reply,o);
1578 }
1579
/* Writable-event handler that sends the client's queued reply objects.
 *
 * When output buffer gluing is disabled, more than REDIS_WRITEV_THRESHOLD
 * objects are queued and the client is not our master, the work is
 * delegated to sendReplyToClientWritev(). Otherwise objects are sent from
 * the head of c->reply with write(), optionally after coalescing small
 * ones with glueReplyBuffersIfNeeded(). c->sentlen counts the bytes of the
 * head object already sent. Replies to our master are not written but
 * consumed as if sent. A call stops after more than
 * REDIS_MAX_WRITE_PER_EVENT bytes.
 *
 * On a write error other than EAGAIN the client is freed. When the reply
 * list becomes empty the writable event is removed. */
static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    redisClient *c = privdata;
    int nwritten = 0, totwritten = 0, objlen;
    robj *o;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);

    /* Use writev() if we have enough buffers to send */
    if (!server.glueoutputbuf &&
        listLength(c->reply) > REDIS_WRITEV_THRESHOLD &&
        !(c->flags & REDIS_MASTER))
    {
        sendReplyToClientWritev(el, fd, privdata, mask);
        return;
    }

    while(listLength(c->reply)) {
        if (server.glueoutputbuf && listLength(c->reply) > 1)
            glueReplyBuffersIfNeeded(c);

        o = listNodeValue(listFirst(c->reply));
        objlen = sdslen(o->ptr);

        /* Empty objects are simply dropped */
        if (objlen == 0) {
            listDelNode(c->reply,listFirst(c->reply));
            continue;
        }

        if (c->flags & REDIS_MASTER) {
            /* Don't reply to a master */
            nwritten = objlen - c->sentlen;
        } else {
            nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
            /* On error nwritten (-1) is inspected after the loop */
            if (nwritten <= 0) break;
        }
        c->sentlen += nwritten;
        totwritten += nwritten;
        /* If we fully sent the object on head go to the next one */
        if (c->sentlen == objlen) {
            listDelNode(c->reply,listFirst(c->reply));
            c->sentlen = 0;
        }
        /* Note that we avoid to send more thank REDIS_MAX_WRITE_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interfae) */
        if (totwritten > REDIS_MAX_WRITE_PER_EVENT) break;
    }
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            redisLog(REDIS_DEBUG,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return;
        }
    }
    if (totwritten > 0) c->lastinteraction = time(NULL);
    if (listLength(c->reply) == 0) {
        c->sentlen = 0;
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
    }
}
1645
1646 static void sendReplyToClientWritev(aeEventLoop *el, int fd, void *privdata, int mask)
1647 {
1648 redisClient *c = privdata;
1649 int nwritten = 0, totwritten = 0, objlen, willwrite;
1650 robj *o;
1651 struct iovec iov[REDIS_WRITEV_IOVEC_COUNT];
1652 int offset, ion = 0;
1653 REDIS_NOTUSED(el);
1654 REDIS_NOTUSED(mask);
1655
1656 listNode *node;
1657 while (listLength(c->reply)) {
1658 offset = c->sentlen;
1659 ion = 0;
1660 willwrite = 0;
1661
1662 /* fill-in the iov[] array */
1663 for(node = listFirst(c->reply); node; node = listNextNode(node)) {
1664 o = listNodeValue(node);
1665 objlen = sdslen(o->ptr);
1666
1667 if (totwritten + objlen - offset > REDIS_MAX_WRITE_PER_EVENT)
1668 break;
1669
1670 if(ion == REDIS_WRITEV_IOVEC_COUNT)
1671 break; /* no more iovecs */
1672
1673 iov[ion].iov_base = ((char*)o->ptr) + offset;
1674 iov[ion].iov_len = objlen - offset;
1675 willwrite += objlen - offset;
1676 offset = 0; /* just for the first item */
1677 ion++;
1678 }
1679
1680 if(willwrite == 0)
1681 break;
1682
1683 /* write all collected blocks at once */
1684 if((nwritten = writev(fd, iov, ion)) < 0) {
1685 if (errno != EAGAIN) {
1686 redisLog(REDIS_DEBUG,
1687 "Error writing to client: %s", strerror(errno));
1688 freeClient(c);
1689 return;
1690 }
1691 break;
1692 }
1693
1694 totwritten += nwritten;
1695 offset = c->sentlen;
1696
1697 /* remove written robjs from c->reply */
1698 while (nwritten && listLength(c->reply)) {
1699 o = listNodeValue(listFirst(c->reply));
1700 objlen = sdslen(o->ptr);
1701
1702 if(nwritten >= objlen - offset) {
1703 listDelNode(c->reply, listFirst(c->reply));
1704 nwritten -= objlen - offset;
1705 c->sentlen = 0;
1706 } else {
1707 /* partial write */
1708 c->sentlen += nwritten;
1709 break;
1710 }
1711 offset = 0;
1712 }
1713 }
1714
1715 if (totwritten > 0)
1716 c->lastinteraction = time(NULL);
1717
1718 if (listLength(c->reply) == 0) {
1719 c->sentlen = 0;
1720 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1721 }
1722 }
1723
1724 static struct redisCommand *lookupCommand(char *name) {
1725 int j = 0;
1726 while(cmdTable[j].name != NULL) {
1727 if (!strcasecmp(name,cmdTable[j].name)) return &cmdTable[j];
1728 j++;
1729 }
1730 return NULL;
1731 }
1732
1733 /* resetClient prepare the client to process the next command */
1734 static void resetClient(redisClient *c) {
1735 freeClientArgv(c);
1736 c->bulklen = -1;
1737 c->multibulk = 0;
1738 }
1739
1740 /* Call() is the core of Redis execution of a command */
1741 static void call(redisClient *c, struct redisCommand *cmd) {
1742 long long dirty;
1743
1744 dirty = server.dirty;
1745 cmd->proc(c);
1746 if (server.appendonly && server.dirty-dirty)
1747 feedAppendOnlyFile(cmd,c->db->id,c->argv,c->argc);
1748 if (server.dirty-dirty && listLength(server.slaves))
1749 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1750 if (listLength(server.monitors))
1751 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1752 server.stat_numcommands++;
1753 }
1754
1755 /* If this function gets called we already read a whole
1756 * command, argments are in the client argv/argc fields.
1757 * processCommand() execute the command or prepare the
1758 * server for a bulk read from the client.
1759 *
1760 * If 1 is returned the client is still alive and valid and
1761 * and other operations can be performed by the caller. Otherwise
1762 * if 0 is returned the client was destroied (i.e. after QUIT). */
1763 static int processCommand(redisClient *c) {
1764 struct redisCommand *cmd;
1765
1766 /* Free some memory if needed (maxmemory setting) */
1767 if (server.maxmemory) freeMemoryIfNeeded();
1768
1769 /* Handle the multi bulk command type. This is an alternative protocol
1770 * supported by Redis in order to receive commands that are composed of
1771 * multiple binary-safe "bulk" arguments. The latency of processing is
1772 * a bit higher but this allows things like multi-sets, so if this
1773 * protocol is used only for MSET and similar commands this is a big win. */
1774 if (c->multibulk == 0 && c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '*') {
1775 c->multibulk = atoi(((char*)c->argv[0]->ptr)+1);
1776 if (c->multibulk <= 0) {
1777 resetClient(c);
1778 return 1;
1779 } else {
1780 decrRefCount(c->argv[c->argc-1]);
1781 c->argc--;
1782 return 1;
1783 }
1784 } else if (c->multibulk) {
1785 if (c->bulklen == -1) {
1786 if (((char*)c->argv[0]->ptr)[0] != '$') {
1787 addReplySds(c,sdsnew("-ERR multi bulk protocol error\r\n"));
1788 resetClient(c);
1789 return 1;
1790 } else {
1791 int bulklen = atoi(((char*)c->argv[0]->ptr)+1);
1792 decrRefCount(c->argv[0]);
1793 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1794 c->argc--;
1795 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1796 resetClient(c);
1797 return 1;
1798 }
1799 c->argc--;
1800 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1801 return 1;
1802 }
1803 } else {
1804 c->mbargv = zrealloc(c->mbargv,(sizeof(robj*))*(c->mbargc+1));
1805 c->mbargv[c->mbargc] = c->argv[0];
1806 c->mbargc++;
1807 c->argc--;
1808 c->multibulk--;
1809 if (c->multibulk == 0) {
1810 robj **auxargv;
1811 int auxargc;
1812
1813 /* Here we need to swap the multi-bulk argc/argv with the
1814 * normal argc/argv of the client structure. */
1815 auxargv = c->argv;
1816 c->argv = c->mbargv;
1817 c->mbargv = auxargv;
1818
1819 auxargc = c->argc;
1820 c->argc = c->mbargc;
1821 c->mbargc = auxargc;
1822
1823 /* We need to set bulklen to something different than -1
1824 * in order for the code below to process the command without
1825 * to try to read the last argument of a bulk command as
1826 * a special argument. */
1827 c->bulklen = 0;
1828 /* continue below and process the command */
1829 } else {
1830 c->bulklen = -1;
1831 return 1;
1832 }
1833 }
1834 }
1835 /* -- end of multi bulk commands processing -- */
1836
1837 /* The QUIT command is handled as a special case. Normal command
1838 * procs are unable to close the client connection safely */
1839 if (!strcasecmp(c->argv[0]->ptr,"quit")) {
1840 freeClient(c);
1841 return 0;
1842 }
1843 cmd = lookupCommand(c->argv[0]->ptr);
1844 if (!cmd) {
1845 addReplySds(c,
1846 sdscatprintf(sdsempty(), "-ERR unknown command '%s'\r\n",
1847 (char*)c->argv[0]->ptr));
1848 resetClient(c);
1849 return 1;
1850 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1851 (c->argc < -cmd->arity)) {
1852 addReplySds(c,
1853 sdscatprintf(sdsempty(),
1854 "-ERR wrong number of arguments for '%s' command\r\n",
1855 cmd->name));
1856 resetClient(c);
1857 return 1;
1858 } else if (server.maxmemory && cmd->flags & REDIS_CMD_DENYOOM && zmalloc_used_memory() > server.maxmemory) {
1859 addReplySds(c,sdsnew("-ERR command not allowed when used memory > 'maxmemory'\r\n"));
1860 resetClient(c);
1861 return 1;
1862 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1863 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1864
1865 decrRefCount(c->argv[c->argc-1]);
1866 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1867 c->argc--;
1868 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1869 resetClient(c);
1870 return 1;
1871 }
1872 c->argc--;
1873 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1874 /* It is possible that the bulk read is already in the
1875 * buffer. Check this condition and handle it accordingly.
1876 * This is just a fast path, alternative to call processInputBuffer().
1877 * It's a good idea since the code is small and this condition
1878 * happens most of the times. */
1879 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1880 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1881 c->argc++;
1882 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1883 } else {
1884 return 1;
1885 }
1886 }
1887 /* Let's try to share objects on the command arguments vector */
1888 if (server.shareobjects) {
1889 int j;
1890 for(j = 1; j < c->argc; j++)
1891 c->argv[j] = tryObjectSharing(c->argv[j]);
1892 }
1893 /* Let's try to encode the bulk object to save space. */
1894 if (cmd->flags & REDIS_CMD_BULK)
1895 tryObjectEncoding(c->argv[c->argc-1]);
1896
1897 /* Check if the user is authenticated */
1898 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1899 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1900 resetClient(c);
1901 return 1;
1902 }
1903
1904 /* Exec the command */
1905 if (c->flags & REDIS_MULTI && cmd->proc != execCommand) {
1906 queueMultiCommand(c,cmd);
1907 addReply(c,shared.queued);
1908 } else {
1909 call(c,cmd);
1910 }
1911
1912 /* Prepare the client for the next command */
1913 if (c->flags & REDIS_CLOSE) {
1914 freeClient(c);
1915 return 0;
1916 }
1917 resetClient(c);
1918 return 1;
1919 }
1920
1921 static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1922 listNode *ln;
1923 int outc = 0, j;
1924 robj **outv;
1925 /* (args*2)+1 is enough room for args, spaces, newlines */
1926 robj *static_outv[REDIS_STATIC_ARGS*2+1];
1927
1928 if (argc <= REDIS_STATIC_ARGS) {
1929 outv = static_outv;
1930 } else {
1931 outv = zmalloc(sizeof(robj*)*(argc*2+1));
1932 }
1933
1934 for (j = 0; j < argc; j++) {
1935 if (j != 0) outv[outc++] = shared.space;
1936 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1937 robj *lenobj;
1938
1939 lenobj = createObject(REDIS_STRING,
1940 sdscatprintf(sdsempty(),"%lu\r\n",
1941 (unsigned long) stringObjectLen(argv[j])));
1942 lenobj->refcount = 0;
1943 outv[outc++] = lenobj;
1944 }
1945 outv[outc++] = argv[j];
1946 }
1947 outv[outc++] = shared.crlf;
1948
1949 /* Increment all the refcounts at start and decrement at end in order to
1950 * be sure to free objects if there is no slave in a replication state
1951 * able to be feed with commands */
1952 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1953 listRewind(slaves);
1954 while((ln = listYield(slaves))) {
1955 redisClient *slave = ln->value;
1956
1957 /* Don't feed slaves that are still waiting for BGSAVE to start */
1958 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;
1959
1960 /* Feed all the other slaves, MONITORs and so on */
1961 if (slave->slaveseldb != dictid) {
1962 robj *selectcmd;
1963
1964 switch(dictid) {
1965 case 0: selectcmd = shared.select0; break;
1966 case 1: selectcmd = shared.select1; break;
1967 case 2: selectcmd = shared.select2; break;
1968 case 3: selectcmd = shared.select3; break;
1969 case 4: selectcmd = shared.select4; break;
1970 case 5: selectcmd = shared.select5; break;
1971 case 6: selectcmd = shared.select6; break;
1972 case 7: selectcmd = shared.select7; break;
1973 case 8: selectcmd = shared.select8; break;
1974 case 9: selectcmd = shared.select9; break;
1975 default:
1976 selectcmd = createObject(REDIS_STRING,
1977 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1978 selectcmd->refcount = 0;
1979 break;
1980 }
1981 addReply(slave,selectcmd);
1982 slave->slaveseldb = dictid;
1983 }
1984 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1985 }
1986 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1987 if (outv != static_outv) zfree(outv);
1988 }
1989
1990 static void processInputBuffer(redisClient *c) {
1991 again:
1992 /* Before to process the input buffer, make sure the client is not
1993 * waitig for a blocking operation such as BLPOP. Note that the first
1994 * iteration the client is never blocked, otherwise the processInputBuffer
1995 * would not be called at all, but after the execution of the first commands
1996 * in the input buffer the client may be blocked, and the "goto again"
1997 * will try to reiterate. The following line will make it return asap. */
1998 if (c->flags & REDIS_BLOCKED) return;
1999 if (c->bulklen == -1) {
2000 /* Read the first line of the query */
2001 char *p = strchr(c->querybuf,'\n');
2002 size_t querylen;
2003
2004 if (p) {
2005 sds query, *argv;
2006 int argc, j;
2007
2008 query = c->querybuf;
2009 c->querybuf = sdsempty();
2010 querylen = 1+(p-(query));
2011 if (sdslen(query) > querylen) {
2012 /* leave data after the first line of the query in the buffer */
2013 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
2014 }
2015 *p = '\0'; /* remove "\n" */
2016 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
2017 sdsupdatelen(query);
2018
2019 /* Now we can split the query in arguments */
2020 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
2021 sdsfree(query);
2022
2023 if (c->argv) zfree(c->argv);
2024 c->argv = zmalloc(sizeof(robj*)*argc);
2025
2026 for (j = 0; j < argc; j++) {
2027 if (sdslen(argv[j])) {
2028 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
2029 c->argc++;
2030 } else {
2031 sdsfree(argv[j]);
2032 }
2033 }
2034 zfree(argv);
2035 if (c->argc) {
2036 /* Execute the command. If the client is still valid
2037 * after processCommand() return and there is something
2038 * on the query buffer try to process the next command. */
2039 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2040 } else {
2041 /* Nothing to process, argc == 0. Just process the query
2042 * buffer if it's not empty or return to the caller */
2043 if (sdslen(c->querybuf)) goto again;
2044 }
2045 return;
2046 } else if (sdslen(c->querybuf) >= REDIS_REQUEST_MAX_SIZE) {
2047 redisLog(REDIS_DEBUG, "Client protocol error");
2048 freeClient(c);
2049 return;
2050 }
2051 } else {
2052 /* Bulk read handling. Note that if we are at this point
2053 the client already sent a command terminated with a newline,
2054 we are reading the bulk data that is actually the last
2055 argument of the command. */
2056 int qbl = sdslen(c->querybuf);
2057
2058 if (c->bulklen <= qbl) {
2059 /* Copy everything but the final CRLF as final argument */
2060 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
2061 c->argc++;
2062 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
2063 /* Process the command. If the client is still valid after
2064 * the processing and there is more data in the buffer
2065 * try to parse it. */
2066 if (processCommand(c) && sdslen(c->querybuf)) goto again;
2067 return;
2068 }
2069 }
2070 }
2071
2072 static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
2073 redisClient *c = (redisClient*) privdata;
2074 char buf[REDIS_IOBUF_LEN];
2075 int nread;
2076 REDIS_NOTUSED(el);
2077 REDIS_NOTUSED(mask);
2078
2079 nread = read(fd, buf, REDIS_IOBUF_LEN);
2080 if (nread == -1) {
2081 if (errno == EAGAIN) {
2082 nread = 0;
2083 } else {
2084 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
2085 freeClient(c);
2086 return;
2087 }
2088 } else if (nread == 0) {
2089 redisLog(REDIS_DEBUG, "Client closed connection");
2090 freeClient(c);
2091 return;
2092 }
2093 if (nread) {
2094 c->querybuf = sdscatlen(c->querybuf, buf, nread);
2095 c->lastinteraction = time(NULL);
2096 } else {
2097 return;
2098 }
2099 processInputBuffer(c);
2100 }
2101
2102 static int selectDb(redisClient *c, int id) {
2103 if (id < 0 || id >= server.dbnum)
2104 return REDIS_ERR;
2105 c->db = &server.db[id];
2106 return REDIS_OK;
2107 }
2108
2109 static void *dupClientReplyValue(void *o) {
2110 incrRefCount((robj*)o);
2111 return 0;
2112 }
2113
2114 static redisClient *createClient(int fd) {
2115 redisClient *c = zmalloc(sizeof(*c));
2116
2117 anetNonBlock(NULL,fd);
2118 anetTcpNoDelay(NULL,fd);
2119 if (!c) return NULL;
2120 selectDb(c,0);
2121 c->fd = fd;
2122 c->querybuf = sdsempty();
2123 c->argc = 0;
2124 c->argv = NULL;
2125 c->bulklen = -1;
2126 c->multibulk = 0;
2127 c->mbargc = 0;
2128 c->mbargv = NULL;
2129 c->sentlen = 0;
2130 c->flags = 0;
2131 c->lastinteraction = time(NULL);
2132 c->authenticated = 0;
2133 c->replstate = REDIS_REPL_NONE;
2134 c->reply = listCreate();
2135 c->blockingkeys = NULL;
2136 c->blockingkeysnum = 0;
2137 listSetFreeMethod(c->reply,decrRefCount);
2138 listSetDupMethod(c->reply,dupClientReplyValue);
2139 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
2140 readQueryFromClient, c) == AE_ERR) {
2141 freeClient(c);
2142 return NULL;
2143 }
2144 listAddNodeTail(server.clients,c);
2145 initClientMultiState(c);
2146 return c;
2147 }
2148
2149 static void addReply(redisClient *c, robj *obj) {
2150 if (listLength(c->reply) == 0 &&
2151 (c->replstate == REDIS_REPL_NONE ||
2152 c->replstate == REDIS_REPL_ONLINE) &&
2153 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
2154 sendReplyToClient, c) == AE_ERR) return;
2155 listAddNodeTail(c->reply,getDecodedObject(obj));
2156 }
2157
2158 static void addReplySds(redisClient *c, sds s) {
2159 robj *o = createObject(REDIS_STRING,s);
2160 addReply(c,o);
2161 decrRefCount(o);
2162 }
2163
2164 static void addReplyDouble(redisClient *c, double d) {
2165 char buf[128];
2166
2167 snprintf(buf,sizeof(buf),"%.17g",d);
2168 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n%s\r\n",
2169 (unsigned long) strlen(buf),buf));
2170 }
2171
2172 static void addReplyBulkLen(redisClient *c, robj *obj) {
2173 size_t len;
2174
2175 if (obj->encoding == REDIS_ENCODING_RAW) {
2176 len = sdslen(obj->ptr);
2177 } else {
2178 long n = (long)obj->ptr;
2179
2180 /* Compute how many bytes will take this integer as a radix 10 string */
2181 len = 1;
2182 if (n < 0) {
2183 len++;
2184 n = -n;
2185 }
2186 while((n = n/10) != 0) {
2187 len++;
2188 }
2189 }
2190 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",(unsigned long)len));
2191 }
2192
2193 static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
2194 int cport, cfd;
2195 char cip[128];
2196 redisClient *c;
2197 REDIS_NOTUSED(el);
2198 REDIS_NOTUSED(mask);
2199 REDIS_NOTUSED(privdata);
2200
2201 cfd = anetAccept(server.neterr, fd, cip, &cport);
2202 if (cfd == AE_ERR) {
2203 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
2204 return;
2205 }
2206 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
2207 if ((c = createClient(cfd)) == NULL) {
2208 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
2209 close(cfd); /* May be already closed, just ingore errors */
2210 return;
2211 }
2212 /* If maxclient directive is set and this is one client more... close the
2213 * connection. Note that we create the client instead to check before
2214 * for this condition, since now the socket is already set in nonblocking
2215 * mode and we can send an error for free using the Kernel I/O */
2216 if (server.maxclients && listLength(server.clients) > server.maxclients) {
2217 char *err = "-ERR max number of clients reached\r\n";
2218
2219 /* That's a best effort error message, don't check write errors */
2220 if (write(c->fd,err,strlen(err)) == -1) {
2221 /* Nothing to do, Just to avoid the warning... */
2222 }
2223 freeClient(c);
2224 return;
2225 }
2226 server.stat_numconnections++;
2227 }
2228
2229 /* ======================= Redis objects implementation ===================== */
2230
2231 static robj *createObject(int type, void *ptr) {
2232 robj *o;
2233
2234 if (listLength(server.objfreelist)) {
2235 listNode *head = listFirst(server.objfreelist);
2236 o = listNodeValue(head);
2237 listDelNode(server.objfreelist,head);
2238 } else {
2239 if (server.vm_enabled) {
2240 o = zmalloc(sizeof(*o));
2241 } else {
2242 o = zmalloc(sizeof(*o)-sizeof(struct redisObjectVM));
2243 }
2244 }
2245 o->type = type;
2246 o->encoding = REDIS_ENCODING_RAW;
2247 o->ptr = ptr;
2248 o->refcount = 1;
2249 return o;
2250 }
2251
2252 static robj *createStringObject(char *ptr, size_t len) {
2253 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
2254 }
2255
2256 static robj *createListObject(void) {
2257 list *l = listCreate();
2258
2259 listSetFreeMethod(l,decrRefCount);
2260 return createObject(REDIS_LIST,l);
2261 }
2262
2263 static robj *createSetObject(void) {
2264 dict *d = dictCreate(&setDictType,NULL);
2265 return createObject(REDIS_SET,d);
2266 }
2267
2268 static robj *createZsetObject(void) {
2269 zset *zs = zmalloc(sizeof(*zs));
2270
2271 zs->dict = dictCreate(&zsetDictType,NULL);
2272 zs->zsl = zslCreate();
2273 return createObject(REDIS_ZSET,zs);
2274 }
2275
2276 static void freeStringObject(robj *o) {
2277 if (o->encoding == REDIS_ENCODING_RAW) {
2278 sdsfree(o->ptr);
2279 }
2280 }
2281
2282 static void freeListObject(robj *o) {
2283 listRelease((list*) o->ptr);
2284 }
2285
2286 static void freeSetObject(robj *o) {
2287 dictRelease((dict*) o->ptr);
2288 }
2289
2290 static void freeZsetObject(robj *o) {
2291 zset *zs = o->ptr;
2292
2293 dictRelease(zs->dict);
2294 zslFree(zs->zsl);
2295 zfree(zs);
2296 }
2297
2298 static void freeHashObject(robj *o) {
2299 dictRelease((dict*) o->ptr);
2300 }
2301
2302 static void incrRefCount(robj *o) {
2303 o->refcount++;
2304 #ifdef DEBUG_REFCOUNT
2305 if (o->type == REDIS_STRING)
2306 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
2307 #endif
2308 }
2309
2310 static void decrRefCount(void *obj) {
2311 robj *o = obj;
2312
2313 #ifdef DEBUG_REFCOUNT
2314 if (o->type == REDIS_STRING)
2315 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
2316 #endif
2317 if (--(o->refcount) == 0) {
2318 switch(o->type) {
2319 case REDIS_STRING: freeStringObject(o); break;
2320 case REDIS_LIST: freeListObject(o); break;
2321 case REDIS_SET: freeSetObject(o); break;
2322 case REDIS_ZSET: freeZsetObject(o); break;
2323 case REDIS_HASH: freeHashObject(o); break;
2324 default: redisAssert(0 != 0); break;
2325 }
2326 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
2327 !listAddNodeHead(server.objfreelist,o))
2328 zfree(o);
2329 }
2330 }
2331
2332 static robj *lookupKey(redisDb *db, robj *key) {
2333 dictEntry *de = dictFind(db->dict,key);
2334 return de ? dictGetEntryVal(de) : NULL;
2335 }
2336
2337 static robj *lookupKeyRead(redisDb *db, robj *key) {
2338 expireIfNeeded(db,key);
2339 return lookupKey(db,key);
2340 }
2341
2342 static robj *lookupKeyWrite(redisDb *db, robj *key) {
2343 deleteIfVolatile(db,key);
2344 return lookupKey(db,key);
2345 }
2346
2347 static int deleteKey(redisDb *db, robj *key) {
2348 int retval;
2349
2350 /* We need to protect key from destruction: after the first dictDelete()
2351 * it may happen that 'key' is no longer valid if we don't increment
2352 * it's count. This may happen when we get the object reference directly
2353 * from the hash table with dictRandomKey() or dict iterators */
2354 incrRefCount(key);
2355 if (dictSize(db->expires)) dictDelete(db->expires,key);
2356 retval = dictDelete(db->dict,key);
2357 decrRefCount(key);
2358
2359 return retval == DICT_OK;
2360 }
2361
2362 /* Try to share an object against the shared objects pool */
2363 static robj *tryObjectSharing(robj *o) {
2364 struct dictEntry *de;
2365 unsigned long c;
2366
2367 if (o == NULL || server.shareobjects == 0) return o;
2368
2369 redisAssert(o->type == REDIS_STRING);
2370 de = dictFind(server.sharingpool,o);
2371 if (de) {
2372 robj *shared = dictGetEntryKey(de);
2373
2374 c = ((unsigned long) dictGetEntryVal(de))+1;
2375 dictGetEntryVal(de) = (void*) c;
2376 incrRefCount(shared);
2377 decrRefCount(o);
2378 return shared;
2379 } else {
2380 /* Here we are using a stream algorihtm: Every time an object is
2381 * shared we increment its count, everytime there is a miss we
2382 * recrement the counter of a random object. If this object reaches
2383 * zero we remove the object and put the current object instead. */
2384 if (dictSize(server.sharingpool) >=
2385 server.sharingpoolsize) {
2386 de = dictGetRandomKey(server.sharingpool);
2387 redisAssert(de != NULL);
2388 c = ((unsigned long) dictGetEntryVal(de))-1;
2389 dictGetEntryVal(de) = (void*) c;
2390 if (c == 0) {
2391 dictDelete(server.sharingpool,de->key);
2392 }
2393 } else {
2394 c = 0; /* If the pool is empty we want to add this object */
2395 }
2396 if (c == 0) {
2397 int retval;
2398
2399 retval = dictAdd(server.sharingpool,o,(void*)1);
2400 redisAssert(retval == DICT_OK);
2401 incrRefCount(o);
2402 }
2403 return o;
2404 }
2405 }
2406
2407 /* Check if the nul-terminated string 's' can be represented by a long
2408 * (that is, is a number that fits into long without any other space or
2409 * character before or after the digits).
2410 *
2411 * If so, the function returns REDIS_OK and *longval is set to the value
2412 * of the number. Otherwise REDIS_ERR is returned */
2413 static int isStringRepresentableAsLong(sds s, long *longval) {
2414 char buf[32], *endptr;
2415 long value;
2416 int slen;
2417
2418 value = strtol(s, &endptr, 10);
2419 if (endptr[0] != '\0') return REDIS_ERR;
2420 slen = snprintf(buf,32,"%ld",value);
2421
2422 /* If the number converted back into a string is not identical
2423 * then it's not possible to encode the string as integer */
2424 if (sdslen(s) != (unsigned)slen || memcmp(buf,s,slen)) return REDIS_ERR;
2425 if (longval) *longval = value;
2426 return REDIS_OK;
2427 }
2428
2429 /* Try to encode a string object in order to save space */
2430 static int tryObjectEncoding(robj *o) {
2431 long value;
2432 sds s = o->ptr;
2433
2434 if (o->encoding != REDIS_ENCODING_RAW)
2435 return REDIS_ERR; /* Already encoded */
2436
2437 /* It's not save to encode shared objects: shared objects can be shared
2438 * everywhere in the "object space" of Redis. Encoded objects can only
2439 * appear as "values" (and not, for instance, as keys) */
2440 if (o->refcount > 1) return REDIS_ERR;
2441
2442 /* Currently we try to encode only strings */
2443 redisAssert(o->type == REDIS_STRING);
2444
2445 /* Check if we can represent this string as a long integer */
2446 if (isStringRepresentableAsLong(s,&value) == REDIS_ERR) return REDIS_ERR;
2447
2448 /* Ok, this object can be encoded */
2449 o->encoding = REDIS_ENCODING_INT;
2450 sdsfree(o->ptr);
2451 o->ptr = (void*) value;
2452 return REDIS_OK;
2453 }
2454
2455 /* Get a decoded version of an encoded object (returned as a new object).
2456 * If the object is already raw-encoded just increment the ref count. */
2457 static robj *getDecodedObject(robj *o) {
2458 robj *dec;
2459
2460 if (o->encoding == REDIS_ENCODING_RAW) {
2461 incrRefCount(o);
2462 return o;
2463 }
2464 if (o->type == REDIS_STRING && o->encoding == REDIS_ENCODING_INT) {
2465 char buf[32];
2466
2467 snprintf(buf,32,"%ld",(long)o->ptr);
2468 dec = createStringObject(buf,strlen(buf));
2469 return dec;
2470 } else {
2471 redisAssert(1 != 1);
2472 }
2473 }
2474
2475 /* Compare two string objects via strcmp() or alike.
2476 * Note that the objects may be integer-encoded. In such a case we
2477 * use snprintf() to get a string representation of the numbers on the stack
2478 * and compare the strings, it's much faster than calling getDecodedObject().
2479 *
2480 * Important note: if objects are not integer encoded, but binary-safe strings,
2481 * sdscmp() from sds.c will apply memcmp() so this function ca be considered
2482 * binary safe. */
2483 static int compareStringObjects(robj *a, robj *b) {
2484 redisAssert(a->type == REDIS_STRING && b->type == REDIS_STRING);
2485 char bufa[128], bufb[128], *astr, *bstr;
2486 int bothsds = 1;
2487
2488 if (a == b) return 0;
2489 if (a->encoding != REDIS_ENCODING_RAW) {
2490 snprintf(bufa,sizeof(bufa),"%ld",(long) a->ptr);
2491 astr = bufa;
2492 bothsds = 0;
2493 } else {
2494 astr = a->ptr;
2495 }
2496 if (b->encoding != REDIS_ENCODING_RAW) {
2497 snprintf(bufb,sizeof(bufb),"%ld",(long) b->ptr);
2498 bstr = bufb;
2499 bothsds = 0;
2500 } else {
2501 bstr = b->ptr;
2502 }
2503 return bothsds ? sdscmp(astr,bstr) : strcmp(astr,bstr);
2504 }
2505
2506 static size_t stringObjectLen(robj *o) {
2507 redisAssert(o->type == REDIS_STRING);
2508 if (o->encoding == REDIS_ENCODING_RAW) {
2509 return sdslen(o->ptr);
2510 } else {
2511 char buf[32];
2512
2513 return snprintf(buf,32,"%ld",(long)o->ptr);
2514 }
2515 }
2516
2517 /*============================ RDB saving/loading =========================== */
2518
/* Write the single type/opcode byte 'type'. Returns 0 on success, -1 on a
 * write error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return (fwrite(&type,1,1,fp) == 1) ? 0 : -1;
}
2523
/* Write 't' truncated to a 32 bit signed integer, in host byte order.
 * Returns 0 on success, -1 on a write error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return (fwrite(&t32,sizeof(t32),1,fp) == 1) ? 0 : -1;
}
2529
2530 /* check rdbLoadLen() comments for more info */
2531 static int rdbSaveLen(FILE *fp, uint32_t len) {
2532 unsigned char buf[2];
2533
2534 if (len < (1<<6)) {
2535 /* Save a 6 bit len */
2536 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
2537 if (fwrite(buf,1,1,fp) == 0) return -1;
2538 } else if (len < (1<<14)) {
2539 /* Save a 14 bit len */
2540 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
2541 buf[1] = len&0xFF;
2542 if (fwrite(buf,2,1,fp) == 0) return -1;
2543 } else {
2544 /* Save a 32 bit len */
2545 buf[0] = (REDIS_RDB_32BITLEN<<6);
2546 if (fwrite(buf,1,1,fp) == 0) return -1;
2547 len = htonl(len);
2548 if (fwrite(&len,4,1,fp) == 0) return -1;
2549 }
2550 return 0;
2551 }
2552
2553 /* String objects in the form "2391" "-100" without any space and with a
2554 * range of values that can fit in an 8, 16 or 32 bit signed value can be
2555 * encoded as integers to save space */
2556 static int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
2557 long long value;
2558 char *endptr, buf[32];
2559
2560 /* Check if it's possible to encode this value as a number */
2561 value = strtoll(s, &endptr, 10);
2562 if (endptr[0] != '\0') return 0;
2563 snprintf(buf,32,"%lld",value);
2564
2565 /* If the number converted back into a string is not identical
2566 * then it's not possible to encode the string as integer */
2567 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
2568
2569 /* Finally check if it fits in our ranges */
2570 if (value >= -(1<<7) && value <= (1<<7)-1) {
2571 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
2572 enc[1] = value&0xFF;
2573 return 2;
2574 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
2575 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
2576 enc[1] = value&0xFF;
2577 enc[2] = (value>>8)&0xFF;
2578 return 3;
2579 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
2580 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
2581 enc[1] = value&0xFF;
2582 enc[2] = (value>>8)&0xFF;
2583 enc[3] = (value>>16)&0xFF;
2584 enc[4] = (value>>24)&0xFF;
2585 return 5;
2586 } else {
2587 return 0;
2588 }
2589 }
2590
2591 static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
2592 unsigned int comprlen, outlen;
2593 unsigned char byte;
2594 void *out;
2595
2596 /* We require at least four bytes compression for this to be worth it */
2597 outlen = sdslen(obj->ptr)-4;
2598 if (outlen <= 0) return 0;
2599 if ((out = zmalloc(outlen+1)) == NULL) return 0;
2600 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
2601 if (comprlen == 0) {
2602 zfree(out);
2603 return 0;
2604 }
2605 /* Data compressed! Let's save it on disk */
2606 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
2607 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
2608 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
2609 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
2610 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
2611 zfree(out);
2612 return comprlen;
2613
2614 writeerr:
2615 zfree(out);
2616 return -1;
2617 }
2618
2619 /* Save a string objet as [len][data] on disk. If the object is a string
2620 * representation of an integer value we try to safe it in a special form */
2621 static int rdbSaveStringObjectRaw(FILE *fp, robj *obj) {
2622 size_t len;
2623 int enclen;
2624
2625 len = sdslen(obj->ptr);
2626
2627 /* Try integer encoding */
2628 if (len <= 11) {
2629 unsigned char buf[5];
2630 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
2631 if (fwrite(buf,enclen,1,fp) == 0) return -1;
2632 return 0;
2633 }
2634 }
2635
2636 /* Try LZF compression - under 20 bytes it's unable to compress even
2637 * aaaaaaaaaaaaaaaaaa so skip it */
2638 if (server.rdbcompression && len > 20) {
2639 int retval;
2640
2641 retval = rdbSaveLzfStringObject(fp,obj);
2642 if (retval == -1) return -1;
2643 if (retval > 0) return 0;
2644 /* retval == 0 means data can't be compressed, save the old way */
2645 }
2646
2647 /* Store verbatim */
2648 if (rdbSaveLen(fp,len) == -1) return -1;
2649 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
2650 return 0;
2651 }
2652
2653 /* Like rdbSaveStringObjectRaw() but handle encoded objects */
2654 static int rdbSaveStringObject(FILE *fp, robj *obj) {
2655 int retval;
2656
2657 obj = getDecodedObject(obj);
2658 retval = rdbSaveStringObjectRaw(fp,obj);
2659 decrRefCount(obj);
2660 return retval;
2661 }
2662
/* Save a double value. Finite doubles are saved as their "%.17g" string
 * representation, prefixed by an unsigned 8 bit integer holding its length.
 * Three special values of that byte encode non-finite numbers:
 *   253: not a number
 *   254: + inf
 *   255: - inf
 * Returns 0 on success, -1 on a write error. */
static int rdbSaveDoubleValue(FILE *fp, double val) {
    unsigned char out[128];
    int outlen;

    if (isfinite(val)) {
        /* "%.17g" output is far shorter than the buffer, so the return value
         * of snprintf() is the exact string length. */
        int digits = snprintf((char*)out+1,sizeof(out)-1,"%.17g",val);

        out[0] = (unsigned char) digits;
        outlen = digits+1;
    } else if (isnan(val)) {
        out[0] = 253;
        outlen = 1;
    } else {
        out[0] = (val < 0) ? 255 : 254;
        outlen = 1;
    }
    return (fwrite(out,outlen,1,fp) == 0) ? -1 : 0;
}
2689
2690 /* Save a Redis object. */
2691 static int rdbSaveObject(FILE *fp, robj *o) {
2692 if (o->type == REDIS_STRING) {
2693 /* Save a string value */
2694 if (rdbSaveStringObject(fp,o) == -1) return -1;
2695 } else if (o->type == REDIS_LIST) {
2696 /* Save a list value */
2697 list *list = o->ptr;
2698 listNode *ln;
2699
2700 listRewind(list);
2701 if (rdbSaveLen(fp,listLength(list)) == -1) return -1;
2702 while((ln = listYield(list))) {
2703 robj *eleobj = listNodeValue(ln);
2704
2705 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2706 }
2707 } else if (o->type == REDIS_SET) {
2708 /* Save a set value */
2709 dict *set = o->ptr;
2710 dictIterator *di = dictGetIterator(set);
2711 dictEntry *de;
2712
2713 if (rdbSaveLen(fp,dictSize(set)) == -1) return -1;
2714 while((de = dictNext(di)) != NULL) {
2715 robj *eleobj = dictGetEntryKey(de);
2716
2717 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2718 }
2719 dictReleaseIterator(di);
2720 } else if (o->type == REDIS_ZSET) {
2721 /* Save a set value */
2722 zset *zs = o->ptr;
2723 dictIterator *di = dictGetIterator(zs->dict);
2724 dictEntry *de;
2725
2726 if (rdbSaveLen(fp,dictSize(zs->dict)) == -1) return -1;
2727 while((de = dictNext(di)) != NULL) {
2728 robj *eleobj = dictGetEntryKey(de);
2729 double *score = dictGetEntryVal(de);
2730
2731 if (rdbSaveStringObject(fp,eleobj) == -1) return -1;
2732 if (rdbSaveDoubleValue(fp,*score) == -1) return -1;
2733 }
2734 dictReleaseIterator(di);
2735 } else {
2736 redisAssert(0 != 0);
2737 }
2738 return 0;
2739 }
2740
2741 /* Return the length the object will have on disk if saved with
2742 * the rdbSaveObject() function. Currently we use a trick to get
2743 * this length with very little changes to the code. In the future
2744 * we could switch to a faster solution. */
2745 static off_t rdbSavedObjectLen(robj *o) {
2746 static FILE *fp = NULL;
2747
2748 if (fp == NULL) fp = fopen("/dev/null","w");
2749 assert(fp != NULL);
2750
2751 rewind(fp);
2752 assert(rdbSaveObject(fp,o) != 1);
2753 return ftello(fp);
2754 }
2755
2756 /* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
2757 static int rdbSave(char *filename) {
2758 dictIterator *di = NULL;
2759 dictEntry *de;
2760 FILE *fp;
2761 char tmpfile[256];
2762 int j;
2763 time_t now = time(NULL);
2764
2765 snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
2766 fp = fopen(tmpfile,"w");
2767 if (!fp) {
2768 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
2769 return REDIS_ERR;
2770 }
2771 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
2772 for (j = 0; j < server.dbnum; j++) {
2773 redisDb *db = server.db+j;
2774 dict *d = db->dict;
2775 if (dictSize(d) == 0) continue;
2776 di = dictGetIterator(d);
2777 if (!di) {
2778 fclose(fp);
2779 return REDIS_ERR;
2780 }
2781
2782 /* Write the SELECT DB opcode */
2783 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
2784 if (rdbSaveLen(fp,j) == -1) goto werr;
2785
2786 /* Iterate this DB writing every entry */
2787 while((de = dictNext(di)) != NULL) {
2788 robj *key = dictGetEntryKey(de);
2789 robj *o = dictGetEntryVal(de);
2790 time_t expiretime = getExpire(db,key);
2791
2792 /* Save the expire time */
2793 if (expiretime != -1) {
2794 /* If this key is already expired skip it */
2795 if (expiretime < now) continue;
2796 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
2797 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
2798 }
2799 /* Save the key and associated value */
2800 if (rdbSaveType(fp,o->type) == -1) goto werr;
2801 if (rdbSaveStringObject(fp,key) == -1) goto werr;
2802 /* Save the actual value */
2803 if (rdbSaveObject(fp,o) == -1) goto werr;
2804 }
2805 dictReleaseIterator(di);
2806 }
2807 /* EOF opcode */
2808 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
2809
2810 /* Make sure data will not remain on the OS's output buffers */
2811 fflush(fp);
2812 fsync(fileno(fp));
2813 fclose(fp);
2814
2815 /* Use RENAME to make sure the DB file is changed atomically only
2816 * if the generate DB file is ok. */
2817 if (rename(tmpfile,filename) == -1) {
2818 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
2819 unlink(tmpfile);
2820 return REDIS_ERR;
2821 }
2822 redisLog(REDIS_NOTICE,"DB saved on disk");
2823 server.dirty = 0;
2824 server.lastsave = time(NULL);
2825 return REDIS_OK;
2826
2827 werr:
2828 fclose(fp);
2829 unlink(tmpfile);
2830 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
2831 if (di) dictReleaseIterator(di);
2832 return REDIS_ERR;
2833 }
2834
2835 static int rdbSaveBackground(char *filename) {
2836 pid_t childpid;
2837
2838 if (server.bgsavechildpid != -1) return REDIS_ERR;
2839 if ((childpid = fork()) == 0) {
2840 /* Child */
2841 close(server.fd);
2842 if (rdbSave(filename) == REDIS_OK) {
2843 exit(0);
2844 } else {
2845 exit(1);
2846 }
2847 } else {
2848 /* Parent */
2849 if (childpid == -1) {
2850 redisLog(REDIS_WARNING,"Can't save in background: fork: %s",
2851 strerror(errno));
2852 return REDIS_ERR;
2853 }
2854 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
2855 server.bgsavechildpid = childpid;
2856 return REDIS_OK;
2857 }
2858 return REDIS_OK; /* unreached */
2859 }
2860
/* Delete the "temp-<pid>.rdb" file that rdbSave() running in the process
 * 'childpid' writes to (used e.g. after killing a background save child).
 * A missing file is silently ignored. */
static void rdbRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-%d.rdb",(int) childpid);
    unlink(path);
}
2867
/* Read one opcode/type byte from 'fp'. Returns its value (0..255), or -1
 * when nothing could be read. */
static int rdbLoadType(FILE *fp) {
    unsigned char byte;

    return (fread(&byte,1,1,fp) == 1) ? (int) byte : -1;
}
2873
/* Read a 4 byte timestamp from 'fp'. The bytes are reinterpreted as an
 * int32_t as-is (no byte order conversion). Returns -1 when the read fails;
 * note that a stored value of -1 is indistinguishable from an error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t stamp;

    if (fread(&stamp,4,1,fp) != 1) return -1;
    return (time_t) stamp;
}
2879
2880 /* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
2881 * of this file for a description of how this are stored on disk.
2882 *
2883 * isencoded is set to 1 if the readed length is not actually a length but
2884 * an "encoding type", check the above comments for more info */
2885 static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
2886 unsigned char buf[2];
2887 uint32_t len;
2888
2889 if (isencoded) *isencoded = 0;
2890 if (rdbver == 0) {
2891 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2892 return ntohl(len);
2893 } else {
2894 int type;
2895
2896 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
2897 type = (buf[0]&0xC0)>>6;
2898 if (type == REDIS_RDB_6BITLEN) {
2899 /* Read a 6 bit len */
2900 return buf[0]&0x3F;
2901 } else if (type == REDIS_RDB_ENCVAL) {
2902 /* Read a 6 bit len encoding type */
2903 if (isencoded) *isencoded = 1;
2904 return buf[0]&0x3F;
2905 } else if (type == REDIS_RDB_14BITLEN) {
2906 /* Read a 14 bit len */
2907 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
2908 return ((buf[0]&0x3F)<<8)|buf[1];
2909 } else {
2910 /* Read a 32 bit len */
2911 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
2912 return ntohl(len);
2913 }
2914 }
2915 }
2916
2917 static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
2918 unsigned char enc[4];
2919 long long val;
2920
2921 if (enctype == REDIS_RDB_ENC_INT8) {
2922 if (fread(enc,1,1,fp) == 0) return NULL;
2923 val = (signed char)enc[0];
2924 } else if (enctype == REDIS_RDB_ENC_INT16) {
2925 uint16_t v;
2926 if (fread(enc,2,1,fp) == 0) return NULL;
2927 v = enc[0]|(enc[1]<<8);
2928 val = (int16_t)v;
2929 } else if (enctype == REDIS_RDB_ENC_INT32) {
2930 uint32_t v;
2931 if (fread(enc,4,1,fp) == 0) return NULL;
2932 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
2933 val = (int32_t)v;
2934 } else {
2935 val = 0; /* anti-warning */
2936 redisAssert(0!=0);
2937 }
2938 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
2939 }
2940
2941 static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
2942 unsigned int len, clen;
2943 unsigned char *c = NULL;
2944 sds val = NULL;
2945
2946 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2947 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
2948 if ((c = zmalloc(clen)) == NULL) goto err;
2949 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
2950 if (fread(c,clen,1,fp) == 0) goto err;
2951 if (lzf_decompress(c,clen,val,len) == 0) goto err;
2952 zfree(c);
2953 return createObject(REDIS_STRING,val);
2954 err:
2955 zfree(c);
2956 sdsfree(val);
2957 return NULL;
2958 }
2959
2960 static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
2961 int isencoded;
2962 uint32_t len;
2963 sds val;
2964
2965 len = rdbLoadLen(fp,rdbver,&isencoded);
2966 if (isencoded) {
2967 switch(len) {
2968 case REDIS_RDB_ENC_INT8:
2969 case REDIS_RDB_ENC_INT16:
2970 case REDIS_RDB_ENC_INT32:
2971 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
2972 case REDIS_RDB_ENC_LZF:
2973 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
2974 default:
2975 redisAssert(0!=0);
2976 }
2977 }
2978
2979 if (len == REDIS_RDB_LENERR) return NULL;
2980 val = sdsnewlen(NULL,len);
2981 if (len && fread(val,len,1,fp) == 0) {
2982 sdsfree(val);
2983 return NULL;
2984 }
2985 return tryObjectSharing(createObject(REDIS_STRING,val));
2986 }
2987
2988 /* For information about double serialization check rdbSaveDoubleValue() */
2989 static int rdbLoadDoubleValue(FILE *fp, double *val) {
2990 char buf[128];
2991 unsigned char len;
2992
2993 if (fread(&len,1,1,fp) == 0) return -1;
2994 switch(len) {
2995 case 255: *val = R_NegInf; return 0;
2996 case 254: *val = R_PosInf; return 0;
2997 case 253: *val = R_Nan; return 0;
2998 default:
2999 if (fread(buf,len,1,fp) == 0) return -1;
3000 buf[len] = '\0';
3001 sscanf(buf, "%lg", val);
3002 return 0;
3003 }
3004 }
3005
3006 static int rdbLoad(char *filename) {
3007 FILE *fp;
3008 robj *keyobj = NULL;
3009 uint32_t dbid;
3010 int type, retval, rdbver;
3011 dict *d = server.db[0].dict;
3012 redisDb *db = server.db+0;
3013 char buf[1024];
3014 time_t expiretime = -1, now = time(NULL);
3015
3016 fp = fopen(filename,"r");
3017 if (!fp) return REDIS_ERR;
3018 if (fread(buf,9,1,fp) == 0) goto eoferr;
3019 buf[9] = '\0';
3020 if (memcmp(buf,"REDIS",5) != 0) {
3021 fclose(fp);
3022 redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
3023 return REDIS_ERR;
3024 }
3025 rdbver = atoi(buf+5);
3026 if (rdbver > 1) {
3027 fclose(fp);
3028 redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
3029 return REDIS_ERR;
3030 }
3031 while(1) {
3032 robj *o;
3033
3034 /* Read type. */
3035 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3036 if (type == REDIS_EXPIRETIME) {
3037 if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
3038 /* We read the time so we need to read the object type again */
3039 if ((type = rdbLoadType(fp)) == -1) goto eoferr;
3040 }
3041 if (type == REDIS_EOF) break;
3042 /* Handle SELECT DB opcode as a special case */
3043 if (type == REDIS_SELECTDB) {
3044 if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3045 goto eoferr;
3046 if (dbid >= (unsigned)server.dbnum) {
3047 redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
3048 exit(1);
3049 }
3050 db = server.db+dbid;
3051 d = db->dict;
3052 continue;
3053 }
3054 /* Read key */
3055 if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3056
3057 if (type == REDIS_STRING) {
3058 /* Read string value */
3059 if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3060 tryObjectEncoding(o);
3061 } else if (type == REDIS_LIST || type == REDIS_SET) {
3062 /* Read list/set value */
3063 uint32_t listlen;
3064
3065 if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3066 goto eoferr;
3067 o = (type == REDIS_LIST) ? createListObject() : createSetObject();
3068 /* Load every single element of the list/set */
3069 while(listlen--) {
3070 robj *ele;
3071
3072 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3073 tryObjectEncoding(ele);
3074 if (type == REDIS_LIST) {
3075 listAddNodeTail((list*)o->ptr,ele);
3076 } else {
3077 dictAdd((dict*)o->ptr,ele,NULL);
3078 }
3079 }
3080 } else if (type == REDIS_ZSET) {
3081 /* Read list/set value */
3082 uint32_t zsetlen;
3083 zset *zs;
3084
3085 if ((zsetlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
3086 goto eoferr;
3087 o = createZsetObject();
3088 zs = o->ptr;
3089 /* Load every single element of the list/set */
3090 while(zsetlen--) {
3091 robj *ele;
3092 double *score = zmalloc(sizeof(double));
3093
3094 if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
3095 tryObjectEncoding(ele);
3096 if (rdbLoadDoubleValue(fp,score) == -1) goto eoferr;
3097 dictAdd(zs->dict,ele,score);
3098 zslInsert(zs->zsl,*score,ele);
3099 incrRefCount(ele); /* added to skiplist */
3100 }
3101 } else {
3102 redisAssert(0 != 0);
3103 }
3104 /* Add the new object in the hash table */
3105 retval = dictAdd(d,keyobj,o);
3106 if (retval == DICT_ERR) {
3107 redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
3108 exit(1);
3109 }
3110 /* Set the expire time if needed */
3111 if (expiretime != -1) {
3112 setExpire(db,keyobj,expiretime);
3113 /* Delete this key if already expired */
3114 if (expiretime < now) deleteKey(db,keyobj);
3115 expiretime = -1;
3116 }
3117 keyobj = o = NULL;
3118 }
3119 fclose(fp);
3120 return REDIS_OK;
3121
3122 eoferr: /* unexpected end of file is handled here with a fatal exit */
3123 if (keyobj) decrRefCount(keyobj);
3124 redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
3125 exit(1);
3126 return REDIS_ERR; /* Just to avoid warning */
3127 }
3128
3129 /*================================== Commands =============================== */
3130
3131 static void authCommand(redisClient *c) {
3132 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
3133 c->authenticated = 1;
3134 addReply(c,shared.ok);
3135 } else {
3136 c->authenticated = 0;
3137 addReplySds(c,sdscatprintf(sdsempty(),"-ERR invalid password\r\n"));
3138 }
3139 }
3140
3141 static void pingCommand(redisClient *c) {
3142 addReply(c,shared.pong);
3143 }
3144
3145 static void echoCommand(redisClient *c) {
3146 addReplyBulkLen(c,c->argv[1]);
3147 addReply(c,c->argv[1]);
3148 addReply(c,shared.crlf);
3149 }
3150
3151 /*=================================== Strings =============================== */
3152
3153 static void setGenericCommand(redisClient *c, int nx) {
3154 int retval;
3155
3156 if (nx) deleteIfVolatile(c->db,c->argv[1]);
3157 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
3158 if (retval == DICT_ERR) {
3159 if (!nx) {
3160 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3161 incrRefCount(c->argv[2]);
3162 } else {
3163 addReply(c,shared.czero);
3164 return;
3165 }
3166 } else {
3167 incrRefCount(c->argv[1]);
3168 incrRefCount(c->argv[2]);
3169 }
3170 server.dirty++;
3171 removeExpire(c->db,c->argv[1]);
3172 addReply(c, nx ? shared.cone : shared.ok);
3173 }
3174
3175 static void setCommand(redisClient *c) {
3176 setGenericCommand(c,0);
3177 }
3178
3179 static void setnxCommand(redisClient *c) {
3180 setGenericCommand(c,1);
3181 }
3182
3183 static int getGenericCommand(redisClient *c) {
3184 robj *o = lookupKeyRead(c->db,c->argv[1]);
3185
3186 if (o == NULL) {
3187 addReply(c,shared.nullbulk);
3188 return REDIS_OK;
3189 } else {
3190 if (o->type != REDIS_STRING) {
3191 addReply(c,shared.wrongtypeerr);
3192 return REDIS_ERR;
3193 } else {
3194 addReplyBulkLen(c,o);
3195 addReply(c,o);
3196 addReply(c,shared.crlf);
3197 return REDIS_OK;
3198 }
3199 }
3200 }
3201
3202 static void getCommand(redisClient *c) {
3203 getGenericCommand(c);
3204 }
3205
3206 static void getsetCommand(redisClient *c) {
3207 if (getGenericCommand(c) == REDIS_ERR) return;
3208 if (dictAdd(c->db->dict,c->argv[1],c->argv[2]) == DICT_ERR) {
3209 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
3210 } else {
3211 incrRefCount(c->argv[1]);
3212 }
3213 incrRefCount(c->argv[2]);
3214 server.dirty++;
3215 removeExpire(c->db,c->argv[1]);
3216 }
3217
3218 static void mgetCommand(redisClient *c) {
3219 int j;
3220
3221 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
3222 for (j = 1; j < c->argc; j++) {
3223 robj *o = lookupKeyRead(c->db,c->argv[j]);
3224 if (o == NULL) {
3225 addReply(c,shared.nullbulk);
3226 } else {
3227 if (o->type != REDIS_STRING) {
3228 addReply(c,shared.nullbulk);
3229 } else {
3230 addReplyBulkLen(c,o);
3231 addReply(c,o);
3232 addReply(c,shared.crlf);
3233 }
3234 }
3235 }
3236 }
3237
3238 static void msetGenericCommand(redisClient *c, int nx) {
3239 int j, busykeys = 0;
3240
3241 if ((c->argc % 2) == 0) {
3242 addReplySds(c,sdsnew("-ERR wrong number of arguments for MSET\r\n"));
3243 return;
3244 }
3245 /* Handle the NX flag. The MSETNX semantic is to return zero and don't
3246 * set nothing at all if at least one already key exists. */
3247 if (nx) {
3248 for (j = 1; j < c->argc; j += 2) {
3249 if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
3250 busykeys++;
3251 }
3252 }
3253 }
3254 if (busykeys) {
3255 addReply(c, shared.czero);
3256 return;
3257 }
3258
3259 for (j = 1; j < c->argc; j += 2) {
3260 int retval;
3261
3262 tryObjectEncoding(c->argv[j+1]);
3263 retval = dictAdd(c->db->dict,c->argv[j],c->argv[j+1]);
3264 if (retval == DICT_ERR) {
3265 dictReplace(c->db->dict,c->argv[j],c->argv[j+1]);
3266 incrRefCount(c->argv[j+1]);
3267 } else {
3268 incrRefCount(c->argv[j]);
3269 incrRefCount(c->argv[j+1]);
3270 }
3271 removeExpire(c->db,c->argv[j]);
3272 }
3273 server.dirty += (c->argc-1)/2;
3274 addReply(c, nx ? shared.cone : shared.ok);
3275 }
3276
3277 static void msetCommand(redisClient *c) {
3278 msetGenericCommand(c,0);
3279 }
3280
3281 static void msetnxCommand(redisClient *c) {
3282 msetGenericCommand(c,1);
3283 }
3284
3285 static void incrDecrCommand(redisClient *c, long long incr) {
3286 long long value;
3287 int retval;
3288 robj *o;
3289
3290 o = lookupKeyWrite(c->db,c->argv[1]);
3291 if (o == NULL) {
3292 value = 0;
3293 } else {
3294 if (o->type != REDIS_STRING) {
3295 value = 0;
3296 } else {
3297 char *eptr;
3298
3299 if (o->encoding == REDIS_ENCODING_RAW)
3300 value = strtoll(o->ptr, &eptr, 10);
3301 else if (o->encoding == REDIS_ENCODING_INT)
3302 value = (long)o->ptr;
3303 else
3304 redisAssert(1 != 1);
3305 }
3306 }
3307
3308 value += incr;
3309 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
3310 tryObjectEncoding(o);
3311 retval = dictAdd(c->db->dict,c->argv[1],o);
3312 if (retval == DICT_ERR) {
3313 dictReplace(c->db->dict,c->argv[1],o);
3314 removeExpire(c->db,c->argv[1]);
3315 } else {
3316 incrRefCount(c->argv[1]);
3317 }
3318 server.dirty++;
3319 addReply(c,shared.colon);
3320 addReply(c,o);
3321 addReply(c,shared.crlf);
3322 }
3323
3324 static void incrCommand(redisClient *c) {
3325 incrDecrCommand(c,1);
3326 }
3327
3328 static void decrCommand(redisClient *c) {
3329 incrDecrCommand(c,-1);
3330 }
3331
3332 static void incrbyCommand(redisClient *c) {
3333 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3334 incrDecrCommand(c,incr);
3335 }
3336
3337 static void decrbyCommand(redisClient *c) {
3338 long long incr = strtoll(c->argv[2]->ptr, NULL, 10);
3339 incrDecrCommand(c,-incr);
3340 }
3341
3342 /* ========================= Type agnostic commands ========================= */
3343
3344 static void delCommand(redisClient *c) {
3345 int deleted = 0, j;
3346
3347 for (j = 1; j < c->argc; j++) {
3348 if (deleteKey(c->db,c->argv[j])) {
3349 server.dirty++;
3350 deleted++;
3351 }
3352 }
3353 switch(deleted) {
3354 case 0:
3355 addReply(c,shared.czero);
3356 break;
3357 case 1:
3358 addReply(c,shared.cone);
3359 break;
3360 default:
3361 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",deleted));
3362 break;
3363 }
3364 }
3365
3366 static void existsCommand(redisClient *c) {
3367 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
3368 }
3369
3370 static void selectCommand(redisClient *c) {
3371 int id = atoi(c->argv[1]->ptr);
3372
3373 if (selectDb(c,id) == REDIS_ERR) {
3374 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
3375 } else {
3376 addReply(c,shared.ok);
3377 }
3378 }
3379
3380 static void randomkeyCommand(redisClient *c) {
3381 dictEntry *de;
3382
3383 while(1) {
3384 de = dictGetRandomKey(c->db->dict);
3385 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
3386 }
3387 if (de == NULL) {
3388 addReply(c,shared.plus);
3389 addReply(c,shared.crlf);
3390 } else {
3391 addReply(c,shared.plus);
3392 addReply(c,dictGetEntryKey(de));
3393 addReply(c,shared.crlf);
3394 }
3395 }
3396
3397 static void keysCommand(redisClient *c) {
3398 dictIterator *di;
3399 dictEntry *de;
3400 sds pattern = c->argv[1]->ptr;
3401 int plen = sdslen(pattern);
3402 unsigned long numkeys = 0, keyslen = 0;
3403 robj *lenobj = createObject(REDIS_STRING,NULL);
3404
3405 di = dictGetIterator(c->db->dict);
3406 addReply(c,lenobj);
3407 decrRefCount(lenobj);
3408 while((de = dictNext(di)) != NULL) {
3409 robj *keyobj = dictGetEntryKey(de);
3410
3411 sds key = keyobj->ptr;
3412 if ((pattern[0] == '*' && pattern[1] == '\0') ||
3413 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
3414 if (expireIfNeeded(c->db,keyobj) == 0) {
3415 if (numkeys != 0)
3416 addReply(c,shared.space);
3417 addReply(c,keyobj);
3418 numkeys++;
3419 keyslen += sdslen(key);
3420 }
3421 }
3422 }
3423 dictReleaseIterator(di);
3424 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
3425 addReply(c,shared.crlf);
3426 }
3427
3428 static void dbsizeCommand(redisClient *c) {
3429 addReplySds(c,
3430 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
3431 }
3432
3433 static void lastsaveCommand(redisClient *c) {
3434 addReplySds(c,
3435 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
3436 }
3437
3438 static void typeCommand(redisClient *c) {
3439 robj *o;
3440 char *type;
3441
3442 o = lookupKeyRead(c->db,c->argv[1]);
3443 if (o == NULL) {
3444 type = "+none";
3445 } else {
3446 switch(o->type) {
3447 case REDIS_STRING: type = "+string"; break;
3448 case REDIS_LIST: type = "+list"; break;
3449 case REDIS_SET: type = "+set"; break;
3450 case REDIS_ZSET: type = "+zset"; break;
3451 default: type = "unknown"; break;
3452 }
3453 }
3454 addReplySds(c,sdsnew(type));
3455 addReply(c,shared.crlf);
3456 }
3457
3458 static void saveCommand(redisClient *c) {
3459 if (server.bgsavechildpid != -1) {
3460 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
3461 return;
3462 }
3463 if (rdbSave(server.dbfilename) == REDIS_OK) {
3464 addReply(c,shared.ok);
3465 } else {
3466 addReply(c,shared.err);
3467 }
3468 }
3469
3470 static void bgsaveCommand(redisClient *c) {
3471 if (server.bgsavechildpid != -1) {
3472 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
3473 return;
3474 }
3475 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
3476 char *status = "+Background saving started\r\n";
3477 addReplySds(c,sdsnew(status));
3478 } else {
3479 addReply(c,shared.err);
3480 }
3481 }
3482
3483 static void shutdownCommand(redisClient *c) {
3484 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
3485 /* Kill the saving child if there is a background saving in progress.
3486 We want to avoid race conditions, for instance our saving child may
3487 overwrite the synchronous saving did by SHUTDOWN. */
3488 if (server.bgsavechildpid != -1) {
3489 redisLog(REDIS_WARNING,"There is a live saving child. Killing it!");
3490 kill(server.bgsavechildpid,SIGKILL);
3491 rdbRemoveTempFile(server.bgsavechildpid);
3492 }
3493 if (server.appendonly) {
3494 /* Append only file: fsync() the AOF and exit */
3495 fsync(server.appendfd);
3496 exit(0);
3497 } else {
3498 /* Snapshotting. Perform a SYNC SAVE and exit */
3499 if (rdbSave(server.dbfilename) == REDIS_OK) {
3500 if (server.daemonize)
3501 unlink(server.pidfile);
3502 redisLog(REDIS_WARNING,"%zu bytes used at exit",zmalloc_used_memory());
3503 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
3504 exit(0);
3505 } else {
3506 /* Ooops.. error saving! The best we can do is to continue operating.
3507 * Note that if there was a background saving process, in the next
3508 * cron() Redis will be notified that the background saving aborted,
3509 * handling special stuff like slaves pending for synchronization... */
3510 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
3511 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
3512 }
3513 }
3514 }
3515
3516 static void renameGenericCommand(redisClient *c, int nx) {
3517 robj *o;
3518
3519 /* To use the same key as src and dst is probably an error */
3520 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
3521 addReply(c,shared.sameobjecterr);
3522 return;
3523 }
3524
3525 o = lookupKeyWrite(c->db,c->argv[1]);
3526 if (o == NULL) {
3527 addReply(c,shared.nokeyerr);
3528 return;
3529 }
3530 incrRefCount(o);
3531 deleteIfVolatile(c->db,c->argv[2]);
3532 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
3533 if (nx) {
3534 decrRefCount(o);
3535 addReply(c,shared.czero);
3536 return;
3537 }
3538 dictReplace(c->db->dict,c->argv[2],o);
3539 } else {
3540 incrRefCount(c->argv[2]);
3541 }
3542 deleteKey(c->db,c->argv[1]);
3543 server.dirty++;
3544 addReply(c,nx ? shared.cone : shared.ok);
3545 }
3546
3547 static void renameCommand(redisClient *c) {
3548 renameGenericCommand(c,0);
3549 }
3550
3551 static void renamenxCommand(redisClient *c) {
3552 renameGenericCommand(c,1);
3553 }
3554
3555 static void moveCommand(redisClient *c) {
3556 robj *o;
3557 redisDb *src, *dst;
3558 int srcid;
3559
3560 /* Obtain source and target DB pointers */
3561 src = c->db;
3562 srcid = c->db->id;
3563 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
3564 addReply(c,shared.outofrangeerr);
3565 return;
3566 }
3567 dst = c->db;
3568 selectDb(c,srcid); /* Back to the source DB */
3569
3570 /* If the user is moving using as target the same
3571 * DB as the source DB it is probably an error. */
3572 if (src == dst) {
3573 addReply(c,shared.sameobjecterr);
3574 return;
3575 }
3576
3577 /* Check if the element exists and get a reference */
3578 o = lookupKeyWrite(c->db,c->argv[1]);
3579 if (!o) {
3580 addReply(c,shared.czero);
3581 return;
3582 }
3583
3584 /* Try to add the element to the target DB */
3585 deleteIfVolatile(dst,c->argv[1]);
3586 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
3587 addReply(c,shared.czero);
3588 return;
3589 }
3590 incrRefCount(c->argv[1]);
3591 incrRefCount(o);
3592
3593 /* OK! key moved, free the entry in the source DB */
3594 deleteKey(src,c->argv[1]);
3595 server.dirty++;
3596 addReply(c,shared.cone);
3597 }
3598
3599 /* =================================== Lists ================================ */
3600 static void pushGenericCommand(redisClient *c, int where) {
3601 robj *lobj;
3602 list *list;
3603
3604 lobj = lookupKeyWrite(c->db,c->argv[1]);
3605 if (lobj == NULL) {
3606 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3607 addReply(c,shared.ok);
3608 return;
3609 }
3610 lobj = createListObject();
3611 list = lobj->ptr;
3612 if (where == REDIS_HEAD) {
3613 listAddNodeHead(list,c->argv[2]);
3614 } else {
3615 listAddNodeTail(list,c->argv[2]);
3616 }
3617 dictAdd(c->db->dict,c->argv[1],lobj);
3618 incrRefCount(c->argv[1]);
3619 incrRefCount(c->argv[2]);
3620 } else {
3621 if (lobj->type != REDIS_LIST) {
3622 addReply(c,shared.wrongtypeerr);
3623 return;
3624 }
3625 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
3626 addReply(c,shared.ok);
3627 return;
3628 }
3629 list = lobj->ptr;
3630 if (where == REDIS_HEAD) {
3631 listAddNodeHead(list,c->argv[2]);
3632 } else {
3633 listAddNodeTail(list,c->argv[2]);
3634 }
3635 incrRefCount(c->argv[2]);
3636 }
3637 server.dirty++;
3638 addReply(c,shared.ok);
3639 }
3640
3641 static void lpushCommand(redisClient *c) {
3642 pushGenericCommand(c,REDIS_HEAD);
3643 }
3644
3645 static void rpushCommand(redisClient *c) {
3646 pushGenericCommand(c,REDIS_TAIL);
3647 }
3648
3649 static void llenCommand(redisClient *c) {
3650 robj *o;
3651 list *l;
3652
3653 o = lookupKeyRead(c->db,c->argv[1]);
3654 if (o == NULL) {
3655 addReply(c,shared.czero);
3656 return;
3657 } else {
3658 if (o->type != REDIS_LIST) {
3659 addReply(c,shared.wrongtypeerr);
3660 } else {
3661 l = o->ptr;
3662 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
3663 }
3664 }
3665 }
3666
3667 static void lindexCommand(redisClient *c) {
3668 robj *o;
3669 int index = atoi(c->argv[2]->ptr);
3670
3671 o = lookupKeyRead(c->db,c->argv[1]);
3672 if (o == NULL) {
3673 addReply(c,shared.nullbulk);
3674 } else {
3675 if (o->type != REDIS_LIST) {
3676 addReply(c,shared.wrongtypeerr);
3677 } else {
3678 list *list = o->ptr;
3679 listNode *ln;
3680
3681 ln = listIndex(list, index);
3682 if (ln == NULL) {
3683 addReply(c,shared.nullbulk);
3684 } else {
3685 robj *ele = listNodeValue(ln);
3686 addReplyBulkLen(c,ele);
3687 addReply(c,ele);
3688 addReply(c,shared.crlf);
3689 }
3690 }
3691 }
3692 }
3693
3694 static void lsetCommand(redisClient *c) {
3695 robj *o;
3696 int index = atoi(c->argv[2]->ptr);
3697
3698 o = lookupKeyWrite(c->db,c->argv[1]);
3699 if (o == NULL) {
3700 addReply(c,shared.nokeyerr);
3701 } else {
3702 if (o->type != REDIS_LIST) {
3703 addReply(c,shared.wrongtypeerr);
3704 } else {
3705 list *list = o->ptr;
3706 listNode *ln;
3707
3708 ln = listIndex(list, index);
3709 if (ln == NULL) {
3710 addReply(c,shared.outofrangeerr);
3711 } else {
3712 robj *ele = listNodeValue(ln);
3713
3714 decrRefCount(ele);
3715 listNodeValue(ln) = c->argv[3];
3716 incrRefCount(c->argv[3]);
3717 addReply(c,shared.ok);
3718 server.dirty++;
3719 }
3720 }
3721 }
3722 }
3723
3724 static void popGenericCommand(redisClient *c, int where) {
3725 robj *o;
3726
3727 o = lookupKeyWrite(c->db,c->argv[1]);
3728 if (o == NULL) {
3729 addReply(c,shared.nullbulk);
3730 } else {
3731 if (o->type != REDIS_LIST) {
3732 addReply(c,shared.wrongtypeerr);
3733 } else {
3734 list *list = o->ptr;
3735 listNode *ln;
3736
3737 if (where == REDIS_HEAD)
3738 ln = listFirst(list);
3739 else
3740 ln = listLast(list);
3741
3742 if (ln == NULL) {
3743 addReply(c,shared.nullbulk);
3744 } else {
3745 robj *ele = listNodeValue(ln);
3746 addReplyBulkLen(c,ele);
3747 addReply(c,ele);
3748 addReply(c,shared.crlf);
3749 listDelNode(list,ln);
3750 server.dirty++;
3751 }
3752 }
3753 }
3754 }
3755
3756 static void lpopCommand(redisClient *c) {
3757 popGenericCommand(c,REDIS_HEAD);
3758 }
3759
3760 static void rpopCommand(redisClient *c) {
3761 popGenericCommand(c,REDIS_TAIL);
3762 }
3763
3764 static void lrangeCommand(redisClient *c) {
3765 robj *o;
3766 int start = atoi(c->argv[2]->ptr);
3767 int end = atoi(c->argv[3]->ptr);
3768
3769 o = lookupKeyRead(c->db,c->argv[1]);
3770 if (o == NULL) {
3771 addReply(c,shared.nullmultibulk);
3772 } else {
3773 if (o->type != REDIS_LIST) {
3774 addReply(c,shared.wrongtypeerr);
3775 } else {
3776 list *list = o->ptr;
3777 listNode *ln;
3778 int llen = listLength(list);
3779 int rangelen, j;
3780 robj *ele;
3781
3782 /* convert negative indexes */
3783 if (start < 0) start = llen+start;
3784 if (end < 0) end = llen+end;
3785 if (start < 0) start = 0;
3786 if (end < 0) end = 0;
3787
3788 /* indexes sanity checks */
3789 if (start > end || start >= llen) {
3790 /* Out of range start or start > end result in empty list */
3791 addReply(c,shared.emptymultibulk);
3792 return;
3793 }
3794 if (end >= llen) end = llen-1;
3795 rangelen = (end-start)+1;
3796
3797 /* Return the result in form of a multi-bulk reply */
3798 ln = listIndex(list, start);
3799 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
3800 for (j = 0; j < rangelen; j++) {
3801 ele = listNodeValue(ln);
3802 addReplyBulkLen(c,ele);
3803 addReply(c,ele);
3804 addReply(c,shared.crlf);
3805 ln = ln->next;
3806 }
3807 }
3808 }
3809 }
3810
3811 static void ltrimCommand(redisClient *c) {
3812 robj *o;
3813 int start = atoi(c->argv[2]->ptr);
3814 int end = atoi(c->argv[3]->ptr);
3815
3816 o = lookupKeyWrite(c->db,c->argv[1]);
3817 if (o == NULL) {
3818 addReply(c,shared.ok);
3819 } else {
3820 if (o->type != REDIS_LIST) {
3821 addReply(c,shared.wrongtypeerr);
3822 } else {
3823 list *list = o->ptr;
3824 listNode *ln;
3825 int llen = listLength(list);
3826 int j, ltrim, rtrim;
3827
3828 /* convert negative indexes */
3829 if (start < 0) start = llen+start;
3830 if (end < 0) end = llen+end;
3831 if (start < 0) start = 0;
3832 if (end < 0) end = 0;
3833
3834 /* indexes sanity checks */
3835 if (start > end || start >= llen) {
3836 /* Out of range start or start > end result in empty list */
3837 ltrim = llen;
3838 rtrim = 0;
3839 } else {
3840 if (end >= llen) end = llen-1;
3841 ltrim = start;
3842 rtrim = llen-end-1;
3843 }
3844
3845 /* Remove list elements to perform the trim */
3846 for (j = 0; j < ltrim; j++) {
3847 ln = listFirst(list);
3848 listDelNode(list,ln);
3849 }
3850 for (j = 0; j < rtrim; j++) {
3851 ln = listLast(list);
3852 listDelNode(list,ln);
3853 }
3854 server.dirty++;
3855 addReply(c,shared.ok);
3856 }
3857 }
3858 }
3859
3860 static void lremCommand(redisClient *c) {
3861 robj *o;
3862
3863 o = lookupKeyWrite(c->db,c->argv[1]);
3864 if (o == NULL) {
3865 addReply(c,shared.czero);
3866 } else {
3867 if (o->type != REDIS_LIST) {
3868 addReply(c,shared.wrongtypeerr);
3869 } else {
3870 list *list = o->ptr;
3871 listNode *ln, *next;
3872 int toremove = atoi(c->argv[2]->ptr);
3873 int removed = 0;
3874 int fromtail = 0;
3875
3876 if (toremove < 0) {
3877 toremove = -toremove;
3878 fromtail = 1;
3879 }
3880 ln = fromtail ? list->tail : list->head;
3881 while (ln) {
3882 robj *ele = listNodeValue(ln);
3883
3884 next = fromtail ? ln->prev : ln->next;
3885 if (compareStringObjects(ele,c->argv[3]) == 0) {
3886 listDelNode(list,ln);
3887 server.dirty++;
3888 removed++;
3889 if (toremove && removed == toremove) break;
3890 }
3891 ln = next;
3892 }
3893 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
3894 }
3895 }
3896 }
3897
3898 /* This is the semantic of this command:
3899 * RPOPLPUSH srclist dstlist:
3900 * IF LLEN(srclist) > 0
3901 * element = RPOP srclist
3902 * LPUSH dstlist element
3903 * RETURN element
3904 * ELSE
3905 * RETURN nil
3906 * END
3907 * END
3908 *
3909 * The idea is to be able to get an element from a list in a reliable way
3910 * since the element is not just returned but pushed against another list
3911 * as well. This command was originally proposed by Ezra Zygmuntowicz.
3912 */
3913 static void rpoplpushcommand(redisClient *c) {
3914 robj *sobj;
3915
3916 sobj = lookupKeyWrite(c->db,c->argv[1]);
3917 if (sobj == NULL) {
3918 addReply(c,shared.nullbulk);
3919 } else {
3920 if (sobj->type != REDIS_LIST) {
3921 addReply(c,shared.wrongtypeerr);
3922 } else {
3923 list *srclist = sobj->ptr;
3924 listNode *ln = listLast(srclist);
3925
3926 if (ln == NULL) {
3927 addReply(c,shared.nullbulk);
3928 } else {
3929 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
3930 robj *ele = listNodeValue(ln);
3931 list *dstlist;
3932
3933 if (dobj && dobj->type != REDIS_LIST) {
3934 addReply(c,shared.wrongtypeerr);
3935 return;
3936 }
3937
3938 /* Add the element to the target list (unless it's directly
3939 * passed to some BLPOP-ing client */
3940 if (!handleClientsWaitingListPush(c,c->argv[2],ele)) {
3941 if (dobj == NULL) {
3942 /* Create the list if the key does not exist */
3943 dobj = createListObject();
3944 dictAdd(c->db->dict,c->argv[2],dobj);
3945 incrRefCount(c->argv[2]);
3946 }
3947 dstlist = dobj->ptr;
3948 listAddNodeHead(dstlist,ele);
3949 incrRefCount(ele);
3950 }
3951
3952 /* Send the element to the client as reply as well */
3953 addReplyBulkLen(c,ele);
3954 addReply(c,ele);
3955 addReply(c,shared.crlf);
3956
3957 /* Finally remove the element from the source list */
3958 listDelNode(srclist,ln);
3959 server.dirty++;
3960 }
3961 }
3962 }
3963 }
3964
3965
3966 /* ==================================== Sets ================================ */
3967
3968 static void saddCommand(redisClient *c) {
3969 robj *set;
3970
3971 set = lookupKeyWrite(c->db,c->argv[1]);
3972 if (set == NULL) {
3973 set = createSetObject();
3974 dictAdd(c->db->dict,c->argv[1],set);
3975 incrRefCount(c->argv[1]);
3976 } else {
3977 if (set->type != REDIS_SET) {
3978 addReply(c,shared.wrongtypeerr);
3979 return;
3980 }
3981 }
3982 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
3983 incrRefCount(c->argv[2]);
3984 server.dirty++;
3985 addReply(c,shared.cone);
3986 } else {
3987 addReply(c,shared.czero);
3988 }
3989 }
3990
3991 static void sremCommand(redisClient *c) {
3992 robj *set;
3993
3994 set = lookupKeyWrite(c->db,c->argv[1]);
3995 if (set == NULL) {
3996 addReply(c,shared.czero);
3997 } else {
3998 if (set->type != REDIS_SET) {
3999 addReply(c,shared.wrongtypeerr);
4000 return;
4001 }
4002 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
4003 server.dirty++;
4004 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4005 addReply(c,shared.cone);
4006 } else {
4007 addReply(c,shared.czero);
4008 }
4009 }
4010 }
4011
4012 static void smoveCommand(redisClient *c) {
4013 robj *srcset, *dstset;
4014
4015 srcset = lookupKeyWrite(c->db,c->argv[1]);
4016 dstset = lookupKeyWrite(c->db,c->argv[2]);
4017
4018 /* If the source key does not exist return 0, if it's of the wrong type
4019 * raise an error */
4020 if (srcset == NULL || srcset->type != REDIS_SET) {
4021 addReply(c, srcset ? shared.wrongtypeerr : shared.czero);
4022 return;
4023 }
4024 /* Error if the destination key is not a set as well */
4025 if (dstset && dstset->type != REDIS_SET) {
4026 addReply(c,shared.wrongtypeerr);
4027 return;
4028 }
4029 /* Remove the element from the source set */
4030 if (dictDelete(srcset->ptr,c->argv[3]) == DICT_ERR) {
4031 /* Key not found in the src set! return zero */
4032 addReply(c,shared.czero);
4033 return;
4034 }
4035 server.dirty++;
4036 /* Add the element to the destination set */
4037 if (!dstset) {
4038 dstset = createSetObject();
4039 dictAdd(c->db->dict,c->argv[2],dstset);
4040 incrRefCount(c->argv[2]);
4041 }
4042 if (dictAdd(dstset->ptr,c->argv[3],NULL) == DICT_OK)
4043 incrRefCount(c->argv[3]);
4044 addReply(c,shared.cone);
4045 }
4046
4047 static void sismemberCommand(redisClient *c) {
4048 robj *set;
4049
4050 set = lookupKeyRead(c->db,c->argv[1]);
4051 if (set == NULL) {
4052 addReply(c,shared.czero);
4053 } else {
4054 if (set->type != REDIS_SET) {
4055 addReply(c,shared.wrongtypeerr);
4056 return;
4057 }
4058 if (dictFind(set->ptr,c->argv[2]))
4059 addReply(c,shared.cone);
4060 else
4061 addReply(c,shared.czero);
4062 }
4063 }
4064
4065 static void scardCommand(redisClient *c) {
4066 robj *o;
4067 dict *s;
4068
4069 o = lookupKeyRead(c->db,c->argv[1]);
4070 if (o == NULL) {
4071 addReply(c,shared.czero);
4072 return;
4073 } else {
4074 if (o->type != REDIS_SET) {
4075 addReply(c,shared.wrongtypeerr);
4076 } else {
4077 s = o->ptr;
4078 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4079 dictSize(s)));
4080 }
4081 }
4082 }
4083
4084 static void spopCommand(redisClient *c) {
4085 robj *set;
4086 dictEntry *de;
4087
4088 set = lookupKeyWrite(c->db,c->argv[1]);
4089 if (set == NULL) {
4090 addReply(c,shared.nullbulk);
4091 } else {
4092 if (set->type != REDIS_SET) {
4093 addReply(c,shared.wrongtypeerr);
4094 return;
4095 }
4096 de = dictGetRandomKey(set->ptr);
4097 if (de == NULL) {
4098 addReply(c,shared.nullbulk);
4099 } else {
4100 robj *ele = dictGetEntryKey(de);
4101
4102 addReplyBulkLen(c,ele);
4103 addReply(c,ele);
4104 addReply(c,shared.crlf);
4105 dictDelete(set->ptr,ele);
4106 if (htNeedsResize(set->ptr)) dictResize(set->ptr);
4107 server.dirty++;
4108 }
4109 }
4110 }
4111
4112 static void srandmemberCommand(redisClient *c) {
4113 robj *set;
4114 dictEntry *de;
4115
4116 set = lookupKeyRead(c->db,c->argv[1]);
4117 if (set == NULL) {
4118 addReply(c,shared.nullbulk);
4119 } else {
4120 if (set->type != REDIS_SET) {
4121 addReply(c,shared.wrongtypeerr);
4122 return;
4123 }
4124 de = dictGetRandomKey(set->ptr);
4125 if (de == NULL) {
4126 addReply(c,shared.nullbulk);
4127 } else {
4128 robj *ele = dictGetEntryKey(de);
4129
4130 addReplyBulkLen(c,ele);
4131 addReply(c,ele);
4132 addReply(c,shared.crlf);
4133 }
4134 }
4135 }
4136
4137 static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
4138 dict **d1 = (void*) s1, **d2 = (void*) s2;
4139
4140 return dictSize(*d1)-dictSize(*d2);
4141 }
4142
4143 static void sinterGenericCommand(redisClient *c, robj **setskeys, unsigned long setsnum, robj *dstkey) {
4144 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4145 dictIterator *di;
4146 dictEntry *de;
4147 robj *lenobj = NULL, *dstset = NULL;
4148 unsigned long j, cardinality = 0;
4149
4150 for (j = 0; j < setsnum; j++) {
4151 robj *setobj;
4152
4153 setobj = dstkey ?
4154 lookupKeyWrite(c->db,setskeys[j]) :
4155 lookupKeyRead(c->db,setskeys[j]);
4156 if (!setobj) {
4157 zfree(dv);
4158 if (dstkey) {
4159 if (deleteKey(c->db,dstkey))
4160 server.dirty++;
4161 addReply(c,shared.czero);
4162 } else {
4163 addReply(c,shared.nullmultibulk);
4164 }
4165 return;
4166 }
4167 if (setobj->type != REDIS_SET) {
4168 zfree(dv);
4169 addReply(c,shared.wrongtypeerr);
4170 return;
4171 }
4172 dv[j] = setobj->ptr;
4173 }
4174 /* Sort sets from the smallest to largest, this will improve our
4175 * algorithm's performace */
4176 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
4177
4178 /* The first thing we should output is the total number of elements...
4179 * since this is a multi-bulk write, but at this stage we don't know
4180 * the intersection set size, so we use a trick, append an empty object
4181 * to the output list and save the pointer to later modify it with the
4182 * right length */
4183 if (!dstkey) {
4184 lenobj = createObject(REDIS_STRING,NULL);
4185 addReply(c,lenobj);
4186 decrRefCount(lenobj);
4187 } else {
4188 /* If we have a target key where to store the resulting set
4189 * create this key with an empty set inside */
4190 dstset = createSetObject();
4191 }
4192
4193 /* Iterate all the elements of the first (smallest) set, and test
4194 * the element against all the other sets, if at least one set does
4195 * not include the element it is discarded */
4196 di = dictGetIterator(dv[0]);
4197
4198 while((de = dictNext(di)) != NULL) {
4199 robj *ele;
4200
4201 for (j = 1; j < setsnum; j++)
4202 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
4203 if (j != setsnum)
4204 continue; /* at least one set does not contain the member */
4205 ele = dictGetEntryKey(de);
4206 if (!dstkey) {
4207 addReplyBulkLen(c,ele);
4208 addReply(c,ele);
4209 addReply(c,shared.crlf);
4210 cardinality++;
4211 } else {
4212 dictAdd(dstset->ptr,ele,NULL);
4213 incrRefCount(ele);
4214 }
4215 }
4216 dictReleaseIterator(di);
4217
4218 if (dstkey) {
4219 /* Store the resulting set into the target */
4220 deleteKey(c->db,dstkey);
4221 dictAdd(c->db->dict,dstkey,dstset);
4222 incrRefCount(dstkey);
4223 }
4224
4225 if (!dstkey) {
4226 lenobj->ptr = sdscatprintf(sdsempty(),"*%lu\r\n",cardinality);
4227 } else {
4228 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4229 dictSize((dict*)dstset->ptr)));
4230 server.dirty++;
4231 }
4232 zfree(dv);
4233 }
4234
4235 static void sinterCommand(redisClient *c) {
4236 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
4237 }
4238
4239 static void sinterstoreCommand(redisClient *c) {
4240 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
4241 }
4242
4243 #define REDIS_OP_UNION 0
4244 #define REDIS_OP_DIFF 1
4245
4246 static void sunionDiffGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey, int op) {
4247 dict **dv = zmalloc(sizeof(dict*)*setsnum);
4248 dictIterator *di;
4249 dictEntry *de;
4250 robj *dstset = NULL;
4251 int j, cardinality = 0;
4252
4253 for (j = 0; j < setsnum; j++) {
4254 robj *setobj;
4255
4256 setobj = dstkey ?
4257 lookupKeyWrite(c->db,setskeys[j]) :
4258 lookupKeyRead(c->db,setskeys[j]);
4259 if (!setobj) {
4260 dv[j] = NULL;
4261 continue;
4262 }
4263 if (setobj->type != REDIS_SET) {
4264 zfree(dv);
4265 addReply(c,shared.wrongtypeerr);
4266 return;
4267 }
4268 dv[j] = setobj->ptr;
4269 }
4270
4271 /* We need a temp set object to store our union. If the dstkey
4272 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
4273 * this set object will be the resulting object to set into the target key*/
4274 dstset = createSetObject();
4275
4276 /* Iterate all the elements of all the sets, add every element a single
4277 * time to the result set */
4278 for (j = 0; j < setsnum; j++) {
4279 if (op == REDIS_OP_DIFF && j == 0 && !dv[j]) break; /* result set is empty */
4280 if (!dv[j]) continue; /* non existing keys are like empty sets */
4281
4282 di = dictGetIterator(dv[j]);
4283
4284 while((de = dictNext(di)) != NULL) {
4285 robj *ele;
4286
4287 /* dictAdd will not add the same element multiple times */
4288 ele = dictGetEntryKey(de);
4289 if (op == REDIS_OP_UNION || j == 0) {
4290 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
4291 incrRefCount(ele);
4292 cardinality++;
4293 }
4294 } else if (op == REDIS_OP_DIFF) {
4295 if (dictDelete(dstset->ptr,ele) == DICT_OK) {
4296 cardinality--;
4297 }
4298 }
4299 }
4300 dictReleaseIterator(di);
4301
4302 if (op == REDIS_OP_DIFF && cardinality == 0) break; /* result set is empty */
4303 }
4304
4305 /* Output the content of the resulting set, if not in STORE mode */
4306 if (!dstkey) {
4307 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",cardinality));
4308 di = dictGetIterator(dstset->ptr);
4309 while((de = dictNext(di)) != NULL) {
4310 robj *ele;
4311
4312 ele = dictGetEntryKey(de);
4313 addReplyBulkLen(c,ele);
4314 addReply(c,ele);
4315 addReply(c,shared.crlf);
4316 }
4317 dictReleaseIterator(di);
4318 } else {
4319 /* If we have a target key where to store the resulting set
4320 * create this key with the result set inside */
4321 deleteKey(c->db,dstkey);
4322 dictAdd(c->db->dict,dstkey,dstset);
4323 incrRefCount(dstkey);
4324 }
4325
4326 /* Cleanup */
4327 if (!dstkey) {
4328 decrRefCount(dstset);
4329 } else {
4330 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",
4331 dictSize((dict*)dstset->ptr)));
4332 server.dirty++;
4333 }
4334 zfree(dv);
4335 }
4336
4337 static void sunionCommand(redisClient *c) {
4338 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
4339 }
4340
4341 static void sunionstoreCommand(redisClient *c) {
4342 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
4343 }
4344
4345 static void sdiffCommand(redisClient *c) {
4346 sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
4347 }
4348
4349 static void sdiffstoreCommand(redisClient *c) {
4350 sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
4351 }
4352
4353 /* ==================================== ZSets =============================== */
4354
4355 /* ZSETs are ordered sets using two data structures to hold the same elements
4356 * in order to get O(log(N)) INSERT and REMOVE operations into a sorted
4357 * data structure.
4358 *
4359 * The elements are added to a hash table mapping Redis objects to scores.
4360 * At the same time the elements are added to a skip list mapping scores
4361 * to Redis objects (so objects are sorted by scores in this "view"). */
4362
4363 /* This skiplist implementation is almost a C translation of the original
4364 * algorithm described by William Pugh in "Skip Lists: A Probabilistic
4365 * Alternative to Balanced Trees", modified in three ways:
4366 * a) this implementation allows for repeated scores.
4367 * b) the comparison is not just by key (our 'score') but by satellite data.
4368 * c) there is a back pointer, so it's a doubly linked list with the back
4369 * pointers being only at "level 1". This allows to traverse the list
4370 * from tail to head, useful for ZREVRANGE. */
4371
4372 static zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
4373 zskiplistNode *zn = zmalloc(sizeof(*zn));
4374
4375 zn->forward = zmalloc(sizeof(zskiplistNode*) * level);
4376 zn->score = score;
4377 zn->obj = obj;
4378 return zn;
4379 }
4380
4381 static zskiplist *zslCreate(void) {
4382 int j;
4383 zskiplist *zsl;
4384
4385 zsl = zmalloc(sizeof(*zsl));
4386 zsl->level = 1;
4387 zsl->length = 0;
4388 zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
4389 for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++)
4390 zsl->header->forward[j] = NULL;
4391 zsl->header->backward = NULL;
4392 zsl->tail = NULL;
4393 return zsl;
4394 }
4395
4396 static void zslFreeNode(zskiplistNode *node) {
4397 decrRefCount(node->obj);
4398 zfree(node->forward);
4399 zfree(node);
4400 }
4401
4402 static void zslFree(zskiplist *zsl) {
4403 zskiplistNode *node = zsl->header->forward[0], *next;
4404
4405 zfree(zsl->header->forward);
4406 zfree(zsl->header);
4407 while(node) {
4408 next = node->forward[0];
4409 zslFreeNode(node);
4410 node = next;
4411 }
4412 zfree(zsl);
4413 }
4414
4415 static int zslRandomLevel(void) {
4416 int level = 1;
4417 while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
4418 level += 1;
4419 return level;
4420 }
4421
4422 static void zslInsert(zskiplist *zsl, double score, robj *obj) {
4423 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4424 int i, level;
4425
4426 x = zsl->header;
4427 for (i = zsl->level-1; i >= 0; i--) {
4428 while (x->forward[i] &&
4429 (x->forward[i]->score < score ||
4430 (x->forward[i]->score == score &&
4431 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4432 x = x->forward[i];
4433 update[i] = x;
4434 }
4435 /* we assume the key is not already inside, since we allow duplicated
4436 * scores, and the re-insertion of score and redis object should never
4437 * happpen since the caller of zslInsert() should test in the hash table
4438 * if the element is already inside or not. */
4439 level = zslRandomLevel();
4440 if (level > zsl->level) {
4441 for (i = zsl->level; i < level; i++)
4442 update[i] = zsl->header;
4443 zsl->level = level;
4444 }
4445 x = zslCreateNode(level,score,obj);
4446 for (i = 0; i < level; i++) {
4447 x->forward[i] = update[i]->forward[i];
4448 update[i]->forward[i] = x;
4449 }
4450 x->backward = (update[0] == zsl->header) ? NULL : update[0];
4451 if (x->forward[0])
4452 x->forward[0]->backward = x;
4453 else
4454 zsl->tail = x;
4455 zsl->length++;
4456 }
4457
4458 /* Delete an element with matching score/object from the skiplist. */
4459 static int zslDelete(zskiplist *zsl, double score, robj *obj) {
4460 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4461 int i;
4462
4463 x = zsl->header;
4464 for (i = zsl->level-1; i >= 0; i--) {
4465 while (x->forward[i] &&
4466 (x->forward[i]->score < score ||
4467 (x->forward[i]->score == score &&
4468 compareStringObjects(x->forward[i]->obj,obj) < 0)))
4469 x = x->forward[i];
4470 update[i] = x;
4471 }
4472 /* We may have multiple elements with the same score, what we need
4473 * is to find the element with both the right score and object. */
4474 x = x->forward[0];
4475 if (x && score == x->score && compareStringObjects(x->obj,obj) == 0) {
4476 for (i = 0; i < zsl->level; i++) {
4477 if (update[i]->forward[i] != x) break;
4478 update[i]->forward[i] = x->forward[i];
4479 }
4480 if (x->forward[0]) {
4481 x->forward[0]->backward = (x->backward == zsl->header) ?
4482 NULL : x->backward;
4483 } else {
4484 zsl->tail = x->backward;
4485 }
4486 zslFreeNode(x);
4487 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4488 zsl->level--;
4489 zsl->length--;
4490 return 1;
4491 } else {
4492 return 0; /* not found */
4493 }
4494 return 0; /* not found */
4495 }
4496
4497 /* Delete all the elements with score between min and max from the skiplist.
4498 * Min and mx are inclusive, so a score >= min || score <= max is deleted.
4499 * Note that this function takes the reference to the hash table view of the
4500 * sorted set, in order to remove the elements from the hash table too. */
4501 static unsigned long zslDeleteRange(zskiplist *zsl, double min, double max, dict *dict) {
4502 zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
4503 unsigned long removed = 0;
4504 int i;
4505
4506 x = zsl->header;
4507 for (i = zsl->level-1; i >= 0; i--) {
4508 while (x->forward[i] && x->forward[i]->score < min)
4509 x = x->forward[i];
4510 update[i] = x;
4511 }
4512 /* We may have multiple elements with the same score, what we need
4513 * is to find the element with both the right score and object. */
4514 x = x->forward[0];
4515 while (x && x->score <= max) {
4516 zskiplistNode *next;
4517
4518 for (i = 0; i < zsl->level; i++) {
4519 if (update[i]->forward[i] != x) break;
4520 update[i]->forward[i] = x->forward[i];
4521 }
4522 if (x->forward[0]) {
4523 x->forward[0]->backward = (x->backward == zsl->header) ?
4524 NULL : x->backward;
4525 } else {
4526 zsl->tail = x->backward;
4527 }
4528 next = x->forward[0];
4529 dictDelete(dict,x->obj);
4530 zslFreeNode(x);
4531 while(zsl->level > 1 && zsl->header->forward[zsl->level-1] == NULL)
4532 zsl->level--;
4533 zsl->length--;
4534 removed++;
4535 x = next;
4536 }
4537 return removed; /* not found */
4538 }
4539
4540 /* Find the first node having a score equal or greater than the specified one.
4541 * Returns NULL if there is no match. */
4542 static zskiplistNode *zslFirstWithScore(zskiplist *zsl, double score) {
4543 zskiplistNode *x;
4544 int i;
4545
4546 x = zsl->header;
4547 for (i = zsl->level-1; i >= 0; i--) {
4548 while (x->forward[i] && x->forward[i]->score < score)
4549 x = x->forward[i];
4550 }
4551 /* We may have multiple elements with the same score, what we need
4552 * is to find the element with both the right score and object. */
4553 return x->forward[0];
4554 }
4555
4556 /* The actual Z-commands implementations */
4557
4558 /* This generic command implements both ZADD and ZINCRBY.
4559 * scoreval is the score if the operation is a ZADD (doincrement == 0) or
4560 * the increment if the operation is a ZINCRBY (doincrement == 1). */
4561 static void zaddGenericCommand(redisClient *c, robj *key, robj *ele, double scoreval, int doincrement) {
4562 robj *zsetobj;
4563 zset *zs;
4564 double *score;
4565
4566 zsetobj = lookupKeyWrite(c->db,key);
4567 if (zsetobj == NULL) {
4568 zsetobj = createZsetObject();
4569 dictAdd(c->db->dict,key,zsetobj);
4570 incrRefCount(key);
4571 } else {
4572 if (zsetobj->type != REDIS_ZSET) {
4573 addReply(c,shared.wrongtypeerr);
4574 return;
4575 }
4576 }
4577 zs = zsetobj->ptr;
4578
4579 /* Ok now since we implement both ZADD and ZINCRBY here the code
4580 * needs to handle the two different conditions. It's all about setting
4581 * '*score', that is, the new score to set, to the right value. */
4582 score = zmalloc(sizeof(double));
4583 if (doincrement) {
4584 dictEntry *de;
4585
4586 /* Read the old score. If the element was not present starts from 0 */
4587 de = dictFind(zs->dict,ele);
4588 if (de) {
4589 double *oldscore = dictGetEntryVal(de);
4590 *score = *oldscore + scoreval;
4591 } else {
4592 *score = scoreval;
4593 }
4594 } else {
4595 *score = scoreval;
4596 }
4597
4598 /* What follows is a simple remove and re-insert operation that is common
4599 * to both ZADD and ZINCRBY... */
4600 if (dictAdd(zs->dict,ele,score) == DICT_OK) {
4601 /* case 1: New element */
4602 incrRefCount(ele); /* added to hash */
4603 zslInsert(zs->zsl,*score,ele);
4604 incrRefCount(ele); /* added to skiplist */
4605 server.dirty++;
4606 if (doincrement)
4607 addReplyDouble(c,*score);
4608 else
4609 addReply(c,shared.cone);
4610 } else {
4611 dictEntry *de;
4612 double *oldscore;
4613
4614 /* case 2: Score update operation */
4615 de = dictFind(zs->dict,ele);
4616 redisAssert(de != NULL);
4617 oldscore = dictGetEntryVal(de);
4618 if (*score != *oldscore) {
4619 int deleted;
4620
4621 /* Remove and insert the element in the skip list with new score */
4622 deleted = zslDelete(zs->zsl,*oldscore,ele);
4623 redisAssert(deleted != 0);
4624 zslInsert(zs->zsl,*score,ele);
4625 incrRefCount(ele);
4626 /* Update the score in the hash table */
4627 dictReplace(zs->dict,ele,score);
4628 server.dirty++;
4629 } else {
4630 zfree(score);
4631 }
4632 if (doincrement)
4633 addReplyDouble(c,*score);
4634 else
4635 addReply(c,shared.czero);
4636 }
4637 }
4638
4639 static void zaddCommand(redisClient *c) {
4640 double scoreval;
4641
4642 scoreval = strtod(c->argv[2]->ptr,NULL);
4643 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,0);
4644 }
4645
4646 static void zincrbyCommand(redisClient *c) {
4647 double scoreval;
4648
4649 scoreval = strtod(c->argv[2]->ptr,NULL);
4650 zaddGenericCommand(c,c->argv[1],c->argv[3],scoreval,1);
4651 }
4652
4653 static void zremCommand(redisClient *c) {
4654 robj *zsetobj;
4655 zset *zs;
4656
4657 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4658 if (zsetobj == NULL) {
4659 addReply(c,shared.czero);
4660 } else {
4661 dictEntry *de;
4662 double *oldscore;
4663 int deleted;
4664
4665 if (zsetobj->type != REDIS_ZSET) {
4666 addReply(c,shared.wrongtypeerr);
4667 return;
4668 }
4669 zs = zsetobj->ptr;
4670 de = dictFind(zs->dict,c->argv[2]);
4671 if (de == NULL) {
4672 addReply(c,shared.czero);
4673 return;
4674 }
4675 /* Delete from the skiplist */
4676 oldscore = dictGetEntryVal(de);
4677 deleted = zslDelete(zs->zsl,*oldscore,c->argv[2]);
4678 redisAssert(deleted != 0);
4679
4680 /* Delete from the hash table */
4681 dictDelete(zs->dict,c->argv[2]);
4682 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4683 server.dirty++;
4684 addReply(c,shared.cone);
4685 }
4686 }
4687
4688 static void zremrangebyscoreCommand(redisClient *c) {
4689 double min = strtod(c->argv[2]->ptr,NULL);
4690 double max = strtod(c->argv[3]->ptr,NULL);
4691 robj *zsetobj;
4692 zset *zs;
4693
4694 zsetobj = lookupKeyWrite(c->db,c->argv[1]);
4695 if (zsetobj == NULL) {
4696 addReply(c,shared.czero);
4697 } else {
4698 long deleted;
4699
4700 if (zsetobj->type != REDIS_ZSET) {
4701 addReply(c,shared.wrongtypeerr);
4702 return;
4703 }
4704 zs = zsetobj->ptr;
4705 deleted = zslDeleteRange(zs->zsl,min,max,zs->dict);
4706 if (htNeedsResize(zs->dict)) dictResize(zs->dict);
4707 server.dirty += deleted;
4708 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",deleted));
4709 }
4710 }
4711
4712 static void zrangeGenericCommand(redisClient *c, int reverse) {
4713 robj *o;
4714 int start = atoi(c->argv[2]->ptr);
4715 int end = atoi(c->argv[3]->ptr);
4716 int withscores = 0;
4717
4718 if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr,"withscores")) {
4719 withscores = 1;
4720 } else if (c->argc >= 5) {
4721 addReply(c,shared.syntaxerr);
4722 return;
4723 }
4724
4725 o = lookupKeyRead(c->db,c->argv[1]);
4726 if (o == NULL) {
4727 addReply(c,shared.nullmultibulk);
4728 } else {
4729 if (o->type != REDIS_ZSET) {
4730 addReply(c,shared.wrongtypeerr);
4731 } else {
4732 zset *zsetobj = o->ptr;
4733 zskiplist *zsl = zsetobj->zsl;
4734 zskiplistNode *ln;
4735
4736 int llen = zsl->length;
4737 int rangelen, j;
4738 robj *ele;
4739
4740 /* convert negative indexes */
4741 if (start < 0) start = llen+start;
4742 if (end < 0) end = llen+end;
4743 if (start < 0) start = 0;
4744 if (end < 0) end = 0;
4745
4746 /* indexes sanity checks */
4747 if (start > end || start >= llen) {
4748 /* Out of range start or start > end result in empty list */
4749 addReply(c,shared.emptymultibulk);
4750 return;
4751 }
4752 if (end >= llen) end = llen-1;
4753 rangelen = (end-start)+1;
4754
4755 /* Return the result in form of a multi-bulk reply */
4756 if (reverse) {
4757 ln = zsl->tail;
4758 while (start--)
4759 ln = ln->backward;
4760 } else {
4761 ln = zsl->header->forward[0];
4762 while (start--)
4763 ln = ln->forward[0];
4764 }
4765
4766 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",
4767 withscores ? (rangelen*2) : rangelen));
4768 for (j = 0; j < rangelen; j++) {
4769 ele = ln->obj;
4770 addReplyBulkLen(c,ele);
4771 addReply(c,ele);
4772 addReply(c,shared.crlf);
4773 if (withscores)
4774 addReplyDouble(c,ln->score);
4775 ln = reverse ? ln->backward : ln->forward[0];
4776 }
4777 }
4778 }
4779 }
4780
4781 static void zrangeCommand(redisClient *c) {
4782 zrangeGenericCommand(c,0);
4783 }
4784
4785 static void zrevrangeCommand(redisClient *c) {
4786 zrangeGenericCommand(c,1);
4787 }
4788
4789 static void zrangebyscoreCommand(redisClient *c) {
4790 robj *o;
4791 double min = strtod(c->argv[2]->ptr,NULL);
4792 double max = strtod(c->argv[3]->ptr,NULL);
4793 int offset = 0, limit = -1;
4794
4795 if (c->argc != 4 && c->argc != 7) {
4796 addReplySds(c,
4797 sdsnew("-ERR wrong number of arguments for ZRANGEBYSCORE\r\n"));
4798 return;
4799 } else if (c->argc == 7 && strcasecmp(c->argv[4]->ptr,"limit")) {
4800 addReply(c,shared.syntaxerr);
4801 return;
4802 } else if (c->argc == 7) {
4803 offset = atoi(c->argv[5]->ptr);
4804 limit = atoi(c->argv[6]->ptr);
4805 if (offset < 0) offset = 0;
4806 }
4807
4808 o = lookupKeyRead(c->db,c->argv[1]);
4809 if (o == NULL) {
4810 addReply(c,shared.nullmultibulk);
4811 } else {
4812 if (o->type != REDIS_ZSET) {
4813 addReply(c,shared.wrongtypeerr);
4814 } else {
4815 zset *zsetobj = o->ptr;
4816 zskiplist *zsl = zsetobj->zsl;
4817 zskiplistNode *ln;
4818 robj *ele, *lenobj;
4819 unsigned int rangelen = 0;
4820
4821 /* Get the first node with the score >= min */
4822 ln = zslFirstWithScore(zsl,min);
4823 if (ln == NULL) {
4824 /* No element matching the speciifed interval */
4825 addReply(c,shared.emptymultibulk);
4826 return;
4827 }
4828
4829 /* We don't know in advance how many matching elements there
4830 * are in the list, so we push this object that will represent
4831 * the multi-bulk length in the output buffer, and will "fix"
4832 * it later */
4833 lenobj = createObject(REDIS_STRING,NULL);
4834 addReply(c,lenobj);
4835 decrRefCount(lenobj);
4836
4837 while(ln && ln->score <= max) {
4838 if (offset) {
4839 offset--;
4840 ln = ln->forward[0];
4841 continue;
4842 }
4843 if (limit == 0) break;
4844 ele = ln->obj;
4845 addReplyBulkLen(c,ele);
4846 addReply(c,ele);
4847 addReply(c,shared.crlf);
4848 ln = ln->forward[0];
4849 rangelen++;
4850 if (limit > 0) limit--;
4851 }
4852 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",rangelen);
4853 }
4854 }
4855 }
4856
4857 static void zcardCommand(redisClient *c) {
4858 robj *o;
4859 zset *zs;
4860
4861 o = lookupKeyRead(c->db,c->argv[1]);
4862 if (o == NULL) {
4863 addReply(c,shared.czero);
4864 return;
4865 } else {
4866 if (o->type != REDIS_ZSET) {
4867 addReply(c,shared.wrongtypeerr);
4868 } else {
4869 zs = o->ptr;
4870 addReplySds(c,sdscatprintf(sdsempty(),":%lu\r\n",zs->zsl->length));
4871 }
4872 }
4873 }
4874
4875 static void zscoreCommand(redisClient *c) {
4876 robj *o;
4877 zset *zs;
4878
4879 o = lookupKeyRead(c->db,c->argv[1]);
4880 if (o == NULL) {
4881 addReply(c,shared.nullbulk);
4882 return;
4883 } else {
4884 if (o->type != REDIS_ZSET) {
4885 addReply(c,shared.wrongtypeerr);
4886 } else {
4887 dictEntry *de;
4888
4889 zs = o->ptr;
4890 de = dictFind(zs->dict,c->argv[2]);
4891 if (!de) {
4892 addReply(c,shared.nullbulk);
4893 } else {
4894 double *score = dictGetEntryVal(de);
4895
4896 addReplyDouble(c,*score);
4897 }
4898 }
4899 }
4900 }
4901
4902 /* ========================= Non type-specific commands ==================== */
4903
4904 static void flushdbCommand(redisClient *c) {
4905 server.dirty += dictSize(c->db->dict);
4906 dictEmpty(c->db->dict);
4907 dictEmpty(c->db->expires);
4908 addReply(c,shared.ok);
4909 }
4910
4911 static void flushallCommand(redisClient *c) {
4912 server.dirty += emptyDb();
4913 addReply(c,shared.ok);
4914 rdbSave(server.dbfilename);
4915 server.dirty++;
4916 }
4917
4918 static redisSortOperation *createSortOperation(int type, robj *pattern) {
4919 redisSortOperation *so = zmalloc(sizeof(*so));
4920 so->type = type;
4921 so->pattern = pattern;
4922 return so;
4923 }
4924
4925 /* Return the value associated to the key with a name obtained
4926 * substituting the first occurence of '*' in 'pattern' with 'subst' */
4927 static robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
4928 char *p;
4929 sds spat, ssub;
4930 robj keyobj;
4931 int prefixlen, sublen, postfixlen;
4932 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
4933 struct {
4934 long len;
4935 long free;
4936 char buf[REDIS_SORTKEY_MAX+1];
4937 } keyname;
4938
4939 /* If the pattern is "#" return the substitution object itself in order
4940 * to implement the "SORT ... GET #" feature. */
4941 spat = pattern->ptr;
4942 if (spat[0] == '#' && spat[1] == '\0') {
4943 return subst;
4944 }
4945
4946 /* The substitution object may be specially encoded. If so we create
4947 * a decoded object on the fly. Otherwise getDecodedObject will just
4948 * increment the ref count, that we'll decrement later. */
4949 subst = getDecodedObject(subst);
4950
4951 ssub = subst->ptr;
4952 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
4953 p = strchr(spat,'*');
4954 if (!p) {
4955 decrRefCount(subst);
4956 return NULL;
4957 }
4958
4959 prefixlen = p-spat;
4960 sublen = sdslen(ssub);
4961 postfixlen = sdslen(spat)-(prefixlen+1);
4962 memcpy(keyname.buf,spat,prefixlen);
4963 memcpy(keyname.buf+prefixlen,ssub,sublen);
4964 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
4965 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
4966 keyname.len = prefixlen+sublen+postfixlen;
4967
4968 initStaticStringObject(keyobj,((char*)&keyname)+(sizeof(long)*2))
4969 decrRefCount(subst);
4970
4971 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
4972 return lookupKeyRead(db,&keyobj);
4973 }
4974
4975 /* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
4976 * the additional parameter is not standard but a BSD-specific we have to
4977 * pass sorting parameters via the global 'server' structure */
4978 static int sortCompare(const void *s1, const void *s2) {
4979 const redisSortObject *so1 = s1, *so2 = s2;
4980 int cmp;
4981
4982 if (!server.sort_alpha) {
4983 /* Numeric sorting. Here it's trivial as we precomputed scores */
4984 if (so1->u.score > so2->u.score) {
4985 cmp = 1;
4986 } else if (so1->u.score < so2->u.score) {
4987 cmp = -1;
4988 } else {
4989 cmp = 0;
4990 }
4991 } else {
4992 /* Alphanumeric sorting */
4993 if (server.sort_bypattern) {
4994 if (!so1->u.cmpobj || !so2->u.cmpobj) {
4995 /* At least one compare object is NULL */
4996 if (so1->u.cmpobj == so2->u.cmpobj)
4997 cmp = 0;
4998 else if (so1->u.cmpobj == NULL)
4999 cmp = -1;
5000 else
5001 cmp = 1;
5002 } else {
5003 /* We have both the objects, use strcoll */
5004 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
5005 }
5006 } else {
5007 /* Compare elements directly */
5008 robj *dec1, *dec2;
5009
5010 dec1 = getDecodedObject(so1->obj);
5011 dec2 = getDecodedObject(so2->obj);
5012 cmp = strcoll(dec1->ptr,dec2->ptr);
5013 decrRefCount(dec1);
5014 decrRefCount(dec2);
5015 }
5016 }
5017 return server.sort_desc ? -cmp : cmp;
5018 }
5019
5020 /* The SORT command is the most complex command in Redis. Warning: this code
5021 * is optimized for speed and a bit less for readability */
5022 static void sortCommand(redisClient *c) {
5023 list *operations;
5024 int outputlen = 0;
5025 int desc = 0, alpha = 0;
5026 int limit_start = 0, limit_count = -1, start, end;
5027 int j, dontsort = 0, vectorlen;
5028 int getop = 0; /* GET operation counter */
5029 robj *sortval, *sortby = NULL, *storekey = NULL;
5030 redisSortObject *vector; /* Resulting vector to sort */
5031
5032 /* Lookup the key to sort. It must be of the right types */
5033 sortval = lookupKeyRead(c->db,c->argv[1]);
5034 if (sortval == NULL) {
5035 addReply(c,shared.nullmultibulk);
5036 return;
5037 }
5038 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST &&
5039 sortval->type != REDIS_ZSET)
5040 {
5041 addReply(c,shared.wrongtypeerr);
5042 return;
5043 }
5044
5045 /* Create a list of operations to perform for every sorted element.
5046 * Operations can be GET/DEL/INCR/DECR */
5047 operations = listCreate();
5048 listSetFreeMethod(operations,zfree);
5049 j = 2;
5050
5051 /* Now we need to protect sortval incrementing its count, in the future
5052 * SORT may have options able to overwrite/delete keys during the sorting
5053 * and the sorted key itself may get destroied */
5054 incrRefCount(sortval);
5055
5056 /* The SORT command has an SQL-alike syntax, parse it */
5057 while(j < c->argc) {
5058 int leftargs = c->argc-j-1;
5059 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
5060 desc = 0;
5061 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
5062 desc = 1;
5063 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
5064 alpha = 1;
5065 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
5066 limit_start = atoi(c->argv[j+1]->ptr);
5067 limit_count = atoi(c->argv[j+2]->ptr);
5068 j+=2;
5069 } else if (!strcasecmp(c->argv[j]->ptr,"store") && leftargs >= 1) {
5070 storekey = c->argv[j+1];
5071 j++;
5072 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
5073 sortby = c->argv[j+1];
5074 /* If the BY pattern does not contain '*', i.e. it is constant,
5075 * we don't need to sort nor to lookup the weight keys. */
5076 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
5077 j++;
5078 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
5079 listAddNodeTail(operations,createSortOperation(
5080 REDIS_SORT_GET,c->argv[j+1]));
5081 getop++;
5082 j++;
5083 } else {
5084 decrRefCount(sortval);
5085 listRelease(operations);
5086 addReply(c,shared.syntaxerr);
5087 return;
5088 }
5089 j++;
5090 }
5091
5092 /* Load the sorting vector with all the objects to sort */
5093 switch(sortval->type) {
5094 case REDIS_LIST: vectorlen = listLength((list*)sortval->ptr); break;
5095 case REDIS_SET: vectorlen = dictSize((dict*)sortval->ptr); break;
5096 case REDIS_ZSET: vectorlen = dictSize(((zset*)sortval->ptr)->dict); break;
5097 default: vectorlen = 0; redisAssert(0); /* Avoid GCC warning */
5098 }
5099 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
5100 j = 0;
5101
5102 if (sortval->type == REDIS_LIST) {
5103 list *list = sortval->ptr;
5104 listNode *ln;
5105
5106 listRewind(list);
5107 while((ln = listYield(list))) {
5108 robj *ele = ln->value;
5109 vector[j].obj = ele;
5110 vector[j].u.score = 0;
5111 vector[j].u.cmpobj = NULL;
5112 j++;
5113 }
5114 } else {
5115 dict *set;
5116 dictIterator *di;
5117 dictEntry *setele;
5118
5119 if (sortval->type == REDIS_SET) {
5120 set = sortval->ptr;
5121 } else {
5122 zset *zs = sortval->ptr;
5123 set = zs->dict;
5124 }
5125
5126 di = dictGetIterator(set);
5127 while((setele = dictNext(di)) != NULL) {
5128 vector[j].obj = dictGetEntryKey(setele);
5129 vector[j].u.score = 0;
5130 vector[j].u.cmpobj = NULL;
5131 j++;
5132 }
5133 dictReleaseIterator(di);
5134 }
5135 redisAssert(j == vectorlen);
5136
5137 /* Now it's time to load the right scores in the sorting vector */
5138 if (dontsort == 0) {
5139 for (j = 0; j < vectorlen; j++) {
5140 if (sortby) {
5141 robj *byval;
5142
5143 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
5144 if (!byval || byval->type != REDIS_STRING) continue;
5145 if (alpha) {
5146 vector[j].u.cmpobj = getDecodedObject(byval);
5147 } else {
5148 if (byval->encoding == REDIS_ENCODING_RAW) {
5149 vector[j].u.score = strtod(byval->ptr,NULL);
5150 } else {
5151 /* Don't need to decode the object if it's
5152 * integer-encoded (the only encoding supported) so
5153 * far. We can just cast it */
5154 if (byval->encoding == REDIS_ENCODING_INT) {
5155 vector[j].u.score = (long)byval->ptr;
5156 } else
5157 redisAssert(1 != 1);
5158 }
5159 }
5160 } else {
5161 if (!alpha) {
5162 if (vector[j].obj->encoding == REDIS_ENCODING_RAW)
5163 vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
5164 else {
5165 if (vector[j].obj->encoding == REDIS_ENCODING_INT)
5166 vector[j].u.score = (long) vector[j].obj->ptr;
5167 else
5168 redisAssert(1 != 1);
5169 }
5170 }
5171 }
5172 }
5173 }
5174
5175 /* We are ready to sort the vector... perform a bit of sanity check
5176 * on the LIMIT option too. We'll use a partial version of quicksort. */
5177 start = (limit_start < 0) ? 0 : limit_start;
5178 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
5179 if (start >= vectorlen) {
5180 start = vectorlen-1;
5181 end = vectorlen-2;
5182 }
5183 if (end >= vectorlen) end = vectorlen-1;
5184
5185 if (dontsort == 0) {
5186 server.sort_desc = desc;
5187 server.sort_alpha = alpha;
5188 server.sort_bypattern = sortby ? 1 : 0;
5189 if (sortby && (start != 0 || end != vectorlen-1))
5190 pqsort(vector,vectorlen,sizeof(redisSortObject),sortCompare, start,end);
5191 else
5192 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
5193 }
5194
5195 /* Send command output to the output buffer, performing the specified
5196 * GET/DEL/INCR/DECR operations if any. */
5197 outputlen = getop ? getop*(end-start+1) : end-start+1;
5198 if (storekey == NULL) {
5199 /* STORE option not specified, sent the sorting result to client */
5200 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
5201 for (j = start; j <= end; j++) {
5202 listNode *ln;
5203 if (!getop) {
5204 addReplyBulkLen(c,vector[j].obj);
5205 addReply(c,vector[j].obj);
5206 addReply(c,shared.crlf);
5207 }
5208 listRewind(operations);
5209 while((ln = listYield(operations))) {
5210 redisSortOperation *sop = ln->value;
5211 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5212 vector[j].obj);
5213
5214 if (sop->type == REDIS_SORT_GET) {
5215 if (!val || val->type != REDIS_STRING) {
5216 addReply(c,shared.nullbulk);
5217 } else {
5218 addReplyBulkLen(c,val);
5219 addReply(c,val);
5220 addReply(c,shared.crlf);
5221 }
5222 } else {
5223 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5224 }
5225 }
5226 }
5227 } else {
5228 robj *listObject = createListObject();
5229 list *listPtr = (list*) listObject->ptr;
5230
5231 /* STORE option specified, set the sorting result as a List object */
5232 for (j = start; j <= end; j++) {
5233 listNode *ln;
5234 if (!getop) {
5235 listAddNodeTail(listPtr,vector[j].obj);
5236 incrRefCount(vector[j].obj);
5237 }
5238 listRewind(operations);
5239 while((ln = listYield(operations))) {
5240 redisSortOperation *sop = ln->value;
5241 robj *val = lookupKeyByPattern(c->db,sop->pattern,
5242 vector[j].obj);
5243
5244 if (sop->type == REDIS_SORT_GET) {
5245 if (!val || val->type != REDIS_STRING) {
5246 listAddNodeTail(listPtr,createStringObject("",0));
5247 } else {
5248 listAddNodeTail(listPtr,val);
5249 incrRefCount(val);
5250 }
5251 } else {
5252 redisAssert(sop->type == REDIS_SORT_GET); /* always fails */
5253 }
5254 }
5255 }
5256 if (dictReplace(c->db->dict,storekey,listObject)) {
5257 incrRefCount(storekey);
5258 }
5259 /* Note: we add 1 because the DB is dirty anyway since even if the
5260 * SORT result is empty a new key is set and maybe the old content
5261 * replaced. */
5262 server.dirty += 1+outputlen;
5263 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",outputlen));
5264 }
5265
5266 /* Cleanup */
5267 decrRefCount(sortval);
5268 listRelease(operations);
5269 for (j = 0; j < vectorlen; j++) {
5270 if (sortby && alpha && vector[j].u.cmpobj)
5271 decrRefCount(vector[j].u.cmpobj);
5272 }
5273 zfree(vector);
5274 }
5275
5276 /* Create the string returned by the INFO command. This is decoupled
5277 * by the INFO command itself as we need to report the same information
5278 * on memory corruption problems. */
5279 static sds genRedisInfoString(void) {
5280 sds info;
5281 time_t uptime = time(NULL)-server.stat_starttime;
5282 int j;
5283
5284 info = sdscatprintf(sdsempty(),
5285 "redis_version:%s\r\n"
5286 "arch_bits:%s\r\n"
5287 "multiplexing_api:%s\r\n"
5288 "uptime_in_seconds:%ld\r\n"
5289 "uptime_in_days:%ld\r\n"
5290 "connected_clients:%d\r\n"
5291 "connected_slaves:%d\r\n"
5292 "blocked_clients:%d\r\n"
5293 "used_memory:%zu\r\n"
5294 "changes_since_last_save:%lld\r\n"
5295 "bgsave_in_progress:%d\r\n"
5296 "last_save_time:%ld\r\n"
5297 "bgrewriteaof_in_progress:%d\r\n"
5298 "total_connections_received:%lld\r\n"
5299 "total_commands_processed:%lld\r\n"
5300 "role:%s\r\n"
5301 ,REDIS_VERSION,
5302 (sizeof(long) == 8) ? "64" : "32",
5303 aeGetApiName(),
5304 uptime,
5305 uptime/(3600*24),
5306 listLength(server.clients)-listLength(server.slaves),
5307 listLength(server.slaves),
5308 server.blockedclients,
5309 server.usedmemory,
5310 server.dirty,
5311 server.bgsavechildpid != -1,
5312 server.lastsave,
5313 server.bgrewritechildpid != -1,
5314 server.stat_numconnections,
5315 server.stat_numcommands,
5316 server.masterhost == NULL ? "master" : "slave"
5317 );
5318 if (server.masterhost) {
5319 info = sdscatprintf(info,
5320 "master_host:%s\r\n"
5321 "master_port:%d\r\n"
5322 "master_link_status:%s\r\n"
5323 "master_last_io_seconds_ago:%d\r\n"
5324 ,server.masterhost,
5325 server.masterport,
5326 (server.replstate == REDIS_REPL_CONNECTED) ?
5327 "up" : "down",
5328 server.master ? ((int)(time(NULL)-server.master->lastinteraction)) : -1
5329 );
5330 }
5331 for (j = 0; j < server.dbnum; j++) {
5332 long long keys, vkeys;
5333
5334 keys = dictSize(server.db[j].dict);
5335 vkeys = dictSize(server.db[j].expires);
5336 if (keys || vkeys) {
5337 info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld\r\n",
5338 j, keys, vkeys);
5339 }
5340 }
5341 return info;
5342 }
5343
5344 static void infoCommand(redisClient *c) {
5345 sds info = genRedisInfoString();
5346 addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n",
5347 (unsigned long)sdslen(info)));
5348 addReplySds(c,info);
5349 addReply(c,shared.crlf);
5350 }
5351
5352 static void monitorCommand(redisClient *c) {
5353 /* ignore MONITOR if aleady slave or in monitor mode */
5354 if (c->flags & REDIS_SLAVE) return;
5355
5356 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
5357 c->slaveseldb = 0;
5358 listAddNodeTail(server.monitors,c);
5359 addReply(c,shared.ok);
5360 }
5361
5362 /* ================================= Expire ================================= */
5363 static int removeExpire(redisDb *db, robj *key) {
5364 if (dictDelete(db->expires,key) == DICT_OK) {
5365 return 1;
5366 } else {
5367 return 0;
5368 }
5369 }
5370
5371 static int setExpire(redisDb *db, robj *key, time_t when) {
5372 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
5373 return 0;
5374 } else {
5375 incrRefCount(key);
5376 return 1;
5377 }
5378 }
5379
5380 /* Return the expire time of the specified key, or -1 if no expire
5381 * is associated with this key (i.e. the key is non volatile) */
5382 static time_t getExpire(redisDb *db, robj *key) {
5383 dictEntry *de;
5384
5385 /* No expire? return ASAP */
5386 if (dictSize(db->expires) == 0 ||
5387 (de = dictFind(db->expires,key)) == NULL) return -1;
5388
5389 return (time_t) dictGetEntryVal(de);
5390 }
5391
5392 static int expireIfNeeded(redisDb *db, robj *key) {
5393 time_t when;
5394 dictEntry *de;
5395
5396 /* No expire? return ASAP */
5397 if (dictSize(db->expires) == 0 ||
5398 (de = dictFind(db->expires,key)) == NULL) return 0;
5399
5400 /* Lookup the expire */
5401 when = (time_t) dictGetEntryVal(de);
5402 if (time(NULL) <= when) return 0;
5403
5404 /* Delete the key */
5405 dictDelete(db->expires,key);
5406 return dictDelete(db->dict,key) == DICT_OK;
5407 }
5408
5409 static int deleteIfVolatile(redisDb *db, robj *key) {
5410 dictEntry *de;
5411
5412 /* No expire? return ASAP */
5413 if (dictSize(db->expires) == 0 ||
5414 (de = dictFind(db->expires,key)) == NULL) return 0;
5415
5416 /* Delete the key */
5417 server.dirty++;
5418 dictDelete(db->expires,key);
5419 return dictDelete(db->dict,key) == DICT_OK;
5420 }
5421
5422 static void expireGenericCommand(redisClient *c, robj *key, time_t seconds) {
5423 dictEntry *de;
5424
5425 de = dictFind(c->db->dict,key);
5426 if (de == NULL) {
5427 addReply(c,shared.czero);
5428 return;
5429 }
5430 if (seconds < 0) {
5431 if (deleteKey(c->db,key)) server.dirty++;
5432 addReply(c, shared.cone);
5433 return;
5434 } else {
5435 time_t when = time(NULL)+seconds;
5436 if (setExpire(c->db,key,when)) {
5437 addReply(c,shared.cone);
5438 server.dirty++;
5439 } else {
5440 addReply(c,shared.czero);
5441 }
5442 return;
5443 }
5444 }
5445
5446 static void expireCommand(redisClient *c) {
5447 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10));
5448 }
5449
5450 static void expireatCommand(redisClient *c) {
5451 expireGenericCommand(c,c->argv[1],strtol(c->argv[2]->ptr,NULL,10)-time(NULL));
5452 }
5453
5454 static void ttlCommand(redisClient *c) {
5455 time_t expire;
5456 int ttl = -1;
5457
5458 expire = getExpire(c->db,c->argv[1]);
5459 if (expire != -1) {
5460 ttl = (int) (expire-time(NULL));
5461 if (ttl < 0) ttl = -1;
5462 }
5463 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",ttl));
5464 }
5465
5466 /* ================================ MULTI/EXEC ============================== */
5467
5468 /* Client state initialization for MULTI/EXEC */
5469 static void initClientMultiState(redisClient *c) {
5470 c->mstate.commands = NULL;
5471 c->mstate.count = 0;
5472 }
5473
5474 /* Release all the resources associated with MULTI/EXEC state */
5475 static void freeClientMultiState(redisClient *c) {
5476 int j;
5477
5478 for (j = 0; j < c->mstate.count; j++) {
5479 int i;
5480 multiCmd *mc = c->mstate.commands+j;
5481
5482 for (i = 0; i < mc->argc; i++)
5483 decrRefCount(mc->argv[i]);
5484 zfree(mc->argv);
5485 }
5486 zfree(c->mstate.commands);
5487 }
5488
5489 /* Add a new command into the MULTI commands queue */
5490 static void queueMultiCommand(redisClient *c, struct redisCommand *cmd) {
5491 multiCmd *mc;
5492 int j;
5493
5494 c->mstate.commands = zrealloc(c->mstate.commands,
5495 sizeof(multiCmd)*(c->mstate.count+1));
5496 mc = c->mstate.commands+c->mstate.count;
5497 mc->cmd = cmd;
5498 mc->argc = c->argc;
5499 mc->argv = zmalloc(sizeof(robj*)*c->argc);
5500 memcpy(mc->argv,c->argv,sizeof(robj*)*c->argc);
5501 for (j = 0; j < c->argc; j++)
5502 incrRefCount(mc->argv[j]);
5503 c->mstate.count++;
5504 }
5505
5506 static void multiCommand(redisClient *c) {
5507 c->flags |= REDIS_MULTI;
5508 addReply(c,shared.ok);
5509 }
5510
5511 static void execCommand(redisClient *c) {
5512 int j;
5513 robj **orig_argv;
5514 int orig_argc;
5515
5516 if (!(c->flags & REDIS_MULTI)) {
5517 addReplySds(c,sdsnew("-ERR EXEC without MULTI\r\n"));
5518 return;
5519 }
5520
5521 orig_argv = c->argv;
5522 orig_argc = c->argc;
5523 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->mstate.count));
5524 for (j = 0; j < c->mstate.count; j++) {
5525 c->argc = c->mstate.commands[j].argc;
5526 c->argv = c->mstate.commands[j].argv;
5527 call(c,c->mstate.commands[j].cmd);
5528 }
5529 c->argv = orig_argv;
5530 c->argc = orig_argc;
5531 freeClientMultiState(c);
5532 initClientMultiState(c);
5533 c->flags &= (~REDIS_MULTI);
5534 }
5535
5536 /* =========================== Blocking Operations ========================= */
5537
5538 /* Currently Redis blocking operations support is limited to list POP ops,
5539 * so the current implementation is not fully generic, but it is also not
5540 * completely specific so it will not require a rewrite to support new
5541 * kind of blocking operations in the future.
5542 *
5543 * Still it's important to note that list blocking operations can be already
5544 * used as a notification mechanism in order to implement other blocking
5545 * operations at application level, so there must be a very strong evidence
5546 * of usefulness and generality before new blocking operations are implemented.
5547 *
5548 * This is how the current blocking POP works, we use BLPOP as example:
5549 * - If the user calls BLPOP and the key exists and contains a non empty list
5550 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
5551 * if there is not to block.
5552 * - If instead BLPOP is called and the key does not exists or the list is
5553 * empty we need to block. In order to do so we remove the notification for
5554 * new data to read in the client socket (so that we'll not serve new
5555 * requests if the blocking request is not served). Also we put the client
5556 * in a dictionary (db->blockingkeys) mapping keys to a list of clients
5557 * blocking for this keys.
5558 * - If a PUSH operation against a key with blocked clients waiting is
5559 * performed, we serve the first in the list: basically instead to push
5560 * the new element inside the list we return it to the (first / oldest)
5561 * blocking client, unblock the client, and remove it form the list.
5562 *
5563 * The above comment and the source code should be enough in order to understand
5564 * the implementation and modify / fix it later.
5565 */
5566
5567 /* Set a client in blocking mode for the specified key, with the specified
5568 * timeout */
5569 static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
5570 dictEntry *de;
5571 list *l;
5572 int j;
5573
5574 c->blockingkeys = zmalloc(sizeof(robj*)*numkeys);
5575 c->blockingkeysnum = numkeys;
5576 c->blockingto = timeout;
5577 for (j = 0; j < numkeys; j++) {
5578 /* Add the key in the client structure, to map clients -> keys */
5579 c->blockingkeys[j] = keys[j];
5580 incrRefCount(keys[j]);
5581
5582 /* And in the other "side", to map keys -> clients */
5583 de = dictFind(c->db->blockingkeys,keys[j]);
5584 if (de == NULL) {
5585 int retval;
5586
5587 /* For every key we take a list of clients blocked for it */
5588 l = listCreate();
5589 retval = dictAdd(c->db->blockingkeys,keys[j],l);
5590 incrRefCount(keys[j]);
5591 assert(retval == DICT_OK);
5592 } else {
5593 l = dictGetEntryVal(de);
5594 }
5595 listAddNodeTail(l,c);
5596 }
5597 /* Mark the client as a blocked client */
5598 c->flags |= REDIS_BLOCKED;
5599 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
5600 server.blockedclients++;
5601 }
5602
5603 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
5604 static void unblockClient(redisClient *c) {
5605 dictEntry *de;
5606 list *l;
5607 int j;
5608
5609 assert(c->blockingkeys != NULL);
5610 /* The client may wait for multiple keys, so unblock it for every key. */
5611 for (j = 0; j < c->blockingkeysnum; j++) {
5612 /* Remove this client from the list of clients waiting for this key. */
5613 de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
5614 assert(de != NULL);
5615 l = dictGetEntryVal(de);
5616 listDelNode(l,listSearchKey(l,c));
5617 /* If the list is empty we need to remove it to avoid wasting memory */
5618 if (listLength(l) == 0)
5619 dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
5620 decrRefCount(c->blockingkeys[j]);
5621 }
5622 /* Cleanup the client structure */
5623 zfree(c->blockingkeys);
5624 c->blockingkeys = NULL;
5625 c->flags &= (~REDIS_BLOCKED);
5626 server.blockedclients--;
5627 /* Ok now we are ready to get read events from socket, note that we
5628 * can't trap errors here as it's possible that unblockClients() is
5629 * called from freeClient() itself, and the only thing we can do
5630 * if we failed to register the READABLE event is to kill the client.
5631 * Still the following function should never fail in the real world as
5632 * we are sure the file descriptor is sane, and we exit on out of mem. */
5633 aeCreateFileEvent(server.el, c->fd, AE_READABLE, readQueryFromClient, c);
5634 /* As a final step we want to process data if there is some command waiting
5635 * in the input buffer. Note that this is safe even if unblockClient()
5636 * gets called from freeClient() because freeClient() will be smart
5637 * enough to call this function *after* c->querybuf was set to NULL. */
5638 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
5639 }
5640
5641 /* This should be called from any function PUSHing into lists.
5642 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
5643 * 'ele' is the element pushed.
5644 *
5645 * If the function returns 0 there was no client waiting for a list push
5646 * against this key.
5647 *
5648 * If the function returns 1 there was a client waiting for a list push
5649 * against this key, the element was passed to this client thus it's not
5650 * needed to actually add it to the list and the caller should return asap. */
5651 static int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
5652 struct dictEntry *de;
5653 redisClient *receiver;
5654 list *l;
5655 listNode *ln;
5656
5657 de = dictFind(c->db->blockingkeys,key);
5658 if (de == NULL) return 0;
5659 l = dictGetEntryVal(de);
5660 ln = listFirst(l);
5661 assert(ln != NULL);
5662 receiver = ln->value;
5663
5664 addReplySds(receiver,sdsnew("*2\r\n"));
5665 addReplyBulkLen(receiver,key);
5666 addReply(receiver,key);
5667 addReply(receiver,shared.crlf);
5668 addReplyBulkLen(receiver,ele);
5669 addReply(receiver,ele);
5670 addReply(receiver,shared.crlf);
5671 unblockClient(receiver);
5672 return 1;
5673 }
5674
5675 /* Blocking RPOP/LPOP */
5676 static void blockingPopGenericCommand(redisClient *c, int where) {
5677 robj *o;
5678 time_t timeout;
5679 int j;
5680
5681 for (j = 1; j < c->argc-1; j++) {
5682 o = lookupKeyWrite(c->db,c->argv[j]);
5683 if (o != NULL) {
5684 if (o->type != REDIS_LIST) {
5685 addReply(c,shared.wrongtypeerr);
5686 return;
5687 } else {
5688 list *list = o->ptr;
5689 if (listLength(list) != 0) {
5690 /* If the list contains elements fall back to the usual
5691 * non-blocking POP operation */
5692 robj *argv[2], **orig_argv;
5693 int orig_argc;
5694
5695 /* We need to alter the command arguments before to call
5696 * popGenericCommand() as the command takes a single key. */
5697 orig_argv = c->argv;
5698 orig_argc = c->argc;
5699 argv[1] = c->argv[j];
5700 c->argv = argv;
5701 c->argc = 2;
5702
5703 /* Also the return value is different, we need to output
5704 * the multi bulk reply header and the key name. The
5705 * "real" command will add the last element (the value)
5706 * for us. If this souds like an hack to you it's just
5707 * because it is... */
5708 addReplySds(c,sdsnew("*2\r\n"));
5709 addReplyBulkLen(c,argv[1]);
5710 addReply(c,argv[1]);
5711 addReply(c,shared.crlf);
5712 popGenericCommand(c,where);
5713
5714 /* Fix the client structure with the original stuff */
5715 c->argv = orig_argv;
5716 c->argc = orig_argc;
5717 return;
5718 }
5719 }
5720 }
5721 }
5722 /* If the list is empty or the key does not exists we must block */
5723 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
5724 if (timeout > 0) timeout += time(NULL);
5725 blockForKeys(c,c->argv+1,c->argc-2,timeout);
5726 }
5727
5728 static void blpopCommand(redisClient *c) {
5729 blockingPopGenericCommand(c,REDIS_HEAD);
5730 }
5731
5732 static void brpopCommand(redisClient *c) {
5733 blockingPopGenericCommand(c,REDIS_TAIL);
5734 }
5735
5736 /* =============================== Replication ============================= */
5737
5738 static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
5739 ssize_t nwritten, ret = size;
5740 time_t start = time(NULL);
5741
5742 timeout++;
5743 while(size) {
5744 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
5745 nwritten = write(fd,ptr,size);
5746 if (nwritten == -1) return -1;
5747 ptr += nwritten;
5748 size -= nwritten;
5749 }
5750 if ((time(NULL)-start) > timeout) {
5751 errno = ETIMEDOUT;
5752 return -1;
5753 }
5754 }
5755 return ret;
5756 }
5757
5758 static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
5759 ssize_t nread, totread = 0;
5760 time_t start = time(NULL);
5761
5762 timeout++;
5763 while(size) {
5764 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
5765 nread = read(fd,ptr,size);
5766 if (nread == -1) return -1;
5767 ptr += nread;
5768 size -= nread;
5769 totread += nread;
5770 }
5771 if ((time(NULL)-start) > timeout) {
5772 errno = ETIMEDOUT;
5773 return -1;
5774 }
5775 }
5776 return totread;
5777 }
5778
5779 static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
5780 ssize_t nread = 0;
5781
5782 size--;
5783 while(size) {
5784 char c;
5785
5786 if (syncRead(fd,&c,1,timeout) == -1) return -1;
5787 if (c == '\n') {
5788 *ptr = '\0';
5789 if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
5790 return nread;
5791 } else {
5792 *ptr++ = c;
5793 *ptr = '\0';
5794 nread++;
5795 }
5796 }
5797 return nread;
5798 }
5799
5800 static void syncCommand(redisClient *c) {
5801 /* ignore SYNC if aleady slave or in monitor mode */
5802 if (c->flags & REDIS_SLAVE) return;
5803
5804 /* SYNC can't be issued when the server has pending data to send to
5805 * the client about already issued commands. We need a fresh reply
5806 * buffer registering the differences between the BGSAVE and the current
5807 * dataset, so that we can copy to other slaves if needed. */
5808 if (listLength(c->reply) != 0) {
5809 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
5810 return;
5811 }
5812
5813 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
5814 /* Here we need to check if there is a background saving operation
5815 * in progress, or if it is required to start one */
5816 if (server.bgsavechildpid != -1) {
5817 /* Ok a background save is in progress. Let's check if it is a good
5818 * one for replication, i.e. if there is another slave that is
5819 * registering differences since the server forked to save */
5820 redisClient *slave;
5821 listNode *ln;
5822
5823 listRewind(server.slaves);
5824 while((ln = listYield(server.slaves))) {
5825 slave = ln->value;
5826 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
5827 }
5828 if (ln) {
5829 /* Perfect, the server is already registering differences for
5830 * another slave. Set the right state, and copy the buffer. */
5831 listRelease(c->reply);
5832 c->reply = listDup(slave->reply);
5833 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5834 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
5835 } else {
5836 /* No way, we need to wait for the next BGSAVE in order to
5837 * register differences */
5838 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
5839 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
5840 }
5841 } else {
5842 /* Ok we don't have a BGSAVE in progress, let's start one */
5843 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
5844 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5845 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
5846 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
5847 return;
5848 }
5849 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5850 }
5851 c->repldbfd = -1;
5852 c->flags |= REDIS_SLAVE;
5853 c->slaveseldb = 0;
5854 listAddNodeTail(server.slaves,c);
5855 return;
5856 }
5857
5858 static void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
5859 redisClient *slave = privdata;
5860 REDIS_NOTUSED(el);
5861 REDIS_NOTUSED(mask);
5862 char buf[REDIS_IOBUF_LEN];
5863 ssize_t nwritten, buflen;
5864
5865 if (slave->repldboff == 0) {
5866 /* Write the bulk write count before to transfer the DB. In theory here
5867 * we don't know how much room there is in the output buffer of the
5868 * socket, but in pratice SO_SNDLOWAT (the minimum count for output
5869 * operations) will never be smaller than the few bytes we need. */
5870 sds bulkcount;
5871
5872 bulkcount = sdscatprintf(sdsempty(),"$%lld\r\n",(unsigned long long)
5873 slave->repldbsize);
5874 if (write(fd,bulkcount,sdslen(bulkcount)) != (signed)sdslen(bulkcount))
5875 {
5876 sdsfree(bulkcount);
5877 freeClient(slave);
5878 return;
5879 }
5880 sdsfree(bulkcount);
5881 }
5882 lseek(slave->repldbfd,slave->repldboff,SEEK_SET);
5883 buflen = read(slave->repldbfd,buf,REDIS_IOBUF_LEN);
5884 if (buflen <= 0) {
5885 redisLog(REDIS_WARNING,"Read error sending DB to slave: %s",
5886 (buflen == 0) ? "premature EOF" : strerror(errno));
5887 freeClient(slave);
5888 return;
5889 }
5890 if ((nwritten = write(fd,buf,buflen)) == -1) {
5891 redisLog(REDIS_DEBUG,"Write error sending DB to slave: %s",
5892 strerror(errno));
5893 freeClient(slave);
5894 return;
5895 }
5896 slave->repldboff += nwritten;
5897 if (slave->repldboff == slave->repldbsize) {
5898 close(slave->repldbfd);
5899 slave->repldbfd = -1;
5900 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5901 slave->replstate = REDIS_REPL_ONLINE;
5902 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE,
5903 sendReplyToClient, slave) == AE_ERR) {
5904 freeClient(slave);
5905 return;
5906 }
5907 addReplySds(slave,sdsempty());
5908 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
5909 }
5910 }
5911
5912 /* This function is called at the end of every backgrond saving.
5913 * The argument bgsaveerr is REDIS_OK if the background saving succeeded
5914 * otherwise REDIS_ERR is passed to the function.
5915 *
5916 * The goal of this function is to handle slaves waiting for a successful
5917 * background saving in order to perform non-blocking synchronization. */
5918 static void updateSlavesWaitingBgsave(int bgsaveerr) {
5919 listNode *ln;
5920 int startbgsave = 0;
5921
5922 listRewind(server.slaves);
5923 while((ln = listYield(server.slaves))) {
5924 redisClient *slave = ln->value;
5925
5926 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
5927 startbgsave = 1;
5928 slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;
5929 } else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {
5930 struct redis_stat buf;
5931
5932 if (bgsaveerr != REDIS_OK) {
5933 freeClient(slave);
5934 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE child returned an error");
5935 continue;
5936 }
5937 if ((slave->repldbfd = open(server.dbfilename,O_RDONLY)) == -1 ||
5938 redis_fstat(slave->repldbfd,&buf) == -1) {
5939 freeClient(slave);
5940 redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));
5941 continue;
5942 }
5943 slave->repldboff = 0;
5944 slave->repldbsize = buf.st_size;
5945 slave->replstate = REDIS_REPL_SEND_BULK;
5946 aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);
5947 if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) {
5948 freeClient(slave);
5949 continue;
5950 }
5951 }
5952 }
5953 if (startbgsave) {
5954 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
5955 listRewind(server.slaves);
5956 redisLog(REDIS_WARNING,"SYNC failed. BGSAVE failed");
5957 while((ln = listYield(server.slaves))) {
5958 redisClient *slave = ln->value;
5959
5960 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)
5961 freeClient(slave);
5962 }
5963 }
5964 }
5965 }
5966
5967 static int syncWithMaster(void) {
5968 char buf[1024], tmpfile[256], authcmd[1024];
5969 int dumpsize;
5970 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
5971 int dfd;
5972
5973 if (fd == -1) {
5974 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
5975 strerror(errno));
5976 return REDIS_ERR;
5977 }
5978
5979 /* AUTH with the master if required. */
5980 if(server.masterauth) {
5981 snprintf(authcmd, 1024, "AUTH %s\r\n", server.masterauth);
5982 if (syncWrite(fd, authcmd, strlen(server.masterauth)+7, 5) == -1) {
5983 close(fd);
5984 redisLog(REDIS_WARNING,"Unable to AUTH to MASTER: %s",
5985 strerror(errno));
5986 return REDIS_ERR;
5987 }
5988 /* Read the AUTH result. */
5989 if (syncReadLine(fd,buf,1024,3600) == -1) {
5990 close(fd);
5991 redisLog(REDIS_WARNING,"I/O error reading auth result from MASTER: %s",
5992 strerror(errno));
5993 return REDIS_ERR;
5994 }
5995 if (buf[0] != '+') {
5996 close(fd);
5997 redisLog(REDIS_WARNING,"Cannot AUTH to MASTER, is the masterauth password correct?");
5998 return REDIS_ERR;
5999 }
6000 }
6001
6002 /* Issue the SYNC command */
6003 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
6004 close(fd);
6005 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
6006 strerror(errno));
6007 return REDIS_ERR;
6008 }
6009 /* Read the bulk write count */
6010 if (syncReadLine(fd,buf,1024,3600) == -1) {
6011 close(fd);
6012 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
6013 strerror(errno));
6014 return REDIS_ERR;
6015 }
6016 if (buf[0] != '$') {
6017 close(fd);
6018 redisLog(REDIS_WARNING,"Bad protocol from MASTER, the first byte is not '$', are you sure the host and port are right?");
6019 return REDIS_ERR;
6020 }
6021 dumpsize = atoi(buf+1);
6022 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
6023 /* Read the bulk write data on a temp file */
6024 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
6025 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
6026 if (dfd == -1) {
6027 close(fd);
6028 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
6029 return REDIS_ERR;
6030 }
6031 while(dumpsize) {
6032 int nread, nwritten;
6033
6034 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
6035 if (nread == -1) {
6036 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
6037 strerror(errno));
6038 close(fd);
6039 close(dfd);
6040 return REDIS_ERR;
6041 }
6042 nwritten = write(dfd,buf,nread);
6043 if (nwritten == -1) {
6044 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
6045 close(fd);
6046 close(dfd);
6047 return REDIS_ERR;
6048 }
6049 dumpsize -= nread;
6050 }
6051 close(dfd);
6052 if (rename(tmpfile,server.dbfilename) == -1) {
6053 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
6054 unlink(tmpfile);
6055 close(fd);
6056 return REDIS_ERR;
6057 }
6058 emptyDb();
6059 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6060 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
6061 close(fd);
6062 return REDIS_ERR;
6063 }
6064 server.master = createClient(fd);
6065 server.master->flags |= REDIS_MASTER;
6066 server.master->authenticated = 1;
6067 server.replstate = REDIS_REPL_CONNECTED;
6068 return REDIS_OK;
6069 }
6070
6071 static void slaveofCommand(redisClient *c) {
6072 if (!strcasecmp(c->argv[1]->ptr,"no") &&
6073 !strcasecmp(c->argv[2]->ptr,"one")) {
6074 if (server.masterhost) {
6075 sdsfree(server.masterhost);
6076 server.masterhost = NULL;
6077 if (server.master) freeClient(server.master);
6078 server.replstate = REDIS_REPL_NONE;
6079 redisLog(REDIS_NOTICE,"MASTER MODE enabled (user request)");
6080 }
6081 } else {
6082 sdsfree(server.masterhost);
6083 server.masterhost = sdsdup(c->argv[1]->ptr);
6084 server.masterport = atoi(c->argv[2]->ptr);
6085 if (server.master) freeClient(server.master);
6086 server.replstate = REDIS_REPL_CONNECT;
6087 redisLog(REDIS_NOTICE,"SLAVE OF %s:%d enabled (user request)",
6088 server.masterhost, server.masterport);
6089 }
6090 addReply(c,shared.ok);
6091 }
6092
6093 /* ============================ Maxmemory directive ======================== */
6094
6095 /* This function gets called when 'maxmemory' is set on the config file to limit
6096 * the max memory used by the server, and we are out of memory.
6097 * This function will try to, in order:
6098 *
6099 * - Free objects from the free list
6100 * - Try to remove keys with an EXPIRE set
6101 *
6102 * It is not possible to free enough memory to reach used-memory < maxmemory
6103 * the server will start refusing commands that will enlarge even more the
6104 * memory usage.
6105 */
6106 static void freeMemoryIfNeeded(void) {
6107 while (server.maxmemory && zmalloc_used_memory() > server.maxmemory) {
6108 if (listLength(server.objfreelist)) {
6109 robj *o;
6110
6111 listNode *head = listFirst(server.objfreelist);
6112 o = listNodeValue(head);
6113 listDelNode(server.objfreelist,head);
6114 zfree(o);
6115 } else {
6116 int j, k, freed = 0;
6117
6118 for (j = 0; j < server.dbnum; j++) {
6119 int minttl = -1;
6120 robj *minkey = NULL;
6121 struct dictEntry *de;
6122
6123 if (dictSize(server.db[j].expires)) {
6124 freed = 1;
6125 /* From a sample of three keys drop the one nearest to
6126 * the natural expire */
6127 for (k = 0; k < 3; k++) {
6128 time_t t;
6129
6130 de = dictGetRandomKey(server.db[j].expires);
6131 t = (time_t) dictGetEntryVal(de);
6132 if (minttl == -1 || t < minttl) {
6133 minkey = dictGetEntryKey(de);
6134 minttl = t;
6135 }
6136 }
6137 deleteKey(server.db+j,minkey);
6138 }
6139 }
6140 if (!freed) return; /* nothing to free... */
6141 }
6142 }
6143 }
6144
6145 /* ============================== Append Only file ========================== */
6146
6147 static void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
6148 sds buf = sdsempty();
6149 int j;
6150 ssize_t nwritten;
6151 time_t now;
6152 robj *tmpargv[3];
6153
6154 /* The DB this command was targetting is not the same as the last command
6155 * we appendend. To issue a SELECT command is needed. */
6156 if (dictid != server.appendseldb) {
6157 char seldb[64];
6158
6159 snprintf(seldb,sizeof(seldb),"%d",dictid);
6160 buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
6161 (unsigned long)strlen(seldb),seldb);
6162 server.appendseldb = dictid;
6163 }
6164
6165 /* "Fix" the argv vector if the command is EXPIRE. We want to translate
6166 * EXPIREs into EXPIREATs calls */
6167 if (cmd->proc == expireCommand) {
6168 long when;
6169
6170 tmpargv[0] = createStringObject("EXPIREAT",8);
6171 tmpargv[1] = argv[1];
6172 incrRefCount(argv[1]);
6173 when = time(NULL)+strtol(argv[2]->ptr,NULL,10);
6174 tmpargv[2] = createObject(REDIS_STRING,
6175 sdscatprintf(sdsempty(),"%ld",when));
6176 argv = tmpargv;
6177 }
6178
6179 /* Append the actual command */
6180 buf = sdscatprintf(buf,"*%d\r\n",argc);
6181 for (j = 0; j < argc; j++) {
6182 robj *o = argv[j];
6183
6184 o = getDecodedObject(o);
6185 buf = sdscatprintf(buf,"$%lu\r\n",(unsigned long)sdslen(o->ptr));
6186 buf = sdscatlen(buf,o->ptr,sdslen(o->ptr));
6187 buf = sdscatlen(buf,"\r\n",2);
6188 decrRefCount(o);
6189 }
6190
6191 /* Free the objects from the modified argv for EXPIREAT */
6192 if (cmd->proc == expireCommand) {
6193 for (j = 0; j < 3; j++)
6194 decrRefCount(argv[j]);
6195 }
6196
6197 /* We want to perform a single write. This should be guaranteed atomic
6198 * at least if the filesystem we are writing is a real physical one.
6199 * While this will save us against the server being killed I don't think
6200 * there is much to do about the whole server stopping for power problems
6201 * or alike */
6202 nwritten = write(server.appendfd,buf,sdslen(buf));
6203 if (nwritten != (signed)sdslen(buf)) {
6204 /* Ooops, we are in troubles. The best thing to do for now is
6205 * to simply exit instead to give the illusion that everything is
6206 * working as expected. */
6207 if (nwritten == -1) {
6208 redisLog(REDIS_WARNING,"Exiting on error writing to the append-only file: %s",strerror(errno));
6209 } else {
6210 redisLog(REDIS_WARNING,"Exiting on short write while writing to the append-only file: %s",strerror(errno));
6211 }
6212 exit(1);
6213 }
6214 /* If a background append only file rewriting is in progress we want to
6215 * accumulate the differences between the child DB and the current one
6216 * in a buffer, so that when the child process will do its work we
6217 * can append the differences to the new append only file. */
6218 if (server.bgrewritechildpid != -1)
6219 server.bgrewritebuf = sdscatlen(server.bgrewritebuf,buf,sdslen(buf));
6220
6221 sdsfree(buf);
6222 now = time(NULL);
6223 if (server.appendfsync == APPENDFSYNC_ALWAYS ||
6224 (server.appendfsync == APPENDFSYNC_EVERYSEC &&
6225 now-server.lastfsync > 1))
6226 {
6227 fsync(server.appendfd); /* Let's try to get this data on the disk */
6228 server.lastfsync = now;
6229 }
6230 }
6231
6232 /* In Redis commands are always executed in the context of a client, so in
6233 * order to load the append only file we need to create a fake client. */
6234 static struct redisClient *createFakeClient(void) {
6235 struct redisClient *c = zmalloc(sizeof(*c));
6236
6237 selectDb(c,0);
6238 c->fd = -1;
6239 c->querybuf = sdsempty();
6240 c->argc = 0;
6241 c->argv = NULL;
6242 c->flags = 0;
6243 /* We set the fake client as a slave waiting for the synchronization
6244 * so that Redis will not try to send replies to this client. */
6245 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
6246 c->reply = listCreate();
6247 listSetFreeMethod(c->reply,decrRefCount);
6248 listSetDupMethod(c->reply,dupClientReplyValue);
6249 return c;
6250 }
6251
6252 static void freeFakeClient(struct redisClient *c) {
6253 sdsfree(c->querybuf);
6254 listRelease(c->reply);
6255 zfree(c);
6256 }
6257
6258 /* Replay the append log file. On error REDIS_OK is returned. On non fatal
6259 * error (the append only file is zero-length) REDIS_ERR is returned. On
6260 * fatal error an error message is logged and the program exists. */
6261 int loadAppendOnlyFile(char *filename) {
6262 struct redisClient *fakeClient;
6263 FILE *fp = fopen(filename,"r");
6264 struct redis_stat sb;
6265
6266 if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
6267 return REDIS_ERR;
6268
6269 if (fp == NULL) {
6270 redisLog(REDIS_WARNING,"Fatal error: can't open the append log file for reading: %s",strerror(errno));
6271 exit(1);
6272 }
6273
6274 fakeClient = createFakeClient();
6275 while(1) {
6276 int argc, j;
6277 unsigned long len;
6278 robj **argv;
6279 char buf[128];
6280 sds argsds;
6281 struct redisCommand *cmd;
6282
6283 if (fgets(buf,sizeof(buf),fp) == NULL) {
6284 if (feof(fp))
6285 break;
6286 else
6287 goto readerr;
6288 }
6289 if (buf[0] != '*') goto fmterr;
6290 argc = atoi(buf+1);
6291 argv = zmalloc(sizeof(robj*)*argc);
6292 for (j = 0; j < argc; j++) {
6293 if (fgets(buf,sizeof(buf),fp) == NULL) goto readerr;
6294 if (buf[0] != '$') goto fmterr;
6295 len = strtol(buf+1,NULL,10);
6296 argsds = sdsnewlen(NULL,len);
6297 if (len && fread(argsds,len,1,fp) == 0) goto fmterr;
6298 argv[j] = createObject(REDIS_STRING,argsds);
6299 if (fread(buf,2,1,fp) == 0) goto fmterr; /* discard CRLF */
6300 }
6301
6302 /* Command lookup */
6303 cmd = lookupCommand(argv[0]->ptr);
6304 if (!cmd) {
6305 redisLog(REDIS_WARNING,"Unknown command '%s' reading the append only file", argv[0]->ptr);
6306 exit(1);
6307 }
6308 /* Try object sharing and encoding */
6309 if (server.shareobjects) {
6310 int j;
6311 for(j = 1; j < argc; j++)
6312 argv[j] = tryObjectSharing(argv[j]);
6313 }
6314 if (cmd->flags & REDIS_CMD_BULK)
6315 tryObjectEncoding(argv[argc-1]);
6316 /* Run the command in the context of a fake client */
6317 fakeClient->argc = argc;
6318 fakeClient->argv = argv;
6319 cmd->proc(fakeClient);
6320 /* Discard the reply objects list from the fake client */
6321 while(listLength(fakeClient->reply))
6322 listDelNode(fakeClient->reply,listFirst(fakeClient->reply));
6323 /* Clean up, ready for the next command */
6324 for (j = 0; j < argc; j++) decrRefCount(argv[j]);
6325 zfree(argv);
6326 }
6327 fclose(fp);
6328 freeFakeClient(fakeClient);
6329 return REDIS_OK;
6330
6331 readerr:
6332 if (feof(fp)) {
6333 redisLog(REDIS_WARNING,"Unexpected end of file reading the append only file");
6334 } else {
6335 redisLog(REDIS_WARNING,"Unrecoverable error reading the append only file: %s", strerror(errno));
6336 }
6337 exit(1);
6338 fmterr:
6339 redisLog(REDIS_WARNING,"Bad file format reading the append only file");
6340 exit(1);
6341 }
6342
6343 /* Write an object into a file in the bulk format $<count>\r\n<payload>\r\n */
6344 static int fwriteBulk(FILE *fp, robj *obj) {
6345 char buf[128];
6346 obj = getDecodedObject(obj);
6347 snprintf(buf,sizeof(buf),"$%ld\r\n",(long)sdslen(obj->ptr));
6348 if (fwrite(buf,strlen(buf),1,fp) == 0) goto err;
6349 if (sdslen(obj->ptr) && fwrite(obj->ptr,sdslen(obj->ptr),1,fp) == 0)
6350 goto err;
6351 if (fwrite("\r\n",2,1,fp) == 0) goto err;
6352 decrRefCount(obj);
6353 return 1;
6354 err:
6355 decrRefCount(obj);
6356 return 0;
6357 }
6358
/* Write the double 'd' to 'fp' in the bulk format $<count>\r\n<payload>\r\n,
 * where the payload is d printed with "%.17g".
 * Returns 1 on success, 0 if a write fails. */
static int fwriteBulkDouble(FILE *fp, double d) {
    char payload[128], header[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%.17g\r\n",d);
    paylen = strlen(payload);
    /* The announced count excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
6369
/* Write the long 'l' to 'fp' in the bulk format $<count>\r\n<payload>\r\n,
 * the payload being the decimal representation of l.
 * Returns 1 on success, 0 if a write fails. */
static int fwriteBulkLong(FILE *fp, long l) {
    char payload[128], header[128];
    size_t paylen;

    snprintf(payload,sizeof(payload),"%ld\r\n",l);
    paylen = strlen(payload);
    /* The announced count excludes the trailing CRLF. */
    snprintf(header,sizeof(header),"$%lu\r\n",(unsigned long)paylen-2);
    return fwrite(header,strlen(header),1,fp) != 0 &&
           fwrite(payload,paylen,1,fp) != 0;
}
6380
6381 /* Write a sequence of commands able to fully rebuild the dataset into
6382 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. */
6383 static int rewriteAppendOnlyFile(char *filename) {
6384 dictIterator *di = NULL;
6385 dictEntry *de;
6386 FILE *fp;
6387 char tmpfile[256];
6388 int j;
6389 time_t now = time(NULL);
6390
6391 /* Note that we have to use a different temp name here compared to the
6392 * one used by rewriteAppendOnlyFileBackground() function. */
6393 snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
6394 fp = fopen(tmpfile,"w");
6395 if (!fp) {
6396 redisLog(REDIS_WARNING, "Failed rewriting the append only file: %s", strerror(errno));
6397 return REDIS_ERR;
6398 }
6399 for (j = 0; j < server.dbnum; j++) {
6400 char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
6401 redisDb *db = server.db+j;
6402 dict *d = db->dict;
6403 if (dictSize(d) == 0) continue;
6404 di = dictGetIterator(d);
6405 if (!di) {
6406 fclose(fp);
6407 return REDIS_ERR;
6408 }
6409
6410 /* SELECT the new DB */
6411 if (fwrite(selectcmd,sizeof(selectcmd)-1,1,fp) == 0) goto werr;
6412 if (fwriteBulkLong(fp,j) == 0) goto werr;
6413
6414 /* Iterate this DB writing every entry */
6415 while((de = dictNext(di)) != NULL) {
6416 robj *key = dictGetEntryKey(de);
6417 robj *o = dictGetEntryVal(de);
6418 time_t expiretime = getExpire(db,key);
6419
6420 /* Save the key and associated value */
6421 if (o->type == REDIS_STRING) {
6422 /* Emit a SET command */
6423 char cmd[]="*3\r\n$3\r\nSET\r\n";
6424 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6425 /* Key and value */
6426 if (fwriteBulk(fp,key) == 0) goto werr;
6427 if (fwriteBulk(fp,o) == 0) goto werr;
6428 } else if (o->type == REDIS_LIST) {
6429 /* Emit the RPUSHes needed to rebuild the list */
6430 list *list = o->ptr;
6431 listNode *ln;
6432
6433 listRewind(list);
6434 while((ln = listYield(list))) {
6435 char cmd[]="*3\r\n$5\r\nRPUSH\r\n";
6436 robj *eleobj = listNodeValue(ln);
6437
6438 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6439 if (fwriteBulk(fp,key) == 0) goto werr;
6440 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6441 }
6442 } else if (o->type == REDIS_SET) {
6443 /* Emit the SADDs needed to rebuild the set */
6444 dict *set = o->ptr;
6445 dictIterator *di = dictGetIterator(set);
6446 dictEntry *de;
6447
6448 while((de = dictNext(di)) != NULL) {
6449 char cmd[]="*3\r\n$4\r\nSADD\r\n";
6450 robj *eleobj = dictGetEntryKey(de);
6451
6452 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6453 if (fwriteBulk(fp,key) == 0) goto werr;
6454 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6455 }
6456 dictReleaseIterator(di);
6457 } else if (o->type == REDIS_ZSET) {
6458 /* Emit the ZADDs needed to rebuild the sorted set */
6459 zset *zs = o->ptr;
6460 dictIterator *di = dictGetIterator(zs->dict);
6461 dictEntry *de;
6462
6463 while((de = dictNext(di)) != NULL) {
6464 char cmd[]="*4\r\n$4\r\nZADD\r\n";
6465 robj *eleobj = dictGetEntryKey(de);
6466 double *score = dictGetEntryVal(de);
6467
6468 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6469 if (fwriteBulk(fp,key) == 0) goto werr;
6470 if (fwriteBulkDouble(fp,*score) == 0) goto werr;
6471 if (fwriteBulk(fp,eleobj) == 0) goto werr;
6472 }
6473 dictReleaseIterator(di);
6474 } else {
6475 redisAssert(0 != 0);
6476 }
6477 /* Save the expire time */
6478 if (expiretime != -1) {
6479 char cmd[]="*3\r\n$8\r\nEXPIREAT\r\n";
6480 /* If this key is already expired skip it */
6481 if (expiretime < now) continue;
6482 if (fwrite(cmd,sizeof(cmd)-1,1,fp) == 0) goto werr;
6483 if (fwriteBulk(fp,key) == 0) goto werr;
6484 if (fwriteBulkLong(fp,expiretime) == 0) goto werr;
6485 }
6486 }
6487 dictReleaseIterator(di);
6488 }
6489
6490 /* Make sure data will not remain on the OS's output buffers */
6491 fflush(fp);
6492 fsync(fileno(fp));
6493 fclose(fp);
6494
6495 /* Use RENAME to make sure the DB file is changed atomically only
6496 * if the generate DB file is ok. */
6497 if (rename(tmpfile,filename) == -1) {
6498 redisLog(REDIS_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
6499 unlink(tmpfile);
6500 return REDIS_ERR;
6501 }
6502 redisLog(REDIS_NOTICE,"SYNC append only file rewrite performed");
6503 return REDIS_OK;
6504
6505 werr:
6506 fclose(fp);
6507 unlink(tmpfile);
6508 redisLog(REDIS_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
6509 if (di) dictReleaseIterator(di);
6510 return REDIS_ERR;
6511 }
6512
6513 /* This is how rewriting of the append only file in background works:
6514 *
6515 * 1) The user calls BGREWRITEAOF
6516 * 2) Redis calls this function, that forks():
6517 * 2a) the child rewrite the append only file in a temp file.
6518 * 2b) the parent accumulates differences in server.bgrewritebuf.
6519 * 3) When the child finished '2a' exists.
6520 * 4) The parent will trap the exit code, if it's OK, will append the
6521 * data accumulated into server.bgrewritebuf into the temp file, and
6522 * finally will rename(2) the temp file in the actual file name.
6523 * The the new file is reopened as the new append only file. Profit!
6524 */
6525 static int rewriteAppendOnlyFileBackground(void) {
6526 pid_t childpid;
6527
6528 if (server.bgrewritechildpid != -1) return REDIS_ERR;
6529 if ((childpid = fork()) == 0) {
6530 /* Child */
6531 char tmpfile[256];
6532 close(server.fd);
6533
6534 snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
6535 if (rewriteAppendOnlyFile(tmpfile) == REDIS_OK) {
6536 exit(0);
6537 } else {
6538 exit(1);
6539 }
6540 } else {
6541 /* Parent */
6542 if (childpid == -1) {
6543 redisLog(REDIS_WARNING,
6544 "Can't rewrite append only file in background: fork: %s",
6545 strerror(errno));
6546 return REDIS_ERR;
6547 }
6548 redisLog(REDIS_NOTICE,
6549 "Background append only file rewriting started by pid %d",childpid);
6550 server.bgrewritechildpid = childpid;
6551 /* We set appendseldb to -1 in order to force the next call to the
6552 * feedAppendOnlyFile() to issue a SELECT command, so the differences
6553 * accumulated by the parent into server.bgrewritebuf will start
6554 * with a SELECT statement and it will be safe to merge. */
6555 server.appendseldb = -1;
6556 return REDIS_OK;
6557 }
6558 return REDIS_OK; /* unreached */
6559 }
6560
6561 static void bgrewriteaofCommand(redisClient *c) {
6562 if (server.bgrewritechildpid != -1) {
6563 addReplySds(c,sdsnew("-ERR background append only file rewriting already in progress\r\n"));
6564 return;
6565 }
6566 if (rewriteAppendOnlyFileBackground() == REDIS_OK) {
6567 char *status = "+Background append only file rewriting started\r\n";
6568 addReplySds(c,sdsnew(status));
6569 } else {
6570 addReply(c,shared.err);
6571 }
6572 }
6573
/* Delete the temp file written by the BGREWRITEAOF child with pid
 * 'childpid' ("temp-rewriteaof-bg-<pid>.aof"). unlink(2) errors, such as
 * the file not existing, are ignored. */
static void aofRemoveTempFile(pid_t childpid) {
    char path[256];

    snprintf(path,sizeof(path),"temp-rewriteaof-bg-%d.aof",(int)childpid);
    unlink(path);
}
6580
6581 /* =============================== Virtual Memory =========================== */
6582 static void vmInit(void) {
6583 off_t totsize;
6584
6585 server.vm_fp = fopen("/tmp/redisvm","w+b");
6586 if (server.vm_fp == NULL) {
6587 redisLog(REDIS_WARNING,"Impossible to open the swap file. Exiting.");
6588 exit(1);
6589 }
6590 server.vm_fd = fileno(server.vm_fp);
6591 server.vm_next_page = 0;
6592 server.vm_near_pages = 0;
6593 totsize = server.vm_pages*server.vm_page_size;
6594 redisLog(REDIS_NOTICE,"Allocating %lld bytes of swap file",totsize);
6595 if (ftruncate(server.vm_fd,totsize) == -1) {
6596 redisLog(REDIS_WARNING,"Can't ftruncate swap file: %s. Exiting.",
6597 strerror(errno));
6598 exit(1);
6599 } else {
6600 redisLog(REDIS_NOTICE,"Swap file allocated with success");
6601 }
6602 /* Try to remove the swap file, so the OS will really delete it from the
6603 * file system when Redis exists. */
6604 unlink("/tmp/redisvm");
6605 }
6606
6607 /* ================================= Debugging ============================== */
6608
6609 static void debugCommand(redisClient *c) {
6610 if (!strcasecmp(c->argv[1]->ptr,"segfault")) {
6611 *((char*)-1) = 'x';
6612 } else if (!strcasecmp(c->argv[1]->ptr,"reload")) {
6613 if (rdbSave(server.dbfilename) != REDIS_OK) {
6614 addReply(c,shared.err);
6615 return;
6616 }
6617 emptyDb();
6618 if (rdbLoad(server.dbfilename) != REDIS_OK) {
6619 addReply(c,shared.err);
6620 return;
6621 }
6622 redisLog(REDIS_WARNING,"DB reloaded by DEBUG RELOAD");
6623 addReply(c,shared.ok);
6624 } else if (!strcasecmp(c->argv[1]->ptr,"loadaof")) {
6625 emptyDb();
6626 if (loadAppendOnlyFile(server.appendfilename) != REDIS_OK) {
6627 addReply(c,shared.err);
6628 return;
6629 }
6630 redisLog(REDIS_WARNING,"Append Only File loaded by DEBUG LOADAOF");
6631 addReply(c,shared.ok);
6632 } else if (!strcasecmp(c->argv[1]->ptr,"object") && c->argc == 3) {
6633 dictEntry *de = dictFind(c->db->dict,c->argv[2]);
6634 robj *key, *val;
6635
6636 if (!de) {
6637 addReply(c,shared.nokeyerr);
6638 return;
6639 }
6640 key = dictGetEntryKey(de);
6641 val = dictGetEntryVal(de);
6642 addReplySds(c,sdscatprintf(sdsempty(),
6643 "+Key at:%p refcount:%d, value at:%p refcount:%d encoding:%d serializedlength:%lld\r\n",
6644 (void*)key, key->refcount, (void*)val, val->refcount,
6645 val->encoding, rdbSavedObjectLen(val)));
6646 } else {
6647 addReplySds(c,sdsnew(
6648 "-ERR Syntax error, try DEBUG [SEGFAULT|OBJECT <key>|RELOAD]\r\n"));
6649 }
6650 }
6651
6652 static void _redisAssert(char *estr) {
6653 redisLog(REDIS_WARNING,"=== ASSERTION FAILED ===");
6654 redisLog(REDIS_WARNING,"==> %s\n",estr);
6655 #ifdef HAVE_BACKTRACE
6656 redisLog(REDIS_WARNING,"(forcing SIGSEGV in order to print the stack trace)");
6657 *((char*)-1) = 'x';
6658 #endif
6659 }
6660
6661 /* =================================== Main! ================================ */
6662
6663 #ifdef __linux__
/* Return the value of /proc/sys/vm/overcommit_memory as parsed by atoi(),
 * or -1 if the file can't be opened or its first line can't be read. */
int linuxOvercommitMemoryValue(void) {
    char line[64];
    FILE *fp;
    int gotline;

    if ((fp = fopen("/proc/sys/vm/overcommit_memory","r")) == NULL)
        return -1;
    gotline = fgets(line,sizeof(line),fp) != NULL;
    fclose(fp);
    return gotline ? atoi(line) : -1;
}
6677
6678 void linuxOvercommitMemoryWarning(void) {
6679 if (linuxOvercommitMemoryValue() == 0) {
6680 redisLog(REDIS_WARNING,"WARNING overcommit_memory is set to 0! Background save may fail under low condition memory. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.");
6681 }
6682 }
6683 #endif /* __linux__ */
6684
6685 static void daemonize(void) {
6686 int fd;
6687 FILE *fp;
6688
6689 if (fork() != 0) exit(0); /* parent exits */
6690 printf("New pid: %d\n", getpid());
6691 setsid(); /* create a new session */
6692
6693 /* Every output goes to /dev/null. If Redis is daemonized but
6694 * the 'logfile' is set to 'stdout' in the configuration file
6695 * it will not log at all. */
6696 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
6697 dup2(fd, STDIN_FILENO);
6698 dup2(fd, STDOUT_FILENO);
6699 dup2(fd, STDERR_FILENO);
6700 if (fd > STDERR_FILENO) close(fd);
6701 }
6702 /* Try to write the pid file */
6703 fp = fopen(server.pidfile,"w");
6704 if (fp) {
6705 fprintf(fp,"%d\n",getpid());
6706 fclose(fp);
6707 }
6708 }
6709
6710 int main(int argc, char **argv) {
6711 initServerConfig();
6712 if (argc == 2) {
6713 resetServerSaveParams();
6714 loadServerConfig(argv[1]);
6715 } else if (argc > 2) {
6716 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
6717 exit(1);
6718 } else {
6719 redisLog(REDIS_WARNING,"Warning: no config file specified, using the default config. In order to specify a config file use 'redis-server /path/to/redis.conf'");
6720 }
6721 if (server.daemonize) daemonize();
6722 initServer();
6723 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
6724 #ifdef __linux__
6725 linuxOvercommitMemoryWarning();
6726 #endif
6727 if (server.appendonly) {
6728 if (loadAppendOnlyFile(server.appendfilename) == REDIS_OK)
6729 redisLog(REDIS_NOTICE,"DB loaded from append only file");
6730 } else {
6731 if (rdbLoad(server.dbfilename) == REDIS_OK)
6732 redisLog(REDIS_NOTICE,"DB loaded from disk");
6733 }
6734 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
6735 acceptHandler, NULL) == AE_ERR) oom("creating file event");
6736 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
6737 aeMain(server.el);
6738 aeDeleteEventLoop(server.el);
6739 return 0;
6740 }
6741
6742 /* ============================= Backtrace support ========================= */
6743
6744 #ifdef HAVE_BACKTRACE
6745 static char *findFuncName(void *pointer, unsigned long *offset);
6746
6747 static void *getMcontextEip(ucontext_t *uc) {
6748 #if defined(__FreeBSD__)
6749 return (void*) uc->uc_mcontext.mc_eip;
6750 #elif defined(__dietlibc__)
6751 return (void*) uc->uc_mcontext.eip;
6752 #elif defined(__APPLE__) && !defined(MAC_OS_X_VERSION_10_6)
6753 #if __x86_64__
6754 return (void*) uc->uc_mcontext->__ss.__rip;
6755 #else
6756 return (void*) uc->uc_mcontext->__ss.__eip;
6757 #endif
6758 #elif defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_6)
6759 #if defined(_STRUCT_X86_THREAD_STATE64) && !defined(__i386__)
6760 return (void*) uc->uc_mcontext->__ss.__rip;
6761 #else
6762 return (void*) uc->uc_mcontext->__ss.__eip;
6763 #endif
6764 #elif defined(__i386__) || defined(__X86_64__) || defined(__x86_64__)
6765 return (void*) uc->uc_mcontext.gregs[REG_EIP]; /* Linux 32/64 bit */
6766 #elif defined(__ia64__) /* Linux IA64 */
6767 return (void*) uc->uc_mcontext.sc_ip;
6768 #else
6769 return NULL;
6770 #endif
6771 }
6772
6773 static void segvHandler(int sig, siginfo_t *info, void *secret) {
6774 void *trace[100];
6775 char **messages = NULL;
6776 int i, trace_size = 0;
6777 unsigned long offset=0;
6778 ucontext_t *uc = (ucontext_t*) secret;
6779 sds infostring;
6780 REDIS_NOTUSED(info);
6781
6782 redisLog(REDIS_WARNING,
6783 "======= Ooops! Redis %s got signal: -%d- =======", REDIS_VERSION, sig);
6784 infostring = genRedisInfoString();
6785 redisLog(REDIS_WARNING, "%s",infostring);
6786 /* It's not safe to sdsfree() the returned string under memory
6787 * corruption conditions. Let it leak as we are going to abort */
6788
6789 trace_size = backtrace(trace, 100);
6790 /* overwrite sigaction with caller's address */
6791 if (getMcontextEip(uc) != NULL) {
6792 trace[1] = getMcontextEip(uc);
6793 }
6794 messages = backtrace_symbols(trace, trace_size);
6795
6796 for (i=1; i<trace_size; ++i) {
6797 char *fn = findFuncName(trace[i], &offset), *p;
6798
6799 p = strchr(messages[i],'+');
6800 if (!fn || (p && ((unsigned long)strtol(p+1,NULL,10)) < offset)) {
6801 redisLog(REDIS_WARNING,"%s", messages[i]);
6802 } else {
6803 redisLog(REDIS_WARNING,"%d redis-server %p %s + %d", i, trace[i], fn, (unsigned int)offset);
6804 }
6805 }
6806 /* free(messages); Don't call free() with possibly corrupted memory. */
6807 exit(0);
6808 }
6809
6810 static void setupSigSegvAction(void) {
6811 struct sigaction act;
6812
6813 sigemptyset (&act.sa_mask);
6814 /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction
6815 * is used. Otherwise, sa_handler is used */
6816 act.sa_flags = SA_NODEFER | SA_ONSTACK | SA_RESETHAND | SA_SIGINFO;
6817 act.sa_sigaction = segvHandler;
6818 sigaction (SIGSEGV, &act, NULL);
6819 sigaction (SIGBUS, &act, NULL);
6820 sigaction (SIGFPE, &act, NULL);
6821 sigaction (SIGILL, &act, NULL);
6822 sigaction (SIGBUS, &act, NULL);
6823 return;
6824 }
6825
6826 #include "staticsymbols.h"
6827 /* This function try to convert a pointer into a function name. It's used in
6828 * oreder to provide a backtrace under segmentation fault that's able to
6829 * display functions declared as static (otherwise the backtrace is useless). */
6830 static char *findFuncName(void *pointer, unsigned long *offset){
6831 int i, ret = -1;
6832 unsigned long off, minoff = 0;
6833
6834 /* Try to match against the Symbol with the smallest offset */
6835 for (i=0; symsTable[i].pointer; i++) {
6836 unsigned long lp = (unsigned long) pointer;
6837
6838 if (lp != (unsigned long)-1 && lp >= symsTable[i].pointer) {
6839 off=lp-symsTable[i].pointer;
6840 if (ret < 0 || off < minoff) {
6841 minoff=off;
6842 ret=i;
6843 }
6844 }
6845 }
6846 if (ret == -1) return NULL;
6847 *offset = minoff;
6848 return symsTable[ret].name;
6849 }
6850 #else /* HAVE_BACKTRACE */
/* Without backtrace support no crash handler is installed: this is a no-op. */
static void setupSigSegvAction(void) {
    return;
}
6853 #endif /* HAVE_BACKTRACE */
6854
6855
6856
6857 /* The End */
6858
6859
6860