]> git.saurik.com Git - redis.git/blob - src/db.c
blocking load fixed with the new design
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled &&
21 cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG)
22 {
23 /* There is a save in progress for this object!
24 * Wait for it to get out. */
25 waitEmptyIOJobsQueue();
26 processAllPendingIOJobs();
27 redisAssert(!(cacheScheduleIOGetFlags(db,key) & REDIS_IO_SAVEINPROG));
28 }
29 server.stat_keyspace_hits++;
30 return val;
31 } else {
32 time_t expire;
33 robj *val;
34
35 /* Key not found in the in memory hash table, but if disk store is
36 * enabled we may have this key on disk. If so load it in memory
37 * in a blocking way. */
38 if (server.ds_enabled && cacheKeyMayExist(db,key)) {
39 if (cacheScheduleIOGetFlags(db,key) &
40 (REDIS_IO_SAVE|REDIS_IO_SAVEINPROG))
41 {
42 /* There is a save in progress for this object!
43 * Wait for it to get out. */
44 waitEmptyIOJobsQueue();
45 processAllPendingIOJobs();
46 redisAssert((cacheScheduleIOGetFlags(db,key) & (REDIS_IO_SAVE|REDIS_IO_SAVEINPROG)) == 0);
47 }
48
49 redisLog(REDIS_DEBUG,"Force loading key %s via lookup",
50 key->ptr);
51 val = dsGet(db,key,&expire);
52 if (val) {
53 int retval = dbAdd(db,key,val);
54 redisAssert(retval == REDIS_OK);
55 if (expire != -1) setExpire(db,key,expire);
56 server.stat_keyspace_hits++;
57 return val;
58 }
59 }
60 server.stat_keyspace_misses++;
61 return NULL;
62 }
63 }
64
65 robj *lookupKeyRead(redisDb *db, robj *key) {
66 expireIfNeeded(db,key);
67 return lookupKey(db,key);
68 }
69
70 robj *lookupKeyWrite(redisDb *db, robj *key) {
71 expireIfNeeded(db,key);
72 return lookupKey(db,key);
73 }
74
75 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
76 robj *o = lookupKeyRead(c->db, key);
77 if (!o) addReply(c,reply);
78 return o;
79 }
80
81 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
82 robj *o = lookupKeyWrite(c->db, key);
83 if (!o) addReply(c,reply);
84 return o;
85 }
86
87 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
88 * otherwise REDIS_OK is returned, and the caller should increment the
89 * refcount of 'val'. */
90 int dbAdd(redisDb *db, robj *key, robj *val) {
91 /* Perform a lookup before adding the key, as we need to copy the
92 * key value. */
93 if (dictFind(db->dict, key->ptr) != NULL) {
94 return REDIS_ERR;
95 } else {
96 sds copy = sdsdup(key->ptr);
97 dictAdd(db->dict, copy, val);
98 if (server.ds_enabled) {
99 /* FIXME: remove entry from negative cache */
100 }
101 return REDIS_OK;
102 }
103 }
104
105 /* If the key does not exist, this is just like dbAdd(). Otherwise
106 * the value associated to the key is replaced with the new one.
107 *
108 * On update (key already existed) 0 is returned. Otherwise 1. */
109 int dbReplace(redisDb *db, robj *key, robj *val) {
110 robj *oldval;
111
112 if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
113 sds copy = sdsdup(key->ptr);
114 dictAdd(db->dict, copy, val);
115 return 1;
116 } else {
117 dictReplace(db->dict, key->ptr, val);
118 return 0;
119 }
120 }
121
122 int dbExists(redisDb *db, robj *key) {
123 return dictFind(db->dict,key->ptr) != NULL;
124 }
125
126 /* Return a random key, in form of a Redis object.
127 * If there are no keys, NULL is returned.
128 *
129 * The function makes sure to return keys not already expired. */
130 robj *dbRandomKey(redisDb *db) {
131 struct dictEntry *de;
132
133 while(1) {
134 sds key;
135 robj *keyobj;
136
137 de = dictGetRandomKey(db->dict);
138 if (de == NULL) return NULL;
139
140 key = dictGetEntryKey(de);
141 keyobj = createStringObject(key,sdslen(key));
142 if (dictFind(db->expires,key)) {
143 if (expireIfNeeded(db,keyobj)) {
144 decrRefCount(keyobj);
145 continue; /* search for another key. This expired. */
146 }
147 }
148 return keyobj;
149 }
150 }
151
152 /* Delete a key, value, and associated expiration entry if any, from the DB */
153 int dbDelete(redisDb *db, robj *key) {
154 /* If diskstore is enabled make sure to awake waiting clients for this key
155 * as it is not really useful to wait for a key already deleted to be
156 * loaded from disk. */
157 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
158
159 /* FIXME: we should mark this key as non existing on disk in the negative
160 * cache. */
161
162 /* Deleting an entry from the expires dict will not free the sds of
163 * the key, because it is shared with the main dictionary. */
164 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
165 return dictDelete(db->dict,key->ptr) == DICT_OK;
166 }
167
168 /* Empty the whole database */
169 long long emptyDb() {
170 int j;
171 long long removed = 0;
172
173 for (j = 0; j < server.dbnum; j++) {
174 removed += dictSize(server.db[j].dict);
175 dictEmpty(server.db[j].dict);
176 dictEmpty(server.db[j].expires);
177 }
178 return removed;
179 }
180
181 int selectDb(redisClient *c, int id) {
182 if (id < 0 || id >= server.dbnum)
183 return REDIS_ERR;
184 c->db = &server.db[id];
185 return REDIS_OK;
186 }
187
188 /*-----------------------------------------------------------------------------
189 * Hooks for key space changes.
190 *
191 * Every time a key in the database is modified the function
192 * signalModifiedKey() is called.
193 *
194 * Every time a DB is flushed the function signalFlushedDb() is called.
195 *----------------------------------------------------------------------------*/
196
197 void signalModifiedKey(redisDb *db, robj *key) {
198 touchWatchedKey(db,key);
199 if (server.ds_enabled)
200 cacheScheduleIO(db,key,REDIS_IO_SAVE);
201 }
202
203 void signalFlushedDb(int dbid) {
204 touchWatchedKeysOnFlush(dbid);
205 if (server.ds_enabled)
206 dsFlushDb(dbid);
207 }
208
209 /*-----------------------------------------------------------------------------
210 * Type agnostic commands operating on the key space
211 *----------------------------------------------------------------------------*/
212
213 void flushdbCommand(redisClient *c) {
214 server.dirty += dictSize(c->db->dict);
215 signalFlushedDb(c->db->id);
216 dictEmpty(c->db->dict);
217 dictEmpty(c->db->expires);
218 addReply(c,shared.ok);
219 }
220
221 void flushallCommand(redisClient *c) {
222 signalFlushedDb(-1);
223 server.dirty += emptyDb();
224 addReply(c,shared.ok);
225 if (server.bgsavechildpid != -1) {
226 kill(server.bgsavechildpid,SIGKILL);
227 rdbRemoveTempFile(server.bgsavechildpid);
228 }
229 rdbSave(server.dbfilename);
230 server.dirty++;
231 }
232
233 void delCommand(redisClient *c) {
234 int deleted = 0, j;
235
236 for (j = 1; j < c->argc; j++) {
237 if (server.ds_enabled) {
238 lookupKeyRead(c->db,c->argv[j]);
239 /* FIXME: this can be optimized a lot, no real need to load
240 * a possibly huge value. */
241 }
242 if (dbDelete(c->db,c->argv[j])) {
243 signalModifiedKey(c->db,c->argv[j]);
244 server.dirty++;
245 deleted++;
246 } else if (server.ds_enabled) {
247 if (cacheKeyMayExist(c->db,c->argv[j]) &&
248 dsExists(c->db,c->argv[j]))
249 {
250 cacheScheduleIO(c->db,c->argv[j],REDIS_IO_SAVE);
251 deleted = 1;
252 }
253 }
254 }
255 addReplyLongLong(c,deleted);
256 }
257
258 void existsCommand(redisClient *c) {
259 expireIfNeeded(c->db,c->argv[1]);
260 if (dbExists(c->db,c->argv[1])) {
261 addReply(c, shared.cone);
262 } else {
263 addReply(c, shared.czero);
264 }
265 }
266
267 void selectCommand(redisClient *c) {
268 int id = atoi(c->argv[1]->ptr);
269
270 if (selectDb(c,id) == REDIS_ERR) {
271 addReplyError(c,"invalid DB index");
272 } else {
273 addReply(c,shared.ok);
274 }
275 }
276
277 void randomkeyCommand(redisClient *c) {
278 robj *key;
279
280 if ((key = dbRandomKey(c->db)) == NULL) {
281 addReply(c,shared.nullbulk);
282 return;
283 }
284
285 addReplyBulk(c,key);
286 decrRefCount(key);
287 }
288
289 void keysCommand(redisClient *c) {
290 dictIterator *di;
291 dictEntry *de;
292 sds pattern = c->argv[1]->ptr;
293 int plen = sdslen(pattern), allkeys;
294 unsigned long numkeys = 0;
295 void *replylen = addDeferredMultiBulkLength(c);
296
297 di = dictGetIterator(c->db->dict);
298 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
299 while((de = dictNext(di)) != NULL) {
300 sds key = dictGetEntryKey(de);
301 robj *keyobj;
302
303 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
304 keyobj = createStringObject(key,sdslen(key));
305 if (expireIfNeeded(c->db,keyobj) == 0) {
306 addReplyBulk(c,keyobj);
307 numkeys++;
308 }
309 decrRefCount(keyobj);
310 }
311 }
312 dictReleaseIterator(di);
313 setDeferredMultiBulkLength(c,replylen,numkeys);
314 }
315
316 void dbsizeCommand(redisClient *c) {
317 addReplyLongLong(c,dictSize(c->db->dict));
318 }
319
320 void lastsaveCommand(redisClient *c) {
321 addReplyLongLong(c,server.lastsave);
322 }
323
324 void typeCommand(redisClient *c) {
325 robj *o;
326 char *type;
327
328 o = lookupKeyRead(c->db,c->argv[1]);
329 if (o == NULL) {
330 type = "none";
331 } else {
332 switch(o->type) {
333 case REDIS_STRING: type = "string"; break;
334 case REDIS_LIST: type = "list"; break;
335 case REDIS_SET: type = "set"; break;
336 case REDIS_ZSET: type = "zset"; break;
337 case REDIS_HASH: type = "hash"; break;
338 default: type = "unknown"; break;
339 }
340 }
341 addReplyStatus(c,type);
342 }
343
344 void saveCommand(redisClient *c) {
345 if (server.bgsavechildpid != -1) {
346 addReplyError(c,"Background save already in progress");
347 return;
348 }
349 if (rdbSave(server.dbfilename) == REDIS_OK) {
350 addReply(c,shared.ok);
351 } else {
352 addReply(c,shared.err);
353 }
354 }
355
356 void bgsaveCommand(redisClient *c) {
357 if (server.bgsavechildpid != -1) {
358 addReplyError(c,"Background save already in progress");
359 return;
360 }
361 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
362 addReplyStatus(c,"Background saving started");
363 } else {
364 addReply(c,shared.err);
365 }
366 }
367
368 void shutdownCommand(redisClient *c) {
369 if (prepareForShutdown() == REDIS_OK)
370 exit(0);
371 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
372 }
373
374 void renameGenericCommand(redisClient *c, int nx) {
375 robj *o;
376
377 /* To use the same key as src and dst is probably an error */
378 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
379 addReply(c,shared.sameobjecterr);
380 return;
381 }
382
383 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
384 return;
385
386 incrRefCount(o);
387 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
388 if (nx) {
389 decrRefCount(o);
390 addReply(c,shared.czero);
391 return;
392 }
393 dbReplace(c->db,c->argv[2],o);
394 }
395 dbDelete(c->db,c->argv[1]);
396 signalModifiedKey(c->db,c->argv[1]);
397 signalModifiedKey(c->db,c->argv[2]);
398 server.dirty++;
399 addReply(c,nx ? shared.cone : shared.ok);
400 }
401
402 void renameCommand(redisClient *c) {
403 renameGenericCommand(c,0);
404 }
405
406 void renamenxCommand(redisClient *c) {
407 renameGenericCommand(c,1);
408 }
409
410 void moveCommand(redisClient *c) {
411 robj *o;
412 redisDb *src, *dst;
413 int srcid;
414
415 /* Obtain source and target DB pointers */
416 src = c->db;
417 srcid = c->db->id;
418 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
419 addReply(c,shared.outofrangeerr);
420 return;
421 }
422 dst = c->db;
423 selectDb(c,srcid); /* Back to the source DB */
424
425 /* If the user is moving using as target the same
426 * DB as the source DB it is probably an error. */
427 if (src == dst) {
428 addReply(c,shared.sameobjecterr);
429 return;
430 }
431
432 /* Check if the element exists and get a reference */
433 o = lookupKeyWrite(c->db,c->argv[1]);
434 if (!o) {
435 addReply(c,shared.czero);
436 return;
437 }
438
439 /* Try to add the element to the target DB */
440 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
441 addReply(c,shared.czero);
442 return;
443 }
444 incrRefCount(o);
445
446 /* OK! key moved, free the entry in the source DB */
447 dbDelete(src,c->argv[1]);
448 server.dirty++;
449 addReply(c,shared.cone);
450 }
451
452 /*-----------------------------------------------------------------------------
453 * Expires API
454 *----------------------------------------------------------------------------*/
455
456 int removeExpire(redisDb *db, robj *key) {
457 /* An expire may only be removed if there is a corresponding entry in the
458 * main dict. Otherwise, the key will never be freed. */
459 redisAssert(dictFind(db->dict,key->ptr) != NULL);
460 return dictDelete(db->expires,key->ptr) == DICT_OK;
461 }
462
463 void setExpire(redisDb *db, robj *key, time_t when) {
464 dictEntry *de;
465
466 /* Reuse the sds from the main dict in the expire dict */
467 de = dictFind(db->dict,key->ptr);
468 redisAssert(de != NULL);
469 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
470 }
471
472 /* Return the expire time of the specified key, or -1 if no expire
473 * is associated with this key (i.e. the key is non volatile) */
474 time_t getExpire(redisDb *db, robj *key) {
475 dictEntry *de;
476
477 /* No expire? return ASAP */
478 if (dictSize(db->expires) == 0 ||
479 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
480
481 /* The entry was found in the expire dict, this means it should also
482 * be present in the main dict (safety check). */
483 redisAssert(dictFind(db->dict,key->ptr) != NULL);
484 return (time_t) dictGetEntryVal(de);
485 }
486
487 /* Propagate expires into slaves and the AOF file.
488 * When a key expires in the master, a DEL operation for this key is sent
489 * to all the slaves and the AOF file if enabled.
490 *
491 * This way the key expiry is centralized in one place, and since both
492 * AOF and the master->slave link guarantee operation ordering, everything
493 * will be consistent even if we allow write operations against expiring
494 * keys. */
495 void propagateExpire(redisDb *db, robj *key) {
496 robj *argv[2];
497
498 argv[0] = createStringObject("DEL",3);
499 argv[1] = key;
500 incrRefCount(key);
501
502 if (server.appendonly)
503 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
504 if (listLength(server.slaves))
505 replicationFeedSlaves(server.slaves,db->id,argv,2);
506
507 decrRefCount(argv[0]);
508 decrRefCount(argv[1]);
509 }
510
511 int expireIfNeeded(redisDb *db, robj *key) {
512 time_t when = getExpire(db,key);
513
514 /* If we are running in the context of a slave, return ASAP:
515 * the slave key expiration is controlled by the master that will
516 * send us synthesized DEL operations for expired keys.
517 *
518 * Still we try to return the right information to the caller,
519 * that is, 0 if we think the key should be still valid, 1 if
520 * we think the key is expired at this time. */
521 if (server.masterhost != NULL) {
522 return time(NULL) > when;
523 }
524
525 if (when < 0) return 0;
526
527 /* Return when this key has not expired */
528 if (time(NULL) <= when) return 0;
529
530 /* Delete the key */
531 server.stat_expiredkeys++;
532 propagateExpire(db,key);
533 return dbDelete(db,key);
534 }
535
536 /*-----------------------------------------------------------------------------
537 * Expires Commands
538 *----------------------------------------------------------------------------*/
539
540 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
541 dictEntry *de;
542 long seconds;
543
544 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
545
546 seconds -= offset;
547
548 de = dictFind(c->db->dict,key->ptr);
549 if (de == NULL) {
550 addReply(c,shared.czero);
551 return;
552 }
553 if (seconds <= 0) {
554 if (dbDelete(c->db,key)) server.dirty++;
555 addReply(c, shared.cone);
556 signalModifiedKey(c->db,key);
557 return;
558 } else {
559 time_t when = time(NULL)+seconds;
560 setExpire(c->db,key,when);
561 addReply(c,shared.cone);
562 signalModifiedKey(c->db,key);
563 server.dirty++;
564 return;
565 }
566 }
567
568 void expireCommand(redisClient *c) {
569 expireGenericCommand(c,c->argv[1],c->argv[2],0);
570 }
571
572 void expireatCommand(redisClient *c) {
573 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
574 }
575
576 void ttlCommand(redisClient *c) {
577 time_t expire, ttl = -1;
578
579 expire = getExpire(c->db,c->argv[1]);
580 if (expire != -1) {
581 ttl = (expire-time(NULL));
582 if (ttl < 0) ttl = -1;
583 }
584 addReplyLongLong(c,(long long)ttl);
585 }
586
587 void persistCommand(redisClient *c) {
588 dictEntry *de;
589
590 de = dictFind(c->db->dict,c->argv[1]->ptr);
591 if (de == NULL) {
592 addReply(c,shared.czero);
593 } else {
594 if (removeExpire(c->db,c->argv[1])) {
595 addReply(c,shared.cone);
596 server.dirty++;
597 } else {
598 addReply(c,shared.czero);
599 }
600 }
601 }