]> git.saurik.com Git - redis.git/blame_incremental - src/db.c
Merge remote branch 'pietern/brpoplpush'
[redis.git] / src / db.c
... / ...
CommitLineData
1#include "redis.h"
2
3#include <signal.h>
4
5/*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.vm_enabled) {
21 if (val->storage == REDIS_VM_MEMORY ||
22 val->storage == REDIS_VM_SWAPPING)
23 {
24 /* If we were swapping the object out, cancel the operation */
25 if (val->storage == REDIS_VM_SWAPPING)
26 vmCancelThreadedIOJob(val);
27 } else {
28 int notify = (val->storage == REDIS_VM_LOADING);
29
30 /* Our value was swapped on disk. Bring it at home. */
31 redisAssert(val->type == REDIS_VMPOINTER);
32 val = vmLoadObject(val);
33 dictGetEntryVal(de) = val;
34
35 /* Clients blocked by the VM subsystem may be waiting for
36 * this key... */
37 if (notify) handleClientsBlockedOnSwappedKey(db,key);
38 }
39 }
40 server.stat_keyspace_hits++;
41 return val;
42 } else {
43 server.stat_keyspace_misses++;
44 return NULL;
45 }
46}
47
48robj *lookupKeyRead(redisDb *db, robj *key) {
49 expireIfNeeded(db,key);
50 return lookupKey(db,key);
51}
52
53robj *lookupKeyWrite(redisDb *db, robj *key) {
54 expireIfNeeded(db,key);
55 return lookupKey(db,key);
56}
57
58robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
59 robj *o = lookupKeyRead(c->db, key);
60 if (!o) addReply(c,reply);
61 return o;
62}
63
64robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
65 robj *o = lookupKeyWrite(c->db, key);
66 if (!o) addReply(c,reply);
67 return o;
68}
69
70/* Add the key to the DB. If the key already exists REDIS_ERR is returned,
71 * otherwise REDIS_OK is returned, and the caller should increment the
72 * refcount of 'val'. */
73int dbAdd(redisDb *db, robj *key, robj *val) {
74 /* Perform a lookup before adding the key, as we need to copy the
75 * key value. */
76 if (dictFind(db->dict, key->ptr) != NULL) {
77 return REDIS_ERR;
78 } else {
79 sds copy = sdsdup(key->ptr);
80 dictAdd(db->dict, copy, val);
81 return REDIS_OK;
82 }
83}
84
85/* If the key does not exist, this is just like dbAdd(). Otherwise
86 * the value associated to the key is replaced with the new one.
87 *
88 * On update (key already existed) 0 is returned. Otherwise 1. */
89int dbReplace(redisDb *db, robj *key, robj *val) {
90 if (dictFind(db->dict,key->ptr) == NULL) {
91 sds copy = sdsdup(key->ptr);
92 dictAdd(db->dict, copy, val);
93 return 1;
94 } else {
95 dictReplace(db->dict, key->ptr, val);
96 return 0;
97 }
98}
99
100int dbExists(redisDb *db, robj *key) {
101 return dictFind(db->dict,key->ptr) != NULL;
102}
103
104/* Return a random key, in form of a Redis object.
105 * If there are no keys, NULL is returned.
106 *
107 * The function makes sure to return keys not already expired. */
108robj *dbRandomKey(redisDb *db) {
109 struct dictEntry *de;
110
111 while(1) {
112 sds key;
113 robj *keyobj;
114
115 de = dictGetRandomKey(db->dict);
116 if (de == NULL) return NULL;
117
118 key = dictGetEntryKey(de);
119 keyobj = createStringObject(key,sdslen(key));
120 if (dictFind(db->expires,key)) {
121 if (expireIfNeeded(db,keyobj)) {
122 decrRefCount(keyobj);
123 continue; /* search for another key. This expired. */
124 }
125 }
126 return keyobj;
127 }
128}
129
130/* Delete a key, value, and associated expiration entry if any, from the DB */
131int dbDelete(redisDb *db, robj *key) {
132 /* If VM is enabled make sure to awake waiting clients for this key:
133 * deleting the key will kill the I/O thread bringing the key from swap
134 * to memory, so the client will never be notified and unblocked if we
135 * don't do it now. */
136 if (server.vm_enabled) handleClientsBlockedOnSwappedKey(db,key);
137 /* Deleting an entry from the expires dict will not free the sds of
138 * the key, because it is shared with the main dictionary. */
139 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
140 return dictDelete(db->dict,key->ptr) == DICT_OK;
141}
142
143/* Empty the whole database */
144long long emptyDb() {
145 int j;
146 long long removed = 0;
147
148 for (j = 0; j < server.dbnum; j++) {
149 removed += dictSize(server.db[j].dict);
150 dictEmpty(server.db[j].dict);
151 dictEmpty(server.db[j].expires);
152 }
153 return removed;
154}
155
156int selectDb(redisClient *c, int id) {
157 if (id < 0 || id >= server.dbnum)
158 return REDIS_ERR;
159 c->db = &server.db[id];
160 return REDIS_OK;
161}
162
163/*-----------------------------------------------------------------------------
164 * Type agnostic commands operating on the key space
165 *----------------------------------------------------------------------------*/
166
167void flushdbCommand(redisClient *c) {
168 server.dirty += dictSize(c->db->dict);
169 touchWatchedKeysOnFlush(c->db->id);
170 dictEmpty(c->db->dict);
171 dictEmpty(c->db->expires);
172 addReply(c,shared.ok);
173}
174
175void flushallCommand(redisClient *c) {
176 touchWatchedKeysOnFlush(-1);
177 server.dirty += emptyDb();
178 addReply(c,shared.ok);
179 if (server.bgsavechildpid != -1) {
180 kill(server.bgsavechildpid,SIGKILL);
181 rdbRemoveTempFile(server.bgsavechildpid);
182 }
183 rdbSave(server.dbfilename);
184 server.dirty++;
185}
186
187void delCommand(redisClient *c) {
188 int deleted = 0, j;
189
190 for (j = 1; j < c->argc; j++) {
191 if (dbDelete(c->db,c->argv[j])) {
192 touchWatchedKey(c->db,c->argv[j]);
193 server.dirty++;
194 deleted++;
195 }
196 }
197 addReplyLongLong(c,deleted);
198}
199
200void existsCommand(redisClient *c) {
201 expireIfNeeded(c->db,c->argv[1]);
202 if (dbExists(c->db,c->argv[1])) {
203 addReply(c, shared.cone);
204 } else {
205 addReply(c, shared.czero);
206 }
207}
208
209void selectCommand(redisClient *c) {
210 int id = atoi(c->argv[1]->ptr);
211
212 if (selectDb(c,id) == REDIS_ERR) {
213 addReplyError(c,"invalid DB index");
214 } else {
215 addReply(c,shared.ok);
216 }
217}
218
219void randomkeyCommand(redisClient *c) {
220 robj *key;
221
222 if ((key = dbRandomKey(c->db)) == NULL) {
223 addReply(c,shared.nullbulk);
224 return;
225 }
226
227 addReplyBulk(c,key);
228 decrRefCount(key);
229}
230
231void keysCommand(redisClient *c) {
232 dictIterator *di;
233 dictEntry *de;
234 sds pattern = c->argv[1]->ptr;
235 int plen = sdslen(pattern), allkeys;
236 unsigned long numkeys = 0;
237 void *replylen = addDeferredMultiBulkLength(c);
238
239 di = dictGetIterator(c->db->dict);
240 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
241 while((de = dictNext(di)) != NULL) {
242 sds key = dictGetEntryKey(de);
243 robj *keyobj;
244
245 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
246 keyobj = createStringObject(key,sdslen(key));
247 if (expireIfNeeded(c->db,keyobj) == 0) {
248 addReplyBulk(c,keyobj);
249 numkeys++;
250 }
251 decrRefCount(keyobj);
252 }
253 }
254 dictReleaseIterator(di);
255 setDeferredMultiBulkLength(c,replylen,numkeys);
256}
257
258void dbsizeCommand(redisClient *c) {
259 addReplyLongLong(c,dictSize(c->db->dict));
260}
261
262void lastsaveCommand(redisClient *c) {
263 addReplyLongLong(c,server.lastsave);
264}
265
266void typeCommand(redisClient *c) {
267 robj *o;
268 char *type;
269
270 o = lookupKeyRead(c->db,c->argv[1]);
271 if (o == NULL) {
272 type = "none";
273 } else {
274 switch(o->type) {
275 case REDIS_STRING: type = "string"; break;
276 case REDIS_LIST: type = "list"; break;
277 case REDIS_SET: type = "set"; break;
278 case REDIS_ZSET: type = "zset"; break;
279 case REDIS_HASH: type = "hash"; break;
280 default: type = "unknown"; break;
281 }
282 }
283 addReplyStatus(c,type);
284}
285
286void saveCommand(redisClient *c) {
287 if (server.bgsavechildpid != -1) {
288 addReplyError(c,"Background save already in progress");
289 return;
290 }
291 if (rdbSave(server.dbfilename) == REDIS_OK) {
292 addReply(c,shared.ok);
293 } else {
294 addReply(c,shared.err);
295 }
296}
297
298void bgsaveCommand(redisClient *c) {
299 if (server.bgsavechildpid != -1) {
300 addReplyError(c,"Background save already in progress");
301 return;
302 }
303 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
304 addReplyStatus(c,"Background saving started");
305 } else {
306 addReply(c,shared.err);
307 }
308}
309
310void shutdownCommand(redisClient *c) {
311 if (prepareForShutdown() == REDIS_OK)
312 exit(0);
313 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
314}
315
316void renameGenericCommand(redisClient *c, int nx) {
317 robj *o;
318
319 /* To use the same key as src and dst is probably an error */
320 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
321 addReply(c,shared.sameobjecterr);
322 return;
323 }
324
325 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
326 return;
327
328 incrRefCount(o);
329 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
330 if (nx) {
331 decrRefCount(o);
332 addReply(c,shared.czero);
333 return;
334 }
335 dbReplace(c->db,c->argv[2],o);
336 }
337 dbDelete(c->db,c->argv[1]);
338 touchWatchedKey(c->db,c->argv[1]);
339 touchWatchedKey(c->db,c->argv[2]);
340 server.dirty++;
341 addReply(c,nx ? shared.cone : shared.ok);
342}
343
344void renameCommand(redisClient *c) {
345 renameGenericCommand(c,0);
346}
347
348void renamenxCommand(redisClient *c) {
349 renameGenericCommand(c,1);
350}
351
352void moveCommand(redisClient *c) {
353 robj *o;
354 redisDb *src, *dst;
355 int srcid;
356
357 /* Obtain source and target DB pointers */
358 src = c->db;
359 srcid = c->db->id;
360 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
361 addReply(c,shared.outofrangeerr);
362 return;
363 }
364 dst = c->db;
365 selectDb(c,srcid); /* Back to the source DB */
366
367 /* If the user is moving using as target the same
368 * DB as the source DB it is probably an error. */
369 if (src == dst) {
370 addReply(c,shared.sameobjecterr);
371 return;
372 }
373
374 /* Check if the element exists and get a reference */
375 o = lookupKeyWrite(c->db,c->argv[1]);
376 if (!o) {
377 addReply(c,shared.czero);
378 return;
379 }
380
381 /* Try to add the element to the target DB */
382 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
383 addReply(c,shared.czero);
384 return;
385 }
386 incrRefCount(o);
387
388 /* OK! key moved, free the entry in the source DB */
389 dbDelete(src,c->argv[1]);
390 server.dirty++;
391 addReply(c,shared.cone);
392}
393
394/*-----------------------------------------------------------------------------
395 * Expires API
396 *----------------------------------------------------------------------------*/
397
398int removeExpire(redisDb *db, robj *key) {
399 /* An expire may only be removed if there is a corresponding entry in the
400 * main dict. Otherwise, the key will never be freed. */
401 redisAssert(dictFind(db->dict,key->ptr) != NULL);
402 return dictDelete(db->expires,key->ptr) == DICT_OK;
403}
404
405void setExpire(redisDb *db, robj *key, time_t when) {
406 dictEntry *de;
407
408 /* Reuse the sds from the main dict in the expire dict */
409 de = dictFind(db->dict,key->ptr);
410 redisAssert(de != NULL);
411 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
412}
413
414/* Return the expire time of the specified key, or -1 if no expire
415 * is associated with this key (i.e. the key is non volatile) */
416time_t getExpire(redisDb *db, robj *key) {
417 dictEntry *de;
418
419 /* No expire? return ASAP */
420 if (dictSize(db->expires) == 0 ||
421 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
422
423 /* The entry was found in the expire dict, this means it should also
424 * be present in the main dict (safety check). */
425 redisAssert(dictFind(db->dict,key->ptr) != NULL);
426 return (time_t) dictGetEntryVal(de);
427}
428
429/* Propagate expires into slaves and the AOF file.
430 * When a key expires in the master, a DEL operation for this key is sent
431 * to all the slaves and the AOF file if enabled.
432 *
433 * This way the key expiry is centralized in one place, and since both
434 * AOF and the master->slave link guarantee operation ordering, everything
435 * will be consistent even if we allow write operations against expiring
436 * keys. */
437void propagateExpire(redisDb *db, robj *key) {
438 robj *argv[2];
439
440 argv[0] = createStringObject("DEL",3);
441 argv[1] = key;
442 incrRefCount(key);
443
444 if (server.appendonly)
445 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
446 if (listLength(server.slaves))
447 replicationFeedSlaves(server.slaves,db->id,argv,2);
448
449 decrRefCount(argv[0]);
450 decrRefCount(argv[1]);
451}
452
453int expireIfNeeded(redisDb *db, robj *key) {
454 time_t when = getExpire(db,key);
455
456 /* If we are running in the context of a slave, return ASAP:
457 * the slave key expiration is controlled by the master that will
458 * send us synthesized DEL operations for expired keys.
459 *
460 * Still we try to return the right information to the caller,
461 * that is, 0 if we think the key should be still valid, 1 if
462 * we think the key is expired at this time. */
463 if (server.masterhost != NULL) {
464 return time(NULL) > when;
465 }
466
467 if (when < 0) return 0;
468
469 /* Return when this key has not expired */
470 if (time(NULL) <= when) return 0;
471
472 /* Delete the key */
473 server.stat_expiredkeys++;
474 propagateExpire(db,key);
475 return dbDelete(db,key);
476}
477
478/*-----------------------------------------------------------------------------
479 * Expires Commands
480 *----------------------------------------------------------------------------*/
481
482void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
483 dictEntry *de;
484 long seconds;
485
486 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
487
488 seconds -= offset;
489
490 de = dictFind(c->db->dict,key->ptr);
491 if (de == NULL) {
492 addReply(c,shared.czero);
493 return;
494 }
495 if (seconds <= 0) {
496 if (dbDelete(c->db,key)) server.dirty++;
497 addReply(c, shared.cone);
498 touchWatchedKey(c->db,key);
499 return;
500 } else {
501 time_t when = time(NULL)+seconds;
502 setExpire(c->db,key,when);
503 addReply(c,shared.cone);
504 touchWatchedKey(c->db,key);
505 server.dirty++;
506 return;
507 }
508}
509
510void expireCommand(redisClient *c) {
511 expireGenericCommand(c,c->argv[1],c->argv[2],0);
512}
513
514void expireatCommand(redisClient *c) {
515 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
516}
517
518void ttlCommand(redisClient *c) {
519 time_t expire, ttl = -1;
520
521 expire = getExpire(c->db,c->argv[1]);
522 if (expire != -1) {
523 ttl = (expire-time(NULL));
524 if (ttl < 0) ttl = -1;
525 }
526 addReplyLongLong(c,(long long)ttl);
527}
528
529void persistCommand(redisClient *c) {
530 dictEntry *de;
531
532 de = dictFind(c->db->dict,c->argv[1]->ptr);
533 if (de == NULL) {
534 addReply(c,shared.czero);
535 } else {
536 if (removeExpire(c->db,c->argv[1])) {
537 addReply(c,shared.cone);
538 server.dirty++;
539 } else {
540 addReply(c,shared.czero);
541 }
542 }
543}