]> git.saurik.com Git - redis.git/blob - src/t_list.c
Client should not block multiple times on the same key.
[redis.git] / src / t_list.c
1 /*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
30 #include "redis.h"
31
32 void signalListAsReady(redisClient *c, robj *key);
33
34 /*-----------------------------------------------------------------------------
35 * List API
36 *----------------------------------------------------------------------------*/
37
38 /* Check the argument length to see if it requires us to convert the ziplist
39 * to a real list. Only check raw-encoded objects because integer encoded
40 * objects are never too long. */
41 void listTypeTryConversion(robj *subject, robj *value) {
42 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
43 if (value->encoding == REDIS_ENCODING_RAW &&
44 sdslen(value->ptr) > server.list_max_ziplist_value)
45 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
46 }
47
48 /* The function pushes an elmenet to the specified list object 'subject',
49 * at head or tail position as specified by 'where'.
50 *
51 * There is no need for the caller to incremnet the refcount of 'value' as
52 * the function takes care of it if needed. */
53 void listTypePush(robj *subject, robj *value, int where) {
54 /* Check if we need to convert the ziplist */
55 listTypeTryConversion(subject,value);
56 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
57 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
58 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
59
60 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
61 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
62 value = getDecodedObject(value);
63 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
64 decrRefCount(value);
65 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
66 if (where == REDIS_HEAD) {
67 listAddNodeHead(subject->ptr,value);
68 } else {
69 listAddNodeTail(subject->ptr,value);
70 }
71 incrRefCount(value);
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 }
76
77 robj *listTypePop(robj *subject, int where) {
78 robj *value = NULL;
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 unsigned char *p;
81 unsigned char *vstr;
82 unsigned int vlen;
83 long long vlong;
84 int pos = (where == REDIS_HEAD) ? 0 : -1;
85 p = ziplistIndex(subject->ptr,pos);
86 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
87 if (vstr) {
88 value = createStringObject((char*)vstr,vlen);
89 } else {
90 value = createStringObjectFromLongLong(vlong);
91 }
92 /* We only need to delete an element when it exists */
93 subject->ptr = ziplistDelete(subject->ptr,&p);
94 }
95 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
96 list *list = subject->ptr;
97 listNode *ln;
98 if (where == REDIS_HEAD) {
99 ln = listFirst(list);
100 } else {
101 ln = listLast(list);
102 }
103 if (ln != NULL) {
104 value = listNodeValue(ln);
105 incrRefCount(value);
106 listDelNode(list,ln);
107 }
108 } else {
109 redisPanic("Unknown list encoding");
110 }
111 return value;
112 }
113
114 unsigned long listTypeLength(robj *subject) {
115 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
116 return ziplistLen(subject->ptr);
117 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
118 return listLength((list*)subject->ptr);
119 } else {
120 redisPanic("Unknown list encoding");
121 }
122 }
123
124 /* Initialize an iterator at the specified index. */
125 listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
126 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
127 li->subject = subject;
128 li->encoding = subject->encoding;
129 li->direction = direction;
130 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
131 li->zi = ziplistIndex(subject->ptr,index);
132 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
133 li->ln = listIndex(subject->ptr,index);
134 } else {
135 redisPanic("Unknown list encoding");
136 }
137 return li;
138 }
139
140 /* Clean up the iterator. */
141 void listTypeReleaseIterator(listTypeIterator *li) {
142 zfree(li);
143 }
144
145 /* Stores pointer to current the entry in the provided entry structure
146 * and advances the position of the iterator. Returns 1 when the current
147 * entry is in fact an entry, 0 otherwise. */
148 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
149 /* Protect from converting when iterating */
150 redisAssert(li->subject->encoding == li->encoding);
151
152 entry->li = li;
153 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
154 entry->zi = li->zi;
155 if (entry->zi != NULL) {
156 if (li->direction == REDIS_TAIL)
157 li->zi = ziplistNext(li->subject->ptr,li->zi);
158 else
159 li->zi = ziplistPrev(li->subject->ptr,li->zi);
160 return 1;
161 }
162 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
163 entry->ln = li->ln;
164 if (entry->ln != NULL) {
165 if (li->direction == REDIS_TAIL)
166 li->ln = li->ln->next;
167 else
168 li->ln = li->ln->prev;
169 return 1;
170 }
171 } else {
172 redisPanic("Unknown list encoding");
173 }
174 return 0;
175 }
176
177 /* Return entry or NULL at the current position of the iterator. */
178 robj *listTypeGet(listTypeEntry *entry) {
179 listTypeIterator *li = entry->li;
180 robj *value = NULL;
181 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
182 unsigned char *vstr;
183 unsigned int vlen;
184 long long vlong;
185 redisAssert(entry->zi != NULL);
186 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
187 if (vstr) {
188 value = createStringObject((char*)vstr,vlen);
189 } else {
190 value = createStringObjectFromLongLong(vlong);
191 }
192 }
193 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
194 redisAssert(entry->ln != NULL);
195 value = listNodeValue(entry->ln);
196 incrRefCount(value);
197 } else {
198 redisPanic("Unknown list encoding");
199 }
200 return value;
201 }
202
203 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
204 robj *subject = entry->li->subject;
205 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
206 value = getDecodedObject(value);
207 if (where == REDIS_TAIL) {
208 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
209
210 /* When we insert after the current element, but the current element
211 * is the tail of the list, we need to do a push. */
212 if (next == NULL) {
213 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
214 } else {
215 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
216 }
217 } else {
218 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
219 }
220 decrRefCount(value);
221 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
222 if (where == REDIS_TAIL) {
223 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
224 } else {
225 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
226 }
227 incrRefCount(value);
228 } else {
229 redisPanic("Unknown list encoding");
230 }
231 }
232
233 /* Compare the given object with the entry at the current position. */
234 int listTypeEqual(listTypeEntry *entry, robj *o) {
235 listTypeIterator *li = entry->li;
236 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
237 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
238 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
239 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
240 return equalStringObjects(o,listNodeValue(entry->ln));
241 } else {
242 redisPanic("Unknown list encoding");
243 }
244 }
245
246 /* Delete the element pointed to. */
247 void listTypeDelete(listTypeEntry *entry) {
248 listTypeIterator *li = entry->li;
249 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
250 unsigned char *p = entry->zi;
251 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
252
253 /* Update position of the iterator depending on the direction */
254 if (li->direction == REDIS_TAIL)
255 li->zi = p;
256 else
257 li->zi = ziplistPrev(li->subject->ptr,p);
258 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
259 listNode *next;
260 if (li->direction == REDIS_TAIL)
261 next = entry->ln->next;
262 else
263 next = entry->ln->prev;
264 listDelNode(li->subject->ptr,entry->ln);
265 li->ln = next;
266 } else {
267 redisPanic("Unknown list encoding");
268 }
269 }
270
271 void listTypeConvert(robj *subject, int enc) {
272 listTypeIterator *li;
273 listTypeEntry entry;
274 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
275
276 if (enc == REDIS_ENCODING_LINKEDLIST) {
277 list *l = listCreate();
278 listSetFreeMethod(l,decrRefCount);
279
280 /* listTypeGet returns a robj with incremented refcount */
281 li = listTypeInitIterator(subject,0,REDIS_TAIL);
282 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
283 listTypeReleaseIterator(li);
284
285 subject->encoding = REDIS_ENCODING_LINKEDLIST;
286 zfree(subject->ptr);
287 subject->ptr = l;
288 } else {
289 redisPanic("Unsupported list conversion");
290 }
291 }
292
293 /*-----------------------------------------------------------------------------
294 * List Commands
295 *----------------------------------------------------------------------------*/
296
297 void pushGenericCommand(redisClient *c, int where) {
298 int j, waiting = 0, pushed = 0;
299 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
300 int may_have_waiting_clients = (lobj == NULL);
301
302 if (lobj && lobj->type != REDIS_LIST) {
303 addReply(c,shared.wrongtypeerr);
304 return;
305 }
306
307 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
308
309 for (j = 2; j < c->argc; j++) {
310 c->argv[j] = tryObjectEncoding(c->argv[j]);
311 if (!lobj) {
312 lobj = createZiplistObject();
313 dbAdd(c->db,c->argv[1],lobj);
314 }
315 listTypePush(lobj,c->argv[j],where);
316 pushed++;
317 }
318 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
319 if (pushed) signalModifiedKey(c->db,c->argv[1]);
320 server.dirty += pushed;
321 }
322
323 void lpushCommand(redisClient *c) {
324 pushGenericCommand(c,REDIS_HEAD);
325 }
326
327 void rpushCommand(redisClient *c) {
328 pushGenericCommand(c,REDIS_TAIL);
329 }
330
331 void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
332 robj *subject;
333 listTypeIterator *iter;
334 listTypeEntry entry;
335 int inserted = 0;
336
337 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
338 checkType(c,subject,REDIS_LIST)) return;
339
340 if (refval != NULL) {
341 /* Note: we expect refval to be string-encoded because it is *not* the
342 * last argument of the multi-bulk LINSERT. */
343 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
344
345 /* We're not sure if this value can be inserted yet, but we cannot
346 * convert the list inside the iterator. We don't want to loop over
347 * the list twice (once to see if the value can be inserted and once
348 * to do the actual insert), so we assume this value can be inserted
349 * and convert the ziplist to a regular list if necessary. */
350 listTypeTryConversion(subject,val);
351
352 /* Seek refval from head to tail */
353 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
354 while (listTypeNext(iter,&entry)) {
355 if (listTypeEqual(&entry,refval)) {
356 listTypeInsert(&entry,val,where);
357 inserted = 1;
358 break;
359 }
360 }
361 listTypeReleaseIterator(iter);
362
363 if (inserted) {
364 /* Check if the length exceeds the ziplist length threshold. */
365 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
366 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
367 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
368 signalModifiedKey(c->db,c->argv[1]);
369 server.dirty++;
370 } else {
371 /* Notify client of a failed insert */
372 addReply(c,shared.cnegone);
373 return;
374 }
375 } else {
376 listTypePush(subject,val,where);
377 signalModifiedKey(c->db,c->argv[1]);
378 server.dirty++;
379 }
380
381 addReplyLongLong(c,listTypeLength(subject));
382 }
383
384 void lpushxCommand(redisClient *c) {
385 c->argv[2] = tryObjectEncoding(c->argv[2]);
386 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
387 }
388
389 void rpushxCommand(redisClient *c) {
390 c->argv[2] = tryObjectEncoding(c->argv[2]);
391 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
392 }
393
394 void linsertCommand(redisClient *c) {
395 c->argv[4] = tryObjectEncoding(c->argv[4]);
396 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
397 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
398 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
399 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
400 } else {
401 addReply(c,shared.syntaxerr);
402 }
403 }
404
405 void llenCommand(redisClient *c) {
406 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
407 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
408 addReplyLongLong(c,listTypeLength(o));
409 }
410
411 void lindexCommand(redisClient *c) {
412 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
413 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
414 long index;
415 robj *value = NULL;
416
417 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
418 return;
419
420 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
421 unsigned char *p;
422 unsigned char *vstr;
423 unsigned int vlen;
424 long long vlong;
425 p = ziplistIndex(o->ptr,index);
426 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
427 if (vstr) {
428 value = createStringObject((char*)vstr,vlen);
429 } else {
430 value = createStringObjectFromLongLong(vlong);
431 }
432 addReplyBulk(c,value);
433 decrRefCount(value);
434 } else {
435 addReply(c,shared.nullbulk);
436 }
437 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
438 listNode *ln = listIndex(o->ptr,index);
439 if (ln != NULL) {
440 value = listNodeValue(ln);
441 addReplyBulk(c,value);
442 } else {
443 addReply(c,shared.nullbulk);
444 }
445 } else {
446 redisPanic("Unknown list encoding");
447 }
448 }
449
450 void lsetCommand(redisClient *c) {
451 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
452 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
453 long index;
454 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
455
456 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
457 return;
458
459 listTypeTryConversion(o,value);
460 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
461 unsigned char *p, *zl = o->ptr;
462 p = ziplistIndex(zl,index);
463 if (p == NULL) {
464 addReply(c,shared.outofrangeerr);
465 } else {
466 o->ptr = ziplistDelete(o->ptr,&p);
467 value = getDecodedObject(value);
468 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
469 decrRefCount(value);
470 addReply(c,shared.ok);
471 signalModifiedKey(c->db,c->argv[1]);
472 server.dirty++;
473 }
474 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
475 listNode *ln = listIndex(o->ptr,index);
476 if (ln == NULL) {
477 addReply(c,shared.outofrangeerr);
478 } else {
479 decrRefCount((robj*)listNodeValue(ln));
480 listNodeValue(ln) = value;
481 incrRefCount(value);
482 addReply(c,shared.ok);
483 signalModifiedKey(c->db,c->argv[1]);
484 server.dirty++;
485 }
486 } else {
487 redisPanic("Unknown list encoding");
488 }
489 }
490
491 void popGenericCommand(redisClient *c, int where) {
492 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
493 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
494
495 robj *value = listTypePop(o,where);
496 if (value == NULL) {
497 addReply(c,shared.nullbulk);
498 } else {
499 addReplyBulk(c,value);
500 decrRefCount(value);
501 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
502 signalModifiedKey(c->db,c->argv[1]);
503 server.dirty++;
504 }
505 }
506
507 void lpopCommand(redisClient *c) {
508 popGenericCommand(c,REDIS_HEAD);
509 }
510
511 void rpopCommand(redisClient *c) {
512 popGenericCommand(c,REDIS_TAIL);
513 }
514
515 void lrangeCommand(redisClient *c) {
516 robj *o;
517 long start, end, llen, rangelen;
518
519 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
520 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
521
522 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
523 || checkType(c,o,REDIS_LIST)) return;
524 llen = listTypeLength(o);
525
526 /* convert negative indexes */
527 if (start < 0) start = llen+start;
528 if (end < 0) end = llen+end;
529 if (start < 0) start = 0;
530
531 /* Invariant: start >= 0, so this test will be true when end < 0.
532 * The range is empty when start > end or start >= length. */
533 if (start > end || start >= llen) {
534 addReply(c,shared.emptymultibulk);
535 return;
536 }
537 if (end >= llen) end = llen-1;
538 rangelen = (end-start)+1;
539
540 /* Return the result in form of a multi-bulk reply */
541 addReplyMultiBulkLen(c,rangelen);
542 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
543 unsigned char *p = ziplistIndex(o->ptr,start);
544 unsigned char *vstr;
545 unsigned int vlen;
546 long long vlong;
547
548 while(rangelen--) {
549 ziplistGet(p,&vstr,&vlen,&vlong);
550 if (vstr) {
551 addReplyBulkCBuffer(c,vstr,vlen);
552 } else {
553 addReplyBulkLongLong(c,vlong);
554 }
555 p = ziplistNext(o->ptr,p);
556 }
557 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
558 listNode *ln;
559
560 /* If we are nearest to the end of the list, reach the element
561 * starting from tail and going backward, as it is faster. */
562 if (start > llen/2) start -= llen;
563 ln = listIndex(o->ptr,start);
564
565 while(rangelen--) {
566 addReplyBulk(c,ln->value);
567 ln = ln->next;
568 }
569 } else {
570 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
571 }
572 }
573
574 void ltrimCommand(redisClient *c) {
575 robj *o;
576 long start, end, llen, j, ltrim, rtrim;
577 list *list;
578 listNode *ln;
579
580 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
581 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
582
583 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
584 checkType(c,o,REDIS_LIST)) return;
585 llen = listTypeLength(o);
586
587 /* convert negative indexes */
588 if (start < 0) start = llen+start;
589 if (end < 0) end = llen+end;
590 if (start < 0) start = 0;
591
592 /* Invariant: start >= 0, so this test will be true when end < 0.
593 * The range is empty when start > end or start >= length. */
594 if (start > end || start >= llen) {
595 /* Out of range start or start > end result in empty list */
596 ltrim = llen;
597 rtrim = 0;
598 } else {
599 if (end >= llen) end = llen-1;
600 ltrim = start;
601 rtrim = llen-end-1;
602 }
603
604 /* Remove list elements to perform the trim */
605 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
606 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
607 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
608 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
609 list = o->ptr;
610 for (j = 0; j < ltrim; j++) {
611 ln = listFirst(list);
612 listDelNode(list,ln);
613 }
614 for (j = 0; j < rtrim; j++) {
615 ln = listLast(list);
616 listDelNode(list,ln);
617 }
618 } else {
619 redisPanic("Unknown list encoding");
620 }
621 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
622 signalModifiedKey(c->db,c->argv[1]);
623 server.dirty++;
624 addReply(c,shared.ok);
625 }
626
627 void lremCommand(redisClient *c) {
628 robj *subject, *obj;
629 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
630 long toremove;
631 long removed = 0;
632 listTypeEntry entry;
633
634 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
635 return;
636
637 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
638 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
639
640 /* Make sure obj is raw when we're dealing with a ziplist */
641 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
642 obj = getDecodedObject(obj);
643
644 listTypeIterator *li;
645 if (toremove < 0) {
646 toremove = -toremove;
647 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
648 } else {
649 li = listTypeInitIterator(subject,0,REDIS_TAIL);
650 }
651
652 while (listTypeNext(li,&entry)) {
653 if (listTypeEqual(&entry,obj)) {
654 listTypeDelete(&entry);
655 server.dirty++;
656 removed++;
657 if (toremove && removed == toremove) break;
658 }
659 }
660 listTypeReleaseIterator(li);
661
662 /* Clean up raw encoded object */
663 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
664 decrRefCount(obj);
665
666 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
667 addReplyLongLong(c,removed);
668 if (removed) signalModifiedKey(c->db,c->argv[1]);
669 }
670
671 /* This is the semantic of this command:
672 * RPOPLPUSH srclist dstlist:
673 * IF LLEN(srclist) > 0
674 * element = RPOP srclist
675 * LPUSH dstlist element
676 * RETURN element
677 * ELSE
678 * RETURN nil
679 * END
680 * END
681 *
682 * The idea is to be able to get an element from a list in a reliable way
683 * since the element is not just returned but pushed against another list
684 * as well. This command was originally proposed by Ezra Zygmuntowicz.
685 */
686
687 void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
688 /* Create the list if the key does not exist */
689 if (!dstobj) {
690 dstobj = createZiplistObject();
691 dbAdd(c->db,dstkey,dstobj);
692 signalListAsReady(c,dstkey);
693 }
694 signalModifiedKey(c->db,dstkey);
695 listTypePush(dstobj,value,REDIS_HEAD);
696 /* Always send the pushed value to the client. */
697 addReplyBulk(c,value);
698 }
699
700 void rpoplpushCommand(redisClient *c) {
701 robj *sobj, *value;
702 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
703 checkType(c,sobj,REDIS_LIST)) return;
704
705 if (listTypeLength(sobj) == 0) {
706 /* This may only happen after loading very old RDB files. Recent
707 * versions of Redis delete keys of empty lists. */
708 addReply(c,shared.nullbulk);
709 } else {
710 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
711 robj *touchedkey = c->argv[1];
712
713 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
714 value = listTypePop(sobj,REDIS_TAIL);
715 /* We saved touched key, and protect it, since rpoplpushHandlePush
716 * may change the client command argument vector (it does not
717 * currently). */
718 incrRefCount(touchedkey);
719 rpoplpushHandlePush(c,c->argv[2],dobj,value);
720
721 /* listTypePop returns an object with its refcount incremented */
722 decrRefCount(value);
723
724 /* Delete the source list when it is empty */
725 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
726 signalModifiedKey(c->db,touchedkey);
727 decrRefCount(touchedkey);
728 server.dirty++;
729 }
730 }
731
732 /*-----------------------------------------------------------------------------
733 * Blocking POP operations
734 *----------------------------------------------------------------------------*/
735
/* This is how the current blocking POP works, we use BLPOP as example:
 * - If the user calls BLPOP and the key exists and contains a non empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first, to the last, according
 *   to the number of elements we have in the ready list.
 */
752
753 /* Set a client in blocking mode for the specified key, with the specified
754 * timeout */
755 void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
756 dict *added;
757 dictEntry *de;
758 list *l;
759 int j, i;
760
761 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
762 c->bpop.timeout = timeout;
763 c->bpop.target = target;
764
765 if (target != NULL) incrRefCount(target);
766
767 /* Create a dictionary that we use to avoid adding duplicated keys
768 * in case the user calls something like: "BLPOP foo foo foo 0".
769 * The rest of the implementation is simpler if we know there are no
770 * duplications in the key waiting list. */
771 added = dictCreate(&setDictType,NULL);
772
773 i = 0; /* The index for c->bpop.keys[...], we can't use the j loop
774 variable as the list of keys may have duplicated elements. */
775 for (j = 0; j < numkeys; j++) {
776 /* Add the key in the "added" dictionary to make sure there are
777 * no duplicated keys. */
778 if (dictAdd(added,keys[j],NULL) != DICT_OK) continue;
779 incrRefCount(keys[j]);
780
781 /* Add the key in the client structure, to map clients -> keys */
782 c->bpop.keys[i++] = keys[j];
783 incrRefCount(keys[j]);
784
785 /* And in the other "side", to map keys -> clients */
786 de = dictFind(c->db->blocking_keys,keys[j]);
787 if (de == NULL) {
788 int retval;
789
790 /* For every key we take a list of clients blocked for it */
791 l = listCreate();
792 retval = dictAdd(c->db->blocking_keys,keys[j],l);
793 incrRefCount(keys[j]);
794 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
795 } else {
796 l = dictGetVal(de);
797 }
798 listAddNodeTail(l,c);
799 }
800 c->bpop.count = i;
801
802 /* Mark the client as a blocked client */
803 c->flags |= REDIS_BLOCKED;
804 server.bpop_blocked_clients++;
805 dictRelease(added);
806 }
807
808 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
809 void unblockClientWaitingData(redisClient *c) {
810 dictEntry *de;
811 list *l;
812 int j;
813
814 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
815 /* The client may wait for multiple keys, so unblock it for every key. */
816 for (j = 0; j < c->bpop.count; j++) {
817 /* Remove this client from the list of clients waiting for this key. */
818 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
819 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
820 l = dictGetVal(de);
821 listDelNode(l,listSearchKey(l,c));
822 /* If the list is empty we need to remove it to avoid wasting memory */
823 if (listLength(l) == 0)
824 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
825 decrRefCount(c->bpop.keys[j]);
826 }
827
828 /* Cleanup the client structure */
829 zfree(c->bpop.keys);
830 c->bpop.keys = NULL;
831 if (c->bpop.target) decrRefCount(c->bpop.target);
832 c->bpop.target = NULL;
833 c->flags &= ~REDIS_BLOCKED;
834 c->flags |= REDIS_UNBLOCKED;
835 server.bpop_blocked_clients--;
836 listAddNodeTail(server.unblocked_clients,c);
837 }
838
839 /* If the specified key has clients blocked waiting for list pushes, this
840 * function will put the key reference into the server.ready_keys list.
841 * Note that db->ready_keys is an hash table that allows us to avoid putting
842 * the same key agains and again in the list in case of multiple pushes
843 * made by a script or in the context of MULTI/EXEC.
844 *
845 * The list will be finally processed by handleClientsBlockedOnLists() */
846 void signalListAsReady(redisClient *c, robj *key) {
847 readyList *rl;
848
849 /* No clients blocking for this key? No need to queue it. */
850 if (dictFind(c->db->blocking_keys,key) == NULL) return;
851
852 /* Key was already signaled? No need to queue it again. */
853 if (dictFind(c->db->ready_keys,key) != NULL) return;
854
855 /* Ok, we need to queue this key into server.ready_keys. */
856 rl = zmalloc(sizeof(*rl));
857 rl->key = key;
858 rl->db = c->db;
859 incrRefCount(key);
860 listAddNodeTail(server.ready_keys,rl);
861
862 /* We also add the key in the db->ready_keys dictionary in order
863 * to avoid adding it multiple times into a list with a simple O(1)
864 * check. */
865 incrRefCount(key);
866 redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
867 }
868
869 /* This is an helper function for handleClientsBlockedOnLists(). It's work
870 * is to serve a specific client (receiver) that is blocked on 'key'
871 * in the context of the specified 'db', doing the following:
872 *
873 * 1) Provide the client with the 'value' element.
874 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
875 * 'value' element on the destionation list (the LPUSH side of the command).
876 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
877 * the AOF and replication channel.
878 *
879 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
880 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
881 * we can propagate the command properly.
882 *
883 * The function returns REDIS_OK if we are able to serve the client, otherwise
884 * REDIS_ERR is returned to signal the caller that the list POP operation
885 * should be undoed as the client was not served: This only happens for
886 * BRPOPLPUSH that fails to push the value to the destination key as it is
887 * of the wrong type. */
888 int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
889 {
890 robj *argv[3];
891
892 if (dstkey == NULL) {
893 /* Propagate the [LR]POP operation. */
894 argv[0] = (where == REDIS_HEAD) ? shared.lpop :
895 shared.rpop;
896 argv[1] = key;
897 propagate((where == REDIS_HEAD) ?
898 server.lpopCommand : server.rpopCommand,
899 db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
900
901 /* BRPOP/BLPOP */
902 addReplyMultiBulkLen(receiver,2);
903 addReplyBulk(receiver,key);
904 addReplyBulk(receiver,value);
905 } else {
906 /* BRPOPLPUSH */
907 robj *dstobj =
908 lookupKeyWrite(receiver->db,dstkey);
909 if (!(dstobj &&
910 checkType(receiver,dstobj,REDIS_LIST)))
911 {
912 /* Propagate the RPOP operation. */
913 argv[0] = shared.rpop;
914 argv[1] = key;
915 propagate(server.rpopCommand,
916 db->id,argv,2,
917 REDIS_PROPAGATE_AOF|
918 REDIS_PROPAGATE_REPL);
919 rpoplpushHandlePush(receiver,dstkey,dstobj,
920 value);
921 /* Propagate the LPUSH operation. */
922 argv[0] = shared.lpush;
923 argv[1] = dstkey;
924 argv[2] = value;
925 propagate(server.lpushCommand,
926 db->id,argv,3,
927 REDIS_PROPAGATE_AOF|
928 REDIS_PROPAGATE_REPL);
929 } else {
930 /* BRPOPLPUSH failed because of wrong
931 * destination type. */
932 return REDIS_ERR;
933 }
934 }
935 return REDIS_OK;
936 }
937
938 /* This function should be called by Redis every time a single command,
939 * a MULTI/EXEC block, or a Lua script, terminated its execution after
940 * being called by a client.
941 *
942 * All the keys with at least one client blocked that received at least
943 * one new element via some PUSH operation are accumulated into
944 * the server.ready_keys list. This function will run the list and will
945 * serve clients accordingly. Note that the function will iterate again and
946 * again as a result of serving BRPOPLPUSH we can have new blocking clients
947 * to serve because of the PUSH side of BRPOPLPUSH. */
948 void handleClientsBlockedOnLists(void) {
949 while(listLength(server.ready_keys) != 0) {
950 list *l;
951
952 /* Point server.ready_keys to a fresh list and save the current one
953 * locally. This way as we run the old list we are free to call
954 * signalListAsReady() that may push new elements in server.ready_keys
955 * when handling clients blocked into BRPOPLPUSH. */
956 l = server.ready_keys;
957 server.ready_keys = listCreate();
958
959 while(listLength(l) != 0) {
960 listNode *ln = listFirst(l);
961 readyList *rl = ln->value;
962
963 /* First of all remove this key from db->ready_keys so that
964 * we can safely call signalListAsReady() against this key. */
965 dictDelete(rl->db->ready_keys,rl->key);
966
967 /* If the key exists and it's a list, serve blocked clients
968 * with data. */
969 robj *o = lookupKeyWrite(rl->db,rl->key);
970 if (o != NULL && o->type == REDIS_LIST) {
971 dictEntry *de;
972
973 /* We serve clients in the same order they blocked for
974 * this key, from the first blocked to the last. */
975 de = dictFind(rl->db->blocking_keys,rl->key);
976 if (de) {
977 list *clients = dictGetVal(de);
978 int numclients = listLength(clients);
979
980 while(numclients--) {
981 listNode *clientnode = listFirst(clients);
982 redisClient *receiver = clientnode->value;
983 robj *dstkey = receiver->bpop.target;
984 int where = (receiver->lastcmd &&
985 receiver->lastcmd->proc == blpopCommand) ?
986 REDIS_HEAD : REDIS_TAIL;
987 robj *value = listTypePop(o,where);
988
989 if (value) {
990 /* Protect receiver->bpop.target, that will be
991 * freed by the next unblockClientWaitingData()
992 * call. */
993 if (dstkey) incrRefCount(dstkey);
994 unblockClientWaitingData(receiver);
995
996 if (serveClientBlockedOnList(receiver,
997 rl->key,dstkey,rl->db,value,
998 where) == REDIS_ERR)
999 {
1000 /* If we failed serving the client we need
1001 * to also undo the POP operation. */
1002 listTypePush(o,value,where);
1003 }
1004
1005 if (dstkey) decrRefCount(dstkey);
1006 decrRefCount(value);
1007 } else {
1008 break;
1009 }
1010 }
1011 }
1012
1013 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
1014 /* We don't call signalModifiedKey() as it was already called
1015 * when an element was pushed on the list. */
1016 }
1017
1018 /* Free this item. */
1019 decrRefCount(rl->key);
1020 zfree(rl);
1021 listDelNode(l,ln);
1022 }
1023 listRelease(l); /* We have the new list on place at this point. */
1024 }
1025 }
1026
1027 int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
1028 long tval;
1029
1030 if (getLongFromObjectOrReply(c,object,&tval,
1031 "timeout is not an integer or out of range") != REDIS_OK)
1032 return REDIS_ERR;
1033
1034 if (tval < 0) {
1035 addReplyError(c,"timeout is negative");
1036 return REDIS_ERR;
1037 }
1038
1039 if (tval > 0) tval += server.unixtime;
1040 *timeout = tval;
1041
1042 return REDIS_OK;
1043 }
1044
1045 /* Blocking RPOP/LPOP */
1046 void blockingPopGenericCommand(redisClient *c, int where) {
1047 robj *o;
1048 time_t timeout;
1049 int j;
1050
1051 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
1052 return;
1053
1054 for (j = 1; j < c->argc-1; j++) {
1055 o = lookupKeyWrite(c->db,c->argv[j]);
1056 if (o != NULL) {
1057 if (o->type != REDIS_LIST) {
1058 addReply(c,shared.wrongtypeerr);
1059 return;
1060 } else {
1061 if (listTypeLength(o) != 0) {
1062 /* Non empty list, this is like a non normal [LR]POP. */
1063 robj *value = listTypePop(o,where);
1064 redisAssert(value != NULL);
1065
1066 addReplyMultiBulkLen(c,2);
1067 addReplyBulk(c,c->argv[j]);
1068 addReplyBulk(c,value);
1069 decrRefCount(value);
1070 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
1071 signalModifiedKey(c->db,c->argv[j]);
1072 server.dirty++;
1073
1074 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1075 rewriteClientCommandVector(c,2,
1076 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
1077 c->argv[j]);
1078 return;
1079 }
1080 }
1081 }
1082 }
1083
1084 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1085 * we can do is treating it as a timeout (even with timeout 0). */
1086 if (c->flags & REDIS_MULTI) {
1087 addReply(c,shared.nullmultibulk);
1088 return;
1089 }
1090
1091 /* If the list is empty or the key does not exists we must block */
1092 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
1093 }
1094
1095 void blpopCommand(redisClient *c) {
1096 blockingPopGenericCommand(c,REDIS_HEAD);
1097 }
1098
1099 void brpopCommand(redisClient *c) {
1100 blockingPopGenericCommand(c,REDIS_TAIL);
1101 }
1102
1103 void brpoplpushCommand(redisClient *c) {
1104 time_t timeout;
1105
1106 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
1107 return;
1108
1109 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1110
1111 if (key == NULL) {
1112 if (c->flags & REDIS_MULTI) {
1113 /* Blocking against an empty list in a multi state
1114 * returns immediately. */
1115 addReply(c, shared.nullbulk);
1116 } else {
1117 /* The list is empty and the client blocks. */
1118 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1119 }
1120 } else {
1121 if (key->type != REDIS_LIST) {
1122 addReply(c, shared.wrongtypeerr);
1123 } else {
1124 /* The list exists and has elements, so
1125 * the regular rpoplpushCommand is executed. */
1126 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
1127 rpoplpushCommand(c);
1128 }
1129 }
1130 }