]> git.saurik.com Git - redis.git/blob - src/db.c
d545345cb9fdf893ad4546f99bfa4b1c8ccb364d
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 processAllPendingIOJobs();
25 redisAssert(val->storage != REDIS_DS_SAVING);
26 }
27 server.stat_keyspace_hits++;
28 return val;
29 } else {
30 /* FIXME: Check if the object is on disk, if it is, load it
31 * in a blocking way now. */
32 server.stat_keyspace_misses++;
33 return NULL;
34 }
35 }
36
37 robj *lookupKeyRead(redisDb *db, robj *key) {
38 expireIfNeeded(db,key);
39 return lookupKey(db,key);
40 }
41
42 robj *lookupKeyWrite(redisDb *db, robj *key) {
43 expireIfNeeded(db,key);
44 return lookupKey(db,key);
45 }
46
47 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
48 robj *o = lookupKeyRead(c->db, key);
49 if (!o) addReply(c,reply);
50 return o;
51 }
52
53 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
54 robj *o = lookupKeyWrite(c->db, key);
55 if (!o) addReply(c,reply);
56 return o;
57 }
58
59 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
60 * otherwise REDIS_OK is returned, and the caller should increment the
61 * refcount of 'val'. */
62 int dbAdd(redisDb *db, robj *key, robj *val) {
63 /* Perform a lookup before adding the key, as we need to copy the
64 * key value. */
65 if (dictFind(db->dict, key->ptr) != NULL) {
66 return REDIS_ERR;
67 } else {
68 sds copy = sdsdup(key->ptr);
69 dictAdd(db->dict, copy, val);
70 if (server.ds_enabled) {
71 /* FIXME: remove entry from negative cache */
72 }
73 return REDIS_OK;
74 }
75 }
76
77 /* If the key does not exist, this is just like dbAdd(). Otherwise
78 * the value associated to the key is replaced with the new one.
79 *
80 * On update (key already existed) 0 is returned. Otherwise 1. */
81 int dbReplace(redisDb *db, robj *key, robj *val) {
82 if (dictFind(db->dict,key->ptr) == NULL) {
83 sds copy = sdsdup(key->ptr);
84 dictAdd(db->dict, copy, val);
85 return 1;
86 } else {
87 dictReplace(db->dict, key->ptr, val);
88 return 0;
89 }
90 }
91
92 int dbExists(redisDb *db, robj *key) {
93 return dictFind(db->dict,key->ptr) != NULL;
94 }
95
96 /* Return a random key, in form of a Redis object.
97 * If there are no keys, NULL is returned.
98 *
99 * The function makes sure to return keys not already expired. */
100 robj *dbRandomKey(redisDb *db) {
101 struct dictEntry *de;
102
103 while(1) {
104 sds key;
105 robj *keyobj;
106
107 de = dictGetRandomKey(db->dict);
108 if (de == NULL) return NULL;
109
110 key = dictGetEntryKey(de);
111 keyobj = createStringObject(key,sdslen(key));
112 if (dictFind(db->expires,key)) {
113 if (expireIfNeeded(db,keyobj)) {
114 decrRefCount(keyobj);
115 continue; /* search for another key. This expired. */
116 }
117 }
118 return keyobj;
119 }
120 }
121
122 /* Delete a key, value, and associated expiration entry if any, from the DB */
123 int dbDelete(redisDb *db, robj *key) {
124 /* If VM is enabled make sure to awake waiting clients for this key:
125 * deleting the key will kill the I/O thread bringing the key from swap
126 * to memory, so the client will never be notified and unblocked if we
127 * don't do it now. */
128 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
129
130 /* FIXME: we need to delete the IO Job loading the key, or simply we can
131 * wait for it to finish. */
132
133 /* Deleting an entry from the expires dict will not free the sds of
134 * the key, because it is shared with the main dictionary. */
135 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
136 return dictDelete(db->dict,key->ptr) == DICT_OK;
137 }
138
139 /* Empty the whole database */
140 long long emptyDb() {
141 int j;
142 long long removed = 0;
143
144 for (j = 0; j < server.dbnum; j++) {
145 removed += dictSize(server.db[j].dict);
146 dictEmpty(server.db[j].dict);
147 dictEmpty(server.db[j].expires);
148 }
149 return removed;
150 }
151
152 int selectDb(redisClient *c, int id) {
153 if (id < 0 || id >= server.dbnum)
154 return REDIS_ERR;
155 c->db = &server.db[id];
156 return REDIS_OK;
157 }
158
159 /*-----------------------------------------------------------------------------
160 * Hooks for key space changes.
161 *
162 * Every time a key in the database is modified the function
163 * signalModifiedKey() is called.
164 *
165 * Every time a DB is flushed the function signalFlushDb() is called.
166 *----------------------------------------------------------------------------*/
167
168 void signalModifiedKey(redisDb *db, robj *key) {
169 touchWatchedKey(db,key);
170 if (server.ds_enabled)
171 cacheScheduleForFlush(db,key);
172 }
173
174 void signalFlushedDb(int dbid) {
175 touchWatchedKeysOnFlush(dbid);
176 if (server.ds_enabled)
177 dsFlushDb(dbid);
178 }
179
180 /*-----------------------------------------------------------------------------
181 * Type agnostic commands operating on the key space
182 *----------------------------------------------------------------------------*/
183
184 void flushdbCommand(redisClient *c) {
185 server.dirty += dictSize(c->db->dict);
186 signalFlushedDb(c->db->id);
187 dictEmpty(c->db->dict);
188 dictEmpty(c->db->expires);
189 addReply(c,shared.ok);
190 }
191
192 void flushallCommand(redisClient *c) {
193 signalFlushedDb(-1);
194 server.dirty += emptyDb();
195 addReply(c,shared.ok);
196 if (server.bgsavechildpid != -1) {
197 kill(server.bgsavechildpid,SIGKILL);
198 rdbRemoveTempFile(server.bgsavechildpid);
199 }
200 rdbSave(server.dbfilename);
201 server.dirty++;
202 }
203
204 void delCommand(redisClient *c) {
205 int deleted = 0, j;
206
207 for (j = 1; j < c->argc; j++) {
208 if (server.ds_enabled) {
209 lookupKeyRead(c->db,c->argv[j]);
210 /* FIXME: this can be optimized a lot, no real need to load
211 * a possibly huge value. */
212 }
213 if (dbDelete(c->db,c->argv[j])) {
214 signalModifiedKey(c->db,c->argv[j]);
215 server.dirty++;
216 deleted++;
217 } else if (server.ds_enabled) {
218 if (cacheKeyMayExist(c->db,c->argv[j]) &&
219 dsExists(c->db,c->argv[j]))
220 {
221 cacheScheduleForFlush(c->db,c->argv[j]);
222 deleted = 1;
223 }
224 }
225 }
226 addReplyLongLong(c,deleted);
227 }
228
229 void existsCommand(redisClient *c) {
230 expireIfNeeded(c->db,c->argv[1]);
231 if (dbExists(c->db,c->argv[1])) {
232 addReply(c, shared.cone);
233 } else {
234 addReply(c, shared.czero);
235 }
236 }
237
238 void selectCommand(redisClient *c) {
239 int id = atoi(c->argv[1]->ptr);
240
241 if (selectDb(c,id) == REDIS_ERR) {
242 addReplyError(c,"invalid DB index");
243 } else {
244 addReply(c,shared.ok);
245 }
246 }
247
248 void randomkeyCommand(redisClient *c) {
249 robj *key;
250
251 if ((key = dbRandomKey(c->db)) == NULL) {
252 addReply(c,shared.nullbulk);
253 return;
254 }
255
256 addReplyBulk(c,key);
257 decrRefCount(key);
258 }
259
260 void keysCommand(redisClient *c) {
261 dictIterator *di;
262 dictEntry *de;
263 sds pattern = c->argv[1]->ptr;
264 int plen = sdslen(pattern), allkeys;
265 unsigned long numkeys = 0;
266 void *replylen = addDeferredMultiBulkLength(c);
267
268 di = dictGetIterator(c->db->dict);
269 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
270 while((de = dictNext(di)) != NULL) {
271 sds key = dictGetEntryKey(de);
272 robj *keyobj;
273
274 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
275 keyobj = createStringObject(key,sdslen(key));
276 if (expireIfNeeded(c->db,keyobj) == 0) {
277 addReplyBulk(c,keyobj);
278 numkeys++;
279 }
280 decrRefCount(keyobj);
281 }
282 }
283 dictReleaseIterator(di);
284 setDeferredMultiBulkLength(c,replylen,numkeys);
285 }
286
287 void dbsizeCommand(redisClient *c) {
288 addReplyLongLong(c,dictSize(c->db->dict));
289 }
290
291 void lastsaveCommand(redisClient *c) {
292 addReplyLongLong(c,server.lastsave);
293 }
294
295 void typeCommand(redisClient *c) {
296 robj *o;
297 char *type;
298
299 o = lookupKeyRead(c->db,c->argv[1]);
300 if (o == NULL) {
301 type = "none";
302 } else {
303 switch(o->type) {
304 case REDIS_STRING: type = "string"; break;
305 case REDIS_LIST: type = "list"; break;
306 case REDIS_SET: type = "set"; break;
307 case REDIS_ZSET: type = "zset"; break;
308 case REDIS_HASH: type = "hash"; break;
309 default: type = "unknown"; break;
310 }
311 }
312 addReplyStatus(c,type);
313 }
314
315 void saveCommand(redisClient *c) {
316 if (server.bgsavechildpid != -1) {
317 addReplyError(c,"Background save already in progress");
318 return;
319 }
320 if (rdbSave(server.dbfilename) == REDIS_OK) {
321 addReply(c,shared.ok);
322 } else {
323 addReply(c,shared.err);
324 }
325 }
326
327 void bgsaveCommand(redisClient *c) {
328 if (server.bgsavechildpid != -1) {
329 addReplyError(c,"Background save already in progress");
330 return;
331 }
332 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
333 addReplyStatus(c,"Background saving started");
334 } else {
335 addReply(c,shared.err);
336 }
337 }
338
339 void shutdownCommand(redisClient *c) {
340 if (prepareForShutdown() == REDIS_OK)
341 exit(0);
342 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
343 }
344
345 void renameGenericCommand(redisClient *c, int nx) {
346 robj *o;
347
348 /* To use the same key as src and dst is probably an error */
349 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
350 addReply(c,shared.sameobjecterr);
351 return;
352 }
353
354 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
355 return;
356
357 incrRefCount(o);
358 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
359 if (nx) {
360 decrRefCount(o);
361 addReply(c,shared.czero);
362 return;
363 }
364 dbReplace(c->db,c->argv[2],o);
365 }
366 dbDelete(c->db,c->argv[1]);
367 signalModifiedKey(c->db,c->argv[1]);
368 signalModifiedKey(c->db,c->argv[2]);
369 server.dirty++;
370 addReply(c,nx ? shared.cone : shared.ok);
371 }
372
373 void renameCommand(redisClient *c) {
374 renameGenericCommand(c,0);
375 }
376
377 void renamenxCommand(redisClient *c) {
378 renameGenericCommand(c,1);
379 }
380
381 void moveCommand(redisClient *c) {
382 robj *o;
383 redisDb *src, *dst;
384 int srcid;
385
386 /* Obtain source and target DB pointers */
387 src = c->db;
388 srcid = c->db->id;
389 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
390 addReply(c,shared.outofrangeerr);
391 return;
392 }
393 dst = c->db;
394 selectDb(c,srcid); /* Back to the source DB */
395
396 /* If the user is moving using as target the same
397 * DB as the source DB it is probably an error. */
398 if (src == dst) {
399 addReply(c,shared.sameobjecterr);
400 return;
401 }
402
403 /* Check if the element exists and get a reference */
404 o = lookupKeyWrite(c->db,c->argv[1]);
405 if (!o) {
406 addReply(c,shared.czero);
407 return;
408 }
409
410 /* Try to add the element to the target DB */
411 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
412 addReply(c,shared.czero);
413 return;
414 }
415 incrRefCount(o);
416
417 /* OK! key moved, free the entry in the source DB */
418 dbDelete(src,c->argv[1]);
419 server.dirty++;
420 addReply(c,shared.cone);
421 }
422
423 /*-----------------------------------------------------------------------------
424 * Expires API
425 *----------------------------------------------------------------------------*/
426
427 int removeExpire(redisDb *db, robj *key) {
428 /* An expire may only be removed if there is a corresponding entry in the
429 * main dict. Otherwise, the key will never be freed. */
430 redisAssert(dictFind(db->dict,key->ptr) != NULL);
431 return dictDelete(db->expires,key->ptr) == DICT_OK;
432 }
433
434 void setExpire(redisDb *db, robj *key, time_t when) {
435 dictEntry *de;
436
437 /* Reuse the sds from the main dict in the expire dict */
438 de = dictFind(db->dict,key->ptr);
439 redisAssert(de != NULL);
440 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
441 }
442
443 /* Return the expire time of the specified key, or -1 if no expire
444 * is associated with this key (i.e. the key is non volatile) */
445 time_t getExpire(redisDb *db, robj *key) {
446 dictEntry *de;
447
448 /* No expire? return ASAP */
449 if (dictSize(db->expires) == 0 ||
450 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
451
452 /* The entry was found in the expire dict, this means it should also
453 * be present in the main dict (safety check). */
454 redisAssert(dictFind(db->dict,key->ptr) != NULL);
455 return (time_t) dictGetEntryVal(de);
456 }
457
458 /* Propagate expires into slaves and the AOF file.
459 * When a key expires in the master, a DEL operation for this key is sent
460 * to all the slaves and the AOF file if enabled.
461 *
462 * This way the key expiry is centralized in one place, and since both
463 * AOF and the master->slave link guarantee operation ordering, everything
464 * will be consistent even if we allow write operations against expiring
465 * keys. */
466 void propagateExpire(redisDb *db, robj *key) {
467 robj *argv[2];
468
469 argv[0] = createStringObject("DEL",3);
470 argv[1] = key;
471 incrRefCount(key);
472
473 if (server.appendonly)
474 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
475 if (listLength(server.slaves))
476 replicationFeedSlaves(server.slaves,db->id,argv,2);
477
478 decrRefCount(argv[0]);
479 decrRefCount(argv[1]);
480 }
481
482 int expireIfNeeded(redisDb *db, robj *key) {
483 time_t when = getExpire(db,key);
484
485 /* If we are running in the context of a slave, return ASAP:
486 * the slave key expiration is controlled by the master that will
487 * send us synthesized DEL operations for expired keys.
488 *
489 * Still we try to return the right information to the caller,
490 * that is, 0 if we think the key should be still valid, 1 if
491 * we think the key is expired at this time. */
492 if (server.masterhost != NULL) {
493 return time(NULL) > when;
494 }
495
496 if (when < 0) return 0;
497
498 /* Return when this key has not expired */
499 if (time(NULL) <= when) return 0;
500
501 /* Delete the key */
502 server.stat_expiredkeys++;
503 propagateExpire(db,key);
504 return dbDelete(db,key);
505 }
506
507 /*-----------------------------------------------------------------------------
508 * Expires Commands
509 *----------------------------------------------------------------------------*/
510
511 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
512 dictEntry *de;
513 long seconds;
514
515 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
516
517 seconds -= offset;
518
519 de = dictFind(c->db->dict,key->ptr);
520 if (de == NULL) {
521 addReply(c,shared.czero);
522 return;
523 }
524 if (seconds <= 0) {
525 if (dbDelete(c->db,key)) server.dirty++;
526 addReply(c, shared.cone);
527 signalModifiedKey(c->db,key);
528 return;
529 } else {
530 time_t when = time(NULL)+seconds;
531 setExpire(c->db,key,when);
532 addReply(c,shared.cone);
533 signalModifiedKey(c->db,key);
534 server.dirty++;
535 return;
536 }
537 }
538
539 void expireCommand(redisClient *c) {
540 expireGenericCommand(c,c->argv[1],c->argv[2],0);
541 }
542
543 void expireatCommand(redisClient *c) {
544 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
545 }
546
547 void ttlCommand(redisClient *c) {
548 time_t expire, ttl = -1;
549
550 expire = getExpire(c->db,c->argv[1]);
551 if (expire != -1) {
552 ttl = (expire-time(NULL));
553 if (ttl < 0) ttl = -1;
554 }
555 addReplyLongLong(c,(long long)ttl);
556 }
557
558 void persistCommand(redisClient *c) {
559 dictEntry *de;
560
561 de = dictFind(c->db->dict,c->argv[1]->ptr);
562 if (de == NULL) {
563 addReply(c,shared.czero);
564 } else {
565 if (removeExpire(c->db,c->argv[1])) {
566 addReply(c,shared.cone);
567 server.dirty++;
568 } else {
569 addReply(c,shared.czero);
570 }
571 }
572 }