]> git.saurik.com Git - redis.git/blob - src/t_list.c
Make an EXEC test more latency proof.
[redis.git] / src / t_list.c
1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "redis.h"
31
32 void signalListAsReady(redisClient *c, robj *key);
33
34 /*-----------------------------------------------------------------------------
35 * List API
36 *----------------------------------------------------------------------------*/
37
38 /* Check the argument length to see if it requires us to convert the ziplist
39 * to a real list. Only check raw-encoded objects because integer encoded
40 * objects are never too long. */
41 void listTypeTryConversion(robj *subject, robj *value) {
42 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
43 if (value->encoding == REDIS_ENCODING_RAW &&
44 sdslen(value->ptr) > server.list_max_ziplist_value)
45 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
46 }
47
48 /* The function pushes an elmenet to the specified list object 'subject',
49 * at head or tail position as specified by 'where'.
50 *
51 * There is no need for the caller to incremnet the refcount of 'value' as
52 * the function takes care of it if needed. */
53 void listTypePush(robj *subject, robj *value, int where) {
54 /* Check if we need to convert the ziplist */
55 listTypeTryConversion(subject,value);
56 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
57 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
58 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
59
60 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
61 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
62 value = getDecodedObject(value);
63 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
64 decrRefCount(value);
65 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
66 if (where == REDIS_HEAD) {
67 listAddNodeHead(subject->ptr,value);
68 } else {
69 listAddNodeTail(subject->ptr,value);
70 }
71 incrRefCount(value);
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 }
76
77 robj *listTypePop(robj *subject, int where) {
78 robj *value = NULL;
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 unsigned char *p;
81 unsigned char *vstr;
82 unsigned int vlen;
83 long long vlong;
84 int pos = (where == REDIS_HEAD) ? 0 : -1;
85 p = ziplistIndex(subject->ptr,pos);
86 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
87 if (vstr) {
88 value = createStringObject((char*)vstr,vlen);
89 } else {
90 value = createStringObjectFromLongLong(vlong);
91 }
92 /* We only need to delete an element when it exists */
93 subject->ptr = ziplistDelete(subject->ptr,&p);
94 }
95 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
96 list *list = subject->ptr;
97 listNode *ln;
98 if (where == REDIS_HEAD) {
99 ln = listFirst(list);
100 } else {
101 ln = listLast(list);
102 }
103 if (ln != NULL) {
104 value = listNodeValue(ln);
105 incrRefCount(value);
106 listDelNode(list,ln);
107 }
108 } else {
109 redisPanic("Unknown list encoding");
110 }
111 return value;
112 }
113
114 unsigned long listTypeLength(robj *subject) {
115 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
116 return ziplistLen(subject->ptr);
117 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
118 return listLength((list*)subject->ptr);
119 } else {
120 redisPanic("Unknown list encoding");
121 }
122 }
123
124 /* Initialize an iterator at the specified index. */
125 listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
126 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
127 li->subject = subject;
128 li->encoding = subject->encoding;
129 li->direction = direction;
130 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
131 li->zi = ziplistIndex(subject->ptr,index);
132 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
133 li->ln = listIndex(subject->ptr,index);
134 } else {
135 redisPanic("Unknown list encoding");
136 }
137 return li;
138 }
139
140 /* Clean up the iterator. */
141 void listTypeReleaseIterator(listTypeIterator *li) {
142 zfree(li);
143 }
144
145 /* Stores pointer to current the entry in the provided entry structure
146 * and advances the position of the iterator. Returns 1 when the current
147 * entry is in fact an entry, 0 otherwise. */
148 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
149 /* Protect from converting when iterating */
150 redisAssert(li->subject->encoding == li->encoding);
151
152 entry->li = li;
153 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
154 entry->zi = li->zi;
155 if (entry->zi != NULL) {
156 if (li->direction == REDIS_TAIL)
157 li->zi = ziplistNext(li->subject->ptr,li->zi);
158 else
159 li->zi = ziplistPrev(li->subject->ptr,li->zi);
160 return 1;
161 }
162 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
163 entry->ln = li->ln;
164 if (entry->ln != NULL) {
165 if (li->direction == REDIS_TAIL)
166 li->ln = li->ln->next;
167 else
168 li->ln = li->ln->prev;
169 return 1;
170 }
171 } else {
172 redisPanic("Unknown list encoding");
173 }
174 return 0;
175 }
176
177 /* Return entry or NULL at the current position of the iterator. */
178 robj *listTypeGet(listTypeEntry *entry) {
179 listTypeIterator *li = entry->li;
180 robj *value = NULL;
181 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
182 unsigned char *vstr;
183 unsigned int vlen;
184 long long vlong;
185 redisAssert(entry->zi != NULL);
186 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
187 if (vstr) {
188 value = createStringObject((char*)vstr,vlen);
189 } else {
190 value = createStringObjectFromLongLong(vlong);
191 }
192 }
193 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
194 redisAssert(entry->ln != NULL);
195 value = listNodeValue(entry->ln);
196 incrRefCount(value);
197 } else {
198 redisPanic("Unknown list encoding");
199 }
200 return value;
201 }
202
203 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
204 robj *subject = entry->li->subject;
205 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
206 value = getDecodedObject(value);
207 if (where == REDIS_TAIL) {
208 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
209
210 /* When we insert after the current element, but the current element
211 * is the tail of the list, we need to do a push. */
212 if (next == NULL) {
213 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
214 } else {
215 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
216 }
217 } else {
218 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
219 }
220 decrRefCount(value);
221 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
222 if (where == REDIS_TAIL) {
223 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
224 } else {
225 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
226 }
227 incrRefCount(value);
228 } else {
229 redisPanic("Unknown list encoding");
230 }
231 }
232
233 /* Compare the given object with the entry at the current position. */
234 int listTypeEqual(listTypeEntry *entry, robj *o) {
235 listTypeIterator *li = entry->li;
236 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
237 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
238 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
239 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
240 return equalStringObjects(o,listNodeValue(entry->ln));
241 } else {
242 redisPanic("Unknown list encoding");
243 }
244 }
245
246 /* Delete the element pointed to. */
247 void listTypeDelete(listTypeEntry *entry) {
248 listTypeIterator *li = entry->li;
249 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
250 unsigned char *p = entry->zi;
251 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
252
253 /* Update position of the iterator depending on the direction */
254 if (li->direction == REDIS_TAIL)
255 li->zi = p;
256 else
257 li->zi = ziplistPrev(li->subject->ptr,p);
258 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
259 listNode *next;
260 if (li->direction == REDIS_TAIL)
261 next = entry->ln->next;
262 else
263 next = entry->ln->prev;
264 listDelNode(li->subject->ptr,entry->ln);
265 li->ln = next;
266 } else {
267 redisPanic("Unknown list encoding");
268 }
269 }
270
271 void listTypeConvert(robj *subject, int enc) {
272 listTypeIterator *li;
273 listTypeEntry entry;
274 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
275
276 if (enc == REDIS_ENCODING_LINKEDLIST) {
277 list *l = listCreate();
278 listSetFreeMethod(l,decrRefCount);
279
280 /* listTypeGet returns a robj with incremented refcount */
281 li = listTypeInitIterator(subject,0,REDIS_TAIL);
282 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
283 listTypeReleaseIterator(li);
284
285 subject->encoding = REDIS_ENCODING_LINKEDLIST;
286 zfree(subject->ptr);
287 subject->ptr = l;
288 } else {
289 redisPanic("Unsupported list conversion");
290 }
291 }
292
293 /*-----------------------------------------------------------------------------
294 * List Commands
295 *----------------------------------------------------------------------------*/
296
297 void pushGenericCommand(redisClient *c, int where) {
298 int j, waiting = 0, pushed = 0;
299 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
300 int may_have_waiting_clients = (lobj == NULL);
301
302 if (lobj && lobj->type != REDIS_LIST) {
303 addReply(c,shared.wrongtypeerr);
304 return;
305 }
306
307 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
308
309 for (j = 2; j < c->argc; j++) {
310 c->argv[j] = tryObjectEncoding(c->argv[j]);
311 if (!lobj) {
312 lobj = createZiplistObject();
313 dbAdd(c->db,c->argv[1],lobj);
314 }
315 listTypePush(lobj,c->argv[j],where);
316 pushed++;
317 }
318 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
319 if (pushed) signalModifiedKey(c->db,c->argv[1]);
320 server.dirty += pushed;
321 }
322
323 void lpushCommand(redisClient *c) {
324 pushGenericCommand(c,REDIS_HEAD);
325 }
326
327 void rpushCommand(redisClient *c) {
328 pushGenericCommand(c,REDIS_TAIL);
329 }
330
331 void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
332 robj *subject;
333 listTypeIterator *iter;
334 listTypeEntry entry;
335 int inserted = 0;
336
337 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
338 checkType(c,subject,REDIS_LIST)) return;
339
340 if (refval != NULL) {
341 /* Note: we expect refval to be string-encoded because it is *not* the
342 * last argument of the multi-bulk LINSERT. */
343 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
344
345 /* We're not sure if this value can be inserted yet, but we cannot
346 * convert the list inside the iterator. We don't want to loop over
347 * the list twice (once to see if the value can be inserted and once
348 * to do the actual insert), so we assume this value can be inserted
349 * and convert the ziplist to a regular list if necessary. */
350 listTypeTryConversion(subject,val);
351
352 /* Seek refval from head to tail */
353 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
354 while (listTypeNext(iter,&entry)) {
355 if (listTypeEqual(&entry,refval)) {
356 listTypeInsert(&entry,val,where);
357 inserted = 1;
358 break;
359 }
360 }
361 listTypeReleaseIterator(iter);
362
363 if (inserted) {
364 /* Check if the length exceeds the ziplist length threshold. */
365 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
366 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
367 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
368 signalModifiedKey(c->db,c->argv[1]);
369 server.dirty++;
370 } else {
371 /* Notify client of a failed insert */
372 addReply(c,shared.cnegone);
373 return;
374 }
375 } else {
376 listTypePush(subject,val,where);
377 signalModifiedKey(c->db,c->argv[1]);
378 server.dirty++;
379 }
380
381 addReplyLongLong(c,listTypeLength(subject));
382 }
383
384 void lpushxCommand(redisClient *c) {
385 c->argv[2] = tryObjectEncoding(c->argv[2]);
386 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
387 }
388
389 void rpushxCommand(redisClient *c) {
390 c->argv[2] = tryObjectEncoding(c->argv[2]);
391 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
392 }
393
394 void linsertCommand(redisClient *c) {
395 c->argv[4] = tryObjectEncoding(c->argv[4]);
396 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
397 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
398 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
399 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
400 } else {
401 addReply(c,shared.syntaxerr);
402 }
403 }
404
405 void llenCommand(redisClient *c) {
406 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
407 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
408 addReplyLongLong(c,listTypeLength(o));
409 }
410
411 void lindexCommand(redisClient *c) {
412 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
413 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
414 long index;
415 robj *value = NULL;
416
417 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
418 return;
419
420 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
421 unsigned char *p;
422 unsigned char *vstr;
423 unsigned int vlen;
424 long long vlong;
425 p = ziplistIndex(o->ptr,index);
426 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
427 if (vstr) {
428 value = createStringObject((char*)vstr,vlen);
429 } else {
430 value = createStringObjectFromLongLong(vlong);
431 }
432 addReplyBulk(c,value);
433 decrRefCount(value);
434 } else {
435 addReply(c,shared.nullbulk);
436 }
437 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
438 listNode *ln = listIndex(o->ptr,index);
439 if (ln != NULL) {
440 value = listNodeValue(ln);
441 addReplyBulk(c,value);
442 } else {
443 addReply(c,shared.nullbulk);
444 }
445 } else {
446 redisPanic("Unknown list encoding");
447 }
448 }
449
450 void lsetCommand(redisClient *c) {
451 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
452 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
453 long index;
454 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
455
456 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
457 return;
458
459 listTypeTryConversion(o,value);
460 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
461 unsigned char *p, *zl = o->ptr;
462 p = ziplistIndex(zl,index);
463 if (p == NULL) {
464 addReply(c,shared.outofrangeerr);
465 } else {
466 o->ptr = ziplistDelete(o->ptr,&p);
467 value = getDecodedObject(value);
468 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
469 decrRefCount(value);
470 addReply(c,shared.ok);
471 signalModifiedKey(c->db,c->argv[1]);
472 server.dirty++;
473 }
474 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
475 listNode *ln = listIndex(o->ptr,index);
476 if (ln == NULL) {
477 addReply(c,shared.outofrangeerr);
478 } else {
479 decrRefCount((robj*)listNodeValue(ln));
480 listNodeValue(ln) = value;
481 incrRefCount(value);
482 addReply(c,shared.ok);
483 signalModifiedKey(c->db,c->argv[1]);
484 server.dirty++;
485 }
486 } else {
487 redisPanic("Unknown list encoding");
488 }
489 }
490
491 void popGenericCommand(redisClient *c, int where) {
492 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
493 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
494
495 robj *value = listTypePop(o,where);
496 if (value == NULL) {
497 addReply(c,shared.nullbulk);
498 } else {
499 addReplyBulk(c,value);
500 decrRefCount(value);
501 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
502 signalModifiedKey(c->db,c->argv[1]);
503 server.dirty++;
504 }
505 }
506
507 void lpopCommand(redisClient *c) {
508 popGenericCommand(c,REDIS_HEAD);
509 }
510
511 void rpopCommand(redisClient *c) {
512 popGenericCommand(c,REDIS_TAIL);
513 }
514
515 void lrangeCommand(redisClient *c) {
516 robj *o;
517 long start, end, llen, rangelen;
518
519 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
520 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
521
522 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
523 || checkType(c,o,REDIS_LIST)) return;
524 llen = listTypeLength(o);
525
526 /* convert negative indexes */
527 if (start < 0) start = llen+start;
528 if (end < 0) end = llen+end;
529 if (start < 0) start = 0;
530
531 /* Invariant: start >= 0, so this test will be true when end < 0.
532 * The range is empty when start > end or start >= length. */
533 if (start > end || start >= llen) {
534 addReply(c,shared.emptymultibulk);
535 return;
536 }
537 if (end >= llen) end = llen-1;
538 rangelen = (end-start)+1;
539
540 /* Return the result in form of a multi-bulk reply */
541 addReplyMultiBulkLen(c,rangelen);
542 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
543 unsigned char *p = ziplistIndex(o->ptr,start);
544 unsigned char *vstr;
545 unsigned int vlen;
546 long long vlong;
547
548 while(rangelen--) {
549 ziplistGet(p,&vstr,&vlen,&vlong);
550 if (vstr) {
551 addReplyBulkCBuffer(c,vstr,vlen);
552 } else {
553 addReplyBulkLongLong(c,vlong);
554 }
555 p = ziplistNext(o->ptr,p);
556 }
557 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
558 listNode *ln;
559
560 /* If we are nearest to the end of the list, reach the element
561 * starting from tail and going backward, as it is faster. */
562 if (start > llen/2) start -= llen;
563 ln = listIndex(o->ptr,start);
564
565 while(rangelen--) {
566 addReplyBulk(c,ln->value);
567 ln = ln->next;
568 }
569 } else {
570 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
571 }
572 }
573
574 void ltrimCommand(redisClient *c) {
575 robj *o;
576 long start, end, llen, j, ltrim, rtrim;
577 list *list;
578 listNode *ln;
579
580 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
581 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
582
583 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
584 checkType(c,o,REDIS_LIST)) return;
585 llen = listTypeLength(o);
586
587 /* convert negative indexes */
588 if (start < 0) start = llen+start;
589 if (end < 0) end = llen+end;
590 if (start < 0) start = 0;
591
592 /* Invariant: start >= 0, so this test will be true when end < 0.
593 * The range is empty when start > end or start >= length. */
594 if (start > end || start >= llen) {
595 /* Out of range start or start > end result in empty list */
596 ltrim = llen;
597 rtrim = 0;
598 } else {
599 if (end >= llen) end = llen-1;
600 ltrim = start;
601 rtrim = llen-end-1;
602 }
603
604 /* Remove list elements to perform the trim */
605 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
606 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
607 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
608 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
609 list = o->ptr;
610 for (j = 0; j < ltrim; j++) {
611 ln = listFirst(list);
612 listDelNode(list,ln);
613 }
614 for (j = 0; j < rtrim; j++) {
615 ln = listLast(list);
616 listDelNode(list,ln);
617 }
618 } else {
619 redisPanic("Unknown list encoding");
620 }
621 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
622 signalModifiedKey(c->db,c->argv[1]);
623 server.dirty++;
624 addReply(c,shared.ok);
625 }
626
627 void lremCommand(redisClient *c) {
628 robj *subject, *obj;
629 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
630 long toremove;
631 long removed = 0;
632 listTypeEntry entry;
633
634 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
635 return;
636
637 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
638 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
639
640 /* Make sure obj is raw when we're dealing with a ziplist */
641 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
642 obj = getDecodedObject(obj);
643
644 listTypeIterator *li;
645 if (toremove < 0) {
646 toremove = -toremove;
647 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
648 } else {
649 li = listTypeInitIterator(subject,0,REDIS_TAIL);
650 }
651
652 while (listTypeNext(li,&entry)) {
653 if (listTypeEqual(&entry,obj)) {
654 listTypeDelete(&entry);
655 server.dirty++;
656 removed++;
657 if (toremove && removed == toremove) break;
658 }
659 }
660 listTypeReleaseIterator(li);
661
662 /* Clean up raw encoded object */
663 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
664 decrRefCount(obj);
665
666 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
667 addReplyLongLong(c,removed);
668 if (removed) signalModifiedKey(c->db,c->argv[1]);
669 }
670
671 /* This is the semantic of this command:
672 * RPOPLPUSH srclist dstlist:
673 * IF LLEN(srclist) > 0
674 * element = RPOP srclist
675 * LPUSH dstlist element
676 * RETURN element
677 * ELSE
678 * RETURN nil
679 * END
680 * END
681 *
682 * The idea is to be able to get an element from a list in a reliable way
683 * since the element is not just returned but pushed against another list
684 * as well. This command was originally proposed by Ezra Zygmuntowicz.
685 */
686
687 void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
688 /* Create the list if the key does not exist */
689 if (!dstobj) {
690 dstobj = createZiplistObject();
691 dbAdd(c->db,dstkey,dstobj);
692 signalListAsReady(c,dstkey);
693 }
694 signalModifiedKey(c->db,dstkey);
695 listTypePush(dstobj,value,REDIS_HEAD);
696 /* Always send the pushed value to the client. */
697 addReplyBulk(c,value);
698 }
699
700 void rpoplpushCommand(redisClient *c) {
701 robj *sobj, *value;
702 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
703 checkType(c,sobj,REDIS_LIST)) return;
704
705 if (listTypeLength(sobj) == 0) {
706 /* This may only happen after loading very old RDB files. Recent
707 * versions of Redis delete keys of empty lists. */
708 addReply(c,shared.nullbulk);
709 } else {
710 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
711 robj *touchedkey = c->argv[1];
712
713 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
714 value = listTypePop(sobj,REDIS_TAIL);
715 /* We saved touched key, and protect it, since rpoplpushHandlePush
716 * may change the client command argument vector (it does not
717 * currently). */
718 incrRefCount(touchedkey);
719 rpoplpushHandlePush(c,c->argv[2],dobj,value);
720
721 /* listTypePop returns an object with its refcount incremented */
722 decrRefCount(value);
723
724 /* Delete the source list when it is empty */
725 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
726 signalModifiedKey(c->db,touchedkey);
727 decrRefCount(touchedkey);
728 server.dirty++;
729 }
730 }
731
732 /*-----------------------------------------------------------------------------
733 * Blocking POP operations
734 *----------------------------------------------------------------------------*/
735
736 /* This is how the current blocking POP works, we use BLPOP as example:
737 * - If the user calls BLPOP and the key exists and contains a non empty list
738 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
739 * if blocking is not required.
740 * - If instead BLPOP is called and the key does not exists or the list is
741 * empty we need to block. In order to do so we remove the notification for
742 * new data to read in the client socket (so that we'll not serve new
743 * requests if the blocking request is not served). Also we put the client
744 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
745 * blocking for this keys.
746 * - If a PUSH operation against a key with blocked clients waiting is
747 * performed, we mark this key as "ready", and after the current command,
748 * MULTI/EXEC block, or script, is executed, we serve all the clients waiting
749 * for this list, from the one that blocked first, to the last, accordingly
750 * to the number of elements we have in the ready list.
751 */
752
753 /* Set a client in blocking mode for the specified key, with the specified
754 * timeout */
755 void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
756 dictEntry *de;
757 list *l;
758 int j;
759
760 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
761 c->bpop.count = numkeys;
762 c->bpop.timeout = timeout;
763 c->bpop.target = target;
764
765 if (target != NULL) {
766 incrRefCount(target);
767 }
768
769 for (j = 0; j < numkeys; j++) {
770 /* Add the key in the client structure, to map clients -> keys */
771 c->bpop.keys[j] = keys[j];
772 incrRefCount(keys[j]);
773
774 /* And in the other "side", to map keys -> clients */
775 de = dictFind(c->db->blocking_keys,keys[j]);
776 if (de == NULL) {
777 int retval;
778
779 /* For every key we take a list of clients blocked for it */
780 l = listCreate();
781 retval = dictAdd(c->db->blocking_keys,keys[j],l);
782 incrRefCount(keys[j]);
783 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
784 } else {
785 l = dictGetVal(de);
786 }
787 listAddNodeTail(l,c);
788 }
789 /* Mark the client as a blocked client */
790 c->flags |= REDIS_BLOCKED;
791 server.bpop_blocked_clients++;
792 }
793
794 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
795 void unblockClientWaitingData(redisClient *c) {
796 dictEntry *de;
797 list *l;
798 int j;
799
800 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
801 /* The client may wait for multiple keys, so unblock it for every key. */
802 for (j = 0; j < c->bpop.count; j++) {
803 /* Remove this client from the list of clients waiting for this key. */
804 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
805 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
806 l = dictGetVal(de);
807 listDelNode(l,listSearchKey(l,c));
808 /* If the list is empty we need to remove it to avoid wasting memory */
809 if (listLength(l) == 0)
810 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
811 decrRefCount(c->bpop.keys[j]);
812 }
813
814 /* Cleanup the client structure */
815 zfree(c->bpop.keys);
816 c->bpop.keys = NULL;
817 if (c->bpop.target) decrRefCount(c->bpop.target);
818 c->bpop.target = NULL;
819 c->flags &= ~REDIS_BLOCKED;
820 c->flags |= REDIS_UNBLOCKED;
821 server.bpop_blocked_clients--;
822 listAddNodeTail(server.unblocked_clients,c);
823 }
824
825 /* If the specified key has clients blocked waiting for list pushes, this
826 * function will put the key reference into the server.ready_keys list.
827 * Note that db->ready_keys is an hash table that allows us to avoid putting
828 * the same key agains and again in the list in case of multiple pushes
829 * made by a script or in the context of MULTI/EXEC.
830 *
831 * The list will be finally processed by handleClientsBlockedOnLists() */
832 void signalListAsReady(redisClient *c, robj *key) {
833 readyList *rl;
834
835 /* No clients blocking for this key? No need to queue it. */
836 if (dictFind(c->db->blocking_keys,key) == NULL) return;
837
838 /* Key was already signaled? No need to queue it again. */
839 if (dictFind(c->db->ready_keys,key) != NULL) return;
840
841 /* Ok, we need to queue this key into server.ready_keys. */
842 rl = zmalloc(sizeof(*rl));
843 rl->key = key;
844 rl->db = c->db;
845 incrRefCount(key);
846 listAddNodeTail(server.ready_keys,rl);
847
848 /* We also add the key in the db->ready_keys dictionary in order
849 * to avoid adding it multiple times into a list with a simple O(1)
850 * check. */
851 incrRefCount(key);
852 redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
853 }
854
855 /* This is an helper function for handleClientsBlockedOnLists(). It's work
856 * is to serve a specific client (receiver) that is blocked on 'key'
857 * in the context of the specified 'db', doing the following:
858 *
859 * 1) Provide the client with the 'value' element.
860 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
861 * 'value' element on the destionation list (the LPUSH side of the command).
862 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
863 * the AOF and replication channel.
864 *
865 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
866 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
867 * we can propagate the command properly.
868 *
869 * The function returns REDIS_OK if we are able to serve the client, otherwise
870 * REDIS_ERR is returned to signal the caller that the list POP operation
871 * should be undoed as the client was not served: This only happens for
872 * BRPOPLPUSH that fails to push the value to the destination key as it is
873 * of the wrong type. */
874 int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
875 {
876 robj *argv[3];
877
878 if (dstkey == NULL) {
879 /* Propagate the [LR]POP operation. */
880 argv[0] = (where == REDIS_HEAD) ? shared.lpop :
881 shared.rpop;
882 argv[1] = key;
883 propagate((where == REDIS_HEAD) ?
884 server.lpopCommand : server.rpopCommand,
885 db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
886
887 /* BRPOP/BLPOP */
888 addReplyMultiBulkLen(receiver,2);
889 addReplyBulk(receiver,key);
890 addReplyBulk(receiver,value);
891 } else {
892 /* BRPOPLPUSH */
893 robj *dstobj =
894 lookupKeyWrite(receiver->db,dstkey);
895 if (!(dstobj &&
896 checkType(receiver,dstobj,REDIS_LIST)))
897 {
898 /* Propagate the RPOP operation. */
899 argv[0] = shared.rpop;
900 argv[1] = key;
901 propagate(server.rpopCommand,
902 db->id,argv,2,
903 REDIS_PROPAGATE_AOF|
904 REDIS_PROPAGATE_REPL);
905 rpoplpushHandlePush(receiver,dstkey,dstobj,
906 value);
907 /* Propagate the LPUSH operation. */
908 argv[0] = shared.lpush;
909 argv[1] = dstkey;
910 argv[2] = value;
911 propagate(server.lpushCommand,
912 db->id,argv,3,
913 REDIS_PROPAGATE_AOF|
914 REDIS_PROPAGATE_REPL);
915 } else {
916 /* BRPOPLPUSH failed because of wrong
917 * destination type. */
918 return REDIS_ERR;
919 }
920 }
921 return REDIS_OK;
922 }
923
924 /* This function should be called by Redis every time a single command,
925 * a MULTI/EXEC block, or a Lua script, terminated its execution after
926 * being called by a client.
927 *
928 * All the keys with at least one client blocked that received at least
929 * one new element via some PUSH operation are accumulated into
930 * the server.ready_keys list. This function will run the list and will
931 * serve clients accordingly. Note that the function will iterate again and
932 * again as a result of serving BRPOPLPUSH we can have new blocking clients
933 * to serve because of the PUSH side of BRPOPLPUSH. */
934 void handleClientsBlockedOnLists(void) {
935 while(listLength(server.ready_keys) != 0) {
936 list *l;
937
938 /* Point server.ready_keys to a fresh list and save the current one
939 * locally. This way as we run the old list we are free to call
940 * signalListAsReady() that may push new elements in server.ready_keys
941 * when handling clients blocked into BRPOPLPUSH. */
942 l = server.ready_keys;
943 server.ready_keys = listCreate();
944
945 while(listLength(l) != 0) {
946 listNode *ln = listFirst(l);
947 readyList *rl = ln->value;
948
949 /* First of all remove this key from db->ready_keys so that
950 * we can safely call signalListAsReady() against this key. */
951 dictDelete(rl->db->ready_keys,rl->key);
952
953 /* If the key exists and it's a list, serve blocked clients
954 * with data. */
955 robj *o = lookupKeyWrite(rl->db,rl->key);
956 if (o != NULL && o->type == REDIS_LIST) {
957 dictEntry *de;
958
959 /* We serve clients in the same order they blocked for
960 * this key, from the first blocked to the last. */
961 de = dictFind(rl->db->blocking_keys,rl->key);
962 if (de) {
963 list *clients = dictGetVal(de);
964 int numclients = listLength(clients);
965
966 while(numclients--) {
967 listNode *clientnode = listFirst(clients);
968 redisClient *receiver = clientnode->value;
969 robj *dstkey = receiver->bpop.target;
970 int where = (receiver->lastcmd &&
971 receiver->lastcmd->proc == blpopCommand) ?
972 REDIS_HEAD : REDIS_TAIL;
973 robj *value = listTypePop(o,where);
974
975 if (value) {
976 /* Protect receiver->bpop.target, that will be
977 * freed by the next unblockClientWaitingData()
978 * call. */
979 if (dstkey) incrRefCount(dstkey);
980 unblockClientWaitingData(receiver);
981
982 if (serveClientBlockedOnList(receiver,
983 rl->key,dstkey,rl->db,value,
984 where) == REDIS_ERR)
985 {
986 /* If we failed serving the client we need
987 * to also undo the POP operation. */
988 listTypePush(o,value,where);
989 }
990
991 if (dstkey) decrRefCount(dstkey);
992 decrRefCount(value);
993 } else {
994 break;
995 }
996 }
997 }
998
999 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
1000 /* We don't call signalModifiedKey() as it was already called
1001 * when an element was pushed on the list. */
1002 }
1003
1004 /* Free this item. */
1005 decrRefCount(rl->key);
1006 zfree(rl);
1007 listDelNode(l,ln);
1008 }
1009 listRelease(l); /* We have the new list on place at this point. */
1010 }
1011 }
1012
1013 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
1014 long tval;
1015
1016 if (getLongFromObjectOrReply(c,object,&tval,
1017 "timeout is not an integer or out of range") != REDIS_OK)
1018 return REDIS_ERR;
1019
1020 if (tval < 0) {
1021 addReplyError(c,"timeout is negative");
1022 return REDIS_ERR;
1023 }
1024
1025 if (tval > 0) tval += server.unixtime;
1026 *timeout = tval;
1027
1028 return REDIS_OK;
1029 }
1030
1031 /* Blocking RPOP/LPOP */
1032 void blockingPopGenericCommand(redisClient *c, int where) {
1033 robj *o;
1034 time_t timeout;
1035 int j;
1036
1037 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
1038 return;
1039
1040 for (j = 1; j < c->argc-1; j++) {
1041 o = lookupKeyWrite(c->db,c->argv[j]);
1042 if (o != NULL) {
1043 if (o->type != REDIS_LIST) {
1044 addReply(c,shared.wrongtypeerr);
1045 return;
1046 } else {
1047 if (listTypeLength(o) != 0) {
1048 /* Non empty list, this is like a non normal [LR]POP. */
1049 robj *value = listTypePop(o,where);
1050 redisAssert(value != NULL);
1051
1052 addReplyMultiBulkLen(c,2);
1053 addReplyBulk(c,c->argv[j]);
1054 addReplyBulk(c,value);
1055 decrRefCount(value);
1056 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
1057 signalModifiedKey(c->db,c->argv[j]);
1058 server.dirty++;
1059
1060 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1061 rewriteClientCommandVector(c,2,
1062 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
1063 c->argv[j]);
1064 return;
1065 }
1066 }
1067 }
1068 }
1069
1070 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1071 * we can do is treating it as a timeout (even with timeout 0). */
1072 if (c->flags & REDIS_MULTI) {
1073 addReply(c,shared.nullmultibulk);
1074 return;
1075 }
1076
1077 /* If the list is empty or the key does not exists we must block */
1078 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
1079 }
1080
1081 void blpopCommand(redisClient *c) {
1082 blockingPopGenericCommand(c,REDIS_HEAD);
1083 }
1084
1085 void brpopCommand(redisClient *c) {
1086 blockingPopGenericCommand(c,REDIS_TAIL);
1087 }
1088
1089 void brpoplpushCommand(redisClient *c) {
1090 time_t timeout;
1091
1092 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
1093 return;
1094
1095 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1096
1097 if (key == NULL) {
1098 if (c->flags & REDIS_MULTI) {
1099 /* Blocking against an empty list in a multi state
1100 * returns immediately. */
1101 addReply(c, shared.nullbulk);
1102 } else {
1103 /* The list is empty and the client blocks. */
1104 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1105 }
1106 } else {
1107 if (key->type != REDIS_LIST) {
1108 addReply(c, shared.wrongtypeerr);
1109 } else {
1110 /* The list exists and has elements, so
1111 * the regular rpoplpushCommand is executed. */
1112 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
1113 rpoplpushCommand(c);
1114 }
1115 }
1116 }