X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/cf1eefa4206b23327e38a144d8d20543307fdbe4..206568257a6df9effd66f292904d0e75c3c928ad:/src/redis-cli.c diff --git a/src/redis-cli.c b/src/redis-cli.c index d0c9d979..bdaa3964 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -49,6 +49,10 @@ #define REDIS_NOTUSED(V) ((void) V) +#define OUTPUT_STANDARD 0 +#define OUTPUT_RAW 1 +#define OUTPUT_CSV 2 + static redisContext *context; static struct config { char *hostip; @@ -61,11 +65,16 @@ static struct config { int shutdown; int monitor_mode; int pubsub_mode; + int latency_mode; + int cluster_mode; + int cluster_reissue_command; + int slave_mode; int stdinarg; /* get last arg from stdin. (-x option) */ char *auth; - int raw_output; /* output mode per command */ + int output; /* output mode, see OUTPUT_* defines */ sds mb_delim; - char prompt[32]; + char prompt[128]; + char *eval; } config; static void usage(); @@ -87,12 +96,19 @@ static long long mstime(void) { } static void cliRefreshPrompt(void) { - if (config.dbnum == 0) - snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d> ", - config.hostip, config.hostport); + int len; + + if (config.hostsocket != NULL) + len = snprintf(config.prompt,sizeof(config.prompt),"redis %s", + config.hostsocket); else - snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d[%d]> ", - config.hostip, config.hostport, config.dbnum); + len = snprintf(config.prompt,sizeof(config.prompt),"redis %s:%d", + config.hostip, config.hostport); + /* Add [dbnum] if needed */ + if (config.dbnum != 0) + len += snprintf(config.prompt+len,sizeof(config.prompt)-len,"[%d]", + config.dbnum); + snprintf(config.prompt+len,sizeof(config.prompt)-len,"> "); } /*------------------------------------------------------------------------------ @@ -423,10 +439,47 @@ static sds cliFormatReplyRaw(redisReply *r) { return out; } +static sds cliFormatReplyCSV(redisReply *r) { + unsigned int i; + + sds out = sdsempty(); + switch (r->type) { + case REDIS_REPLY_ERROR: + out = sdscat(out,"ERROR,"); + out = sdscatrepr(out,r->str,strlen(r->str)); + break; + case REDIS_REPLY_STATUS: + out = sdscatrepr(out,r->str,r->len); + break; + case REDIS_REPLY_INTEGER: + out = sdscatprintf(out,"%lld",r->integer); + break; + case REDIS_REPLY_STRING: + out = sdscatrepr(out,r->str,r->len); + break; + case REDIS_REPLY_NIL: + out = sdscat(out,"NIL\n"); + break; + case REDIS_REPLY_ARRAY: + for (i = 0; i < r->elements; i++) { + sds tmp = cliFormatReplyCSV(r->element[i]); + out = sdscatlen(out,tmp,sdslen(tmp)); + if (i != r->elements-1) out = sdscat(out,","); + sdsfree(tmp); + } + break; + default: + fprintf(stderr,"Unknown reply type: %d\n", r->type); + exit(1); + } + return out; +} + static int cliReadReply(int output_raw_strings) { void *_reply; redisReply *reply; - sds out; + sds out = NULL; + int output = 1; if (redisGetReply(context,&_reply) != REDIS_OK) { if (config.shutdown) @@ -444,18 +497,53 @@ static int cliReadReply(int output_raw_strings) { } reply = (redisReply*)_reply; - if (output_raw_strings) { - out = cliFormatReplyRaw(reply); - } else { - if (config.raw_output) { + + /* Check if we need to connect to a different node and reissue the + * request. */ + if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR && + (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK"))) + { + char *p = reply->str, *s; + int slot; + + output = 0; + /* Comments show the position of the pointer as: + * + * [S] for pointer 's' + * [P] for pointer 'p' + */ + s = strchr(p,' '); /* MOVED[S]3999 127.0.0.1:6381 */ + p = strchr(s+1,' '); /* MOVED[S]3999[P]127.0.0.1:6381 */ + *p = '\0'; + slot = atoi(s+1); + s = strchr(p+1,':'); /* MOVED 3999[P]127.0.0.1[S]6381 */ + *s = '\0'; + sdsfree(config.hostip); + config.hostip = sdsnew(p+1); + config.hostport = atoi(s+1); + if (config.interactive) + printf("-> Redirected to slot [%d] located at %s:%d\n", + slot, config.hostip, config.hostport); + config.cluster_reissue_command = 1; + } + + if (output) { + if (output_raw_strings) { out = cliFormatReplyRaw(reply); - out = sdscat(out,"\n"); } else { - out = cliFormatReplyTTY(reply,""); + if (config.output == OUTPUT_RAW) { + out = cliFormatReplyRaw(reply); + out = sdscat(out,"\n"); + } else if (config.output == OUTPUT_STANDARD) { + out = cliFormatReplyTTY(reply,""); + } else if (config.output == OUTPUT_CSV) { + out = cliFormatReplyCSV(reply); + out = sdscat(out,"\n"); + } } + fwrite(out,sdslen(out),1,stdout); + sdsfree(out); } - fwrite(out,sdslen(out),1,stdout); - sdsfree(out); freeReplyObject(reply); return REDIS_OK; } @@ -465,6 +553,11 @@ static int cliSendCommand(int argc, char **argv, int repeat) { size_t *argvlen; int j, output_raw; + if (!strcasecmp(command,"help") || !strcasecmp(command,"?")) { + cliOutputHelp(--argc, ++argv); + return REDIS_OK; + } + if (context == NULL) return REDIS_ERR; output_raw = 0; @@ -479,10 +572,6 @@ static int cliSendCommand(int argc, char **argv, int repeat) { output_raw = 1; } - if (!strcasecmp(command,"help") || !strcasecmp(command,"?")) { - cliOutputHelp(--argc, ++argv); - return REDIS_OK; - } if (!strcasecmp(command,"shutdown")) config.shutdown = 1; if (!strcasecmp(command,"monitor")) config.monitor_mode = 1; if (!strcasecmp(command,"subscribe") || @@ -501,7 +590,7 @@ static int cliSendCommand(int argc, char **argv, int repeat) { } if (config.pubsub_mode) { - if (!config.raw_output) + if (config.output != OUTPUT_RAW) printf("Reading messages... (press Ctrl-C to quit)\n"); while (1) { if (cliReadReply(output_raw) != REDIS_OK) exit(1); @@ -538,8 +627,7 @@ static int parseOptions(int argc, char **argv) { if (!strcmp(argv[i],"-h") && !lastarg) { sdsfree(config.hostip); - config.hostip = sdsnew(argv[i+1]); - i++; + config.hostip = sdsnew(argv[++i]); } else if (!strcmp(argv[i],"-h") && lastarg) { usage(); } else if (!strcmp(argv[i],"--help")) { @@ -547,30 +635,33 @@ static int parseOptions(int argc, char **argv) { } else if (!strcmp(argv[i],"-x")) { config.stdinarg = 1; } else if (!strcmp(argv[i],"-p") && !lastarg) { - config.hostport = atoi(argv[i+1]); - i++; + config.hostport = atoi(argv[++i]); } else if (!strcmp(argv[i],"-s") && !lastarg) { - config.hostsocket = argv[i+1]; - i++; + config.hostsocket = argv[++i]; } else if (!strcmp(argv[i],"-r") && !lastarg) { - config.repeat = strtoll(argv[i+1],NULL,10); - i++; + config.repeat = strtoll(argv[++i],NULL,10); } else if (!strcmp(argv[i],"-i") && !lastarg) { - double seconds = atof(argv[i+1]); + double seconds = atof(argv[++i]); config.interval = seconds*1000000; - i++; } else if (!strcmp(argv[i],"-n") && !lastarg) { - config.dbnum = atoi(argv[i+1]); - i++; + config.dbnum = atoi(argv[++i]); } else if (!strcmp(argv[i],"-a") && !lastarg) { - config.auth = argv[i+1]; - i++; + config.auth = argv[++i]; } else if (!strcmp(argv[i],"--raw")) { - config.raw_output = 1; + config.output = OUTPUT_RAW; + } else if (!strcmp(argv[i],"--csv")) { + config.output = OUTPUT_CSV; + } else if (!strcmp(argv[i],"--latency")) { + config.latency_mode = 1; + } else if (!strcmp(argv[i],"--slave")) { + config.slave_mode = 1; + } else if (!strcmp(argv[i],"--eval") && !lastarg) { + config.eval = argv[++i]; + } else if (!strcmp(argv[i],"-c")) { + config.cluster_mode = 1; } else if (!strcmp(argv[i],"-d") && !lastarg) { sdsfree(config.mb_delim); - config.mb_delim = sdsnew(argv[i+1]); - i++; + config.mb_delim = sdsnew(argv[++i]); } else if (!strcmp(argv[i],"-v") || !strcmp(argv[i], "--version")) { sds version = cliVersion(); printf("redis-cli %s\n", version); @@ -616,7 +707,11 @@ static void usage() { " -n Database number\n" " -x Read last argument from STDIN\n" " -d Multi-bulk delimiter in for raw formatting (default: \\n)\n" +" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n" " --raw Use raw formatting for replies (default when STDOUT is not a tty)\n" +" --latency Enter a special mode continuously sampling latency.\n" +" --slave Simulate a slave showing commands received from the master.\n" +" --eval Send an EVAL command using the Lua script at .\n" " --help Output this help and exit\n" " --version Output version and exit\n" "\n" @@ -625,6 +720,8 @@ static void usage() { " redis-cli get mypasswd\n" " redis-cli -r 100 lpush mylist x\n" " redis-cli -r 100 -i 1 info | grep used_memory_human:\n" +" redis-cli --eval myscript.lua key1 key2 , arg1 arg2 arg3\n" +" (Note: when using --eval the comma separates KEYS[] from ARGV[] items)\n" "\n" "When no command is given, redis-cli starts in interactive mode.\n" "Type \"help\" in interactive mode for information on available commands.\n" @@ -675,6 +772,7 @@ static void repl() { if (argv == NULL) { printf("Invalid argument(s)\n"); + free(line); continue; } else if (argc > 0) { if (strcasecmp(argv[0],"quit") == 0 || @@ -693,22 +791,31 @@ static void repl() { int repeat, skipargs = 0; repeat = atoi(argv[0]); - if (repeat) { + if (argc > 1 && repeat) { skipargs = 1; } else { repeat = 1; } - if (cliSendCommand(argc-skipargs,argv+skipargs,repeat) - != REDIS_OK) - { - cliConnect(1); - - /* If we still cannot send the command print error. - * We'll try to reconnect the next time. */ + while (1) { + config.cluster_reissue_command = 0; if (cliSendCommand(argc-skipargs,argv+skipargs,repeat) != REDIS_OK) - cliPrintContextError(); + { + cliConnect(1); + + /* If we still cannot send the command print error. + * We'll try to reconnect the next time. */ + if (cliSendCommand(argc-skipargs,argv+skipargs,repeat) + != REDIS_OK) + cliPrintContextError(); + } + /* Issue the command again if we got redirected in cluster mode */ + if (config.cluster_mode && config.cluster_reissue_command) { + cliConnect(1); + } else { + break; + } } elapsed = mstime()-start_time; if (elapsed >= 500) { @@ -739,6 +846,124 @@ static int noninteractive(int argc, char **argv) { return retval; } +static int evalMode(int argc, char **argv) { + sds script = sdsempty(); + FILE *fp; + char buf[1024]; + size_t nread; + char **argv2; + int j, got_comma = 0, keys = 0; + + /* Load the script from the file, as an sds string. */ + fp = fopen(config.eval,"r"); + if (!fp) { + fprintf(stderr, + "Can't open file '%s': %s\n", config.eval, strerror(errno)); + exit(1); + } + while((nread = fread(buf,1,sizeof(buf),fp)) != 0) { + script = sdscatlen(script,buf,nread); + } + fclose(fp); + + /* Create our argument vector */ + argv2 = zmalloc(sizeof(sds)*(argc+3)); + argv2[0] = sdsnew("EVAL"); + argv2[1] = script; + for (j = 0; j < argc; j++) { + if (!got_comma && argv[j][0] == ',' && argv[j][1] == 0) { + got_comma = 1; + continue; + } + argv2[j+3-got_comma] = sdsnew(argv[j]); + if (!got_comma) keys++; + } + argv2[2] = sdscatprintf(sdsempty(),"%d",keys); + + /* Call it */ + return cliSendCommand(argc+3-got_comma, argv2, config.repeat); +} + +static void latencyMode(void) { + redisReply *reply; + long long start, latency, min, max, tot, count = 0; + double avg; + + if (!context) exit(1); + while(1) { + start = mstime(); + reply = redisCommand(context,"PING"); + if (reply == NULL) { + fprintf(stderr,"\nI/O error\n"); + exit(1); + } + latency = mstime()-start; + freeReplyObject(reply); + count++; + if (count == 1) { + min = max = tot = latency; + avg = (double) latency; + } else { + if (latency < min) min = latency; + if (latency > max) max = latency; + tot += latency; + avg = (double) tot/count; + } + printf("\x1b[0G\x1b[2Kmin: %lld, max: %lld, avg: %.2f (%lld samples)", + min, max, avg, count); + fflush(stdout); + usleep(10000); + } +} + +static void slaveMode(void) { + /* To start we need to send the SYNC command and return the payload. + * The hiredis client lib does not understand this part of the protocol + * and we don't want to mess with its buffers, so everything is performed + * using direct low-level I/O. */ + int fd = context->fd; + char buf[1024], *p; + ssize_t nread; + unsigned long long payload; + + /* Send the SYNC command. */ + if (write(fd,"SYNC\r\n",6) != 6) { + fprintf(stderr,"Error writing to master\n"); + exit(1); + } + + /* Read $\r\n, making sure to read just up to "\n" */ + p = buf; + while(1) { + nread = read(fd,p,1); + if (nread <= 0) { + fprintf(stderr,"Error reading bulk length while SYNCing\n"); + exit(1); + } + if (*p == '\n') break; + p++; + } + *p = '\0'; + payload = strtoull(buf+1,NULL,10); + fprintf(stderr,"SYNC with master, discarding %lld bytes of bulk tranfer...\n", + payload); + + /* Discard the payload. */ + while(payload) { + nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); + if (nread <= 0) { + fprintf(stderr,"Error reading RDB payload while SYNCing\n"); + exit(1); + } + payload -= nread; + } + fprintf(stderr,"SYNC done. Logging commands from master.\n"); + + /* Now we can use the hiredis to read the incoming protocol. */ + config.output = OUTPUT_CSV; + while (cliReadReply(0) == REDIS_OK); +} + int main(int argc, char **argv) { int firstarg; @@ -752,9 +977,15 @@ int main(int argc, char **argv) { config.shutdown = 0; config.monitor_mode = 0; config.pubsub_mode = 0; + config.latency_mode = 0; + config.cluster_mode = 0; config.stdinarg = 0; config.auth = NULL; - config.raw_output = !isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL); + config.eval = NULL; + if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL)) + config.output = OUTPUT_RAW; + else + config.output = OUTPUT_STANDARD; config.mb_delim = sdsnew("\n"); cliInitHelp(); @@ -762,8 +993,20 @@ int main(int argc, char **argv) { argc -= firstarg; argv += firstarg; + /* Start in latency mode if appropriate */ + if (config.latency_mode) { + cliConnect(0); + latencyMode(); + } + + /* Start in slave mode if appropriate */ + if (config.slave_mode) { + cliConnect(0); + slaveMode(); + } + /* Start interactive mode when no command is provided */ - if (argc == 0) { + if (argc == 0 && !config.eval) { /* Note that in repl mode we don't abort on connection error. * A new attempt will be performed for every command send. */ cliConnect(0); @@ -772,5 +1015,9 @@ int main(int argc, char **argv) { /* Otherwise, we have some arguments to execute */ if (cliConnect(0) != REDIS_OK) exit(1); - return noninteractive(argc,convertToSds(argc,argv)); + if (config.eval) { + return evalMode(argc,argv); + } else { + return noninteractive(argc,convertToSds(argc,argv)); + } }