X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/8562798308391d489016b3995d438b6187b5980a..d310fbedabd3101505b694f5c25a2e48480a3c2b:/src/redis-cli.c diff --git a/src/redis-cli.c b/src/redis-cli.c index c631aa79..8d20d1cd 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,8 @@ #include "zmalloc.h" #include "linenoise.h" #include "help.h" +#include "anet.h" +#include "ae.h" #define REDIS_NOTUSED(V) ((void) V) @@ -69,6 +72,8 @@ static struct config { int cluster_mode; int cluster_reissue_command; int slave_mode; + int pipe_mode; + int bigkeys; int stdinarg; /* get last arg from stdin. (-x option) */ char *auth; int output; /* output mode, see OUTPUT_* defines */ @@ -478,7 +483,7 @@ static sds cliFormatReplyCSV(redisReply *r) { static int cliReadReply(int output_raw_strings) { void *_reply; redisReply *reply; - sds out; + sds out = NULL; int output = 1; if (redisGetReply(context,&_reply) != REDIS_OK) { @@ -498,7 +503,8 @@ static int cliReadReply(int output_raw_strings) { reply = (redisReply*)_reply; - /* Check if we need to connect to a different node and reissue the request. */ + /* Check if we need to connect to a different node and reissue the + * request. */ if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR && (!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK"))) { @@ -654,6 +660,10 @@ static int parseOptions(int argc, char **argv) { config.latency_mode = 1; } else if (!strcmp(argv[i],"--slave")) { config.slave_mode = 1; + } else if (!strcmp(argv[i],"--pipe")) { + config.pipe_mode = 1; + } else if (!strcmp(argv[i],"--bigkeys")) { + config.bigkeys = 1; } else if (!strcmp(argv[i],"--eval") && !lastarg) { config.eval = argv[++i]; } else if (!strcmp(argv[i],"-c")) { @@ -702,15 +712,17 @@ static void usage() { " -a Password to use when connecting to the server\n" " -r Execute specified command N times\n" " -i When -r is used, waits seconds per command.\n" -" It is possible to specify sub-second times like -i 0.1.\n" +" It is possible to specify sub-second times like -i 0.1\n" " -n Database number\n" " -x Read last argument from STDIN\n" " -d Multi-bulk delimiter in for raw formatting (default: \\n)\n" " -c Enable cluster mode (follow -ASK and -MOVED redirections)\n" " --raw Use raw formatting for replies (default when STDOUT is not a tty)\n" -" --latency Enter a special mode continuously sampling latency.\n" -" --slave Simulate a slave showing commands received from the master.\n" -" --eval Send an EVAL command using the Lua script at .\n" +" --latency Enter a special mode continuously sampling latency\n" +" --slave Simulate a slave showing commands received from the master\n" +" --pipe Transfer raw Redis protocol from stdin to server\n" +" --bigkeys Sample Redis keys looking for big keys\n" +" --eval Send an EVAL command using the Lua script at \n" " --help Output this help and exit\n" " --version Output version and exit\n" "\n" @@ -885,7 +897,7 @@ static int evalMode(int argc, char **argv) { static void latencyMode(void) { redisReply *reply; - long long start, latency, min, max, tot, count = 0; + long long start, latency, min = 0, max = 0, tot = 0, count = 0; double avg; if (!context) exit(1); @@ -926,7 +938,10 @@ static void slaveMode(void) { unsigned long long payload; /* Send the SYNC command. */ - write(fd,"SYNC\r\n",6); + if (write(fd,"SYNC\r\n",6) != 6) { + fprintf(stderr,"Error writing to master\n"); + exit(1); + } /* Read $\r\n, making sure to read just up to "\n" */ p = buf; @@ -960,6 +975,218 @@ static void slaveMode(void) { while (cliReadReply(0) == REDIS_OK); } +static void pipeMode(void) { + int fd = context->fd; + long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0; + char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */ + char aneterr[ANET_ERR_LEN]; + redisReader *reader = redisReaderCreate(); + redisReply *reply; + int eof = 0; /* True once we consumed all the standard input. */ + int done = 0; + char magic[20]; /* Special reply we recognize. */ + + srand(time(NULL)); + + /* Use non blocking I/O. */ + if (anetNonBlock(aneterr,fd) == ANET_ERR) { + fprintf(stderr, "Can't set the socket in non blocking mode: %s\n", + aneterr); + exit(1); + } + + /* Transfer raw protocol and read replies from the server at the same + * time. */ + while(!done) { + int mask = AE_READABLE; + + if (!eof || obuf_len != 0) mask |= AE_WRITABLE; + mask = aeWait(fd,mask,1000); + + /* Handle the readable state: we can read replies from the server. */ + if (mask & AE_READABLE) { + ssize_t nread; + + /* Read from socket and feed the hiredis reader. */ + do { + nread = read(fd,ibuf,sizeof(ibuf)); + if (nread == -1 && errno != EAGAIN && errno != EINTR) { + fprintf(stderr, "Error reading from the server: %s\n", + strerror(errno)); + exit(1); + } + if (nread > 0) redisReaderFeed(reader,ibuf,nread); + } while(nread > 0); + + /* Consume replies. */ + do { + if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) { + fprintf(stderr, "Error reading replies from server\n"); + exit(1); + } + if (reply) { + if (reply->type == REDIS_REPLY_ERROR) { + fprintf(stderr,"%s\n", reply->str); + errors++; + } else if (eof && reply->type == REDIS_REPLY_STRING && + reply->len == 20) { + /* Check if this is the reply to our final ECHO + * command. If so everything was received + * from the server. */ + if (memcmp(reply->str,magic,20) == 0) { + printf("Last reply received from server.\n"); + done = 1; + replies--; + } + } + replies++; + freeReplyObject(reply); + } + } while(reply); + } + + /* Handle the writable state: we can send protocol to the server. */ + if (mask & AE_WRITABLE) { + while(1) { + /* Transfer current buffer to server. */ + if (obuf_len != 0) { + ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len); + + if (nwritten == -1) { + if (errno != EAGAIN && errno != EINTR) { + fprintf(stderr, "Error writing to the server: %s\n", + strerror(errno)); + exit(1); + } else { + nwritten = 0; + } + } + obuf_len -= nwritten; + obuf_pos += nwritten; + if (obuf_len != 0) break; /* Can't accept more data. */ + } + /* If buffer is empty, load from stdin. */ + if (obuf_len == 0 && !eof) { + ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf)); + + if (nread == 0) { + char echo[] = + "*2\r\n$4\r\nECHO\r\n$20\r\n01234567890123456789\r\n"; + int j; + + eof = 1; + /* Everything transfered, so we queue a special + * ECHO command that we can match in the replies + * to make sure everything was read from the server. */ + for (j = 0; j < 20; j++) + magic[j] = rand() & 0xff; + memcpy(echo+19,magic,20); + memcpy(obuf,echo,sizeof(echo)-1); + obuf_len = sizeof(echo)-1; + obuf_pos = 0; + printf("All data transferred. Waiting for the last reply...\n"); + } else if (nread == -1) { + fprintf(stderr, "Error reading from stdin: %s\n", + strerror(errno)); + exit(1); + } else { + obuf_len = nread; + obuf_pos = 0; + } + } + if (obuf_len == 0 && eof) break; + } + } + } + redisReaderFree(reader); + printf("errors: %lld, replies: %lld\n", errors, replies); + if (errors) + exit(1); + else + exit(0); +} + +#define TYPE_STRING 0 +#define TYPE_LIST 1 +#define TYPE_SET 2 +#define TYPE_HASH 3 +#define TYPE_ZSET 4 + +static void findBigKeys(void) { + unsigned long long biggest[5] = {0,0,0,0,0}; + unsigned long long samples = 0; + redisReply *reply1, *reply2, *reply3 = NULL; + char *sizecmd, *typename[] = {"string","list","set","hash","zset"}; + int type; + + printf("\n# Press ctrl+c when you have had enough of it... :)\n"); + printf("# You can use -i 0.1 to sleep 0.1 sec every 100 sampled keys\n"); + printf("# in order to reduce server load (usually not needed).\n\n"); + while(1) { + /* Sample with RANDOMKEY */ + reply1 = redisCommand(context,"RANDOMKEY"); + if (reply1 == NULL) { + fprintf(stderr,"\nI/O error\n"); + exit(1); + } else if (reply1->type == REDIS_REPLY_ERROR) { + fprintf(stderr, "RANDOMKEY error: %s\n", + reply1->str); + exit(1); + } + /* Get the key type */ + reply2 = redisCommand(context,"TYPE %s",reply1->str); + assert(reply2 && reply2->type == REDIS_REPLY_STATUS); + samples++; + + /* Get the key "size" */ + if (!strcmp(reply2->str,"string")) { + sizecmd = "STRLEN"; + type = TYPE_STRING; + } else if (!strcmp(reply2->str,"list")) { + sizecmd = "LLEN"; + type = TYPE_LIST; + } else if (!strcmp(reply2->str,"set")) { + sizecmd = "SCARD"; + type = TYPE_SET; + } else if (!strcmp(reply2->str,"hash")) { + sizecmd = "HLEN"; + type = TYPE_HASH; + } else if (!strcmp(reply2->str,"zset")) { + sizecmd = "ZCARD"; + type = TYPE_ZSET; + } else if (!strcmp(reply2->str,"none")) { + freeReplyObject(reply1); + freeReplyObject(reply2); + freeReplyObject(reply3); + continue; + } else { + fprintf(stderr, "Unknown key type '%s' for key '%s'\n", + reply2->str, reply1->str); + exit(1); + } + + reply3 = redisCommand(context,"%s %s", sizecmd, reply1->str); + if (reply3 && reply3->type == REDIS_REPLY_INTEGER) { + if (biggest[type] < reply3->integer) { + printf("[%6s] %s | biggest so far with size %llu\n", + typename[type], reply1->str, + (unsigned long long) reply3->integer); + biggest[type] = reply3->integer; + } + } + + if ((samples % 1000000) == 0) + printf("(%llu keys sampled)\n", samples); + + if ((samples % 100) == 0 && config.interval) + usleep(config.interval); + + freeReplyObject(reply1); + freeReplyObject(reply2); + if (reply3) freeReplyObject(reply3); + } +} + int main(int argc, char **argv) { int firstarg; @@ -975,6 +1202,9 @@ int main(int argc, char **argv) { config.pubsub_mode = 0; config.latency_mode = 0; config.cluster_mode = 0; + config.slave_mode = 0; + config.pipe_mode = 0; + config.bigkeys = 0; config.stdinarg = 0; config.auth = NULL; config.eval = NULL; @@ -989,18 +1219,30 @@ int main(int argc, char **argv) { argc -= firstarg; argv += firstarg; - /* Start in latency mode if appropriate */ + /* Latency mode */ if (config.latency_mode) { cliConnect(0); latencyMode(); } - /* Start in slave mode if appropriate */ + /* Slave mode */ if (config.slave_mode) { cliConnect(0); slaveMode(); } + /* Pipe mode */ + if (config.pipe_mode) { + if (cliConnect(0) == REDIS_ERR) exit(1); + pipeMode(); + } + + /* Find big keys */ + if (config.bigkeys) { + cliConnect(0); + findBigKeys(); + } + /* Start interactive mode when no command is provided */ if (argc == 0 && !config.eval) { /* Note that in repl mode we don't abort on connection error.