]> git.saurik.com Git - redis.git/blob - src/db.c
cf99bbb299780f3ac81e8e53b908d6d0afc28235
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 redisAssert(val->storage != REDIS_DS_SAVING);
25 }
26 server.stat_keyspace_hits++;
27 return val;
28 } else {
29 /* FIXME: Check if the object is on disk, if it is, load it
30 * in a blocking way now. */
31 server.stat_keyspace_misses++;
32 return NULL;
33 }
34 }
35
36 robj *lookupKeyRead(redisDb *db, robj *key) {
37 expireIfNeeded(db,key);
38 return lookupKey(db,key);
39 }
40
41 robj *lookupKeyWrite(redisDb *db, robj *key) {
42 expireIfNeeded(db,key);
43 return lookupKey(db,key);
44 }
45
46 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
47 robj *o = lookupKeyRead(c->db, key);
48 if (!o) addReply(c,reply);
49 return o;
50 }
51
52 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
53 robj *o = lookupKeyWrite(c->db, key);
54 if (!o) addReply(c,reply);
55 return o;
56 }
57
58 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
59 * otherwise REDIS_OK is returned, and the caller should increment the
60 * refcount of 'val'. */
61 int dbAdd(redisDb *db, robj *key, robj *val) {
62 /* Perform a lookup before adding the key, as we need to copy the
63 * key value. */
64 if (dictFind(db->dict, key->ptr) != NULL) {
65 return REDIS_ERR;
66 } else {
67 sds copy = sdsdup(key->ptr);
68 dictAdd(db->dict, copy, val);
69 if (server.ds_enabled) {
70 /* FIXME: remove entry from negative cache */
71 }
72 return REDIS_OK;
73 }
74 }
75
76 /* If the key does not exist, this is just like dbAdd(). Otherwise
77 * the value associated to the key is replaced with the new one.
78 *
79 * On update (key already existed) 0 is returned. Otherwise 1. */
80 int dbReplace(redisDb *db, robj *key, robj *val) {
81 if (dictFind(db->dict,key->ptr) == NULL) {
82 sds copy = sdsdup(key->ptr);
83 dictAdd(db->dict, copy, val);
84 return 1;
85 } else {
86 dictReplace(db->dict, key->ptr, val);
87 return 0;
88 }
89 }
90
91 int dbExists(redisDb *db, robj *key) {
92 return dictFind(db->dict,key->ptr) != NULL;
93 }
94
95 /* Return a random key, in form of a Redis object.
96 * If there are no keys, NULL is returned.
97 *
98 * The function makes sure to return keys not already expired. */
99 robj *dbRandomKey(redisDb *db) {
100 struct dictEntry *de;
101
102 while(1) {
103 sds key;
104 robj *keyobj;
105
106 de = dictGetRandomKey(db->dict);
107 if (de == NULL) return NULL;
108
109 key = dictGetEntryKey(de);
110 keyobj = createStringObject(key,sdslen(key));
111 if (dictFind(db->expires,key)) {
112 if (expireIfNeeded(db,keyobj)) {
113 decrRefCount(keyobj);
114 continue; /* search for another key. This expired. */
115 }
116 }
117 return keyobj;
118 }
119 }
120
121 /* Delete a key, value, and associated expiration entry if any, from the DB */
122 int dbDelete(redisDb *db, robj *key) {
123 /* If VM is enabled make sure to awake waiting clients for this key:
124 * deleting the key will kill the I/O thread bringing the key from swap
125 * to memory, so the client will never be notified and unblocked if we
126 * don't do it now. */
127 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
128
129 /* FIXME: we need to delete the IO Job loading the key, or simply we can
130 * wait for it to finish. */
131
132 /* Deleting an entry from the expires dict will not free the sds of
133 * the key, because it is shared with the main dictionary. */
134 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
135 return dictDelete(db->dict,key->ptr) == DICT_OK;
136 }
137
138 /* Empty the whole database */
139 long long emptyDb() {
140 int j;
141 long long removed = 0;
142
143 for (j = 0; j < server.dbnum; j++) {
144 removed += dictSize(server.db[j].dict);
145 dictEmpty(server.db[j].dict);
146 dictEmpty(server.db[j].expires);
147 }
148 return removed;
149 }
150
151 int selectDb(redisClient *c, int id) {
152 if (id < 0 || id >= server.dbnum)
153 return REDIS_ERR;
154 c->db = &server.db[id];
155 return REDIS_OK;
156 }
157
158 /*-----------------------------------------------------------------------------
159 * Hooks for key space changes.
160 *
161 * Every time a key in the database is modified the function
162 * signalModifiedKey() is called.
163 *
164 * Every time a DB is flushed the function signalFlushDb() is called.
165 *----------------------------------------------------------------------------*/
166
167 void signalModifiedKey(redisDb *db, robj *key) {
168 touchWatchedKey(db,key);
169 if (server.ds_enabled)
170 cacheScheduleForFlush(db,key);
171 }
172
173 void signalFlushedDb(int dbid) {
174 touchWatchedKeysOnFlush(dbid);
175 if (server.ds_enabled)
176 dsFlushDb(dbid);
177 }
178
179 /*-----------------------------------------------------------------------------
180 * Type agnostic commands operating on the key space
181 *----------------------------------------------------------------------------*/
182
183 void flushdbCommand(redisClient *c) {
184 server.dirty += dictSize(c->db->dict);
185 signalFlushedDb(c->db->id);
186 dictEmpty(c->db->dict);
187 dictEmpty(c->db->expires);
188 addReply(c,shared.ok);
189 }
190
191 void flushallCommand(redisClient *c) {
192 signalFlushedDb(-1);
193 server.dirty += emptyDb();
194 addReply(c,shared.ok);
195 if (server.bgsavechildpid != -1) {
196 kill(server.bgsavechildpid,SIGKILL);
197 rdbRemoveTempFile(server.bgsavechildpid);
198 }
199 rdbSave(server.dbfilename);
200 server.dirty++;
201 }
202
203 void delCommand(redisClient *c) {
204 int deleted = 0, j;
205
206 for (j = 1; j < c->argc; j++) {
207 if (dbDelete(c->db,c->argv[j])) {
208 signalModifiedKey(c->db,c->argv[j]);
209 server.dirty++;
210 deleted++;
211 }
212 }
213 addReplyLongLong(c,deleted);
214 }
215
216 void existsCommand(redisClient *c) {
217 expireIfNeeded(c->db,c->argv[1]);
218 if (dbExists(c->db,c->argv[1])) {
219 addReply(c, shared.cone);
220 } else {
221 addReply(c, shared.czero);
222 }
223 }
224
225 void selectCommand(redisClient *c) {
226 int id = atoi(c->argv[1]->ptr);
227
228 if (selectDb(c,id) == REDIS_ERR) {
229 addReplyError(c,"invalid DB index");
230 } else {
231 addReply(c,shared.ok);
232 }
233 }
234
235 void randomkeyCommand(redisClient *c) {
236 robj *key;
237
238 if ((key = dbRandomKey(c->db)) == NULL) {
239 addReply(c,shared.nullbulk);
240 return;
241 }
242
243 addReplyBulk(c,key);
244 decrRefCount(key);
245 }
246
247 void keysCommand(redisClient *c) {
248 dictIterator *di;
249 dictEntry *de;
250 sds pattern = c->argv[1]->ptr;
251 int plen = sdslen(pattern), allkeys;
252 unsigned long numkeys = 0;
253 void *replylen = addDeferredMultiBulkLength(c);
254
255 di = dictGetIterator(c->db->dict);
256 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
257 while((de = dictNext(di)) != NULL) {
258 sds key = dictGetEntryKey(de);
259 robj *keyobj;
260
261 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
262 keyobj = createStringObject(key,sdslen(key));
263 if (expireIfNeeded(c->db,keyobj) == 0) {
264 addReplyBulk(c,keyobj);
265 numkeys++;
266 }
267 decrRefCount(keyobj);
268 }
269 }
270 dictReleaseIterator(di);
271 setDeferredMultiBulkLength(c,replylen,numkeys);
272 }
273
274 void dbsizeCommand(redisClient *c) {
275 addReplyLongLong(c,dictSize(c->db->dict));
276 }
277
278 void lastsaveCommand(redisClient *c) {
279 addReplyLongLong(c,server.lastsave);
280 }
281
282 void typeCommand(redisClient *c) {
283 robj *o;
284 char *type;
285
286 o = lookupKeyRead(c->db,c->argv[1]);
287 if (o == NULL) {
288 type = "none";
289 } else {
290 switch(o->type) {
291 case REDIS_STRING: type = "string"; break;
292 case REDIS_LIST: type = "list"; break;
293 case REDIS_SET: type = "set"; break;
294 case REDIS_ZSET: type = "zset"; break;
295 case REDIS_HASH: type = "hash"; break;
296 default: type = "unknown"; break;
297 }
298 }
299 addReplyStatus(c,type);
300 }
301
302 void saveCommand(redisClient *c) {
303 if (server.bgsavechildpid != -1) {
304 addReplyError(c,"Background save already in progress");
305 return;
306 }
307 if (rdbSave(server.dbfilename) == REDIS_OK) {
308 addReply(c,shared.ok);
309 } else {
310 addReply(c,shared.err);
311 }
312 }
313
314 void bgsaveCommand(redisClient *c) {
315 if (server.bgsavechildpid != -1) {
316 addReplyError(c,"Background save already in progress");
317 return;
318 }
319 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
320 addReplyStatus(c,"Background saving started");
321 } else {
322 addReply(c,shared.err);
323 }
324 }
325
326 void shutdownCommand(redisClient *c) {
327 if (prepareForShutdown() == REDIS_OK)
328 exit(0);
329 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
330 }
331
332 void renameGenericCommand(redisClient *c, int nx) {
333 robj *o;
334
335 /* To use the same key as src and dst is probably an error */
336 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
337 addReply(c,shared.sameobjecterr);
338 return;
339 }
340
341 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
342 return;
343
344 incrRefCount(o);
345 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
346 if (nx) {
347 decrRefCount(o);
348 addReply(c,shared.czero);
349 return;
350 }
351 dbReplace(c->db,c->argv[2],o);
352 }
353 dbDelete(c->db,c->argv[1]);
354 signalModifiedKey(c->db,c->argv[1]);
355 signalModifiedKey(c->db,c->argv[2]);
356 server.dirty++;
357 addReply(c,nx ? shared.cone : shared.ok);
358 }
359
360 void renameCommand(redisClient *c) {
361 renameGenericCommand(c,0);
362 }
363
364 void renamenxCommand(redisClient *c) {
365 renameGenericCommand(c,1);
366 }
367
368 void moveCommand(redisClient *c) {
369 robj *o;
370 redisDb *src, *dst;
371 int srcid;
372
373 /* Obtain source and target DB pointers */
374 src = c->db;
375 srcid = c->db->id;
376 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
377 addReply(c,shared.outofrangeerr);
378 return;
379 }
380 dst = c->db;
381 selectDb(c,srcid); /* Back to the source DB */
382
383 /* If the user is moving using as target the same
384 * DB as the source DB it is probably an error. */
385 if (src == dst) {
386 addReply(c,shared.sameobjecterr);
387 return;
388 }
389
390 /* Check if the element exists and get a reference */
391 o = lookupKeyWrite(c->db,c->argv[1]);
392 if (!o) {
393 addReply(c,shared.czero);
394 return;
395 }
396
397 /* Try to add the element to the target DB */
398 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
399 addReply(c,shared.czero);
400 return;
401 }
402 incrRefCount(o);
403
404 /* OK! key moved, free the entry in the source DB */
405 dbDelete(src,c->argv[1]);
406 server.dirty++;
407 addReply(c,shared.cone);
408 }
409
410 /*-----------------------------------------------------------------------------
411 * Expires API
412 *----------------------------------------------------------------------------*/
413
414 int removeExpire(redisDb *db, robj *key) {
415 /* An expire may only be removed if there is a corresponding entry in the
416 * main dict. Otherwise, the key will never be freed. */
417 redisAssert(dictFind(db->dict,key->ptr) != NULL);
418 return dictDelete(db->expires,key->ptr) == DICT_OK;
419 }
420
421 void setExpire(redisDb *db, robj *key, time_t when) {
422 dictEntry *de;
423
424 /* Reuse the sds from the main dict in the expire dict */
425 de = dictFind(db->dict,key->ptr);
426 redisAssert(de != NULL);
427 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
428 }
429
430 /* Return the expire time of the specified key, or -1 if no expire
431 * is associated with this key (i.e. the key is non volatile) */
432 time_t getExpire(redisDb *db, robj *key) {
433 dictEntry *de;
434
435 /* No expire? return ASAP */
436 if (dictSize(db->expires) == 0 ||
437 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
438
439 /* The entry was found in the expire dict, this means it should also
440 * be present in the main dict (safety check). */
441 redisAssert(dictFind(db->dict,key->ptr) != NULL);
442 return (time_t) dictGetEntryVal(de);
443 }
444
445 /* Propagate expires into slaves and the AOF file.
446 * When a key expires in the master, a DEL operation for this key is sent
447 * to all the slaves and the AOF file if enabled.
448 *
449 * This way the key expiry is centralized in one place, and since both
450 * AOF and the master->slave link guarantee operation ordering, everything
451 * will be consistent even if we allow write operations against expiring
452 * keys. */
453 void propagateExpire(redisDb *db, robj *key) {
454 robj *argv[2];
455
456 argv[0] = createStringObject("DEL",3);
457 argv[1] = key;
458 incrRefCount(key);
459
460 if (server.appendonly)
461 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
462 if (listLength(server.slaves))
463 replicationFeedSlaves(server.slaves,db->id,argv,2);
464
465 decrRefCount(argv[0]);
466 decrRefCount(argv[1]);
467 }
468
469 int expireIfNeeded(redisDb *db, robj *key) {
470 time_t when = getExpire(db,key);
471
472 /* If we are running in the context of a slave, return ASAP:
473 * the slave key expiration is controlled by the master that will
474 * send us synthesized DEL operations for expired keys.
475 *
476 * Still we try to return the right information to the caller,
477 * that is, 0 if we think the key should be still valid, 1 if
478 * we think the key is expired at this time. */
479 if (server.masterhost != NULL) {
480 return time(NULL) > when;
481 }
482
483 if (when < 0) return 0;
484
485 /* Return when this key has not expired */
486 if (time(NULL) <= when) return 0;
487
488 /* Delete the key */
489 server.stat_expiredkeys++;
490 propagateExpire(db,key);
491 return dbDelete(db,key);
492 }
493
494 /*-----------------------------------------------------------------------------
495 * Expires Commands
496 *----------------------------------------------------------------------------*/
497
498 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
499 dictEntry *de;
500 long seconds;
501
502 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
503
504 seconds -= offset;
505
506 de = dictFind(c->db->dict,key->ptr);
507 if (de == NULL) {
508 addReply(c,shared.czero);
509 return;
510 }
511 if (seconds <= 0) {
512 if (dbDelete(c->db,key)) server.dirty++;
513 addReply(c, shared.cone);
514 signalModifiedKey(c->db,key);
515 return;
516 } else {
517 time_t when = time(NULL)+seconds;
518 setExpire(c->db,key,when);
519 addReply(c,shared.cone);
520 signalModifiedKey(c->db,key);
521 server.dirty++;
522 return;
523 }
524 }
525
526 void expireCommand(redisClient *c) {
527 expireGenericCommand(c,c->argv[1],c->argv[2],0);
528 }
529
530 void expireatCommand(redisClient *c) {
531 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
532 }
533
534 void ttlCommand(redisClient *c) {
535 time_t expire, ttl = -1;
536
537 expire = getExpire(c->db,c->argv[1]);
538 if (expire != -1) {
539 ttl = (expire-time(NULL));
540 if (ttl < 0) ttl = -1;
541 }
542 addReplyLongLong(c,(long long)ttl);
543 }
544
545 void persistCommand(redisClient *c) {
546 dictEntry *de;
547
548 de = dictFind(c->db->dict,c->argv[1]->ptr);
549 if (de == NULL) {
550 addReply(c,shared.czero);
551 } else {
552 if (removeExpire(c->db,c->argv[1])) {
553 addReply(c,shared.cone);
554 server.dirty++;
555 } else {
556 addReply(c,shared.czero);
557 }
558 }
559 }