]> git.saurik.com Git - redis.git/blob - src/db.c
f360607765cbec7fe21371eea58272ad0d6b6d2e
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 processAllPendingIOJobs();
25 redisAssert(val->storage != REDIS_DS_SAVING);
26 }
27 server.stat_keyspace_hits++;
28 return val;
29 } else {
30 /* FIXME: Check if the object is on disk, if it is, load it
31 * in a blocking way now. */
32 server.stat_keyspace_misses++;
33 return NULL;
34 }
35 }
36
37 robj *lookupKeyRead(redisDb *db, robj *key) {
38 expireIfNeeded(db,key);
39 return lookupKey(db,key);
40 }
41
42 robj *lookupKeyWrite(redisDb *db, robj *key) {
43 expireIfNeeded(db,key);
44 return lookupKey(db,key);
45 }
46
47 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
48 robj *o = lookupKeyRead(c->db, key);
49 if (!o) addReply(c,reply);
50 return o;
51 }
52
53 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
54 robj *o = lookupKeyWrite(c->db, key);
55 if (!o) addReply(c,reply);
56 return o;
57 }
58
59 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
60 * otherwise REDIS_OK is returned, and the caller should increment the
61 * refcount of 'val'. */
62 int dbAdd(redisDb *db, robj *key, robj *val) {
63 /* Perform a lookup before adding the key, as we need to copy the
64 * key value. */
65 if (dictFind(db->dict, key->ptr) != NULL) {
66 return REDIS_ERR;
67 } else {
68 sds copy = sdsdup(key->ptr);
69 dictAdd(db->dict, copy, val);
70 if (server.ds_enabled) {
71 /* FIXME: remove entry from negative cache */
72 }
73 return REDIS_OK;
74 }
75 }
76
77 /* If the key does not exist, this is just like dbAdd(). Otherwise
78 * the value associated to the key is replaced with the new one.
79 *
80 * On update (key already existed) 0 is returned. Otherwise 1. */
81 int dbReplace(redisDb *db, robj *key, robj *val) {
82 if (dictFind(db->dict,key->ptr) == NULL) {
83 sds copy = sdsdup(key->ptr);
84 dictAdd(db->dict, copy, val);
85 return 1;
86 } else {
87 dictReplace(db->dict, key->ptr, val);
88 return 0;
89 }
90 }
91
92 int dbExists(redisDb *db, robj *key) {
93 return dictFind(db->dict,key->ptr) != NULL;
94 }
95
96 /* Return a random key, in form of a Redis object.
97 * If there are no keys, NULL is returned.
98 *
99 * The function makes sure to return keys not already expired. */
100 robj *dbRandomKey(redisDb *db) {
101 struct dictEntry *de;
102
103 while(1) {
104 sds key;
105 robj *keyobj;
106
107 de = dictGetRandomKey(db->dict);
108 if (de == NULL) return NULL;
109
110 key = dictGetEntryKey(de);
111 keyobj = createStringObject(key,sdslen(key));
112 if (dictFind(db->expires,key)) {
113 if (expireIfNeeded(db,keyobj)) {
114 decrRefCount(keyobj);
115 continue; /* search for another key. This expired. */
116 }
117 }
118 return keyobj;
119 }
120 }
121
122 /* Delete a key, value, and associated expiration entry if any, from the DB */
123 int dbDelete(redisDb *db, robj *key) {
124 /* If VM is enabled make sure to awake waiting clients for this key:
125 * deleting the key will kill the I/O thread bringing the key from swap
126 * to memory, so the client will never be notified and unblocked if we
127 * don't do it now. */
128 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
129
130 /* FIXME: we need to delete the IO Job loading the key, or simply we can
131 * wait for it to finish. */
132
133 /* Deleting an entry from the expires dict will not free the sds of
134 * the key, because it is shared with the main dictionary. */
135 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
136 return dictDelete(db->dict,key->ptr) == DICT_OK;
137 }
138
139 /* Empty the whole database */
140 long long emptyDb() {
141 int j;
142 long long removed = 0;
143
144 for (j = 0; j < server.dbnum; j++) {
145 removed += dictSize(server.db[j].dict);
146 dictEmpty(server.db[j].dict);
147 dictEmpty(server.db[j].expires);
148 }
149 return removed;
150 }
151
152 int selectDb(redisClient *c, int id) {
153 if (id < 0 || id >= server.dbnum)
154 return REDIS_ERR;
155 c->db = &server.db[id];
156 return REDIS_OK;
157 }
158
159 /*-----------------------------------------------------------------------------
160 * Hooks for key space changes.
161 *
162 * Every time a key in the database is modified the function
163 * signalModifiedKey() is called.
164 *
165 * Every time a DB is flushed the function signalFlushDb() is called.
166 *----------------------------------------------------------------------------*/
167
168 void signalModifiedKey(redisDb *db, robj *key) {
169 touchWatchedKey(db,key);
170 if (server.ds_enabled)
171 cacheScheduleForFlush(db,key);
172 }
173
174 void signalFlushedDb(int dbid) {
175 touchWatchedKeysOnFlush(dbid);
176 if (server.ds_enabled)
177 dsFlushDb(dbid);
178 }
179
180 /*-----------------------------------------------------------------------------
181 * Type agnostic commands operating on the key space
182 *----------------------------------------------------------------------------*/
183
184 void flushdbCommand(redisClient *c) {
185 server.dirty += dictSize(c->db->dict);
186 signalFlushedDb(c->db->id);
187 dictEmpty(c->db->dict);
188 dictEmpty(c->db->expires);
189 addReply(c,shared.ok);
190 }
191
192 void flushallCommand(redisClient *c) {
193 signalFlushedDb(-1);
194 server.dirty += emptyDb();
195 addReply(c,shared.ok);
196 if (server.bgsavechildpid != -1) {
197 kill(server.bgsavechildpid,SIGKILL);
198 rdbRemoveTempFile(server.bgsavechildpid);
199 }
200 rdbSave(server.dbfilename);
201 server.dirty++;
202 }
203
204 void delCommand(redisClient *c) {
205 int deleted = 0, j;
206
207 for (j = 1; j < c->argc; j++) {
208 if (dbDelete(c->db,c->argv[j])) {
209 signalModifiedKey(c->db,c->argv[j]);
210 server.dirty++;
211 deleted++;
212 }
213 }
214 addReplyLongLong(c,deleted);
215 }
216
217 void existsCommand(redisClient *c) {
218 expireIfNeeded(c->db,c->argv[1]);
219 if (dbExists(c->db,c->argv[1])) {
220 addReply(c, shared.cone);
221 } else {
222 addReply(c, shared.czero);
223 }
224 }
225
226 void selectCommand(redisClient *c) {
227 int id = atoi(c->argv[1]->ptr);
228
229 if (selectDb(c,id) == REDIS_ERR) {
230 addReplyError(c,"invalid DB index");
231 } else {
232 addReply(c,shared.ok);
233 }
234 }
235
236 void randomkeyCommand(redisClient *c) {
237 robj *key;
238
239 if ((key = dbRandomKey(c->db)) == NULL) {
240 addReply(c,shared.nullbulk);
241 return;
242 }
243
244 addReplyBulk(c,key);
245 decrRefCount(key);
246 }
247
248 void keysCommand(redisClient *c) {
249 dictIterator *di;
250 dictEntry *de;
251 sds pattern = c->argv[1]->ptr;
252 int plen = sdslen(pattern), allkeys;
253 unsigned long numkeys = 0;
254 void *replylen = addDeferredMultiBulkLength(c);
255
256 di = dictGetIterator(c->db->dict);
257 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
258 while((de = dictNext(di)) != NULL) {
259 sds key = dictGetEntryKey(de);
260 robj *keyobj;
261
262 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
263 keyobj = createStringObject(key,sdslen(key));
264 if (expireIfNeeded(c->db,keyobj) == 0) {
265 addReplyBulk(c,keyobj);
266 numkeys++;
267 }
268 decrRefCount(keyobj);
269 }
270 }
271 dictReleaseIterator(di);
272 setDeferredMultiBulkLength(c,replylen,numkeys);
273 }
274
275 void dbsizeCommand(redisClient *c) {
276 addReplyLongLong(c,dictSize(c->db->dict));
277 }
278
279 void lastsaveCommand(redisClient *c) {
280 addReplyLongLong(c,server.lastsave);
281 }
282
283 void typeCommand(redisClient *c) {
284 robj *o;
285 char *type;
286
287 o = lookupKeyRead(c->db,c->argv[1]);
288 if (o == NULL) {
289 type = "none";
290 } else {
291 switch(o->type) {
292 case REDIS_STRING: type = "string"; break;
293 case REDIS_LIST: type = "list"; break;
294 case REDIS_SET: type = "set"; break;
295 case REDIS_ZSET: type = "zset"; break;
296 case REDIS_HASH: type = "hash"; break;
297 default: type = "unknown"; break;
298 }
299 }
300 addReplyStatus(c,type);
301 }
302
303 void saveCommand(redisClient *c) {
304 if (server.bgsavechildpid != -1) {
305 addReplyError(c,"Background save already in progress");
306 return;
307 }
308 if (rdbSave(server.dbfilename) == REDIS_OK) {
309 addReply(c,shared.ok);
310 } else {
311 addReply(c,shared.err);
312 }
313 }
314
315 void bgsaveCommand(redisClient *c) {
316 if (server.bgsavechildpid != -1) {
317 addReplyError(c,"Background save already in progress");
318 return;
319 }
320 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
321 addReplyStatus(c,"Background saving started");
322 } else {
323 addReply(c,shared.err);
324 }
325 }
326
327 void shutdownCommand(redisClient *c) {
328 if (prepareForShutdown() == REDIS_OK)
329 exit(0);
330 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
331 }
332
333 void renameGenericCommand(redisClient *c, int nx) {
334 robj *o;
335
336 /* To use the same key as src and dst is probably an error */
337 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
338 addReply(c,shared.sameobjecterr);
339 return;
340 }
341
342 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
343 return;
344
345 incrRefCount(o);
346 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
347 if (nx) {
348 decrRefCount(o);
349 addReply(c,shared.czero);
350 return;
351 }
352 dbReplace(c->db,c->argv[2],o);
353 }
354 dbDelete(c->db,c->argv[1]);
355 signalModifiedKey(c->db,c->argv[1]);
356 signalModifiedKey(c->db,c->argv[2]);
357 server.dirty++;
358 addReply(c,nx ? shared.cone : shared.ok);
359 }
360
361 void renameCommand(redisClient *c) {
362 renameGenericCommand(c,0);
363 }
364
365 void renamenxCommand(redisClient *c) {
366 renameGenericCommand(c,1);
367 }
368
369 void moveCommand(redisClient *c) {
370 robj *o;
371 redisDb *src, *dst;
372 int srcid;
373
374 /* Obtain source and target DB pointers */
375 src = c->db;
376 srcid = c->db->id;
377 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
378 addReply(c,shared.outofrangeerr);
379 return;
380 }
381 dst = c->db;
382 selectDb(c,srcid); /* Back to the source DB */
383
384 /* If the user is moving using as target the same
385 * DB as the source DB it is probably an error. */
386 if (src == dst) {
387 addReply(c,shared.sameobjecterr);
388 return;
389 }
390
391 /* Check if the element exists and get a reference */
392 o = lookupKeyWrite(c->db,c->argv[1]);
393 if (!o) {
394 addReply(c,shared.czero);
395 return;
396 }
397
398 /* Try to add the element to the target DB */
399 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
400 addReply(c,shared.czero);
401 return;
402 }
403 incrRefCount(o);
404
405 /* OK! key moved, free the entry in the source DB */
406 dbDelete(src,c->argv[1]);
407 server.dirty++;
408 addReply(c,shared.cone);
409 }
410
411 /*-----------------------------------------------------------------------------
412 * Expires API
413 *----------------------------------------------------------------------------*/
414
415 int removeExpire(redisDb *db, robj *key) {
416 /* An expire may only be removed if there is a corresponding entry in the
417 * main dict. Otherwise, the key will never be freed. */
418 redisAssert(dictFind(db->dict,key->ptr) != NULL);
419 return dictDelete(db->expires,key->ptr) == DICT_OK;
420 }
421
422 void setExpire(redisDb *db, robj *key, time_t when) {
423 dictEntry *de;
424
425 /* Reuse the sds from the main dict in the expire dict */
426 de = dictFind(db->dict,key->ptr);
427 redisAssert(de != NULL);
428 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
429 }
430
431 /* Return the expire time of the specified key, or -1 if no expire
432 * is associated with this key (i.e. the key is non volatile) */
433 time_t getExpire(redisDb *db, robj *key) {
434 dictEntry *de;
435
436 /* No expire? return ASAP */
437 if (dictSize(db->expires) == 0 ||
438 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
439
440 /* The entry was found in the expire dict, this means it should also
441 * be present in the main dict (safety check). */
442 redisAssert(dictFind(db->dict,key->ptr) != NULL);
443 return (time_t) dictGetEntryVal(de);
444 }
445
446 /* Propagate expires into slaves and the AOF file.
447 * When a key expires in the master, a DEL operation for this key is sent
448 * to all the slaves and the AOF file if enabled.
449 *
450 * This way the key expiry is centralized in one place, and since both
451 * AOF and the master->slave link guarantee operation ordering, everything
452 * will be consistent even if we allow write operations against expiring
453 * keys. */
454 void propagateExpire(redisDb *db, robj *key) {
455 robj *argv[2];
456
457 argv[0] = createStringObject("DEL",3);
458 argv[1] = key;
459 incrRefCount(key);
460
461 if (server.appendonly)
462 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
463 if (listLength(server.slaves))
464 replicationFeedSlaves(server.slaves,db->id,argv,2);
465
466 decrRefCount(argv[0]);
467 decrRefCount(argv[1]);
468 }
469
470 int expireIfNeeded(redisDb *db, robj *key) {
471 time_t when = getExpire(db,key);
472
473 /* If we are running in the context of a slave, return ASAP:
474 * the slave key expiration is controlled by the master that will
475 * send us synthesized DEL operations for expired keys.
476 *
477 * Still we try to return the right information to the caller,
478 * that is, 0 if we think the key should be still valid, 1 if
479 * we think the key is expired at this time. */
480 if (server.masterhost != NULL) {
481 return time(NULL) > when;
482 }
483
484 if (when < 0) return 0;
485
486 /* Return when this key has not expired */
487 if (time(NULL) <= when) return 0;
488
489 /* Delete the key */
490 server.stat_expiredkeys++;
491 propagateExpire(db,key);
492 return dbDelete(db,key);
493 }
494
495 /*-----------------------------------------------------------------------------
496 * Expires Commands
497 *----------------------------------------------------------------------------*/
498
499 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
500 dictEntry *de;
501 long seconds;
502
503 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
504
505 seconds -= offset;
506
507 de = dictFind(c->db->dict,key->ptr);
508 if (de == NULL) {
509 addReply(c,shared.czero);
510 return;
511 }
512 if (seconds <= 0) {
513 if (dbDelete(c->db,key)) server.dirty++;
514 addReply(c, shared.cone);
515 signalModifiedKey(c->db,key);
516 return;
517 } else {
518 time_t when = time(NULL)+seconds;
519 setExpire(c->db,key,when);
520 addReply(c,shared.cone);
521 signalModifiedKey(c->db,key);
522 server.dirty++;
523 return;
524 }
525 }
526
527 void expireCommand(redisClient *c) {
528 expireGenericCommand(c,c->argv[1],c->argv[2],0);
529 }
530
531 void expireatCommand(redisClient *c) {
532 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
533 }
534
535 void ttlCommand(redisClient *c) {
536 time_t expire, ttl = -1;
537
538 expire = getExpire(c->db,c->argv[1]);
539 if (expire != -1) {
540 ttl = (expire-time(NULL));
541 if (ttl < 0) ttl = -1;
542 }
543 addReplyLongLong(c,(long long)ttl);
544 }
545
546 void persistCommand(redisClient *c) {
547 dictEntry *de;
548
549 de = dictFind(c->db->dict,c->argv[1]->ptr);
550 if (de == NULL) {
551 addReply(c,shared.czero);
552 } else {
553 if (removeExpire(c->db,c->argv[1])) {
554 addReply(c,shared.cone);
555 server.dirty++;
556 } else {
557 addReply(c,shared.czero);
558 }
559 }
560 }