#include <string.h>
#include <stdlib.h>
#include <unistd.h>
+#include <time.h>
#include <ctype.h>
#include <errno.h>
#include <sys/stat.h>
#include "zmalloc.h"
#include "linenoise.h"
#include "help.h"
+#include "anet.h"
+#include "ae.h"
#define REDIS_NOTUSED(V) ((void) V)
+#define OUTPUT_STANDARD 0
+#define OUTPUT_RAW 1
+#define OUTPUT_CSV 2
+
static redisContext *context;
static struct config {
char *hostip;
int latency_mode;
int cluster_mode;
int cluster_reissue_command;
+ int slave_mode;
+ int pipe_mode;
+ int bigkeys;
int stdinarg; /* get last arg from stdin. (-x option) */
char *auth;
- int raw_output; /* output mode per command */
+ int output; /* output mode, see OUTPUT_* defines */
sds mb_delim;
char prompt[128];
char *eval;
return out;
}
+/* Render a reply as one CSV record and return it as a newly allocated sds
+ * string; the caller owns it and must release it with sdsfree().
+ *
+ * Errors are emitted as ERROR,<message>, status and bulk strings are
+ * appended through sdscatrepr(), integers are printed in decimal, a nil
+ * reply is the bare token NIL, and an array is rendered as its elements
+ * joined by commas (nested arrays end up flattened in the same record).
+ *
+ * No line terminator is added here: the caller appends the "\n" once the
+ * whole record has been built, so a nil inside an array does not split
+ * the row. An unknown reply type is reported on stderr and the process
+ * exits with status 1. */
+static sds cliFormatReplyCSV(redisReply *r) {
+    unsigned int i;
+    sds out = sdsempty();
+
+    switch (r->type) {
+    case REDIS_REPLY_ERROR:
+        out = sdscat(out,"ERROR,");
+        out = sdscatrepr(out,r->str,strlen(r->str));
+        break;
+    case REDIS_REPLY_STATUS:
+    case REDIS_REPLY_STRING:
+        out = sdscatrepr(out,r->str,r->len);
+        break;
+    case REDIS_REPLY_INTEGER:
+        out = sdscatprintf(out,"%lld",r->integer);
+        break;
+    case REDIS_REPLY_NIL:
+        /* No trailing newline: the record terminator is the caller's job. */
+        out = sdscat(out,"NIL");
+        break;
+    case REDIS_REPLY_ARRAY:
+        for (i = 0; i < r->elements; i++) {
+            sds field = cliFormatReplyCSV(r->element[i]);
+
+            /* Separator goes between fields, never after the last one. */
+            if (i > 0) out = sdscat(out,",");
+            out = sdscatlen(out,field,sdslen(field));
+            sdsfree(field);
+        }
+        break;
+    default:
+        fprintf(stderr,"Unknown reply type: %d\n", r->type);
+        exit(1);
+    }
+    return out;
+}
+
static int cliReadReply(int output_raw_strings) {
void *_reply;
redisReply *reply;
- sds out;
+ sds out = NULL;
int output = 1;
if (redisGetReply(context,&_reply) != REDIS_OK) {
reply = (redisReply*)_reply;
- /* Check if we need to connect to a different node and reissue the request. */
+ /* Check if we need to connect to a different node and reissue the
+ * request. */
if (config.cluster_mode && reply->type == REDIS_REPLY_ERROR &&
(!strncmp(reply->str,"MOVED",5) || !strcmp(reply->str,"ASK")))
{
if (output_raw_strings) {
out = cliFormatReplyRaw(reply);
} else {
- if (config.raw_output) {
+ if (config.output == OUTPUT_RAW) {
out = cliFormatReplyRaw(reply);
out = sdscat(out,"\n");
- } else {
+ } else if (config.output == OUTPUT_STANDARD) {
out = cliFormatReplyTTY(reply,"");
+ } else if (config.output == OUTPUT_CSV) {
+ out = cliFormatReplyCSV(reply);
+ out = sdscat(out,"\n");
}
}
fwrite(out,sdslen(out),1,stdout);
}
if (config.pubsub_mode) {
- if (!config.raw_output)
+ if (config.output != OUTPUT_RAW)
printf("Reading messages... (press Ctrl-C to quit)\n");
while (1) {
if (cliReadReply(output_raw) != REDIS_OK) exit(1);
} else if (!strcmp(argv[i],"-a") && !lastarg) {
config.auth = argv[++i];
} else if (!strcmp(argv[i],"--raw")) {
- config.raw_output = 1;
+ config.output = OUTPUT_RAW;
+ } else if (!strcmp(argv[i],"--csv")) {
+ config.output = OUTPUT_CSV;
} else if (!strcmp(argv[i],"--latency")) {
config.latency_mode = 1;
+ } else if (!strcmp(argv[i],"--slave")) {
+ config.slave_mode = 1;
+ } else if (!strcmp(argv[i],"--pipe")) {
+ config.pipe_mode = 1;
+ } else if (!strcmp(argv[i],"--bigkeys")) {
+ config.bigkeys = 1;
} else if (!strcmp(argv[i],"--eval") && !lastarg) {
config.eval = argv[++i];
} else if (!strcmp(argv[i],"-c")) {
" -a <password> Password to use when connecting to the server\n"
" -r <repeat> Execute specified command N times\n"
" -i <interval> When -r is used, waits <interval> seconds per command.\n"
-" It is possible to specify sub-second times like -i 0.1.\n"
+" It is possible to specify sub-second times like -i 0.1\n"
" -n <db> Database number\n"
" -x Read last argument from STDIN\n"
" -d <delimiter> Multi-bulk delimiter in for raw formatting (default: \\n)\n"
" -c Enable cluster mode (follow -ASK and -MOVED redirections)\n"
" --raw Use raw formatting for replies (default when STDOUT is not a tty)\n"
-" --latency Enter a special mode continuously sampling latency.\n"
-" --eval <file> Send an EVAL command using the Lua script at <file>.\n"
+" --latency Enter a special mode continuously sampling latency\n"
+" --slave Simulate a slave showing commands received from the master\n"
+" --pipe Transfer raw Redis protocol from stdin to server\n"
+" --bigkeys Sample Redis keys looking for big keys\n"
+" --eval <file> Send an EVAL command using the Lua script at <file>\n"
" --help Output this help and exit\n"
" --version Output version and exit\n"
"\n"
static void latencyMode(void) {
redisReply *reply;
- long long start, latency, min, max, tot, count = 0;
+ long long start, latency, min = 0, max = 0, tot = 0, count = 0;
double avg;
if (!context) exit(1);
}
}
+/* Simulate a slave: send SYNC to the master, read the bulk length header,
+ * discard the bulk payload that follows, then print every reply read from
+ * the connection in CSV format until cliReadReply() stops returning
+ * REDIS_OK.
+ *
+ * Any I/O error, an error reply to SYNC, or a malformed/oversized header
+ * line is reported on stderr and terminates the process with status 1.
+ * Note that config.output is switched to OUTPUT_CSV as a side effect. */
+static void slaveMode(void) {
+    /* To start we need to send the SYNC command and return the payload.
+     * The hiredis client lib does not understand this part of the protocol
+     * and we don't want to mess with its buffers, so everything is performed
+     * using direct low-level I/O. */
+    int fd = context->fd;
+    char buf[1024], *p;
+    ssize_t nread;
+    unsigned long long payload;
+
+    /* Send the SYNC command. */
+    if (write(fd,"SYNC\r\n",6) != 6) {
+        fprintf(stderr,"Error writing to master\n");
+        exit(1);
+    }
+
+    /* Read $<payload>\r\n one byte at a time, so that nothing past the
+     * terminating "\n" is consumed from the socket. */
+    p = buf;
+    while(1) {
+        nread = read(fd,p,1);
+        if (nread <= 0) {
+            fprintf(stderr,"Error reading bulk length while SYNCing\n");
+            exit(1);
+        }
+        if (*p == '\n') {
+            /* Ignore empty lines preceding the header: parsing an empty
+             * buffer would read bytes that were never written. */
+            if (p == buf) continue;
+            break;
+        }
+        /* Keep one byte free for the terminator: never write past buf. */
+        if (p == buf+sizeof(buf)-1) {
+            fprintf(stderr,"Bulk length line too long while SYNCing\n");
+            exit(1);
+        }
+        p++;
+    }
+    if (p[-1] == '\r') p--; /* Drop the CR of the CRLF terminator. */
+    *p = '\0';
+
+    /* The master may answer with an error instead of a bulk header. */
+    if (buf[0] == '-') {
+        fprintf(stderr,"SYNC with master failed: %s\n", buf);
+        exit(1);
+    }
+    if (buf[0] != '$') {
+        fprintf(stderr,"Unexpected reply to SYNC: %s\n", buf);
+        exit(1);
+    }
+    payload = strtoull(buf+1,NULL,10);
+    fprintf(stderr,"SYNC with master, discarding %llu bytes of bulk transfer...\n",
+        payload);
+
+    /* Discard the payload, never asking for more than what is left of it. */
+    while(payload) {
+        nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload);
+        if (nread <= 0) {
+            fprintf(stderr,"Error reading RDB payload while SYNCing\n");
+            exit(1);
+        }
+        payload -= nread;
+    }
+    fprintf(stderr,"SYNC done. Logging commands from master.\n");
+
+    /* Now we can use the hiredis to read the incoming protocol. */
+    config.output = OUTPUT_CSV;
+    while (cliReadReply(0) == REDIS_OK);
+}
+
+/* Pipe mode: copy raw protocol from standard input to the server socket
+ * while concurrently reading and counting the replies.
+ *
+ * The socket is put in non-blocking mode and aeWait() is used to learn when
+ * it is readable and/or writable. Once standard input hits EOF, an ECHO
+ * command carrying 20 random bytes is queued; seeing those same bytes come
+ * back as a 20-byte string reply means every previous reply was received,
+ * and the loop ends. The ECHO reply itself is not counted.
+ *
+ * This function never returns: it prints the error/reply counters and exits
+ * with status 1 if any error reply was seen (or on any I/O failure,
+ * including the server closing the connection before the final reply),
+ * and with status 0 otherwise. */
+static void pipeMode(void) {
+    int fd = context->fd;
+    long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0;
+    char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */
+    char aneterr[ANET_ERR_LEN];
+    redisReader *reader = redisReaderCreate();
+    redisReply *reply;
+    int eof = 0; /* True once we consumed all the standard input. */
+    int done = 0;
+    int server_closed = 0; /* True once read() on the socket returned 0. */
+    char magic[20]; /* Special reply we recognize. */
+
+    srand(time(NULL));
+
+    /* Use non blocking I/O. */
+    if (anetNonBlock(aneterr,fd) == ANET_ERR) {
+        fprintf(stderr, "Can't set the socket in non blocking mode: %s\n",
+            aneterr);
+        exit(1);
+    }
+
+    /* Transfer raw protocol and read replies from the server at the same
+     * time. */
+    while(!done) {
+        int mask = AE_READABLE;
+
+        /* Only ask for writability while there is still something to send. */
+        if (!eof || obuf_len != 0) mask |= AE_WRITABLE;
+        mask = aeWait(fd,mask,1000);
+
+        /* Handle the readable state: we can read replies from the server. */
+        if (mask & AE_READABLE) {
+            ssize_t nread;
+
+            /* Read from socket and feed the hiredis reader. */
+            do {
+                nread = read(fd,ibuf,sizeof(ibuf));
+                if (nread == -1 && errno != EAGAIN && errno != EINTR) {
+                    fprintf(stderr, "Error reading from the server: %s\n",
+                        strerror(errno));
+                    exit(1);
+                }
+                if (nread == 0) server_closed = 1;
+                if (nread > 0) redisReaderFeed(reader,ibuf,nread);
+            } while(nread > 0);
+
+            /* Consume replies. */
+            do {
+                void *item = NULL;
+
+                if (redisReaderGetReply(reader,&item) == REDIS_ERR) {
+                    fprintf(stderr, "Error reading replies from server\n");
+                    exit(1);
+                }
+                reply = item;
+                if (reply) {
+                    if (reply->type == REDIS_REPLY_ERROR) {
+                        fprintf(stderr,"%s\n", reply->str);
+                        errors++;
+                    } else if (eof && reply->type == REDIS_REPLY_STRING &&
+                                      reply->len == 20) {
+                        /* Check if this is the reply to our final ECHO
+                         * command. If so everything was received
+                         * from the server. */
+                        if (memcmp(reply->str,magic,20) == 0) {
+                            printf("Last reply received from server.\n");
+                            done = 1;
+                            replies--; /* Don't count our own ECHO. */
+                        }
+                    }
+                    replies++;
+                    freeReplyObject(reply);
+                }
+            } while(reply);
+
+            /* The peer is gone and the final reply never showed up: no
+             * more data can arrive, so waiting further would spin forever. */
+            if (server_closed && !done) {
+                fprintf(stderr, "Error: server closed the connection\n");
+                exit(1);
+            }
+        }
+
+        /* Handle the writable state: we can send protocol to the server. */
+        if (mask & AE_WRITABLE) {
+            while(1) {
+                /* Transfer current buffer to server. */
+                if (obuf_len != 0) {
+                    ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len);
+
+                    if (nwritten == -1) {
+                        if (errno != EAGAIN && errno != EINTR) {
+                            fprintf(stderr, "Error writing to the server: %s\n",
+                                strerror(errno));
+                            exit(1);
+                        } else {
+                            nwritten = 0;
+                        }
+                    }
+                    obuf_len -= nwritten;
+                    obuf_pos += nwritten;
+                    if (obuf_len != 0) break; /* Can't accept more data. */
+                }
+                /* If buffer is empty, load from stdin. */
+                if (obuf_len == 0 && !eof) {
+                    ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf));
+
+                    if (nread == 0) {
+                        char echo[] =
+                        "*2\r\n$4\r\nECHO\r\n$20\r\n01234567890123456789\r\n";
+                        int j;
+
+                        eof = 1;
+                        /* Everything transferred, so we queue a special
+                         * ECHO command that we can match in the replies
+                         * to make sure everything was read from the server. */
+                        for (j = 0; j < 20; j++)
+                            magic[j] = rand() & 0xff;
+                        /* Offset 19 is where the 20-byte placeholder
+                         * payload starts inside the template above. */
+                        memcpy(echo+19,magic,20);
+                        memcpy(obuf,echo,sizeof(echo)-1);
+                        obuf_len = sizeof(echo)-1;
+                        obuf_pos = 0;
+                        printf("All data transferred. Waiting for the last reply...\n");
+                    } else if (nread == -1) {
+                        /* An interrupted read is not a failure: retry. */
+                        if (errno == EINTR) continue;
+                        fprintf(stderr, "Error reading from stdin: %s\n",
+                            strerror(errno));
+                        exit(1);
+                    } else {
+                        obuf_len = nread;
+                        obuf_pos = 0;
+                    }
+                }
+                if (obuf_len == 0 && eof) break;
+            }
+        }
+    }
+    redisReaderFree(reader);
+    printf("errors: %lld, replies: %lld\n", errors, replies);
+    if (errors)
+        exit(1);
+    else
+        exit(0);
+}
+
+#define TYPE_STRING 0
+#define TYPE_LIST 1
+#define TYPE_SET 2
+#define TYPE_HASH 3
+#define TYPE_ZSET 4
+
+/* Sample keys forever with RANDOMKEY, looking for the biggest key of each
+ * type (string, list, set, hash, zset).
+ *
+ * For every sampled key its type is fetched with TYPE and its "size" with
+ * the matching command (STRLEN, LLEN, SCARD, HLEN, ZCARD); each time a new
+ * per-type maximum is found a line is printed. A progress line is printed
+ * every 1000000 samples and, when config.interval is set, the loop sleeps
+ * for that amount every 100 samples.
+ *
+ * The loop only ends by exiting the process: on I/O errors, error replies,
+ * an empty database, or an unknown key type (status 1), or when the user
+ * interrupts it. Key names are sent with their explicit length so names
+ * containing NUL bytes are queried correctly. */
+static void findBigKeys(void) {
+    unsigned long long biggest[5] = {0,0,0,0,0};
+    unsigned long long samples = 0;
+    redisReply *reply1, *reply2, *reply3;
+    const char *sizecmd, *typename[] = {"string","list","set","hash","zset"};
+    int type;
+
+    printf("\n# Press ctrl+c when you have had enough of it... :)\n");
+    printf("# You can use -i 0.1 to sleep 0.1 sec every 100 sampled keys\n");
+    printf("# in order to reduce server load (usually not needed).\n\n");
+    while(1) {
+        /* Sample with RANDOMKEY */
+        reply1 = redisCommand(context,"RANDOMKEY");
+        if (reply1 == NULL) {
+            fprintf(stderr,"\nI/O error\n");
+            exit(1);
+        } else if (reply1->type == REDIS_REPLY_ERROR) {
+            fprintf(stderr, "RANDOMKEY error: %s\n",
+                reply1->str);
+            exit(1);
+        } else if (reply1->type == REDIS_REPLY_NIL) {
+            /* No key to sample: there is no key name to use below. */
+            fprintf(stderr, "It looks like the database is empty!\n");
+            exit(1);
+        }
+
+        /* Get the key type */
+        reply2 = redisCommand(context,"TYPE %b",
+            reply1->str,(size_t)reply1->len);
+        if (reply2 == NULL) {
+            fprintf(stderr,"\nI/O error\n");
+            exit(1);
+        } else if (reply2->type == REDIS_REPLY_ERROR) {
+            fprintf(stderr, "TYPE error: %s\n", reply2->str);
+            exit(1);
+        } else if (reply2->type != REDIS_REPLY_STATUS) {
+            fprintf(stderr, "Unexpected reply type %d for TYPE\n",
+                reply2->type);
+            exit(1);
+        }
+        samples++;
+
+        /* Get the key "size" */
+        if (!strcmp(reply2->str,"string")) {
+            sizecmd = "STRLEN";
+            type = TYPE_STRING;
+        } else if (!strcmp(reply2->str,"list")) {
+            sizecmd = "LLEN";
+            type = TYPE_LIST;
+        } else if (!strcmp(reply2->str,"set")) {
+            sizecmd = "SCARD";
+            type = TYPE_SET;
+        } else if (!strcmp(reply2->str,"hash")) {
+            sizecmd = "HLEN";
+            type = TYPE_HASH;
+        } else if (!strcmp(reply2->str,"zset")) {
+            sizecmd = "ZCARD";
+            type = TYPE_ZSET;
+        } else if (!strcmp(reply2->str,"none")) {
+            /* The key vanished between RANDOMKEY and TYPE: only the two
+             * replies obtained in this iteration exist and need freeing. */
+            freeReplyObject(reply1);
+            freeReplyObject(reply2);
+            continue;
+        } else {
+            fprintf(stderr, "Unknown key type '%s' for key '%s'\n",
+                reply2->str, reply1->str);
+            exit(1);
+        }
+
+        reply3 = redisCommand(context,"%s %b", sizecmd,
+            reply1->str,(size_t)reply1->len);
+        if (reply3 && reply3->type == REDIS_REPLY_INTEGER &&
+            reply3->integer > 0)
+        {
+            unsigned long long size = (unsigned long long) reply3->integer;
+
+            if (biggest[type] < size) {
+                printf("[%6s] %s | biggest so far with size %llu\n",
+                    typename[type], reply1->str, size);
+                biggest[type] = size;
+            }
+        }
+
+        if ((samples % 1000000) == 0)
+            printf("(%llu keys sampled)\n", samples);
+
+        if ((samples % 100) == 0 && config.interval)
+            usleep(config.interval);
+
+        freeReplyObject(reply1);
+        freeReplyObject(reply2);
+        if (reply3) freeReplyObject(reply3);
+    }
+}
+
int main(int argc, char **argv) {
int firstarg;
config.pubsub_mode = 0;
config.latency_mode = 0;
config.cluster_mode = 0;
+ config.slave_mode = 0;
+ config.pipe_mode = 0;
+ config.bigkeys = 0;
config.stdinarg = 0;
config.auth = NULL;
config.eval = NULL;
- config.raw_output = !isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL);
+ if (!isatty(fileno(stdout)) && (getenv("FAKETTY") == NULL))
+ config.output = OUTPUT_RAW;
+ else
+ config.output = OUTPUT_STANDARD;
config.mb_delim = sdsnew("\n");
cliInitHelp();
argc -= firstarg;
argv += firstarg;
- /* Start in latency mode if appropriate */
+ /* Latency mode */
if (config.latency_mode) {
cliConnect(0);
latencyMode();
}
+ /* Slave mode */
+ if (config.slave_mode) {
+ cliConnect(0);
+ slaveMode();
+ }
+
+ /* Pipe mode */
+ if (config.pipe_mode) {
+ cliConnect(0);
+ pipeMode();
+ }
+
+ /* Find big keys */
+ if (config.bigkeys) {
+ cliConnect(0);
+ findBigKeys();
+ }
+
/* Start interactive mode when no command is provided */
if (argc == 0 && !config.eval) {
/* Note that in repl mode we don't abort on connection error.