]> git.saurik.com Git - redis.git/blob - src/t_list.c
Merge remote-tracking branch 'origin/unstable' into unstable
[redis.git] / src / t_list.c
#include "redis.h"

#include <limits.h>
2
3 /*-----------------------------------------------------------------------------
4 * List API
5 *----------------------------------------------------------------------------*/
6
7 /* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10 void listTypeTryConversion(robj *subject, robj *value) {
11 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 if (value->encoding == REDIS_ENCODING_RAW &&
13 sdslen(value->ptr) > server.list_max_ziplist_value)
14 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15 }
16
17 void listTypePush(robj *subject, robj *value, int where) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject,value);
20 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23
24 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 value = getDecodedObject(value);
27 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 decrRefCount(value);
29 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 if (where == REDIS_HEAD) {
31 listAddNodeHead(subject->ptr,value);
32 } else {
33 listAddNodeTail(subject->ptr,value);
34 }
35 incrRefCount(value);
36 } else {
37 redisPanic("Unknown list encoding");
38 }
39 }
40
41 robj *listTypePop(robj *subject, int where) {
42 robj *value = NULL;
43 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 unsigned char *p;
45 unsigned char *vstr;
46 unsigned int vlen;
47 long long vlong;
48 int pos = (where == REDIS_HEAD) ? 0 : -1;
49 p = ziplistIndex(subject->ptr,pos);
50 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 if (vstr) {
52 value = createStringObject((char*)vstr,vlen);
53 } else {
54 value = createStringObjectFromLongLong(vlong);
55 }
56 /* We only need to delete an element when it exists */
57 subject->ptr = ziplistDelete(subject->ptr,&p);
58 }
59 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 list *list = subject->ptr;
61 listNode *ln;
62 if (where == REDIS_HEAD) {
63 ln = listFirst(list);
64 } else {
65 ln = listLast(list);
66 }
67 if (ln != NULL) {
68 value = listNodeValue(ln);
69 incrRefCount(value);
70 listDelNode(list,ln);
71 }
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 return value;
76 }
77
78 unsigned long listTypeLength(robj *subject) {
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 return ziplistLen(subject->ptr);
81 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 return listLength((list*)subject->ptr);
83 } else {
84 redisPanic("Unknown list encoding");
85 }
86 }
87
88 /* Initialize an iterator at the specified index. */
89 listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
90 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 li->subject = subject;
92 li->encoding = subject->encoding;
93 li->direction = direction;
94 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 li->zi = ziplistIndex(subject->ptr,index);
96 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 li->ln = listIndex(subject->ptr,index);
98 } else {
99 redisPanic("Unknown list encoding");
100 }
101 return li;
102 }
103
104 /* Clean up the iterator. */
105 void listTypeReleaseIterator(listTypeIterator *li) {
106 zfree(li);
107 }
108
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 redisAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 entry->zi = li->zi;
119 if (entry->zi != NULL) {
120 if (li->direction == REDIS_TAIL)
121 li->zi = ziplistNext(li->subject->ptr,li->zi);
122 else
123 li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 return 1;
125 }
126 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 entry->ln = li->ln;
128 if (entry->ln != NULL) {
129 if (li->direction == REDIS_TAIL)
130 li->ln = li->ln->next;
131 else
132 li->ln = li->ln->prev;
133 return 1;
134 }
135 } else {
136 redisPanic("Unknown list encoding");
137 }
138 return 0;
139 }
140
141 /* Return entry or NULL at the current position of the iterator. */
142 robj *listTypeGet(listTypeEntry *entry) {
143 listTypeIterator *li = entry->li;
144 robj *value = NULL;
145 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 unsigned char *vstr;
147 unsigned int vlen;
148 long long vlong;
149 redisAssert(entry->zi != NULL);
150 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 if (vstr) {
152 value = createStringObject((char*)vstr,vlen);
153 } else {
154 value = createStringObjectFromLongLong(vlong);
155 }
156 }
157 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 redisAssert(entry->ln != NULL);
159 value = listNodeValue(entry->ln);
160 incrRefCount(value);
161 } else {
162 redisPanic("Unknown list encoding");
163 }
164 return value;
165 }
166
167 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 robj *subject = entry->li->subject;
169 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 value = getDecodedObject(value);
171 if (where == REDIS_TAIL) {
172 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
176 if (next == NULL) {
177 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 } else {
179 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 }
181 } else {
182 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 }
184 decrRefCount(value);
185 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 if (where == REDIS_TAIL) {
187 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 } else {
189 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 }
191 incrRefCount(value);
192 } else {
193 redisPanic("Unknown list encoding");
194 }
195 }
196
197 /* Compare the given object with the entry at the current position. */
198 int listTypeEqual(listTypeEntry *entry, robj *o) {
199 listTypeIterator *li = entry->li;
200 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
201 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
202 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 return equalStringObjects(o,listNodeValue(entry->ln));
205 } else {
206 redisPanic("Unknown list encoding");
207 }
208 }
209
210 /* Delete the element pointed to. */
211 void listTypeDelete(listTypeEntry *entry) {
212 listTypeIterator *li = entry->li;
213 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 unsigned char *p = entry->zi;
215 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216
217 /* Update position of the iterator depending on the direction */
218 if (li->direction == REDIS_TAIL)
219 li->zi = p;
220 else
221 li->zi = ziplistPrev(li->subject->ptr,p);
222 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 listNode *next;
224 if (li->direction == REDIS_TAIL)
225 next = entry->ln->next;
226 else
227 next = entry->ln->prev;
228 listDelNode(li->subject->ptr,entry->ln);
229 li->ln = next;
230 } else {
231 redisPanic("Unknown list encoding");
232 }
233 }
234
235 void listTypeConvert(robj *subject, int enc) {
236 listTypeIterator *li;
237 listTypeEntry entry;
238 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
239
240 if (enc == REDIS_ENCODING_LINKEDLIST) {
241 list *l = listCreate();
242 listSetFreeMethod(l,decrRefCount);
243
244 /* listTypeGet returns a robj with incremented refcount */
245 li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 listTypeReleaseIterator(li);
248
249 subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 zfree(subject->ptr);
251 subject->ptr = l;
252 } else {
253 redisPanic("Unsupported list conversion");
254 }
255 }
256
257 /*-----------------------------------------------------------------------------
258 * List Commands
259 *----------------------------------------------------------------------------*/
260
261 void pushGenericCommand(redisClient *c, int where) {
262 int j, addlen = 0, pushed = 0;
263 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
264 int may_have_waiting_clients = (lobj == NULL);
265
266 if (lobj && lobj->type != REDIS_LIST) {
267 addReply(c,shared.wrongtypeerr);
268 return;
269 }
270
271 for (j = 2; j < c->argc; j++) {
272 c->argv[j] = tryObjectEncoding(c->argv[j]);
273 if (may_have_waiting_clients) {
274 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
275 addlen++;
276 continue;
277 } else {
278 may_have_waiting_clients = 0;
279 }
280 }
281 if (!lobj) {
282 lobj = createZiplistObject();
283 dbAdd(c->db,c->argv[1],lobj);
284 }
285 listTypePush(lobj,c->argv[j],where);
286 pushed++;
287 }
288 addReplyLongLong(c,addlen + (lobj ? listTypeLength(lobj) : 0));
289 if (pushed) signalModifiedKey(c->db,c->argv[1]);
290 server.dirty += pushed;
291 }
292
293 void lpushCommand(redisClient *c) {
294 pushGenericCommand(c,REDIS_HEAD);
295 }
296
297 void rpushCommand(redisClient *c) {
298 pushGenericCommand(c,REDIS_TAIL);
299 }
300
301 void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
302 robj *subject;
303 listTypeIterator *iter;
304 listTypeEntry entry;
305 int inserted = 0;
306
307 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
308 checkType(c,subject,REDIS_LIST)) return;
309
310 if (refval != NULL) {
311 /* Note: we expect refval to be string-encoded because it is *not* the
312 * last argument of the multi-bulk LINSERT. */
313 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
314
315 /* We're not sure if this value can be inserted yet, but we cannot
316 * convert the list inside the iterator. We don't want to loop over
317 * the list twice (once to see if the value can be inserted and once
318 * to do the actual insert), so we assume this value can be inserted
319 * and convert the ziplist to a regular list if necessary. */
320 listTypeTryConversion(subject,val);
321
322 /* Seek refval from head to tail */
323 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
324 while (listTypeNext(iter,&entry)) {
325 if (listTypeEqual(&entry,refval)) {
326 listTypeInsert(&entry,val,where);
327 inserted = 1;
328 break;
329 }
330 }
331 listTypeReleaseIterator(iter);
332
333 if (inserted) {
334 /* Check if the length exceeds the ziplist length threshold. */
335 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
336 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
337 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
338 signalModifiedKey(c->db,c->argv[1]);
339 server.dirty++;
340 } else {
341 /* Notify client of a failed insert */
342 addReply(c,shared.cnegone);
343 return;
344 }
345 } else {
346 listTypePush(subject,val,where);
347 signalModifiedKey(c->db,c->argv[1]);
348 server.dirty++;
349 }
350
351 addReplyLongLong(c,listTypeLength(subject));
352 }
353
354 void lpushxCommand(redisClient *c) {
355 c->argv[2] = tryObjectEncoding(c->argv[2]);
356 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
357 }
358
359 void rpushxCommand(redisClient *c) {
360 c->argv[2] = tryObjectEncoding(c->argv[2]);
361 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
362 }
363
364 void linsertCommand(redisClient *c) {
365 c->argv[4] = tryObjectEncoding(c->argv[4]);
366 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
367 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
368 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
369 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
370 } else {
371 addReply(c,shared.syntaxerr);
372 }
373 }
374
375 void llenCommand(redisClient *c) {
376 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
377 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
378 addReplyLongLong(c,listTypeLength(o));
379 }
380
381 void lindexCommand(redisClient *c) {
382 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
383 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
384 long index;
385 robj *value = NULL;
386
387 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
388 return;
389
390 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
391 unsigned char *p;
392 unsigned char *vstr;
393 unsigned int vlen;
394 long long vlong;
395 p = ziplistIndex(o->ptr,index);
396 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
397 if (vstr) {
398 value = createStringObject((char*)vstr,vlen);
399 } else {
400 value = createStringObjectFromLongLong(vlong);
401 }
402 addReplyBulk(c,value);
403 decrRefCount(value);
404 } else {
405 addReply(c,shared.nullbulk);
406 }
407 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
408 listNode *ln = listIndex(o->ptr,index);
409 if (ln != NULL) {
410 value = listNodeValue(ln);
411 addReplyBulk(c,value);
412 } else {
413 addReply(c,shared.nullbulk);
414 }
415 } else {
416 redisPanic("Unknown list encoding");
417 }
418 }
419
420 void lsetCommand(redisClient *c) {
421 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
422 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
423 long index;
424 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
425
426 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
427 return;
428
429 listTypeTryConversion(o,value);
430 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
431 unsigned char *p, *zl = o->ptr;
432 p = ziplistIndex(zl,index);
433 if (p == NULL) {
434 addReply(c,shared.outofrangeerr);
435 } else {
436 o->ptr = ziplistDelete(o->ptr,&p);
437 value = getDecodedObject(value);
438 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
439 decrRefCount(value);
440 addReply(c,shared.ok);
441 signalModifiedKey(c->db,c->argv[1]);
442 server.dirty++;
443 }
444 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
445 listNode *ln = listIndex(o->ptr,index);
446 if (ln == NULL) {
447 addReply(c,shared.outofrangeerr);
448 } else {
449 decrRefCount((robj*)listNodeValue(ln));
450 listNodeValue(ln) = value;
451 incrRefCount(value);
452 addReply(c,shared.ok);
453 signalModifiedKey(c->db,c->argv[1]);
454 server.dirty++;
455 }
456 } else {
457 redisPanic("Unknown list encoding");
458 }
459 }
460
461 void popGenericCommand(redisClient *c, int where) {
462 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
463 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
464
465 robj *value = listTypePop(o,where);
466 if (value == NULL) {
467 addReply(c,shared.nullbulk);
468 } else {
469 addReplyBulk(c,value);
470 decrRefCount(value);
471 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
472 signalModifiedKey(c->db,c->argv[1]);
473 server.dirty++;
474 }
475 }
476
477 void lpopCommand(redisClient *c) {
478 popGenericCommand(c,REDIS_HEAD);
479 }
480
481 void rpopCommand(redisClient *c) {
482 popGenericCommand(c,REDIS_TAIL);
483 }
484
485 void lrangeCommand(redisClient *c) {
486 robj *o;
487 long start;
488 long end;
489 int llen;
490 int rangelen;
491
492 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
493 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
494
495 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
496 || checkType(c,o,REDIS_LIST)) return;
497 llen = listTypeLength(o);
498
499 /* convert negative indexes */
500 if (start < 0) start = llen+start;
501 if (end < 0) end = llen+end;
502 if (start < 0) start = 0;
503
504 /* Invariant: start >= 0, so this test will be true when end < 0.
505 * The range is empty when start > end or start >= length. */
506 if (start > end || start >= llen) {
507 addReply(c,shared.emptymultibulk);
508 return;
509 }
510 if (end >= llen) end = llen-1;
511 rangelen = (end-start)+1;
512
513 /* Return the result in form of a multi-bulk reply */
514 addReplyMultiBulkLen(c,rangelen);
515 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
516 unsigned char *p = ziplistIndex(o->ptr,start);
517 unsigned char *vstr;
518 unsigned int vlen;
519 long long vlong;
520
521 while(rangelen--) {
522 ziplistGet(p,&vstr,&vlen,&vlong);
523 if (vstr) {
524 addReplyBulkCBuffer(c,vstr,vlen);
525 } else {
526 addReplyBulkLongLong(c,vlong);
527 }
528 p = ziplistNext(o->ptr,p);
529 }
530 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
531 listNode *ln;
532
533 /* If we are nearest to the end of the list, reach the element
534 * starting from tail and going backward, as it is faster. */
535 if (start > llen/2) start -= llen;
536 ln = listIndex(o->ptr,start);
537
538 while(rangelen--) {
539 addReplyBulk(c,ln->value);
540 ln = ln->next;
541 }
542 } else {
543 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
544 }
545 }
546
547 void ltrimCommand(redisClient *c) {
548 robj *o;
549 long start;
550 long end;
551 int llen;
552 int j, ltrim, rtrim;
553 list *list;
554 listNode *ln;
555
556 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
557 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
558
559 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
560 checkType(c,o,REDIS_LIST)) return;
561 llen = listTypeLength(o);
562
563 /* convert negative indexes */
564 if (start < 0) start = llen+start;
565 if (end < 0) end = llen+end;
566 if (start < 0) start = 0;
567
568 /* Invariant: start >= 0, so this test will be true when end < 0.
569 * The range is empty when start > end or start >= length. */
570 if (start > end || start >= llen) {
571 /* Out of range start or start > end result in empty list */
572 ltrim = llen;
573 rtrim = 0;
574 } else {
575 if (end >= llen) end = llen-1;
576 ltrim = start;
577 rtrim = llen-end-1;
578 }
579
580 /* Remove list elements to perform the trim */
581 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
582 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
583 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
584 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
585 list = o->ptr;
586 for (j = 0; j < ltrim; j++) {
587 ln = listFirst(list);
588 listDelNode(list,ln);
589 }
590 for (j = 0; j < rtrim; j++) {
591 ln = listLast(list);
592 listDelNode(list,ln);
593 }
594 } else {
595 redisPanic("Unknown list encoding");
596 }
597 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
598 signalModifiedKey(c->db,c->argv[1]);
599 server.dirty++;
600 addReply(c,shared.ok);
601 }
602
603 void lremCommand(redisClient *c) {
604 robj *subject, *obj;
605 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
606 long toremove;
607 int removed = 0;
608 listTypeEntry entry;
609
610 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
611 return;
612
613 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
614 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
615
616 /* Make sure obj is raw when we're dealing with a ziplist */
617 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
618 obj = getDecodedObject(obj);
619
620 listTypeIterator *li;
621 if (toremove < 0) {
622 toremove = -toremove;
623 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
624 } else {
625 li = listTypeInitIterator(subject,0,REDIS_TAIL);
626 }
627
628 while (listTypeNext(li,&entry)) {
629 if (listTypeEqual(&entry,obj)) {
630 listTypeDelete(&entry);
631 server.dirty++;
632 removed++;
633 if (toremove && removed == toremove) break;
634 }
635 }
636 listTypeReleaseIterator(li);
637
638 /* Clean up raw encoded object */
639 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
640 decrRefCount(obj);
641
642 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
643 addReplyLongLong(c,removed);
644 if (removed) signalModifiedKey(c->db,c->argv[1]);
645 }
646
647 /* This is the semantic of this command:
648 * RPOPLPUSH srclist dstlist:
649 * IF LLEN(srclist) > 0
650 * element = RPOP srclist
651 * LPUSH dstlist element
652 * RETURN element
653 * ELSE
654 * RETURN nil
655 * END
656 * END
657 *
658 * The idea is to be able to get an element from a list in a reliable way
659 * since the element is not just returned but pushed against another list
660 * as well. This command was originally proposed by Ezra Zygmuntowicz.
661 */
662
663 void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
664 robj *aux;
665
666 if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
667 /* Create the list if the key does not exist */
668 if (!dstobj) {
669 dstobj = createZiplistObject();
670 dbAdd(c->db,dstkey,dstobj);
671 } else {
672 signalModifiedKey(c->db,dstkey);
673 }
674 listTypePush(dstobj,value,REDIS_HEAD);
675 /* If we are pushing as a result of LPUSH against a key
676 * watched by BRPOPLPUSH, we need to rewrite the command vector
677 * as an LPUSH.
678 *
679 * If this is called directly by RPOPLPUSH (either directly
680 * or via a BRPOPLPUSH where the popped list exists)
681 * we should replicate the RPOPLPUSH command itself. */
682 if (c != origclient) {
683 aux = createStringObject("LPUSH",5);
684 rewriteClientCommandVector(origclient,3,aux,dstkey,value);
685 decrRefCount(aux);
686 } else {
687 /* Make sure to always use RPOPLPUSH in the replication / AOF,
688 * even if the original command was BRPOPLPUSH. */
689 aux = createStringObject("RPOPLPUSH",9);
690 rewriteClientCommandVector(origclient,3,aux,c->argv[1],c->argv[2]);
691 decrRefCount(aux);
692 }
693 server.dirty++;
694 }
695
696 /* Always send the pushed value to the client. */
697 addReplyBulk(c,value);
698 }
699
700 void rpoplpushCommand(redisClient *c) {
701 robj *sobj, *value;
702 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
703 checkType(c,sobj,REDIS_LIST)) return;
704
705 if (listTypeLength(sobj) == 0) {
706 addReply(c,shared.nullbulk);
707 } else {
708 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
709 robj *touchedkey = c->argv[1];
710
711 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
712 value = listTypePop(sobj,REDIS_TAIL);
713 /* We saved touched key, and protect it, since rpoplpushHandlePush
714 * may change the client command argument vector. */
715 incrRefCount(touchedkey);
716 rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
717
718 /* listTypePop returns an object with its refcount incremented */
719 decrRefCount(value);
720
721 /* Delete the source list when it is empty */
722 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
723 signalModifiedKey(c->db,touchedkey);
724 decrRefCount(touchedkey);
725 server.dirty++;
726 }
727 }
728
729 /*-----------------------------------------------------------------------------
730 * Blocking POP operations
731 *----------------------------------------------------------------------------*/
732
733 /* Currently Redis blocking operations support is limited to list POP ops,
734 * so the current implementation is not fully generic, but it is also not
735 * completely specific so it will not require a rewrite to support new
736 * kind of blocking operations in the future.
737 *
738 * Still it's important to note that list blocking operations can be already
739 * used as a notification mechanism in order to implement other blocking
740 * operations at application level, so there must be a very strong evidence
741 * of usefulness and generality before new blocking operations are implemented.
742 *
743 * This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
757 *
758 * The above comment and the source code should be enough in order to understand
759 * the implementation and modify / fix it later.
760 */
761
762 /* Set a client in blocking mode for the specified key, with the specified
763 * timeout */
764 void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
765 dictEntry *de;
766 list *l;
767 int j;
768
769 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
770 c->bpop.count = numkeys;
771 c->bpop.timeout = timeout;
772 c->bpop.target = target;
773
774 if (target != NULL) {
775 incrRefCount(target);
776 }
777
778 for (j = 0; j < numkeys; j++) {
779 /* Add the key in the client structure, to map clients -> keys */
780 c->bpop.keys[j] = keys[j];
781 incrRefCount(keys[j]);
782
783 /* And in the other "side", to map keys -> clients */
784 de = dictFind(c->db->blocking_keys,keys[j]);
785 if (de == NULL) {
786 int retval;
787
788 /* For every key we take a list of clients blocked for it */
789 l = listCreate();
790 retval = dictAdd(c->db->blocking_keys,keys[j],l);
791 incrRefCount(keys[j]);
792 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
793 } else {
794 l = dictGetVal(de);
795 }
796 listAddNodeTail(l,c);
797 }
798 /* Mark the client as a blocked client */
799 c->flags |= REDIS_BLOCKED;
800 server.bpop_blocked_clients++;
801 }
802
803 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
804 void unblockClientWaitingData(redisClient *c) {
805 dictEntry *de;
806 list *l;
807 int j;
808
809 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
810 /* The client may wait for multiple keys, so unblock it for every key. */
811 for (j = 0; j < c->bpop.count; j++) {
812 /* Remove this client from the list of clients waiting for this key. */
813 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
814 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
815 l = dictGetVal(de);
816 listDelNode(l,listSearchKey(l,c));
817 /* If the list is empty we need to remove it to avoid wasting memory */
818 if (listLength(l) == 0)
819 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
820 decrRefCount(c->bpop.keys[j]);
821 }
822
823 /* Cleanup the client structure */
824 zfree(c->bpop.keys);
825 c->bpop.keys = NULL;
826 if (c->bpop.target) decrRefCount(c->bpop.target);
827 c->bpop.target = NULL;
828 c->flags &= ~REDIS_BLOCKED;
829 c->flags |= REDIS_UNBLOCKED;
830 server.bpop_blocked_clients--;
831 listAddNodeTail(server.unblocked_clients,c);
832 }
833
834 /* This should be called from any function PUSHing into lists.
835 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
836 * 'ele' is the element pushed.
837 *
838 * If the function returns 0 there was no client waiting for a list push
839 * against this key.
840 *
841 * If the function returns 1 there was a client waiting for a list push
842 * against this key, the element was passed to this client thus it's not
843 * needed to actually add it to the list and the caller should return asap. */
844 int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
845 struct dictEntry *de;
846 redisClient *receiver;
847 int numclients;
848 list *clients;
849 listNode *ln;
850 robj *dstkey, *dstobj;
851
852 de = dictFind(c->db->blocking_keys,key);
853 if (de == NULL) return 0;
854 clients = dictGetVal(de);
855 numclients = listLength(clients);
856
857 /* Try to handle the push as long as there are clients waiting for a push.
858 * Note that "numclients" is used because the list of clients waiting for a
859 * push on "key" is deleted by unblockClient() when empty.
860 *
861 * This loop will have more than 1 iteration when there is a BRPOPLPUSH
862 * that cannot push the target list because it does not contain a list. If
863 * this happens, it simply tries the next client waiting for a push. */
864 while (numclients--) {
865 ln = listFirst(clients);
866 redisAssertWithInfo(c,key,ln != NULL);
867 receiver = ln->value;
868 dstkey = receiver->bpop.target;
869
870 /* Protect receiver->bpop.target, that will be freed by
871 * the next unblockClientWaitingData() call. */
872 if (dstkey) incrRefCount(dstkey);
873
874 /* This should remove the first element of the "clients" list. */
875 unblockClientWaitingData(receiver);
876
877 if (dstkey == NULL) {
878 /* BRPOP/BLPOP */
879 addReplyMultiBulkLen(receiver,2);
880 addReplyBulk(receiver,key);
881 addReplyBulk(receiver,ele);
882 return 1; /* Serve just the first client as in B[RL]POP semantics */
883 } else {
884 /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
885 dstobj = lookupKeyWrite(receiver->db,dstkey);
886 if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
887 rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
888 decrRefCount(dstkey);
889 return 1;
890 }
891 decrRefCount(dstkey);
892 }
893 }
894
895 return 0;
896 }
897
898 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
899 long tval;
900
901 if (getLongFromObjectOrReply(c,object,&tval,
902 "timeout is not an integer or out of range") != REDIS_OK)
903 return REDIS_ERR;
904
905 if (tval < 0) {
906 addReplyError(c,"timeout is negative");
907 return REDIS_ERR;
908 }
909
910 if (tval > 0) tval += time(NULL);
911 *timeout = tval;
912
913 return REDIS_OK;
914 }
915
916 /* Blocking RPOP/LPOP */
917 void blockingPopGenericCommand(redisClient *c, int where) {
918 robj *o;
919 time_t timeout;
920 int j;
921
922 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
923 return;
924
925 for (j = 1; j < c->argc-1; j++) {
926 o = lookupKeyWrite(c->db,c->argv[j]);
927 if (o != NULL) {
928 if (o->type != REDIS_LIST) {
929 addReply(c,shared.wrongtypeerr);
930 return;
931 } else {
932 if (listTypeLength(o) != 0) {
933 /* If the list contains elements fall back to the usual
934 * non-blocking POP operation */
935 struct redisCommand *orig_cmd;
936 robj *argv[2], **orig_argv;
937 int orig_argc;
938
939 /* We need to alter the command arguments before to call
940 * popGenericCommand() as the command takes a single key. */
941 orig_argv = c->argv;
942 orig_argc = c->argc;
943 orig_cmd = c->cmd;
944 argv[1] = c->argv[j];
945 c->argv = argv;
946 c->argc = 2;
947
948 /* Also the return value is different, we need to output
949 * the multi bulk reply header and the key name. The
950 * "real" command will add the last element (the value)
951 * for us. If this souds like an hack to you it's just
952 * because it is... */
953 addReplyMultiBulkLen(c,2);
954 addReplyBulk(c,argv[1]);
955
956 popGenericCommand(c,where);
957
958 /* Fix the client structure with the original stuff */
959 c->argv = orig_argv;
960 c->argc = orig_argc;
961 c->cmd = orig_cmd;
962
963 return;
964 }
965 }
966 }
967 }
968
969 /* If we are inside a MULTI/EXEC and the list is empty the only thing
970 * we can do is treating it as a timeout (even with timeout 0). */
971 if (c->flags & REDIS_MULTI) {
972 addReply(c,shared.nullmultibulk);
973 return;
974 }
975
976 /* If the list is empty or the key does not exists we must block */
977 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
978 }
979
980 void blpopCommand(redisClient *c) {
981 blockingPopGenericCommand(c,REDIS_HEAD);
982 }
983
984 void brpopCommand(redisClient *c) {
985 blockingPopGenericCommand(c,REDIS_TAIL);
986 }
987
988 void brpoplpushCommand(redisClient *c) {
989 time_t timeout;
990
991 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
992 return;
993
994 robj *key = lookupKeyWrite(c->db, c->argv[1]);
995
996 if (key == NULL) {
997 if (c->flags & REDIS_MULTI) {
998
999 /* Blocking against an empty list in a multi state
1000 * returns immediately. */
1001 addReply(c, shared.nullbulk);
1002 } else {
1003 /* The list is empty and the client blocks. */
1004 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1005 }
1006 } else {
1007 if (key->type != REDIS_LIST) {
1008 addReply(c, shared.wrongtypeerr);
1009 } else {
1010
1011 /* The list exists and has elements, so
1012 * the regular rpoplpushCommand is executed. */
1013 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
1014 rpoplpushCommand(c);
1015 }
1016 }
1017 }