]> git.saurik.com Git - redis.git/blob - src/db.c
1a30034e811306e2848c4d27cee2c2511b3789d2
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 processAllPendingIOJobs();
25 redisAssert(val->storage != REDIS_DS_SAVING);
26 }
27 server.stat_keyspace_hits++;
28 return val;
29 } else {
30 time_t expire;
31 robj *val;
32
33 /* Key not found in the in memory hash table, but if disk store is
34 * enabled we may have this key on disk. If so load it in memory
35 * in a blocking way.
36 *
37 * FIXME: race condition here. If there was an already scheduled
38 * async loading of this key, what may happen is that the old
39 * key is loaded in memory if this gets deleted in the meantime. */
40 if (server.ds_enabled && cacheKeyMayExist(db,key)) {
41 redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
42 key->ptr);
43 val = dsGet(db,key,&expire);
44 if (val) {
45 int retval = dbAdd(db,key,val);
46 redisAssert(retval == REDIS_OK);
47 if (expire != -1) setExpire(db,key,expire);
48 server.stat_keyspace_hits++;
49 return val;
50 }
51 }
52 server.stat_keyspace_misses++;
53 return NULL;
54 }
55 }
56
57 robj *lookupKeyRead(redisDb *db, robj *key) {
58 expireIfNeeded(db,key);
59 return lookupKey(db,key);
60 }
61
62 robj *lookupKeyWrite(redisDb *db, robj *key) {
63 expireIfNeeded(db,key);
64 return lookupKey(db,key);
65 }
66
67 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
68 robj *o = lookupKeyRead(c->db, key);
69 if (!o) addReply(c,reply);
70 return o;
71 }
72
73 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
74 robj *o = lookupKeyWrite(c->db, key);
75 if (!o) addReply(c,reply);
76 return o;
77 }
78
79 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
80 * otherwise REDIS_OK is returned, and the caller should increment the
81 * refcount of 'val'. */
82 int dbAdd(redisDb *db, robj *key, robj *val) {
83 /* Perform a lookup before adding the key, as we need to copy the
84 * key value. */
85 if (dictFind(db->dict, key->ptr) != NULL) {
86 return REDIS_ERR;
87 } else {
88 sds copy = sdsdup(key->ptr);
89 dictAdd(db->dict, copy, val);
90 if (server.ds_enabled) {
91 /* FIXME: remove entry from negative cache */
92 }
93 return REDIS_OK;
94 }
95 }
96
97 /* If the key does not exist, this is just like dbAdd(). Otherwise
98 * the value associated to the key is replaced with the new one.
99 *
100 * On update (key already existed) 0 is returned. Otherwise 1. */
101 int dbReplace(redisDb *db, robj *key, robj *val) {
102 robj *oldval;
103
104 if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
105 sds copy = sdsdup(key->ptr);
106 dictAdd(db->dict, copy, val);
107 return 1;
108 } else {
109 val->storage = oldval->storage;
110 dictReplace(db->dict, key->ptr, val);
111 return 0;
112 }
113 }
114
115 int dbExists(redisDb *db, robj *key) {
116 return dictFind(db->dict,key->ptr) != NULL;
117 }
118
119 /* Return a random key, in form of a Redis object.
120 * If there are no keys, NULL is returned.
121 *
122 * The function makes sure to return keys not already expired. */
123 robj *dbRandomKey(redisDb *db) {
124 struct dictEntry *de;
125
126 while(1) {
127 sds key;
128 robj *keyobj;
129
130 de = dictGetRandomKey(db->dict);
131 if (de == NULL) return NULL;
132
133 key = dictGetEntryKey(de);
134 keyobj = createStringObject(key,sdslen(key));
135 if (dictFind(db->expires,key)) {
136 if (expireIfNeeded(db,keyobj)) {
137 decrRefCount(keyobj);
138 continue; /* search for another key. This expired. */
139 }
140 }
141 return keyobj;
142 }
143 }
144
145 /* Delete a key, value, and associated expiration entry if any, from the DB */
146 int dbDelete(redisDb *db, robj *key) {
147 /* If diskstore is enabled make sure to awake waiting clients for this key
148 * as it is not really useful to wait for a key already deleted to be
149 * loaded from disk. */
150 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
151
152 /* Mark this key as non existing on disk as well */
153 cacheSetKeyDoesNotExistRemember(db,key);
154
155 /* Deleting an entry from the expires dict will not free the sds of
156 * the key, because it is shared with the main dictionary. */
157 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
158 return dictDelete(db->dict,key->ptr) == DICT_OK;
159 }
160
161 /* Empty the whole database */
162 long long emptyDb() {
163 int j;
164 long long removed = 0;
165
166 for (j = 0; j < server.dbnum; j++) {
167 removed += dictSize(server.db[j].dict);
168 dictEmpty(server.db[j].dict);
169 dictEmpty(server.db[j].expires);
170 }
171 return removed;
172 }
173
174 int selectDb(redisClient *c, int id) {
175 if (id < 0 || id >= server.dbnum)
176 return REDIS_ERR;
177 c->db = &server.db[id];
178 return REDIS_OK;
179 }
180
181 /*-----------------------------------------------------------------------------
182 * Hooks for key space changes.
183 *
184 * Every time a key in the database is modified the function
185 * signalModifiedKey() is called.
186 *
187 * Every time a DB is flushed the function signalFlushDb() is called.
188 *----------------------------------------------------------------------------*/
189
190 void signalModifiedKey(redisDb *db, robj *key) {
191 touchWatchedKey(db,key);
192 if (server.ds_enabled)
193 cacheScheduleForFlush(db,key);
194 }
195
196 void signalFlushedDb(int dbid) {
197 touchWatchedKeysOnFlush(dbid);
198 if (server.ds_enabled)
199 dsFlushDb(dbid);
200 }
201
202 /*-----------------------------------------------------------------------------
203 * Type agnostic commands operating on the key space
204 *----------------------------------------------------------------------------*/
205
206 void flushdbCommand(redisClient *c) {
207 server.dirty += dictSize(c->db->dict);
208 signalFlushedDb(c->db->id);
209 dictEmpty(c->db->dict);
210 dictEmpty(c->db->expires);
211 addReply(c,shared.ok);
212 }
213
214 void flushallCommand(redisClient *c) {
215 signalFlushedDb(-1);
216 server.dirty += emptyDb();
217 addReply(c,shared.ok);
218 if (server.bgsavechildpid != -1) {
219 kill(server.bgsavechildpid,SIGKILL);
220 rdbRemoveTempFile(server.bgsavechildpid);
221 }
222 rdbSave(server.dbfilename);
223 server.dirty++;
224 }
225
226 void delCommand(redisClient *c) {
227 int deleted = 0, j;
228
229 for (j = 1; j < c->argc; j++) {
230 if (server.ds_enabled) {
231 lookupKeyRead(c->db,c->argv[j]);
232 /* FIXME: this can be optimized a lot, no real need to load
233 * a possibly huge value. */
234 }
235 if (dbDelete(c->db,c->argv[j])) {
236 signalModifiedKey(c->db,c->argv[j]);
237 server.dirty++;
238 deleted++;
239 } else if (server.ds_enabled) {
240 if (cacheKeyMayExist(c->db,c->argv[j]) &&
241 dsExists(c->db,c->argv[j]))
242 {
243 cacheScheduleForFlush(c->db,c->argv[j]);
244 deleted = 1;
245 }
246 }
247 }
248 addReplyLongLong(c,deleted);
249 }
250
251 void existsCommand(redisClient *c) {
252 expireIfNeeded(c->db,c->argv[1]);
253 if (dbExists(c->db,c->argv[1])) {
254 addReply(c, shared.cone);
255 } else {
256 addReply(c, shared.czero);
257 }
258 }
259
260 void selectCommand(redisClient *c) {
261 int id = atoi(c->argv[1]->ptr);
262
263 if (selectDb(c,id) == REDIS_ERR) {
264 addReplyError(c,"invalid DB index");
265 } else {
266 addReply(c,shared.ok);
267 }
268 }
269
270 void randomkeyCommand(redisClient *c) {
271 robj *key;
272
273 if ((key = dbRandomKey(c->db)) == NULL) {
274 addReply(c,shared.nullbulk);
275 return;
276 }
277
278 addReplyBulk(c,key);
279 decrRefCount(key);
280 }
281
282 void keysCommand(redisClient *c) {
283 dictIterator *di;
284 dictEntry *de;
285 sds pattern = c->argv[1]->ptr;
286 int plen = sdslen(pattern), allkeys;
287 unsigned long numkeys = 0;
288 void *replylen = addDeferredMultiBulkLength(c);
289
290 di = dictGetIterator(c->db->dict);
291 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
292 while((de = dictNext(di)) != NULL) {
293 sds key = dictGetEntryKey(de);
294 robj *keyobj;
295
296 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
297 keyobj = createStringObject(key,sdslen(key));
298 if (expireIfNeeded(c->db,keyobj) == 0) {
299 addReplyBulk(c,keyobj);
300 numkeys++;
301 }
302 decrRefCount(keyobj);
303 }
304 }
305 dictReleaseIterator(di);
306 setDeferredMultiBulkLength(c,replylen,numkeys);
307 }
308
309 void dbsizeCommand(redisClient *c) {
310 addReplyLongLong(c,dictSize(c->db->dict));
311 }
312
313 void lastsaveCommand(redisClient *c) {
314 addReplyLongLong(c,server.lastsave);
315 }
316
317 void typeCommand(redisClient *c) {
318 robj *o;
319 char *type;
320
321 o = lookupKeyRead(c->db,c->argv[1]);
322 if (o == NULL) {
323 type = "none";
324 } else {
325 switch(o->type) {
326 case REDIS_STRING: type = "string"; break;
327 case REDIS_LIST: type = "list"; break;
328 case REDIS_SET: type = "set"; break;
329 case REDIS_ZSET: type = "zset"; break;
330 case REDIS_HASH: type = "hash"; break;
331 default: type = "unknown"; break;
332 }
333 }
334 addReplyStatus(c,type);
335 }
336
337 void saveCommand(redisClient *c) {
338 if (server.bgsavechildpid != -1) {
339 addReplyError(c,"Background save already in progress");
340 return;
341 }
342 if (rdbSave(server.dbfilename) == REDIS_OK) {
343 addReply(c,shared.ok);
344 } else {
345 addReply(c,shared.err);
346 }
347 }
348
349 void bgsaveCommand(redisClient *c) {
350 if (server.bgsavechildpid != -1) {
351 addReplyError(c,"Background save already in progress");
352 return;
353 }
354 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
355 addReplyStatus(c,"Background saving started");
356 } else {
357 addReply(c,shared.err);
358 }
359 }
360
361 void shutdownCommand(redisClient *c) {
362 if (prepareForShutdown() == REDIS_OK)
363 exit(0);
364 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
365 }
366
367 void renameGenericCommand(redisClient *c, int nx) {
368 robj *o;
369
370 /* To use the same key as src and dst is probably an error */
371 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
372 addReply(c,shared.sameobjecterr);
373 return;
374 }
375
376 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
377 return;
378
379 incrRefCount(o);
380 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
381 if (nx) {
382 decrRefCount(o);
383 addReply(c,shared.czero);
384 return;
385 }
386 dbReplace(c->db,c->argv[2],o);
387 }
388 dbDelete(c->db,c->argv[1]);
389 signalModifiedKey(c->db,c->argv[1]);
390 signalModifiedKey(c->db,c->argv[2]);
391 server.dirty++;
392 addReply(c,nx ? shared.cone : shared.ok);
393 }
394
395 void renameCommand(redisClient *c) {
396 renameGenericCommand(c,0);
397 }
398
399 void renamenxCommand(redisClient *c) {
400 renameGenericCommand(c,1);
401 }
402
403 void moveCommand(redisClient *c) {
404 robj *o;
405 redisDb *src, *dst;
406 int srcid;
407
408 /* Obtain source and target DB pointers */
409 src = c->db;
410 srcid = c->db->id;
411 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
412 addReply(c,shared.outofrangeerr);
413 return;
414 }
415 dst = c->db;
416 selectDb(c,srcid); /* Back to the source DB */
417
418 /* If the user is moving using as target the same
419 * DB as the source DB it is probably an error. */
420 if (src == dst) {
421 addReply(c,shared.sameobjecterr);
422 return;
423 }
424
425 /* Check if the element exists and get a reference */
426 o = lookupKeyWrite(c->db,c->argv[1]);
427 if (!o) {
428 addReply(c,shared.czero);
429 return;
430 }
431
432 /* Try to add the element to the target DB */
433 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
434 addReply(c,shared.czero);
435 return;
436 }
437 incrRefCount(o);
438
439 /* OK! key moved, free the entry in the source DB */
440 dbDelete(src,c->argv[1]);
441 server.dirty++;
442 addReply(c,shared.cone);
443 }
444
445 /*-----------------------------------------------------------------------------
446 * Expires API
447 *----------------------------------------------------------------------------*/
448
449 int removeExpire(redisDb *db, robj *key) {
450 /* An expire may only be removed if there is a corresponding entry in the
451 * main dict. Otherwise, the key will never be freed. */
452 redisAssert(dictFind(db->dict,key->ptr) != NULL);
453 return dictDelete(db->expires,key->ptr) == DICT_OK;
454 }
455
456 void setExpire(redisDb *db, robj *key, time_t when) {
457 dictEntry *de;
458
459 /* Reuse the sds from the main dict in the expire dict */
460 de = dictFind(db->dict,key->ptr);
461 redisAssert(de != NULL);
462 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
463 }
464
465 /* Return the expire time of the specified key, or -1 if no expire
466 * is associated with this key (i.e. the key is non volatile) */
467 time_t getExpire(redisDb *db, robj *key) {
468 dictEntry *de;
469
470 /* No expire? return ASAP */
471 if (dictSize(db->expires) == 0 ||
472 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
473
474 /* The entry was found in the expire dict, this means it should also
475 * be present in the main dict (safety check). */
476 redisAssert(dictFind(db->dict,key->ptr) != NULL);
477 return (time_t) dictGetEntryVal(de);
478 }
479
480 /* Propagate expires into slaves and the AOF file.
481 * When a key expires in the master, a DEL operation for this key is sent
482 * to all the slaves and the AOF file if enabled.
483 *
484 * This way the key expiry is centralized in one place, and since both
485 * AOF and the master->slave link guarantee operation ordering, everything
486 * will be consistent even if we allow write operations against expiring
487 * keys. */
488 void propagateExpire(redisDb *db, robj *key) {
489 robj *argv[2];
490
491 argv[0] = createStringObject("DEL",3);
492 argv[1] = key;
493 incrRefCount(key);
494
495 if (server.appendonly)
496 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
497 if (listLength(server.slaves))
498 replicationFeedSlaves(server.slaves,db->id,argv,2);
499
500 decrRefCount(argv[0]);
501 decrRefCount(argv[1]);
502 }
503
504 int expireIfNeeded(redisDb *db, robj *key) {
505 time_t when = getExpire(db,key);
506
507 /* If we are running in the context of a slave, return ASAP:
508 * the slave key expiration is controlled by the master that will
509 * send us synthesized DEL operations for expired keys.
510 *
511 * Still we try to return the right information to the caller,
512 * that is, 0 if we think the key should be still valid, 1 if
513 * we think the key is expired at this time. */
514 if (server.masterhost != NULL) {
515 return time(NULL) > when;
516 }
517
518 if (when < 0) return 0;
519
520 /* Return when this key has not expired */
521 if (time(NULL) <= when) return 0;
522
523 /* Delete the key */
524 server.stat_expiredkeys++;
525 propagateExpire(db,key);
526 return dbDelete(db,key);
527 }
528
529 /*-----------------------------------------------------------------------------
530 * Expires Commands
531 *----------------------------------------------------------------------------*/
532
533 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
534 dictEntry *de;
535 long seconds;
536
537 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
538
539 seconds -= offset;
540
541 de = dictFind(c->db->dict,key->ptr);
542 if (de == NULL) {
543 addReply(c,shared.czero);
544 return;
545 }
546 if (seconds <= 0) {
547 if (dbDelete(c->db,key)) server.dirty++;
548 addReply(c, shared.cone);
549 signalModifiedKey(c->db,key);
550 return;
551 } else {
552 time_t when = time(NULL)+seconds;
553 setExpire(c->db,key,when);
554 addReply(c,shared.cone);
555 signalModifiedKey(c->db,key);
556 server.dirty++;
557 return;
558 }
559 }
560
561 void expireCommand(redisClient *c) {
562 expireGenericCommand(c,c->argv[1],c->argv[2],0);
563 }
564
565 void expireatCommand(redisClient *c) {
566 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
567 }
568
569 void ttlCommand(redisClient *c) {
570 time_t expire, ttl = -1;
571
572 expire = getExpire(c->db,c->argv[1]);
573 if (expire != -1) {
574 ttl = (expire-time(NULL));
575 if (ttl < 0) ttl = -1;
576 }
577 addReplyLongLong(c,(long long)ttl);
578 }
579
580 void persistCommand(redisClient *c) {
581 dictEntry *de;
582
583 de = dictFind(c->db->dict,c->argv[1]->ptr);
584 if (de == NULL) {
585 addReply(c,shared.czero);
586 } else {
587 if (removeExpire(c->db,c->argv[1])) {
588 addReply(c,shared.cone);
589 server.dirty++;
590 } else {
591 addReply(c,shared.czero);
592 }
593 }
594 }