X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/ecfaf6da92d8d482460d9491a69d80f91b2af7c0..6fa987e390f12388e1597ab5c46f58618c859912:/benchmark.c diff --git a/benchmark.c b/benchmark.c index 38e85410..2984efe4 100644 --- a/benchmark.c +++ b/benchmark.c @@ -28,6 +28,8 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include "fmacros.h" + #include #include #include @@ -46,6 +48,7 @@ #define REPLY_INT 0 #define REPLY_RETCODE 1 #define REPLY_BULK 2 +#define REPLY_MBULK 3 #define CLIENT_CONNECTING 0 #define CLIENT_SENDQUERY 1 @@ -56,6 +59,7 @@ #define REDIS_NOTUSED(V) ((void) V) static struct config { + int debug; int numclients; int requests; int liveclients; @@ -74,6 +78,7 @@ static struct config { list *clients; int quiet; int loop; + int idlemode; } config; typedef struct _client { @@ -81,7 +86,9 @@ typedef struct _client { int fd; sds obuf; sds ibuf; + int mbulk; /* Number of elements in an mbulk reply */ int readlen; /* readlen == -1 means read a single line */ + int totreceived; unsigned int written; /* bytes of 'obuf' already written */ int replytype; long long start; /* start time in milliseconds */ @@ -130,13 +137,17 @@ static void freeAllClients(void) { static void resetClient(client c) { aeDeleteFileEvent(config.el,c->fd,AE_WRITABLE); aeDeleteFileEvent(config.el,c->fd,AE_READABLE); - aeCreateFileEvent(config.el,c->fd, AE_WRITABLE,writeHandler,c,NULL); + aeCreateFileEvent(config.el,c->fd, AE_WRITABLE,writeHandler,c); sdsfree(c->ibuf); c->ibuf = sdsempty(); - c->readlen = (c->replytype == REPLY_BULK) ? -1 : 0; + c->readlen = (c->replytype == REPLY_BULK || + c->replytype == REPLY_MBULK) ? -1 : 0; + c->mbulk = -1; c->written = 0; + c->totreceived = 0; c->state = CLIENT_SENDQUERY; c->start = mstime(); + createMissingClients(c); } static void randomizeClientKey(client c) { @@ -152,13 +163,33 @@ static void randomizeClientKey(client c) { memcpy(p,buf,strlen(buf)); } +static void prepareClientForReply(client c, int type) { + if (type == REPLY_BULK) { + c->replytype = REPLY_BULK; + c->readlen = -1; + } else if (type == REPLY_MBULK) { + c->replytype = REPLY_MBULK; + c->readlen = -1; + c->mbulk = -1; + } else { + c->replytype = type; + c->readlen = 0; + } +} + static void clientDone(client c) { + static int last_tot_received = 1; + long long latency; config.donerequests ++; latency = mstime() - c->start; if (latency > MAX_LATENCY) latency = MAX_LATENCY; config.latency[latency]++; + if (config.debug && last_tot_received != c->totreceived) { + printf("Tot bytes received: %d\n", c->totreceived); + last_tot_received = c->totreceived; + } if (config.donerequests == config.requests) { freeClient(c); aeStop(config.el); @@ -195,23 +226,55 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) freeClient(c); return; } + c->totreceived += nread; c->ibuf = sdscatlen(c->ibuf,buf,nread); +processdata: + /* Are we waiting for the first line of the command of for sdf + * count in bulk or multi bulk operations? */ if (c->replytype == REPLY_INT || c->replytype == REPLY_RETCODE || - (c->replytype == REPLY_BULK && c->readlen == -1)) { + (c->replytype == REPLY_BULK && c->readlen == -1) || + (c->replytype == REPLY_MBULK && c->readlen == -1) || + (c->replytype == REPLY_MBULK && c->mbulk == -1)) { char *p; + /* Check if the first line is complete. This is only true if + * there is a newline inside the buffer. */ if ((p = strchr(c->ibuf,'\n')) != NULL) { - if (c->replytype == REPLY_BULK) { + if (c->replytype == REPLY_BULK || + (c->replytype == REPLY_MBULK && c->mbulk != -1)) + { + /* Read the count of a bulk reply (being it a single bulk or + * a multi bulk reply). "$" for the protocol spec. */ *p = '\0'; *(p-1) = '\0'; c->readlen = atoi(c->ibuf+1)+2; + // printf("BULK ATOI: %s\n", c->ibuf+1); + /* Handle null bulk reply "$-1" */ if (c->readlen-2 == -1) { clientDone(c); return; } + /* Leave all the rest in the input buffer */ c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); + /* fall through to reach the point where the code will try + * to check if the bulk reply is complete. */ + } else if (c->replytype == REPLY_MBULK && c->mbulk == -1) { + /* Read the count of a multi bulk reply. That is, how many + * bulk replies we have to read next. "*" protocol. */ + *p = '\0'; + *(p-1) = '\0'; + c->mbulk = atoi(c->ibuf+1); + /* Handle null bulk reply "*-1" */ + if (c->mbulk == -1) { + clientDone(c); + return; + } + // printf("%p) %d elements list\n", c, c->mbulk); + /* Leave all the rest in the input buffer */ + c->ibuf = sdsrange(c->ibuf,(p-c->ibuf)+1,-1); + goto processdata; } else { c->ibuf = sdstrim(c->ibuf,"\r\n"); clientDone(c); @@ -219,9 +282,28 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) } } } - /* bulk read */ - if ((unsigned)c->readlen == sdslen(c->ibuf)) - clientDone(c); + /* bulk read, did we read everything? */ + if (((c->replytype == REPLY_MBULK && c->mbulk != -1) || + (c->replytype == REPLY_BULK)) && c->readlen != -1 && + (unsigned)c->readlen <= sdslen(c->ibuf)) + { + // printf("BULKSTATUS mbulk:%d readlen:%d sdslen:%d\n", + // c->mbulk,c->readlen,sdslen(c->ibuf)); + if (c->replytype == REPLY_BULK) { + clientDone(c); + } else if (c->replytype == REPLY_MBULK) { + // printf("%p) %d (%d)) ",c, c->mbulk, c->readlen); + // fwrite(c->ibuf,c->readlen,1,stdout); + // printf("\n"); + if (--c->mbulk == 0) { + clientDone(c); + } else { + c->ibuf = sdsrange(c->ibuf,c->readlen,-1); + c->readlen = -1; + goto processdata; + } + } + } } static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) @@ -240,14 +322,15 @@ static void writeHandler(aeEventLoop *el, int fd, void *privdata, int mask) int len = sdslen(c->obuf) - c->written; int nwritten = write(c->fd, ptr, len); if (nwritten == -1) { - fprintf(stderr, "Writing to socket: %s\n", strerror(errno)); + if (errno != EPIPE) + fprintf(stderr, "Writing to socket: %s\n", strerror(errno)); freeClient(c); return; } c->written += nwritten; if (sdslen(c->obuf) == c->written) { aeDeleteFileEvent(config.el,c->fd,AE_WRITABLE); - aeCreateFileEvent(config.el,c->fd,AE_READABLE,readHandler,c,NULL); + aeCreateFileEvent(config.el,c->fd,AE_READABLE,readHandler,c); c->state = CLIENT_READREPLY; } } @@ -266,10 +349,12 @@ static client createClient(void) { anetTcpNoDelay(NULL,c->fd); c->obuf = sdsempty(); c->ibuf = sdsempty(); + c->mbulk = -1; c->readlen = 0; c->written = 0; + c->totreceived = 0; c->state = CLIENT_CONNECTING; - aeCreateFileEvent(config.el, c->fd, AE_WRITABLE, writeHandler, c, NULL); + aeCreateFileEvent(config.el, c->fd, AE_WRITABLE, writeHandler, c); config.liveclients++; listAddNodeTail(config.clients,c); return c; @@ -282,9 +367,7 @@ static void createMissingClients(client c) { sdsfree(new->obuf); new->obuf = sdsdup(c->obuf); if (config.randomkeys) randomizeClientKey(c); - new->replytype = c->replytype; - if (c->replytype == REPLY_BULK) - new->readlen = -1; + prepareClientForReply(new,c->replytype); } } @@ -368,6 +451,10 @@ void parseOptions(int argc, char **argv) { config.quiet = 1; } else if (!strcmp(argv[i],"-l")) { config.loop = 1; + } else if (!strcmp(argv[i],"-D")) { + config.debug = 1; + } else if (!strcmp(argv[i],"-I")) { + config.idlemode = 1; } else { printf("Wrong option '%s' or option argument missing\n\n",argv[i]); printf("Usage: redis-benchmark [-h ] [-p ] [-c ] [-n [-k ]\n\n"); @@ -386,6 +473,8 @@ void parseOptions(int argc, char **argv) { printf(" range will be allowed.\n"); printf(" -q Quiet. Just show query/sec values\n"); printf(" -l Loop. Run the tests forever\n"); + printf(" -I Idle mode. Just open N idle connections and wait.\n"); + printf(" -D Debug mode. more verbose.\n"); exit(1); } } @@ -397,6 +486,7 @@ int main(int argc, char **argv) { signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); + config.debug = 0; config.numclients = 50; config.requests = 10000; config.liveclients = 0; @@ -408,6 +498,7 @@ int main(int argc, char **argv) { config.randomkeys_keyspacelen = 0; config.quiet = 0; config.loop = 0; + config.idlemode = 0; config.latency = NULL; config.clients = listCreate(); config.latency = zmalloc(sizeof(int)*(MAX_LATENCY+1)); @@ -418,19 +509,22 @@ int main(int argc, char **argv) { parseOptions(argc,argv); if (config.keepalive == 0) { - printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' in order to use a lot of clients/requests\n"); + printf("WARNING: keepalive disabled, you probably need 'echo 1 > /proc/sys/net/ipv4/tcp_tw_reuse' for Linux and 'sudo sysctl -w net.inet.tcp.msl=1000' for Mac OS X in order to use a lot of clients/requests\n"); } - do { + if (config.idlemode) { + printf("Creating %d idle connections and waiting forever (Ctrl+C when done)\n", config.numclients); prepareForBenchmark(); c = createClient(); if (!c) exit(1); - c->obuf = sdscat(c->obuf,"PING\r\n"); - c->replytype = REPLY_RETCODE; + c->obuf = sdsempty(); + prepareClientForReply(c,REPLY_RETCODE); /* will never receive it */ createMissingClients(c); aeMain(config.el); - endBenchmark("PING"); + /* and will wait for every */ + } + do { prepareForBenchmark(); c = createClient(); if (!c) exit(1); @@ -442,7 +536,7 @@ int main(int argc, char **argv) { data[config.datasize+1] = '\n'; c->obuf = sdscatlen(c->obuf,data,config.datasize+2); } - c->replytype = REPLY_RETCODE; + prepareClientForReply(c,REPLY_RETCODE); createMissingClients(c); aeMain(config.el); endBenchmark("SET"); @@ -451,8 +545,7 @@ int main(int argc, char **argv) { c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"GET foo_rand000000000000\r\n"); - c->replytype = REPLY_BULK; - c->readlen = -1; + prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); endBenchmark("GET"); @@ -461,7 +554,7 @@ int main(int argc, char **argv) { c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"INCR counter_rand000000000000\r\n"); - c->replytype = REPLY_INT; + prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); endBenchmark("INCR"); @@ -470,7 +563,7 @@ int main(int argc, char **argv) { c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); - c->replytype = REPLY_INT; + prepareClientForReply(c,REPLY_INT); createMissingClients(c); aeMain(config.el); endBenchmark("LPUSH"); @@ -479,12 +572,65 @@ int main(int argc, char **argv) { c = createClient(); if (!c) exit(1); c->obuf = sdscat(c->obuf,"LPOP mylist\r\n"); - c->replytype = REPLY_BULK; - c->readlen = -1; + prepareClientForReply(c,REPLY_BULK); createMissingClients(c); aeMain(config.el); endBenchmark("LPOP"); + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"PING\r\n"); + prepareClientForReply(c,REPLY_RETCODE); + createMissingClients(c); + aeMain(config.el); + endBenchmark("PING"); + + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"LPUSH mylist 3\r\nbar\r\n"); + prepareClientForReply(c,REPLY_RETCODE); + createMissingClients(c); + aeMain(config.el); + endBenchmark("LPUSH (again, in order to bench LRANGE)"); + + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"LRANGE mylist 0 99\r\n"); + prepareClientForReply(c,REPLY_MBULK); + createMissingClients(c); + aeMain(config.el); + endBenchmark("LRANGE (first 100 elements)"); + + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"LRANGE mylist 0 299\r\n"); + prepareClientForReply(c,REPLY_MBULK); + createMissingClients(c); + aeMain(config.el); + endBenchmark("LRANGE (first 300 elements)"); + + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"LRANGE mylist 0 449\r\n"); + prepareClientForReply(c,REPLY_MBULK); + createMissingClients(c); + aeMain(config.el); + endBenchmark("LRANGE (first 450 elements)"); + + prepareForBenchmark(); + c = createClient(); + if (!c) exit(1); + c->obuf = sdscat(c->obuf,"LRANGE mylist 0 599\r\n"); + prepareClientForReply(c,REPLY_MBULK); + createMissingClients(c); + aeMain(config.el); + endBenchmark("LRANGE (first 600 elements)"); + printf("\n"); } while(config.loop);