#define REDIS_CMD_SORT_FOR_SCRIPT 256 /* "S" flag */
#define REDIS_CMD_LOADING 512 /* "l" flag */
#define REDIS_CMD_STALE 1024 /* "t" flag */
+#define REDIS_CMD_SKIP_MONITOR 2048 /* "M" flag */
/* Object types */
#define REDIS_STRING 0
dict *dict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP) */
+ dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id;
} redisDb;
* for BRPOPLPUSH. */
} blockingState;
+/* The following structure represents a node in the server.ready_keys list,
+ * where we accumulate all the keys that had clients blocked with a blocking
+ * operation such as B[LR]POP, but received new data in the context of the
+ * last executed command.
+ *
+ * After the execution of every command or script, we run this list to check
+ * if as a result we should serve data to clients blocked, unblocking them.
+ * Note that server.ready_keys will not have duplicates, as there is a
+ * dictionary also called ready_keys in every structure representing a Redis
+ * database, where we make sure to remember if a given key was already added
+ * in the server.ready_keys list. */
+typedef struct readyList {
+ redisDb *db; /* NOTE(review): presumably the database the key belongs to */
+ robj *key; /* Key that had blocked clients and received new data */
+} readyList;
+
/* With multiplexing we need to take per-client state.
* Clients are taken in a linked list. */
typedef struct redisClient {
*masterdownerr, *roslaveerr,
*oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *rpop, *lpop,
+ *lpush,
*select[REDIS_SHARED_SELECT_CMDS],
*integers[REDIS_SHARED_INTEGERS],
*mbulkhdr[REDIS_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
off_t loading_loaded_bytes;
time_t loading_start_time;
/* Fast pointers to often looked up command */
- struct redisCommand *delCommand, *multiCommand, *lpushCommand;
+ struct redisCommand *delCommand, *multiCommand, *lpushCommand, *lpopCommand,
+ *rpopCommand;
/* Fields used only for stats */
time_t stat_starttime; /* Server start time */
long long stat_numcommands; /* Number of processed commands */
/* Blocked clients */
unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
list *unblocked_clients; /* list of clients to unblock before next loop */
+ list *ready_keys; /* List of readyList structures for BLPOP & co */
/* Sort parameters - qsort_r() is only available under BSD so we
* have to take this state global, in order to pass it to sortCompare() */
int sort_desc;
void listTypeDelete(listTypeEntry *entry);
void listTypeConvert(robj *subject, int enc);
void unblockClientWaitingData(redisClient *c);
-int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele);
+void handleClientsBlockedOnLists(void);
void popGenericCommand(redisClient *c, int where);
/* MULTI/EXEC/WATCH... */