]> git.saurik.com Git - redis.git/blame - src/t_list.c
Sentinel: when leader in wait-start, sense another leader as race.
[redis.git] / src / t_list.c
CommitLineData
e2641e09 1#include "redis.h"
2
f444e2af 3void signalListAsReady(redisClient *c, robj *key);
4
e2641e09 5/*-----------------------------------------------------------------------------
6 * List API
7 *----------------------------------------------------------------------------*/
8
9/* Check the argument length to see if it requires us to convert the ziplist
10 * to a real list. Only check raw-encoded objects because integer encoded
11 * objects are never too long. */
12void listTypeTryConversion(robj *subject, robj *value) {
13 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
14 if (value->encoding == REDIS_ENCODING_RAW &&
15 sdslen(value->ptr) > server.list_max_ziplist_value)
16 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
17}
18
f444e2af 19/* The function pushes an elmenet to the specified list object 'subject',
20 * at head or tail position as specified by 'where'.
21 *
22 * There is no need for the caller to incremnet the refcount of 'value' as
23 * the function takes care of it if needed. */
e2641e09 24void listTypePush(robj *subject, robj *value, int where) {
25 /* Check if we need to convert the ziplist */
26 listTypeTryConversion(subject,value);
27 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
28 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
29 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
30
31 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
32 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
33 value = getDecodedObject(value);
34 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
35 decrRefCount(value);
36 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
37 if (where == REDIS_HEAD) {
38 listAddNodeHead(subject->ptr,value);
39 } else {
40 listAddNodeTail(subject->ptr,value);
41 }
42 incrRefCount(value);
43 } else {
44 redisPanic("Unknown list encoding");
45 }
46}
47
48robj *listTypePop(robj *subject, int where) {
49 robj *value = NULL;
50 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
51 unsigned char *p;
52 unsigned char *vstr;
53 unsigned int vlen;
54 long long vlong;
55 int pos = (where == REDIS_HEAD) ? 0 : -1;
56 p = ziplistIndex(subject->ptr,pos);
57 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
58 if (vstr) {
59 value = createStringObject((char*)vstr,vlen);
60 } else {
61 value = createStringObjectFromLongLong(vlong);
62 }
63 /* We only need to delete an element when it exists */
64 subject->ptr = ziplistDelete(subject->ptr,&p);
65 }
66 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
67 list *list = subject->ptr;
68 listNode *ln;
69 if (where == REDIS_HEAD) {
70 ln = listFirst(list);
71 } else {
72 ln = listLast(list);
73 }
74 if (ln != NULL) {
75 value = listNodeValue(ln);
76 incrRefCount(value);
77 listDelNode(list,ln);
78 }
79 } else {
80 redisPanic("Unknown list encoding");
81 }
82 return value;
83}
84
85unsigned long listTypeLength(robj *subject) {
86 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
87 return ziplistLen(subject->ptr);
88 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
89 return listLength((list*)subject->ptr);
90 } else {
91 redisPanic("Unknown list encoding");
92 }
93}
94
95/* Initialize an iterator at the specified index. */
3c08fdae 96listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
e2641e09 97 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
98 li->subject = subject;
99 li->encoding = subject->encoding;
100 li->direction = direction;
101 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
102 li->zi = ziplistIndex(subject->ptr,index);
103 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
104 li->ln = listIndex(subject->ptr,index);
105 } else {
106 redisPanic("Unknown list encoding");
107 }
108 return li;
109}
110
111/* Clean up the iterator. */
112void listTypeReleaseIterator(listTypeIterator *li) {
113 zfree(li);
114}
115
116/* Stores pointer to current the entry in the provided entry structure
117 * and advances the position of the iterator. Returns 1 when the current
118 * entry is in fact an entry, 0 otherwise. */
119int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
120 /* Protect from converting when iterating */
121 redisAssert(li->subject->encoding == li->encoding);
122
123 entry->li = li;
124 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
125 entry->zi = li->zi;
126 if (entry->zi != NULL) {
127 if (li->direction == REDIS_TAIL)
128 li->zi = ziplistNext(li->subject->ptr,li->zi);
129 else
130 li->zi = ziplistPrev(li->subject->ptr,li->zi);
131 return 1;
132 }
133 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
134 entry->ln = li->ln;
135 if (entry->ln != NULL) {
136 if (li->direction == REDIS_TAIL)
137 li->ln = li->ln->next;
138 else
139 li->ln = li->ln->prev;
140 return 1;
141 }
142 } else {
143 redisPanic("Unknown list encoding");
144 }
145 return 0;
146}
147
148/* Return entry or NULL at the current position of the iterator. */
149robj *listTypeGet(listTypeEntry *entry) {
150 listTypeIterator *li = entry->li;
151 robj *value = NULL;
152 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
153 unsigned char *vstr;
154 unsigned int vlen;
155 long long vlong;
156 redisAssert(entry->zi != NULL);
157 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
158 if (vstr) {
159 value = createStringObject((char*)vstr,vlen);
160 } else {
161 value = createStringObjectFromLongLong(vlong);
162 }
163 }
164 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
165 redisAssert(entry->ln != NULL);
166 value = listNodeValue(entry->ln);
167 incrRefCount(value);
168 } else {
169 redisPanic("Unknown list encoding");
170 }
171 return value;
172}
173
174void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
175 robj *subject = entry->li->subject;
176 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
177 value = getDecodedObject(value);
178 if (where == REDIS_TAIL) {
179 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
180
181 /* When we insert after the current element, but the current element
182 * is the tail of the list, we need to do a push. */
183 if (next == NULL) {
184 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
185 } else {
186 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
187 }
188 } else {
189 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
190 }
191 decrRefCount(value);
192 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
193 if (where == REDIS_TAIL) {
194 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
195 } else {
196 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
197 }
198 incrRefCount(value);
199 } else {
200 redisPanic("Unknown list encoding");
201 }
202}
203
204/* Compare the given object with the entry at the current position. */
205int listTypeEqual(listTypeEntry *entry, robj *o) {
206 listTypeIterator *li = entry->li;
207 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
eab0e26e 208 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
e2641e09 209 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
210 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
211 return equalStringObjects(o,listNodeValue(entry->ln));
212 } else {
213 redisPanic("Unknown list encoding");
214 }
215}
216
217/* Delete the element pointed to. */
218void listTypeDelete(listTypeEntry *entry) {
219 listTypeIterator *li = entry->li;
220 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
221 unsigned char *p = entry->zi;
222 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
223
224 /* Update position of the iterator depending on the direction */
225 if (li->direction == REDIS_TAIL)
226 li->zi = p;
227 else
228 li->zi = ziplistPrev(li->subject->ptr,p);
229 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
230 listNode *next;
231 if (li->direction == REDIS_TAIL)
232 next = entry->ln->next;
233 else
234 next = entry->ln->prev;
235 listDelNode(li->subject->ptr,entry->ln);
236 li->ln = next;
237 } else {
238 redisPanic("Unknown list encoding");
239 }
240}
241
242void listTypeConvert(robj *subject, int enc) {
243 listTypeIterator *li;
244 listTypeEntry entry;
eab0e26e 245 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
e2641e09 246
247 if (enc == REDIS_ENCODING_LINKEDLIST) {
248 list *l = listCreate();
249 listSetFreeMethod(l,decrRefCount);
250
251 /* listTypeGet returns a robj with incremented refcount */
252 li = listTypeInitIterator(subject,0,REDIS_TAIL);
253 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
254 listTypeReleaseIterator(li);
255
256 subject->encoding = REDIS_ENCODING_LINKEDLIST;
257 zfree(subject->ptr);
258 subject->ptr = l;
259 } else {
260 redisPanic("Unsupported list conversion");
261 }
262}
263
264/*-----------------------------------------------------------------------------
265 * List Commands
266 *----------------------------------------------------------------------------*/
267
268void pushGenericCommand(redisClient *c, int where) {
edba65d0 269 int j, waiting = 0, pushed = 0;
e2641e09 270 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
fb2feae5 271 int may_have_waiting_clients = (lobj == NULL);
272
273 if (lobj && lobj->type != REDIS_LIST) {
274 addReply(c,shared.wrongtypeerr);
275 return;
276 }
277
f444e2af 278 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
279
fb2feae5 280 for (j = 2; j < c->argc; j++) {
281 c->argv[j] = tryObjectEncoding(c->argv[j]);
fb2feae5 282 if (!lobj) {
283 lobj = createZiplistObject();
284 dbAdd(c->db,c->argv[1],lobj);
e2641e09 285 }
fb2feae5 286 listTypePush(lobj,c->argv[j],where);
287 pushed++;
e2641e09 288 }
edba65d0 289 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
fb2feae5 290 if (pushed) signalModifiedKey(c->db,c->argv[1]);
291 server.dirty += pushed;
e2641e09 292}
293
294void lpushCommand(redisClient *c) {
295 pushGenericCommand(c,REDIS_HEAD);
296}
297
298void rpushCommand(redisClient *c) {
299 pushGenericCommand(c,REDIS_TAIL);
300}
301
302void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
303 robj *subject;
304 listTypeIterator *iter;
305 listTypeEntry entry;
306 int inserted = 0;
307
308 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
309 checkType(c,subject,REDIS_LIST)) return;
310
311 if (refval != NULL) {
312 /* Note: we expect refval to be string-encoded because it is *not* the
313 * last argument of the multi-bulk LINSERT. */
eab0e26e 314 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
e2641e09 315
316 /* We're not sure if this value can be inserted yet, but we cannot
317 * convert the list inside the iterator. We don't want to loop over
318 * the list twice (once to see if the value can be inserted and once
319 * to do the actual insert), so we assume this value can be inserted
320 * and convert the ziplist to a regular list if necessary. */
321 listTypeTryConversion(subject,val);
322
323 /* Seek refval from head to tail */
324 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
325 while (listTypeNext(iter,&entry)) {
326 if (listTypeEqual(&entry,refval)) {
327 listTypeInsert(&entry,val,where);
328 inserted = 1;
329 break;
330 }
331 }
332 listTypeReleaseIterator(iter);
333
334 if (inserted) {
335 /* Check if the length exceeds the ziplist length threshold. */
336 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
337 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
338 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
cea8c5cd 339 signalModifiedKey(c->db,c->argv[1]);
e2641e09 340 server.dirty++;
341 } else {
342 /* Notify client of a failed insert */
343 addReply(c,shared.cnegone);
344 return;
345 }
346 } else {
347 listTypePush(subject,val,where);
cea8c5cd 348 signalModifiedKey(c->db,c->argv[1]);
e2641e09 349 server.dirty++;
350 }
351
b70d3555 352 addReplyLongLong(c,listTypeLength(subject));
e2641e09 353}
354
355void lpushxCommand(redisClient *c) {
75b41de8 356 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 357 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
358}
359
360void rpushxCommand(redisClient *c) {
75b41de8 361 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 362 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
363}
364
365void linsertCommand(redisClient *c) {
75b41de8 366 c->argv[4] = tryObjectEncoding(c->argv[4]);
e2641e09 367 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
368 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
369 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
370 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
371 } else {
372 addReply(c,shared.syntaxerr);
373 }
374}
375
376void llenCommand(redisClient *c) {
377 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
378 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
b70d3555 379 addReplyLongLong(c,listTypeLength(o));
e2641e09 380}
381
382void lindexCommand(redisClient *c) {
383 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
384 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 385 long index;
e2641e09 386 robj *value = NULL;
387
706b32e0
B
388 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
389 return;
390
e2641e09 391 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
392 unsigned char *p;
393 unsigned char *vstr;
394 unsigned int vlen;
395 long long vlong;
396 p = ziplistIndex(o->ptr,index);
397 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
398 if (vstr) {
399 value = createStringObject((char*)vstr,vlen);
400 } else {
401 value = createStringObjectFromLongLong(vlong);
402 }
403 addReplyBulk(c,value);
404 decrRefCount(value);
405 } else {
406 addReply(c,shared.nullbulk);
407 }
408 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
409 listNode *ln = listIndex(o->ptr,index);
410 if (ln != NULL) {
411 value = listNodeValue(ln);
412 addReplyBulk(c,value);
413 } else {
414 addReply(c,shared.nullbulk);
415 }
416 } else {
417 redisPanic("Unknown list encoding");
418 }
419}
420
421void lsetCommand(redisClient *c) {
422 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
423 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 424 long index;
75b41de8 425 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
e2641e09 426
706b32e0
B
427 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
428 return;
429
e2641e09 430 listTypeTryConversion(o,value);
431 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
432 unsigned char *p, *zl = o->ptr;
433 p = ziplistIndex(zl,index);
434 if (p == NULL) {
435 addReply(c,shared.outofrangeerr);
436 } else {
437 o->ptr = ziplistDelete(o->ptr,&p);
438 value = getDecodedObject(value);
439 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
440 decrRefCount(value);
441 addReply(c,shared.ok);
cea8c5cd 442 signalModifiedKey(c->db,c->argv[1]);
e2641e09 443 server.dirty++;
444 }
445 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
446 listNode *ln = listIndex(o->ptr,index);
447 if (ln == NULL) {
448 addReply(c,shared.outofrangeerr);
449 } else {
450 decrRefCount((robj*)listNodeValue(ln));
451 listNodeValue(ln) = value;
452 incrRefCount(value);
453 addReply(c,shared.ok);
cea8c5cd 454 signalModifiedKey(c->db,c->argv[1]);
e2641e09 455 server.dirty++;
456 }
457 } else {
458 redisPanic("Unknown list encoding");
459 }
460}
461
462void popGenericCommand(redisClient *c, int where) {
463 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
464 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
465
466 robj *value = listTypePop(o,where);
467 if (value == NULL) {
468 addReply(c,shared.nullbulk);
469 } else {
470 addReplyBulk(c,value);
471 decrRefCount(value);
472 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 473 signalModifiedKey(c->db,c->argv[1]);
e2641e09 474 server.dirty++;
475 }
476}
477
478void lpopCommand(redisClient *c) {
479 popGenericCommand(c,REDIS_HEAD);
480}
481
482void rpopCommand(redisClient *c) {
483 popGenericCommand(c,REDIS_TAIL);
484}
485
486void lrangeCommand(redisClient *c) {
d51ebef5 487 robj *o;
3c08fdae 488 long start, end, llen, rangelen;
e2641e09 489
706b32e0
B
490 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
491 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
492
e2641e09 493 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
494 || checkType(c,o,REDIS_LIST)) return;
495 llen = listTypeLength(o);
496
497 /* convert negative indexes */
498 if (start < 0) start = llen+start;
499 if (end < 0) end = llen+end;
500 if (start < 0) start = 0;
e2641e09 501
d0a4e24e
PN
502 /* Invariant: start >= 0, so this test will be true when end < 0.
503 * The range is empty when start > end or start >= length. */
e2641e09 504 if (start > end || start >= llen) {
e2641e09 505 addReply(c,shared.emptymultibulk);
506 return;
507 }
508 if (end >= llen) end = llen-1;
509 rangelen = (end-start)+1;
510
511 /* Return the result in form of a multi-bulk reply */
0537e7bf 512 addReplyMultiBulkLen(c,rangelen);
d51ebef5 513 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
514 unsigned char *p = ziplistIndex(o->ptr,start);
515 unsigned char *vstr;
516 unsigned int vlen;
517 long long vlong;
518
519 while(rangelen--) {
520 ziplistGet(p,&vstr,&vlen,&vlong);
521 if (vstr) {
522 addReplyBulkCBuffer(c,vstr,vlen);
523 } else {
524 addReplyBulkLongLong(c,vlong);
525 }
526 p = ziplistNext(o->ptr,p);
527 }
528 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
7cfeb8cc 529 listNode *ln;
530
531 /* If we are nearest to the end of the list, reach the element
532 * starting from tail and going backward, as it is faster. */
533 if (start > llen/2) start -= llen;
534 ln = listIndex(o->ptr,start);
d51ebef5 535
536 while(rangelen--) {
537 addReplyBulk(c,ln->value);
538 ln = ln->next;
539 }
540 } else {
541 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
e2641e09 542 }
e2641e09 543}
544
545void ltrimCommand(redisClient *c) {
546 robj *o;
3c08fdae 547 long start, end, llen, j, ltrim, rtrim;
e2641e09 548 list *list;
549 listNode *ln;
550
706b32e0
B
551 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
552 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
553
e2641e09 554 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
555 checkType(c,o,REDIS_LIST)) return;
556 llen = listTypeLength(o);
557
558 /* convert negative indexes */
559 if (start < 0) start = llen+start;
560 if (end < 0) end = llen+end;
561 if (start < 0) start = 0;
e2641e09 562
d0a4e24e
PN
563 /* Invariant: start >= 0, so this test will be true when end < 0.
564 * The range is empty when start > end or start >= length. */
e2641e09 565 if (start > end || start >= llen) {
566 /* Out of range start or start > end result in empty list */
567 ltrim = llen;
568 rtrim = 0;
569 } else {
570 if (end >= llen) end = llen-1;
571 ltrim = start;
572 rtrim = llen-end-1;
573 }
574
575 /* Remove list elements to perform the trim */
576 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
577 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
578 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
579 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
580 list = o->ptr;
581 for (j = 0; j < ltrim; j++) {
582 ln = listFirst(list);
583 listDelNode(list,ln);
584 }
585 for (j = 0; j < rtrim; j++) {
586 ln = listLast(list);
587 listDelNode(list,ln);
588 }
589 } else {
590 redisPanic("Unknown list encoding");
591 }
592 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 593 signalModifiedKey(c->db,c->argv[1]);
e2641e09 594 server.dirty++;
595 addReply(c,shared.ok);
596}
597
598void lremCommand(redisClient *c) {
75b41de8
PN
599 robj *subject, *obj;
600 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
706b32e0 601 long toremove;
3c08fdae 602 long removed = 0;
e2641e09 603 listTypeEntry entry;
604
706b32e0
B
605 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
606 return;
607
e2641e09 608 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
609 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
610
611 /* Make sure obj is raw when we're dealing with a ziplist */
612 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
613 obj = getDecodedObject(obj);
614
615 listTypeIterator *li;
616 if (toremove < 0) {
617 toremove = -toremove;
618 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
619 } else {
620 li = listTypeInitIterator(subject,0,REDIS_TAIL);
621 }
622
623 while (listTypeNext(li,&entry)) {
624 if (listTypeEqual(&entry,obj)) {
625 listTypeDelete(&entry);
626 server.dirty++;
627 removed++;
628 if (toremove && removed == toremove) break;
629 }
630 }
631 listTypeReleaseIterator(li);
632
633 /* Clean up raw encoded object */
634 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
635 decrRefCount(obj);
636
637 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
b70d3555 638 addReplyLongLong(c,removed);
cea8c5cd 639 if (removed) signalModifiedKey(c->db,c->argv[1]);
e2641e09 640}
641
/* These are the semantics of this command:
 *  RPOPLPUSH srclist dstlist:
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way
 * since the element is not just returned but pushed against another list
 * as well. This command was originally proposed by Ezra Zygmuntowicz.
 */
ac06fc01 657
f444e2af 658void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
659 /* Create the list if the key does not exist */
660 if (!dstobj) {
661 dstobj = createZiplistObject();
662 dbAdd(c->db,dstkey,dstobj);
663 signalListAsReady(c,dstkey);
ac06fc01 664 }
f444e2af 665 signalModifiedKey(c->db,dstkey);
666 listTypePush(dstobj,value,REDIS_HEAD);
ac06fc01
PN
667 /* Always send the pushed value to the client. */
668 addReplyBulk(c,value);
669}
670
8a979f03 671void rpoplpushCommand(redisClient *c) {
e2641e09 672 robj *sobj, *value;
673 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
674 checkType(c,sobj,REDIS_LIST)) return;
675
676 if (listTypeLength(sobj) == 0) {
60ef787e 677 /* This may only happen after loading very old RDB files. Recent
678 * versions of Redis delete keys of empty lists. */
e2641e09 679 addReply(c,shared.nullbulk);
680 } else {
681 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
c1c9d551 682 robj *touchedkey = c->argv[1];
683
e2641e09 684 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
685 value = listTypePop(sobj,REDIS_TAIL);
c1c9d551 686 /* We saved touched key, and protect it, since rpoplpushHandlePush
f444e2af 687 * may change the client command argument vector (it does not
688 * currently). */
c1c9d551 689 incrRefCount(touchedkey);
f444e2af 690 rpoplpushHandlePush(c,c->argv[2],dobj,value);
e2641e09 691
692 /* listTypePop returns an object with its refcount incremented */
693 decrRefCount(value);
694
695 /* Delete the source list when it is empty */
c1c9d551 696 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
697 signalModifiedKey(c->db,touchedkey);
698 decrRefCount(touchedkey);
e2641e09 699 server.dirty++;
700 }
701}
702
703/*-----------------------------------------------------------------------------
704 * Blocking POP operations
705 *----------------------------------------------------------------------------*/
706
/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, according
 *   to the number of elements we have in the ready list.
 */
723
724/* Set a client in blocking mode for the specified key, with the specified
725 * timeout */
ba3b4741 726void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
e2641e09 727 dictEntry *de;
728 list *l;
729 int j;
730
e3c51c4b
DJMM
731 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
732 c->bpop.count = numkeys;
733 c->bpop.timeout = timeout;
734 c->bpop.target = target;
ba3b4741
DJMM
735
736 if (target != NULL) {
ecf94014 737 incrRefCount(target);
ba3b4741
DJMM
738 }
739
e2641e09 740 for (j = 0; j < numkeys; j++) {
741 /* Add the key in the client structure, to map clients -> keys */
e3c51c4b 742 c->bpop.keys[j] = keys[j];
e2641e09 743 incrRefCount(keys[j]);
744
745 /* And in the other "side", to map keys -> clients */
746 de = dictFind(c->db->blocking_keys,keys[j]);
747 if (de == NULL) {
748 int retval;
749
750 /* For every key we take a list of clients blocked for it */
751 l = listCreate();
752 retval = dictAdd(c->db->blocking_keys,keys[j],l);
753 incrRefCount(keys[j]);
eab0e26e 754 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
e2641e09 755 } else {
c0ba9ebe 756 l = dictGetVal(de);
e2641e09 757 }
758 listAddNodeTail(l,c);
759 }
760 /* Mark the client as a blocked client */
761 c->flags |= REDIS_BLOCKED;
5fa95ad7 762 server.bpop_blocked_clients++;
e2641e09 763}
764
765/* Unblock a client that's waiting in a blocking operation such as BLPOP */
766void unblockClientWaitingData(redisClient *c) {
767 dictEntry *de;
768 list *l;
769 int j;
770
eab0e26e 771 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
e2641e09 772 /* The client may wait for multiple keys, so unblock it for every key. */
e3c51c4b 773 for (j = 0; j < c->bpop.count; j++) {
e2641e09 774 /* Remove this client from the list of clients waiting for this key. */
e3c51c4b 775 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
eab0e26e 776 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
c0ba9ebe 777 l = dictGetVal(de);
e2641e09 778 listDelNode(l,listSearchKey(l,c));
779 /* If the list is empty we need to remove it to avoid wasting memory */
780 if (listLength(l) == 0)
e3c51c4b
DJMM
781 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
782 decrRefCount(c->bpop.keys[j]);
e2641e09 783 }
ba3b4741 784
e2641e09 785 /* Cleanup the client structure */
e3c51c4b
DJMM
786 zfree(c->bpop.keys);
787 c->bpop.keys = NULL;
c1c9d551 788 if (c->bpop.target) decrRefCount(c->bpop.target);
e3c51c4b 789 c->bpop.target = NULL;
3bcffcbe
PN
790 c->flags &= ~REDIS_BLOCKED;
791 c->flags |= REDIS_UNBLOCKED;
5fa95ad7 792 server.bpop_blocked_clients--;
a4ce7581 793 listAddNodeTail(server.unblocked_clients,c);
e2641e09 794}
795
f444e2af 796/* If the specified key has clients blocked waiting for list pushes, this
797 * function will put the key reference into the server.ready_keys list.
798 * Note that db->ready_keys is an hash table that allows us to avoid putting
799 * the same key agains and again in the list in case of multiple pushes
800 * made by a script or in the context of MULTI/EXEC.
e2641e09 801 *
f444e2af 802 * The list will be finally processed by handleClientsBlockedOnLists() */
803void signalListAsReady(redisClient *c, robj *key) {
804 readyList *rl;
805
806 /* No clients blocking for this key? No need to queue it. */
807 if (dictFind(c->db->blocking_keys,key) == NULL) return;
808
809 /* Key was already signaled? No need to queue it again. */
810 if (dictFind(c->db->ready_keys,key) != NULL) return;
811
812 /* Ok, we need to queue this key into server.ready_keys. */
813 rl = zmalloc(sizeof(*rl));
814 rl->key = key;
815 rl->db = c->db;
816 incrRefCount(key);
817 listAddNodeTail(server.ready_keys,rl);
818
819 /* We also add the key in the db->ready_keys dictionary in order
820 * to avoid adding it multiple times into a list with a simple O(1)
821 * check. */
822 incrRefCount(key);
823 redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
824}
825
826/* This is an helper function for handleClientsBlockedOnLists(). It's work
827 * is to serve a specific client (receiver) that is blocked on 'key'
828 * in the context of the specified 'db', doing the following:
e2641e09 829 *
f444e2af 830 * 1) Provide the client with the 'value' element.
831 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
832 * 'value' element on the destionation list (the LPUSH side of the command).
833 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
834 * the AOF and replication channel.
835 *
836 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
837 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
838 * we can propagate the command properly.
839 *
840 * The function returns REDIS_OK if we are able to serve the client, otherwise
841 * REDIS_ERR is returned to signal the caller that the list POP operation
842 * should be undoed as the client was not served: This only happens for
843 * BRPOPLPUSH that fails to push the value to the destination key as it is
844 * of the wrong type. */
845int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
846{
847 robj *argv[3];
848
849 if (dstkey == NULL) {
850 /* Propagate the [LR]POP operation. */
851 argv[0] = (where == REDIS_HEAD) ? shared.lpop :
852 shared.rpop;
853 argv[1] = key;
854 propagate((where == REDIS_HEAD) ?
855 server.lpopCommand : server.rpopCommand,
856 db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
857
858 /* BRPOP/BLPOP */
859 addReplyMultiBulkLen(receiver,2);
860 addReplyBulk(receiver,key);
861 addReplyBulk(receiver,value);
862 } else {
863 /* BRPOPLPUSH */
864 robj *dstobj =
865 lookupKeyWrite(receiver->db,dstkey);
866 if (!(dstobj &&
867 checkType(receiver,dstobj,REDIS_LIST)))
868 {
869 /* Propagate the RPOP operation. */
870 argv[0] = shared.rpop;
871 argv[1] = key;
872 propagate(server.rpopCommand,
873 db->id,argv,2,
874 REDIS_PROPAGATE_AOF|
875 REDIS_PROPAGATE_REPL);
876 rpoplpushHandlePush(receiver,dstkey,dstobj,
877 value);
878 /* Propagate the LPUSH operation. */
879 argv[0] = shared.lpush;
880 argv[1] = dstkey;
881 argv[2] = value;
882 propagate(server.lpushCommand,
883 db->id,argv,3,
884 REDIS_PROPAGATE_AOF|
885 REDIS_PROPAGATE_REPL);
8a88c368 886 } else {
f444e2af 887 /* BRPOPLPUSH failed because of wrong
888 * destination type. */
889 return REDIS_ERR;
8a88c368 890 }
b2a7fd0c 891 }
f444e2af 892 return REDIS_OK;
893}
e2641e09 894
f444e2af 895/* This function should be called by Redis every time a single command,
896 * a MULTI/EXEC block, or a Lua script, terminated its execution after
897 * being called by a client.
898 *
899 * All the keys with at least one client blocked that received at least
900 * one new element via some PUSH operation are accumulated into
901 * the server.ready_keys list. This function will run the list and will
902 * serve clients accordingly. Note that the function will iterate again and
903 * again as a result of serving BRPOPLPUSH we can have new blocking clients
904 * to serve because of the PUSH side of BRPOPLPUSH. */
905void handleClientsBlockedOnLists(void) {
906 while(listLength(server.ready_keys) != 0) {
907 list *l;
908
909 /* Point server.ready_keys to a fresh list and save the current one
910 * locally. This way as we run the old list we are free to call
911 * signalListAsReady() that may push new elements in server.ready_keys
912 * when handling clients blocked into BRPOPLPUSH. */
913 l = server.ready_keys;
914 server.ready_keys = listCreate();
915
916 while(listLength(l) != 0) {
917 listNode *ln = listFirst(l);
918 readyList *rl = ln->value;
919
920 /* First of all remove this key from db->ready_keys so that
921 * we can safely call signalListAsReady() against this key. */
922 dictDelete(rl->db->ready_keys,rl->key);
923
924 /* If the key exists and it's a list, serve blocked clients
925 * with data. */
926 robj *o = lookupKeyWrite(rl->db,rl->key);
927 if (o != NULL && o->type == REDIS_LIST) {
928 dictEntry *de;
929
930 /* We serve clients in the same order they blocked for
931 * this key, from the first blocked to the last. */
932 de = dictFind(rl->db->blocking_keys,rl->key);
933 if (de) {
934 list *clients = dictGetVal(de);
935 int numclients = listLength(clients);
936
937 while(numclients--) {
938 listNode *clientnode = listFirst(clients);
939 redisClient *receiver = clientnode->value;
940 robj *dstkey = receiver->bpop.target;
941 int where = (receiver->lastcmd &&
942 receiver->lastcmd->proc == blpopCommand) ?
943 REDIS_HEAD : REDIS_TAIL;
944 robj *value = listTypePop(o,where);
945
946 if (value) {
947 /* Protect receiver->bpop.target, that will be
948 * freed by the next unblockClientWaitingData()
949 * call. */
950 if (dstkey) incrRefCount(dstkey);
951 unblockClientWaitingData(receiver);
952
953 if (serveClientBlockedOnList(receiver,
954 rl->key,dstkey,rl->db,value,
955 where) == REDIS_ERR)
956 {
957 /* If we failed serving the client we need
958 * to also undo the POP operation. */
959 listTypePush(o,value,where);
960 }
961
962 if (dstkey) decrRefCount(dstkey);
963 decrRefCount(value);
964 } else {
965 break;
966 }
967 }
968 }
969
970 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
971 /* We don't call signalModifiedKey() as it was already called
972 * when an element was pushed on the list. */
973 }
974
975 /* Free this item. */
976 decrRefCount(rl->key);
977 zfree(rl);
978 listDelNode(l,ln);
979 }
980 listRelease(l); /* We have the new list on place at this point. */
981 }
e2641e09 982}
983
c8a0070a
PN
984int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
985 long tval;
59bd44d1 986
c8a0070a
PN
987 if (getLongFromObjectOrReply(c,object,&tval,
988 "timeout is not an integer or out of range") != REDIS_OK)
59bd44d1 989 return REDIS_ERR;
59bd44d1 990
c8a0070a
PN
991 if (tval < 0) {
992 addReplyError(c,"timeout is negative");
59bd44d1
DJMM
993 return REDIS_ERR;
994 }
995
56ff70f8 996 if (tval > 0) tval += server.unixtime;
c8a0070a 997 *timeout = tval;
59bd44d1
DJMM
998
999 return REDIS_OK;
e2641e09 1000}
1001
1002/* Blocking RPOP/LPOP */
1003void blockingPopGenericCommand(redisClient *c, int where) {
1004 robj *o;
1005 time_t timeout;
1006 int j;
1007
c8a0070a 1008 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
94364d53 1009 return;
94364d53 1010
e2641e09 1011 for (j = 1; j < c->argc-1; j++) {
1012 o = lookupKeyWrite(c->db,c->argv[j]);
1013 if (o != NULL) {
1014 if (o->type != REDIS_LIST) {
1015 addReply(c,shared.wrongtypeerr);
1016 return;
1017 } else {
1018 if (listTypeLength(o) != 0) {
c1db214e 1019 /* Non empty list, this is like a non normal [LR]POP. */
1020 robj *value = listTypePop(o,where);
1021 redisAssert(value != NULL);
b2a7fd0c 1022
c1db214e 1023 addReplyMultiBulkLen(c,2);
1024 addReplyBulk(c,c->argv[j]);
1025 addReplyBulk(c,value);
1026 decrRefCount(value);
1027 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
1028 signalModifiedKey(c->db,c->argv[j]);
1029 server.dirty++;
1030
1031 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1032 rewriteClientCommandVector(c,2,
1033 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
1034 c->argv[j]);
e2641e09 1035 return;
1036 }
1037 }
1038 }
1039 }
94364d53 1040
fb92ecec 1041 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1042 * we can do is treating it as a timeout (even with timeout 0). */
1043 if (c->flags & REDIS_MULTI) {
1044 addReply(c,shared.nullmultibulk);
1045 return;
1046 }
1047
e2641e09 1048 /* If the list is empty or the key does not exists we must block */
ba3b4741 1049 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
e2641e09 1050}
1051
1052void blpopCommand(redisClient *c) {
1053 blockingPopGenericCommand(c,REDIS_HEAD);
1054}
1055
1056void brpopCommand(redisClient *c) {
1057 blockingPopGenericCommand(c,REDIS_TAIL);
1058}
b2a7fd0c
DJMM
1059
1060void brpoplpushCommand(redisClient *c) {
ba3b4741 1061 time_t timeout;
b2a7fd0c 1062
c8a0070a 1063 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
ba3b4741 1064 return;
ba3b4741
DJMM
1065
1066 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1067
ba3b4741 1068 if (key == NULL) {
ba3b4741 1069 if (c->flags & REDIS_MULTI) {
7c25a43a
DJMM
1070 /* Blocking against an empty list in a multi state
1071 * returns immediately. */
d5870d7a 1072 addReply(c, shared.nullbulk);
ba3b4741 1073 } else {
7c25a43a 1074 /* The list is empty and the client blocks. */
ba3b4741
DJMM
1075 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1076 }
ba3b4741 1077 } else {
7c25a43a
DJMM
1078 if (key->type != REDIS_LIST) {
1079 addReply(c, shared.wrongtypeerr);
1080 } else {
7c25a43a
DJMM
1081 /* The list exists and has elements, so
1082 * the regular rpoplpushCommand is executed. */
eab0e26e 1083 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
7c25a43a
DJMM
1084 rpoplpushCommand(c);
1085 }
ba3b4741 1086 }
b2a7fd0c 1087}