]> git.saurik.com Git - redis.git/blob - src/db.c
diskstore bug fixing and negative cache proper implementation
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled &&
21 cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
22 {
23 /* There is a save in progress for this object!
24 * Wait for it to get out. */
25 waitEmptyIOJobsQueue();
26 processAllPendingIOJobs();
27 redisAssert(!(cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG));
28 }
29 server.stat_keyspace_hits++;
30 return val;
31 } else {
32 time_t expire;
33 robj *val;
34
35 /* Key not found in the in memory hash table, but if disk store is
36 * enabled we may have this key on disk. If so load it in memory
37 * in a blocking way. */
38 if (server.ds_enabled && cacheKeyMayExist(db,key)) {
39 if (cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) {
40 /* There is a save in progress for this object!
41 * Wait for it to get out. */
42 waitEmptyIOJobsQueue();
43 processAllPendingIOJobs();
44 redisAssert((cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG) == 0);
45 }
46
47 redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
48 key->ptr);
49 val = dsGet(db,key,&expire);
50 if (val) {
51 int retval = dbAdd(db,key,val);
52 redisAssert(retval == REDIS_OK);
53 if (expire != -1) setExpire(db,key,expire);
54 server.stat_keyspace_hits++;
55 return val;
56 }
57 }
58 server.stat_keyspace_misses++;
59 return NULL;
60 }
61 }
62
63 robj *lookupKeyRead(redisDb *db, robj *key) {
64 expireIfNeeded(db,key);
65 return lookupKey(db,key);
66 }
67
68 robj *lookupKeyWrite(redisDb *db, robj *key) {
69 expireIfNeeded(db,key);
70 return lookupKey(db,key);
71 }
72
73 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
74 robj *o = lookupKeyRead(c->db, key);
75 if (!o) addReply(c,reply);
76 return o;
77 }
78
79 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
80 robj *o = lookupKeyWrite(c->db, key);
81 if (!o) addReply(c,reply);
82 return o;
83 }
84
85 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
86 * otherwise REDIS_OK is returned, and the caller should increment the
87 * refcount of 'val'. */
88 int dbAdd(redisDb *db, robj *key, robj *val) {
89 /* Perform a lookup before adding the key, as we need to copy the
90 * key value. */
91 if (dictFind(db->dict, key->ptr) != NULL) {
92 return REDIS_ERR;
93 } else {
94 sds copy = sdsdup(key->ptr);
95 dictAdd(db->dict, copy, val);
96 if (server.ds_enabled) cacheSetKeyMayExist(db,key);
97 return REDIS_OK;
98 }
99 }
100
101 /* If the key does not exist, this is just like dbAdd(). Otherwise
102 * the value associated to the key is replaced with the new one.
103 *
104 * On update (key already existed) 0 is returned. Otherwise 1. */
105 int dbReplace(redisDb *db, robj *key, robj *val) {
106 robj *oldval;
107 int retval;
108
109 if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
110 sds copy = sdsdup(key->ptr);
111 dictAdd(db->dict, copy, val);
112 retval = 1;
113 } else {
114 dictReplace(db->dict, key->ptr, val);
115 retval = 0;
116 }
117 if (server.ds_enabled) cacheSetKeyMayExist(db,key);
118 return retval;
119 }
120
121 int dbExists(redisDb *db, robj *key) {
122 return dictFind(db->dict,key->ptr) != NULL;
123 }
124
125 /* Return a random key, in form of a Redis object.
126 * If there are no keys, NULL is returned.
127 *
128 * The function makes sure to return keys not already expired. */
129 robj *dbRandomKey(redisDb *db) {
130 struct dictEntry *de;
131
132 while(1) {
133 sds key;
134 robj *keyobj;
135
136 de = dictGetRandomKey(db->dict);
137 if (de == NULL) return NULL;
138
139 key = dictGetEntryKey(de);
140 keyobj = createStringObject(key,sdslen(key));
141 if (dictFind(db->expires,key)) {
142 if (expireIfNeeded(db,keyobj)) {
143 decrRefCount(keyobj);
144 continue; /* search for another key. This expired. */
145 }
146 }
147 return keyobj;
148 }
149 }
150
151 /* Delete a key, value, and associated expiration entry if any, from the DB */
152 int dbDelete(redisDb *db, robj *key) {
153 /* If diskstore is enabled make sure to awake waiting clients for this key
154 * as it is not really useful to wait for a key already deleted to be
155 * loaded from disk. */
156 if (server.ds_enabled) {
157 handleClientsBlockedOnSwappedKey(db,key);
158 cacheSetKeyDoesNotExist(db,key);
159 }
160
161 /* Deleting an entry from the expires dict will not free the sds of
162 * the key, because it is shared with the main dictionary. */
163 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
164 return dictDelete(db->dict,key->ptr) == DICT_OK;
165 }
166
167 /* Empty the whole database */
168 long long emptyDb() {
169 int j;
170 long long removed = 0;
171
172 for (j = 0; j < server.dbnum; j++) {
173 removed += dictSize(server.db[j].dict);
174 dictEmpty(server.db[j].dict);
175 dictEmpty(server.db[j].expires);
176 }
177 return removed;
178 }
179
180 int selectDb(redisClient *c, int id) {
181 if (id < 0 || id >= server.dbnum)
182 return REDIS_ERR;
183 c->db = &server.db[id];
184 return REDIS_OK;
185 }
186
187 /*-----------------------------------------------------------------------------
188 * Hooks for key space changes.
189 *
190 * Every time a key in the database is modified the function
191 * signalModifiedKey() is called.
192 *
193 * Every time a DB is flushed the function signalFlushedDb() is called.
194 *----------------------------------------------------------------------------*/
195
196 void signalModifiedKey(redisDb *db, robj *key) {
197 touchWatchedKey(db,key);
198 if (server.ds_enabled)
199 cacheScheduleIO(db,key,REDIS_IO_SAVE);
200 }
201
/* Hook invoked when a DB is flushed: forwards 'dbid' to
 * touchWatchedKeysOnFlush(). FLUSHALL passes -1 as 'dbid'. */
void signalFlushedDb(int dbid)
{
    touchWatchedKeysOnFlush(dbid);
}
205
206 /*-----------------------------------------------------------------------------
207 * Type agnostic commands operating on the key space
208 *----------------------------------------------------------------------------*/
209
210 void flushdbCommand(redisClient *c) {
211 server.dirty += dictSize(c->db->dict);
212 signalFlushedDb(c->db->id);
213 dictEmpty(c->db->dict);
214 dictEmpty(c->db->expires);
215 if (server.ds_enabled) dsFlushDb(c->db->id);
216 addReply(c,shared.ok);
217 }
218
219 void flushallCommand(redisClient *c) {
220 signalFlushedDb(-1);
221 server.dirty += emptyDb();
222 addReply(c,shared.ok);
223 if (server.bgsavechildpid != -1) {
224 kill(server.bgsavechildpid,SIGKILL);
225 rdbRemoveTempFile(server.bgsavechildpid);
226 }
227 if (server.ds_enabled)
228 dsFlushDb(-1);
229 else
230 rdbSave(server.dbfilename);
231 server.dirty++;
232 }
233
234 void delCommand(redisClient *c) {
235 int deleted = 0, j;
236
237 for (j = 1; j < c->argc; j++) {
238 if (server.ds_enabled) {
239 lookupKeyRead(c->db,c->argv[j]);
240 /* FIXME: this can be optimized a lot, no real need to load
241 * a possibly huge value. */
242 }
243 if (dbDelete(c->db,c->argv[j])) {
244 signalModifiedKey(c->db,c->argv[j]);
245 server.dirty++;
246 deleted++;
247 } else if (server.ds_enabled) {
248 if (cacheKeyMayExist(c->db,c->argv[j]) &&
249 dsExists(c->db,c->argv[j]))
250 {
251 cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
252 deleted = 1;
253 }
254 }
255 }
256 addReplyLongLong(c,deleted);
257 }
258
259 void existsCommand(redisClient *c) {
260 expireIfNeeded(c->db,c->argv[1]);
261 if (dbExists(c->db,c->argv[1])) {
262 addReply(c, shared.cone);
263 } else {
264 addReply(c, shared.czero);
265 }
266 }
267
268 void selectCommand(redisClient *c) {
269 int id = atoi(c->argv[1]->ptr);
270
271 if (selectDb(c,id) == REDIS_ERR) {
272 addReplyError(c,"invalid DB index");
273 } else {
274 addReply(c,shared.ok);
275 }
276 }
277
278 void randomkeyCommand(redisClient *c) {
279 robj *key;
280
281 if ((key = dbRandomKey(c->db)) == NULL) {
282 addReply(c,shared.nullbulk);
283 return;
284 }
285
286 addReplyBulk(c,key);
287 decrRefCount(key);
288 }
289
290 void keysCommand(redisClient *c) {
291 dictIterator *di;
292 dictEntry *de;
293 sds pattern = c->argv[1]->ptr;
294 int plen = sdslen(pattern), allkeys;
295 unsigned long numkeys = 0;
296 void *replylen = addDeferredMultiBulkLength(c);
297
298 di = dictGetIterator(c->db->dict);
299 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
300 while((de = dictNext(di)) != NULL) {
301 sds key = dictGetEntryKey(de);
302 robj *keyobj;
303
304 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
305 keyobj = createStringObject(key,sdslen(key));
306 if (expireIfNeeded(c->db,keyobj) == 0) {
307 addReplyBulk(c,keyobj);
308 numkeys++;
309 }
310 decrRefCount(keyobj);
311 }
312 }
313 dictReleaseIterator(di);
314 setDeferredMultiBulkLength(c,replylen,numkeys);
315 }
316
317 void dbsizeCommand(redisClient *c) {
318 addReplyLongLong(c,dictSize(c->db->dict));
319 }
320
321 void lastsaveCommand(redisClient *c) {
322 addReplyLongLong(c,server.lastsave);
323 }
324
325 void typeCommand(redisClient *c) {
326 robj *o;
327 char *type;
328
329 o = lookupKeyRead(c->db,c->argv[1]);
330 if (o == NULL) {
331 type = "none";
332 } else {
333 switch(o->type) {
334 case REDIS_STRING: type = "string"; break;
335 case REDIS_LIST: type = "list"; break;
336 case REDIS_SET: type = "set"; break;
337 case REDIS_ZSET: type = "zset"; break;
338 case REDIS_HASH: type = "hash"; break;
339 default: type = "unknown"; break;
340 }
341 }
342 addReplyStatus(c,type);
343 }
344
345 void saveCommand(redisClient *c) {
346 if (server.bgsavechildpid != -1) {
347 addReplyError(c,"Background save already in progress");
348 return;
349 }
350 if (rdbSave(server.dbfilename) == REDIS_OK) {
351 addReply(c,shared.ok);
352 } else {
353 addReply(c,shared.err);
354 }
355 }
356
357 void bgsaveCommand(redisClient *c) {
358 if (server.bgsavechildpid != -1) {
359 addReplyError(c,"Background save already in progress");
360 return;
361 }
362 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
363 addReplyStatus(c,"Background saving started");
364 } else {
365 addReply(c,shared.err);
366 }
367 }
368
369 void shutdownCommand(redisClient *c) {
370 if (prepareForShutdown() == REDIS_OK)
371 exit(0);
372 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
373 }
374
375 void renameGenericCommand(redisClient *c, int nx) {
376 robj *o;
377
378 /* To use the same key as src and dst is probably an error */
379 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
380 addReply(c,shared.sameobjecterr);
381 return;
382 }
383
384 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
385 return;
386
387 incrRefCount(o);
388 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
389 if (nx) {
390 decrRefCount(o);
391 addReply(c,shared.czero);
392 return;
393 }
394 dbReplace(c->db,c->argv[2],o);
395 }
396 dbDelete(c->db,c->argv[1]);
397 signalModifiedKey(c->db,c->argv[1]);
398 signalModifiedKey(c->db,c->argv[2]);
399 server.dirty++;
400 addReply(c,nx ? shared.cone : shared.ok);
401 }
402
403 void renameCommand(redisClient *c) {
404 renameGenericCommand(c,0);
405 }
406
407 void renamenxCommand(redisClient *c) {
408 renameGenericCommand(c,1);
409 }
410
411 void moveCommand(redisClient *c) {
412 robj *o;
413 redisDb *src, *dst;
414 int srcid;
415
416 /* Obtain source and target DB pointers */
417 src = c->db;
418 srcid = c->db->id;
419 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
420 addReply(c,shared.outofrangeerr);
421 return;
422 }
423 dst = c->db;
424 selectDb(c,srcid); /* Back to the source DB */
425
426 /* If the user is moving using as target the same
427 * DB as the source DB it is probably an error. */
428 if (src == dst) {
429 addReply(c,shared.sameobjecterr);
430 return;
431 }
432
433 /* Check if the element exists and get a reference */
434 o = lookupKeyWrite(c->db,c->argv[1]);
435 if (!o) {
436 addReply(c,shared.czero);
437 return;
438 }
439
440 /* Try to add the element to the target DB */
441 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
442 addReply(c,shared.czero);
443 return;
444 }
445 incrRefCount(o);
446
447 /* OK! key moved, free the entry in the source DB */
448 dbDelete(src,c->argv[1]);
449 server.dirty++;
450 addReply(c,shared.cone);
451 }
452
453 /*-----------------------------------------------------------------------------
454 * Expires API
455 *----------------------------------------------------------------------------*/
456
457 int removeExpire(redisDb *db, robj *key) {
458 /* An expire may only be removed if there is a corresponding entry in the
459 * main dict. Otherwise, the key will never be freed. */
460 redisAssert(dictFind(db->dict,key->ptr) != NULL);
461 return dictDelete(db->expires,key->ptr) == DICT_OK;
462 }
463
464 void setExpire(redisDb *db, robj *key, time_t when) {
465 dictEntry *de;
466
467 /* Reuse the sds from the main dict in the expire dict */
468 de = dictFind(db->dict,key->ptr);
469 redisAssert(de != NULL);
470 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
471 }
472
473 /* Return the expire time of the specified key, or -1 if no expire
474 * is associated with this key (i.e. the key is non volatile) */
475 time_t getExpire(redisDb *db, robj *key) {
476 dictEntry *de;
477
478 /* No expire? return ASAP */
479 if (dictSize(db->expires) == 0 ||
480 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
481
482 /* The entry was found in the expire dict, this means it should also
483 * be present in the main dict (safety check). */
484 redisAssert(dictFind(db->dict,key->ptr) != NULL);
485 return (time_t) dictGetEntryVal(de);
486 }
487
488 /* Propagate expires into slaves and the AOF file.
489 * When a key expires in the master, a DEL operation for this key is sent
490 * to all the slaves and the AOF file if enabled.
491 *
492 * This way the key expiry is centralized in one place, and since both
493 * AOF and the master->slave link guarantee operation ordering, everything
494 * will be consistent even if we allow write operations against expiring
495 * keys. */
496 void propagateExpire(redisDb *db, robj *key) {
497 robj *argv[2];
498
499 argv[0] = createStringObject("DEL",3);
500 argv[1] = key;
501 incrRefCount(key);
502
503 if (server.appendonly)
504 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
505 if (listLength(server.slaves))
506 replicationFeedSlaves(server.slaves,db->id,argv,2);
507
508 decrRefCount(argv[0]);
509 decrRefCount(argv[1]);
510 }
511
512 int expireIfNeeded(redisDb *db, robj *key) {
513 time_t when = getExpire(db,key);
514
515 /* If we are running in the context of a slave, return ASAP:
516 * the slave key expiration is controlled by the master that will
517 * send us synthesized DEL operations for expired keys.
518 *
519 * Still we try to return the right information to the caller,
520 * that is, 0 if we think the key should be still valid, 1 if
521 * we think the key is expired at this time. */
522 if (server.masterhost != NULL) {
523 return time(NULL) > when;
524 }
525
526 if (when < 0) return 0;
527
528 /* Return when this key has not expired */
529 if (time(NULL) <= when) return 0;
530
531 /* Delete the key */
532 server.stat_expiredkeys++;
533 propagateExpire(db,key);
534 return dbDelete(db,key);
535 }
536
537 /*-----------------------------------------------------------------------------
538 * Expires Commands
539 *----------------------------------------------------------------------------*/
540
541 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
542 dictEntry *de;
543 long seconds;
544
545 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
546
547 seconds -= offset;
548
549 de = dictFind(c->db->dict,key->ptr);
550 if (de == NULL) {
551 addReply(c,shared.czero);
552 return;
553 }
554 if (seconds <= 0) {
555 if (dbDelete(c->db,key)) server.dirty++;
556 addReply(c, shared.cone);
557 signalModifiedKey(c->db,key);
558 return;
559 } else {
560 time_t when = time(NULL)+seconds;
561 setExpire(c->db,key,when);
562 addReply(c,shared.cone);
563 signalModifiedKey(c->db,key);
564 server.dirty++;
565 return;
566 }
567 }
568
569 void expireCommand(redisClient *c) {
570 expireGenericCommand(c,c->argv[1],c->argv[2],0);
571 }
572
573 void expireatCommand(redisClient *c) {
574 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
575 }
576
577 void ttlCommand(redisClient *c) {
578 time_t expire, ttl = -1;
579
580 expire = getExpire(c->db,c->argv[1]);
581 if (expire != -1) {
582 ttl = (expire-time(NULL));
583 if (ttl < 0) ttl = -1;
584 }
585 addReplyLongLong(c,(long long)ttl);
586 }
587
588 void persistCommand(redisClient *c) {
589 dictEntry *de;
590
591 de = dictFind(c->db->dict,c->argv[1]->ptr);
592 if (de == NULL) {
593 addReply(c,shared.czero);
594 } else {
595 if (removeExpire(c->db,c->argv[1])) {
596 addReply(c,shared.cone);
597 server.dirty++;
598 } else {
599 addReply(c,shared.czero);
600 }
601 }
602 }