git.saurik.com Git - redis.git/blob - src/db.c
major bug and a dead lock fixed
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 processAllPendingIOJobs();
25 redisAssert(val->storage != REDIS_DS_SAVING);
26 }
27 server.stat_keyspace_hits++;
28 return val;
29 } else {
30 time_t expire;
31 robj *val;
32
33 /* Key not found in the in memory hash table, but if disk store is
34 * enabled we may have this key on disk. If so load it in memory
35 * in a blocking way.
36 *
37 * FIXME: race condition here. If there was an already scheduled
38 * async loading of this key, what may happen is that the old
39 * key is loaded in memory if this gets deleted in the meantime. */
40 if (server.ds_enabled && cacheKeyMayExist(db,key)) {
41 val = dsGet(db,key,&expire);
42 if (val) {
43 int retval = dbAdd(db,key,val);
44 redisAssert(retval == REDIS_OK);
45 if (expire != -1) setExpire(db,key,expire);
46 server.stat_keyspace_hits++;
47 return val;
48 }
49 }
50 server.stat_keyspace_misses++;
51 return NULL;
52 }
53 }
54
55 robj *lookupKeyRead(redisDb *db, robj *key) {
56 expireIfNeeded(db,key);
57 return lookupKey(db,key);
58 }
59
60 robj *lookupKeyWrite(redisDb *db, robj *key) {
61 expireIfNeeded(db,key);
62 return lookupKey(db,key);
63 }
64
65 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
66 robj *o = lookupKeyRead(c->db, key);
67 if (!o) addReply(c,reply);
68 return o;
69 }
70
71 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
72 robj *o = lookupKeyWrite(c->db, key);
73 if (!o) addReply(c,reply);
74 return o;
75 }
76
77 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
78 * otherwise REDIS_OK is returned, and the caller should increment the
79 * refcount of 'val'. */
80 int dbAdd(redisDb *db, robj *key, robj *val) {
81 /* Perform a lookup before adding the key, as we need to copy the
82 * key value. */
83 if (dictFind(db->dict, key->ptr) != NULL) {
84 return REDIS_ERR;
85 } else {
86 sds copy = sdsdup(key->ptr);
87 dictAdd(db->dict, copy, val);
88 if (server.ds_enabled) {
89 /* FIXME: remove entry from negative cache */
90 }
91 return REDIS_OK;
92 }
93 }
94
95 /* If the key does not exist, this is just like dbAdd(). Otherwise
96 * the value associated to the key is replaced with the new one.
97 *
98 * On update (key already existed) 0 is returned. Otherwise 1. */
99 int dbReplace(redisDb *db, robj *key, robj *val) {
100 robj *oldval;
101
102 if ((oldval = dictFetchValue(db->dict,key->ptr)) == NULL) {
103 sds copy = sdsdup(key->ptr);
104 dictAdd(db->dict, copy, val);
105 return 1;
106 } else {
107 val->storage = oldval->storage;
108 dictReplace(db->dict, key->ptr, val);
109 return 0;
110 }
111 }
112
113 int dbExists(redisDb *db, robj *key) {
114 return dictFind(db->dict,key->ptr) != NULL;
115 }
116
117 /* Return a random key, in form of a Redis object.
118 * If there are no keys, NULL is returned.
119 *
120 * The function makes sure to return keys not already expired. */
121 robj *dbRandomKey(redisDb *db) {
122 struct dictEntry *de;
123
124 while(1) {
125 sds key;
126 robj *keyobj;
127
128 de = dictGetRandomKey(db->dict);
129 if (de == NULL) return NULL;
130
131 key = dictGetEntryKey(de);
132 keyobj = createStringObject(key,sdslen(key));
133 if (dictFind(db->expires,key)) {
134 if (expireIfNeeded(db,keyobj)) {
135 decrRefCount(keyobj);
136 continue; /* search for another key. This expired. */
137 }
138 }
139 return keyobj;
140 }
141 }
142
143 /* Delete a key, value, and associated expiration entry if any, from the DB */
144 int dbDelete(redisDb *db, robj *key) {
145 /* If VM is enabled make sure to awake waiting clients for this key:
146 * deleting the key will kill the I/O thread bringing the key from swap
147 * to memory, so the client will never be notified and unblocked if we
148 * don't do it now. */
149 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
150
151 /* FIXME: we need to delete the IO Job loading the key, or simply we can
152 * wait for it to finish. */
153
154 /* Deleting an entry from the expires dict will not free the sds of
155 * the key, because it is shared with the main dictionary. */
156 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
157 return dictDelete(db->dict,key->ptr) == DICT_OK;
158 }
159
160 /* Empty the whole database */
161 long long emptyDb() {
162 int j;
163 long long removed = 0;
164
165 for (j = 0; j < server.dbnum; j++) {
166 removed += dictSize(server.db[j].dict);
167 dictEmpty(server.db[j].dict);
168 dictEmpty(server.db[j].expires);
169 }
170 return removed;
171 }
172
173 int selectDb(redisClient *c, int id) {
174 if (id < 0 || id >= server.dbnum)
175 return REDIS_ERR;
176 c->db = &server.db[id];
177 return REDIS_OK;
178 }
179
180 /*-----------------------------------------------------------------------------
181 * Hooks for key space changes.
182 *
183 * Every time a key in the database is modified the function
184 * signalModifiedKey() is called.
185 *
186 * Every time a DB is flushed the function signalFlushDb() is called.
187 *----------------------------------------------------------------------------*/
188
189 void signalModifiedKey(redisDb *db, robj *key) {
190 touchWatchedKey(db,key);
191 if (server.ds_enabled)
192 cacheScheduleForFlush(db,key);
193 }
194
195 void signalFlushedDb(int dbid) {
196 touchWatchedKeysOnFlush(dbid);
197 if (server.ds_enabled)
198 dsFlushDb(dbid);
199 }
200
201 /*-----------------------------------------------------------------------------
202 * Type agnostic commands operating on the key space
203 *----------------------------------------------------------------------------*/
204
205 void flushdbCommand(redisClient *c) {
206 server.dirty += dictSize(c->db->dict);
207 signalFlushedDb(c->db->id);
208 dictEmpty(c->db->dict);
209 dictEmpty(c->db->expires);
210 addReply(c,shared.ok);
211 }
212
213 void flushallCommand(redisClient *c) {
214 signalFlushedDb(-1);
215 server.dirty += emptyDb();
216 addReply(c,shared.ok);
217 if (server.bgsavechildpid != -1) {
218 kill(server.bgsavechildpid,SIGKILL);
219 rdbRemoveTempFile(server.bgsavechildpid);
220 }
221 rdbSave(server.dbfilename);
222 server.dirty++;
223 }
224
225 void delCommand(redisClient *c) {
226 int deleted = 0, j;
227
228 for (j = 1; j < c->argc; j++) {
229 if (server.ds_enabled) {
230 lookupKeyRead(c->db,c->argv[j]);
231 /* FIXME: this can be optimized a lot, no real need to load
232 * a possibly huge value. */
233 }
234 if (dbDelete(c->db,c->argv[j])) {
235 signalModifiedKey(c->db,c->argv[j]);
236 server.dirty++;
237 deleted++;
238 } else if (server.ds_enabled) {
239 if (cacheKeyMayExist(c->db,c->argv[j]) &&
240 dsExists(c->db,c->argv[j]))
241 {
242 cacheScheduleForFlush(c->db,c->argv[j]);
243 deleted = 1;
244 }
245 }
246 }
247 addReplyLongLong(c,deleted);
248 }
249
250 void existsCommand(redisClient *c) {
251 expireIfNeeded(c->db,c->argv[1]);
252 if (dbExists(c->db,c->argv[1])) {
253 addReply(c, shared.cone);
254 } else {
255 addReply(c, shared.czero);
256 }
257 }
258
259 void selectCommand(redisClient *c) {
260 int id = atoi(c->argv[1]->ptr);
261
262 if (selectDb(c,id) == REDIS_ERR) {
263 addReplyError(c,"invalid DB index");
264 } else {
265 addReply(c,shared.ok);
266 }
267 }
268
269 void randomkeyCommand(redisClient *c) {
270 robj *key;
271
272 if ((key = dbRandomKey(c->db)) == NULL) {
273 addReply(c,shared.nullbulk);
274 return;
275 }
276
277 addReplyBulk(c,key);
278 decrRefCount(key);
279 }
280
281 void keysCommand(redisClient *c) {
282 dictIterator *di;
283 dictEntry *de;
284 sds pattern = c->argv[1]->ptr;
285 int plen = sdslen(pattern), allkeys;
286 unsigned long numkeys = 0;
287 void *replylen = addDeferredMultiBulkLength(c);
288
289 di = dictGetIterator(c->db->dict);
290 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
291 while((de = dictNext(di)) != NULL) {
292 sds key = dictGetEntryKey(de);
293 robj *keyobj;
294
295 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
296 keyobj = createStringObject(key,sdslen(key));
297 if (expireIfNeeded(c->db,keyobj) == 0) {
298 addReplyBulk(c,keyobj);
299 numkeys++;
300 }
301 decrRefCount(keyobj);
302 }
303 }
304 dictReleaseIterator(di);
305 setDeferredMultiBulkLength(c,replylen,numkeys);
306 }
307
308 void dbsizeCommand(redisClient *c) {
309 addReplyLongLong(c,dictSize(c->db->dict));
310 }
311
312 void lastsaveCommand(redisClient *c) {
313 addReplyLongLong(c,server.lastsave);
314 }
315
316 void typeCommand(redisClient *c) {
317 robj *o;
318 char *type;
319
320 o = lookupKeyRead(c->db,c->argv[1]);
321 if (o == NULL) {
322 type = "none";
323 } else {
324 switch(o->type) {
325 case REDIS_STRING: type = "string"; break;
326 case REDIS_LIST: type = "list"; break;
327 case REDIS_SET: type = "set"; break;
328 case REDIS_ZSET: type = "zset"; break;
329 case REDIS_HASH: type = "hash"; break;
330 default: type = "unknown"; break;
331 }
332 }
333 addReplyStatus(c,type);
334 }
335
336 void saveCommand(redisClient *c) {
337 if (server.bgsavechildpid != -1) {
338 addReplyError(c,"Background save already in progress");
339 return;
340 }
341 if (rdbSave(server.dbfilename) == REDIS_OK) {
342 addReply(c,shared.ok);
343 } else {
344 addReply(c,shared.err);
345 }
346 }
347
348 void bgsaveCommand(redisClient *c) {
349 if (server.bgsavechildpid != -1) {
350 addReplyError(c,"Background save already in progress");
351 return;
352 }
353 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
354 addReplyStatus(c,"Background saving started");
355 } else {
356 addReply(c,shared.err);
357 }
358 }
359
360 void shutdownCommand(redisClient *c) {
361 if (prepareForShutdown() == REDIS_OK)
362 exit(0);
363 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
364 }
365
366 void renameGenericCommand(redisClient *c, int nx) {
367 robj *o;
368
369 /* To use the same key as src and dst is probably an error */
370 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
371 addReply(c,shared.sameobjecterr);
372 return;
373 }
374
375 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
376 return;
377
378 incrRefCount(o);
379 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
380 if (nx) {
381 decrRefCount(o);
382 addReply(c,shared.czero);
383 return;
384 }
385 dbReplace(c->db,c->argv[2],o);
386 }
387 dbDelete(c->db,c->argv[1]);
388 signalModifiedKey(c->db,c->argv[1]);
389 signalModifiedKey(c->db,c->argv[2]);
390 server.dirty++;
391 addReply(c,nx ? shared.cone : shared.ok);
392 }
393
394 void renameCommand(redisClient *c) {
395 renameGenericCommand(c,0);
396 }
397
398 void renamenxCommand(redisClient *c) {
399 renameGenericCommand(c,1);
400 }
401
402 void moveCommand(redisClient *c) {
403 robj *o;
404 redisDb *src, *dst;
405 int srcid;
406
407 /* Obtain source and target DB pointers */
408 src = c->db;
409 srcid = c->db->id;
410 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
411 addReply(c,shared.outofrangeerr);
412 return;
413 }
414 dst = c->db;
415 selectDb(c,srcid); /* Back to the source DB */
416
417 /* If the user is moving using as target the same
418 * DB as the source DB it is probably an error. */
419 if (src == dst) {
420 addReply(c,shared.sameobjecterr);
421 return;
422 }
423
424 /* Check if the element exists and get a reference */
425 o = lookupKeyWrite(c->db,c->argv[1]);
426 if (!o) {
427 addReply(c,shared.czero);
428 return;
429 }
430
431 /* Try to add the element to the target DB */
432 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
433 addReply(c,shared.czero);
434 return;
435 }
436 incrRefCount(o);
437
438 /* OK! key moved, free the entry in the source DB */
439 dbDelete(src,c->argv[1]);
440 server.dirty++;
441 addReply(c,shared.cone);
442 }
443
444 /*-----------------------------------------------------------------------------
445 * Expires API
446 *----------------------------------------------------------------------------*/
447
448 int removeExpire(redisDb *db, robj *key) {
449 /* An expire may only be removed if there is a corresponding entry in the
450 * main dict. Otherwise, the key will never be freed. */
451 redisAssert(dictFind(db->dict,key->ptr) != NULL);
452 return dictDelete(db->expires,key->ptr) == DICT_OK;
453 }
454
455 void setExpire(redisDb *db, robj *key, time_t when) {
456 dictEntry *de;
457
458 /* Reuse the sds from the main dict in the expire dict */
459 de = dictFind(db->dict,key->ptr);
460 redisAssert(de != NULL);
461 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
462 }
463
464 /* Return the expire time of the specified key, or -1 if no expire
465 * is associated with this key (i.e. the key is non volatile) */
466 time_t getExpire(redisDb *db, robj *key) {
467 dictEntry *de;
468
469 /* No expire? return ASAP */
470 if (dictSize(db->expires) == 0 ||
471 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
472
473 /* The entry was found in the expire dict, this means it should also
474 * be present in the main dict (safety check). */
475 redisAssert(dictFind(db->dict,key->ptr) != NULL);
476 return (time_t) dictGetEntryVal(de);
477 }
478
479 /* Propagate expires into slaves and the AOF file.
480 * When a key expires in the master, a DEL operation for this key is sent
481 * to all the slaves and the AOF file if enabled.
482 *
483 * This way the key expiry is centralized in one place, and since both
484 * AOF and the master->slave link guarantee operation ordering, everything
485 * will be consistent even if we allow write operations against expiring
486 * keys. */
487 void propagateExpire(redisDb *db, robj *key) {
488 robj *argv[2];
489
490 argv[0] = createStringObject("DEL",3);
491 argv[1] = key;
492 incrRefCount(key);
493
494 if (server.appendonly)
495 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
496 if (listLength(server.slaves))
497 replicationFeedSlaves(server.slaves,db->id,argv,2);
498
499 decrRefCount(argv[0]);
500 decrRefCount(argv[1]);
501 }
502
503 int expireIfNeeded(redisDb *db, robj *key) {
504 time_t when = getExpire(db,key);
505
506 /* If we are running in the context of a slave, return ASAP:
507 * the slave key expiration is controlled by the master that will
508 * send us synthesized DEL operations for expired keys.
509 *
510 * Still we try to return the right information to the caller,
511 * that is, 0 if we think the key should be still valid, 1 if
512 * we think the key is expired at this time. */
513 if (server.masterhost != NULL) {
514 return time(NULL) > when;
515 }
516
517 if (when < 0) return 0;
518
519 /* Return when this key has not expired */
520 if (time(NULL) <= when) return 0;
521
522 /* Delete the key */
523 server.stat_expiredkeys++;
524 propagateExpire(db,key);
525 return dbDelete(db,key);
526 }
527
528 /*-----------------------------------------------------------------------------
529 * Expires Commands
530 *----------------------------------------------------------------------------*/
531
532 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
533 dictEntry *de;
534 long seconds;
535
536 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
537
538 seconds -= offset;
539
540 de = dictFind(c->db->dict,key->ptr);
541 if (de == NULL) {
542 addReply(c,shared.czero);
543 return;
544 }
545 if (seconds <= 0) {
546 if (dbDelete(c->db,key)) server.dirty++;
547 addReply(c, shared.cone);
548 signalModifiedKey(c->db,key);
549 return;
550 } else {
551 time_t when = time(NULL)+seconds;
552 setExpire(c->db,key,when);
553 addReply(c,shared.cone);
554 signalModifiedKey(c->db,key);
555 server.dirty++;
556 return;
557 }
558 }
559
560 void expireCommand(redisClient *c) {
561 expireGenericCommand(c,c->argv[1],c->argv[2],0);
562 }
563
564 void expireatCommand(redisClient *c) {
565 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
566 }
567
568 void ttlCommand(redisClient *c) {
569 time_t expire, ttl = -1;
570
571 expire = getExpire(c->db,c->argv[1]);
572 if (expire != -1) {
573 ttl = (expire-time(NULL));
574 if (ttl < 0) ttl = -1;
575 }
576 addReplyLongLong(c,(long long)ttl);
577 }
578
579 void persistCommand(redisClient *c) {
580 dictEntry *de;
581
582 de = dictFind(c->db->dict,c->argv[1]->ptr);
583 if (de == NULL) {
584 addReply(c,shared.czero);
585 } else {
586 if (removeExpire(c->db,c->argv[1])) {
587 addReply(c,shared.cone);
588 server.dirty++;
589 } else {
590 addReply(c,shared.czero);
591 }
592 }
593 }