FILE *fp = fopen(filename,"r");
struct redis_stat sb;
int appendonly = server.appendonly;
+ long loops = 0;
if (redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0)
return REDIS_ERR;
server.appendonly = 0;
fakeClient = createFakeClient();
+ startLoading(fp);
+
while(1) {
int argc, j;
unsigned long len;
struct redisCommand *cmd;
int force_swapout;
+ /* Serve the clients from time to time */
+ if (!(loops++ % 1000)) {
+ loadingProgress(ftello(fp));
+ aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+ }
+
if (fgets(buf,sizeof(buf),fp) == NULL) {
if (feof(fp))
break;
fclose(fp);
freeFakeClient(fakeClient);
server.appendonly = appendonly;
+ stopLoading();
return REDIS_OK;
readerr:
#include <sys/resource.h>
#include <sys/wait.h>
#include <arpa/inet.h>
+#include <sys/stat.h>
int rdbSaveType(FILE *fp, unsigned char type) {
if (fwrite(&type,1,1,fp) == 0) return -1;
return o;
}
+/* Mark that we are loading in the global state and setup the fields
+ * needed to provide loading stats. */
+void startLoading(FILE *fp) {
+ struct stat sb;
+
+ /* Load the DB */
+ server.loading = 1;
+ server.loading_start_time = time(NULL);
+ if (fstat(fileno(fp), &sb) == -1) {
+ server.loading_total_bytes = 1; /* just to avoid division by zero */
+ } else {
+ server.loading_total_bytes = sb.st_size;
+ }
+}
+
+/* Refresh the loading progress info */
+void loadingProgress(off_t pos) {
+ server.loading_loaded_bytes = pos;
+}
+
+/* Loading finished */
+void stopLoading(void) {
+ server.loading = 0;
+}
+
int rdbLoad(char *filename) {
FILE *fp;
uint32_t dbid;
redisDb *db = server.db+0;
char buf[1024];
time_t expiretime, now = time(NULL);
+ long loops = 0;
fp = fopen(filename,"r");
if (!fp) return REDIS_ERR;
redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
return REDIS_ERR;
}
+
+ startLoading(fp);
while(1) {
robj *key, *val;
int force_swapout;
expiretime = -1;
+
+ /* Serve the clients from time to time */
+ if (!(loops++ % 1000)) {
+ loadingProgress(ftello(fp));
+ aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
+ }
+
/* Read type. */
if ((type = rdbLoadType(fp)) == -1) goto eoferr;
if (type == REDIS_EXPIRETIME) {
}
}
fclose(fp);
+ stopLoading();
return REDIS_OK;
eoferr: /* unexpected end of file is handled here with a fatal exit */
"-ERR source and destination objects are the same\r\n"));
shared.outofrangeerr = createObject(REDIS_STRING,sdsnew(
"-ERR index out of range\r\n"));
+ shared.loadingerr = createObject(REDIS_STRING,sdsnew(
+ "-LOADING Redis is loading the dataset in memory\r\n"));
shared.space = createObject(REDIS_STRING,sdsnew(" "));
shared.colon = createObject(REDIS_STRING,sdsnew(":"));
shared.plus = createObject(REDIS_STRING,sdsnew("+"));
server.verbosity = REDIS_VERBOSE;
server.maxidletime = REDIS_MAXIDLETIME;
server.saveparams = NULL;
+ server.loading = 0;
server.logfile = NULL; /* NULL = log on standard output */
server.glueoutputbuf = 1;
server.daemonize = 0;
return REDIS_OK;
}
+ /* Loading DB? Return an error if the command is not INFO */
+ if (server.loading && cmd->proc != infoCommand) {
+ addReply(c, shared.loadingerr);
+ return REDIS_OK;
+ }
+
/* Exec the command */
if (c->flags & REDIS_MULTI &&
cmd->proc != execCommand && cmd->proc != discardCommand &&
"used_memory_rss:%zu\r\n"
"mem_fragmentation_ratio:%.2f\r\n"
"use_tcmalloc:%d\r\n"
+ "loading:%d\r\n"
+ "aof_enabled:%d\r\n"
"changes_since_last_save:%lld\r\n"
"bgsave_in_progress:%d\r\n"
"last_save_time:%ld\r\n"
#else
0,
#endif
+ server.loading,
+ server.appendonly,
server.dirty,
server.bgsavechildpid != -1,
server.lastsave,
);
unlockThreadedIO();
}
+ if (server.loading) {
+ double perc;
+ time_t eta, elapsed;
+ off_t remaining_bytes = server.loading_total_bytes-
+ server.loading_loaded_bytes;
+
+ perc = ((double)server.loading_loaded_bytes /
+ server.loading_total_bytes) * 100;
+
+ elapsed = time(NULL)-server.loading_start_time;
+ if (elapsed == 0) {
+ eta = 1; /* A fake 1 second figure if we don't have enough info */
+ } else {
+ eta = (elapsed*remaining_bytes)/server.loading_loaded_bytes;
+ }
+
+ info = sdscatprintf(info,
+ "loading_start_time:%ld\r\n"
+ "loading_total_bytes:%llu\r\n"
+ "loading_loaded_bytes:%llu\r\n"
+ "loading_loaded_perc:%.2f\r\n"
+ "loading_eta_seconds:%ld\r\n"
+ ,(unsigned long) server.loading_start_time,
+ (unsigned long long) server.loading_total_bytes,
+ (unsigned long long) server.loading_loaded_bytes,
+ perc,
+ eta
+ );
+ }
for (j = 0; j < server.dbnum; j++) {
long long keys, vkeys;
robj *crlf, *ok, *err, *emptybulk, *czero, *cone, *cnegone, *pong, *space,
*colon, *nullbulk, *nullmultibulk, *queued,
*emptymultibulk, *wrongtypeerr, *nokeyerr, *syntaxerr, *sameobjecterr,
- *outofrangeerr, *plus,
+ *outofrangeerr, *loadingerr, *plus,
*select0, *select1, *select2, *select3, *select4,
*select5, *select6, *select7, *select8, *select9,
*messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *mbulk3,
long long dirty_before_bgsave; /* used to restore dirty on failed BGSAVE */
list *clients;
dict *commands; /* Command table hahs table */
+ /* RDB / AOF loading information */
+ int loading;
+ off_t loading_total_bytes;
+ off_t loading_loaded_bytes;
+ time_t loading_start_time;
/* Fast pointers to often looked up command */
struct redisCommand *delCommand, *multiCommand;
list *slaves, *monitors;
void updateSlavesWaitingBgsave(int bgsaveerr);
void replicationCron(void);
+/* Generic persistence functions */
+void startLoading(FILE *fp);
+void loadingProgress(off_t pos);
+void stopLoading(void);
+
/* RDB persistence */
int rdbLoad(char *filename);
int rdbSaveBackground(char *filename);