CONFIG SET/GET support for new automatic AOF rewrite parameters
[redis.git] / src / t_list.c
CommitLineData
e2641e09 1#include "redis.h"
2
3/*-----------------------------------------------------------------------------
4 * List API
5 *----------------------------------------------------------------------------*/
6
7/* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10void listTypeTryConversion(robj *subject, robj *value) {
11 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 if (value->encoding == REDIS_ENCODING_RAW &&
13 sdslen(value->ptr) > server.list_max_ziplist_value)
14 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15}
16
17void listTypePush(robj *subject, robj *value, int where) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject,value);
20 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23
24 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 value = getDecodedObject(value);
27 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 decrRefCount(value);
29 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 if (where == REDIS_HEAD) {
31 listAddNodeHead(subject->ptr,value);
32 } else {
33 listAddNodeTail(subject->ptr,value);
34 }
35 incrRefCount(value);
36 } else {
37 redisPanic("Unknown list encoding");
38 }
39}
40
41robj *listTypePop(robj *subject, int where) {
42 robj *value = NULL;
43 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 unsigned char *p;
45 unsigned char *vstr;
46 unsigned int vlen;
47 long long vlong;
48 int pos = (where == REDIS_HEAD) ? 0 : -1;
49 p = ziplistIndex(subject->ptr,pos);
50 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 if (vstr) {
52 value = createStringObject((char*)vstr,vlen);
53 } else {
54 value = createStringObjectFromLongLong(vlong);
55 }
56 /* We only need to delete an element when it exists */
57 subject->ptr = ziplistDelete(subject->ptr,&p);
58 }
59 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 list *list = subject->ptr;
61 listNode *ln;
62 if (where == REDIS_HEAD) {
63 ln = listFirst(list);
64 } else {
65 ln = listLast(list);
66 }
67 if (ln != NULL) {
68 value = listNodeValue(ln);
69 incrRefCount(value);
70 listDelNode(list,ln);
71 }
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 return value;
76}
77
78unsigned long listTypeLength(robj *subject) {
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 return ziplistLen(subject->ptr);
81 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 return listLength((list*)subject->ptr);
83 } else {
84 redisPanic("Unknown list encoding");
85 }
86}
87
88/* Initialize an iterator at the specified index. */
89listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
90 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 li->subject = subject;
92 li->encoding = subject->encoding;
93 li->direction = direction;
94 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 li->zi = ziplistIndex(subject->ptr,index);
96 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 li->ln = listIndex(subject->ptr,index);
98 } else {
99 redisPanic("Unknown list encoding");
100 }
101 return li;
102}
103
104/* Clean up the iterator. */
105void listTypeReleaseIterator(listTypeIterator *li) {
106 zfree(li);
107}
108
109/* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 redisAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 entry->zi = li->zi;
119 if (entry->zi != NULL) {
120 if (li->direction == REDIS_TAIL)
121 li->zi = ziplistNext(li->subject->ptr,li->zi);
122 else
123 li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 return 1;
125 }
126 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 entry->ln = li->ln;
128 if (entry->ln != NULL) {
129 if (li->direction == REDIS_TAIL)
130 li->ln = li->ln->next;
131 else
132 li->ln = li->ln->prev;
133 return 1;
134 }
135 } else {
136 redisPanic("Unknown list encoding");
137 }
138 return 0;
139}
140
141/* Return entry or NULL at the current position of the iterator. */
142robj *listTypeGet(listTypeEntry *entry) {
143 listTypeIterator *li = entry->li;
144 robj *value = NULL;
145 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 unsigned char *vstr;
147 unsigned int vlen;
148 long long vlong;
149 redisAssert(entry->zi != NULL);
150 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 if (vstr) {
152 value = createStringObject((char*)vstr,vlen);
153 } else {
154 value = createStringObjectFromLongLong(vlong);
155 }
156 }
157 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 redisAssert(entry->ln != NULL);
159 value = listNodeValue(entry->ln);
160 incrRefCount(value);
161 } else {
162 redisPanic("Unknown list encoding");
163 }
164 return value;
165}
166
167void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 robj *subject = entry->li->subject;
169 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 value = getDecodedObject(value);
171 if (where == REDIS_TAIL) {
172 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
176 if (next == NULL) {
177 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 } else {
179 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 }
181 } else {
182 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 }
184 decrRefCount(value);
185 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 if (where == REDIS_TAIL) {
187 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 } else {
189 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 }
191 incrRefCount(value);
192 } else {
193 redisPanic("Unknown list encoding");
194 }
195}
196
197/* Compare the given object with the entry at the current position. */
198int listTypeEqual(listTypeEntry *entry, robj *o) {
199 listTypeIterator *li = entry->li;
200 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
201 redisAssert(o->encoding == REDIS_ENCODING_RAW);
202 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 return equalStringObjects(o,listNodeValue(entry->ln));
205 } else {
206 redisPanic("Unknown list encoding");
207 }
208}
209
210/* Delete the element pointed to. */
211void listTypeDelete(listTypeEntry *entry) {
212 listTypeIterator *li = entry->li;
213 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 unsigned char *p = entry->zi;
215 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216
217 /* Update position of the iterator depending on the direction */
218 if (li->direction == REDIS_TAIL)
219 li->zi = p;
220 else
221 li->zi = ziplistPrev(li->subject->ptr,p);
222 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 listNode *next;
224 if (li->direction == REDIS_TAIL)
225 next = entry->ln->next;
226 else
227 next = entry->ln->prev;
228 listDelNode(li->subject->ptr,entry->ln);
229 li->ln = next;
230 } else {
231 redisPanic("Unknown list encoding");
232 }
233}
234
235void listTypeConvert(robj *subject, int enc) {
236 listTypeIterator *li;
237 listTypeEntry entry;
238 redisAssert(subject->type == REDIS_LIST);
239
240 if (enc == REDIS_ENCODING_LINKEDLIST) {
241 list *l = listCreate();
242 listSetFreeMethod(l,decrRefCount);
243
244 /* listTypeGet returns a robj with incremented refcount */
245 li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 listTypeReleaseIterator(li);
248
249 subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 zfree(subject->ptr);
251 subject->ptr = l;
252 } else {
253 redisPanic("Unsupported list conversion");
254 }
255}
256
257/*-----------------------------------------------------------------------------
258 * List Commands
259 *----------------------------------------------------------------------------*/
260
261void pushGenericCommand(redisClient *c, int where) {
fb2feae5 262 int j, addlen = 0, pushed = 0;
e2641e09 263 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
fb2feae5 264 int may_have_waiting_clients = (lobj == NULL);
265
266 if (lobj && lobj->type != REDIS_LIST) {
267 addReply(c,shared.wrongtypeerr);
268 return;
269 }
270
271 for (j = 2; j < c->argc; j++) {
272 c->argv[j] = tryObjectEncoding(c->argv[j]);
273 if (may_have_waiting_clients) {
274 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
275 addlen++;
276 continue;
277 } else {
278 may_have_waiting_clients = 0;
279 }
e2641e09 280 }
fb2feae5 281 if (!lobj) {
282 lobj = createZiplistObject();
283 dbAdd(c->db,c->argv[1],lobj);
e2641e09 284 }
fb2feae5 285 listTypePush(lobj,c->argv[j],where);
286 pushed++;
e2641e09 287 }
fb2feae5 288 addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
289 if (pushed) signalModifiedKey(c->db,c->argv[1]);
290 server.dirty += pushed;
e2641e09 291}
292
293void lpushCommand(redisClient *c) {
294 pushGenericCommand(c,REDIS_HEAD);
295}
296
297void rpushCommand(redisClient *c) {
298 pushGenericCommand(c,REDIS_TAIL);
299}
300
301void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
302 robj *subject;
303 listTypeIterator *iter;
304 listTypeEntry entry;
305 int inserted = 0;
306
307 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
308 checkType(c,subject,REDIS_LIST)) return;
309
310 if (refval != NULL) {
311 /* Note: we expect refval to be string-encoded because it is *not* the
312 * last argument of the multi-bulk LINSERT. */
313 redisAssert(refval->encoding == REDIS_ENCODING_RAW);
314
315 /* We're not sure if this value can be inserted yet, but we cannot
316 * convert the list inside the iterator. We don't want to loop over
317 * the list twice (once to see if the value can be inserted and once
318 * to do the actual insert), so we assume this value can be inserted
319 * and convert the ziplist to a regular list if necessary. */
320 listTypeTryConversion(subject,val);
321
322 /* Seek refval from head to tail */
323 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
324 while (listTypeNext(iter,&entry)) {
325 if (listTypeEqual(&entry,refval)) {
326 listTypeInsert(&entry,val,where);
327 inserted = 1;
328 break;
329 }
330 }
331 listTypeReleaseIterator(iter);
332
333 if (inserted) {
334 /* Check if the length exceeds the ziplist length threshold. */
335 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
336 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
337 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
cea8c5cd 338 signalModifiedKey(c->db,c->argv[1]);
e2641e09 339 server.dirty++;
340 } else {
341 /* Notify client of a failed insert */
342 addReply(c,shared.cnegone);
343 return;
344 }
345 } else {
346 listTypePush(subject,val,where);
cea8c5cd 347 signalModifiedKey(c->db,c->argv[1]);
e2641e09 348 server.dirty++;
349 }
350
b70d3555 351 addReplyLongLong(c,listTypeLength(subject));
e2641e09 352}
353
354void lpushxCommand(redisClient *c) {
75b41de8 355 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 356 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
357}
358
359void rpushxCommand(redisClient *c) {
75b41de8 360 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 361 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
362}
363
364void linsertCommand(redisClient *c) {
75b41de8 365 c->argv[4] = tryObjectEncoding(c->argv[4]);
e2641e09 366 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
367 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
368 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
369 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
370 } else {
371 addReply(c,shared.syntaxerr);
372 }
373}
374
375void llenCommand(redisClient *c) {
376 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
377 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
b70d3555 378 addReplyLongLong(c,listTypeLength(o));
e2641e09 379}
380
381void lindexCommand(redisClient *c) {
382 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
383 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
384 int index = atoi(c->argv[2]->ptr);
385 robj *value = NULL;
386
387 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
388 unsigned char *p;
389 unsigned char *vstr;
390 unsigned int vlen;
391 long long vlong;
392 p = ziplistIndex(o->ptr,index);
393 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
394 if (vstr) {
395 value = createStringObject((char*)vstr,vlen);
396 } else {
397 value = createStringObjectFromLongLong(vlong);
398 }
399 addReplyBulk(c,value);
400 decrRefCount(value);
401 } else {
402 addReply(c,shared.nullbulk);
403 }
404 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
405 listNode *ln = listIndex(o->ptr,index);
406 if (ln != NULL) {
407 value = listNodeValue(ln);
408 addReplyBulk(c,value);
409 } else {
410 addReply(c,shared.nullbulk);
411 }
412 } else {
413 redisPanic("Unknown list encoding");
414 }
415}
416
417void lsetCommand(redisClient *c) {
418 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
419 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
420 int index = atoi(c->argv[2]->ptr);
75b41de8 421 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
e2641e09 422
423 listTypeTryConversion(o,value);
424 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
425 unsigned char *p, *zl = o->ptr;
426 p = ziplistIndex(zl,index);
427 if (p == NULL) {
428 addReply(c,shared.outofrangeerr);
429 } else {
430 o->ptr = ziplistDelete(o->ptr,&p);
431 value = getDecodedObject(value);
432 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
433 decrRefCount(value);
434 addReply(c,shared.ok);
cea8c5cd 435 signalModifiedKey(c->db,c->argv[1]);
e2641e09 436 server.dirty++;
437 }
438 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
439 listNode *ln = listIndex(o->ptr,index);
440 if (ln == NULL) {
441 addReply(c,shared.outofrangeerr);
442 } else {
443 decrRefCount((robj*)listNodeValue(ln));
444 listNodeValue(ln) = value;
445 incrRefCount(value);
446 addReply(c,shared.ok);
cea8c5cd 447 signalModifiedKey(c->db,c->argv[1]);
e2641e09 448 server.dirty++;
449 }
450 } else {
451 redisPanic("Unknown list encoding");
452 }
453}
454
455void popGenericCommand(redisClient *c, int where) {
456 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
457 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
458
459 robj *value = listTypePop(o,where);
460 if (value == NULL) {
461 addReply(c,shared.nullbulk);
462 } else {
463 addReplyBulk(c,value);
464 decrRefCount(value);
465 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 466 signalModifiedKey(c->db,c->argv[1]);
e2641e09 467 server.dirty++;
468 }
469}
470
471void lpopCommand(redisClient *c) {
472 popGenericCommand(c,REDIS_HEAD);
473}
474
475void rpopCommand(redisClient *c) {
476 popGenericCommand(c,REDIS_TAIL);
477}
478
479void lrangeCommand(redisClient *c) {
d51ebef5 480 robj *o;
e2641e09 481 int start = atoi(c->argv[2]->ptr);
482 int end = atoi(c->argv[3]->ptr);
483 int llen;
d51ebef5 484 int rangelen;
e2641e09 485
486 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
487 || checkType(c,o,REDIS_LIST)) return;
488 llen = listTypeLength(o);
489
490 /* convert negative indexes */
491 if (start < 0) start = llen+start;
492 if (end < 0) end = llen+end;
493 if (start < 0) start = 0;
e2641e09 494
d0a4e24e
PN
495 /* Invariant: start >= 0, so this test will be true when end < 0.
496 * The range is empty when start > end or start >= length. */
e2641e09 497 if (start > end || start >= llen) {
e2641e09 498 addReply(c,shared.emptymultibulk);
499 return;
500 }
501 if (end >= llen) end = llen-1;
502 rangelen = (end-start)+1;
503
504 /* Return the result in form of a multi-bulk reply */
0537e7bf 505 addReplyMultiBulkLen(c,rangelen);
d51ebef5 506 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
507 unsigned char *p = ziplistIndex(o->ptr,start);
508 unsigned char *vstr;
509 unsigned int vlen;
510 long long vlong;
511
512 while(rangelen--) {
513 ziplistGet(p,&vstr,&vlen,&vlong);
514 if (vstr) {
515 addReplyBulkCBuffer(c,vstr,vlen);
516 } else {
517 addReplyBulkLongLong(c,vlong);
518 }
519 p = ziplistNext(o->ptr,p);
520 }
521 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
522 listNode *ln = listIndex(o->ptr,start);
523
524 while(rangelen--) {
525 addReplyBulk(c,ln->value);
526 ln = ln->next;
527 }
528 } else {
529 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
e2641e09 530 }
e2641e09 531}
532
533void ltrimCommand(redisClient *c) {
534 robj *o;
535 int start = atoi(c->argv[2]->ptr);
536 int end = atoi(c->argv[3]->ptr);
537 int llen;
538 int j, ltrim, rtrim;
539 list *list;
540 listNode *ln;
541
542 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
543 checkType(c,o,REDIS_LIST)) return;
544 llen = listTypeLength(o);
545
546 /* convert negative indexes */
547 if (start < 0) start = llen+start;
548 if (end < 0) end = llen+end;
549 if (start < 0) start = 0;
e2641e09 550
d0a4e24e
PN
551 /* Invariant: start >= 0, so this test will be true when end < 0.
552 * The range is empty when start > end or start >= length. */
e2641e09 553 if (start > end || start >= llen) {
554 /* Out of range start or start > end result in empty list */
555 ltrim = llen;
556 rtrim = 0;
557 } else {
558 if (end >= llen) end = llen-1;
559 ltrim = start;
560 rtrim = llen-end-1;
561 }
562
563 /* Remove list elements to perform the trim */
564 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
565 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
566 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
567 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
568 list = o->ptr;
569 for (j = 0; j < ltrim; j++) {
570 ln = listFirst(list);
571 listDelNode(list,ln);
572 }
573 for (j = 0; j < rtrim; j++) {
574 ln = listLast(list);
575 listDelNode(list,ln);
576 }
577 } else {
578 redisPanic("Unknown list encoding");
579 }
580 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 581 signalModifiedKey(c->db,c->argv[1]);
e2641e09 582 server.dirty++;
583 addReply(c,shared.ok);
584}
585
586void lremCommand(redisClient *c) {
75b41de8
PN
587 robj *subject, *obj;
588 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
e2641e09 589 int toremove = atoi(c->argv[2]->ptr);
590 int removed = 0;
591 listTypeEntry entry;
592
593 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
594 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
595
596 /* Make sure obj is raw when we're dealing with a ziplist */
597 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
598 obj = getDecodedObject(obj);
599
600 listTypeIterator *li;
601 if (toremove < 0) {
602 toremove = -toremove;
603 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
604 } else {
605 li = listTypeInitIterator(subject,0,REDIS_TAIL);
606 }
607
608 while (listTypeNext(li,&entry)) {
609 if (listTypeEqual(&entry,obj)) {
610 listTypeDelete(&entry);
611 server.dirty++;
612 removed++;
613 if (toremove && removed == toremove) break;
614 }
615 }
616 listTypeReleaseIterator(li);
617
618 /* Clean up raw encoded object */
619 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
620 decrRefCount(obj);
621
622 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
b70d3555 623 addReplyLongLong(c,removed);
cea8c5cd 624 if (removed) signalModifiedKey(c->db,c->argv[1]);
e2641e09 625}
626
627/* This is the semantic of this command:
628 * RPOPLPUSH srclist dstlist:
ac06fc01
PN
629 * IF LLEN(srclist) > 0
630 * element = RPOP srclist
631 * LPUSH dstlist element
632 * RETURN element
633 * ELSE
634 * RETURN nil
635 * END
e2641e09 636 * END
637 *
638 * The idea is to be able to get an element from a list in a reliable way
639 * since the element is not just returned but pushed against another list
640 * as well. This command was originally proposed by Ezra Zygmuntowicz.
641 */
ac06fc01
PN
642
643void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
644 if (!handleClientsWaitingListPush(c,dstkey,value)) {
645 /* Create the list if the key does not exist */
646 if (!dstobj) {
647 dstobj = createZiplistObject();
648 dbAdd(c->db,dstkey,dstobj);
649 } else {
cea8c5cd 650 signalModifiedKey(c->db,dstkey);
ac06fc01
PN
651 server.dirty++;
652 }
653 listTypePush(dstobj,value,REDIS_HEAD);
654 }
655
656 /* Always send the pushed value to the client. */
657 addReplyBulk(c,value);
658}
659
8a979f03 660void rpoplpushCommand(redisClient *c) {
e2641e09 661 robj *sobj, *value;
662 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
663 checkType(c,sobj,REDIS_LIST)) return;
664
665 if (listTypeLength(sobj) == 0) {
666 addReply(c,shared.nullbulk);
667 } else {
668 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
669 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
670 value = listTypePop(sobj,REDIS_TAIL);
ac06fc01 671 rpoplpushHandlePush(c,c->argv[2],dobj,value);
e2641e09 672
673 /* listTypePop returns an object with its refcount incremented */
674 decrRefCount(value);
675
676 /* Delete the source list when it is empty */
677 if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 678 signalModifiedKey(c->db,c->argv[1]);
e2641e09 679 server.dirty++;
680 }
681}
682
683/*-----------------------------------------------------------------------------
684 * Blocking POP operations
685 *----------------------------------------------------------------------------*/
686
687/* Currently Redis blocking operations support is limited to list POP ops,
688 * so the current implementation is not fully generic, but it is also not
689 * completely specific so it will not require a rewrite to support new
690 * kind of blocking operations in the future.
691 *
692 * Still it's important to note that list blocking operations can be already
693 * used as a notification mechanism in order to implement other blocking
694 * operations at application level, so there must be a very strong evidence
695 * of usefulness and generality before new blocking operations are implemented.
696 *
697 * This is how the current blocking POP works, we use BLPOP as example:
698 * - If the user calls BLPOP and the key exists and contains a non empty list
699 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
700 * if there is not to block.
701 * - If instead BLPOP is called and the key does not exists or the list is
702 * empty we need to block. In order to do so we remove the notification for
703 * new data to read in the client socket (so that we'll not serve new
704 * requests if the blocking request is not served). Also we put the client
705 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
706 * blocking for this keys.
707 * - If a PUSH operation against a key with blocked clients waiting is
708 * performed, we serve the first in the list: basically instead to push
709 * the new element inside the list we return it to the (first / oldest)
710 * blocking client, unblock the client, and remove it form the list.
711 *
712 * The above comment and the source code should be enough in order to understand
713 * the implementation and modify / fix it later.
714 */
715
716/* Set a client in blocking mode for the specified key, with the specified
717 * timeout */
ba3b4741 718void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
e2641e09 719 dictEntry *de;
720 list *l;
721 int j;
722
e3c51c4b
DJMM
723 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
724 c->bpop.count = numkeys;
725 c->bpop.timeout = timeout;
726 c->bpop.target = target;
ba3b4741
DJMM
727
728 if (target != NULL) {
ecf94014 729 incrRefCount(target);
ba3b4741
DJMM
730 }
731
e2641e09 732 for (j = 0; j < numkeys; j++) {
733 /* Add the key in the client structure, to map clients -> keys */
e3c51c4b 734 c->bpop.keys[j] = keys[j];
e2641e09 735 incrRefCount(keys[j]);
736
737 /* And in the other "side", to map keys -> clients */
738 de = dictFind(c->db->blocking_keys,keys[j]);
739 if (de == NULL) {
740 int retval;
741
742 /* For every key we take a list of clients blocked for it */
743 l = listCreate();
744 retval = dictAdd(c->db->blocking_keys,keys[j],l);
745 incrRefCount(keys[j]);
746 redisAssert(retval == DICT_OK);
747 } else {
748 l = dictGetEntryVal(de);
749 }
750 listAddNodeTail(l,c);
751 }
752 /* Mark the client as a blocked client */
753 c->flags |= REDIS_BLOCKED;
5fa95ad7 754 server.bpop_blocked_clients++;
e2641e09 755}
756
757/* Unblock a client that's waiting in a blocking operation such as BLPOP */
758void unblockClientWaitingData(redisClient *c) {
759 dictEntry *de;
760 list *l;
761 int j;
762
e3c51c4b 763 redisAssert(c->bpop.keys != NULL);
e2641e09 764 /* The client may wait for multiple keys, so unblock it for every key. */
e3c51c4b 765 for (j = 0; j < c->bpop.count; j++) {
e2641e09 766 /* Remove this client from the list of clients waiting for this key. */
e3c51c4b 767 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
e2641e09 768 redisAssert(de != NULL);
769 l = dictGetEntryVal(de);
770 listDelNode(l,listSearchKey(l,c));
771 /* If the list is empty we need to remove it to avoid wasting memory */
772 if (listLength(l) == 0)
e3c51c4b
DJMM
773 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
774 decrRefCount(c->bpop.keys[j]);
e2641e09 775 }
ba3b4741 776
e2641e09 777 /* Cleanup the client structure */
e3c51c4b
DJMM
778 zfree(c->bpop.keys);
779 c->bpop.keys = NULL;
780 c->bpop.target = NULL;
3bcffcbe
PN
781 c->flags &= ~REDIS_BLOCKED;
782 c->flags |= REDIS_UNBLOCKED;
5fa95ad7 783 server.bpop_blocked_clients--;
a4ce7581 784 listAddNodeTail(server.unblocked_clients,c);
e2641e09 785}
786
787/* This should be called from any function PUSHing into lists.
788 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
789 * 'ele' is the element pushed.
790 *
791 * If the function returns 0 there was no client waiting for a list push
792 * against this key.
793 *
794 * If the function returns 1 there was a client waiting for a list push
795 * against this key, the element was passed to this client thus it's not
796 * needed to actually add it to the list and the caller should return asap. */
797int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
798 struct dictEntry *de;
799 redisClient *receiver;
8a88c368
PN
800 int numclients;
801 list *clients;
e2641e09 802 listNode *ln;
8a88c368 803 robj *dstkey, *dstobj;
e2641e09 804
805 de = dictFind(c->db->blocking_keys,key);
806 if (de == NULL) return 0;
8a88c368
PN
807 clients = dictGetEntryVal(de);
808 numclients = listLength(clients);
809
810 /* Try to handle the push as long as there are clients waiting for a push.
811 * Note that "numclients" is used because the list of clients waiting for a
812 * push on "key" is deleted by unblockClient() when empty.
813 *
814 * This loop will have more than 1 iteration when there is a BRPOPLPUSH
815 * that cannot push the target list because it does not contain a list. If
816 * this happens, it simply tries the next client waiting for a push. */
817 while (numclients--) {
818 ln = listFirst(clients);
819 redisAssert(ln != NULL);
820 receiver = ln->value;
821 dstkey = receiver->bpop.target;
822
823 /* This should remove the first element of the "clients" list. */
824 unblockClientWaitingData(receiver);
8a88c368
PN
825
826 if (dstkey == NULL) {
827 /* BRPOP/BLPOP */
828 addReplyMultiBulkLen(receiver,2);
829 addReplyBulk(receiver,key);
830 addReplyBulk(receiver,ele);
831 return 1;
832 } else {
554a5dd2 833 /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
8a88c368
PN
834 dstobj = lookupKeyWrite(receiver->db,dstkey);
835 if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
836 decrRefCount(dstkey);
837 } else {
838 rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
839 decrRefCount(dstkey);
840 return 1;
841 }
842 }
b2a7fd0c 843 }
e2641e09 844
8a88c368 845 return 0;
e2641e09 846}
847
c8a0070a
PN
848int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
849 long tval;
59bd44d1 850
c8a0070a
PN
851 if (getLongFromObjectOrReply(c,object,&tval,
852 "timeout is not an integer or out of range") != REDIS_OK)
59bd44d1 853 return REDIS_ERR;
59bd44d1 854
c8a0070a
PN
855 if (tval < 0) {
856 addReplyError(c,"timeout is negative");
59bd44d1
DJMM
857 return REDIS_ERR;
858 }
859
c8a0070a
PN
860 if (tval > 0) tval += time(NULL);
861 *timeout = tval;
59bd44d1
DJMM
862
863 return REDIS_OK;
e2641e09 864}
865
866/* Blocking RPOP/LPOP */
867void blockingPopGenericCommand(redisClient *c, int where) {
868 robj *o;
869 time_t timeout;
870 int j;
871
c8a0070a 872 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
94364d53 873 return;
94364d53 874
e2641e09 875 for (j = 1; j < c->argc-1; j++) {
876 o = lookupKeyWrite(c->db,c->argv[j]);
877 if (o != NULL) {
878 if (o->type != REDIS_LIST) {
879 addReply(c,shared.wrongtypeerr);
880 return;
881 } else {
882 if (listTypeLength(o) != 0) {
883 /* If the list contains elements fall back to the usual
884 * non-blocking POP operation */
885 robj *argv[2], **orig_argv;
886 int orig_argc;
887
888 /* We need to alter the command arguments before to call
889 * popGenericCommand() as the command takes a single key. */
890 orig_argv = c->argv;
891 orig_argc = c->argc;
892 argv[1] = c->argv[j];
893 c->argv = argv;
894 c->argc = 2;
895
896 /* Also the return value is different, we need to output
897 * the multi bulk reply header and the key name. The
898 * "real" command will add the last element (the value)
899 * for us. If this souds like an hack to you it's just
900 * because it is... */
0537e7bf 901 addReplyMultiBulkLen(c,2);
e2641e09 902 addReplyBulk(c,argv[1]);
ba3b4741 903
e2641e09 904 popGenericCommand(c,where);
905
906 /* Fix the client structure with the original stuff */
907 c->argv = orig_argv;
908 c->argc = orig_argc;
b2a7fd0c 909
e2641e09 910 return;
911 }
912 }
913 }
914 }
94364d53 915
fb92ecec 916 /* If we are inside a MULTI/EXEC and the list is empty the only thing
917 * we can do is treating it as a timeout (even with timeout 0). */
918 if (c->flags & REDIS_MULTI) {
919 addReply(c,shared.nullmultibulk);
920 return;
921 }
922
e2641e09 923 /* If the list is empty or the key does not exists we must block */
ba3b4741 924 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
e2641e09 925}
926
927void blpopCommand(redisClient *c) {
928 blockingPopGenericCommand(c,REDIS_HEAD);
929}
930
931void brpopCommand(redisClient *c) {
932 blockingPopGenericCommand(c,REDIS_TAIL);
933}
b2a7fd0c
DJMM
934
935void brpoplpushCommand(redisClient *c) {
ba3b4741 936 time_t timeout;
b2a7fd0c 937
c8a0070a 938 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
ba3b4741 939 return;
ba3b4741
DJMM
940
941 robj *key = lookupKeyWrite(c->db, c->argv[1]);
942
ba3b4741 943 if (key == NULL) {
ba3b4741 944 if (c->flags & REDIS_MULTI) {
7c25a43a
DJMM
945
946 /* Blocking against an empty list in a multi state
947 * returns immediately. */
d5870d7a 948 addReply(c, shared.nullbulk);
ba3b4741 949 } else {
7c25a43a 950 /* The list is empty and the client blocks. */
ba3b4741
DJMM
951 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
952 }
ba3b4741 953 } else {
7c25a43a
DJMM
954 if (key->type != REDIS_LIST) {
955 addReply(c, shared.wrongtypeerr);
956 } else {
957
958 /* The list exists and has elements, so
959 * the regular rpoplpushCommand is executed. */
960 redisAssert(listTypeLength(key) > 0);
961 rpoplpushCommand(c);
962 }
ba3b4741 963 }
b2a7fd0c 964}