]> git.saurik.com Git - redis.git/blob - src/db.c
disk store logged messages improved
[redis.git] / src / db.c
1 #include "redis.h"
2
3 #include <signal.h>
4
5 /*-----------------------------------------------------------------------------
6 * C-level DB API
7 *----------------------------------------------------------------------------*/
8
9 robj *lookupKey(redisDb *db, robj *key) {
10 dictEntry *de = dictFind(db->dict,key->ptr);
11 if (de) {
12 robj *val = dictGetEntryVal(de);
13
14 /* Update the access time for the aging algorithm.
15 * Don't do it if we have a saving child, as this will trigger
16 * a copy on write madness. */
17 if (server.bgsavechildpid == -1 && server.bgrewritechildpid == -1)
18 val->lru = server.lruclock;
19
20 if (server.ds_enabled && val->storage == REDIS_DS_SAVING) {
21 /* FIXME: change this code to just wait for our object to
22 * get out of the IO Job. */
23 waitEmptyIOJobsQueue();
24 redisAssert(val->storage != REDIS_DS_SAVING);
25 }
26 server.stat_keyspace_hits++;
27 return val;
28 } else {
29 /* FIXME: Check if the object is on disk, if it is, load it
30 * in a blocking way now. */
31 server.stat_keyspace_misses++;
32 return NULL;
33 }
34 }
35
36 robj *lookupKeyRead(redisDb *db, robj *key) {
37 expireIfNeeded(db,key);
38 return lookupKey(db,key);
39 }
40
41 robj *lookupKeyWrite(redisDb *db, robj *key) {
42 expireIfNeeded(db,key);
43 return lookupKey(db,key);
44 }
45
46 robj *lookupKeyReadOrReply(redisClient *c, robj *key, robj *reply) {
47 robj *o = lookupKeyRead(c->db, key);
48 if (!o) addReply(c,reply);
49 return o;
50 }
51
52 robj *lookupKeyWriteOrReply(redisClient *c, robj *key, robj *reply) {
53 robj *o = lookupKeyWrite(c->db, key);
54 if (!o) addReply(c,reply);
55 return o;
56 }
57
58 /* Add the key to the DB. If the key already exists REDIS_ERR is returned,
59 * otherwise REDIS_OK is returned, and the caller should increment the
60 * refcount of 'val'. */
61 int dbAdd(redisDb *db, robj *key, robj *val) {
62 /* Perform a lookup before adding the key, as we need to copy the
63 * key value. */
64 if (dictFind(db->dict, key->ptr) != NULL) {
65 return REDIS_ERR;
66 } else {
67 sds copy = sdsdup(key->ptr);
68 dictAdd(db->dict, copy, val);
69 return REDIS_OK;
70 }
71 }
72
73 /* If the key does not exist, this is just like dbAdd(). Otherwise
74 * the value associated to the key is replaced with the new one.
75 *
76 * On update (key already existed) 0 is returned. Otherwise 1. */
77 int dbReplace(redisDb *db, robj *key, robj *val) {
78 if (dictFind(db->dict,key->ptr) == NULL) {
79 sds copy = sdsdup(key->ptr);
80 dictAdd(db->dict, copy, val);
81 return 1;
82 } else {
83 dictReplace(db->dict, key->ptr, val);
84 return 0;
85 }
86 }
87
88 int dbExists(redisDb *db, robj *key) {
89 return dictFind(db->dict,key->ptr) != NULL;
90 }
91
92 /* Return a random key, in form of a Redis object.
93 * If there are no keys, NULL is returned.
94 *
95 * The function makes sure to return keys not already expired. */
96 robj *dbRandomKey(redisDb *db) {
97 struct dictEntry *de;
98
99 while(1) {
100 sds key;
101 robj *keyobj;
102
103 de = dictGetRandomKey(db->dict);
104 if (de == NULL) return NULL;
105
106 key = dictGetEntryKey(de);
107 keyobj = createStringObject(key,sdslen(key));
108 if (dictFind(db->expires,key)) {
109 if (expireIfNeeded(db,keyobj)) {
110 decrRefCount(keyobj);
111 continue; /* search for another key. This expired. */
112 }
113 }
114 return keyobj;
115 }
116 }
117
118 /* Delete a key, value, and associated expiration entry if any, from the DB */
119 int dbDelete(redisDb *db, robj *key) {
120 /* If VM is enabled make sure to awake waiting clients for this key:
121 * deleting the key will kill the I/O thread bringing the key from swap
122 * to memory, so the client will never be notified and unblocked if we
123 * don't do it now. */
124 if (server.ds_enabled) handleClientsBlockedOnSwappedKey(db,key);
125
126 /* FIXME: we need to delete the IO Job loading the key, or simply we can
127 * wait for it to finish. */
128
129 /* Deleting an entry from the expires dict will not free the sds of
130 * the key, because it is shared with the main dictionary. */
131 if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
132 return dictDelete(db->dict,key->ptr) == DICT_OK;
133 }
134
135 /* Empty the whole database */
136 long long emptyDb() {
137 int j;
138 long long removed = 0;
139
140 for (j = 0; j < server.dbnum; j++) {
141 removed += dictSize(server.db[j].dict);
142 dictEmpty(server.db[j].dict);
143 dictEmpty(server.db[j].expires);
144 }
145 return removed;
146 }
147
148 int selectDb(redisClient *c, int id) {
149 if (id < 0 || id >= server.dbnum)
150 return REDIS_ERR;
151 c->db = &server.db[id];
152 return REDIS_OK;
153 }
154
155 /*-----------------------------------------------------------------------------
156 * Hooks for key space changes.
157 *
158 * Every time a key in the database is modified the function
159 * signalModifiedKey() is called.
160 *
161 * Every time a DB is flushed the function signalFlushDb() is called.
162 *----------------------------------------------------------------------------*/
163
164 void signalModifiedKey(redisDb *db, robj *key) {
165 touchWatchedKey(db,key);
166 if (server.ds_enabled)
167 cacheScheduleForFlush(db,key);
168 }
169
170 void signalFlushedDb(int dbid) {
171 touchWatchedKeysOnFlush(dbid);
172 if (server.ds_enabled)
173 dsFlushDb(dbid);
174 }
175
176 /*-----------------------------------------------------------------------------
177 * Type agnostic commands operating on the key space
178 *----------------------------------------------------------------------------*/
179
180 void flushdbCommand(redisClient *c) {
181 server.dirty += dictSize(c->db->dict);
182 signalFlushedDb(c->db->id);
183 dictEmpty(c->db->dict);
184 dictEmpty(c->db->expires);
185 addReply(c,shared.ok);
186 }
187
188 void flushallCommand(redisClient *c) {
189 signalFlushedDb(-1);
190 server.dirty += emptyDb();
191 addReply(c,shared.ok);
192 if (server.bgsavechildpid != -1) {
193 kill(server.bgsavechildpid,SIGKILL);
194 rdbRemoveTempFile(server.bgsavechildpid);
195 }
196 rdbSave(server.dbfilename);
197 server.dirty++;
198 }
199
200 void delCommand(redisClient *c) {
201 int deleted = 0, j;
202
203 for (j = 1; j < c->argc; j++) {
204 if (dbDelete(c->db,c->argv[j])) {
205 signalModifiedKey(c->db,c->argv[j]);
206 server.dirty++;
207 deleted++;
208 }
209 }
210 addReplyLongLong(c,deleted);
211 }
212
213 void existsCommand(redisClient *c) {
214 expireIfNeeded(c->db,c->argv[1]);
215 if (dbExists(c->db,c->argv[1])) {
216 addReply(c, shared.cone);
217 } else {
218 addReply(c, shared.czero);
219 }
220 }
221
222 void selectCommand(redisClient *c) {
223 int id = atoi(c->argv[1]->ptr);
224
225 if (selectDb(c,id) == REDIS_ERR) {
226 addReplyError(c,"invalid DB index");
227 } else {
228 addReply(c,shared.ok);
229 }
230 }
231
232 void randomkeyCommand(redisClient *c) {
233 robj *key;
234
235 if ((key = dbRandomKey(c->db)) == NULL) {
236 addReply(c,shared.nullbulk);
237 return;
238 }
239
240 addReplyBulk(c,key);
241 decrRefCount(key);
242 }
243
244 void keysCommand(redisClient *c) {
245 dictIterator *di;
246 dictEntry *de;
247 sds pattern = c->argv[1]->ptr;
248 int plen = sdslen(pattern), allkeys;
249 unsigned long numkeys = 0;
250 void *replylen = addDeferredMultiBulkLength(c);
251
252 di = dictGetIterator(c->db->dict);
253 allkeys = (pattern[0] == '*' && pattern[1] == '\0');
254 while((de = dictNext(di)) != NULL) {
255 sds key = dictGetEntryKey(de);
256 robj *keyobj;
257
258 if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
259 keyobj = createStringObject(key,sdslen(key));
260 if (expireIfNeeded(c->db,keyobj) == 0) {
261 addReplyBulk(c,keyobj);
262 numkeys++;
263 }
264 decrRefCount(keyobj);
265 }
266 }
267 dictReleaseIterator(di);
268 setDeferredMultiBulkLength(c,replylen,numkeys);
269 }
270
271 void dbsizeCommand(redisClient *c) {
272 addReplyLongLong(c,dictSize(c->db->dict));
273 }
274
275 void lastsaveCommand(redisClient *c) {
276 addReplyLongLong(c,server.lastsave);
277 }
278
279 void typeCommand(redisClient *c) {
280 robj *o;
281 char *type;
282
283 o = lookupKeyRead(c->db,c->argv[1]);
284 if (o == NULL) {
285 type = "none";
286 } else {
287 switch(o->type) {
288 case REDIS_STRING: type = "string"; break;
289 case REDIS_LIST: type = "list"; break;
290 case REDIS_SET: type = "set"; break;
291 case REDIS_ZSET: type = "zset"; break;
292 case REDIS_HASH: type = "hash"; break;
293 default: type = "unknown"; break;
294 }
295 }
296 addReplyStatus(c,type);
297 }
298
299 void saveCommand(redisClient *c) {
300 if (server.bgsavechildpid != -1) {
301 addReplyError(c,"Background save already in progress");
302 return;
303 }
304 if (rdbSave(server.dbfilename) == REDIS_OK) {
305 addReply(c,shared.ok);
306 } else {
307 addReply(c,shared.err);
308 }
309 }
310
311 void bgsaveCommand(redisClient *c) {
312 if (server.bgsavechildpid != -1) {
313 addReplyError(c,"Background save already in progress");
314 return;
315 }
316 if (rdbSaveBackground(server.dbfilename) == REDIS_OK) {
317 addReplyStatus(c,"Background saving started");
318 } else {
319 addReply(c,shared.err);
320 }
321 }
322
323 void shutdownCommand(redisClient *c) {
324 if (prepareForShutdown() == REDIS_OK)
325 exit(0);
326 addReplyError(c,"Errors trying to SHUTDOWN. Check logs.");
327 }
328
329 void renameGenericCommand(redisClient *c, int nx) {
330 robj *o;
331
332 /* To use the same key as src and dst is probably an error */
333 if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) {
334 addReply(c,shared.sameobjecterr);
335 return;
336 }
337
338 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
339 return;
340
341 incrRefCount(o);
342 if (dbAdd(c->db,c->argv[2],o) == REDIS_ERR) {
343 if (nx) {
344 decrRefCount(o);
345 addReply(c,shared.czero);
346 return;
347 }
348 dbReplace(c->db,c->argv[2],o);
349 }
350 dbDelete(c->db,c->argv[1]);
351 signalModifiedKey(c->db,c->argv[1]);
352 signalModifiedKey(c->db,c->argv[2]);
353 server.dirty++;
354 addReply(c,nx ? shared.cone : shared.ok);
355 }
356
357 void renameCommand(redisClient *c) {
358 renameGenericCommand(c,0);
359 }
360
361 void renamenxCommand(redisClient *c) {
362 renameGenericCommand(c,1);
363 }
364
365 void moveCommand(redisClient *c) {
366 robj *o;
367 redisDb *src, *dst;
368 int srcid;
369
370 /* Obtain source and target DB pointers */
371 src = c->db;
372 srcid = c->db->id;
373 if (selectDb(c,atoi(c->argv[2]->ptr)) == REDIS_ERR) {
374 addReply(c,shared.outofrangeerr);
375 return;
376 }
377 dst = c->db;
378 selectDb(c,srcid); /* Back to the source DB */
379
380 /* If the user is moving using as target the same
381 * DB as the source DB it is probably an error. */
382 if (src == dst) {
383 addReply(c,shared.sameobjecterr);
384 return;
385 }
386
387 /* Check if the element exists and get a reference */
388 o = lookupKeyWrite(c->db,c->argv[1]);
389 if (!o) {
390 addReply(c,shared.czero);
391 return;
392 }
393
394 /* Try to add the element to the target DB */
395 if (dbAdd(dst,c->argv[1],o) == REDIS_ERR) {
396 addReply(c,shared.czero);
397 return;
398 }
399 incrRefCount(o);
400
401 /* OK! key moved, free the entry in the source DB */
402 dbDelete(src,c->argv[1]);
403 server.dirty++;
404 addReply(c,shared.cone);
405 }
406
407 /*-----------------------------------------------------------------------------
408 * Expires API
409 *----------------------------------------------------------------------------*/
410
411 int removeExpire(redisDb *db, robj *key) {
412 /* An expire may only be removed if there is a corresponding entry in the
413 * main dict. Otherwise, the key will never be freed. */
414 redisAssert(dictFind(db->dict,key->ptr) != NULL);
415 return dictDelete(db->expires,key->ptr) == DICT_OK;
416 }
417
418 void setExpire(redisDb *db, robj *key, time_t when) {
419 dictEntry *de;
420
421 /* Reuse the sds from the main dict in the expire dict */
422 de = dictFind(db->dict,key->ptr);
423 redisAssert(de != NULL);
424 dictReplace(db->expires,dictGetEntryKey(de),(void*)when);
425 }
426
427 /* Return the expire time of the specified key, or -1 if no expire
428 * is associated with this key (i.e. the key is non volatile) */
429 time_t getExpire(redisDb *db, robj *key) {
430 dictEntry *de;
431
432 /* No expire? return ASAP */
433 if (dictSize(db->expires) == 0 ||
434 (de = dictFind(db->expires,key->ptr)) == NULL) return -1;
435
436 /* The entry was found in the expire dict, this means it should also
437 * be present in the main dict (safety check). */
438 redisAssert(dictFind(db->dict,key->ptr) != NULL);
439 return (time_t) dictGetEntryVal(de);
440 }
441
442 /* Propagate expires into slaves and the AOF file.
443 * When a key expires in the master, a DEL operation for this key is sent
444 * to all the slaves and the AOF file if enabled.
445 *
446 * This way the key expiry is centralized in one place, and since both
447 * AOF and the master->slave link guarantee operation ordering, everything
448 * will be consistent even if we allow write operations against expiring
449 * keys. */
450 void propagateExpire(redisDb *db, robj *key) {
451 robj *argv[2];
452
453 argv[0] = createStringObject("DEL",3);
454 argv[1] = key;
455 incrRefCount(key);
456
457 if (server.appendonly)
458 feedAppendOnlyFile(server.delCommand,db->id,argv,2);
459 if (listLength(server.slaves))
460 replicationFeedSlaves(server.slaves,db->id,argv,2);
461
462 decrRefCount(argv[0]);
463 decrRefCount(argv[1]);
464 }
465
466 int expireIfNeeded(redisDb *db, robj *key) {
467 time_t when = getExpire(db,key);
468
469 /* If we are running in the context of a slave, return ASAP:
470 * the slave key expiration is controlled by the master that will
471 * send us synthesized DEL operations for expired keys.
472 *
473 * Still we try to return the right information to the caller,
474 * that is, 0 if we think the key should be still valid, 1 if
475 * we think the key is expired at this time. */
476 if (server.masterhost != NULL) {
477 return time(NULL) > when;
478 }
479
480 if (when < 0) return 0;
481
482 /* Return when this key has not expired */
483 if (time(NULL) <= when) return 0;
484
485 /* Delete the key */
486 server.stat_expiredkeys++;
487 propagateExpire(db,key);
488 return dbDelete(db,key);
489 }
490
491 /*-----------------------------------------------------------------------------
492 * Expires Commands
493 *----------------------------------------------------------------------------*/
494
495 void expireGenericCommand(redisClient *c, robj *key, robj *param, long offset) {
496 dictEntry *de;
497 long seconds;
498
499 if (getLongFromObjectOrReply(c, param, &seconds, NULL) != REDIS_OK) return;
500
501 seconds -= offset;
502
503 de = dictFind(c->db->dict,key->ptr);
504 if (de == NULL) {
505 addReply(c,shared.czero);
506 return;
507 }
508 if (seconds <= 0) {
509 if (dbDelete(c->db,key)) server.dirty++;
510 addReply(c, shared.cone);
511 signalModifiedKey(c->db,key);
512 return;
513 } else {
514 time_t when = time(NULL)+seconds;
515 setExpire(c->db,key,when);
516 addReply(c,shared.cone);
517 signalModifiedKey(c->db,key);
518 server.dirty++;
519 return;
520 }
521 }
522
523 void expireCommand(redisClient *c) {
524 expireGenericCommand(c,c->argv[1],c->argv[2],0);
525 }
526
527 void expireatCommand(redisClient *c) {
528 expireGenericCommand(c,c->argv[1],c->argv[2],time(NULL));
529 }
530
531 void ttlCommand(redisClient *c) {
532 time_t expire, ttl = -1;
533
534 expire = getExpire(c->db,c->argv[1]);
535 if (expire != -1) {
536 ttl = (expire-time(NULL));
537 if (ttl < 0) ttl = -1;
538 }
539 addReplyLongLong(c,(long long)ttl);
540 }
541
542 void persistCommand(redisClient *c) {
543 dictEntry *de;
544
545 de = dictFind(c->db->dict,c->argv[1]->ptr);
546 if (de == NULL) {
547 addReply(c,shared.czero);
548 } else {
549 if (removeExpire(c->db,c->argv[1])) {
550 addReply(c,shared.cone);
551 server.dirty++;
552 } else {
553 addReply(c,shared.czero);
554 }
555 }
556 }