]> git.saurik.com Git - redis.git/blame_incremental - redis.c
SUNION, SUNIONSTORE, Initial work on non blocking replication
[redis.git] / redis.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2006-2009, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
/* Version string of this server build. */
#define REDIS_VERSION "0.091"
31
32#include "fmacros.h"
33
34#include <stdio.h>
35#include <stdlib.h>
36#include <string.h>
37#include <time.h>
38#include <unistd.h>
39#include <signal.h>
40#include <sys/wait.h>
41#include <errno.h>
42#include <assert.h>
43#include <ctype.h>
44#include <stdarg.h>
45#include <inttypes.h>
46#include <arpa/inet.h>
47#include <sys/stat.h>
48#include <fcntl.h>
49#include <sys/time.h>
50#include <sys/resource.h>
51#include <limits.h>
52
53#include "ae.h" /* Event driven programming library */
54#include "sds.h" /* Dynamic safe strings */
55#include "anet.h" /* Networking the easy way */
56#include "dict.h" /* Hash tables */
57#include "adlist.h" /* Linked lists */
58#include "zmalloc.h" /* total memory usage aware version of malloc/free */
59#include "lzf.h"
60
/* Error codes */
#define REDIS_OK 0
#define REDIS_ERR -1

/* Static server configuration */
#define REDIS_SERVERPORT 6379 /* TCP port */
#define REDIS_MAXIDLETIME (60*5) /* default client timeout, in seconds */
#define REDIS_QUERYBUF_LEN 1024
#define REDIS_LOADBUF_LEN 1024
#define REDIS_MAX_ARGS 16 /* capacity of redisClient.argv */
#define REDIS_DEFAULT_DBNUM 16
#define REDIS_CONFIGLINE_MAX 1024 /* max config line length read by loadServerConfig() */
#define REDIS_OBJFREELIST_MAX 1000000 /* Max number of objects to cache */
#define REDIS_MAX_SYNC_TIME 60 /* Slave can't take more to sync */
#define REDIS_EXPIRELOOKUPS_PER_CRON 100 /* max volatile keys sampled per DB on each serverCron() run */

/* Hash table parameters */
#define REDIS_HT_MINFILL 10 /* Minimal hash table fill 10% */
#define REDIS_HT_MINSLOTS 16384 /* Never resize the HT under this */

/* Command flags (the 'flags' field of cmdTable entries).
 * NOTE(review): presumably BULK commands carry their last argument as a
 * length-prefixed payload while INLINE commands fit on one line; confirm
 * against the request parser. */
#define REDIS_CMD_BULK 1
#define REDIS_CMD_INLINE 2

/* Object types (the 'type' field of robj) */
#define REDIS_STRING 0
#define REDIS_LIST 1
#define REDIS_SET 2
#define REDIS_HASH 3

/* Object types only used for dumping to disk */
#define REDIS_EXPIRETIME 253
#define REDIS_SELECTDB 254
#define REDIS_EOF 255
95
/* Defines related to the dump file format. Storing a 32 bit length for every
 * short key would waste a lot of space, so the two most significant bits of
 * the first byte tell how to interpret the length:
 *
 * 00|000000 => if the two MSB are 00 the len is the 6 bits of this byte
 * 01|000000 00000000 => 01, the len is 14 bits, 6 bits + 8 bits of next byte
 * 10|000000 [32 bit integer] => if it's 10, a full 32 bit len will follow
 * 11|000000 this means: specially encoded object will follow. The six bits
 *           number specify the kind of object that follows.
 *           See the REDIS_RDB_ENC_* defines.
 *
 * Lengths up to 63 are stored using a single byte; most DB keys, and many
 * values, will fit inside. */
#define REDIS_RDB_6BITLEN 0
#define REDIS_RDB_14BITLEN 1
#define REDIS_RDB_32BITLEN 2
#define REDIS_RDB_ENCVAL 3
#define REDIS_RDB_LENERR UINT_MAX /* NOTE(review): presumably returned when a length can't be read; confirm */

/* When a length of a string object stored on disk has the first two bits
 * set, the remaining six bits specify a special encoding for the object
 * according to the following defines: */
#define REDIS_RDB_ENC_INT8 0 /* 8 bit signed integer */
#define REDIS_RDB_ENC_INT16 1 /* 16 bit signed integer */
#define REDIS_RDB_ENC_INT32 2 /* 32 bit signed integer */
#define REDIS_RDB_ENC_LZF 3 /* string compressed with LZF */
122
/* Client flags (bits of redisClient.flags) */
#define REDIS_CLOSE 1 /* This client connection should be closed ASAP */
#define REDIS_SLAVE 2 /* This client is a slave server */
#define REDIS_MASTER 4 /* This client is a master server */
#define REDIS_MONITOR 8 /* This client is a slave monitor, see MONITOR */

/* Slave replication state - slave side (stored in server.replstate) */
#define REDIS_REPL_NONE 0 /* No active replication */
#define REDIS_REPL_CONNECT 1 /* Must connect to master */
#define REDIS_REPL_CONNECTED 2 /* Connected to master */

/* Slave replication state - from the point of view of master (stored in
 * redisClient.replstate of each slave client).
 * Note that in SEND_BULK and ONLINE state the slave receives new updates
 * in its output queue. In the WAIT_BGSAVE state instead the server is waiting
 * to start the next background saving in order to send updates to it. */
#define REDIS_REPL_WAIT_BGSAVE_START 3 /* master waits bgsave to start feeding it */
#define REDIS_REPL_WAIT_BGSAVE_END 4 /* master waits bgsave to start bulk DB transmission */
#define REDIS_REPL_SEND_BULK 5 /* master is sending the bulk DB */
#define REDIS_REPL_ONLINE 6 /* bulk DB already transmitted, receive updates */

/* List related stuff */
#define REDIS_HEAD 0
#define REDIS_TAIL 1

/* Sort operations */
#define REDIS_SORT_GET 0
#define REDIS_SORT_DEL 1
#define REDIS_SORT_INCR 2
#define REDIS_SORT_DECR 3
#define REDIS_SORT_ASC 4
#define REDIS_SORT_DESC 5
#define REDIS_SORTKEY_MAX 1024

/* Log levels. redisLog() also uses the level to index its ".-*" prefix
 * markers, so these values must stay 0, 1 and 2. */
#define REDIS_DEBUG 0
#define REDIS_NOTICE 1
#define REDIS_WARNING 2

/* Anti-warning macro: marks a parameter as intentionally unused. */
#define REDIS_NOTUSED(V) ((void) V)
163
164/*================================= Data types ============================== */
165
/* A redis object, that is a type able to hold a string / list / set */
typedef struct redisObject {
    void *ptr;      /* payload; for REDIS_STRING objects an sds string */
    int type;       /* REDIS_STRING, REDIS_LIST, REDIS_SET or REDIS_HASH */
    int refcount;   /* see incrRefCount() / decrRefCount() */
} robj;
172
/* One of the server.dbnum databases. */
typedef struct redisDb {
    dict *dict;     /* keyspace: key object -> value object */
    dict *expires;  /* volatile keys: key object -> expire time (time_t) */
    int id;         /* index of this database in server.db */
} redisDb;
178
/* With multiplexing we need to keep per-client state.
 * Clients are kept in a linked list (server.clients). */
typedef struct redisClient {
    int fd;
    redisDb *db;
    int dictid;
    sds querybuf;
    robj *argv[REDIS_MAX_ARGS];
    int argc;
    int bulklen;            /* bulk read len. -1 if not in bulk read mode */
    list *reply;
    int sentlen;
    time_t lastinteraction; /* time of the last interaction, used for timeout */
    int flags;              /* REDIS_CLOSE | REDIS_SLAVE | REDIS_MASTER | REDIS_MONITOR */
    int slaveseldb;         /* slave selected db, if this client is a slave */
    int authenticated;      /* when requirepass is non-NULL */
    int replstate;          /* replication state if this is a slave */
    int repldbfd;           /* replication DB file descriptor */
    int repldboff;          /* replication DB file offset */
    off_t repldbsize;       /* replication DB file size */
} redisClient;
200
/* A save point: serverCron() starts a background save once at least
 * 'changes' writes happened and more than 'seconds' seconds elapsed since
 * the last successful save. */
struct saveparam {
    time_t seconds;
    int changes;
};
205
/* Global server state structure */
struct redisServer {
    int port;
    int fd;                     /* listening socket */
    redisDb *db;                /* array of dbnum databases */
    dict *sharingpool;
    unsigned int sharingpoolsize;
    long long dirty;            /* changes to DB from the last save */
    list *clients;
    list *slaves, *monitors;
    char neterr[ANET_ERR_LEN];
    aeEventLoop *el;
    int cronloops;              /* number of times the cron function run */
    list *objfreelist;          /* A list of freed objects to avoid malloc() */
    time_t lastsave;            /* Unix time of last successful save */
    size_t usedmemory;          /* Used memory in bytes (zmalloc_used_memory()) */
    /* Fields used only for stats */
    time_t stat_starttime;      /* server start time */
    long long stat_numcommands; /* number of processed commands */
    long long stat_numconnections; /* number of connections received */
    /* Configuration */
    int verbosity;
    int glueoutputbuf;
    int maxidletime;            /* client timeout in seconds */
    int dbnum;
    int daemonize;
    char *pidfile;
    int bgsaveinprogress;
    struct saveparam *saveparams;
    int saveparamslen;
    char *logfile;              /* NULL means log to stdout */
    char *bindaddr;
    char *dbfilename;
    char *requirepass;
    int shareobjects;
    /* Replication related */
    int isslave;
    char *masterhost;
    int masterport;
    redisClient *master;        /* client that is master for this slave */
    int replstate;              /* REDIS_REPL_NONE / _CONNECT / _CONNECTED */
    /* Sort parameters - qsort_r() is only available under BSD so we
     * have to take this state global, in order to pass it to sortCompare() */
    int sort_desc;
    int sort_alpha;
    int sort_bypattern;
};
253
/* Signature shared by every command implementation. */
typedef void redisCommandProc(redisClient *c);
/* An entry of cmdTable. */
struct redisCommand {
    char *name;             /* lowercase command name */
    redisCommandProc *proc; /* implementation */
    int arity;              /* argc including the command name; NOTE(review):
                             * negative presumably means "at least -arity" */
    int flags;              /* REDIS_CMD_BULK or REDIS_CMD_INLINE */
};
261
/* An element being sorted by SORT together with its sort key.
 * NOTE(review): presumably 'score' is used for numeric sorting and 'cmpobj'
 * when server.sort_alpha is set; confirm in sortCommand(). */
typedef struct _redisSortObject {
    robj *obj;
    union {
        double score;
        robj *cmpobj;
    } u;
} redisSortObject;

/* An operation requested on SORT results: 'type' is one of the
 * REDIS_SORT_* operation codes, 'pattern' the key pattern it applies to. */
typedef struct _redisSortOperation {
    int type;
    robj *pattern;
} redisSortOperation;
274
/* Preallocated objects for frequently used protocol fragments and replies,
 * filled in once by createSharedObjects(). */
struct sharedObjectsStruct {
    robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *pong, *space,
    *colon, *nullbulk, *nullmultibulk,
    *emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
    *outofrangeerr, *plus,
    /* "select N\r\n" commands for databases 0-9 */
    *select0, *select1, *select2, *select3, *select4,
    *select5, *select6, *select7, *select8, *select9;
} shared;
283
284/*================================ Prototypes =============================== */
285
/* Internal helpers defined later in this file */
static void freeStringObject(robj *o);
static void freeListObject(robj *o);
static void freeSetObject(robj *o);
static void decrRefCount(void *o);
static robj *createObject(int type, void *ptr);
static void freeClient(redisClient *c);
static int rdbLoad(char *filename);
static void addReply(redisClient *c, robj *obj);
static void addReplySds(redisClient *c, sds s);
static void incrRefCount(robj *o);
static int rdbSaveBackground(char *filename);
static robj *createStringObject(char *ptr, size_t len);
static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc);
static int syncWithMaster(void);
static robj *tryObjectSharing(robj *o);
static int removeExpire(redisDb *db, robj *key);
static int expireIfNeeded(redisDb *db, robj *key);
static int deleteIfVolatile(redisDb *db, robj *key);
static int deleteKey(redisDb *db, robj *key);
static time_t getExpire(redisDb *db, robj *key);
static int setExpire(redisDb *db, robj *key, time_t when);

/* Command implementations referenced by cmdTable */
static void authCommand(redisClient *c);
static void pingCommand(redisClient *c);
static void echoCommand(redisClient *c);
static void setCommand(redisClient *c);
static void setnxCommand(redisClient *c);
static void getCommand(redisClient *c);
static void delCommand(redisClient *c);
static void existsCommand(redisClient *c);
static void incrCommand(redisClient *c);
static void decrCommand(redisClient *c);
static void incrbyCommand(redisClient *c);
static void decrbyCommand(redisClient *c);
static void selectCommand(redisClient *c);
static void randomkeyCommand(redisClient *c);
static void keysCommand(redisClient *c);
static void dbsizeCommand(redisClient *c);
static void lastsaveCommand(redisClient *c);
static void saveCommand(redisClient *c);
static void bgsaveCommand(redisClient *c);
static void shutdownCommand(redisClient *c);
static void moveCommand(redisClient *c);
static void renameCommand(redisClient *c);
static void renamenxCommand(redisClient *c);
static void lpushCommand(redisClient *c);
static void rpushCommand(redisClient *c);
static void lpopCommand(redisClient *c);
static void rpopCommand(redisClient *c);
static void llenCommand(redisClient *c);
static void lindexCommand(redisClient *c);
static void lrangeCommand(redisClient *c);
static void ltrimCommand(redisClient *c);
static void typeCommand(redisClient *c);
static void lsetCommand(redisClient *c);
static void saddCommand(redisClient *c);
static void sremCommand(redisClient *c);
static void sismemberCommand(redisClient *c);
static void scardCommand(redisClient *c);
static void sinterCommand(redisClient *c);
static void sinterstoreCommand(redisClient *c);
static void sunionCommand(redisClient *c);
static void sunionstoreCommand(redisClient *c);
static void syncCommand(redisClient *c);
static void flushdbCommand(redisClient *c);
static void flushallCommand(redisClient *c);
static void sortCommand(redisClient *c);
static void lremCommand(redisClient *c);
static void infoCommand(redisClient *c);
static void mgetCommand(redisClient *c);
static void monitorCommand(redisClient *c);
static void expireCommand(redisClient *c);
358
359/*================================= Globals ================================= */
360
/* Global vars */
static struct redisServer server; /* server global state */

/* Command table: {name, implementation, arity, flags}, terminated by an
 * all-NULL entry. Arity counts the command name itself ("get key" is 2).
 * NOTE(review): a negative arity presumably means "at least -arity
 * arguments"; confirm in the command dispatcher.
 * SMEMBERS is served by sinterCommand, i.e. SINTER over a single key. */
static struct redisCommand cmdTable[] = {
    {"get",getCommand,2,REDIS_CMD_INLINE},
    {"set",setCommand,3,REDIS_CMD_BULK},
    {"setnx",setnxCommand,3,REDIS_CMD_BULK},
    {"del",delCommand,2,REDIS_CMD_INLINE},
    {"exists",existsCommand,2,REDIS_CMD_INLINE},
    {"incr",incrCommand,2,REDIS_CMD_INLINE},
    {"decr",decrCommand,2,REDIS_CMD_INLINE},
    {"mget",mgetCommand,-2,REDIS_CMD_INLINE},
    {"rpush",rpushCommand,3,REDIS_CMD_BULK},
    {"lpush",lpushCommand,3,REDIS_CMD_BULK},
    {"rpop",rpopCommand,2,REDIS_CMD_INLINE},
    {"lpop",lpopCommand,2,REDIS_CMD_INLINE},
    {"llen",llenCommand,2,REDIS_CMD_INLINE},
    {"lindex",lindexCommand,3,REDIS_CMD_INLINE},
    {"lset",lsetCommand,4,REDIS_CMD_BULK},
    {"lrange",lrangeCommand,4,REDIS_CMD_INLINE},
    {"ltrim",ltrimCommand,4,REDIS_CMD_INLINE},
    {"lrem",lremCommand,4,REDIS_CMD_BULK},
    {"sadd",saddCommand,3,REDIS_CMD_BULK},
    {"srem",sremCommand,3,REDIS_CMD_BULK},
    {"sismember",sismemberCommand,3,REDIS_CMD_BULK},
    {"scard",scardCommand,2,REDIS_CMD_INLINE},
    {"sinter",sinterCommand,-2,REDIS_CMD_INLINE},
    {"sinterstore",sinterstoreCommand,-3,REDIS_CMD_INLINE},
    {"sunion",sunionCommand,-2,REDIS_CMD_INLINE},
    {"sunionstore",sunionstoreCommand,-3,REDIS_CMD_INLINE},
    {"smembers",sinterCommand,2,REDIS_CMD_INLINE},
    {"incrby",incrbyCommand,3,REDIS_CMD_INLINE},
    {"decrby",decrbyCommand,3,REDIS_CMD_INLINE},
    {"randomkey",randomkeyCommand,1,REDIS_CMD_INLINE},
    {"select",selectCommand,2,REDIS_CMD_INLINE},
    {"move",moveCommand,3,REDIS_CMD_INLINE},
    {"rename",renameCommand,3,REDIS_CMD_INLINE},
    {"renamenx",renamenxCommand,3,REDIS_CMD_INLINE},
    {"keys",keysCommand,2,REDIS_CMD_INLINE},
    {"dbsize",dbsizeCommand,1,REDIS_CMD_INLINE},
    {"auth",authCommand,2,REDIS_CMD_INLINE},
    {"ping",pingCommand,1,REDIS_CMD_INLINE},
    {"echo",echoCommand,2,REDIS_CMD_BULK},
    {"save",saveCommand,1,REDIS_CMD_INLINE},
    {"bgsave",bgsaveCommand,1,REDIS_CMD_INLINE},
    {"shutdown",shutdownCommand,1,REDIS_CMD_INLINE},
    {"lastsave",lastsaveCommand,1,REDIS_CMD_INLINE},
    {"type",typeCommand,2,REDIS_CMD_INLINE},
    {"sync",syncCommand,1,REDIS_CMD_INLINE},
    {"flushdb",flushdbCommand,1,REDIS_CMD_INLINE},
    {"flushall",flushallCommand,1,REDIS_CMD_INLINE},
    {"sort",sortCommand,-2,REDIS_CMD_INLINE},
    {"info",infoCommand,1,REDIS_CMD_INLINE},
    {"monitor",monitorCommand,1,REDIS_CMD_INLINE},
    {"expire",expireCommand,3,REDIS_CMD_INLINE},
    {NULL,NULL,0,0}
};
417
418/*============================ Utility functions ============================ */
419
/* Compare one pattern byte with one string byte, optionally ignoring ASCII
 * case. Bytes are converted to unsigned char first: passing a negative char
 * to tolower() is undefined behavior. */
static int stringmatchCharEqual(char a, char b, int nocase) {
    int ca = (unsigned char) a;
    int cb = (unsigned char) b;

    if (nocase) {
        ca = tolower(ca);
        cb = tolower(cb);
    }
    return ca == cb;
}

/* Glob-style pattern matching.
 *
 * Returns 1 if the first stringLen bytes of 'string' match the first
 * patternLen bytes of 'pattern', 0 otherwise. Supported syntax:
 *   *       any sequence of bytes, including the empty one
 *   ?       exactly one byte
 *   [abc]   one byte from the set; [^abc] negates it; [a-z] is a range
 *           (a reversed range like [z-a] is accepted); \x escapes x
 *   \x      the literal byte x
 * When 'nocase' is non-zero, comparisons ignore ASCII case.
 *
 * Both buffers are length-delimited: no byte at or past patternLen or
 * stringLen is inspected, and a class or literal never matches once the
 * string is exhausted. */
int stringmatchlen(const char *pattern, int patternLen,
        const char *string, int stringLen, int nocase)
{
    while(patternLen) {
        switch(pattern[0]) {
        case '*':
            /* Collapse runs of consecutive stars. */
            while (patternLen > 1 && pattern[1] == '*') {
                pattern++;
                patternLen--;
            }
            if (patternLen == 1)
                return 1; /* a trailing star matches whatever is left */
            while(stringLen) {
                if (stringmatchlen(pattern+1, patternLen-1,
                            string, stringLen, nocase))
                    return 1; /* match */
                string++;
                stringLen--;
            }
            return 0; /* no match */
        case '?':
            if (stringLen == 0)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        case '[':
        {
            int negate, match;

            /* A character class always consumes exactly one byte. */
            if (stringLen == 0)
                return 0; /* no match */
            pattern++;
            patternLen--;
            negate = patternLen > 0 && pattern[0] == '^';
            if (negate) {
                pattern++;
                patternLen--;
            }
            match = 0;
            while(1) {
                if (patternLen == 0) {
                    /* Unterminated class: step back so that the pattern++
                     * after the switch leaves patternLen at exactly 0. */
                    pattern--;
                    patternLen++;
                    break;
                } else if (pattern[0] == '\\' && patternLen >= 2) {
                    pattern++;
                    patternLen--;
                    if (stringmatchCharEqual(pattern[0],string[0],nocase))
                        match = 1;
                } else if (pattern[0] == ']') {
                    break;
                } else if (patternLen >= 3 && pattern[1] == '-') {
                    int start = (unsigned char) pattern[0];
                    int end = (unsigned char) pattern[2];
                    int c = (unsigned char) string[0];
                    if (start > end) {
                        int t = start;
                        start = end;
                        end = t;
                    }
                    if (nocase) {
                        start = tolower(start);
                        end = tolower(end);
                        c = tolower(c);
                    }
                    pattern += 2;
                    patternLen -= 2;
                    if (c >= start && c <= end)
                        match = 1;
                } else {
                    if (stringmatchCharEqual(pattern[0],string[0],nocase))
                        match = 1;
                }
                pattern++;
                patternLen--;
            }
            if (negate)
                match = !match;
            if (!match)
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        case '\\':
            if (patternLen >= 2) {
                pattern++;
                patternLen--;
            }
            /* fall through */
        default:
            if (stringLen == 0 ||
                !stringmatchCharEqual(pattern[0],string[0],nocase))
                return 0; /* no match */
            string++;
            stringLen--;
            break;
        }
        pattern++;
        patternLen--;
        if (stringLen == 0) {
            /* String consumed: only trailing stars may still match. */
            while(patternLen > 0 && *pattern == '*') {
                pattern++;
                patternLen--;
            }
            break;
        }
    }
    if (patternLen == 0 && stringLen == 0)
        return 1;
    return 0;
}
542
543void redisLog(int level, const char *fmt, ...)
544{
545 va_list ap;
546 FILE *fp;
547
548 fp = (server.logfile == NULL) ? stdout : fopen(server.logfile,"a");
549 if (!fp) return;
550
551 va_start(ap, fmt);
552 if (level >= server.verbosity) {
553 char *c = ".-*";
554 fprintf(fp,"%c ",c[level]);
555 vfprintf(fp, fmt, ap);
556 fprintf(fp,"\n");
557 fflush(fp);
558 }
559 va_end(ap);
560
561 if (server.logfile) fclose(fp);
562}
563
564/*====================== Hash table type implementation ==================== */
565
/* These are hash table types that use Redis objects wrapping SDS dynamic
 * strings as keys and Redis objects as values (objects can hold SDS strings,
 * lists, sets). */
569
570static int sdsDictKeyCompare(void *privdata, const void *key1,
571 const void *key2)
572{
573 int l1,l2;
574 DICT_NOTUSED(privdata);
575
576 l1 = sdslen((sds)key1);
577 l2 = sdslen((sds)key2);
578 if (l1 != l2) return 0;
579 return memcmp(key1, key2, l1) == 0;
580}
581
/* Dict destructor for keys/values holding Redis objects: releases the
 * table's reference to the object. 'privdata' is unused. */
static void dictRedisObjectDestructor(void *privdata, void *val)
{
    DICT_NOTUSED(privdata);
    decrRefCount(val); /* the object may be freed here */
}
588
589static int dictSdsKeyCompare(void *privdata, const void *key1,
590 const void *key2)
591{
592 const robj *o1 = key1, *o2 = key2;
593 return sdsDictKeyCompare(privdata,o1->ptr,o2->ptr);
594}
595
596static unsigned int dictSdsHash(const void *key) {
597 const robj *o = key;
598 return dictGenHashFunction(o->ptr, sdslen((sds)o->ptr));
599}
600
/* Dict type with robj string keys and no value destructor. Used for sets,
 * the sharing pool and the per-DB expires tables (whose values are plain
 * time_t, not objects). */
static dictType setDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    NULL                       /* val destructor */
};
609
/* Dict type whose keys and values are both Redis objects; used for the
 * main keyspace of every database. */
static dictType hashDictType = {
    dictSdsHash,               /* hash function */
    NULL,                      /* key dup */
    NULL,                      /* val dup */
    dictSdsKeyCompare,         /* key compare */
    dictRedisObjectDestructor, /* key destructor */
    dictRedisObjectDestructor  /* val destructor */
};
618
619/* ========================= Random utility functions ======================= */
620
/* Redis generally does not try to recover from out of memory conditions
 * when allocating objects or strings. It is not clear whether the condition
 * could even be reported to the client, since the networking layer itself
 * relies on heap allocation for send buffers, so we simply abort.
 * At least the code will be simpler to read...
 *
 * Prints "<msg>: Out of memory" on stderr, waits one second and aborts. */
static void oom(const char *msg) {
    fputs(msg, stderr);
    fputs(": Out of memory\n", stderr);
    fflush(stderr);
    sleep(1);
    abort();
}
632
633/* ====================== Redis server networking stuff ===================== */
634void closeTimedoutClients(void) {
635 redisClient *c;
636 listIter *li;
637 listNode *ln;
638 time_t now = time(NULL);
639
640 li = listGetIterator(server.clients,AL_START_HEAD);
641 if (!li) return;
642 while ((ln = listNextElement(li)) != NULL) {
643 c = listNodeValue(ln);
644 if (!(c->flags & REDIS_SLAVE) && /* no timeout for slaves */
645 (now - c->lastinteraction > server.maxidletime)) {
646 redisLog(REDIS_DEBUG,"Closing idle client");
647 freeClient(c);
648 }
649 }
650 listReleaseIterator(li);
651}
652
653int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
654 int j, loops = server.cronloops++;
655 REDIS_NOTUSED(eventLoop);
656 REDIS_NOTUSED(id);
657 REDIS_NOTUSED(clientData);
658
659 /* Update the global state with the amount of used memory */
660 server.usedmemory = zmalloc_used_memory();
661
662 /* If the percentage of used slots in the HT reaches REDIS_HT_MINFILL
663 * we resize the hash table to save memory */
664 for (j = 0; j < server.dbnum; j++) {
665 int size, used, vkeys;
666
667 size = dictSlots(server.db[j].dict);
668 used = dictSize(server.db[j].dict);
669 vkeys = dictSize(server.db[j].expires);
670 if (!(loops % 5) && used > 0) {
671 redisLog(REDIS_DEBUG,"DB %d: %d keys (%d volatile) in %d slots HT.",j,used,vkeys,size);
672 /* dictPrintStats(server.dict); */
673 }
674 if (size && used && size > REDIS_HT_MINSLOTS &&
675 (used*100/size < REDIS_HT_MINFILL)) {
676 redisLog(REDIS_NOTICE,"The hash table %d is too sparse, resize it...",j);
677 dictResize(server.db[j].dict);
678 redisLog(REDIS_NOTICE,"Hash table %d resized.",j);
679 }
680 }
681
682 /* Show information about connected clients */
683 if (!(loops % 5)) {
684 redisLog(REDIS_DEBUG,"%d clients connected (%d slaves), %zu bytes in use",
685 listLength(server.clients)-listLength(server.slaves),
686 listLength(server.slaves),
687 server.usedmemory,
688 dictSize(server.sharingpool));
689 }
690
691 /* Close connections of timedout clients */
692 if (!(loops % 10))
693 closeTimedoutClients();
694
695 /* Check if a background saving in progress terminated */
696 if (server.bgsaveinprogress) {
697 int statloc;
698 if (wait4(-1,&statloc,WNOHANG,NULL)) {
699 int exitcode = WEXITSTATUS(statloc);
700 if (exitcode == 0) {
701 redisLog(REDIS_NOTICE,
702 "Background saving terminated with success");
703 server.dirty = 0;
704 server.lastsave = time(NULL);
705 } else {
706 redisLog(REDIS_WARNING,
707 "Background saving error");
708 }
709 server.bgsaveinprogress = 0;
710 }
711 } else {
712 /* If there is not a background saving in progress check if
713 * we have to save now */
714 time_t now = time(NULL);
715 for (j = 0; j < server.saveparamslen; j++) {
716 struct saveparam *sp = server.saveparams+j;
717
718 if (server.dirty >= sp->changes &&
719 now-server.lastsave > sp->seconds) {
720 redisLog(REDIS_NOTICE,"%d changes in %d seconds. Saving...",
721 sp->changes, sp->seconds);
722 rdbSaveBackground(server.dbfilename);
723 break;
724 }
725 }
726 }
727
728 /* Try to expire a few timed out keys */
729 for (j = 0; j < server.dbnum; j++) {
730 redisDb *db = server.db+j;
731 int num = dictSize(db->expires);
732
733 if (num) {
734 time_t now = time(NULL);
735
736 if (num > REDIS_EXPIRELOOKUPS_PER_CRON)
737 num = REDIS_EXPIRELOOKUPS_PER_CRON;
738 while (num--) {
739 dictEntry *de;
740 time_t t;
741
742 if ((de = dictGetRandomKey(db->expires)) == NULL) break;
743 t = (time_t) dictGetEntryVal(de);
744 if (now > t) {
745 deleteKey(db,dictGetEntryKey(de));
746 }
747 }
748 }
749 }
750
751 /* Check if we should connect to a MASTER */
752 if (server.replstate == REDIS_REPL_CONNECT) {
753 redisLog(REDIS_NOTICE,"Connecting to MASTER...");
754 if (syncWithMaster() == REDIS_OK) {
755 redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync succeeded");
756 }
757 }
758 return 1000;
759}
760
761static void createSharedObjects(void) {
762 shared.crlf = createObject(REDIS_STRING,sdsnew("\r\n"));
763 shared.ok = createObject(REDIS_STRING,sdsnew("+OK\r\n"));
764 shared.err = createObject(REDIS_STRING,sdsnew("-ERR\r\n"));
765 shared.emptybulk = createObject(REDIS_STRING,sdsnew("$0\r\n\r\n"));
766 shared.czero = createObject(REDIS_STRING,sdsnew(":0\r\n"));
767 shared.cone = createObject(REDIS_STRING,sdsnew(":1\r\n"));
768 shared.nullbulk = createObject(REDIS_STRING,sdsnew("$-1\r\n"));
769 shared.nullmultibulk = createObject(REDIS_STRING,sdsnew("*-1\r\n"));
770 shared.emptymultibulk = createObject(REDIS_STRING,sdsnew("*0\r\n"));
771 /* no such key */
772 shared.pong = createObject(REDIS_STRING,sdsnew("+PONG\r\n"));
773 shared.wrongtypeerr = createObject(REDIS_STRING,sdsnew(
774 "-ERR Operation against a key holding the wrong kind of value\r\n"));
775 shared.nokeyerr = createObject(REDIS_STRING,sdsnew(
776 "-ERR no such key\r\n"));
777 shared.syntaxerr = createObject(REDIS_STRING,sdsnew(
778 "-ERR syntax error\r\n"));
779 shared.sameobjecterr = createObject(REDIS_STRING,sdsnew(
780 "-ERR source and destination objects are the same\r\n"));
781 shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
782 "-ERR index out of range\r\n"));
783 shared.space = createObject(REDIS_STRING,sdsnew(" "));
784 shared.colon = createObject(REDIS_STRING,sdsnew(":"));
785 shared.plus = createObject(REDIS_STRING,sdsnew("+"));
786 shared.select0 = createStringObject("select 0\r\n",10);
787 shared.select1 = createStringObject("select 1\r\n",10);
788 shared.select2 = createStringObject("select 2\r\n",10);
789 shared.select3 = createStringObject("select 3\r\n",10);
790 shared.select4 = createStringObject("select 4\r\n",10);
791 shared.select5 = createStringObject("select 5\r\n",10);
792 shared.select6 = createStringObject("select 6\r\n",10);
793 shared.select7 = createStringObject("select 7\r\n",10);
794 shared.select8 = createStringObject("select 8\r\n",10);
795 shared.select9 = createStringObject("select 9\r\n",10);
796}
797
798static void appendServerSaveParams(time_t seconds, int changes) {
799 server.saveparams = zrealloc(server.saveparams,sizeof(struct saveparam)*(server.saveparamslen+1));
800 if (server.saveparams == NULL) oom("appendServerSaveParams");
801 server.saveparams[server.saveparamslen].seconds = seconds;
802 server.saveparams[server.saveparamslen].changes = changes;
803 server.saveparamslen++;
804}
805
806static void ResetServerSaveParams() {
807 zfree(server.saveparams);
808 server.saveparams = NULL;
809 server.saveparamslen = 0;
810}
811
812static void initServerConfig() {
813 server.dbnum = REDIS_DEFAULT_DBNUM;
814 server.port = REDIS_SERVERPORT;
815 server.verbosity = REDIS_DEBUG;
816 server.maxidletime = REDIS_MAXIDLETIME;
817 server.saveparams = NULL;
818 server.logfile = NULL; /* NULL = log on standard output */
819 server.bindaddr = NULL;
820 server.glueoutputbuf = 1;
821 server.daemonize = 0;
822 server.pidfile = "/var/run/redis.pid";
823 server.dbfilename = "dump.rdb";
824 server.requirepass = NULL;
825 server.shareobjects = 0;
826 ResetServerSaveParams();
827
828 appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */
829 appendServerSaveParams(300,100); /* save after 5 minutes and 100 changes */
830 appendServerSaveParams(60,10000); /* save after 1 minute and 10000 changes */
831 /* Replication related */
832 server.isslave = 0;
833 server.masterhost = NULL;
834 server.masterport = 6379;
835 server.master = NULL;
836 server.replstate = REDIS_REPL_NONE;
837}
838
839static void initServer() {
840 int j;
841
842 signal(SIGHUP, SIG_IGN);
843 signal(SIGPIPE, SIG_IGN);
844
845 server.clients = listCreate();
846 server.slaves = listCreate();
847 server.monitors = listCreate();
848 server.objfreelist = listCreate();
849 createSharedObjects();
850 server.el = aeCreateEventLoop();
851 server.db = zmalloc(sizeof(redisDb)*server.dbnum);
852 server.sharingpool = dictCreate(&setDictType,NULL);
853 server.sharingpoolsize = 1024;
854 if (!server.db || !server.clients || !server.slaves || !server.monitors || !server.el || !server.objfreelist)
855 oom("server initialization"); /* Fatal OOM */
856 server.fd = anetTcpServer(server.neterr, server.port, server.bindaddr);
857 if (server.fd == -1) {
858 redisLog(REDIS_WARNING, "Opening TCP port: %s", server.neterr);
859 exit(1);
860 }
861 for (j = 0; j < server.dbnum; j++) {
862 server.db[j].dict = dictCreate(&hashDictType,NULL);
863 server.db[j].expires = dictCreate(&setDictType,NULL);
864 server.db[j].id = j;
865 }
866 server.cronloops = 0;
867 server.bgsaveinprogress = 0;
868 server.lastsave = time(NULL);
869 server.dirty = 0;
870 server.usedmemory = 0;
871 server.stat_numcommands = 0;
872 server.stat_numconnections = 0;
873 server.stat_starttime = time(NULL);
874 aeCreateTimeEvent(server.el, 1000, serverCron, NULL, NULL);
875}
876
877/* Empty the whole database */
878static void emptyDb() {
879 int j;
880
881 for (j = 0; j < server.dbnum; j++) {
882 dictEmpty(server.db[j].dict);
883 dictEmpty(server.db[j].expires);
884 }
885}
886
887/* I agree, this is a very rudimental way to load a configuration...
888 will improve later if the config gets more complex */
889static void loadServerConfig(char *filename) {
890 FILE *fp = fopen(filename,"r");
891 char buf[REDIS_CONFIGLINE_MAX+1], *err = NULL;
892 int linenum = 0;
893 sds line = NULL;
894
895 if (!fp) {
896 redisLog(REDIS_WARNING,"Fatal error, can't open config file");
897 exit(1);
898 }
899 while(fgets(buf,REDIS_CONFIGLINE_MAX+1,fp) != NULL) {
900 sds *argv;
901 int argc, j;
902
903 linenum++;
904 line = sdsnew(buf);
905 line = sdstrim(line," \t\r\n");
906
907 /* Skip comments and blank lines*/
908 if (line[0] == '#' || line[0] == '\0') {
909 sdsfree(line);
910 continue;
911 }
912
913 /* Split into arguments */
914 argv = sdssplitlen(line,sdslen(line)," ",1,&argc);
915 sdstolower(argv[0]);
916
917 /* Execute config directives */
918 if (!strcmp(argv[0],"timeout") && argc == 2) {
919 server.maxidletime = atoi(argv[1]);
920 if (server.maxidletime < 1) {
921 err = "Invalid timeout value"; goto loaderr;
922 }
923 } else if (!strcmp(argv[0],"port") && argc == 2) {
924 server.port = atoi(argv[1]);
925 if (server.port < 1 || server.port > 65535) {
926 err = "Invalid port"; goto loaderr;
927 }
928 } else if (!strcmp(argv[0],"bind") && argc == 2) {
929 server.bindaddr = zstrdup(argv[1]);
930 } else if (!strcmp(argv[0],"save") && argc == 3) {
931 int seconds = atoi(argv[1]);
932 int changes = atoi(argv[2]);
933 if (seconds < 1 || changes < 0) {
934 err = "Invalid save parameters"; goto loaderr;
935 }
936 appendServerSaveParams(seconds,changes);
937 } else if (!strcmp(argv[0],"dir") && argc == 2) {
938 if (chdir(argv[1]) == -1) {
939 redisLog(REDIS_WARNING,"Can't chdir to '%s': %s",
940 argv[1], strerror(errno));
941 exit(1);
942 }
943 } else if (!strcmp(argv[0],"loglevel") && argc == 2) {
944 if (!strcmp(argv[1],"debug")) server.verbosity = REDIS_DEBUG;
945 else if (!strcmp(argv[1],"notice")) server.verbosity = REDIS_NOTICE;
946 else if (!strcmp(argv[1],"warning")) server.verbosity = REDIS_WARNING;
947 else {
948 err = "Invalid log level. Must be one of debug, notice, warning";
949 goto loaderr;
950 }
951 } else if (!strcmp(argv[0],"logfile") && argc == 2) {
952 FILE *fp;
953
954 server.logfile = zstrdup(argv[1]);
955 if (!strcmp(server.logfile,"stdout")) {
956 zfree(server.logfile);
957 server.logfile = NULL;
958 }
959 if (server.logfile) {
960 /* Test if we are able to open the file. The server will not
961 * be able to abort just for this problem later... */
962 fp = fopen(server.logfile,"a");
963 if (fp == NULL) {
964 err = sdscatprintf(sdsempty(),
965 "Can't open the log file: %s", strerror(errno));
966 goto loaderr;
967 }
968 fclose(fp);
969 }
970 } else if (!strcmp(argv[0],"databases") && argc == 2) {
971 server.dbnum = atoi(argv[1]);
972 if (server.dbnum < 1) {
973 err = "Invalid number of databases"; goto loaderr;
974 }
975 } else if (!strcmp(argv[0],"slaveof") && argc == 3) {
976 server.masterhost = sdsnew(argv[1]);
977 server.masterport = atoi(argv[2]);
978 server.replstate = REDIS_REPL_CONNECT;
979 } else if (!strcmp(argv[0],"glueoutputbuf") && argc == 2) {
980 sdstolower(argv[1]);
981 if (!strcmp(argv[1],"yes")) server.glueoutputbuf = 1;
982 else if (!strcmp(argv[1],"no")) server.glueoutputbuf = 0;
983 else {
984 err = "argument must be 'yes' or 'no'"; goto loaderr;
985 }
986 } else if (!strcmp(argv[0],"shareobjects") && argc == 2) {
987 sdstolower(argv[1]);
988 if (!strcmp(argv[1],"yes")) server.shareobjects = 1;
989 else if (!strcmp(argv[1],"no")) server.shareobjects = 0;
990 else {
991 err = "argument must be 'yes' or 'no'"; goto loaderr;
992 }
993 } else if (!strcmp(argv[0],"daemonize") && argc == 2) {
994 sdstolower(argv[1]);
995 if (!strcmp(argv[1],"yes")) server.daemonize = 1;
996 else if (!strcmp(argv[1],"no")) server.daemonize = 0;
997 else {
998 err = "argument must be 'yes' or 'no'"; goto loaderr;
999 }
1000 } else if (!strcmp(argv[0],"requirepass") && argc == 2) {
1001 server.requirepass = zstrdup(argv[1]);
1002 } else if (!strcmp(argv[0],"pidfile") && argc == 2) {
1003 server.pidfile = zstrdup(argv[1]);
1004 } else {
1005 err = "Bad directive or wrong number of arguments"; goto loaderr;
1006 }
1007 for (j = 0; j < argc; j++)
1008 sdsfree(argv[j]);
1009 zfree(argv);
1010 sdsfree(line);
1011 }
1012 fclose(fp);
1013 return;
1014
1015loaderr:
1016 fprintf(stderr, "\n*** FATAL CONFIG FILE ERROR ***\n");
1017 fprintf(stderr, "Reading the configuration file, at line %d\n", linenum);
1018 fprintf(stderr, ">>> '%s'\n", line);
1019 fprintf(stderr, "%s\n", err);
1020 exit(1);
1021}
1022
1023static void freeClientArgv(redisClient *c) {
1024 int j;
1025
1026 for (j = 0; j < c->argc; j++)
1027 decrRefCount(c->argv[j]);
1028 c->argc = 0;
1029}
1030
1031static void freeClient(redisClient *c) {
1032 listNode *ln;
1033
1034 aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
1035 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1036 sdsfree(c->querybuf);
1037 listRelease(c->reply);
1038 freeClientArgv(c);
1039 close(c->fd);
1040 ln = listSearchKey(server.clients,c);
1041 assert(ln != NULL);
1042 listDelNode(server.clients,ln);
1043 if (c->flags & REDIS_SLAVE) {
1044 list *l = (c->flags & REDIS_MONITOR) ? server.monitors : server.slaves;
1045 ln = listSearchKey(l,c);
1046 assert(ln != NULL);
1047 listDelNode(l,ln);
1048 }
1049 if (c->flags & REDIS_MASTER) {
1050 server.master = NULL;
1051 server.replstate = REDIS_REPL_CONNECT;
1052 }
1053 zfree(c);
1054}
1055
1056static void glueReplyBuffersIfNeeded(redisClient *c) {
1057 int totlen = 0;
1058 listNode *ln = c->reply->head, *next;
1059 robj *o;
1060
1061 while(ln) {
1062 o = ln->value;
1063 totlen += sdslen(o->ptr);
1064 ln = ln->next;
1065 /* This optimization makes more sense if we don't have to copy
1066 * too much data */
1067 if (totlen > 1024) return;
1068 }
1069 if (totlen > 0) {
1070 char buf[1024];
1071 int copylen = 0;
1072
1073 ln = c->reply->head;
1074 while(ln) {
1075 next = ln->next;
1076 o = ln->value;
1077 memcpy(buf+copylen,o->ptr,sdslen(o->ptr));
1078 copylen += sdslen(o->ptr);
1079 listDelNode(c->reply,ln);
1080 ln = next;
1081 }
1082 /* Now the output buffer is empty, add the new single element */
1083 addReplySds(c,sdsnewlen(buf,totlen));
1084 }
1085}
1086
1087static void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1088 redisClient *c = privdata;
1089 int nwritten = 0, totwritten = 0, objlen;
1090 robj *o;
1091 REDIS_NOTUSED(el);
1092 REDIS_NOTUSED(mask);
1093
1094 if (server.glueoutputbuf && listLength(c->reply) > 1)
1095 glueReplyBuffersIfNeeded(c);
1096 while(listLength(c->reply)) {
1097 o = listNodeValue(listFirst(c->reply));
1098 objlen = sdslen(o->ptr);
1099
1100 if (objlen == 0) {
1101 listDelNode(c->reply,listFirst(c->reply));
1102 continue;
1103 }
1104
1105 if (c->flags & REDIS_MASTER) {
1106 nwritten = objlen - c->sentlen;
1107 } else {
1108 nwritten = write(fd, ((char*)o->ptr)+c->sentlen, objlen - c->sentlen);
1109 if (nwritten <= 0) break;
1110 }
1111 c->sentlen += nwritten;
1112 totwritten += nwritten;
1113 /* If we fully sent the object on head go to the next one */
1114 if (c->sentlen == objlen) {
1115 listDelNode(c->reply,listFirst(c->reply));
1116 c->sentlen = 0;
1117 }
1118 }
1119 if (nwritten == -1) {
1120 if (errno == EAGAIN) {
1121 nwritten = 0;
1122 } else {
1123 redisLog(REDIS_DEBUG,
1124 "Error writing to client: %s", strerror(errno));
1125 freeClient(c);
1126 return;
1127 }
1128 }
1129 if (totwritten > 0) c->lastinteraction = time(NULL);
1130 if (listLength(c->reply) == 0) {
1131 c->sentlen = 0;
1132 aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
1133 }
1134}
1135
1136static struct redisCommand *lookupCommand(char *name) {
1137 int j = 0;
1138 while(cmdTable[j].name != NULL) {
1139 if (!strcmp(name,cmdTable[j].name)) return &cmdTable[j];
1140 j++;
1141 }
1142 return NULL;
1143}
1144
1145/* resetClient prepare the client to process the next command */
1146static void resetClient(redisClient *c) {
1147 freeClientArgv(c);
1148 c->bulklen = -1;
1149}
1150
1151/* If this function gets called we already read a whole
1152 * command, argments are in the client argv/argc fields.
1153 * processCommand() execute the command or prepare the
1154 * server for a bulk read from the client.
1155 *
1156 * If 1 is returned the client is still alive and valid and
1157 * and other operations can be performed by the caller. Otherwise
1158 * if 0 is returned the client was destroied (i.e. after QUIT). */
1159static int processCommand(redisClient *c) {
1160 struct redisCommand *cmd;
1161 long long dirty;
1162
1163 sdstolower(c->argv[0]->ptr);
1164 /* The QUIT command is handled as a special case. Normal command
1165 * procs are unable to close the client connection safely */
1166 if (!strcmp(c->argv[0]->ptr,"quit")) {
1167 freeClient(c);
1168 return 0;
1169 }
1170 cmd = lookupCommand(c->argv[0]->ptr);
1171 if (!cmd) {
1172 addReplySds(c,sdsnew("-ERR unknown command\r\n"));
1173 resetClient(c);
1174 return 1;
1175 } else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
1176 (c->argc < -cmd->arity)) {
1177 addReplySds(c,sdsnew("-ERR wrong number of arguments\r\n"));
1178 resetClient(c);
1179 return 1;
1180 } else if (cmd->flags & REDIS_CMD_BULK && c->bulklen == -1) {
1181 int bulklen = atoi(c->argv[c->argc-1]->ptr);
1182
1183 decrRefCount(c->argv[c->argc-1]);
1184 if (bulklen < 0 || bulklen > 1024*1024*1024) {
1185 c->argc--;
1186 addReplySds(c,sdsnew("-ERR invalid bulk write count\r\n"));
1187 resetClient(c);
1188 return 1;
1189 }
1190 c->argc--;
1191 c->bulklen = bulklen+2; /* add two bytes for CR+LF */
1192 /* It is possible that the bulk read is already in the
1193 * buffer. Check this condition and handle it accordingly */
1194 if ((signed)sdslen(c->querybuf) >= c->bulklen) {
1195 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1196 c->argc++;
1197 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1198 } else {
1199 return 1;
1200 }
1201 }
1202 /* Let's try to share objects on the command arguments vector */
1203 if (server.shareobjects) {
1204 int j;
1205 for(j = 1; j < c->argc; j++)
1206 c->argv[j] = tryObjectSharing(c->argv[j]);
1207 }
1208 /* Check if the user is authenticated */
1209 if (server.requirepass && !c->authenticated && cmd->proc != authCommand) {
1210 addReplySds(c,sdsnew("-ERR operation not permitted\r\n"));
1211 resetClient(c);
1212 return 1;
1213 }
1214
1215 /* Exec the command */
1216 dirty = server.dirty;
1217 cmd->proc(c);
1218 if (server.dirty-dirty != 0 && listLength(server.slaves))
1219 replicationFeedSlaves(server.slaves,cmd,c->db->id,c->argv,c->argc);
1220 if (listLength(server.monitors))
1221 replicationFeedSlaves(server.monitors,cmd,c->db->id,c->argv,c->argc);
1222 server.stat_numcommands++;
1223
1224 /* Prepare the client for the next command */
1225 if (c->flags & REDIS_CLOSE) {
1226 freeClient(c);
1227 return 0;
1228 }
1229 resetClient(c);
1230 return 1;
1231}
1232
1233static void replicationFeedSlaves(list *slaves, struct redisCommand *cmd, int dictid, robj **argv, int argc) {
1234 listNode *ln = slaves->head;
1235 robj *outv[REDIS_MAX_ARGS*4]; /* enough room for args, spaces, newlines */
1236 int outc = 0, j;
1237
1238 for (j = 0; j < argc; j++) {
1239 if (j != 0) outv[outc++] = shared.space;
1240 if ((cmd->flags & REDIS_CMD_BULK) && j == argc-1) {
1241 robj *lenobj;
1242
1243 lenobj = createObject(REDIS_STRING,
1244 sdscatprintf(sdsempty(),"%d\r\n",sdslen(argv[j]->ptr)));
1245 lenobj->refcount = 0;
1246 outv[outc++] = lenobj;
1247 }
1248 outv[outc++] = argv[j];
1249 }
1250 outv[outc++] = shared.crlf;
1251
1252 /* Increment all the refcounts at start and decrement at end in order to
1253 * be sure to free objects if there is no slave in a replication state
1254 * able to be feed with commands */
1255 for (j = 0; j < outc; j++) incrRefCount(outv[j]);
1256 while(ln) {
1257 redisClient *slave = ln->value;
1258
1259 /* Don't feed slaves that are still waiting for BGSAVE to start */
1260 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {
1261 ln = ln->next;
1262 continue;
1263 }
1264
1265 /* Feed all the other slaves, MONITORs and so on */
1266 if (slave->slaveseldb != dictid) {
1267 robj *selectcmd;
1268
1269 switch(dictid) {
1270 case 0: selectcmd = shared.select0; break;
1271 case 1: selectcmd = shared.select1; break;
1272 case 2: selectcmd = shared.select2; break;
1273 case 3: selectcmd = shared.select3; break;
1274 case 4: selectcmd = shared.select4; break;
1275 case 5: selectcmd = shared.select5; break;
1276 case 6: selectcmd = shared.select6; break;
1277 case 7: selectcmd = shared.select7; break;
1278 case 8: selectcmd = shared.select8; break;
1279 case 9: selectcmd = shared.select9; break;
1280 default:
1281 selectcmd = createObject(REDIS_STRING,
1282 sdscatprintf(sdsempty(),"select %d\r\n",dictid));
1283 selectcmd->refcount = 0;
1284 break;
1285 }
1286 addReply(slave,selectcmd);
1287 slave->slaveseldb = dictid;
1288 }
1289 for (j = 0; j < outc; j++) addReply(slave,outv[j]);
1290 ln = ln->next;
1291 }
1292 for (j = 0; j < outc; j++) decrRefCount(outv[j]);
1293}
1294
1295static void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
1296 redisClient *c = (redisClient*) privdata;
1297 char buf[REDIS_QUERYBUF_LEN];
1298 int nread;
1299 REDIS_NOTUSED(el);
1300 REDIS_NOTUSED(mask);
1301
1302 nread = read(fd, buf, REDIS_QUERYBUF_LEN);
1303 if (nread == -1) {
1304 if (errno == EAGAIN) {
1305 nread = 0;
1306 } else {
1307 redisLog(REDIS_DEBUG, "Reading from client: %s",strerror(errno));
1308 freeClient(c);
1309 return;
1310 }
1311 } else if (nread == 0) {
1312 redisLog(REDIS_DEBUG, "Client closed connection");
1313 freeClient(c);
1314 return;
1315 }
1316 if (nread) {
1317 c->querybuf = sdscatlen(c->querybuf, buf, nread);
1318 c->lastinteraction = time(NULL);
1319 } else {
1320 return;
1321 }
1322
1323again:
1324 if (c->bulklen == -1) {
1325 /* Read the first line of the query */
1326 char *p = strchr(c->querybuf,'\n');
1327 size_t querylen;
1328 if (p) {
1329 sds query, *argv;
1330 int argc, j;
1331
1332 query = c->querybuf;
1333 c->querybuf = sdsempty();
1334 querylen = 1+(p-(query));
1335 if (sdslen(query) > querylen) {
1336 /* leave data after the first line of the query in the buffer */
1337 c->querybuf = sdscatlen(c->querybuf,query+querylen,sdslen(query)-querylen);
1338 }
1339 *p = '\0'; /* remove "\n" */
1340 if (*(p-1) == '\r') *(p-1) = '\0'; /* and "\r" if any */
1341 sdsupdatelen(query);
1342
1343 /* Now we can split the query in arguments */
1344 if (sdslen(query) == 0) {
1345 /* Ignore empty query */
1346 sdsfree(query);
1347 return;
1348 }
1349 argv = sdssplitlen(query,sdslen(query)," ",1,&argc);
1350 sdsfree(query);
1351 if (argv == NULL) oom("sdssplitlen");
1352 for (j = 0; j < argc && j < REDIS_MAX_ARGS; j++) {
1353 if (sdslen(argv[j])) {
1354 c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
1355 c->argc++;
1356 } else {
1357 sdsfree(argv[j]);
1358 }
1359 }
1360 zfree(argv);
1361 /* Execute the command. If the client is still valid
1362 * after processCommand() return and there is something
1363 * on the query buffer try to process the next command. */
1364 if (processCommand(c) && sdslen(c->querybuf)) goto again;
1365 return;
1366 } else if (sdslen(c->querybuf) >= 1024) {
1367 redisLog(REDIS_DEBUG, "Client protocol error");
1368 freeClient(c);
1369 return;
1370 }
1371 } else {
1372 /* Bulk read handling. Note that if we are at this point
1373 the client already sent a command terminated with a newline,
1374 we are reading the bulk data that is actually the last
1375 argument of the command. */
1376 int qbl = sdslen(c->querybuf);
1377
1378 if (c->bulklen <= qbl) {
1379 /* Copy everything but the final CRLF as final argument */
1380 c->argv[c->argc] = createStringObject(c->querybuf,c->bulklen-2);
1381 c->argc++;
1382 c->querybuf = sdsrange(c->querybuf,c->bulklen,-1);
1383 processCommand(c);
1384 return;
1385 }
1386 }
1387}
1388
1389static int selectDb(redisClient *c, int id) {
1390 if (id < 0 || id >= server.dbnum)
1391 return REDIS_ERR;
1392 c->db = &server.db[id];
1393 return REDIS_OK;
1394}
1395
1396static void *dupClientReplyValue(void *o) {
1397 incrRefCount((robj*)o);
1398 return 0;
1399}
1400
1401static redisClient *createClient(int fd) {
1402 redisClient *c = zmalloc(sizeof(*c));
1403
1404 anetNonBlock(NULL,fd);
1405 anetTcpNoDelay(NULL,fd);
1406 if (!c) return NULL;
1407 selectDb(c,0);
1408 c->fd = fd;
1409 c->querybuf = sdsempty();
1410 c->argc = 0;
1411 c->bulklen = -1;
1412 c->sentlen = 0;
1413 c->flags = 0;
1414 c->lastinteraction = time(NULL);
1415 c->authenticated = 0;
1416 c->replstate = REDIS_REPL_NONE;
1417 if ((c->reply = listCreate()) == NULL) oom("listCreate");
1418 listSetFreeMethod(c->reply,decrRefCount);
1419 listSetDupMethod(c->reply,dupClientReplyValue);
1420 if (aeCreateFileEvent(server.el, c->fd, AE_READABLE,
1421 readQueryFromClient, c, NULL) == AE_ERR) {
1422 freeClient(c);
1423 return NULL;
1424 }
1425 if (!listAddNodeTail(server.clients,c)) oom("listAddNodeTail");
1426 return c;
1427}
1428
1429static void addReply(redisClient *c, robj *obj) {
1430 if (listLength(c->reply) == 0 &&
1431 aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
1432 sendReplyToClient, c, NULL) == AE_ERR) return;
1433 if (!listAddNodeTail(c->reply,obj)) oom("listAddNodeTail");
1434 incrRefCount(obj);
1435}
1436
1437static void addReplySds(redisClient *c, sds s) {
1438 robj *o = createObject(REDIS_STRING,s);
1439 addReply(c,o);
1440 decrRefCount(o);
1441}
1442
1443static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
1444 int cport, cfd;
1445 char cip[128];
1446 REDIS_NOTUSED(el);
1447 REDIS_NOTUSED(mask);
1448 REDIS_NOTUSED(privdata);
1449
1450 cfd = anetAccept(server.neterr, fd, cip, &cport);
1451 if (cfd == AE_ERR) {
1452 redisLog(REDIS_DEBUG,"Accepting client connection: %s", server.neterr);
1453 return;
1454 }
1455 redisLog(REDIS_DEBUG,"Accepted %s:%d", cip, cport);
1456 if (createClient(cfd) == NULL) {
1457 redisLog(REDIS_WARNING,"Error allocating resoures for the client");
1458 close(cfd); /* May be already closed, just ingore errors */
1459 return;
1460 }
1461 server.stat_numconnections++;
1462}
1463
1464/* ======================= Redis objects implementation ===================== */
1465
1466static robj *createObject(int type, void *ptr) {
1467 robj *o;
1468
1469 if (listLength(server.objfreelist)) {
1470 listNode *head = listFirst(server.objfreelist);
1471 o = listNodeValue(head);
1472 listDelNode(server.objfreelist,head);
1473 } else {
1474 o = zmalloc(sizeof(*o));
1475 }
1476 if (!o) oom("createObject");
1477 o->type = type;
1478 o->ptr = ptr;
1479 o->refcount = 1;
1480 return o;
1481}
1482
1483static robj *createStringObject(char *ptr, size_t len) {
1484 return createObject(REDIS_STRING,sdsnewlen(ptr,len));
1485}
1486
1487static robj *createListObject(void) {
1488 list *l = listCreate();
1489
1490 if (!l) oom("listCreate");
1491 listSetFreeMethod(l,decrRefCount);
1492 return createObject(REDIS_LIST,l);
1493}
1494
1495static robj *createSetObject(void) {
1496 dict *d = dictCreate(&setDictType,NULL);
1497 if (!d) oom("dictCreate");
1498 return createObject(REDIS_SET,d);
1499}
1500
1501static void freeStringObject(robj *o) {
1502 sdsfree(o->ptr);
1503}
1504
1505static void freeListObject(robj *o) {
1506 listRelease((list*) o->ptr);
1507}
1508
1509static void freeSetObject(robj *o) {
1510 dictRelease((dict*) o->ptr);
1511}
1512
1513static void freeHashObject(robj *o) {
1514 dictRelease((dict*) o->ptr);
1515}
1516
1517static void incrRefCount(robj *o) {
1518 o->refcount++;
1519#ifdef DEBUG_REFCOUNT
1520 if (o->type == REDIS_STRING)
1521 printf("Increment '%s'(%p), now is: %d\n",o->ptr,o,o->refcount);
1522#endif
1523}
1524
1525static void decrRefCount(void *obj) {
1526 robj *o = obj;
1527
1528#ifdef DEBUG_REFCOUNT
1529 if (o->type == REDIS_STRING)
1530 printf("Decrement '%s'(%p), now is: %d\n",o->ptr,o,o->refcount-1);
1531#endif
1532 if (--(o->refcount) == 0) {
1533 switch(o->type) {
1534 case REDIS_STRING: freeStringObject(o); break;
1535 case REDIS_LIST: freeListObject(o); break;
1536 case REDIS_SET: freeSetObject(o); break;
1537 case REDIS_HASH: freeHashObject(o); break;
1538 default: assert(0 != 0); break;
1539 }
1540 if (listLength(server.objfreelist) > REDIS_OBJFREELIST_MAX ||
1541 !listAddNodeHead(server.objfreelist,o))
1542 zfree(o);
1543 }
1544}
1545
1546/* Try to share an object against the shared objects pool */
1547static robj *tryObjectSharing(robj *o) {
1548 struct dictEntry *de;
1549 unsigned long c;
1550
1551 if (o == NULL || server.shareobjects == 0) return o;
1552
1553 assert(o->type == REDIS_STRING);
1554 de = dictFind(server.sharingpool,o);
1555 if (de) {
1556 robj *shared = dictGetEntryKey(de);
1557
1558 c = ((unsigned long) dictGetEntryVal(de))+1;
1559 dictGetEntryVal(de) = (void*) c;
1560 incrRefCount(shared);
1561 decrRefCount(o);
1562 return shared;
1563 } else {
1564 /* Here we are using a stream algorihtm: Every time an object is
1565 * shared we increment its count, everytime there is a miss we
1566 * recrement the counter of a random object. If this object reaches
1567 * zero we remove the object and put the current object instead. */
1568 if (dictSize(server.sharingpool) >=
1569 server.sharingpoolsize) {
1570 de = dictGetRandomKey(server.sharingpool);
1571 assert(de != NULL);
1572 c = ((unsigned long) dictGetEntryVal(de))-1;
1573 dictGetEntryVal(de) = (void*) c;
1574 if (c == 0) {
1575 dictDelete(server.sharingpool,de->key);
1576 }
1577 } else {
1578 c = 0; /* If the pool is empty we want to add this object */
1579 }
1580 if (c == 0) {
1581 int retval;
1582
1583 retval = dictAdd(server.sharingpool,o,(void*)1);
1584 assert(retval == DICT_OK);
1585 incrRefCount(o);
1586 }
1587 return o;
1588 }
1589}
1590
1591static robj *lookupKey(redisDb *db, robj *key) {
1592 dictEntry *de = dictFind(db->dict,key);
1593 return de ? dictGetEntryVal(de) : NULL;
1594}
1595
1596static robj *lookupKeyRead(redisDb *db, robj *key) {
1597 expireIfNeeded(db,key);
1598 return lookupKey(db,key);
1599}
1600
1601static robj *lookupKeyWrite(redisDb *db, robj *key) {
1602 deleteIfVolatile(db,key);
1603 return lookupKey(db,key);
1604}
1605
1606static int deleteKey(redisDb *db, robj *key) {
1607 int retval;
1608
1609 /* We need to protect key from destruction: after the first dictDelete()
1610 * it may happen that 'key' is no longer valid if we don't increment
1611 * it's count. This may happen when we get the object reference directly
1612 * from the hash table with dictRandomKey() or dict iterators */
1613 incrRefCount(key);
1614 if (dictSize(db->expires)) dictDelete(db->expires,key);
1615 retval = dictDelete(db->dict,key);
1616 decrRefCount(key);
1617
1618 return retval == DICT_OK;
1619}
1620
1621/*============================ DB saving/loading ============================ */
1622
/* Write the one-byte opcode/type 'type'. Returns 0 on success, -1 on error. */
static int rdbSaveType(FILE *fp, unsigned char type) {
    return fwrite(&type,1,1,fp) == 0 ? -1 : 0;
}
1627
/* Write 't' truncated to a 4-byte int32_t in host byte order. Returns 0 on
 * success, -1 on error. */
static int rdbSaveTime(FILE *fp, time_t t) {
    int32_t t32 = (int32_t) t;

    return fwrite(&t32,sizeof(t32),1,fp) == 0 ? -1 : 0;
}
1633
1634/* check rdbLoadLen() comments for more info */
1635static int rdbSaveLen(FILE *fp, uint32_t len) {
1636 unsigned char buf[2];
1637
1638 if (len < (1<<6)) {
1639 /* Save a 6 bit len */
1640 buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
1641 if (fwrite(buf,1,1,fp) == 0) return -1;
1642 } else if (len < (1<<14)) {
1643 /* Save a 14 bit len */
1644 buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
1645 buf[1] = len&0xFF;
1646 if (fwrite(buf,2,1,fp) == 0) return -1;
1647 } else {
1648 /* Save a 32 bit len */
1649 buf[0] = (REDIS_RDB_32BITLEN<<6);
1650 if (fwrite(buf,1,1,fp) == 0) return -1;
1651 len = htonl(len);
1652 if (fwrite(&len,4,1,fp) == 0) return -1;
1653 }
1654 return 0;
1655}
1656
1657/* String objects in the form "2391" "-100" without any space and with a
1658 * range of values that can fit in an 8, 16 or 32 bit signed value can be
1659 * encoded as integers to save space */
1660int rdbTryIntegerEncoding(sds s, unsigned char *enc) {
1661 long long value;
1662 char *endptr, buf[32];
1663
1664 /* Check if it's possible to encode this value as a number */
1665 value = strtoll(s, &endptr, 10);
1666 if (endptr[0] != '\0') return 0;
1667 snprintf(buf,32,"%lld",value);
1668
1669 /* If the number converted back into a string is not identical
1670 * then it's not possible to encode the string as integer */
1671 if (strlen(buf) != sdslen(s) || memcmp(buf,s,sdslen(s))) return 0;
1672
1673 /* Finally check if it fits in our ranges */
1674 if (value >= -(1<<7) && value <= (1<<7)-1) {
1675 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT8;
1676 enc[1] = value&0xFF;
1677 return 2;
1678 } else if (value >= -(1<<15) && value <= (1<<15)-1) {
1679 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT16;
1680 enc[1] = value&0xFF;
1681 enc[2] = (value>>8)&0xFF;
1682 return 3;
1683 } else if (value >= -((long long)1<<31) && value <= ((long long)1<<31)-1) {
1684 enc[0] = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_INT32;
1685 enc[1] = value&0xFF;
1686 enc[2] = (value>>8)&0xFF;
1687 enc[3] = (value>>16)&0xFF;
1688 enc[4] = (value>>24)&0xFF;
1689 return 5;
1690 } else {
1691 return 0;
1692 }
1693}
1694
1695static int rdbSaveLzfStringObject(FILE *fp, robj *obj) {
1696 unsigned int comprlen, outlen;
1697 unsigned char byte;
1698 void *out;
1699
1700 /* We require at least four bytes compression for this to be worth it */
1701 outlen = sdslen(obj->ptr)-4;
1702 if (outlen <= 0) return 0;
1703 if ((out = zmalloc(outlen)) == NULL) return 0;
1704 comprlen = lzf_compress(obj->ptr, sdslen(obj->ptr), out, outlen);
1705 if (comprlen == 0) {
1706 zfree(out);
1707 return 0;
1708 }
1709 /* Data compressed! Let's save it on disk */
1710 byte = (REDIS_RDB_ENCVAL<<6)|REDIS_RDB_ENC_LZF;
1711 if (fwrite(&byte,1,1,fp) == 0) goto writeerr;
1712 if (rdbSaveLen(fp,comprlen) == -1) goto writeerr;
1713 if (rdbSaveLen(fp,sdslen(obj->ptr)) == -1) goto writeerr;
1714 if (fwrite(out,comprlen,1,fp) == 0) goto writeerr;
1715 zfree(out);
1716 return comprlen;
1717
1718writeerr:
1719 zfree(out);
1720 return -1;
1721}
1722
1723/* Save a string objet as [len][data] on disk. If the object is a string
1724 * representation of an integer value we try to safe it in a special form */
1725static int rdbSaveStringObject(FILE *fp, robj *obj) {
1726 size_t len = sdslen(obj->ptr);
1727 int enclen;
1728
1729 /* Try integer encoding */
1730 if (len <= 11) {
1731 unsigned char buf[5];
1732 if ((enclen = rdbTryIntegerEncoding(obj->ptr,buf)) > 0) {
1733 if (fwrite(buf,enclen,1,fp) == 0) return -1;
1734 return 0;
1735 }
1736 }
1737
1738 /* Try LZF compression - under 20 bytes it's unable to compress even
1739 * aaaaaaaaaaaaaaaaaa so skip it */
1740 if (len > 20) {
1741 int retval;
1742
1743 retval = rdbSaveLzfStringObject(fp,obj);
1744 if (retval == -1) return -1;
1745 if (retval > 0) return 0;
1746 /* retval == 0 means data can't be compressed, save the old way */
1747 }
1748
1749 /* Store verbatim */
1750 if (rdbSaveLen(fp,len) == -1) return -1;
1751 if (len && fwrite(obj->ptr,len,1,fp) == 0) return -1;
1752 return 0;
1753}
1754
1755/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
1756static int rdbSave(char *filename) {
1757 dictIterator *di = NULL;
1758 dictEntry *de;
1759 FILE *fp;
1760 char tmpfile[256];
1761 int j;
1762 time_t now = time(NULL);
1763
1764 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
1765 fp = fopen(tmpfile,"w");
1766 if (!fp) {
1767 redisLog(REDIS_WARNING, "Failed saving the DB: %s", strerror(errno));
1768 return REDIS_ERR;
1769 }
1770 if (fwrite("REDIS0001",9,1,fp) == 0) goto werr;
1771 for (j = 0; j < server.dbnum; j++) {
1772 redisDb *db = server.db+j;
1773 dict *d = db->dict;
1774 if (dictSize(d) == 0) continue;
1775 di = dictGetIterator(d);
1776 if (!di) {
1777 fclose(fp);
1778 return REDIS_ERR;
1779 }
1780
1781 /* Write the SELECT DB opcode */
1782 if (rdbSaveType(fp,REDIS_SELECTDB) == -1) goto werr;
1783 if (rdbSaveLen(fp,j) == -1) goto werr;
1784
1785 /* Iterate this DB writing every entry */
1786 while((de = dictNext(di)) != NULL) {
1787 robj *key = dictGetEntryKey(de);
1788 robj *o = dictGetEntryVal(de);
1789 time_t expiretime = getExpire(db,key);
1790
1791 /* Save the expire time */
1792 if (expiretime != -1) {
1793 /* If this key is already expired skip it */
1794 if (expiretime < now) continue;
1795 if (rdbSaveType(fp,REDIS_EXPIRETIME) == -1) goto werr;
1796 if (rdbSaveTime(fp,expiretime) == -1) goto werr;
1797 }
1798 /* Save the key and associated value */
1799 if (rdbSaveType(fp,o->type) == -1) goto werr;
1800 if (rdbSaveStringObject(fp,key) == -1) goto werr;
1801 if (o->type == REDIS_STRING) {
1802 /* Save a string value */
1803 if (rdbSaveStringObject(fp,o) == -1) goto werr;
1804 } else if (o->type == REDIS_LIST) {
1805 /* Save a list value */
1806 list *list = o->ptr;
1807 listNode *ln = list->head;
1808
1809 if (rdbSaveLen(fp,listLength(list)) == -1) goto werr;
1810 while(ln) {
1811 robj *eleobj = listNodeValue(ln);
1812
1813 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1814 ln = ln->next;
1815 }
1816 } else if (o->type == REDIS_SET) {
1817 /* Save a set value */
1818 dict *set = o->ptr;
1819 dictIterator *di = dictGetIterator(set);
1820 dictEntry *de;
1821
1822 if (!set) oom("dictGetIteraotr");
1823 if (rdbSaveLen(fp,dictSize(set)) == -1) goto werr;
1824 while((de = dictNext(di)) != NULL) {
1825 robj *eleobj = dictGetEntryKey(de);
1826
1827 if (rdbSaveStringObject(fp,eleobj) == -1) goto werr;
1828 }
1829 dictReleaseIterator(di);
1830 } else {
1831 assert(0 != 0);
1832 }
1833 }
1834 dictReleaseIterator(di);
1835 }
1836 /* EOF opcode */
1837 if (rdbSaveType(fp,REDIS_EOF) == -1) goto werr;
1838
1839 /* Make sure data will not remain on the OS's output buffers */
1840 fflush(fp);
1841 fsync(fileno(fp));
1842 fclose(fp);
1843
1844 /* Use RENAME to make sure the DB file is changed atomically only
1845 * if the generate DB file is ok. */
1846 if (rename(tmpfile,filename) == -1) {
1847 redisLog(REDIS_WARNING,"Error moving temp DB file on the final destionation: %s", strerror(errno));
1848 unlink(tmpfile);
1849 return REDIS_ERR;
1850 }
1851 redisLog(REDIS_NOTICE,"DB saved on disk");
1852 server.dirty = 0;
1853 server.lastsave = time(NULL);
1854 return REDIS_OK;
1855
1856werr:
1857 fclose(fp);
1858 unlink(tmpfile);
1859 redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
1860 if (di) dictReleaseIterator(di);
1861 return REDIS_ERR;
1862}
1863
1864static int rdbSaveBackground(char *filename) {
1865 pid_t childpid;
1866
1867 if (server.bgsaveinprogress) return REDIS_ERR;
1868 if ((childpid = fork()) == 0) {
1869 /* Child */
1870 close(server.fd);
1871 if (rdbSave(filename) == REDIS_OK) {
1872 exit(0);
1873 } else {
1874 exit(1);
1875 }
1876 } else {
1877 /* Parent */
1878 redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);
1879 server.bgsaveinprogress = 1;
1880 return REDIS_OK;
1881 }
1882 return REDIS_OK; /* unreached */
1883}
1884
/* Read a one-byte opcode/type. Returns it (0..255), or -1 on EOF/error. */
static int rdbLoadType(FILE *fp) {
    unsigned char type;

    if (fread(&type,1,1,fp) != 0) return type;
    return -1;
}
1890
/* Read a 4-byte int32_t time in host byte order (as written by
 * rdbSaveTime()). Returns it, or -1 on EOF/error. */
static time_t rdbLoadTime(FILE *fp) {
    int32_t t32;

    return fread(&t32,sizeof(t32),1,fp) == 0 ? (time_t) -1 : (time_t) t32;
}
1896
1897/* Load an encoded length from the DB, see the REDIS_RDB_* defines on the top
1898 * of this file for a description of how this are stored on disk.
1899 *
1900 * isencoded is set to 1 if the readed length is not actually a length but
1901 * an "encoding type", check the above comments for more info */
1902static uint32_t rdbLoadLen(FILE *fp, int rdbver, int *isencoded) {
1903 unsigned char buf[2];
1904 uint32_t len;
1905
1906 if (isencoded) *isencoded = 0;
1907 if (rdbver == 0) {
1908 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1909 return ntohl(len);
1910 } else {
1911 int type;
1912
1913 if (fread(buf,1,1,fp) == 0) return REDIS_RDB_LENERR;
1914 type = (buf[0]&0xC0)>>6;
1915 if (type == REDIS_RDB_6BITLEN) {
1916 /* Read a 6 bit len */
1917 return buf[0]&0x3F;
1918 } else if (type == REDIS_RDB_ENCVAL) {
1919 /* Read a 6 bit len encoding type */
1920 if (isencoded) *isencoded = 1;
1921 return buf[0]&0x3F;
1922 } else if (type == REDIS_RDB_14BITLEN) {
1923 /* Read a 14 bit len */
1924 if (fread(buf+1,1,1,fp) == 0) return REDIS_RDB_LENERR;
1925 return ((buf[0]&0x3F)<<8)|buf[1];
1926 } else {
1927 /* Read a 32 bit len */
1928 if (fread(&len,4,1,fp) == 0) return REDIS_RDB_LENERR;
1929 return ntohl(len);
1930 }
1931 }
1932}
1933
1934static robj *rdbLoadIntegerObject(FILE *fp, int enctype) {
1935 unsigned char enc[4];
1936 long long val;
1937
1938 if (enctype == REDIS_RDB_ENC_INT8) {
1939 if (fread(enc,1,1,fp) == 0) return NULL;
1940 val = (signed char)enc[0];
1941 } else if (enctype == REDIS_RDB_ENC_INT16) {
1942 uint16_t v;
1943 if (fread(enc,2,1,fp) == 0) return NULL;
1944 v = enc[0]|(enc[1]<<8);
1945 val = (int16_t)v;
1946 } else if (enctype == REDIS_RDB_ENC_INT32) {
1947 uint32_t v;
1948 if (fread(enc,4,1,fp) == 0) return NULL;
1949 v = enc[0]|(enc[1]<<8)|(enc[2]<<16)|(enc[3]<<24);
1950 val = (int32_t)v;
1951 } else {
1952 val = 0; /* anti-warning */
1953 assert(0!=0);
1954 }
1955 return createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",val));
1956}
1957
1958static robj *rdbLoadLzfStringObject(FILE*fp, int rdbver) {
1959 unsigned int len, clen;
1960 unsigned char *c = NULL;
1961 sds val = NULL;
1962
1963 if ((clen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
1964 if ((len = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR) return NULL;
1965 if ((c = zmalloc(clen)) == NULL) goto err;
1966 if ((val = sdsnewlen(NULL,len)) == NULL) goto err;
1967 if (fread(c,clen,1,fp) == 0) goto err;
1968 if (lzf_decompress(c,clen,val,len) == 0) goto err;
1969 return createObject(REDIS_STRING,val);
1970err:
1971 zfree(c);
1972 sdsfree(val);
1973 return NULL;
1974}
1975
1976static robj *rdbLoadStringObject(FILE*fp, int rdbver) {
1977 int isencoded;
1978 uint32_t len;
1979 sds val;
1980
1981 len = rdbLoadLen(fp,rdbver,&isencoded);
1982 if (isencoded) {
1983 switch(len) {
1984 case REDIS_RDB_ENC_INT8:
1985 case REDIS_RDB_ENC_INT16:
1986 case REDIS_RDB_ENC_INT32:
1987 return tryObjectSharing(rdbLoadIntegerObject(fp,len));
1988 case REDIS_RDB_ENC_LZF:
1989 return tryObjectSharing(rdbLoadLzfStringObject(fp,rdbver));
1990 default:
1991 assert(0!=0);
1992 }
1993 }
1994
1995 if (len == REDIS_RDB_LENERR) return NULL;
1996 val = sdsnewlen(NULL,len);
1997 if (len && fread(val,len,1,fp) == 0) {
1998 sdsfree(val);
1999 return NULL;
2000 }
2001 return tryObjectSharing(createObject(REDIS_STRING,val));
2002}
2003
/* Load the dump file 'filename' into server.db.
 *
 * Returns REDIS_ERR when the file can't be opened, does not start with the
 * "REDIS" signature, or uses an RDB version greater than 1. Returns
 * REDIS_OK once the REDIS_EOF opcode is reached. A short read, an out of
 * range DB id or a duplicated key are unrecoverable: they are logged and
 * the process exits. */
static int rdbLoad(char *filename) {
    FILE *fp;
    robj *keyobj = NULL;
    uint32_t dbid;
    int type, retval, rdbver;
    dict *d = server.db[0].dict;    /* loading starts in DB 0 */
    redisDb *db = server.db+0;
    char buf[1024];
    time_t expiretime = -1, now = time(NULL);

    fp = fopen(filename,"r");
    if (!fp) return REDIS_ERR;
    /* 9 byte header: "REDIS" followed by the format version digits */
    if (fread(buf,9,1,fp) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);
    if (rdbver > 1) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        return REDIS_ERR;
    }
    while(1) {
        robj *o;

        /* Read type. */
        if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        if (type == REDIS_EXPIRETIME) {
            /* An expire time is written just before the entry it refers to */
            if ((expiretime = rdbLoadTime(fp)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again */
            if ((type = rdbLoadType(fp)) == -1) goto eoferr;
        }
        if (type == REDIS_EOF) break;
        /* Handle SELECT DB opcode as a special case */
        if (type == REDIS_SELECTDB) {
            if ((dbid = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            /* Following entries are loaded into the selected DB */
            db = server.db+dbid;
            d = db->dict;
            continue;
        }
        /* Read key */
        if ((keyobj = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;

        if (type == REDIS_STRING) {
            /* Read string value */
            if ((o = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
        } else if (type == REDIS_LIST || type == REDIS_SET) {
            /* Read list/set value: element count, then the elements */
            uint32_t listlen;

            if ((listlen = rdbLoadLen(fp,rdbver,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            o = (type == REDIS_LIST) ? createListObject() : createSetObject();
            /* Load every single element of the list/set */
            while(listlen--) {
                robj *ele;

                if ((ele = rdbLoadStringObject(fp,rdbver)) == NULL) goto eoferr;
                if (type == REDIS_LIST) {
                    if (!listAddNodeTail((list*)o->ptr,ele))
                        oom("listAddNodeTail");
                } else {
                    /* NOTE(review): a duplicated set member also makes
                     * dictAdd fail and is reported as OOM here. */
                    if (dictAdd((dict*)o->ptr,ele,NULL) == DICT_ERR)
                        oom("dictAdd");
                }
            }
        } else {
            /* Unknown value type. NOTE(review): when asserts are compiled
             * out, 'o' is used uninitialized below. */
            assert(0 != 0);
        }
        /* Add the new object in the hash table */
        retval = dictAdd(d,keyobj,o);
        if (retval == DICT_ERR) {
            redisLog(REDIS_WARNING,"Loading DB, duplicated key (%s) found! Unrecoverable error, exiting now.", keyobj->ptr);
            exit(1);
        }
        /* Set the expire time if needed */
        if (expiretime != -1) {
            setExpire(db,keyobj,expiretime);
            /* Delete this key if already expired */
            if (expiretime < now) deleteKey(db,keyobj);
            /* The expire applies to this entry only */
            expiretime = -1;
        }
        /* Both objects are now stored in the DB dict: clear the locals so
         * the eoferr path does not release keyobj a second time. */
        keyobj = o = NULL;
    }
    fclose(fp);
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    if (keyobj) decrRefCount(keyobj);
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, exiting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}
2105
2106/*================================== Commands =============================== */
2107
2108static void authCommand(redisClient *c) {
2109 if (!server.requirepass || !strcmp(c->argv[1]->ptr, server.requirepass)) {
2110 c->authenticated = 1;
2111 addReply(c,shared.ok);
2112 } else {
2113 c->authenticated = 0;
2114 addReply(c,shared.err);
2115 }
2116}
2117
2118static void pingCommand(redisClient *c) {
2119 addReply(c,shared.pong);
2120}
2121
2122static void echoCommand(redisClient *c) {
2123 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
2124 (int)sdslen(c->argv[1]->ptr)));
2125 addReply(c,c->argv[1]);
2126 addReply(c,shared.crlf);
2127}
2128
2129/*=================================== Strings =============================== */
2130
2131static void setGenericCommand(redisClient *c, int nx) {
2132 int retval;
2133
2134 retval = dictAdd(c->db->dict,c->argv[1],c->argv[2]);
2135 if (retval == DICT_ERR) {
2136 if (!nx) {
2137 dictReplace(c->db->dict,c->argv[1],c->argv[2]);
2138 incrRefCount(c->argv[2]);
2139 } else {
2140 addReply(c,shared.czero);
2141 return;
2142 }
2143 } else {
2144 incrRefCount(c->argv[1]);
2145 incrRefCount(c->argv[2]);
2146 }
2147 server.dirty++;
2148 removeExpire(c->db,c->argv[1]);
2149 addReply(c, nx ? shared.cone : shared.ok);
2150}
2151
2152static void setCommand(redisClient *c) {
2153 setGenericCommand(c,0);
2154}
2155
2156static void setnxCommand(redisClient *c) {
2157 setGenericCommand(c,1);
2158}
2159
2160static void getCommand(redisClient *c) {
2161 robj *o = lookupKeyRead(c->db,c->argv[1]);
2162
2163 if (o == NULL) {
2164 addReply(c,shared.nullbulk);
2165 } else {
2166 if (o->type != REDIS_STRING) {
2167 addReply(c,shared.wrongtypeerr);
2168 } else {
2169 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2170 addReply(c,o);
2171 addReply(c,shared.crlf);
2172 }
2173 }
2174}
2175
2176static void mgetCommand(redisClient *c) {
2177 int j;
2178
2179 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",c->argc-1));
2180 for (j = 1; j < c->argc; j++) {
2181 robj *o = lookupKeyRead(c->db,c->argv[j]);
2182 if (o == NULL) {
2183 addReply(c,shared.nullbulk);
2184 } else {
2185 if (o->type != REDIS_STRING) {
2186 addReply(c,shared.nullbulk);
2187 } else {
2188 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(o->ptr)));
2189 addReply(c,o);
2190 addReply(c,shared.crlf);
2191 }
2192 }
2193 }
2194}
2195
2196static void incrDecrCommand(redisClient *c, int incr) {
2197 long long value;
2198 int retval;
2199 robj *o;
2200
2201 o = lookupKeyWrite(c->db,c->argv[1]);
2202 if (o == NULL) {
2203 value = 0;
2204 } else {
2205 if (o->type != REDIS_STRING) {
2206 value = 0;
2207 } else {
2208 char *eptr;
2209
2210 value = strtoll(o->ptr, &eptr, 10);
2211 }
2212 }
2213
2214 value += incr;
2215 o = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"%lld",value));
2216 retval = dictAdd(c->db->dict,c->argv[1],o);
2217 if (retval == DICT_ERR) {
2218 dictReplace(c->db->dict,c->argv[1],o);
2219 removeExpire(c->db,c->argv[1]);
2220 } else {
2221 incrRefCount(c->argv[1]);
2222 }
2223 server.dirty++;
2224 addReply(c,shared.colon);
2225 addReply(c,o);
2226 addReply(c,shared.crlf);
2227}
2228
2229static void incrCommand(redisClient *c) {
2230 incrDecrCommand(c,1);
2231}
2232
2233static void decrCommand(redisClient *c) {
2234 incrDecrCommand(c,-1);
2235}
2236
2237static void incrbyCommand(redisClient *c) {
2238 int incr = atoi(c->argv[2]->ptr);
2239 incrDecrCommand(c,incr);
2240}
2241
2242static void decrbyCommand(redisClient *c) {
2243 int incr = atoi(c->argv[2]->ptr);
2244 incrDecrCommand(c,-incr);
2245}
2246
2247/* ========================= Type agnostic commands ========================= */
2248
2249static void delCommand(redisClient *c) {
2250 if (deleteKey(c->db,c->argv[1])) {
2251 server.dirty++;
2252 addReply(c,shared.cone);
2253 } else {
2254 addReply(c,shared.czero);
2255 }
2256}
2257
2258static void existsCommand(redisClient *c) {
2259 addReply(c,lookupKeyRead(c->db,c->argv[1]) ? shared.cone : shared.czero);
2260}
2261
2262static void selectCommand(redisClient *c) {
2263 int id = atoi(c->argv[1]->ptr);
2264
2265 if (selectDb(c,id) == REDIS_ERR) {
2266 addReplySds(c,sdsnew("-ERR invalid DB index\r\n"));
2267 } else {
2268 addReply(c,shared.ok);
2269 }
2270}
2271
2272static void randomkeyCommand(redisClient *c) {
2273 dictEntry *de;
2274
2275 while(1) {
2276 de = dictGetRandomKey(c->db->dict);
2277 if (!de || expireIfNeeded(c->db,dictGetEntryKey(de)) == 0) break;
2278 }
2279 if (de == NULL) {
2280 addReply(c,shared.plus);
2281 addReply(c,shared.crlf);
2282 } else {
2283 addReply(c,shared.plus);
2284 addReply(c,dictGetEntryKey(de));
2285 addReply(c,shared.crlf);
2286 }
2287}
2288
2289static void keysCommand(redisClient *c) {
2290 dictIterator *di;
2291 dictEntry *de;
2292 sds pattern = c->argv[1]->ptr;
2293 int plen = sdslen(pattern);
2294 int numkeys = 0, keyslen = 0;
2295 robj *lenobj = createObject(REDIS_STRING,NULL);
2296
2297 di = dictGetIterator(c->db->dict);
2298 if (!di) oom("dictGetIterator");
2299 addReply(c,lenobj);
2300 decrRefCount(lenobj);
2301 while((de = dictNext(di)) != NULL) {
2302 robj *keyobj = dictGetEntryKey(de);
2303
2304 sds key = keyobj->ptr;
2305 if ((pattern[0] == '*' && pattern[1] == '\0') ||
2306 stringmatchlen(pattern,plen,key,sdslen(key),0)) {
2307 if (expireIfNeeded(c->db,keyobj) == 0) {
2308 if (numkeys != 0)
2309 addReply(c,shared.space);
2310 addReply(c,keyobj);
2311 numkeys++;
2312 keyslen += sdslen(key);
2313 }
2314 }
2315 }
2316 dictReleaseIterator(di);
2317 lenobj->ptr = sdscatprintf(sdsempty(),"$%lu\r\n",keyslen+(numkeys ? (numkeys-1) : 0));
2318 addReply(c,shared.crlf);
2319}
2320
2321static void dbsizeCommand(redisClient *c) {
2322 addReplySds(c,
2323 sdscatprintf(sdsempty(),":%lu\r\n",dictSize(c->db->dict)));
2324}
2325
2326static void lastsaveCommand(redisClient *c) {
2327 addReplySds(c,
2328 sdscatprintf(sdsempty(),":%lu\r\n",server.lastsave));
2329}
2330
2331static void typeCommand(redisClient *c) {
2332 robj *o;
2333 char *type;
2334
2335 o = lookupKeyRead(c->db,c->argv[1]);
2336 if (o == NULL) {
2337 type = "+none";
2338 } else {
2339 switch(o->type) {
2340 case REDIS_STRING: type = "+string"; break;
2341 case REDIS_LIST: type = "+list"; break;
2342 case REDIS_SET: type = "+set"; break;
2343 default: type = "unknown"; break;
2344 }
2345 }
2346 addReplySds(c,sdsnew(type));
2347 addReply(c,shared.crlf);
2348}
2349
2350static void saveCommand(redisClient *c) {
2351 if (server.bgsaveinprogress) {
2352 addReplySds(c,sdsnew("-ERR background save in progress\r\n"));
2353 return;
2354 }
2355 if (rdbSave(server.dbfilename) == REDIS_OK) {
2356 addReply(c,shared.ok);
2357 } else {
2358 addReply(c,shared.err);
2359 }
2360}
2361
2362static void bgsaveCommand(redisClient *c) {
2363 if (server.bgsaveinprogress) {
2364 addReplySds(c,sdsnew("-ERR background save already in progress\r\n"));
2365 return;
2366 }
2367 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
2368 addReply(c,shared.ok);
2369 } else {
2370 addReply(c,shared.err);
2371 }
2372}
2373
2374static void shutdownCommand(redisClient *c) {
2375 redisLog(REDIS_WARNING,"User requested shutdown, saving DB...");
2376 if (rdbSave(server.dbfilename) == REDIS_OK) {
2377 if (server.daemonize) {
2378 unlink(server.pidfile);
2379 }
2380 redisLog(REDIS_WARNING,"Server exit now, bye bye...");
2381 exit(1);
2382 } else {
2383 redisLog(REDIS_WARNING,"Error trying to save the DB, can't exit");
2384 addReplySds(c,sdsnew("-ERR can't quit, problems saving the DB\r\n"));
2385 }
2386}
2387
2388static void renameGenericCommand(redisClient *c, int nx) {
2389 robj *o;
2390
2391 /* To use the same key as src and dst is probably an error */
2392 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
2393 addReply(c,shared.sameobjecterr);
2394 return;
2395 }
2396
2397 o = lookupKeyWrite(c->db,c->argv[1]);
2398 if (o == NULL) {
2399 addReply(c,shared.nokeyerr);
2400 return;
2401 }
2402 incrRefCount(o);
2403 deleteIfVolatile(c->db,c->argv[2]);
2404 if (dictAdd(c->db->dict,c->argv[2],o) == DICT_ERR) {
2405 if (nx) {
2406 decrRefCount(o);
2407 addReply(c,shared.czero);
2408 return;
2409 }
2410 dictReplace(c->db->dict,c->argv[2],o);
2411 } else {
2412 incrRefCount(c->argv[2]);
2413 }
2414 deleteKey(c->db,c->argv[1]);
2415 server.dirty++;
2416 addReply(c,nx ? shared.cone : shared.ok);
2417}
2418
2419static void renameCommand(redisClient *c) {
2420 renameGenericCommand(c,0);
2421}
2422
2423static void renamenxCommand(redisClient *c) {
2424 renameGenericCommand(c,1);
2425}
2426
2427static void moveCommand(redisClient *c) {
2428 robj *o;
2429 redisDb *src, *dst;
2430 int srcid;
2431
2432 /* Obtain source and target DB pointers */
2433 src = c->db;
2434 srcid = c->db->id;
2435 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
2436 addReply(c,shared.outofrangeerr);
2437 return;
2438 }
2439 dst = c->db;
2440 selectDb(c,srcid); /* Back to the source DB */
2441
2442 /* If the user is moving using as target the same
2443 * DB as the source DB it is probably an error. */
2444 if (src == dst) {
2445 addReply(c,shared.sameobjecterr);
2446 return;
2447 }
2448
2449 /* Check if the element exists and get a reference */
2450 o = lookupKeyWrite(c->db,c->argv[1]);
2451 if (!o) {
2452 addReply(c,shared.czero);
2453 return;
2454 }
2455
2456 /* Try to add the element to the target DB */
2457 deleteIfVolatile(dst,c->argv[1]);
2458 if (dictAdd(dst->dict,c->argv[1],o) == DICT_ERR) {
2459 addReply(c,shared.czero);
2460 return;
2461 }
2462 incrRefCount(c->argv[1]);
2463 incrRefCount(o);
2464
2465 /* OK! key moved, free the entry in the source DB */
2466 deleteKey(src,c->argv[1]);
2467 server.dirty++;
2468 addReply(c,shared.cone);
2469}
2470
2471/* =================================== Lists ================================ */
2472static void pushGenericCommand(redisClient *c, int where) {
2473 robj *lobj;
2474 list *list;
2475
2476 lobj = lookupKeyWrite(c->db,c->argv[1]);
2477 if (lobj == NULL) {
2478 lobj = createListObject();
2479 list = lobj->ptr;
2480 if (where == REDIS_HEAD) {
2481 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2482 } else {
2483 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2484 }
2485 dictAdd(c->db->dict,c->argv[1],lobj);
2486 incrRefCount(c->argv[1]);
2487 incrRefCount(c->argv[2]);
2488 } else {
2489 if (lobj->type != REDIS_LIST) {
2490 addReply(c,shared.wrongtypeerr);
2491 return;
2492 }
2493 list = lobj->ptr;
2494 if (where == REDIS_HEAD) {
2495 if (!listAddNodeHead(list,c->argv[2])) oom("listAddNodeHead");
2496 } else {
2497 if (!listAddNodeTail(list,c->argv[2])) oom("listAddNodeTail");
2498 }
2499 incrRefCount(c->argv[2]);
2500 }
2501 server.dirty++;
2502 addReply(c,shared.ok);
2503}
2504
2505static void lpushCommand(redisClient *c) {
2506 pushGenericCommand(c,REDIS_HEAD);
2507}
2508
2509static void rpushCommand(redisClient *c) {
2510 pushGenericCommand(c,REDIS_TAIL);
2511}
2512
2513static void llenCommand(redisClient *c) {
2514 robj *o;
2515 list *l;
2516
2517 o = lookupKeyRead(c->db,c->argv[1]);
2518 if (o == NULL) {
2519 addReply(c,shared.czero);
2520 return;
2521 } else {
2522 if (o->type != REDIS_LIST) {
2523 addReply(c,shared.wrongtypeerr);
2524 } else {
2525 l = o->ptr;
2526 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",listLength(l)));
2527 }
2528 }
2529}
2530
2531static void lindexCommand(redisClient *c) {
2532 robj *o;
2533 int index = atoi(c->argv[2]->ptr);
2534
2535 o = lookupKeyRead(c->db,c->argv[1]);
2536 if (o == NULL) {
2537 addReply(c,shared.nullbulk);
2538 } else {
2539 if (o->type != REDIS_LIST) {
2540 addReply(c,shared.wrongtypeerr);
2541 } else {
2542 list *list = o->ptr;
2543 listNode *ln;
2544
2545 ln = listIndex(list, index);
2546 if (ln == NULL) {
2547 addReply(c,shared.nullbulk);
2548 } else {
2549 robj *ele = listNodeValue(ln);
2550 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2551 addReply(c,ele);
2552 addReply(c,shared.crlf);
2553 }
2554 }
2555 }
2556}
2557
2558static void lsetCommand(redisClient *c) {
2559 robj *o;
2560 int index = atoi(c->argv[2]->ptr);
2561
2562 o = lookupKeyWrite(c->db,c->argv[1]);
2563 if (o == NULL) {
2564 addReply(c,shared.nokeyerr);
2565 } else {
2566 if (o->type != REDIS_LIST) {
2567 addReply(c,shared.wrongtypeerr);
2568 } else {
2569 list *list = o->ptr;
2570 listNode *ln;
2571
2572 ln = listIndex(list, index);
2573 if (ln == NULL) {
2574 addReply(c,shared.outofrangeerr);
2575 } else {
2576 robj *ele = listNodeValue(ln);
2577
2578 decrRefCount(ele);
2579 listNodeValue(ln) = c->argv[3];
2580 incrRefCount(c->argv[3]);
2581 addReply(c,shared.ok);
2582 server.dirty++;
2583 }
2584 }
2585 }
2586}
2587
2588static void popGenericCommand(redisClient *c, int where) {
2589 robj *o;
2590
2591 o = lookupKeyWrite(c->db,c->argv[1]);
2592 if (o == NULL) {
2593 addReply(c,shared.nullbulk);
2594 } else {
2595 if (o->type != REDIS_LIST) {
2596 addReply(c,shared.wrongtypeerr);
2597 } else {
2598 list *list = o->ptr;
2599 listNode *ln;
2600
2601 if (where == REDIS_HEAD)
2602 ln = listFirst(list);
2603 else
2604 ln = listLast(list);
2605
2606 if (ln == NULL) {
2607 addReply(c,shared.nullbulk);
2608 } else {
2609 robj *ele = listNodeValue(ln);
2610 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
2611 addReply(c,ele);
2612 addReply(c,shared.crlf);
2613 listDelNode(list,ln);
2614 server.dirty++;
2615 }
2616 }
2617 }
2618}
2619
2620static void lpopCommand(redisClient *c) {
2621 popGenericCommand(c,REDIS_HEAD);
2622}
2623
2624static void rpopCommand(redisClient *c) {
2625 popGenericCommand(c,REDIS_TAIL);
2626}
2627
/* LRANGE <key> <start> <end>: multi-bulk reply with the elements between
 * the two zero-based, inclusive indexes.
 * Negative indexes count from the tail (-1 is the last element); after
 * conversion they are clamped to 0, and 'end' is clamped to the last
 * element. An empty range yields an empty multi-bulk; a missing key yields
 * a null multi-bulk. */
static void lrangeCommand(redisClient *c) {
    robj *o;
    int start = atoi(c->argv[2]->ptr);
    int end = atoi(c->argv[3]->ptr);

    o = lookupKeyRead(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nullmultibulk);
    } else {
        if (o->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
        } else {
            list *list = o->ptr;
            listNode *ln;
            int llen = listLength(list);
            int rangelen, j;
            robj *ele;

            /* convert negative indexes */
            if (start < 0) start = llen+start;
            if (end < 0) end = llen+end;
            if (start < 0) start = 0;
            if (end < 0) end = 0;

            /* indexes sanity checks */
            if (start > end || start >= llen) {
                /* Out of range start or start > end result in empty list */
                addReply(c,shared.emptymultibulk);
                return;
            }
            if (end >= llen) end = llen-1;
            rangelen = (end-start)+1;

            /* Return the result in form of a multi-bulk reply. Here
             * 0 <= start <= end < llen, so walking 'rangelen' nodes from
             * 'start' never runs past the tail. */
            ln = listIndex(list, start);
            addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
            for (j = 0; j < rangelen; j++) {
                ele = listNodeValue(ln);
                addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",(int)sdslen(ele->ptr)));
                addReply(c,ele);
                addReply(c,shared.crlf);
                ln = ln->next;
            }
        }
    }
}
2674
/* LTRIM <key> <start> <end>: trim the list in place, keeping only the
 * elements between the two zero-based, inclusive indexes.
 * Negative indexes count from the tail. An empty or out of range interval
 * empties the list. Replies +OK, or an error for a missing key or a
 * non-list value. */
static void ltrimCommand(redisClient *c) {
    robj *o;
    int start = atoi(c->argv[2]->ptr);
    int end = atoi(c->argv[3]->ptr);

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nokeyerr);
    } else {
        if (o->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
        } else {
            list *list = o->ptr;
            listNode *ln;
            int llen = listLength(list);
            int j, ltrim, rtrim;

            /* convert negative indexes */
            if (start < 0) start = llen+start;
            if (end < 0) end = llen+end;
            if (start < 0) start = 0;
            if (end < 0) end = 0;

            /* indexes sanity checks */
            if (start > end || start >= llen) {
                /* Out of range start or start > end result in empty list */
                ltrim = llen;
                rtrim = 0;
            } else {
                if (end >= llen) end = llen-1;
                ltrim = start;
                rtrim = llen-end-1;
            }

            /* Remove list elements to perform the trim: 'ltrim' nodes from
             * the head, then 'rtrim' nodes from the tail */
            for (j = 0; j < ltrim; j++) {
                ln = listFirst(list);
                listDelNode(list,ln);
            }
            for (j = 0; j < rtrim; j++) {
                ln = listLast(list);
                listDelNode(list,ln);
            }
            addReply(c,shared.ok);
            server.dirty++;
        }
    }
}
2723
/* LREM <key> <count> <value>: remove list elements equal to 'value'.
 * A positive count removes at most that many occurrences, scanning from
 * the head. A negative count does the same scanning from the tail. Zero
 * removes every occurrence. Replies with the number of removed elements. */
static void lremCommand(redisClient *c) {
    robj *o;

    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        addReply(c,shared.nokeyerr);
    } else {
        if (o->type != REDIS_LIST) {
            addReply(c,shared.wrongtypeerr);
        } else {
            list *list = o->ptr;
            listNode *ln, *next;
            int toremove = atoi(c->argv[2]->ptr);
            int removed = 0;
            int fromtail = 0;

            /* The sign of count selects the scan direction */
            if (toremove < 0) {
                toremove = -toremove;
                fromtail = 1;
            }
            ln = fromtail ? list->tail : list->head;
            while (ln) {
                robj *ele = listNodeValue(ln);

                /* Fetch the next node first: ln may be released below */
                next = fromtail ? ln->prev : ln->next;
                if (sdscmp(ele->ptr,c->argv[3]->ptr) == 0) {
                    listDelNode(list,ln);
                    server.dirty++;
                    removed++;
                    /* toremove == 0 means "remove all occurrences" */
                    if (toremove && removed == toremove) break;
                }
                ln = next;
            }
            addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
        }
    }
}
2761
2762/* ==================================== Sets ================================ */
2763
2764static void saddCommand(redisClient *c) {
2765 robj *set;
2766
2767 set = lookupKeyWrite(c->db,c->argv[1]);
2768 if (set == NULL) {
2769 set = createSetObject();
2770 dictAdd(c->db->dict,c->argv[1],set);
2771 incrRefCount(c->argv[1]);
2772 } else {
2773 if (set->type != REDIS_SET) {
2774 addReply(c,shared.wrongtypeerr);
2775 return;
2776 }
2777 }
2778 if (dictAdd(set->ptr,c->argv[2],NULL) == DICT_OK) {
2779 incrRefCount(c->argv[2]);
2780 server.dirty++;
2781 addReply(c,shared.cone);
2782 } else {
2783 addReply(c,shared.czero);
2784 }
2785}
2786
2787static void sremCommand(redisClient *c) {
2788 robj *set;
2789
2790 set = lookupKeyWrite(c->db,c->argv[1]);
2791 if (set == NULL) {
2792 addReply(c,shared.czero);
2793 } else {
2794 if (set->type != REDIS_SET) {
2795 addReply(c,shared.wrongtypeerr);
2796 return;
2797 }
2798 if (dictDelete(set->ptr,c->argv[2]) == DICT_OK) {
2799 server.dirty++;
2800 addReply(c,shared.cone);
2801 } else {
2802 addReply(c,shared.czero);
2803 }
2804 }
2805}
2806
2807static void sismemberCommand(redisClient *c) {
2808 robj *set;
2809
2810 set = lookupKeyRead(c->db,c->argv[1]);
2811 if (set == NULL) {
2812 addReply(c,shared.czero);
2813 } else {
2814 if (set->type != REDIS_SET) {
2815 addReply(c,shared.wrongtypeerr);
2816 return;
2817 }
2818 if (dictFind(set->ptr,c->argv[2]))
2819 addReply(c,shared.cone);
2820 else
2821 addReply(c,shared.czero);
2822 }
2823}
2824
2825static void scardCommand(redisClient *c) {
2826 robj *o;
2827 dict *s;
2828
2829 o = lookupKeyRead(c->db,c->argv[1]);
2830 if (o == NULL) {
2831 addReply(c,shared.czero);
2832 return;
2833 } else {
2834 if (o->type != REDIS_SET) {
2835 addReply(c,shared.wrongtypeerr);
2836 } else {
2837 s = o->ptr;
2838 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",
2839 dictSize(s)));
2840 }
2841 }
2842}
2843
2844static int qsortCompareSetsByCardinality(const void *s1, const void *s2) {
2845 dict **d1 = (void*) s1, **d2 = (void*) s2;
2846
2847 return dictSize(*d1)-dictSize(*d2);
2848}
2849
2850static void sinterGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
2851 dict **dv = zmalloc(sizeof(dict*)*setsnum);
2852 dictIterator *di;
2853 dictEntry *de;
2854 robj *lenobj = NULL, *dstset = NULL;
2855 int j, cardinality = 0;
2856
2857 if (!dv) oom("sinterCommand");
2858 for (j = 0; j < setsnum; j++) {
2859 robj *setobj;
2860
2861 setobj = dstkey ?
2862 lookupKeyWrite(c->db,setskeys[j]) :
2863 lookupKeyRead(c->db,setskeys[j]);
2864 if (!setobj) {
2865 zfree(dv);
2866 if (dstkey) {
2867 deleteKey(c->db,dstkey);
2868 addReply(c,shared.ok);
2869 } else {
2870 addReply(c,shared.nullmultibulk);
2871 }
2872 return;
2873 }
2874 if (setobj->type != REDIS_SET) {
2875 zfree(dv);
2876 addReply(c,shared.wrongtypeerr);
2877 return;
2878 }
2879 dv[j] = setobj->ptr;
2880 }
2881 /* Sort sets from the smallest to largest, this will improve our
2882 * algorithm's performace */
2883 qsort(dv,setsnum,sizeof(dict*),qsortCompareSetsByCardinality);
2884
2885 /* The first thing we should output is the total number of elements...
2886 * since this is a multi-bulk write, but at this stage we don't know
2887 * the intersection set size, so we use a trick, append an empty object
2888 * to the output list and save the pointer to later modify it with the
2889 * right length */
2890 if (!dstkey) {
2891 lenobj = createObject(REDIS_STRING,NULL);
2892 addReply(c,lenobj);
2893 decrRefCount(lenobj);
2894 } else {
2895 /* If we have a target key where to store the resulting set
2896 * create this key with an empty set inside */
2897 dstset = createSetObject();
2898 deleteKey(c->db,dstkey);
2899 dictAdd(c->db->dict,dstkey,dstset);
2900 incrRefCount(dstkey);
2901 }
2902
2903 /* Iterate all the elements of the first (smallest) set, and test
2904 * the element against all the other sets, if at least one set does
2905 * not include the element it is discarded */
2906 di = dictGetIterator(dv[0]);
2907 if (!di) oom("dictGetIterator");
2908
2909 while((de = dictNext(di)) != NULL) {
2910 robj *ele;
2911
2912 for (j = 1; j < setsnum; j++)
2913 if (dictFind(dv[j],dictGetEntryKey(de)) == NULL) break;
2914 if (j != setsnum)
2915 continue; /* at least one set does not contain the member */
2916 ele = dictGetEntryKey(de);
2917 if (!dstkey) {
2918 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(ele->ptr)));
2919 addReply(c,ele);
2920 addReply(c,shared.crlf);
2921 cardinality++;
2922 } else {
2923 dictAdd(dstset->ptr,ele,NULL);
2924 incrRefCount(ele);
2925 }
2926 }
2927 dictReleaseIterator(di);
2928
2929 if (!dstkey) {
2930 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
2931 } else {
2932 addReply(c,shared.ok);
2933 server.dirty++;
2934 }
2935 zfree(dv);
2936}
2937
2938static void sinterCommand(redisClient *c) {
2939 sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
2940}
2941
2942static void sinterstoreCommand(redisClient *c) {
2943 sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
2944}
2945
2946static void sunionGenericCommand(redisClient *c, robj **setskeys, int setsnum, robj *dstkey) {
2947 dict **dv = zmalloc(sizeof(dict*)*setsnum);
2948 dictIterator *di;
2949 dictEntry *de;
2950 robj *lenobj = NULL, *dstset = NULL;
2951 int j, cardinality = 0;
2952
2953 if (!dv) oom("sunionCommand");
2954 for (j = 0; j < setsnum; j++) {
2955 robj *setobj;
2956
2957 setobj = dstkey ?
2958 lookupKeyWrite(c->db,setskeys[j]) :
2959 lookupKeyRead(c->db,setskeys[j]);
2960 if (!setobj) {
2961 dv[j] = NULL;
2962 continue;
2963 }
2964 if (setobj->type != REDIS_SET) {
2965 zfree(dv);
2966 addReply(c,shared.wrongtypeerr);
2967 return;
2968 }
2969 dv[j] = setobj->ptr;
2970 }
2971
2972 /* We need a temp set object to store our union. If the dstkey
2973 * is not NULL (that is, we are inside an SUNIONSTORE operation) then
2974 * this set object will be the resulting object to set into the target key*/
2975 dstset = createSetObject();
2976
2977 /* The first thing we should output is the total number of elements...
2978 * since this is a multi-bulk write, but at this stage we don't know
2979 * the intersection set size, so we use a trick, append an empty object
2980 * to the output list and save the pointer to later modify it with the
2981 * right length */
2982 if (!dstkey) {
2983 lenobj = createObject(REDIS_STRING,NULL);
2984 addReply(c,lenobj);
2985 decrRefCount(lenobj);
2986 } else {
2987 /* If we have a target key where to store the resulting set
2988 * create this key with an empty set inside */
2989 deleteKey(c->db,dstkey);
2990 dictAdd(c->db->dict,dstkey,dstset);
2991 incrRefCount(dstkey);
2992 server.dirty++;
2993 }
2994
2995 /* Iterate all the elements of all the sets, add every element a single
2996 * time to the result set */
2997 for (j = 0; j < setsnum; j++) {
2998 if (!dv[j]) continue; /* non existing keys are like empty sets */
2999
3000 di = dictGetIterator(dv[j]);
3001 if (!di) oom("dictGetIterator");
3002
3003 while((de = dictNext(di)) != NULL) {
3004 robj *ele;
3005
3006 /* dictAdd will not add the same element multiple times */
3007 ele = dictGetEntryKey(de);
3008 if (dictAdd(dstset->ptr,ele,NULL) == DICT_OK) {
3009 incrRefCount(ele);
3010 if (!dstkey) {
3011 addReplySds(c,sdscatprintf(sdsempty(),
3012 "$%d\r\n",sdslen(ele->ptr)));
3013 addReply(c,ele);
3014 addReply(c,shared.crlf);
3015 cardinality++;
3016 }
3017 }
3018 }
3019 dictReleaseIterator(di);
3020 }
3021
3022 if (!dstkey) {
3023 lenobj->ptr = sdscatprintf(sdsempty(),"*%d\r\n",cardinality);
3024 decrRefCount(dstset);
3025 } else {
3026 addReply(c,shared.ok);
3027 server.dirty++;
3028 }
3029 zfree(dv);
3030}
3031
3032static void sunionCommand(redisClient *c) {
3033 sunionGenericCommand(c,c->argv+1,c->argc-1,NULL);
3034}
3035
3036static void sunionstoreCommand(redisClient *c) {
3037 sunionGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
3038}
3039
3040static void flushdbCommand(redisClient *c) {
3041 dictEmpty(c->db->dict);
3042 dictEmpty(c->db->expires);
3043 server.dirty++;
3044 addReply(c,shared.ok);
3045 rdbSave(server.dbfilename);
3046}
3047
3048static void flushallCommand(redisClient *c) {
3049 emptyDb();
3050 server.dirty++;
3051 addReply(c,shared.ok);
3052 rdbSave(server.dbfilename);
3053}
3054
3055redisSortOperation *createSortOperation(int type, robj *pattern) {
3056 redisSortOperation *so = zmalloc(sizeof(*so));
3057 if (!so) oom("createSortOperation");
3058 so->type = type;
3059 so->pattern = pattern;
3060 return so;
3061}
3062
3063/* Return the value associated to the key with a name obtained
3064 * substituting the first occurence of '*' in 'pattern' with 'subst' */
3065robj *lookupKeyByPattern(redisDb *db, robj *pattern, robj *subst) {
3066 char *p;
3067 sds spat, ssub;
3068 robj keyobj;
3069 int prefixlen, sublen, postfixlen;
3070 /* Expoit the internal sds representation to create a sds string allocated on the stack in order to make this function faster */
3071 struct {
3072 long len;
3073 long free;
3074 char buf[REDIS_SORTKEY_MAX+1];
3075 } keyname;
3076
3077 spat = pattern->ptr;
3078 ssub = subst->ptr;
3079 if (sdslen(spat)+sdslen(ssub)-1 > REDIS_SORTKEY_MAX) return NULL;
3080 p = strchr(spat,'*');
3081 if (!p) return NULL;
3082
3083 prefixlen = p-spat;
3084 sublen = sdslen(ssub);
3085 postfixlen = sdslen(spat)-(prefixlen+1);
3086 memcpy(keyname.buf,spat,prefixlen);
3087 memcpy(keyname.buf+prefixlen,ssub,sublen);
3088 memcpy(keyname.buf+prefixlen+sublen,p+1,postfixlen);
3089 keyname.buf[prefixlen+sublen+postfixlen] = '\0';
3090 keyname.len = prefixlen+sublen+postfixlen;
3091
3092 keyobj.refcount = 1;
3093 keyobj.type = REDIS_STRING;
3094 keyobj.ptr = ((char*)&keyname)+(sizeof(long)*2);
3095
3096 /* printf("lookup '%s' => %p\n", keyname.buf,de); */
3097 return lookupKeyRead(db,&keyobj);
3098}
3099
3100/* sortCompare() is used by qsort in sortCommand(). Given that qsort_r with
3101 * the additional parameter is not standard but a BSD-specific we have to
3102 * pass sorting parameters via the global 'server' structure */
3103static int sortCompare(const void *s1, const void *s2) {
3104 const redisSortObject *so1 = s1, *so2 = s2;
3105 int cmp;
3106
3107 if (!server.sort_alpha) {
3108 /* Numeric sorting. Here it's trivial as we precomputed scores */
3109 if (so1->u.score > so2->u.score) {
3110 cmp = 1;
3111 } else if (so1->u.score < so2->u.score) {
3112 cmp = -1;
3113 } else {
3114 cmp = 0;
3115 }
3116 } else {
3117 /* Alphanumeric sorting */
3118 if (server.sort_bypattern) {
3119 if (!so1->u.cmpobj || !so2->u.cmpobj) {
3120 /* At least one compare object is NULL */
3121 if (so1->u.cmpobj == so2->u.cmpobj)
3122 cmp = 0;
3123 else if (so1->u.cmpobj == NULL)
3124 cmp = -1;
3125 else
3126 cmp = 1;
3127 } else {
3128 /* We have both the objects, use strcoll */
3129 cmp = strcoll(so1->u.cmpobj->ptr,so2->u.cmpobj->ptr);
3130 }
3131 } else {
3132 /* Compare elements directly */
3133 cmp = strcoll(so1->obj->ptr,so2->obj->ptr);
3134 }
3135 }
3136 return server.sort_desc ? -cmp : cmp;
3137}
3138
3139/* The SORT command is the most complex command in Redis. Warning: this code
3140 * is optimized for speed and a bit less for readability */
3141static void sortCommand(redisClient *c) {
3142 list *operations;
3143 int outputlen = 0;
3144 int desc = 0, alpha = 0;
3145 int limit_start = 0, limit_count = -1, start, end;
3146 int j, dontsort = 0, vectorlen;
3147 int getop = 0; /* GET operation counter */
3148 robj *sortval, *sortby = NULL;
3149 redisSortObject *vector; /* Resulting vector to sort */
3150
3151 /* Lookup the key to sort. It must be of the right types */
3152 sortval = lookupKeyRead(c->db,c->argv[1]);
3153 if (sortval == NULL) {
3154 addReply(c,shared.nokeyerr);
3155 return;
3156 }
3157 if (sortval->type != REDIS_SET && sortval->type != REDIS_LIST) {
3158 addReply(c,shared.wrongtypeerr);
3159 return;
3160 }
3161
3162 /* Create a list of operations to perform for every sorted element.
3163 * Operations can be GET/DEL/INCR/DECR */
3164 operations = listCreate();
3165 listSetFreeMethod(operations,zfree);
3166 j = 2;
3167
3168 /* Now we need to protect sortval incrementing its count, in the future
3169 * SORT may have options able to overwrite/delete keys during the sorting
3170 * and the sorted key itself may get destroied */
3171 incrRefCount(sortval);
3172
3173 /* The SORT command has an SQL-alike syntax, parse it */
3174 while(j < c->argc) {
3175 int leftargs = c->argc-j-1;
3176 if (!strcasecmp(c->argv[j]->ptr,"asc")) {
3177 desc = 0;
3178 } else if (!strcasecmp(c->argv[j]->ptr,"desc")) {
3179 desc = 1;
3180 } else if (!strcasecmp(c->argv[j]->ptr,"alpha")) {
3181 alpha = 1;
3182 } else if (!strcasecmp(c->argv[j]->ptr,"limit") && leftargs >= 2) {
3183 limit_start = atoi(c->argv[j+1]->ptr);
3184 limit_count = atoi(c->argv[j+2]->ptr);
3185 j+=2;
3186 } else if (!strcasecmp(c->argv[j]->ptr,"by") && leftargs >= 1) {
3187 sortby = c->argv[j+1];
3188 /* If the BY pattern does not contain '*', i.e. it is constant,
3189 * we don't need to sort nor to lookup the weight keys. */
3190 if (strchr(c->argv[j+1]->ptr,'*') == NULL) dontsort = 1;
3191 j++;
3192 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3193 listAddNodeTail(operations,createSortOperation(
3194 REDIS_SORT_GET,c->argv[j+1]));
3195 getop++;
3196 j++;
3197 } else if (!strcasecmp(c->argv[j]->ptr,"del") && leftargs >= 1) {
3198 listAddNodeTail(operations,createSortOperation(
3199 REDIS_SORT_DEL,c->argv[j+1]));
3200 j++;
3201 } else if (!strcasecmp(c->argv[j]->ptr,"incr") && leftargs >= 1) {
3202 listAddNodeTail(operations,createSortOperation(
3203 REDIS_SORT_INCR,c->argv[j+1]));
3204 j++;
3205 } else if (!strcasecmp(c->argv[j]->ptr,"get") && leftargs >= 1) {
3206 listAddNodeTail(operations,createSortOperation(
3207 REDIS_SORT_DECR,c->argv[j+1]));
3208 j++;
3209 } else {
3210 decrRefCount(sortval);
3211 listRelease(operations);
3212 addReply(c,shared.syntaxerr);
3213 return;
3214 }
3215 j++;
3216 }
3217
3218 /* Load the sorting vector with all the objects to sort */
3219 vectorlen = (sortval->type == REDIS_LIST) ?
3220 listLength((list*)sortval->ptr) :
3221 dictSize((dict*)sortval->ptr);
3222 vector = zmalloc(sizeof(redisSortObject)*vectorlen);
3223 if (!vector) oom("allocating objects vector for SORT");
3224 j = 0;
3225 if (sortval->type == REDIS_LIST) {
3226 list *list = sortval->ptr;
3227 listNode *ln = list->head;
3228 while(ln) {
3229 robj *ele = ln->value;
3230 vector[j].obj = ele;
3231 vector[j].u.score = 0;
3232 vector[j].u.cmpobj = NULL;
3233 ln = ln->next;
3234 j++;
3235 }
3236 } else {
3237 dict *set = sortval->ptr;
3238 dictIterator *di;
3239 dictEntry *setele;
3240
3241 di = dictGetIterator(set);
3242 if (!di) oom("dictGetIterator");
3243 while((setele = dictNext(di)) != NULL) {
3244 vector[j].obj = dictGetEntryKey(setele);
3245 vector[j].u.score = 0;
3246 vector[j].u.cmpobj = NULL;
3247 j++;
3248 }
3249 dictReleaseIterator(di);
3250 }
3251 assert(j == vectorlen);
3252
3253 /* Now it's time to load the right scores in the sorting vector */
3254 if (dontsort == 0) {
3255 for (j = 0; j < vectorlen; j++) {
3256 if (sortby) {
3257 robj *byval;
3258
3259 byval = lookupKeyByPattern(c->db,sortby,vector[j].obj);
3260 if (!byval || byval->type != REDIS_STRING) continue;
3261 if (alpha) {
3262 vector[j].u.cmpobj = byval;
3263 incrRefCount(byval);
3264 } else {
3265 vector[j].u.score = strtod(byval->ptr,NULL);
3266 }
3267 } else {
3268 if (!alpha) vector[j].u.score = strtod(vector[j].obj->ptr,NULL);
3269 }
3270 }
3271 }
3272
3273 /* We are ready to sort the vector... perform a bit of sanity check
3274 * on the LIMIT option too. We'll use a partial version of quicksort. */
3275 start = (limit_start < 0) ? 0 : limit_start;
3276 end = (limit_count < 0) ? vectorlen-1 : start+limit_count-1;
3277 if (start >= vectorlen) {
3278 start = vectorlen-1;
3279 end = vectorlen-2;
3280 }
3281 if (end >= vectorlen) end = vectorlen-1;
3282
3283 if (dontsort == 0) {
3284 server.sort_desc = desc;
3285 server.sort_alpha = alpha;
3286 server.sort_bypattern = sortby ? 1 : 0;
3287 qsort(vector,vectorlen,sizeof(redisSortObject),sortCompare);
3288 }
3289
3290 /* Send command output to the output buffer, performing the specified
3291 * GET/DEL/INCR/DECR operations if any. */
3292 outputlen = getop ? getop*(end-start+1) : end-start+1;
3293 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",outputlen));
3294 for (j = start; j <= end; j++) {
3295 listNode *ln = operations->head;
3296 if (!getop) {
3297 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3298 sdslen(vector[j].obj->ptr)));
3299 addReply(c,vector[j].obj);
3300 addReply(c,shared.crlf);
3301 }
3302 while(ln) {
3303 redisSortOperation *sop = ln->value;
3304 robj *val = lookupKeyByPattern(c->db,sop->pattern,
3305 vector[j].obj);
3306
3307 if (sop->type == REDIS_SORT_GET) {
3308 if (!val || val->type != REDIS_STRING) {
3309 addReply(c,shared.nullbulk);
3310 } else {
3311 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",
3312 sdslen(val->ptr)));
3313 addReply(c,val);
3314 addReply(c,shared.crlf);
3315 }
3316 } else if (sop->type == REDIS_SORT_DEL) {
3317 /* TODO */
3318 }
3319 ln = ln->next;
3320 }
3321 }
3322
3323 /* Cleanup */
3324 decrRefCount(sortval);
3325 listRelease(operations);
3326 for (j = 0; j < vectorlen; j++) {
3327 if (sortby && alpha && vector[j].u.cmpobj)
3328 decrRefCount(vector[j].u.cmpobj);
3329 }
3330 zfree(vector);
3331}
3332
3333static void infoCommand(redisClient *c) {
3334 sds info;
3335 time_t uptime = time(NULL)-server.stat_starttime;
3336
3337 info = sdscatprintf(sdsempty(),
3338 "redis_version:%s\r\n"
3339 "connected_clients:%d\r\n"
3340 "connected_slaves:%d\r\n"
3341 "used_memory:%zu\r\n"
3342 "changes_since_last_save:%lld\r\n"
3343 "last_save_time:%d\r\n"
3344 "total_connections_received:%lld\r\n"
3345 "total_commands_processed:%lld\r\n"
3346 "uptime_in_seconds:%d\r\n"
3347 "uptime_in_days:%d\r\n"
3348 ,REDIS_VERSION,
3349 listLength(server.clients)-listLength(server.slaves),
3350 listLength(server.slaves),
3351 server.usedmemory,
3352 server.dirty,
3353 server.lastsave,
3354 server.stat_numconnections,
3355 server.stat_numcommands,
3356 uptime,
3357 uptime/(3600*24)
3358 );
3359 addReplySds(c,sdscatprintf(sdsempty(),"$%d\r\n",sdslen(info)));
3360 addReplySds(c,info);
3361 addReply(c,shared.crlf);
3362}
3363
3364static void monitorCommand(redisClient *c) {
3365 /* ignore MONITOR if aleady slave or in monitor mode */
3366 if (c->flags & REDIS_SLAVE) return;
3367
3368 c->flags |= (REDIS_SLAVE|REDIS_MONITOR);
3369 c->slaveseldb = 0;
3370 if (!listAddNodeTail(server.monitors,c)) oom("listAddNodeTail");
3371 addReply(c,shared.ok);
3372}
3373
3374/* ================================= Expire ================================= */
3375static int removeExpire(redisDb *db, robj *key) {
3376 if (dictDelete(db->expires,key) == DICT_OK) {
3377 return 1;
3378 } else {
3379 return 0;
3380 }
3381}
3382
3383static int setExpire(redisDb *db, robj *key, time_t when) {
3384 if (dictAdd(db->expires,key,(void*)when) == DICT_ERR) {
3385 return 0;
3386 } else {
3387 incrRefCount(key);
3388 return 1;
3389 }
3390}
3391
3392/* Return the expire time of the specified key, or -1 if no expire
3393 * is associated with this key (i.e. the key is non volatile) */
3394static time_t getExpire(redisDb *db, robj *key) {
3395 dictEntry *de;
3396
3397 /* No expire? return ASAP */
3398 if (dictSize(db->expires) == 0 ||
3399 (de = dictFind(db->expires,key)) == NULL) return -1;
3400
3401 return (time_t) dictGetEntryVal(de);
3402}
3403
3404static int expireIfNeeded(redisDb *db, robj *key) {
3405 time_t when;
3406 dictEntry *de;
3407
3408 /* No expire? return ASAP */
3409 if (dictSize(db->expires) == 0 ||
3410 (de = dictFind(db->expires,key)) == NULL) return 0;
3411
3412 /* Lookup the expire */
3413 when = (time_t) dictGetEntryVal(de);
3414 if (time(NULL) <= when) return 0;
3415
3416 /* Delete the key */
3417 dictDelete(db->expires,key);
3418 return dictDelete(db->dict,key) == DICT_OK;
3419}
3420
3421static int deleteIfVolatile(redisDb *db, robj *key) {
3422 dictEntry *de;
3423
3424 /* No expire? return ASAP */
3425 if (dictSize(db->expires) == 0 ||
3426 (de = dictFind(db->expires,key)) == NULL) return 0;
3427
3428 /* Delete the key */
3429 server.dirty++;
3430 dictDelete(db->expires,key);
3431 return dictDelete(db->dict,key) == DICT_OK;
3432}
3433
3434static void expireCommand(redisClient *c) {
3435 dictEntry *de;
3436 int seconds = atoi(c->argv[2]->ptr);
3437
3438 de = dictFind(c->db->dict,c->argv[1]);
3439 if (de == NULL) {
3440 addReply(c,shared.czero);
3441 return;
3442 }
3443 if (seconds <= 0) {
3444 addReply(c, shared.czero);
3445 return;
3446 } else {
3447 time_t when = time(NULL)+seconds;
3448 if (setExpire(c->db,c->argv[1],when))
3449 addReply(c,shared.cone);
3450 else
3451 addReply(c,shared.czero);
3452 return;
3453 }
3454}
3455
3456/* =============================== Replication ============================= */
3457
3458/* Send the whole output buffer syncronously to the slave. This a general operation in theory, but it is actually useful only for replication. */
3459static int flushClientOutput(redisClient *c) {
3460 int retval;
3461 time_t start = time(NULL);
3462
3463 while(listLength(c->reply)) {
3464 if (time(NULL)-start > 5) return REDIS_ERR; /* 5 seconds timeout */
3465 retval = aeWait(c->fd,AE_WRITABLE,1000);
3466 if (retval == -1) {
3467 return REDIS_ERR;
3468 } else if (retval & AE_WRITABLE) {
3469 sendReplyToClient(NULL, c->fd, c, AE_WRITABLE);
3470 }
3471 }
3472 return REDIS_OK;
3473}
3474
3475static int syncWrite(int fd, char *ptr, ssize_t size, int timeout) {
3476 ssize_t nwritten, ret = size;
3477 time_t start = time(NULL);
3478
3479 timeout++;
3480 while(size) {
3481 if (aeWait(fd,AE_WRITABLE,1000) & AE_WRITABLE) {
3482 nwritten = write(fd,ptr,size);
3483 if (nwritten == -1) return -1;
3484 ptr += nwritten;
3485 size -= nwritten;
3486 }
3487 if ((time(NULL)-start) > timeout) {
3488 errno = ETIMEDOUT;
3489 return -1;
3490 }
3491 }
3492 return ret;
3493}
3494
3495static int syncRead(int fd, char *ptr, ssize_t size, int timeout) {
3496 ssize_t nread, totread = 0;
3497 time_t start = time(NULL);
3498
3499 timeout++;
3500 while(size) {
3501 if (aeWait(fd,AE_READABLE,1000) & AE_READABLE) {
3502 nread = read(fd,ptr,size);
3503 if (nread == -1) return -1;
3504 ptr += nread;
3505 size -= nread;
3506 totread += nread;
3507 }
3508 if ((time(NULL)-start) > timeout) {
3509 errno = ETIMEDOUT;
3510 return -1;
3511 }
3512 }
3513 return totread;
3514}
3515
/* Read a '\n' terminated line from 'fd' into 'ptr' (a buffer of 'size'
 * bytes), one byte at a time via syncRead().
 *
 * At most size-1 bytes are stored and the buffer is always NUL terminated
 * when size > 0. The terminating "\n" (and a '\r' right before it) is not
 * part of the resulting string.
 *
 * Returns:
 *   - the number of bytes consumed before the '\n' (this count includes a
 *     stripped '\r'),
 *   - the number of bytes stored if the buffer fills up before a newline,
 *   - 0 if size <= 0, or
 *   - -1 if syncRead() fails. */
static int syncReadLine(int fd, char *ptr, ssize_t size, int timeout) {
    ssize_t nread = 0;

    if (size <= 0) return 0; /* no room even for the terminator */
    *ptr = '\0';
    size--; /* reserve room for the terminator */
    while(size) {
        char c;

        if (syncRead(fd,&c,1,timeout) == -1) return -1;
        if (c == '\n') {
            *ptr = '\0';
            if (nread && *(ptr-1) == '\r') *(ptr-1) = '\0';
            return nread;
        } else {
            *ptr++ = c;
            *ptr = '\0';
            nread++;
        }
        /* Account for the stored byte: without this, a line longer than
         * the buffer would overflow it (the line comes from the network). */
        size--;
    }
    return nread;
}
3536
3537static void syncCommand(redisClient *c) {
3538 /* ignore SYNC if aleady slave or in monitor mode */
3539 if (c->flags & REDIS_SLAVE) return;
3540
3541 /* SYNC can't be issued when the server has pending data to send to
3542 * the client about already issued commands. We need a fresh reply
3543 * buffer registering the differences between the BGSAVE and the current
3544 * dataset, so that we can copy to other slaves if needed. */
3545 if (listLength(c->reply) != 0) {
3546 addReplySds(c,sdsnew("-ERR SYNC is invalid with pending input\r\n"));
3547 return;
3548 }
3549
3550 redisLog(REDIS_NOTICE,"Slave ask for synchronization");
3551 /* Here we need to check if there is a background saving operation
3552 * in progress, or if it is required to start one */
3553 if (server.bgsaveinprogress) {
3554 /* Ok a background save is in progress. Let's check if it is a good
3555 * one for replication, i.e. if there is another slave that is
3556 * registering differences since the server forked to save */
3557 redisClient *slave;
3558 listNode *ln;
3559
3560 ln = server.slaves->head;
3561 while(ln) {
3562 slave = ln->value;
3563 if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;
3564 ln = ln->next;
3565 }
3566 if (ln) {
3567 /* Perfect, the server is already registering differences for
3568 * another slave. Set the right state, and copy the buffer. */
3569 listRelease(c->reply);
3570 c->reply = listDup(slave->reply);
3571 if (!c->reply) oom("listDup copying slave reply list");
3572 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3573 redisLog(REDIS_NOTICE,"Waiting for end of BGSAVE for SYNC");
3574 } else {
3575 /* No way, we need to wait for the next BGSAVE in order to
3576 * register differences */
3577 c->replstate = REDIS_REPL_WAIT_BGSAVE_START;
3578 redisLog(REDIS_NOTICE,"Waiting for next BGSAVE for SYNC");
3579 }
3580 } else {
3581 /* Ok we don't have a BGSAVE in progress, let's start one */
3582 redisLog(REDIS_NOTICE,"Starting BGSAVE for SYNC");
3583 if (rdbSaveBackground(server.dbfilename) != REDIS_OK) {
3584 redisLog(REDIS_NOTICE,"Replication failed, can't BGSAVE");
3585 addReplySds(c,sdsnew("-ERR Unalbe to perform background save\r\n"));
3586 return;
3587 }
3588 c->replstate = REDIS_REPL_WAIT_BGSAVE_END;
3589 }
3590 c->flags |= REDIS_SLAVE;
3591 c->slaveseldb = 0;
3592 if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
3593 redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
3594 return;
3595}
3596
#if 0
/* Disabled (#if 0, never compiled): a blocking implementation of SYNC.
 *
 * It first flushes the client's pending output with flushClientOutput(),
 * then saves the dataset in the foreground with rdbSave(). The dump file is
 * streamed to the slave as one bulk reply ("$<len>\r\n<data>\r\n") using
 * syncWrite() with 5 second timeouts, giving up after REDIS_MAX_SYNC_TIME
 * seconds.
 *
 * On success the client is flagged REDIS_SLAVE and appended to
 * server.slaves. On any failure the client is flagged REDIS_CLOSE.
 *
 * NOTE(review): read() returning 0 before 'len' bytes were sent leaves
 * 'len' unchanged, so the loop would spin until REDIS_MAX_SYNC_TIME.
 * Handle this if the code is ever re-enabled. */
static void _syncCommand(redisClient *c) {
    struct stat sb;
    int fd = -1, len;
    time_t start = time(NULL);
    char sizebuf[32];

    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & REDIS_SLAVE) return;

    redisLog(REDIS_NOTICE,"Slave ask for synchronization");
    if (flushClientOutput(c) == REDIS_ERR ||
        rdbSave(server.dbfilename) != REDIS_OK)
        goto closeconn;

    fd = open(server.dbfilename, O_RDONLY);
    if (fd == -1 || fstat(fd,&sb) == -1) goto closeconn;
    len = sb.st_size;

    /* Bulk count header, then the raw file contents */
    snprintf(sizebuf,32,"$%d\r\n",len);
    if (syncWrite(c->fd,sizebuf,strlen(sizebuf),5) == -1) goto closeconn;
    while(len) {
        char buf[1024];
        int nread;

        if (time(NULL)-start > REDIS_MAX_SYNC_TIME) goto closeconn;
        nread = read(fd,buf,1024);
        if (nread == -1) goto closeconn;
        len -= nread;
        if (syncWrite(c->fd,buf,nread,5) == -1) goto closeconn;
    }
    if (syncWrite(c->fd,"\r\n",2,5) == -1) goto closeconn;
    close(fd);
    c->flags |= REDIS_SLAVE;
    c->slaveseldb = 0;
    if (!listAddNodeTail(server.slaves,c)) oom("listAddNodeTail");
    redisLog(REDIS_NOTICE,"Synchronization with slave succeeded");
    return;

closeconn:
    if (fd != -1) close(fd);
    c->flags |= REDIS_CLOSE;
    redisLog(REDIS_WARNING,"Synchronization with slave failed");
    return;
}
#endif
3643
3644static int syncWithMaster(void) {
3645 char buf[1024], tmpfile[256];
3646 int dumpsize;
3647 int fd = anetTcpConnect(NULL,server.masterhost,server.masterport);
3648 int dfd;
3649
3650 if (fd == -1) {
3651 redisLog(REDIS_WARNING,"Unable to connect to MASTER: %s",
3652 strerror(errno));
3653 return REDIS_ERR;
3654 }
3655 /* Issue the SYNC command */
3656 if (syncWrite(fd,"SYNC \r\n",7,5) == -1) {
3657 close(fd);
3658 redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",
3659 strerror(errno));
3660 return REDIS_ERR;
3661 }
3662 /* Read the bulk write count */
3663 if (syncReadLine(fd,buf,1024,5) == -1) {
3664 close(fd);
3665 redisLog(REDIS_WARNING,"I/O error reading bulk count from MASTER: %s",
3666 strerror(errno));
3667 return REDIS_ERR;
3668 }
3669 dumpsize = atoi(buf+1);
3670 redisLog(REDIS_NOTICE,"Receiving %d bytes data dump from MASTER",dumpsize);
3671 /* Read the bulk write data on a temp file */
3672 snprintf(tmpfile,256,"temp-%d.%ld.rdb",(int)time(NULL),(long int)random());
3673 dfd = open(tmpfile,O_CREAT|O_WRONLY,0644);
3674 if (dfd == -1) {
3675 close(fd);
3676 redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));
3677 return REDIS_ERR;
3678 }
3679 while(dumpsize) {
3680 int nread, nwritten;
3681
3682 nread = read(fd,buf,(dumpsize < 1024)?dumpsize:1024);
3683 if (nread == -1) {
3684 redisLog(REDIS_WARNING,"I/O error trying to sync with MASTER: %s",
3685 strerror(errno));
3686 close(fd);
3687 close(dfd);
3688 return REDIS_ERR;
3689 }
3690 nwritten = write(dfd,buf,nread);
3691 if (nwritten == -1) {
3692 redisLog(REDIS_WARNING,"Write error writing to the DB dump file needed for MASTER <-> SLAVE synchrnonization: %s", strerror(errno));
3693 close(fd);
3694 close(dfd);
3695 return REDIS_ERR;
3696 }
3697 dumpsize -= nread;
3698 }
3699 close(dfd);
3700 if (rename(tmpfile,server.dbfilename) == -1) {
3701 redisLog(REDIS_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> SLAVE synchronization: %s", strerror(errno));
3702 unlink(tmpfile);
3703 close(fd);
3704 return REDIS_ERR;
3705 }
3706 emptyDb();
3707 if (rdbLoad(server.dbfilename) != REDIS_OK) {
3708 redisLog(REDIS_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
3709 close(fd);
3710 return REDIS_ERR;
3711 }
3712 server.master = createClient(fd);
3713 server.master->flags |= REDIS_MASTER;
3714 server.replstate = REDIS_REPL_CONNECTED;
3715 return REDIS_OK;
3716}
3717
3718/* =================================== Main! ================================ */
3719
3720static void daemonize(void) {
3721 int fd;
3722 FILE *fp;
3723
3724 if (fork() != 0) exit(0); /* parent exits */
3725 setsid(); /* create a new session */
3726
3727 /* Every output goes to /dev/null. If Redis is daemonized but
3728 * the 'logfile' is set to 'stdout' in the configuration file
3729 * it will not log at all. */
3730 if ((fd = open("/dev/null", O_RDWR, 0)) != -1) {
3731 dup2(fd, STDIN_FILENO);
3732 dup2(fd, STDOUT_FILENO);
3733 dup2(fd, STDERR_FILENO);
3734 if (fd > STDERR_FILENO) close(fd);
3735 }
3736 /* Try to write the pid file */
3737 fp = fopen(server.pidfile,"w");
3738 if (fp) {
3739 fprintf(fp,"%d\n",getpid());
3740 fclose(fp);
3741 }
3742}
3743
3744int main(int argc, char **argv) {
3745 initServerConfig();
3746 if (argc == 2) {
3747 ResetServerSaveParams();
3748 loadServerConfig(argv[1]);
3749 } else if (argc > 2) {
3750 fprintf(stderr,"Usage: ./redis-server [/path/to/redis.conf]\n");
3751 exit(1);
3752 }
3753 initServer();
3754 if (server.daemonize) daemonize();
3755 redisLog(REDIS_NOTICE,"Server started, Redis version " REDIS_VERSION);
3756 if (rdbLoad(server.dbfilename) == REDIS_OK)
3757 redisLog(REDIS_NOTICE,"DB loaded from disk");
3758 if (aeCreateFileEvent(server.el, server.fd, AE_READABLE,
3759 acceptHandler, NULL, NULL) == AE_ERR) oom("creating file event");
3760 redisLog(REDIS_NOTICE,"The server is now ready to accept connections on port %d", server.port);
3761 aeMain(server.el);
3762 aeDeleteEventLoop(server.el);
3763 return 0;
3764}