]> git.saurik.com Git - redis.git/blame - src/t_list.c
Make an EXEC test more latency proof.
[redis.git] / src / t_list.c
CommitLineData
d288ee65 1/*
2 * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions are met:
7 *
8 * * Redistributions of source code must retain the above copyright notice,
9 * this list of conditions and the following disclaimer.
10 * * Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 * * Neither the name of Redis nor the names of its contributors may be used
14 * to endorse or promote products derived from this software without
15 * specific prior written permission.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
21 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 * POSSIBILITY OF SUCH DAMAGE.
28 */
29
e2641e09 30#include "redis.h"
31
f444e2af 32void signalListAsReady(redisClient *c, robj *key);
33
e2641e09 34/*-----------------------------------------------------------------------------
35 * List API
36 *----------------------------------------------------------------------------*/
37
38/* Check the argument length to see if it requires us to convert the ziplist
39 * to a real list. Only check raw-encoded objects because integer encoded
40 * objects are never too long. */
41void listTypeTryConversion(robj *subject, robj *value) {
42 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
43 if (value->encoding == REDIS_ENCODING_RAW &&
44 sdslen(value->ptr) > server.list_max_ziplist_value)
45 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
46}
47
f444e2af 48/* The function pushes an elmenet to the specified list object 'subject',
49 * at head or tail position as specified by 'where'.
50 *
51 * There is no need for the caller to incremnet the refcount of 'value' as
52 * the function takes care of it if needed. */
e2641e09 53void listTypePush(robj *subject, robj *value, int where) {
54 /* Check if we need to convert the ziplist */
55 listTypeTryConversion(subject,value);
56 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
57 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
58 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
59
60 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
61 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
62 value = getDecodedObject(value);
63 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
64 decrRefCount(value);
65 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
66 if (where == REDIS_HEAD) {
67 listAddNodeHead(subject->ptr,value);
68 } else {
69 listAddNodeTail(subject->ptr,value);
70 }
71 incrRefCount(value);
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75}
76
77robj *listTypePop(robj *subject, int where) {
78 robj *value = NULL;
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 unsigned char *p;
81 unsigned char *vstr;
82 unsigned int vlen;
83 long long vlong;
84 int pos = (where == REDIS_HEAD) ? 0 : -1;
85 p = ziplistIndex(subject->ptr,pos);
86 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
87 if (vstr) {
88 value = createStringObject((char*)vstr,vlen);
89 } else {
90 value = createStringObjectFromLongLong(vlong);
91 }
92 /* We only need to delete an element when it exists */
93 subject->ptr = ziplistDelete(subject->ptr,&p);
94 }
95 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
96 list *list = subject->ptr;
97 listNode *ln;
98 if (where == REDIS_HEAD) {
99 ln = listFirst(list);
100 } else {
101 ln = listLast(list);
102 }
103 if (ln != NULL) {
104 value = listNodeValue(ln);
105 incrRefCount(value);
106 listDelNode(list,ln);
107 }
108 } else {
109 redisPanic("Unknown list encoding");
110 }
111 return value;
112}
113
114unsigned long listTypeLength(robj *subject) {
115 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
116 return ziplistLen(subject->ptr);
117 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
118 return listLength((list*)subject->ptr);
119 } else {
120 redisPanic("Unknown list encoding");
121 }
122}
123
124/* Initialize an iterator at the specified index. */
3c08fdae 125listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
e2641e09 126 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
127 li->subject = subject;
128 li->encoding = subject->encoding;
129 li->direction = direction;
130 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
131 li->zi = ziplistIndex(subject->ptr,index);
132 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
133 li->ln = listIndex(subject->ptr,index);
134 } else {
135 redisPanic("Unknown list encoding");
136 }
137 return li;
138}
139
140/* Clean up the iterator. */
141void listTypeReleaseIterator(listTypeIterator *li) {
142 zfree(li);
143}
144
145/* Stores pointer to current the entry in the provided entry structure
146 * and advances the position of the iterator. Returns 1 when the current
147 * entry is in fact an entry, 0 otherwise. */
148int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
149 /* Protect from converting when iterating */
150 redisAssert(li->subject->encoding == li->encoding);
151
152 entry->li = li;
153 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
154 entry->zi = li->zi;
155 if (entry->zi != NULL) {
156 if (li->direction == REDIS_TAIL)
157 li->zi = ziplistNext(li->subject->ptr,li->zi);
158 else
159 li->zi = ziplistPrev(li->subject->ptr,li->zi);
160 return 1;
161 }
162 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
163 entry->ln = li->ln;
164 if (entry->ln != NULL) {
165 if (li->direction == REDIS_TAIL)
166 li->ln = li->ln->next;
167 else
168 li->ln = li->ln->prev;
169 return 1;
170 }
171 } else {
172 redisPanic("Unknown list encoding");
173 }
174 return 0;
175}
176
177/* Return entry or NULL at the current position of the iterator. */
178robj *listTypeGet(listTypeEntry *entry) {
179 listTypeIterator *li = entry->li;
180 robj *value = NULL;
181 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
182 unsigned char *vstr;
183 unsigned int vlen;
184 long long vlong;
185 redisAssert(entry->zi != NULL);
186 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
187 if (vstr) {
188 value = createStringObject((char*)vstr,vlen);
189 } else {
190 value = createStringObjectFromLongLong(vlong);
191 }
192 }
193 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
194 redisAssert(entry->ln != NULL);
195 value = listNodeValue(entry->ln);
196 incrRefCount(value);
197 } else {
198 redisPanic("Unknown list encoding");
199 }
200 return value;
201}
202
203void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
204 robj *subject = entry->li->subject;
205 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
206 value = getDecodedObject(value);
207 if (where == REDIS_TAIL) {
208 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
209
210 /* When we insert after the current element, but the current element
211 * is the tail of the list, we need to do a push. */
212 if (next == NULL) {
213 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
214 } else {
215 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
216 }
217 } else {
218 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
219 }
220 decrRefCount(value);
221 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
222 if (where == REDIS_TAIL) {
223 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
224 } else {
225 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
226 }
227 incrRefCount(value);
228 } else {
229 redisPanic("Unknown list encoding");
230 }
231}
232
233/* Compare the given object with the entry at the current position. */
234int listTypeEqual(listTypeEntry *entry, robj *o) {
235 listTypeIterator *li = entry->li;
236 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
eab0e26e 237 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
e2641e09 238 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
239 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
240 return equalStringObjects(o,listNodeValue(entry->ln));
241 } else {
242 redisPanic("Unknown list encoding");
243 }
244}
245
246/* Delete the element pointed to. */
247void listTypeDelete(listTypeEntry *entry) {
248 listTypeIterator *li = entry->li;
249 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
250 unsigned char *p = entry->zi;
251 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
252
253 /* Update position of the iterator depending on the direction */
254 if (li->direction == REDIS_TAIL)
255 li->zi = p;
256 else
257 li->zi = ziplistPrev(li->subject->ptr,p);
258 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
259 listNode *next;
260 if (li->direction == REDIS_TAIL)
261 next = entry->ln->next;
262 else
263 next = entry->ln->prev;
264 listDelNode(li->subject->ptr,entry->ln);
265 li->ln = next;
266 } else {
267 redisPanic("Unknown list encoding");
268 }
269}
270
271void listTypeConvert(robj *subject, int enc) {
272 listTypeIterator *li;
273 listTypeEntry entry;
eab0e26e 274 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
e2641e09 275
276 if (enc == REDIS_ENCODING_LINKEDLIST) {
277 list *l = listCreate();
278 listSetFreeMethod(l,decrRefCount);
279
280 /* listTypeGet returns a robj with incremented refcount */
281 li = listTypeInitIterator(subject,0,REDIS_TAIL);
282 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
283 listTypeReleaseIterator(li);
284
285 subject->encoding = REDIS_ENCODING_LINKEDLIST;
286 zfree(subject->ptr);
287 subject->ptr = l;
288 } else {
289 redisPanic("Unsupported list conversion");
290 }
291}
292
293/*-----------------------------------------------------------------------------
294 * List Commands
295 *----------------------------------------------------------------------------*/
296
297void pushGenericCommand(redisClient *c, int where) {
edba65d0 298 int j, waiting = 0, pushed = 0;
e2641e09 299 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
fb2feae5 300 int may_have_waiting_clients = (lobj == NULL);
301
302 if (lobj && lobj->type != REDIS_LIST) {
303 addReply(c,shared.wrongtypeerr);
304 return;
305 }
306
f444e2af 307 if (may_have_waiting_clients) signalListAsReady(c,c->argv[1]);
308
fb2feae5 309 for (j = 2; j < c->argc; j++) {
310 c->argv[j] = tryObjectEncoding(c->argv[j]);
fb2feae5 311 if (!lobj) {
312 lobj = createZiplistObject();
313 dbAdd(c->db,c->argv[1],lobj);
e2641e09 314 }
fb2feae5 315 listTypePush(lobj,c->argv[j],where);
316 pushed++;
e2641e09 317 }
edba65d0 318 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
fb2feae5 319 if (pushed) signalModifiedKey(c->db,c->argv[1]);
320 server.dirty += pushed;
e2641e09 321}
322
323void lpushCommand(redisClient *c) {
324 pushGenericCommand(c,REDIS_HEAD);
325}
326
327void rpushCommand(redisClient *c) {
328 pushGenericCommand(c,REDIS_TAIL);
329}
330
331void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
332 robj *subject;
333 listTypeIterator *iter;
334 listTypeEntry entry;
335 int inserted = 0;
336
337 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
338 checkType(c,subject,REDIS_LIST)) return;
339
340 if (refval != NULL) {
341 /* Note: we expect refval to be string-encoded because it is *not* the
342 * last argument of the multi-bulk LINSERT. */
eab0e26e 343 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
e2641e09 344
345 /* We're not sure if this value can be inserted yet, but we cannot
346 * convert the list inside the iterator. We don't want to loop over
347 * the list twice (once to see if the value can be inserted and once
348 * to do the actual insert), so we assume this value can be inserted
349 * and convert the ziplist to a regular list if necessary. */
350 listTypeTryConversion(subject,val);
351
352 /* Seek refval from head to tail */
353 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
354 while (listTypeNext(iter,&entry)) {
355 if (listTypeEqual(&entry,refval)) {
356 listTypeInsert(&entry,val,where);
357 inserted = 1;
358 break;
359 }
360 }
361 listTypeReleaseIterator(iter);
362
363 if (inserted) {
364 /* Check if the length exceeds the ziplist length threshold. */
365 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
366 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
367 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
cea8c5cd 368 signalModifiedKey(c->db,c->argv[1]);
e2641e09 369 server.dirty++;
370 } else {
371 /* Notify client of a failed insert */
372 addReply(c,shared.cnegone);
373 return;
374 }
375 } else {
376 listTypePush(subject,val,where);
cea8c5cd 377 signalModifiedKey(c->db,c->argv[1]);
e2641e09 378 server.dirty++;
379 }
380
b70d3555 381 addReplyLongLong(c,listTypeLength(subject));
e2641e09 382}
383
384void lpushxCommand(redisClient *c) {
75b41de8 385 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 386 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
387}
388
389void rpushxCommand(redisClient *c) {
75b41de8 390 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 391 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
392}
393
394void linsertCommand(redisClient *c) {
75b41de8 395 c->argv[4] = tryObjectEncoding(c->argv[4]);
e2641e09 396 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
397 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
398 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
399 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
400 } else {
401 addReply(c,shared.syntaxerr);
402 }
403}
404
405void llenCommand(redisClient *c) {
406 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
407 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
b70d3555 408 addReplyLongLong(c,listTypeLength(o));
e2641e09 409}
410
411void lindexCommand(redisClient *c) {
412 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
413 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 414 long index;
e2641e09 415 robj *value = NULL;
416
706b32e0
B
417 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
418 return;
419
e2641e09 420 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
421 unsigned char *p;
422 unsigned char *vstr;
423 unsigned int vlen;
424 long long vlong;
425 p = ziplistIndex(o->ptr,index);
426 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
427 if (vstr) {
428 value = createStringObject((char*)vstr,vlen);
429 } else {
430 value = createStringObjectFromLongLong(vlong);
431 }
432 addReplyBulk(c,value);
433 decrRefCount(value);
434 } else {
435 addReply(c,shared.nullbulk);
436 }
437 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
438 listNode *ln = listIndex(o->ptr,index);
439 if (ln != NULL) {
440 value = listNodeValue(ln);
441 addReplyBulk(c,value);
442 } else {
443 addReply(c,shared.nullbulk);
444 }
445 } else {
446 redisPanic("Unknown list encoding");
447 }
448}
449
450void lsetCommand(redisClient *c) {
451 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
452 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 453 long index;
75b41de8 454 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
e2641e09 455
706b32e0
B
456 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
457 return;
458
e2641e09 459 listTypeTryConversion(o,value);
460 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
461 unsigned char *p, *zl = o->ptr;
462 p = ziplistIndex(zl,index);
463 if (p == NULL) {
464 addReply(c,shared.outofrangeerr);
465 } else {
466 o->ptr = ziplistDelete(o->ptr,&p);
467 value = getDecodedObject(value);
468 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
469 decrRefCount(value);
470 addReply(c,shared.ok);
cea8c5cd 471 signalModifiedKey(c->db,c->argv[1]);
e2641e09 472 server.dirty++;
473 }
474 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
475 listNode *ln = listIndex(o->ptr,index);
476 if (ln == NULL) {
477 addReply(c,shared.outofrangeerr);
478 } else {
479 decrRefCount((robj*)listNodeValue(ln));
480 listNodeValue(ln) = value;
481 incrRefCount(value);
482 addReply(c,shared.ok);
cea8c5cd 483 signalModifiedKey(c->db,c->argv[1]);
e2641e09 484 server.dirty++;
485 }
486 } else {
487 redisPanic("Unknown list encoding");
488 }
489}
490
491void popGenericCommand(redisClient *c, int where) {
492 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
493 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
494
495 robj *value = listTypePop(o,where);
496 if (value == NULL) {
497 addReply(c,shared.nullbulk);
498 } else {
499 addReplyBulk(c,value);
500 decrRefCount(value);
501 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 502 signalModifiedKey(c->db,c->argv[1]);
e2641e09 503 server.dirty++;
504 }
505}
506
507void lpopCommand(redisClient *c) {
508 popGenericCommand(c,REDIS_HEAD);
509}
510
511void rpopCommand(redisClient *c) {
512 popGenericCommand(c,REDIS_TAIL);
513}
514
515void lrangeCommand(redisClient *c) {
d51ebef5 516 robj *o;
3c08fdae 517 long start, end, llen, rangelen;
e2641e09 518
706b32e0
B
519 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
520 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
521
e2641e09 522 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
523 || checkType(c,o,REDIS_LIST)) return;
524 llen = listTypeLength(o);
525
526 /* convert negative indexes */
527 if (start < 0) start = llen+start;
528 if (end < 0) end = llen+end;
529 if (start < 0) start = 0;
e2641e09 530
d0a4e24e
PN
531 /* Invariant: start >= 0, so this test will be true when end < 0.
532 * The range is empty when start > end or start >= length. */
e2641e09 533 if (start > end || start >= llen) {
e2641e09 534 addReply(c,shared.emptymultibulk);
535 return;
536 }
537 if (end >= llen) end = llen-1;
538 rangelen = (end-start)+1;
539
540 /* Return the result in form of a multi-bulk reply */
0537e7bf 541 addReplyMultiBulkLen(c,rangelen);
d51ebef5 542 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
543 unsigned char *p = ziplistIndex(o->ptr,start);
544 unsigned char *vstr;
545 unsigned int vlen;
546 long long vlong;
547
548 while(rangelen--) {
549 ziplistGet(p,&vstr,&vlen,&vlong);
550 if (vstr) {
551 addReplyBulkCBuffer(c,vstr,vlen);
552 } else {
553 addReplyBulkLongLong(c,vlong);
554 }
555 p = ziplistNext(o->ptr,p);
556 }
557 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
7cfeb8cc 558 listNode *ln;
559
560 /* If we are nearest to the end of the list, reach the element
561 * starting from tail and going backward, as it is faster. */
562 if (start > llen/2) start -= llen;
563 ln = listIndex(o->ptr,start);
d51ebef5 564
565 while(rangelen--) {
566 addReplyBulk(c,ln->value);
567 ln = ln->next;
568 }
569 } else {
570 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
e2641e09 571 }
e2641e09 572}
573
574void ltrimCommand(redisClient *c) {
575 robj *o;
3c08fdae 576 long start, end, llen, j, ltrim, rtrim;
e2641e09 577 list *list;
578 listNode *ln;
579
706b32e0
B
580 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
581 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
582
e2641e09 583 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
584 checkType(c,o,REDIS_LIST)) return;
585 llen = listTypeLength(o);
586
587 /* convert negative indexes */
588 if (start < 0) start = llen+start;
589 if (end < 0) end = llen+end;
590 if (start < 0) start = 0;
e2641e09 591
d0a4e24e
PN
592 /* Invariant: start >= 0, so this test will be true when end < 0.
593 * The range is empty when start > end or start >= length. */
e2641e09 594 if (start > end || start >= llen) {
595 /* Out of range start or start > end result in empty list */
596 ltrim = llen;
597 rtrim = 0;
598 } else {
599 if (end >= llen) end = llen-1;
600 ltrim = start;
601 rtrim = llen-end-1;
602 }
603
604 /* Remove list elements to perform the trim */
605 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
606 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
607 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
608 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
609 list = o->ptr;
610 for (j = 0; j < ltrim; j++) {
611 ln = listFirst(list);
612 listDelNode(list,ln);
613 }
614 for (j = 0; j < rtrim; j++) {
615 ln = listLast(list);
616 listDelNode(list,ln);
617 }
618 } else {
619 redisPanic("Unknown list encoding");
620 }
621 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 622 signalModifiedKey(c->db,c->argv[1]);
e2641e09 623 server.dirty++;
624 addReply(c,shared.ok);
625}
626
627void lremCommand(redisClient *c) {
75b41de8
PN
628 robj *subject, *obj;
629 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
706b32e0 630 long toremove;
3c08fdae 631 long removed = 0;
e2641e09 632 listTypeEntry entry;
633
706b32e0
B
634 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
635 return;
636
e2641e09 637 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
638 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
639
640 /* Make sure obj is raw when we're dealing with a ziplist */
641 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
642 obj = getDecodedObject(obj);
643
644 listTypeIterator *li;
645 if (toremove < 0) {
646 toremove = -toremove;
647 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
648 } else {
649 li = listTypeInitIterator(subject,0,REDIS_TAIL);
650 }
651
652 while (listTypeNext(li,&entry)) {
653 if (listTypeEqual(&entry,obj)) {
654 listTypeDelete(&entry);
655 server.dirty++;
656 removed++;
657 if (toremove && removed == toremove) break;
658 }
659 }
660 listTypeReleaseIterator(li);
661
662 /* Clean up raw encoded object */
663 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
664 decrRefCount(obj);
665
666 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
b70d3555 667 addReplyLongLong(c,removed);
cea8c5cd 668 if (removed) signalModifiedKey(c->db,c->argv[1]);
e2641e09 669}
670
671/* This is the semantic of this command:
672 * RPOPLPUSH srclist dstlist:
ac06fc01
PN
673 * IF LLEN(srclist) > 0
674 * element = RPOP srclist
675 * LPUSH dstlist element
676 * RETURN element
677 * ELSE
678 * RETURN nil
679 * END
e2641e09 680 * END
681 *
682 * The idea is to be able to get an element from a list in a reliable way
683 * since the element is not just returned but pushed against another list
684 * as well. This command was originally proposed by Ezra Zygmuntowicz.
685 */
ac06fc01 686
f444e2af 687void rpoplpushHandlePush(redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
688 /* Create the list if the key does not exist */
689 if (!dstobj) {
690 dstobj = createZiplistObject();
691 dbAdd(c->db,dstkey,dstobj);
692 signalListAsReady(c,dstkey);
ac06fc01 693 }
f444e2af 694 signalModifiedKey(c->db,dstkey);
695 listTypePush(dstobj,value,REDIS_HEAD);
ac06fc01
PN
696 /* Always send the pushed value to the client. */
697 addReplyBulk(c,value);
698}
699
8a979f03 700void rpoplpushCommand(redisClient *c) {
e2641e09 701 robj *sobj, *value;
702 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
703 checkType(c,sobj,REDIS_LIST)) return;
704
705 if (listTypeLength(sobj) == 0) {
60ef787e 706 /* This may only happen after loading very old RDB files. Recent
707 * versions of Redis delete keys of empty lists. */
e2641e09 708 addReply(c,shared.nullbulk);
709 } else {
710 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
c1c9d551 711 robj *touchedkey = c->argv[1];
712
e2641e09 713 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
714 value = listTypePop(sobj,REDIS_TAIL);
c1c9d551 715 /* We saved touched key, and protect it, since rpoplpushHandlePush
f444e2af 716 * may change the client command argument vector (it does not
717 * currently). */
c1c9d551 718 incrRefCount(touchedkey);
f444e2af 719 rpoplpushHandlePush(c,c->argv[2],dobj,value);
e2641e09 720
721 /* listTypePop returns an object with its refcount incremented */
722 decrRefCount(value);
723
724 /* Delete the source list when it is empty */
c1c9d551 725 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
726 signalModifiedKey(c->db,touchedkey);
727 decrRefCount(touchedkey);
e2641e09 728 server.dirty++;
729 }
730}
731
732/*-----------------------------------------------------------------------------
733 * Blocking POP operations
734 *----------------------------------------------------------------------------*/
735
/* This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if blocking is not required.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). Also we put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we mark this key as "ready", and after the current command,
 *   MULTI/EXEC block, or script, is executed, we serve all the clients waiting
 *   for this list, from the one that blocked first to the last, according
 *   to the number of elements we have in the ready list.
 */
752
753/* Set a client in blocking mode for the specified key, with the specified
754 * timeout */
ba3b4741 755void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
e2641e09 756 dictEntry *de;
757 list *l;
758 int j;
759
e3c51c4b
DJMM
760 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
761 c->bpop.count = numkeys;
762 c->bpop.timeout = timeout;
763 c->bpop.target = target;
ba3b4741
DJMM
764
765 if (target != NULL) {
ecf94014 766 incrRefCount(target);
ba3b4741
DJMM
767 }
768
e2641e09 769 for (j = 0; j < numkeys; j++) {
770 /* Add the key in the client structure, to map clients -> keys */
e3c51c4b 771 c->bpop.keys[j] = keys[j];
e2641e09 772 incrRefCount(keys[j]);
773
774 /* And in the other "side", to map keys -> clients */
775 de = dictFind(c->db->blocking_keys,keys[j]);
776 if (de == NULL) {
777 int retval;
778
779 /* For every key we take a list of clients blocked for it */
780 l = listCreate();
781 retval = dictAdd(c->db->blocking_keys,keys[j],l);
782 incrRefCount(keys[j]);
eab0e26e 783 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
e2641e09 784 } else {
c0ba9ebe 785 l = dictGetVal(de);
e2641e09 786 }
787 listAddNodeTail(l,c);
788 }
789 /* Mark the client as a blocked client */
790 c->flags |= REDIS_BLOCKED;
5fa95ad7 791 server.bpop_blocked_clients++;
e2641e09 792}
793
794/* Unblock a client that's waiting in a blocking operation such as BLPOP */
795void unblockClientWaitingData(redisClient *c) {
796 dictEntry *de;
797 list *l;
798 int j;
799
eab0e26e 800 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
e2641e09 801 /* The client may wait for multiple keys, so unblock it for every key. */
e3c51c4b 802 for (j = 0; j < c->bpop.count; j++) {
e2641e09 803 /* Remove this client from the list of clients waiting for this key. */
e3c51c4b 804 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
eab0e26e 805 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
c0ba9ebe 806 l = dictGetVal(de);
e2641e09 807 listDelNode(l,listSearchKey(l,c));
808 /* If the list is empty we need to remove it to avoid wasting memory */
809 if (listLength(l) == 0)
e3c51c4b
DJMM
810 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
811 decrRefCount(c->bpop.keys[j]);
e2641e09 812 }
ba3b4741 813
e2641e09 814 /* Cleanup the client structure */
e3c51c4b
DJMM
815 zfree(c->bpop.keys);
816 c->bpop.keys = NULL;
c1c9d551 817 if (c->bpop.target) decrRefCount(c->bpop.target);
e3c51c4b 818 c->bpop.target = NULL;
3bcffcbe
PN
819 c->flags &= ~REDIS_BLOCKED;
820 c->flags |= REDIS_UNBLOCKED;
5fa95ad7 821 server.bpop_blocked_clients--;
a4ce7581 822 listAddNodeTail(server.unblocked_clients,c);
e2641e09 823}
824
f444e2af 825/* If the specified key has clients blocked waiting for list pushes, this
826 * function will put the key reference into the server.ready_keys list.
827 * Note that db->ready_keys is an hash table that allows us to avoid putting
828 * the same key agains and again in the list in case of multiple pushes
829 * made by a script or in the context of MULTI/EXEC.
e2641e09 830 *
f444e2af 831 * The list will be finally processed by handleClientsBlockedOnLists() */
832void signalListAsReady(redisClient *c, robj *key) {
833 readyList *rl;
834
835 /* No clients blocking for this key? No need to queue it. */
836 if (dictFind(c->db->blocking_keys,key) == NULL) return;
837
838 /* Key was already signaled? No need to queue it again. */
839 if (dictFind(c->db->ready_keys,key) != NULL) return;
840
841 /* Ok, we need to queue this key into server.ready_keys. */
842 rl = zmalloc(sizeof(*rl));
843 rl->key = key;
844 rl->db = c->db;
845 incrRefCount(key);
846 listAddNodeTail(server.ready_keys,rl);
847
848 /* We also add the key in the db->ready_keys dictionary in order
849 * to avoid adding it multiple times into a list with a simple O(1)
850 * check. */
851 incrRefCount(key);
852 redisAssert(dictAdd(c->db->ready_keys,key,NULL) == DICT_OK);
853}
854
855/* This is an helper function for handleClientsBlockedOnLists(). It's work
856 * is to serve a specific client (receiver) that is blocked on 'key'
857 * in the context of the specified 'db', doing the following:
e2641e09 858 *
f444e2af 859 * 1) Provide the client with the 'value' element.
860 * 2) If the dstkey is not NULL (we are serving a BRPOPLPUSH) also push the
861 * 'value' element on the destionation list (the LPUSH side of the command).
862 * 3) Propagate the resulting BRPOP, BLPOP and additional LPUSH if any into
863 * the AOF and replication channel.
864 *
865 * The argument 'where' is REDIS_TAIL or REDIS_HEAD, and indicates if the
866 * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that
867 * we can propagate the command properly.
868 *
869 * The function returns REDIS_OK if we are able to serve the client, otherwise
870 * REDIS_ERR is returned to signal the caller that the list POP operation
871 * should be undoed as the client was not served: This only happens for
872 * BRPOPLPUSH that fails to push the value to the destination key as it is
873 * of the wrong type. */
874int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
875{
876 robj *argv[3];
877
878 if (dstkey == NULL) {
879 /* Propagate the [LR]POP operation. */
880 argv[0] = (where == REDIS_HEAD) ? shared.lpop :
881 shared.rpop;
882 argv[1] = key;
883 propagate((where == REDIS_HEAD) ?
884 server.lpopCommand : server.rpopCommand,
885 db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
886
887 /* BRPOP/BLPOP */
888 addReplyMultiBulkLen(receiver,2);
889 addReplyBulk(receiver,key);
890 addReplyBulk(receiver,value);
891 } else {
892 /* BRPOPLPUSH */
893 robj *dstobj =
894 lookupKeyWrite(receiver->db,dstkey);
895 if (!(dstobj &&
896 checkType(receiver,dstobj,REDIS_LIST)))
897 {
898 /* Propagate the RPOP operation. */
899 argv[0] = shared.rpop;
900 argv[1] = key;
901 propagate(server.rpopCommand,
902 db->id,argv,2,
903 REDIS_PROPAGATE_AOF|
904 REDIS_PROPAGATE_REPL);
905 rpoplpushHandlePush(receiver,dstkey,dstobj,
906 value);
907 /* Propagate the LPUSH operation. */
908 argv[0] = shared.lpush;
909 argv[1] = dstkey;
910 argv[2] = value;
911 propagate(server.lpushCommand,
912 db->id,argv,3,
913 REDIS_PROPAGATE_AOF|
914 REDIS_PROPAGATE_REPL);
8a88c368 915 } else {
f444e2af 916 /* BRPOPLPUSH failed because of wrong
917 * destination type. */
918 return REDIS_ERR;
8a88c368 919 }
b2a7fd0c 920 }
f444e2af 921 return REDIS_OK;
922}
e2641e09 923
f444e2af 924/* This function should be called by Redis every time a single command,
925 * a MULTI/EXEC block, or a Lua script, terminated its execution after
926 * being called by a client.
927 *
928 * All the keys with at least one client blocked that received at least
929 * one new element via some PUSH operation are accumulated into
930 * the server.ready_keys list. This function will run the list and will
931 * serve clients accordingly. Note that the function will iterate again and
932 * again as a result of serving BRPOPLPUSH we can have new blocking clients
933 * to serve because of the PUSH side of BRPOPLPUSH. */
934void handleClientsBlockedOnLists(void) {
935 while(listLength(server.ready_keys) != 0) {
936 list *l;
937
938 /* Point server.ready_keys to a fresh list and save the current one
939 * locally. This way as we run the old list we are free to call
940 * signalListAsReady() that may push new elements in server.ready_keys
941 * when handling clients blocked into BRPOPLPUSH. */
942 l = server.ready_keys;
943 server.ready_keys = listCreate();
944
945 while(listLength(l) != 0) {
946 listNode *ln = listFirst(l);
947 readyList *rl = ln->value;
948
949 /* First of all remove this key from db->ready_keys so that
950 * we can safely call signalListAsReady() against this key. */
951 dictDelete(rl->db->ready_keys,rl->key);
952
953 /* If the key exists and it's a list, serve blocked clients
954 * with data. */
955 robj *o = lookupKeyWrite(rl->db,rl->key);
956 if (o != NULL && o->type == REDIS_LIST) {
957 dictEntry *de;
958
959 /* We serve clients in the same order they blocked for
960 * this key, from the first blocked to the last. */
961 de = dictFind(rl->db->blocking_keys,rl->key);
962 if (de) {
963 list *clients = dictGetVal(de);
964 int numclients = listLength(clients);
965
966 while(numclients--) {
967 listNode *clientnode = listFirst(clients);
968 redisClient *receiver = clientnode->value;
969 robj *dstkey = receiver->bpop.target;
970 int where = (receiver->lastcmd &&
971 receiver->lastcmd->proc == blpopCommand) ?
972 REDIS_HEAD : REDIS_TAIL;
973 robj *value = listTypePop(o,where);
974
975 if (value) {
976 /* Protect receiver->bpop.target, that will be
977 * freed by the next unblockClientWaitingData()
978 * call. */
979 if (dstkey) incrRefCount(dstkey);
980 unblockClientWaitingData(receiver);
981
982 if (serveClientBlockedOnList(receiver,
983 rl->key,dstkey,rl->db,value,
984 where) == REDIS_ERR)
985 {
986 /* If we failed serving the client we need
987 * to also undo the POP operation. */
988 listTypePush(o,value,where);
989 }
990
991 if (dstkey) decrRefCount(dstkey);
992 decrRefCount(value);
993 } else {
994 break;
995 }
996 }
997 }
998
999 if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
1000 /* We don't call signalModifiedKey() as it was already called
1001 * when an element was pushed on the list. */
1002 }
1003
1004 /* Free this item. */
1005 decrRefCount(rl->key);
1006 zfree(rl);
1007 listDelNode(l,ln);
1008 }
1009 listRelease(l); /* We have the new list on place at this point. */
1010 }
e2641e09 1011}
1012
c8a0070a
PN
1013int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
1014 long tval;
59bd44d1 1015
c8a0070a
PN
1016 if (getLongFromObjectOrReply(c,object,&tval,
1017 "timeout is not an integer or out of range") != REDIS_OK)
59bd44d1 1018 return REDIS_ERR;
59bd44d1 1019
c8a0070a
PN
1020 if (tval < 0) {
1021 addReplyError(c,"timeout is negative");
59bd44d1
DJMM
1022 return REDIS_ERR;
1023 }
1024
56ff70f8 1025 if (tval > 0) tval += server.unixtime;
c8a0070a 1026 *timeout = tval;
59bd44d1
DJMM
1027
1028 return REDIS_OK;
e2641e09 1029}
1030
1031/* Blocking RPOP/LPOP */
1032void blockingPopGenericCommand(redisClient *c, int where) {
1033 robj *o;
1034 time_t timeout;
1035 int j;
1036
c8a0070a 1037 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
94364d53 1038 return;
94364d53 1039
e2641e09 1040 for (j = 1; j < c->argc-1; j++) {
1041 o = lookupKeyWrite(c->db,c->argv[j]);
1042 if (o != NULL) {
1043 if (o->type != REDIS_LIST) {
1044 addReply(c,shared.wrongtypeerr);
1045 return;
1046 } else {
1047 if (listTypeLength(o) != 0) {
c1db214e 1048 /* Non empty list, this is like a non normal [LR]POP. */
1049 robj *value = listTypePop(o,where);
1050 redisAssert(value != NULL);
b2a7fd0c 1051
c1db214e 1052 addReplyMultiBulkLen(c,2);
1053 addReplyBulk(c,c->argv[j]);
1054 addReplyBulk(c,value);
1055 decrRefCount(value);
1056 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
1057 signalModifiedKey(c->db,c->argv[j]);
1058 server.dirty++;
1059
1060 /* Replicate it as an [LR]POP instead of B[LR]POP. */
1061 rewriteClientCommandVector(c,2,
1062 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
1063 c->argv[j]);
e2641e09 1064 return;
1065 }
1066 }
1067 }
1068 }
94364d53 1069
fb92ecec 1070 /* If we are inside a MULTI/EXEC and the list is empty the only thing
1071 * we can do is treating it as a timeout (even with timeout 0). */
1072 if (c->flags & REDIS_MULTI) {
1073 addReply(c,shared.nullmultibulk);
1074 return;
1075 }
1076
e2641e09 1077 /* If the list is empty or the key does not exists we must block */
ba3b4741 1078 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
e2641e09 1079}
1080
1081void blpopCommand(redisClient *c) {
1082 blockingPopGenericCommand(c,REDIS_HEAD);
1083}
1084
1085void brpopCommand(redisClient *c) {
1086 blockingPopGenericCommand(c,REDIS_TAIL);
1087}
b2a7fd0c
DJMM
1088
1089void brpoplpushCommand(redisClient *c) {
ba3b4741 1090 time_t timeout;
b2a7fd0c 1091
c8a0070a 1092 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
ba3b4741 1093 return;
ba3b4741
DJMM
1094
1095 robj *key = lookupKeyWrite(c->db, c->argv[1]);
1096
ba3b4741 1097 if (key == NULL) {
ba3b4741 1098 if (c->flags & REDIS_MULTI) {
7c25a43a
DJMM
1099 /* Blocking against an empty list in a multi state
1100 * returns immediately. */
d5870d7a 1101 addReply(c, shared.nullbulk);
ba3b4741 1102 } else {
7c25a43a 1103 /* The list is empty and the client blocks. */
ba3b4741
DJMM
1104 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
1105 }
ba3b4741 1106 } else {
7c25a43a
DJMM
1107 if (key->type != REDIS_LIST) {
1108 addReply(c, shared.wrongtypeerr);
1109 } else {
7c25a43a
DJMM
1110 /* The list exists and has elements, so
1111 * the regular rpoplpushCommand is executed. */
eab0e26e 1112 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
7c25a43a
DJMM
1113 rpoplpushCommand(c);
1114 }
ba3b4741 1115 }
b2a7fd0c 1116}