]> git.saurik.com Git - redis.git/blame_incremental - src/t_list.c
Merge branch 'unstable' of github.com:antirez/redis-private into unstable
[redis.git] / src / t_list.c
... / ...
CommitLineData
#include "redis.h"
#include <limits.h>
2
3/*-----------------------------------------------------------------------------
4 * List API
5 *----------------------------------------------------------------------------*/
6
7/* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10void listTypeTryConversion(robj *subject, robj *value) {
11 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 if (value->encoding == REDIS_ENCODING_RAW &&
13 sdslen(value->ptr) > server.list_max_ziplist_value)
14 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15}
16
17void listTypePush(robj *subject, robj *value, int where) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject,value);
20 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23
24 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 value = getDecodedObject(value);
27 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 decrRefCount(value);
29 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 if (where == REDIS_HEAD) {
31 listAddNodeHead(subject->ptr,value);
32 } else {
33 listAddNodeTail(subject->ptr,value);
34 }
35 incrRefCount(value);
36 } else {
37 redisPanic("Unknown list encoding");
38 }
39}
40
41robj *listTypePop(robj *subject, int where) {
42 robj *value = NULL;
43 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 unsigned char *p;
45 unsigned char *vstr;
46 unsigned int vlen;
47 long long vlong;
48 int pos = (where == REDIS_HEAD) ? 0 : -1;
49 p = ziplistIndex(subject->ptr,pos);
50 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 if (vstr) {
52 value = createStringObject((char*)vstr,vlen);
53 } else {
54 value = createStringObjectFromLongLong(vlong);
55 }
56 /* We only need to delete an element when it exists */
57 subject->ptr = ziplistDelete(subject->ptr,&p);
58 }
59 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 list *list = subject->ptr;
61 listNode *ln;
62 if (where == REDIS_HEAD) {
63 ln = listFirst(list);
64 } else {
65 ln = listLast(list);
66 }
67 if (ln != NULL) {
68 value = listNodeValue(ln);
69 incrRefCount(value);
70 listDelNode(list,ln);
71 }
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 return value;
76}
77
78unsigned long listTypeLength(robj *subject) {
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 return ziplistLen(subject->ptr);
81 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 return listLength((list*)subject->ptr);
83 } else {
84 redisPanic("Unknown list encoding");
85 }
86}
87
88/* Initialize an iterator at the specified index. */
89listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
90 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 li->subject = subject;
92 li->encoding = subject->encoding;
93 li->direction = direction;
94 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 li->zi = ziplistIndex(subject->ptr,index);
96 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 li->ln = listIndex(subject->ptr,index);
98 } else {
99 redisPanic("Unknown list encoding");
100 }
101 return li;
102}
103
104/* Clean up the iterator. */
105void listTypeReleaseIterator(listTypeIterator *li) {
106 zfree(li);
107}
108
109/* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 redisAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 entry->zi = li->zi;
119 if (entry->zi != NULL) {
120 if (li->direction == REDIS_TAIL)
121 li->zi = ziplistNext(li->subject->ptr,li->zi);
122 else
123 li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 return 1;
125 }
126 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 entry->ln = li->ln;
128 if (entry->ln != NULL) {
129 if (li->direction == REDIS_TAIL)
130 li->ln = li->ln->next;
131 else
132 li->ln = li->ln->prev;
133 return 1;
134 }
135 } else {
136 redisPanic("Unknown list encoding");
137 }
138 return 0;
139}
140
141/* Return entry or NULL at the current position of the iterator. */
142robj *listTypeGet(listTypeEntry *entry) {
143 listTypeIterator *li = entry->li;
144 robj *value = NULL;
145 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 unsigned char *vstr;
147 unsigned int vlen;
148 long long vlong;
149 redisAssert(entry->zi != NULL);
150 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 if (vstr) {
152 value = createStringObject((char*)vstr,vlen);
153 } else {
154 value = createStringObjectFromLongLong(vlong);
155 }
156 }
157 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 redisAssert(entry->ln != NULL);
159 value = listNodeValue(entry->ln);
160 incrRefCount(value);
161 } else {
162 redisPanic("Unknown list encoding");
163 }
164 return value;
165}
166
167void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 robj *subject = entry->li->subject;
169 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 value = getDecodedObject(value);
171 if (where == REDIS_TAIL) {
172 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
176 if (next == NULL) {
177 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 } else {
179 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 }
181 } else {
182 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 }
184 decrRefCount(value);
185 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 if (where == REDIS_TAIL) {
187 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 } else {
189 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 }
191 incrRefCount(value);
192 } else {
193 redisPanic("Unknown list encoding");
194 }
195}
196
197/* Compare the given object with the entry at the current position. */
198int listTypeEqual(listTypeEntry *entry, robj *o) {
199 listTypeIterator *li = entry->li;
200 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
201 redisAssert(o->encoding == REDIS_ENCODING_RAW);
202 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 return equalStringObjects(o,listNodeValue(entry->ln));
205 } else {
206 redisPanic("Unknown list encoding");
207 }
208}
209
210/* Delete the element pointed to. */
211void listTypeDelete(listTypeEntry *entry) {
212 listTypeIterator *li = entry->li;
213 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 unsigned char *p = entry->zi;
215 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216
217 /* Update position of the iterator depending on the direction */
218 if (li->direction == REDIS_TAIL)
219 li->zi = p;
220 else
221 li->zi = ziplistPrev(li->subject->ptr,p);
222 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 listNode *next;
224 if (li->direction == REDIS_TAIL)
225 next = entry->ln->next;
226 else
227 next = entry->ln->prev;
228 listDelNode(li->subject->ptr,entry->ln);
229 li->ln = next;
230 } else {
231 redisPanic("Unknown list encoding");
232 }
233}
234
235void listTypeConvert(robj *subject, int enc) {
236 listTypeIterator *li;
237 listTypeEntry entry;
238 redisAssert(subject->type == REDIS_LIST);
239
240 if (enc == REDIS_ENCODING_LINKEDLIST) {
241 list *l = listCreate();
242 listSetFreeMethod(l,decrRefCount);
243
244 /* listTypeGet returns a robj with incremented refcount */
245 li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 listTypeReleaseIterator(li);
248
249 subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 zfree(subject->ptr);
251 subject->ptr = l;
252 } else {
253 redisPanic("Unsupported list conversion");
254 }
255}
256
257/*-----------------------------------------------------------------------------
258 * List Commands
259 *----------------------------------------------------------------------------*/
260
261void pushGenericCommand(redisClient *c, int where) {
262 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
263 c->argv[2] = tryObjectEncoding(c->argv[2]);
264 if (lobj == NULL) {
265 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
266 addReply(c,shared.cone);
267 return;
268 }
269 lobj = createZiplistObject();
270 dbAdd(c->db,c->argv[1],lobj);
271 } else {
272 if (lobj->type != REDIS_LIST) {
273 addReply(c,shared.wrongtypeerr);
274 return;
275 }
276 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
277 signalModifiedKey(c->db,c->argv[1]);
278 addReply(c,shared.cone);
279 return;
280 }
281 }
282 listTypePush(lobj,c->argv[2],where);
283 addReplyLongLong(c,listTypeLength(lobj));
284 signalModifiedKey(c->db,c->argv[1]);
285 server.dirty++;
286}
287
288void lpushCommand(redisClient *c) {
289 pushGenericCommand(c,REDIS_HEAD);
290}
291
292void rpushCommand(redisClient *c) {
293 pushGenericCommand(c,REDIS_TAIL);
294}
295
296void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
297 robj *subject;
298 listTypeIterator *iter;
299 listTypeEntry entry;
300 int inserted = 0;
301
302 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
303 checkType(c,subject,REDIS_LIST)) return;
304
305 if (refval != NULL) {
306 /* Note: we expect refval to be string-encoded because it is *not* the
307 * last argument of the multi-bulk LINSERT. */
308 redisAssert(refval->encoding == REDIS_ENCODING_RAW);
309
310 /* We're not sure if this value can be inserted yet, but we cannot
311 * convert the list inside the iterator. We don't want to loop over
312 * the list twice (once to see if the value can be inserted and once
313 * to do the actual insert), so we assume this value can be inserted
314 * and convert the ziplist to a regular list if necessary. */
315 listTypeTryConversion(subject,val);
316
317 /* Seek refval from head to tail */
318 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
319 while (listTypeNext(iter,&entry)) {
320 if (listTypeEqual(&entry,refval)) {
321 listTypeInsert(&entry,val,where);
322 inserted = 1;
323 break;
324 }
325 }
326 listTypeReleaseIterator(iter);
327
328 if (inserted) {
329 /* Check if the length exceeds the ziplist length threshold. */
330 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
331 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
332 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
333 signalModifiedKey(c->db,c->argv[1]);
334 server.dirty++;
335 } else {
336 /* Notify client of a failed insert */
337 addReply(c,shared.cnegone);
338 return;
339 }
340 } else {
341 listTypePush(subject,val,where);
342 signalModifiedKey(c->db,c->argv[1]);
343 server.dirty++;
344 }
345
346 addReplyLongLong(c,listTypeLength(subject));
347}
348
349void lpushxCommand(redisClient *c) {
350 c->argv[2] = tryObjectEncoding(c->argv[2]);
351 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
352}
353
354void rpushxCommand(redisClient *c) {
355 c->argv[2] = tryObjectEncoding(c->argv[2]);
356 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
357}
358
359void linsertCommand(redisClient *c) {
360 c->argv[4] = tryObjectEncoding(c->argv[4]);
361 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
362 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
363 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
364 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
365 } else {
366 addReply(c,shared.syntaxerr);
367 }
368}
369
370void llenCommand(redisClient *c) {
371 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
372 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
373 addReplyLongLong(c,listTypeLength(o));
374}
375
376void lindexCommand(redisClient *c) {
377 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
378 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
379 int index = atoi(c->argv[2]->ptr);
380 robj *value = NULL;
381
382 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
383 unsigned char *p;
384 unsigned char *vstr;
385 unsigned int vlen;
386 long long vlong;
387 p = ziplistIndex(o->ptr,index);
388 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
389 if (vstr) {
390 value = createStringObject((char*)vstr,vlen);
391 } else {
392 value = createStringObjectFromLongLong(vlong);
393 }
394 addReplyBulk(c,value);
395 decrRefCount(value);
396 } else {
397 addReply(c,shared.nullbulk);
398 }
399 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
400 listNode *ln = listIndex(o->ptr,index);
401 if (ln != NULL) {
402 value = listNodeValue(ln);
403 addReplyBulk(c,value);
404 } else {
405 addReply(c,shared.nullbulk);
406 }
407 } else {
408 redisPanic("Unknown list encoding");
409 }
410}
411
412void lsetCommand(redisClient *c) {
413 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
414 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
415 int index = atoi(c->argv[2]->ptr);
416 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
417
418 listTypeTryConversion(o,value);
419 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
420 unsigned char *p, *zl = o->ptr;
421 p = ziplistIndex(zl,index);
422 if (p == NULL) {
423 addReply(c,shared.outofrangeerr);
424 } else {
425 o->ptr = ziplistDelete(o->ptr,&p);
426 value = getDecodedObject(value);
427 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
428 decrRefCount(value);
429 addReply(c,shared.ok);
430 signalModifiedKey(c->db,c->argv[1]);
431 server.dirty++;
432 }
433 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
434 listNode *ln = listIndex(o->ptr,index);
435 if (ln == NULL) {
436 addReply(c,shared.outofrangeerr);
437 } else {
438 decrRefCount((robj*)listNodeValue(ln));
439 listNodeValue(ln) = value;
440 incrRefCount(value);
441 addReply(c,shared.ok);
442 signalModifiedKey(c->db,c->argv[1]);
443 server.dirty++;
444 }
445 } else {
446 redisPanic("Unknown list encoding");
447 }
448}
449
450void popGenericCommand(redisClient *c, int where) {
451 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
452 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
453
454 robj *value = listTypePop(o,where);
455 if (value == NULL) {
456 addReply(c,shared.nullbulk);
457 } else {
458 addReplyBulk(c,value);
459 decrRefCount(value);
460 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
461 signalModifiedKey(c->db,c->argv[1]);
462 server.dirty++;
463 }
464}
465
466void lpopCommand(redisClient *c) {
467 popGenericCommand(c,REDIS_HEAD);
468}
469
470void rpopCommand(redisClient *c) {
471 popGenericCommand(c,REDIS_TAIL);
472}
473
474void lrangeCommand(redisClient *c) {
475 robj *o;
476 int start = atoi(c->argv[2]->ptr);
477 int end = atoi(c->argv[3]->ptr);
478 int llen;
479 int rangelen;
480
481 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
482 || checkType(c,o,REDIS_LIST)) return;
483 llen = listTypeLength(o);
484
485 /* convert negative indexes */
486 if (start < 0) start = llen+start;
487 if (end < 0) end = llen+end;
488 if (start < 0) start = 0;
489
490 /* Invariant: start >= 0, so this test will be true when end < 0.
491 * The range is empty when start > end or start >= length. */
492 if (start > end || start >= llen) {
493 addReply(c,shared.emptymultibulk);
494 return;
495 }
496 if (end >= llen) end = llen-1;
497 rangelen = (end-start)+1;
498
499 /* Return the result in form of a multi-bulk reply */
500 addReplyMultiBulkLen(c,rangelen);
501 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
502 unsigned char *p = ziplistIndex(o->ptr,start);
503 unsigned char *vstr;
504 unsigned int vlen;
505 long long vlong;
506
507 while(rangelen--) {
508 ziplistGet(p,&vstr,&vlen,&vlong);
509 if (vstr) {
510 addReplyBulkCBuffer(c,vstr,vlen);
511 } else {
512 addReplyBulkLongLong(c,vlong);
513 }
514 p = ziplistNext(o->ptr,p);
515 }
516 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
517 listNode *ln = listIndex(o->ptr,start);
518
519 while(rangelen--) {
520 addReplyBulk(c,ln->value);
521 ln = ln->next;
522 }
523 } else {
524 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
525 }
526}
527
528void ltrimCommand(redisClient *c) {
529 robj *o;
530 int start = atoi(c->argv[2]->ptr);
531 int end = atoi(c->argv[3]->ptr);
532 int llen;
533 int j, ltrim, rtrim;
534 list *list;
535 listNode *ln;
536
537 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
538 checkType(c,o,REDIS_LIST)) return;
539 llen = listTypeLength(o);
540
541 /* convert negative indexes */
542 if (start < 0) start = llen+start;
543 if (end < 0) end = llen+end;
544 if (start < 0) start = 0;
545
546 /* Invariant: start >= 0, so this test will be true when end < 0.
547 * The range is empty when start > end or start >= length. */
548 if (start > end || start >= llen) {
549 /* Out of range start or start > end result in empty list */
550 ltrim = llen;
551 rtrim = 0;
552 } else {
553 if (end >= llen) end = llen-1;
554 ltrim = start;
555 rtrim = llen-end-1;
556 }
557
558 /* Remove list elements to perform the trim */
559 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
560 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
561 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
562 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
563 list = o->ptr;
564 for (j = 0; j < ltrim; j++) {
565 ln = listFirst(list);
566 listDelNode(list,ln);
567 }
568 for (j = 0; j < rtrim; j++) {
569 ln = listLast(list);
570 listDelNode(list,ln);
571 }
572 } else {
573 redisPanic("Unknown list encoding");
574 }
575 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
576 signalModifiedKey(c->db,c->argv[1]);
577 server.dirty++;
578 addReply(c,shared.ok);
579}
580
581void lremCommand(redisClient *c) {
582 robj *subject, *obj;
583 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
584 int toremove = atoi(c->argv[2]->ptr);
585 int removed = 0;
586 listTypeEntry entry;
587
588 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
589 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
590
591 /* Make sure obj is raw when we're dealing with a ziplist */
592 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
593 obj = getDecodedObject(obj);
594
595 listTypeIterator *li;
596 if (toremove < 0) {
597 toremove = -toremove;
598 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
599 } else {
600 li = listTypeInitIterator(subject,0,REDIS_TAIL);
601 }
602
603 while (listTypeNext(li,&entry)) {
604 if (listTypeEqual(&entry,obj)) {
605 listTypeDelete(&entry);
606 server.dirty++;
607 removed++;
608 if (toremove && removed == toremove) break;
609 }
610 }
611 listTypeReleaseIterator(li);
612
613 /* Clean up raw encoded object */
614 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
615 decrRefCount(obj);
616
617 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
618 addReplyLongLong(c,removed);
619 if (removed) signalModifiedKey(c->db,c->argv[1]);
620}
621
/* These are the semantics of this command:
 *  RPOPLPUSH srclist dstlist:
 *    IF LLEN(srclist) > 0
 *      element = RPOP srclist
 *      LPUSH dstlist element
 *      RETURN element
 *    ELSE
 *      RETURN nil
 *    END
 *  END
 *
 * The idea is to be able to get an element from a list in a reliable way,
 * since the element is not just returned but also pushed onto another list.
 * This command was originally proposed by Ezra Zygmuntowicz.
 */
637
638void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
639 if (!handleClientsWaitingListPush(c,dstkey,value)) {
640 /* Create the list if the key does not exist */
641 if (!dstobj) {
642 dstobj = createZiplistObject();
643 dbAdd(c->db,dstkey,dstobj);
644 } else {
645 signalModifiedKey(c->db,dstkey);
646 server.dirty++;
647 }
648 listTypePush(dstobj,value,REDIS_HEAD);
649 }
650
651 /* Always send the pushed value to the client. */
652 addReplyBulk(c,value);
653}
654
655void rpoplpushCommand(redisClient *c) {
656 robj *sobj, *value;
657 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
658 checkType(c,sobj,REDIS_LIST)) return;
659
660 if (listTypeLength(sobj) == 0) {
661 addReply(c,shared.nullbulk);
662 } else {
663 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
664 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
665 value = listTypePop(sobj,REDIS_TAIL);
666 rpoplpushHandlePush(c,c->argv[2],dobj,value);
667
668 /* listTypePop returns an object with its refcount incremented */
669 decrRefCount(value);
670
671 /* Delete the source list when it is empty */
672 if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
673 signalModifiedKey(c->db,c->argv[1]);
674 server.dirty++;
675 }
676}
677
678/*-----------------------------------------------------------------------------
679 * Blocking POP operations
680 *----------------------------------------------------------------------------*/
681
682/* Currently Redis blocking operations support is limited to list POP ops,
683 * so the current implementation is not fully generic, but it is also not
684 * completely specific so it will not require a rewrite to support new
685 * kind of blocking operations in the future.
686 *
687 * Still it's important to note that list blocking operations can be already
688 * used as a notification mechanism in order to implement other blocking
689 * operations at application level, so there must be a very strong evidence
690 * of usefulness and generality before new blocking operations are implemented.
691 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list,
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   when there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). We also put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for those keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically, instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
706 *
707 * The above comment and the source code should be enough in order to understand
708 * the implementation and modify / fix it later.
709 */
710
711/* Set a client in blocking mode for the specified key, with the specified
712 * timeout */
713void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
714 dictEntry *de;
715 list *l;
716 int j;
717
718 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
719 c->bpop.count = numkeys;
720 c->bpop.timeout = timeout;
721 c->bpop.target = target;
722
723 if (target != NULL) {
724 incrRefCount(target);
725 }
726
727 for (j = 0; j < numkeys; j++) {
728 /* Add the key in the client structure, to map clients -> keys */
729 c->bpop.keys[j] = keys[j];
730 incrRefCount(keys[j]);
731
732 /* And in the other "side", to map keys -> clients */
733 de = dictFind(c->db->blocking_keys,keys[j]);
734 if (de == NULL) {
735 int retval;
736
737 /* For every key we take a list of clients blocked for it */
738 l = listCreate();
739 retval = dictAdd(c->db->blocking_keys,keys[j],l);
740 incrRefCount(keys[j]);
741 redisAssert(retval == DICT_OK);
742 } else {
743 l = dictGetEntryVal(de);
744 }
745 listAddNodeTail(l,c);
746 }
747 /* Mark the client as a blocked client */
748 c->flags |= REDIS_BLOCKED;
749 server.bpop_blocked_clients++;
750}
751
752/* Unblock a client that's waiting in a blocking operation such as BLPOP */
753void unblockClientWaitingData(redisClient *c) {
754 dictEntry *de;
755 list *l;
756 int j;
757
758 redisAssert(c->bpop.keys != NULL);
759 /* The client may wait for multiple keys, so unblock it for every key. */
760 for (j = 0; j < c->bpop.count; j++) {
761 /* Remove this client from the list of clients waiting for this key. */
762 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
763 redisAssert(de != NULL);
764 l = dictGetEntryVal(de);
765 listDelNode(l,listSearchKey(l,c));
766 /* If the list is empty we need to remove it to avoid wasting memory */
767 if (listLength(l) == 0)
768 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
769 decrRefCount(c->bpop.keys[j]);
770 }
771
772 /* Cleanup the client structure */
773 zfree(c->bpop.keys);
774 c->bpop.keys = NULL;
775 c->bpop.target = NULL;
776 c->flags &= (~REDIS_BLOCKED);
777 server.bpop_blocked_clients--;
778 listAddNodeTail(server.unblocked_clients,c);
779}
780
781/* This should be called from any function PUSHing into lists.
782 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
783 * 'ele' is the element pushed.
784 *
785 * If the function returns 0 there was no client waiting for a list push
786 * against this key.
787 *
788 * If the function returns 1 there was a client waiting for a list push
789 * against this key, the element was passed to this client thus it's not
790 * needed to actually add it to the list and the caller should return asap. */
791int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
792 struct dictEntry *de;
793 redisClient *receiver;
794 int numclients;
795 list *clients;
796 listNode *ln;
797 robj *dstkey, *dstobj;
798
799 de = dictFind(c->db->blocking_keys,key);
800 if (de == NULL) return 0;
801 clients = dictGetEntryVal(de);
802 numclients = listLength(clients);
803
804 /* Try to handle the push as long as there are clients waiting for a push.
805 * Note that "numclients" is used because the list of clients waiting for a
806 * push on "key" is deleted by unblockClient() when empty.
807 *
808 * This loop will have more than 1 iteration when there is a BRPOPLPUSH
809 * that cannot push the target list because it does not contain a list. If
810 * this happens, it simply tries the next client waiting for a push. */
811 while (numclients--) {
812 ln = listFirst(clients);
813 redisAssert(ln != NULL);
814 receiver = ln->value;
815 dstkey = receiver->bpop.target;
816
817 /* This should remove the first element of the "clients" list. */
818 unblockClientWaitingData(receiver);
819 redisAssert(ln != listFirst(clients));
820
821 if (dstkey == NULL) {
822 /* BRPOP/BLPOP */
823 addReplyMultiBulkLen(receiver,2);
824 addReplyBulk(receiver,key);
825 addReplyBulk(receiver,ele);
826 return 1;
827 } else {
828 /* BRPOPLPUSH */
829 dstobj = lookupKeyWrite(receiver->db,dstkey);
830 if (dstobj && checkType(receiver,dstobj,REDIS_LIST)) {
831 decrRefCount(dstkey);
832 } else {
833 rpoplpushHandlePush(receiver,dstkey,dstobj,ele);
834 decrRefCount(dstkey);
835 return 1;
836 }
837 }
838 }
839
840 return 0;
841}
842
843int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
844 long tval;
845
846 if (getLongFromObjectOrReply(c,object,&tval,
847 "timeout is not an integer or out of range") != REDIS_OK)
848 return REDIS_ERR;
849
850 if (tval < 0) {
851 addReplyError(c,"timeout is negative");
852 return REDIS_ERR;
853 }
854
855 if (tval > 0) tval += time(NULL);
856 *timeout = tval;
857
858 return REDIS_OK;
859}
860
861/* Blocking RPOP/LPOP */
862void blockingPopGenericCommand(redisClient *c, int where) {
863 robj *o;
864 time_t timeout;
865 int j;
866
867 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
868 return;
869
870 for (j = 1; j < c->argc-1; j++) {
871 o = lookupKeyWrite(c->db,c->argv[j]);
872 if (o != NULL) {
873 if (o->type != REDIS_LIST) {
874 addReply(c,shared.wrongtypeerr);
875 return;
876 } else {
877 if (listTypeLength(o) != 0) {
878 /* If the list contains elements fall back to the usual
879 * non-blocking POP operation */
880 robj *argv[2], **orig_argv;
881 int orig_argc;
882
883 /* We need to alter the command arguments before to call
884 * popGenericCommand() as the command takes a single key. */
885 orig_argv = c->argv;
886 orig_argc = c->argc;
887 argv[1] = c->argv[j];
888 c->argv = argv;
889 c->argc = 2;
890
891 /* Also the return value is different, we need to output
892 * the multi bulk reply header and the key name. The
893 * "real" command will add the last element (the value)
894 * for us. If this souds like an hack to you it's just
895 * because it is... */
896 addReplyMultiBulkLen(c,2);
897 addReplyBulk(c,argv[1]);
898
899 popGenericCommand(c,where);
900
901 /* Fix the client structure with the original stuff */
902 c->argv = orig_argv;
903 c->argc = orig_argc;
904
905 return;
906 }
907 }
908 }
909 }
910
911 /* If we are inside a MULTI/EXEC and the list is empty the only thing
912 * we can do is treating it as a timeout (even with timeout 0). */
913 if (c->flags & REDIS_MULTI) {
914 addReply(c,shared.nullmultibulk);
915 return;
916 }
917
918 /* If the list is empty or the key does not exists we must block */
919 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
920}
921
922void blpopCommand(redisClient *c) {
923 blockingPopGenericCommand(c,REDIS_HEAD);
924}
925
926void brpopCommand(redisClient *c) {
927 blockingPopGenericCommand(c,REDIS_TAIL);
928}
929
930void brpoplpushCommand(redisClient *c) {
931 time_t timeout;
932
933 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
934 return;
935
936 robj *key = lookupKeyWrite(c->db, c->argv[1]);
937
938 if (key == NULL) {
939 if (c->flags & REDIS_MULTI) {
940
941 /* Blocking against an empty list in a multi state
942 * returns immediately. */
943 addReply(c, shared.nullmultibulk);
944 } else {
945 /* The list is empty and the client blocks. */
946 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
947 }
948 } else {
949 if (key->type != REDIS_LIST) {
950 addReply(c, shared.wrongtypeerr);
951 } else {
952
953 /* The list exists and has elements, so
954 * the regular rpoplpushCommand is executed. */
955 redisAssert(listTypeLength(key) > 0);
956 rpoplpushCommand(c);
957 }
958 }
959}