]> git.saurik.com Git - redis.git/blob - src/db.c
implemented a different approach to IO scheduling, so object->storage is no longer...
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled &&
21 cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
22 {
23 /* There is a save in progress for this object!
24 * Wait for it to get out. */
25 waitEmptyIOJobsQueue();
26 processAllPendingIOJobs();
27 redisAssert(!(cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG));
28 }
29 server.stat_keyspace_hits++;
30 return val;
31 } else {
32 time_t expire;
33 robj *val;
34
35 /* Key not found in the in memory hash table, but if disk store is
36 * enabled we may have this key on disk. If so load it in memory
37 * in a blocking way.
38 *
39 * FIXME: race condition here. If there was an already scheduled
40 * async loading of this key, what may happen is that the old
41 * key is loaded in memory if this gets deleted in the meantime. */
42 if (server.ds_enabled && cacheKeyMayExist(db,key)) {
43 redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
44 key->ptr);
45 val = dsGet(db,key,&expire);
46 if (val) {
47 int retval = dbAdd(db,key,val);
48 redisAssert(retval == REDIS_OK);
49 if (expire != -1) setExpire(db,key,expire);
50 server.stat_keyspace_hits++;
51 return val;
52 }
53 }
54 server.stat_keyspace_misses++;
55 return NULL;
56 }
57 }
58
59 robj *lookupKeyRead(redisDb *db, robj *key) {
60 expireIfNeeded(db,key);
61 return lookupKey(db,key);
62 }
63
64 robj *lookupKeyWrite(redisDb *db, robj *key) {
65 expireIfNeeded(db,key);
66 return lookupKey(db,key);
67 }
68
69 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
70 robj *o = lookupKeyRead(c->db, key);
71 if (!o) addReply(c,reply);
72 return o;
73 }
74
75 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
76 robj *o = lookupKeyWrite(c->db, key);
77 if (!o) addReply(c,reply);
78 return o;
79 }
80
81 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
82 * otherwise REDIS_OK is returned, and the caller should increment the
83 * refcount of 'val'. */
84 int dbAdd(redisDb *db, robj *key, robj *val) {
85 /* Perform a lookup before adding the key, as we need to copy the
86 * key value. */
87 if (dictFind(db->dict, key->ptr) != NULL) {
88 return REDIS_ERR;
89 } else {
90 sds copy = sdsdup(key->ptr);
91 dictAdd(db->dict, copy, val);
92 if (server.ds_enabled) {
93 /* FIXME: remove entry from negative cache */
94 }
95 return REDIS_OK;
96 }
97 }
98
99 /* If the key does not exist, this is just like dbAdd(). Otherwise
100 * the value associated to the key is replaced with the new one.
101 *
102 * On update (key already existed) 0 is returned. Otherwise 1. */
103 int dbReplace(redisDb *db, robj *key, robj *val) {
104 robj *oldval;
105
106 if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
107 sds copy = sdsdup(key->ptr);
108 dictAdd(db->dict, copy, val);
109 return 1;
110 } else {
111 dictReplace(db->dict, key->ptr, val);
112 return 0;
113 }
114 }
115
116 int dbExists(redisDb *db, robj *key) {
117 return dictFind(db->dict,key->ptr) != NULL;
118 }
119
120 /* Return a random key, in form of a Redis object.
121 * If there are no keys, NULL is returned.
122 *
123 * The function makes sure to return keys not already expired. */
124 robj *dbRandomKey(redisDb *db) {
125 struct dictEntry *de;
126
127 while(1) {
128 sds key;
129 robj *keyobj;
130
131 de = dictGetRandomKey(db->dict);
132 if (de == NULL) return NULL;
133
134 key = dictGetEntryKey(de);
135 keyobj = createStringObject(key,sdslen(key));
136 if (dictFind(db->expires,key)) {
137 if (expireIfNeeded(db,keyobj)) {
138 decrRefCount(keyobj);
139 continue; /* search for another key. This expired. */
140 }
141 }
142 return keyobj;
143 }
144 }
145
146 /* Delete a key, value, and associated expiration entry if any, from the DB */
147 int dbDelete(redisDb *db, robj *key) {
148 /* If diskstore is enabled make sure to awake waiting clients for this key
149 * as it is not really useful to wait for a key already deleted to be
150 * loaded from disk. */
151 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
152
153 /* FIXME: we should mark this key as non existing on disk in the negative
154 * cache. */
155
156 /* Deleting an entry from the expires dict will not free the sds of
157 * the key, because it is shared with the main dictionary. */
158 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
159 return dictDelete(db->dict,key->ptr) == DICT_OK;
160 }
161
162 /* Empty the whole database */
163 long long emptyDb() {
164 int j;
165 long long removed = 0;
166
167 for (j = 0; j < server.dbnum; j++) {
168 removed += dictSize(server.db[j].dict);
169 dictEmpty(server.db[j].dict);
170 dictEmpty(server.db[j].expires);
171 }
172 return removed;
173 }
174
175 int selectDb(redisClient *c, int id) {
176 if (id < 0 || id >= server.dbnum)
177 return REDIS_ERR;
178 c->db = &server.db[id];
179 return REDIS_OK;
180 }
181
182 /*-----------------------------------------------------------------------------
183 * Hooks for key space changes.
184 *
185 * Every time a key in the database is modified the function
186 * signalModifiedKey() is called.
187 *
188 * Every time a DB is flushed the function signalFlushedDb() is called.
189 *----------------------------------------------------------------------------*/
190
191 void signalModifiedKey(redisDb *db, robj *key) {
192 touchWatchedKey(db,key);
193 if (server.ds_enabled)
194 cacheScheduleIO(db,key,REDIS_IO_SAVE);
195 }
196
197 void signalFlushedDb(int dbid) {
198 touchWatchedKeysOnFlush(dbid);
199 if (server.ds_enabled)
200 dsFlushDb(dbid);
201 }
202
203 /*-----------------------------------------------------------------------------
204 * Type agnostic commands operating on the key space
205 *----------------------------------------------------------------------------*/
206
207 void flushdbCommand(redisClient *c) {
208 server.dirty += dictSize(c->db->dict);
209 signalFlushedDb(c->db->id);
210 dictEmpty(c->db->dict);
211 dictEmpty(c->db->expires);
212 addReply(c,shared.ok);
213 }
214
215 void flushallCommand(redisClient *c) {
216 signalFlushedDb(-1);
217 server.dirty += emptyDb();
218 addReply(c,shared.ok);
219 if (server.bgsavechildpid != -1) {
220 kill(server.bgsavechildpid,SIGKILL);
221 rdbRemoveTempFile(server.bgsavechildpid);
222 }
223 rdbSave(server.dbfilename);
224 server.dirty++;
225 }
226
227 void delCommand(redisClient *c) {
228 int deleted = 0, j;
229
230 for (j = 1; j < c->argc; j++) {
231 if (server.ds_enabled) {
232 lookupKeyRead(c->db,c->argv[j]);
233 /* FIXME: this can be optimized a lot, no real need to load
234 * a possibly huge value. */
235 }
236 if (dbDelete(c->db,c->argv[j])) {
237 signalModifiedKey(c->db,c->argv[j]);
238 server.dirty++;
239 deleted++;
240 } else if (server.ds_enabled) {
241 if (cacheKeyMayExist(c->db,c->argv[j]) &&
242 dsExists(c->db,c->argv[j]))
243 {
244 cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
245 deleted = 1;
246 }
247 }
248 }
249 addReplyLongLong(c,deleted);
250 }
251
252 void existsCommand(redisClient *c) {
253 expireIfNeeded(c->db,c->argv[1]);
254 if (dbExists(c->db,c->argv[1])) {
255 addReply(c, shared.cone);
256 } else {
257 addReply(c, shared.czero);
258 }
259 }
260
261 void selectCommand(redisClient *c) {
262 int id = atoi(c->argv[1]->ptr);
263
264 if (selectDb(c,id) == REDIS_ERR) {
265 addReplyError(c,"invalid DB index");
266 } else {
267 addReply(c,shared.ok);
268 }
269 }
270
271 void randomkeyCommand(redisClient *c) {
272 robj *key;
273
274 if ((key = dbRandomKey(c->db)) == NULL) {
275 addReply(c,shared.nullbulk);
276 return;
277 }
278
279 addReplyBulk(c,key);
280 decrRefCount(key);
281 }
282
283 void keysCommand(redisClient *c) {
284 dictIterator *di;
285 dictEntry *de;
286 sds pattern = c->argv[1]->ptr;
287 int plen = sdslen(pattern), allkeys;
288 unsigned long numkeys = 0;
289 void *replylen = addDeferredMultiBulkLength(c);
290
291 di = dictGetIterator(c->db->dict);
292 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
293 while((de = dictNext(di)) != NULL) {
294 sds key = dictGetEntryKey(de);
295 robj *keyobj;
296
297 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
298 keyobj = createStringObject(key,sdslen(key));
299 if (expireIfNeeded(c->db,keyobj) == 0) {
300 addReplyBulk(c,keyobj);
301 numkeys++;
302 }
303 decrRefCount(keyobj);
304 }
305 }
306 dictReleaseIterator(di);
307 setDeferredMultiBulkLength(c,replylen,numkeys);
308 }
309
310 void dbsizeCommand(redisClient *c) {
311 addReplyLongLong(c,dictSize(c->db->dict));
312 }
313
314 void lastsaveCommand(redisClient *c) {
315 addReplyLongLong(c,server.lastsave);
316 }
317
318 void typeCommand(redisClient *c) {
319 robj *o;
320 char *type;
321
322 o = lookupKeyRead(c->db,c->argv[1]);
323 if (o == NULL) {
324 type = "none";
325 } else {
326 switch(o->type) {
327 case REDIS_STRING: type = "string"; break;
328 case REDIS_LIST: type = "list"; break;
329 case REDIS_SET: type = "set"; break;
330 case REDIS_ZSET: type = "zset"; break;
331 case REDIS_HASH: type = "hash"; break;
332 default: type = "unknown"; break;
333 }
334 }
335 addReplyStatus(c,type);
336 }
337
338 void saveCommand(redisClient *c) {
339 if (server.bgsavechildpid != -1) {
340 addReplyError(c,"Background save already in progress");
341 return;
342 }
343 if (rdbSave(server.dbfilename) == REDIS_OK) {
344 addReply(c,shared.ok);
345 } else {
346 addReply(c,shared.err);
347 }
348 }
349
350 void bgsaveCommand(redisClient *c) {
351 if (server.bgsavechildpid != -1) {
352 addReplyError(c,"Background save already in progress");
353 return;
354 }
355 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
356 addReplyStatus(c,"Background saving started");
357 } else {
358 addReply(c,shared.err);
359 }
360 }
361
362 void shutdownCommand(redisClient *c) {
363 if (prepareForShutdown() == REDIS_OK)
364 exit(0);
365 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
366 }
367
368 void renameGenericCommand(redisClient *c, int nx) {
369 robj *o;
370
371 /* To use the same key as src and dst is probably an error */
372 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
373 addReply(c,shared.sameobjecterr);
374 return;
375 }
376
377 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
378 return;
379
380 incrRefCount(o);
381 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
382 if (nx) {
383 decrRefCount(o);
384 addReply(c,shared.czero);
385 return;
386 }
387 dbReplace(c->db,c->argv[2],o);
388 }
389 dbDelete(c->db,c->argv[1]);
390 signalModifiedKey(c->db,c->argv[1]);
391 signalModifiedKey(c->db,c->argv[2]);
392 server.dirty++;
393 addReply(c,nx ? shared.cone : shared.ok);
394 }
395
396 void renameCommand(redisClient *c) {
397 renameGenericCommand(c,0);
398 }
399
400 void renamenxCommand(redisClient *c) {
401 renameGenericCommand(c,1);
402 }
403
404 void moveCommand(redisClient *c) {
405 robj *o;
406 redisDb *src, *dst;
407 int srcid;
408
409 /* Obtain source and target DB pointers */
410 src = c->db;
411 srcid = c->db->id;
412 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
413 addReply(c,shared.outofrangeerr);
414 return;
415 }
416 dst = c->db;
417 selectDb(c,srcid); /* Back to the source DB */
418
419 /* If the user is moving using as target the same
420 * DB as the source DB it is probably an error. */
421 if (src == dst) {
422 addReply(c,shared.sameobjecterr);
423 return;
424 }
425
426 /* Check if the element exists and get a reference */
427 o = lookupKeyWrite(c->db,c->argv[1]);
428 if (!o) {
429 addReply(c,shared.czero);
430 return;
431 }
432
433 /* Try to add the element to the target DB */
434 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
435 addReply(c,shared.czero);
436 return;
437 }
438 incrRefCount(o);
439
440 /* OK! key moved, free the entry in the source DB */
441 dbDelete(src,c->argv[1]);
442 server.dirty++;
443 addReply(c,shared.cone);
444 }
445
446 /*-----------------------------------------------------------------------------
447 * Expires API
448 *----------------------------------------------------------------------------*/
449
450 int removeExpire(redisDb *db, robj *key) {
451 /* An expire may only be removed if there is a corresponding entry in the
452 * main dict. Otherwise, the key will never be freed. */
453 redisAssert(dictFind(db->dict,key->ptr) != NULL);
454 return dictDelete(db->expires,key->ptr) == DICT_OK;
455 }
456
457 void setExpire(redisDb *db, robj *key, time_t when) {
458 dictEntry *de;
459
460 /* Reuse the sds from the main dict in the expire dict */
461 de = dictFind(db->dict,key->ptr);
462 redisAssert(de != NULL);
463 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
464 }
465
466 /* Return the expire time of the specified key, or -1 if no expire
467 * is associated with this key (i.e. the key is non volatile) */
468 time_t getExpire(redisDb *db, robj *key) {
469 dictEntry *de;
470
471 /* No expire? return ASAP */
472 if (dictSize(db->expires) == 0 ||
473 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
474
475 /* The entry was found in the expire dict, this means it should also
476 * be present in the main dict (safety check). */
477 redisAssert(dictFind(db->dict,key->ptr) != NULL);
478 return (time_t) dictGetEntryVal(de);
479 }
480
481 /* Propagate expires into slaves and the AOF file.
482 * When a key expires in the master, a DEL operation for this key is sent
483 * to all the slaves and the AOF file if enabled.
484 *
485 * This way the key expiry is centralized in one place, and since both
486 * AOF and the master->slave link guarantee operation ordering, everything
487 * will be consistent even if we allow write operations against expiring
488 * keys. */
489 void propagateExpire(redisDb *db, robj *key) {
490 robj *argv[2];
491
492 argv[0] = createStringObject("DEL",3);
493 argv[1] = key;
494 incrRefCount(key);
495
496 if (server.appendonly)
497 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
498 if (listLength(server.slaves))
499 replicationFeedSlaves(server.slaves,db->id,argv,2);
500
501 decrRefCount(argv[0]);
502 decrRefCount(argv[1]);
503 }
504
505 int expireIfNeeded(redisDb *db, robj *key) {
506 time_t when = getExpire(db,key);
507
508 /* If we are running in the context of a slave, return ASAP:
509 * the slave key expiration is controlled by the master that will
510 * send us synthesized DEL operations for expired keys.
511 *
512 * Still we try to return the right information to the caller,
513 * that is, 0 if we think the key should be still valid, 1 if
514 * we think the key is expired at this time. */
515 if (server.masterhost != NULL) {
516 return time(NULL) > when;
517 }
518
519 if (when < 0) return 0;
520
521 /* Return when this key has not expired */
522 if (time(NULL) <= when) return 0;
523
524 /* Delete the key */
525 server.stat_expiredkeys++;
526 propagateExpire(db,key);
527 return dbDelete(db,key);
528 }
529
530 /*-----------------------------------------------------------------------------
531 * Expires Commands
532 *----------------------------------------------------------------------------*/
533
534 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
535 dictEntry *de;
536 long seconds;
537
538 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
539
540 seconds -= offset;
541
542 de = dictFind(c->db->dict,key->ptr);
543 if (de == NULL) {
544 addReply(c,shared.czero);
545 return;
546 }
547 if (seconds <= 0) {
548 if (dbDelete(c->db,key)) server.dirty++;
549 addReply(c, shared.cone);
550 signalModifiedKey(c->db,key);
551 return;
552 } else {
553 time_t when = time(NULL)+seconds;
554 setExpire(c->db,key,when);
555 addReply(c,shared.cone);
556 signalModifiedKey(c->db,key);
557 server.dirty++;
558 return;
559 }
560 }
561
562 void expireCommand(redisClient *c) {
563 expireGenericCommand(c,c->argv[1],c->argv[2],0);
564 }
565
566 void expireatCommand(redisClient *c) {
567 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
568 }
569
570 void ttlCommand(redisClient *c) {
571 time_t expire, ttl = -1;
572
573 expire = getExpire(c->db,c->argv[1]);
574 if (expire != -1) {
575 ttl = (expire-time(NULL));
576 if (ttl < 0) ttl = -1;
577 }
578 addReplyLongLong(c,(long long)ttl);
579 }
580
581 void persistCommand(redisClient *c) {
582 dictEntry *de;
583
584 de = dictFind(c->db->dict,c->argv[1]->ptr);
585 if (de == NULL) {
586 addReply(c,shared.czero);
587 } else {
588 if (removeExpire(c->db,c->argv[1])) {
589 addReply(c,shared.cone);
590 server.dirty++;
591 } else {
592 addReply(c,shared.czero);
593 }
594 }
595 }