]> git.saurik.com Git - redis.git/blob - src/t_list.c
A reimplementation of blocking operation internals.
[redis.git] / src / t_list.c
#include "redis.h"
#include <limits.h>
2
3 void signalListAsReady(redisClient *c, robj *key);
4
5 /*-----------------------------------------------------------------------------
6 * List API
7 *----------------------------------------------------------------------------*/
8
9 /* Check the argument length to see if it requires us to convert the ziplist
10 * to a real list. Only check raw-encoded objects because integer encoded
11 * objects are never too long. */
12 void listTypeTryConversion(robj *subject, robj *value) {
13 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
14 if (value->encoding == REDIS_ENCODING_RAW &&
15 sdslen(value->ptr) > server.list_max_ziplist_value)
16 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
17 }
18
19 /* The function pushes an elmenet to the specified list object 'subject',
20 * at head or tail position as specified by 'where'.
21 *
22 * There is no need for the caller to incremnet the refcount of 'value' as
23 * the function takes care of it if needed. */
24 void listTypePush(robj *subject, robj *value, int where) {
25 /* Check if we need to convert the ziplist */
26 listTypeTryConversion(subject,value);
27 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
28 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
29 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
30
31 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
32 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
33 value = getDecodedObject(value);
34 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
35 decrRefCount(value);
36 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
37 if (where == REDIS_HEAD) {
38 listAddNodeHead(subject->ptr,value);
39 } else {
40 listAddNodeTail(subject->ptr,value);
41 }
42 incrRefCount(value);
43 } else {
44 redisPanic("Unknown list encoding");
45 }
46 }
47
48 robj *listTypePop(robj *subject, int where) {
49 robj *value = NULL;
50 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
51 unsigned char *p;
52 unsigned char *vstr;
53 unsigned int vlen;
54 long long vlong;
55 int pos = (where == REDIS_HEAD) ? 0 : -1;
56 p = ziplistIndex(subject->ptr,pos);
57 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
58 if (vstr) {
59 value = createStringObject((char*)vstr,vlen);
60 } else {
61 value = createStringObjectFromLongLong(vlong);
62 }
63 /* We only need to delete an element when it exists */
64 subject->ptr = ziplistDelete(subject->ptr,&p);
65 }
66 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
67 list *list = subject->ptr;
68 listNode *ln;
69 if (where == REDIS_HEAD) {
70 ln = listFirst(list);
71 } else {
72 ln = listLast(list);
73 }
74 if (ln != NULL) {
75 value = listNodeValue(ln);
76 incrRefCount(value);
77 listDelNode(list,ln);
78 }
79 } else {
80 redisPanic("Unknown list encoding");
81 }
82 return value;
83 }
84
85 unsigned long listTypeLength(robj *subject) {
86 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
87 return ziplistLen(subject->ptr);
88 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
89 return listLength((list*)subject->ptr);
90 } else {
91 redisPanic("Unknown list encoding");
92 }
93 }
94
95 /* Initialize an iterator at the specified index. */
96 listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
97 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
98 li->subject = subject;
99 li->encoding = subject->encoding;
100 li->direction = direction;
101 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
102 li->zi = ziplistIndex(subject->ptr,index);
103 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
104 li->ln = listIndex(subject->ptr,index);
105 } else {
106 redisPanic("Unknown list encoding");
107 }
108 return li;
109 }
110
111 /* Clean up the iterator. */
112 void listTypeReleaseIterator(listTypeIterator *li) {
113 zfree(li);
114 }
115
116 /* Stores pointer to current the entry in the provided entry structure
117 * and advances the position of the iterator. Returns 1 when the current
118 * entry is in fact an entry, 0 otherwise. */
119 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
120 /* Protect from converting when iterating */
121 redisAssert(li->subject->encoding == li->encoding);
122
123 entry->li = li;
124 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
125 entry->zi = li->zi;
126 if (entry->zi != NULL) {
127 if (li->direction == REDIS_TAIL)
128 li->zi = ziplistNext(li->subject->ptr,li->zi);
129 else
130 li->zi = ziplistPrev(li->subject->ptr,li->zi);
131 return 1;
132 }
133 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
134 entry->ln = li->ln;
135 if (entry->ln != NULL) {
136 if (li->direction == REDIS_TAIL)
137 li->ln = li->ln->next;
138 else
139 li->ln = li->ln->prev;
140 return 1;
141 }
142 } else {
143 redisPanic("Unknown list encoding");
144 }
145 return 0;
146 }
147
148 /* Return entry or NULL at the current position of the iterator. */
149 robj *listTypeGet(listTypeEntry *entry) {
150 listTypeIterator *li = entry->li;
151 robj *value = NULL;
152 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
153 unsigned char *vstr;
154 unsigned int vlen;
155 long long vlong;
156 redisAssert(entry->zi != NULL);
157 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
158 if (vstr) {
159 value = createStringObject((char*)vstr,vlen);
160 } else {
161 value = createStringObjectFromLongLong(vlong);
162 }
163 }
164 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
165 redisAssert(entry->ln != NULL);
166 value = listNodeValue(entry->ln);
167 incrRefCount(value);
168 } else {
169 redisPanic("Unknown list encoding");
170 }
171 return value;
172 }
173
174 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
175 robj *subject = entry->li->subject;
176 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
177 value = getDecodedObject(value);
178 if (where == REDIS_TAIL) {
179 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
180
181 /* When we insert after the current element, but the current element
182 * is the tail of the list, we need to do a push. */
183 if (next == NULL) {
184 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
185 } else {
186 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
187 }
188 } else {
189 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
190 }
191 decrRefCount(value);
192 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
193 if (where == REDIS_TAIL) {
194 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
195 } else {
196 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
197 }
198 incrRefCount(value);
199 } else {
200 redisPanic("Unknown list encoding");
201 }
202 }
203
204 /* Compare the given object with the entry at the current position. */
205 int listTypeEqual(listTypeEntry *entry, robj *o) {
206 listTypeIterator *li = entry->li;
207 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
208 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
209 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
210 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
211 return equalStringObjects(o,listNodeValue(entry->ln));
212 } else {
213 redisPanic("Unknown list encoding");
214 }
215 }
216
217 /* Delete the element pointed to. */
218 void listTypeDelete(listTypeEntry *entry) {
219 listTypeIterator *li = entry->li;
220 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
221 unsigned char *p = entry->zi;
222 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
223
224 /* Update position of the iterator depending on the direction */
225 if (li->direction == REDIS_TAIL)
226 li->zi = p;
227 else
228 li->zi = ziplistPrev(li->subject->ptr,p);
229 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
230 listNode *next;
231 if (li->direction == REDIS_TAIL)
232 next = entry->ln->next;
233 else
234 next = entry->ln->prev;
235 listDelNode(li->subject->ptr,entry->ln);
236 li->ln = next;
237 } else {
238 redisPanic("Unknown list encoding");
239 }
240 }
241
242 void listTypeConvert(robj *subject, int enc) {
243 listTypeIterator *li;
244 listTypeEntry entry;
245 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
246
247 if (enc == REDIS_ENCODING_LINKEDLIST) {
248 list *l = listCreate();
249 listSetFreeMethod(l,decrRefCount);
250
251 /* listTypeGet returns a robj with incremented refcount */
252 li = listTypeInitIterator(subject,0,REDIS_TAIL);
253 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
254 listTypeReleaseIterator(li);
255
256 subject->encoding = REDIS_ENCODING_LINKEDLIST;
257 zfree(subject->ptr);
258 subject->ptr = l;
259 } else {
260 redisPanic("Unsupported list conversion");
261 }
262 }
263
264 /*-----------------------------------------------------------------------------
265 * List Commands
266 *----------------------------------------------------------------------------*/
267
268 void pushGenericCommand(redisClient *c, int where) {
269 int j, waiting = 0, pushed = 0;
270 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
271 int may_have_waiting_clients = (lobj == NULL);
272
273 if (lobj && lobj->type != REDIS_LIST) {
274 addReply(c,shared.wrongtypeerr);
275 return;
276 }
277
278 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
279
280 for (j = 2; j < c->argc; j++) {
281 c->argv[j] = tryObjectEncoding(c->argv[j]);
282 if (!lobj) {
283 lobj = createZiplistObject();
284 dbAdd(c->db,c->argv[1],lobj);
285 }
286 listTypePush(lobj,c->argv[j],where);
287 pushed++;
288 }
289 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
290 if (pushed) signalModifiedKey(c->db,c->argv[1]);
291 server.dirty += pushed;
292 }
293
294 void lpushCommand(redisClient *c) {
295 pushGenericCommand(c,REDIS_HEAD);
296 }
297
298 void rpushCommand(redisClient *c) {
299 pushGenericCommand(c,REDIS_TAIL);
300 }
301
302 void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
303 robj *subject;
304 listTypeIterator *iter;
305 listTypeEntry entry;
306 int inserted = 0;
307
308 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
309 checkType(c,subject,REDIS_LIST)) return;
310
311 if (refval != NULL) {
312 /* Note: we expect refval to be string-encoded because it is *not* the
313 * last argument of the multi-bulk LINSERT. */
314 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
315
316 /* We're not sure if this value can be inserted yet, but we cannot
317 * convert the list inside the iterator. We don't want to loop over
318 * the list twice (once to see if the value can be inserted and once
319 * to do the actual insert), so we assume this value can be inserted
320 * and convert the ziplist to a regular list if necessary. */
321 listTypeTryConversion(subject,val);
322
323 /* Seek refval from head to tail */
324 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
325 while (listTypeNext(iter,&entry)) {
326 if (listTypeEqual(&entry,refval)) {
327 listTypeInsert(&entry,val,where);
328 inserted = 1;
329 break;
330 }
331 }
332 listTypeReleaseIterator(iter);
333
334 if (inserted) {
335 /* Check if the length exceeds the ziplist length threshold. */
336 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
337 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
338 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
339 signalModifiedKey(c->db,c->argv[1]);
340 server.dirty++;
341 } else {
342 /* Notify client of a failed insert */
343 addReply(c,shared.cnegone);
344 return;
345 }
346 } else {
347 listTypePush(subject,val,where);
348 signalModifiedKey(c->db,c->argv[1]);
349 server.dirty++;
350 }
351
352 addReplyLongLong(c,listTypeLength(subject));
353 }
354
355 void lpushxCommand(redisClient *c) {
356 c->argv[2] = tryObjectEncoding(c->argv[2]);
357 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
358 }
359
360 void rpushxCommand(redisClient *c) {
361 c->argv[2] = tryObjectEncoding(c->argv[2]);
362 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
363 }
364
365 void linsertCommand(redisClient *c) {
366 c->argv[4] = tryObjectEncoding(c->argv[4]);
367 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
368 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
369 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
370 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
371 } else {
372 addReply(c,shared.syntaxerr);
373 }
374 }
375
376 void llenCommand(redisClient *c) {
377 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
378 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
379 addReplyLongLong(c,listTypeLength(o));
380 }
381
382 void lindexCommand(redisClient *c) {
383 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
384 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
385 long index;
386 robj *value = NULL;
387
388 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
389 return;
390
391 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
392 unsigned char *p;
393 unsigned char *vstr;
394 unsigned int vlen;
395 long long vlong;
396 p = ziplistIndex(o->ptr,index);
397 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
398 if (vstr) {
399 value = createStringObject((char*)vstr,vlen);
400 } else {
401 value = createStringObjectFromLongLong(vlong);
402 }
403 addReplyBulk(c,value);
404 decrRefCount(value);
405 } else {
406 addReply(c,shared.nullbulk);
407 }
408 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
409 listNode *ln = listIndex(o->ptr,index);
410 if (ln != NULL) {
411 value = listNodeValue(ln);
412 addReplyBulk(c,value);
413 } else {
414 addReply(c,shared.nullbulk);
415 }
416 } else {
417 redisPanic("Unknown list encoding");
418 }
419 }
420
421 void lsetCommand(redisClient *c) {
422 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
423 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
424 long index;
425 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
426
427 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
428 return;
429
430 listTypeTryConversion(o,value);
431 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
432 unsigned char *p, *zl = o->ptr;
433 p = ziplistIndex(zl,index);
434 if (p == NULL) {
435 addReply(c,shared.outofrangeerr);
436 } else {
437 o->ptr = ziplistDelete(o->ptr,&p);
438 value = getDecodedObject(value);
439 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
440 decrRefCount(value);
441 addReply(c,shared.ok);
442 signalModifiedKey(c->db,c->argv[1]);
443 server.dirty++;
444 }
445 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
446 listNode *ln = listIndex(o->ptr,index);
447 if (ln == NULL) {
448 addReply(c,shared.outofrangeerr);
449 } else {
450 decrRefCount((robj*)listNodeValue(ln));
451 listNodeValue(ln) = value;
452 incrRefCount(value);
453 addReply(c,shared.ok);
454 signalModifiedKey(c->db,c->argv[1]);
455 server.dirty++;
456 }
457 } else {
458 redisPanic("Unknown list encoding");
459 }
460 }
461
462 void popGenericCommand(redisClient *c, int where) {
463 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
464 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
465
466 robj *value = listTypePop(o,where);
467 if (value == NULL) {
468 addReply(c,shared.nullbulk);
469 } else {
470 addReplyBulk(c,value);
471 decrRefCount(value);
472 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
473 signalModifiedKey(c->db,c->argv[1]);
474 server.dirty++;
475 }
476 }
477
478 void lpopCommand(redisClient *c) {
479 popGenericCommand(c,REDIS_HEAD);
480 }
481
482 void rpopCommand(redisClient *c) {
483 popGenericCommand(c,REDIS_TAIL);
484 }
485
486 void lrangeCommand(redisClient *c) {
487 robj *o;
488 long start, end, llen, rangelen;
489
490 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
491 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
492
493 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
494 || checkType(c,o,REDIS_LIST)) return;
495 llen = listTypeLength(o);
496
497 /* convert negative indexes */
498 if (start < 0) start = llen+start;
499 if (end < 0) end = llen+end;
500 if (start < 0) start = 0;
501
502 /* Invariant: start >= 0, so this test will be true when end < 0.
503 * The range is empty when start > end or start >= length. */
504 if (start > end || start >= llen) {
505 addReply(c,shared.emptymultibulk);
506 return;
507 }
508 if (end >= llen) end = llen-1;
509 rangelen = (end-start)+1;
510
511 /* Return the result in form of a multi-bulk reply */
512 addReplyMultiBulkLen(c,rangelen);
513 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
514 unsigned char *p = ziplistIndex(o->ptr,start);
515 unsigned char *vstr;
516 unsigned int vlen;
517 long long vlong;
518
519 while(rangelen--) {
520 ziplistGet(p,&vstr,&vlen,&vlong);
521 if (vstr) {
522 addReplyBulkCBuffer(c,vstr,vlen);
523 } else {
524 addReplyBulkLongLong(c,vlong);
525 }
526 p = ziplistNext(o->ptr,p);
527 }
528 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
529 listNode *ln;
530
531 /* If we are nearest to the end of the list, reach the element
532 * starting from tail and going backward, as it is faster. */
533 if (start > llen/2) start -= llen;
534 ln = listIndex(o->ptr,start);
535
536 while(rangelen--) {
537 addReplyBulk(c,ln->value);
538 ln = ln->next;
539 }
540 } else {
541 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
542 }
543 }
544
545 void ltrimCommand(redisClient *c) {
546 robj *o;
547 long start, end, llen, j, ltrim, rtrim;
548 list *list;
549 listNode *ln;
550
551 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
552 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
553
554 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
555 checkType(c,o,REDIS_LIST)) return;
556 llen = listTypeLength(o);
557
558 /* convert negative indexes */
559 if (start < 0) start = llen+start;
560 if (end < 0) end = llen+end;
561 if (start < 0) start = 0;
562
563 /* Invariant: start >= 0, so this test will be true when end < 0.
564 * The range is empty when start > end or start >= length. */
565 if (start > end || start >= llen) {
566 /* Out of range start or start > end result in empty list */
567 ltrim = llen;
568 rtrim = 0;
569 } else {
570 if (end >= llen) end = llen-1;
571 ltrim = start;
572 rtrim = llen-end-1;
573 }
574
575 /* Remove list elements to perform the trim */
576 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
577 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
578 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
579 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
580 list = o->ptr;
581 for (j = 0; j < ltrim; j++) {
582 ln = listFirst(list);
583 listDelNode(list,ln);
584 }
585 for (j = 0; j < rtrim; j++) {
586 ln = listLast(list);
587 listDelNode(list,ln);
588 }
589 } else {
590 redisPanic("Unknown list encoding");
591 }
592 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
593 signalModifiedKey(c->db,c->argv[1]);
594 server.dirty++;
595 addReply(c,shared.ok);
596 }
597
598 void lremCommand(redisClient *c) {
599 robj *subject, *obj;
600 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
601 long toremove;
602 long removed = 0;
603 listTypeEntry entry;
604
605 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
606 return;
607
608 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
609 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
610
611 /* Make sure obj is raw when we're dealing with a ziplist */
612 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
613 obj = getDecodedObject(obj);
614
615 listTypeIterator *li;
616 if (toremove < 0) {
617 toremove = -toremove;
618 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
619 } else {
620 li = listTypeInitIterator(subject,0,REDIS_TAIL);
621 }
622
623 while (listTypeNext(li,&entry)) {
624 if (listTypeEqual(&entry,obj)) {
625 listTypeDelete(&entry);
626 server.dirty++;
627 removed++;
628 if (toremove && removed == toremove) break;
629 }
630 }
631 listTypeReleaseIterator(li);
632
633 /* Clean up raw encoded object */
634 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
635 decrRefCount(obj);
636
637 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
638 addReplyLongLong(c,removed);
639 if (removed) signalModifiedKey(c->db,c->argv[1]);
640 }
641
642 /* This is the semantic of this command:
643 * RPOPLPUSH srclist dstlist:
644 * IF LLEN(srclist) > 0
645 * element = RPOP srclist
646 * LPUSH dstlist element
647 * RETURN element
648 * ELSE
649 * RETURN nil
650 * END
651 * END
652 *
653 * The idea is to be able to get an element from a list in a reliable way
654 * since the element is not just returned but pushed against another list
655 * as well. This command was originally proposed by Ezra Zygmuntowicz.
656 */
657
658 void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
659 /* Create the list if the key does not exist */
660 if (!dstobj) {
661 dstobj = createZiplistObject();
662 dbAdd(c->db,dstkey,dstobj);
663 signalListAsReady(c,dstkey);
664 }
665 signalModifiedKey(c->db,dstkey);
666 listTypePush(dstobj,value,REDIS_HEAD);
667 /* Always send the pushed value to the client. */
668 addReplyBulk(c,value);
669 }
670
671 void rpoplpushCommand(redisClient *c) {
672 robj *sobj, *value;
673 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
674 checkType(c,sobj,REDIS_LIST)) return;
675
676 if (listTypeLength(sobj) == 0) {
677 /* This may only happen after loading very old RDB files. Recent
678 * versions of Redis delete keys of empty lists. */
679 addReply(c,shared.nullbulk);
680 } else {
681 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
682 robj *touchedkey = c->argv[1];
683
684 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
685 value = listTypePop(sobj,REDIS_TAIL);
686 /* We saved touched key, and protect it, since rpoplpushHandlePush
687 * may change the client command argument vector (it does not
688 * currently). */
689 incrRefCount(touchedkey);
690 rpoplpushHandlePush(c,c->argv[2],dobj,value);
691
692 /* listTypePop returns an object with its refcount incremented */
693 decrRefCount(value);
694
695 /* Delete the source list when it is empty */
696 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
697 signalModifiedKey(c->db,touchedkey);
698 decrRefCount(touchedkey);
699 server.dirty++;
700 }
701 }
702
703 /*-----------------------------------------------------------------------------
704 * Blocking POP operations
705 *----------------------------------------------------------------------------*/
706
707 /* This is how the current blocking POP works, we use BLPOP as example:
708 * - If the user calls BLPOP and the key exists and contains a non empty list
709 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
710 * if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 * empty we need to block. In order to do so we remove the notification for
 * new data to read in the client socket (so that we'll not serve new
 * requests if the blocking request is not served). Also we put the client
 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
 * blocking for these keys.
717 * - If a PUSH operation against a key with blocked clients waiting is
718 * performed, we mark this key as "ready", and after the current command,
719 * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
720 * for this list, from the one that blocked first, to the last, accordingly
721 * to the number of elements we have in the ready list.
722 */
723
724 /* Set a client in blocking mode for the specified key, with the specified
725 * timeout */
726 void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
727 dictEntry *de;
728 list *l;
729 int j;
730
731 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
732 c->bpop.count = numkeys;
733 c->bpop.timeout = timeout;
734 c->bpop.target = target;
735
736 if (target != NULL) {
737 incrRefCount(target);
738 }
739
740 for (j = 0; j < numkeys; j++) {
741 /* Add the key in the client structure, to map clients -> keys */
742 c->bpop.keys[j] = keys[j];
743 incrRefCount(keys[j]);
744
745 /* And in the other "side", to map keys -> clients */
746 de = dictFind(c->db->blocking_keys,keys[j]);
747 if (de == NULL) {
748 int retval;
749
750 /* For every key we take a list of clients blocked for it */
751 l = listCreate();
752 retval = dictAdd(c->db->blocking_keys,keys[j],l);
753 incrRefCount(keys[j]);
754 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
755 } else {
756 l = dictGetVal(de);
757 }
758 listAddNodeTail(l,c);
759 }
760 /* Mark the client as a blocked client */
761 c->flags |= REDIS_BLOCKED;
762 server.bpop_blocked_clients++;
763 }
764
765 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
766 void unblockClientWaitingData(redisClient *c) {
767 dictEntry *de;
768 list *l;
769 int j;
770
771 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
772 /* The client may wait for multiple keys, so unblock it for every key. */
773 for (j = 0; j < c->bpop.count; j++) {
774 /* Remove this client from the list of clients waiting for this key. */
775 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
776 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
777 l = dictGetVal(de);
778 listDelNode(l,listSearchKey(l,c));
779 /* If the list is empty we need to remove it to avoid wasting memory */
780 if (listLength(l) == 0)
781 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
782 decrRefCount(c->bpop.keys[j]);
783 }
784
785 /* Cleanup the client structure */
786 zfree(c->bpop.keys);
787 c->bpop.keys = NULL;
788 if (c->bpop.target) decrRefCount(c->bpop.target);
789 c->bpop.target = NULL;
790 c->flags &= ~REDIS_BLOCKED;
791 c->flags |= REDIS_UNBLOCKED;
792 server.bpop_blocked_clients--;
793 listAddNodeTail(server.unblocked_clients,c);
794 }
795
796 /* If the specified key has clients blocked waiting for list pushes, this
797 * function will put the key reference into the server.ready_keys list.
798 * Note that db->ready_keys is an hash table that allows us to avoid putting
799 * the same key agains and again in the list in case of multiple pushes
800 * made by a script or in the context of MULTI/EXEC.
801 *
802 * The list will be finally processed by handleClientsBlockedOnLists() */
803 void signalListAsReady(redisClient *c, robj *key) {
804 readyList *rl;
805
806 /* No clients blocking for this key? No need to queue it. */
807 if (dictFind(c->db->blocking_keys,key) == NULL) return;
808
809 /* Key was already signaled? No need to queue it again. */
810 if (dictFind(c->db->ready_keys,key) != NULL) return;
811
812 /* Ok, we need to queue this key into server.ready_keys. */
813 rl = zmalloc(sizeof(*rl));
814 rl->key = key;
815 rl->db = c->db;
816 incrRefCount(key);
817 listAddNodeTail(server.ready_keys,rl);
818
819 /* We also add the key in the db->ready_keys dictionary in order
820 * to avoid adding it multiple times into a list with a simple O(1)
821 * check. */
822 incrRefCount(key);
823 redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
824 }
825
826 /* This is an helper function for handleClientsBlockedOnLists(). It's work
827 * is to serve a specific client (receiver) that is blocked on 'key'
828 * in the context of the specified 'db', doing the following:
829 *
830 * 1) Provide the client with the 'value' element.
831 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
832 * 'value' element on the destionation list (the LPUSH side of the command).
833 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
834 * the AOF and replication channel.
835 *
836 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
837 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
838 * we can propagate the command properly.
839 *
840 * The function returns REDIS_OK if we are able to serve the client, otherwise
841 * REDIS_ERR is returned to signal the caller that the list POP operation
842 * should be undoed as the client was not served: This only happens for
843 * BRPOPLPUSH that fails to push the value to the destination key as it is
844 * of the wrong type. */
845 int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
846 {
847 robj *argv[3];
848
849 if (dstkey == NULL) {
850 /* Propagate the [LR]POP operation. */
851 argv[0] = (where == REDIS_HEAD) ? shared.lpop :
852 shared.rpop;
853 argv[1] = key;
854 propagate((where == REDIS_HEAD) ?
855 server.lpopCommand : server.rpopCommand,
856 db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
857
858 /* BRPOP/BLPOP */
859 addReplyMultiBulkLen(receiver,2);
860 addReplyBulk(receiver,key);
861 addReplyBulk(receiver,value);
862 } else {
863 /* BRPOPLPUSH */
864 robj *dstobj =
865 lookupKeyWrite(receiver->db,dstkey);
866 if (!(dstobj &&
867 checkType(receiver,dstobj,REDIS_LIST)))
868 {
869 /* Propagate the RPOP operation. */
870 argv[0] = shared.rpop;
871 argv[1] = key;
872 propagate(server.rpopCommand,
873 db->id,argv,2,
874 REDIS_PROPAGATE_AOF|
875 REDIS_PROPAGATE_REPL);
876 rpoplpushHandlePush(receiver,dstkey,dstobj,
877 value);
878 /* Propagate the LPUSH operation. */
879 argv[0] = shared.lpush;
880 argv[1] = dstkey;
881 argv[2] = value;
882 propagate(server.lpushCommand,
883 db->id,argv,3,
884 REDIS_PROPAGATE_AOF|
885 REDIS_PROPAGATE_REPL);
886 } else {
887 /* BRPOPLPUSH failed because of wrong
888 * destination type. */
889 return REDIS_ERR;
890 }
891 }
892 return REDIS_OK;
893 }
894
895 /* This function should be called by Redis every time a single command,
896 * a MULTI/EXEC block, or a Lua script, terminated its execution after
897 * being called by a client.
898 *
899 * All the keys with at least one client blocked that received at least
900 * one new element via some PUSH operation are accumulated into
901 * the server.ready_keys list. This function will run the list and will
902 * serve clients accordingly. Note that the function will iterate again and
903 * again as a result of serving BRPOPLPUSH we can have new blocking clients
904 * to serve because of the PUSH side of BRPOPLPUSH. */
905 void handleClientsBlockedOnLists(void) {
906 while(listLength(server.ready_keys) != 0) {
907 list *l;
908
909 /* Point server.ready_keys to a fresh list and save the current one
910 * locally. This way as we run the old list we are free to call
911 * signalListAsReady() that may push new elements in server.ready_keys
912 * when handling clients blocked into BRPOPLPUSH. */
913 l = server.ready_keys;
914 server.ready_keys = listCreate();
915
916 while(listLength(l) != 0) {
917 listNode *ln = listFirst(l);
918 readyList *rl = ln->value;
919
920 /* First of all remove this key from db->ready_keys so that
921 * we can safely call signalListAsReady() against this key. */
922 dictDelete(rl->db->ready_keys,rl->key);
923
924 /* If the key exists and it's a list, serve blocked clients
925 * with data. */
926 robj *o = lookupKeyWrite(rl->db,rl->key);
927 if (o != NULL && o->type == REDIS_LIST) {
928 dictEntry *de;
929
930 /* We serve clients in the same order they blocked for
931 * this key, from the first blocked to the last. */
932 de = dictFind(rl->db->blocking_keys,rl->key);
933 if (de) {
934 list *clients = dictGetVal(de);
935 int numclients = listLength(clients);
936
937 while(numclients--) {
938 listNode *clientnode = listFirst(clients);
939 redisClient *receiver = clientnode->value;
940 robj *dstkey = receiver->bpop.target;
941 int where = (receiver->lastcmd &&
942 receiver->lastcmd->proc == blpopCommand) ?
943 REDIS_HEAD : REDIS_TAIL;
944 robj *value = listTypePop(o,where);
945
946 if (value) {
947 /* Protect receiver->bpop.target, that will be
948 * freed by the next unblockClientWaitingData()
949 * call. */
950 if (dstkey) incrRefCount(dstkey);
951 unblockClientWaitingData(receiver);
952
953 if (serveClientBlockedOnList(receiver,
954 rl->key,dstkey,rl->db,value,
955 where) == REDIS_ERR)
956 {
957 /* If we failed serving the client we need
958 * to also undo the POP operation. */
959 listTypePush(o,value,where);
960 }
961
962 if (dstkey) decrRefCount(dstkey);
963 decrRefCount(value);
964 } else {
965 break;
966 }
967 }
968 }
969
970 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
971 /* We don't call signalModifiedKey() as it was already called
972 * when an element was pushed on the list. */
973 }
974
975 /* Free this item. */
976 decrRefCount(rl->key);
977 zfree(rl);
978 listDelNode(l,ln);
979 }
980 listRelease(l); /* We have the new list on place at this point. */
981 }
982 }
983
984 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
985 long tval;
986
987 if (getLongFromObjectOrReply(c,object,&tval,
988 "timeout is not an integer or out of range") != REDIS_OK)
989 return REDIS_ERR;
990
991 if (tval < 0) {
992 addReplyError(c,"timeout is negative");
993 return REDIS_ERR;
994 }
995
996 if (tval > 0) tval += server.unixtime;
997 *timeout = tval;
998
999 return REDIS_OK;
1000 }
1001
1002 /* Blocking RPOP/LPOP */
1003 void blockingPopGenericCommand(redisClient *c, int where) {
1004 robj *o;
1005 time_t timeout;
1006 int j;
1007
1008 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
1009 return;
1010
1011 for (j = 1; j < c->argc-1; j++) {
1012 o = lookupKeyWrite(c->db,c->argv[j]);
1013 if (o != NULL) {
1014 if (o->type != REDIS_LIST) {
1015 addReply(c,shared.wrongtypeerr);
1016 return;
1017 } else {
1018 if (listTypeLength(o) != 0) {
1019 /* Non empty list, this is like a non normal [LR]POP. */
1020 robj *value = listTypePop(o,where);
1021 redisAssert(value != NULL);
1022
1023 addReplyMultiBulkLen(c,2);
1024 addReplyBulk(c,c->argv[j]);
1025 addReplyBulk(c,value);
1026 decrRefCount(value);
1027 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
1028 signalModifiedKey(c->db,c->argv[j]);
1029 server.dirty++;
1030
1031 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1032 rewriteClientCommandVector(c,2,
1033 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
1034 c->argv[j]);
1035 return;
1036 }
1037 }
1038 }
1039 }
1040
1041 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1042 * we can do is treating it as a timeout (even with timeout 0). */
1043 if (c->flags & REDIS_MULTI) {
1044 addReply(c,shared.nullmultibulk);
1045 return;
1046 }
1047
1048 /* If the list is empty or the key does not exists we must block */
1049 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
1050 }
1051
1052 void blpopCommand(redisClient *c) {
1053 blockingPopGenericCommand(c,REDIS_HEAD);
1054 }
1055
1056 void brpopCommand(redisClient *c) {
1057 blockingPopGenericCommand(c,REDIS_TAIL);
1058 }
1059
1060 void brpoplpushCommand(redisClient *c) {
1061 time_t timeout;
1062
1063 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
1064 return;
1065
1066 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1067
1068 if (key == NULL) {
1069 if (c->flags & REDIS_MULTI) {
1070 /* Blocking against an empty list in a multi state
1071 * returns immediately. */
1072 addReply(c, shared.nullbulk);
1073 } else {
1074 /* The list is empty and the client blocks. */
1075 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1076 }
1077 } else {
1078 if (key->type != REDIS_LIST) {
1079 addReply(c, shared.wrongtypeerr);
1080 } else {
1081 /* The list exists and has elements, so
1082 * the regular rpoplpushCommand is executed. */
1083 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
1084 rpoplpushCommand(c);
1085 }
1086 }
1087 }