{"setnx",setnxCommand,3,REDIS_CMD_DENYOOM,NULL,0,0,0},
{"setex",setexCommand,4,REDIS_CMD_DENYOOM,NULL,0,0,0},
{"append",appendCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
- {"substr",substrCommand,4,0,NULL,1,1,1},
{"strlen",strlenCommand,2,0,NULL,1,1,1},
{"del",delCommand,-2,0,NULL,0,0,0},
{"exists",existsCommand,2,0,NULL,1,1,1},
+ {"setbit",setbitCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"getbit",getbitCommand,3,0,NULL,1,1,1},
+ {"setrange",setrangeCommand,4,REDIS_CMD_DENYOOM,NULL,1,1,1},
+ {"getrange",getrangeCommand,4,0,NULL,1,1,1},
+ {"substr",getrangeCommand,4,0,NULL,1,1,1},
{"incr",incrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"decr",decrCommand,2,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"mget",mgetCommand,-2,0,NULL,1,-1,1},
{"rpop",rpopCommand,2,0,NULL,1,1,1},
{"lpop",lpopCommand,2,0,NULL,1,1,1},
{"brpop",brpopCommand,-3,0,NULL,1,1,1},
+ {"brpoplpush",brpoplpushCommand,4,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"blpop",blpopCommand,-3,0,NULL,1,1,1},
{"llen",llenCommand,2,0,NULL,1,1,1},
{"lindex",lindexCommand,3,0,NULL,1,1,1},
{"lrange",lrangeCommand,4,0,NULL,1,1,1},
{"ltrim",ltrimCommand,4,0,NULL,1,1,1},
{"lrem",lremCommand,4,0,NULL,1,1,1},
- {"rpoplpush",rpoplpushcommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
+ {"rpoplpush",rpoplpushCommand,3,REDIS_CMD_DENYOOM,NULL,1,2,1},
{"sadd",saddCommand,3,REDIS_CMD_DENYOOM,NULL,1,1,1},
{"srem",sremCommand,3,0,NULL,1,1,1},
{"smove",smoveCommand,4,0,NULL,1,2,1},
/*============================ Utility functions ============================ */
void redisLog(int level, const char *fmt, ...) {
+ const int syslogLevelMap[] = { LOG_DEBUG, LOG_INFO, LOG_NOTICE, LOG_WARNING };
+ const char *c = ".-*#";
+ time_t now = time(NULL);
va_list ap;
FILE *fp;
- char *c = ".-*#";
char buf[64];
- time_t now;
+ char msg[REDIS_MAX_LOGMSG_LEN];
if (level < server.verbosity) return;
if (!fp) return;
va_start(ap, fmt);
- now = time(NULL);
- strftime(buf,64,"%d %b %H:%M:%S",localtime(&now));
- fprintf(fp,"[%d] %s %c ",(int)getpid(),buf,c[level]);
- vfprintf(fp, fmt, ap);
- fprintf(fp,"\n");
- fflush(fp);
+ vsnprintf(msg, sizeof(msg), fmt, ap);
va_end(ap);
+ strftime(buf,sizeof(buf),"%d %b %H:%M:%S",localtime(&now));
+ fprintf(fp,"[%d] %s %c %s\n",(int)getpid(),buf,c[level],msg);
+ fflush(fp);
+
if (server.logfile) fclose(fp);
+
+ if (server.syslog_enabled) syslog(syslogLevelMap[level], "%s", msg);
}
/* Redis generally does not try to recover from out of memory conditions
}
/* Close connections of timedout clients */
- if ((server.maxidletime && !(loops % 100)) || server.blpop_blocked_clients)
+ if ((server.maxidletime && !(loops % 100)) || server.bpop_blocked_clients)
closeTimedoutClients();
/* Check if a background saving or AOF rewrite in progress terminated */
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
REDIS_NOTUSED(eventLoop);
+ listNode *ln;
+ redisClient *c;
/* Awake clients that got all the swapped keys they requested */
if (server.vm_enabled && listLength(server.io_ready_clients)) {
listIter li;
- listNode *ln;
listRewind(server.io_ready_clients,&li);
while((ln = listNext(&li))) {
- redisClient *c = ln->value;
+ c = ln->value;
struct redisCommand *cmd;
/* Resume the client. */
processInputBuffer(c);
}
}
+
+ /* Try to process pending commands for clients that were just unblocked. */
+ while (listLength(server.unblocked_clients)) {
+ ln = listFirst(server.unblocked_clients);
+ redisAssert(ln != NULL);
+ c = ln->value;
+ listDelNode(server.unblocked_clients,ln);
+
+ /* Process remaining data in the input buffer. */
+ if (c->querybuf && sdslen(c->querybuf) > 0)
+ processInputBuffer(c);
+ }
+
/* Write the AOF buffer on disk */
flushAppendOnlyFile();
}
server.saveparams = NULL;
server.loading = 0;
server.logfile = NULL; /* NULL = log on standard output */
+ server.syslog_enabled = 0;
+ server.syslog_ident = zstrdup("redis");
+ server.syslog_facility = LOG_LOCAL0;
server.glueoutputbuf = 1;
server.daemonize = 0;
server.appendonly = 0;
server.rdbcompression = 1;
server.activerehashing = 1;
server.maxclients = 0;
- server.blpop_blocked_clients = 0;
+ server.bpop_blocked_clients = 0;
server.maxmemory = 0;
server.maxmemory_policy = REDIS_MAXMEMORY_VOLATILE_LRU;
server.maxmemory_samples = 3;
signal(SIGPIPE, SIG_IGN);
setupSigSegvAction();
+ if (server.syslog_enabled) {
+ openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
+ server.syslog_facility);
+ }
+
server.mainthread = pthread_self();
server.clients = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
+ server.unblocked_clients = listCreate();
createSharedObjects();
server.el = aeCreateEventLoop();
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
(float)c_ru.ru_stime.tv_sec+(float)c_ru.ru_stime.tv_usec/1000000,
listLength(server.clients)-listLength(server.slaves),
listLength(server.slaves),
- server.blpop_blocked_clients,
+ server.bpop_blocked_clients,
zmalloc_used_memory(),
hmem,
zmalloc_get_rss(),
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
+ #include <syslog.h>
#include "ae.h" /* Event driven programming library */
#include "sds.h" /* Dynamic safe strings */
#define REDIS_REQUEST_MAX_SIZE (1024*1024*256) /* max bytes in inline command */
#define REDIS_SHARED_INTEGERS 10000
#define REDIS_REPLY_CHUNK_BYTES (5*1500) /* 5 TCP packets with default MTU */
+ #define REDIS_MAX_LOGMSG_LEN 1024 /* Default maximum length of syslog messages */
/* If more then REDIS_WRITEV_THRESHOLD write packets are pending use writev */
#define REDIS_WRITEV_THRESHOLD 3
int count; /* Total number of MULTI commands */
} multiState;
+typedef struct blockingState {
+ robj **keys; /* The key we are waiting to terminate a blocking
+ * operation such as BLPOP. Otherwise NULL. */
+ int count; /* Number of blocking keys */
+ time_t timeout; /* Blocking operation timeout. If UNIX current time
+ * is >= timeout then the operation timed out. */
+ robj *target; /* The key that should receive the element,
+ * for BRPOPLPUSH. */
+} blockingState;
+
/* With multiplexing we need to take per-clinet state.
* Clients are taken in a liked list. */
typedef struct redisClient {
long repldboff; /* replication DB file offset */
off_t repldbsize; /* replication DB file size */
multiState mstate; /* MULTI/EXEC state */
- robj **blocking_keys; /* The key we are waiting to terminate a blocking
- * operation such as BLPOP. Otherwise NULL. */
- int blocking_keys_num; /* Number of blocking keys */
- time_t blockingto; /* Blocking operation timeout. If UNIX current time
- * is >= blockingto then the operation timed out. */
+ blockingState bpop; /* blocking state */
list *io_keys; /* Keys this client is waiting to be loaded from the
* swap file in order to continue. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
struct saveparam *saveparams;
int saveparamslen;
char *logfile;
+ int syslog_enabled;
+ char *syslog_ident;
+ int syslog_facility;
char *dbfilename;
char *appendfilename;
char *requirepass;
int maxmemory_policy;
int maxmemory_samples;
/* Blocked clients */
- unsigned int blpop_blocked_clients;
+ unsigned int bpop_blocked_clients;
unsigned int vm_blocked_clients;
+ list *unblocked_clients;
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
int setTypeIsMember(robj *subject, robj *value);
setTypeIterator *setTypeInitIterator(robj *subject);
void setTypeReleaseIterator(setTypeIterator *si);
-robj *setTypeNext(setTypeIterator *si);
-robj *setTypeRandomElement(robj *subject);
+int setTypeNext(setTypeIterator *si, robj **objele, int64_t *llele);
+robj *setTypeNextObject(setTypeIterator *si);
+int setTypeRandomElement(robj *setobj, robj **objele, int64_t *llele);
unsigned long setTypeSize(robj *subject);
void setTypeConvert(robj *subject, int enc);
void convertToRealHash(robj *o);
void hashTypeTryConversion(robj *subject, robj **argv, int start, int end);
void hashTypeTryObjectEncoding(robj *subject, robj **o1, robj **o2);
-robj *hashTypeGet(robj *o, robj *key);
+int hashTypeGet(robj *o, robj *key, robj **objval, unsigned char **v, unsigned int *vlen);
+robj *hashTypeGetObject(robj *o, robj *key);
int hashTypeExists(robj *o, robj *key);
int hashTypeSet(robj *o, robj *key, robj *value);
int hashTypeDelete(robj *o, robj *key);
hashTypeIterator *hashTypeInitIterator(robj *subject);
void hashTypeReleaseIterator(hashTypeIterator *hi);
int hashTypeNext(hashTypeIterator *hi);
-robj *hashTypeCurrent(hashTypeIterator *hi, int what);
+int hashTypeCurrent(hashTypeIterator *hi, int what, robj **objval, unsigned char **v, unsigned int *vlen);
+robj *hashTypeCurrentObject(hashTypeIterator *hi, int what);
robj *hashTypeLookupWriteOrCreate(redisClient *c, robj *key);
/* Pub / Sub */
void getCommand(redisClient *c);
void delCommand(redisClient *c);
void existsCommand(redisClient *c);
+void setbitCommand(redisClient *c);
+void getbitCommand(redisClient *c);
+void setrangeCommand(redisClient *c);
+void getrangeCommand(redisClient *c);
void incrCommand(redisClient *c);
void decrCommand(redisClient *c);
void incrbyCommand(redisClient *c);
void flushallCommand(redisClient *c);
void sortCommand(redisClient *c);
void lremCommand(redisClient *c);
-void rpoplpushcommand(redisClient *c);
+void rpoplpushCommand(redisClient *c);
void infoCommand(redisClient *c);
void mgetCommand(redisClient *c);
void monitorCommand(redisClient *c);
void discardCommand(redisClient *c);
void blpopCommand(redisClient *c);
void brpopCommand(redisClient *c);
+void brpoplpushCommand(redisClient *c);
void appendCommand(redisClient *c);
-void substrCommand(redisClient *c);
void strlenCommand(redisClient *c);
void zrankCommand(redisClient *c);
void zrevrankCommand(redisClient *c);