]> git.saurik.com Git - redis.git/blame - src/t_list.c
Allow Pub/Sub in contexts where other commands are blocked.
[redis.git] / src / t_list.c
CommitLineData
e2641e09 1#include "redis.h"
2
3/*-----------------------------------------------------------------------------
4 * List API
5 *----------------------------------------------------------------------------*/
6
7/* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10void listTypeTryConversion(robj *subject, robj *value) {
11 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 if (value->encoding == REDIS_ENCODING_RAW &&
13 sdslen(value->ptr) > server.list_max_ziplist_value)
14 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15}
16
17void listTypePush(robj *subject, robj *value, int where) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject,value);
20 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23
24 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 value = getDecodedObject(value);
27 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 decrRefCount(value);
29 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 if (where == REDIS_HEAD) {
31 listAddNodeHead(subject->ptr,value);
32 } else {
33 listAddNodeTail(subject->ptr,value);
34 }
35 incrRefCount(value);
36 } else {
37 redisPanic("Unknown list encoding");
38 }
39}
40
41robj *listTypePop(robj *subject, int where) {
42 robj *value = NULL;
43 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 unsigned char *p;
45 unsigned char *vstr;
46 unsigned int vlen;
47 long long vlong;
48 int pos = (where == REDIS_HEAD) ? 0 : -1;
49 p = ziplistIndex(subject->ptr,pos);
50 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 if (vstr) {
52 value = createStringObject((char*)vstr,vlen);
53 } else {
54 value = createStringObjectFromLongLong(vlong);
55 }
56 /* We only need to delete an element when it exists */
57 subject->ptr = ziplistDelete(subject->ptr,&p);
58 }
59 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 list *list = subject->ptr;
61 listNode *ln;
62 if (where == REDIS_HEAD) {
63 ln = listFirst(list);
64 } else {
65 ln = listLast(list);
66 }
67 if (ln != NULL) {
68 value = listNodeValue(ln);
69 incrRefCount(value);
70 listDelNode(list,ln);
71 }
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 return value;
76}
77
78unsigned long listTypeLength(robj *subject) {
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 return ziplistLen(subject->ptr);
81 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 return listLength((list*)subject->ptr);
83 } else {
84 redisPanic("Unknown list encoding");
85 }
86}
87
88/* Initialize an iterator at the specified index. */
3c08fdae 89listTypeIterator *listTypeInitIterator(robj *subject, long index, unsigned char direction) {
e2641e09 90 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 li->subject = subject;
92 li->encoding = subject->encoding;
93 li->direction = direction;
94 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 li->zi = ziplistIndex(subject->ptr,index);
96 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 li->ln = listIndex(subject->ptr,index);
98 } else {
99 redisPanic("Unknown list encoding");
100 }
101 return li;
102}
103
104/* Clean up the iterator. */
105void listTypeReleaseIterator(listTypeIterator *li) {
106 zfree(li);
107}
108
109/* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 redisAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 entry->zi = li->zi;
119 if (entry->zi != NULL) {
120 if (li->direction == REDIS_TAIL)
121 li->zi = ziplistNext(li->subject->ptr,li->zi);
122 else
123 li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 return 1;
125 }
126 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 entry->ln = li->ln;
128 if (entry->ln != NULL) {
129 if (li->direction == REDIS_TAIL)
130 li->ln = li->ln->next;
131 else
132 li->ln = li->ln->prev;
133 return 1;
134 }
135 } else {
136 redisPanic("Unknown list encoding");
137 }
138 return 0;
139}
140
141/* Return entry or NULL at the current position of the iterator. */
142robj *listTypeGet(listTypeEntry *entry) {
143 listTypeIterator *li = entry->li;
144 robj *value = NULL;
145 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 unsigned char *vstr;
147 unsigned int vlen;
148 long long vlong;
149 redisAssert(entry->zi != NULL);
150 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 if (vstr) {
152 value = createStringObject((char*)vstr,vlen);
153 } else {
154 value = createStringObjectFromLongLong(vlong);
155 }
156 }
157 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 redisAssert(entry->ln != NULL);
159 value = listNodeValue(entry->ln);
160 incrRefCount(value);
161 } else {
162 redisPanic("Unknown list encoding");
163 }
164 return value;
165}
166
167void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 robj *subject = entry->li->subject;
169 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 value = getDecodedObject(value);
171 if (where == REDIS_TAIL) {
172 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
176 if (next == NULL) {
177 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 } else {
179 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 }
181 } else {
182 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 }
184 decrRefCount(value);
185 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 if (where == REDIS_TAIL) {
187 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 } else {
189 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 }
191 incrRefCount(value);
192 } else {
193 redisPanic("Unknown list encoding");
194 }
195}
196
197/* Compare the given object with the entry at the current position. */
198int listTypeEqual(listTypeEntry *entry, robj *o) {
199 listTypeIterator *li = entry->li;
200 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
eab0e26e 201 redisAssertWithInfo(NULL,o,o->encoding == REDIS_ENCODING_RAW);
e2641e09 202 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 return equalStringObjects(o,listNodeValue(entry->ln));
205 } else {
206 redisPanic("Unknown list encoding");
207 }
208}
209
210/* Delete the element pointed to. */
211void listTypeDelete(listTypeEntry *entry) {
212 listTypeIterator *li = entry->li;
213 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 unsigned char *p = entry->zi;
215 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216
217 /* Update position of the iterator depending on the direction */
218 if (li->direction == REDIS_TAIL)
219 li->zi = p;
220 else
221 li->zi = ziplistPrev(li->subject->ptr,p);
222 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 listNode *next;
224 if (li->direction == REDIS_TAIL)
225 next = entry->ln->next;
226 else
227 next = entry->ln->prev;
228 listDelNode(li->subject->ptr,entry->ln);
229 li->ln = next;
230 } else {
231 redisPanic("Unknown list encoding");
232 }
233}
234
235void listTypeConvert(robj *subject, int enc) {
236 listTypeIterator *li;
237 listTypeEntry entry;
eab0e26e 238 redisAssertWithInfo(NULL,subject,subject->type == REDIS_LIST);
e2641e09 239
240 if (enc == REDIS_ENCODING_LINKEDLIST) {
241 list *l = listCreate();
242 listSetFreeMethod(l,decrRefCount);
243
244 /* listTypeGet returns a robj with incremented refcount */
245 li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 listTypeReleaseIterator(li);
248
249 subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 zfree(subject->ptr);
251 subject->ptr = l;
252 } else {
253 redisPanic("Unsupported list conversion");
254 }
255}
256
257/*-----------------------------------------------------------------------------
258 * List Commands
259 *----------------------------------------------------------------------------*/
260
261void pushGenericCommand(redisClient *c, int where) {
edba65d0 262 int j, waiting = 0, pushed = 0;
e2641e09 263 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
fb2feae5 264 int may_have_waiting_clients = (lobj == NULL);
265
266 if (lobj && lobj->type != REDIS_LIST) {
267 addReply(c,shared.wrongtypeerr);
268 return;
269 }
270
271 for (j = 2; j < c->argc; j++) {
272 c->argv[j] = tryObjectEncoding(c->argv[j]);
273 if (may_have_waiting_clients) {
274 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[j])) {
edba65d0 275 waiting++;
fb2feae5 276 continue;
277 } else {
278 may_have_waiting_clients = 0;
279 }
e2641e09 280 }
fb2feae5 281 if (!lobj) {
282 lobj = createZiplistObject();
283 dbAdd(c->db,c->argv[1],lobj);
e2641e09 284 }
fb2feae5 285 listTypePush(lobj,c->argv[j],where);
286 pushed++;
e2641e09 287 }
edba65d0 288 addReplyLongLong(c, waiting + (lobj ? listTypeLength(lobj) : 0));
fb2feae5 289 if (pushed) signalModifiedKey(c->db,c->argv[1]);
290 server.dirty += pushed;
eeb34eff 291
292 /* Alter the replication of the command accordingly to the number of
293 * list elements delivered to clients waiting into a blocking operation.
294 * We do that only if there were waiting clients, and only if still some
295 * element was pushed into the list (othewise dirty is 0 and nothign will
296 * be propagated). */
297 if (waiting && pushed) {
298 /* CMD KEY a b C D E */
cd8bdea3 299 for (j = 0; j < waiting; j++) decrRefCount(c->argv[j+2]);
300 memmove(c->argv+2,c->argv+2+waiting,sizeof(robj*)*pushed);
eeb34eff 301 c->argc -= waiting;
302 }
e2641e09 303}
304
305void lpushCommand(redisClient *c) {
306 pushGenericCommand(c,REDIS_HEAD);
307}
308
309void rpushCommand(redisClient *c) {
310 pushGenericCommand(c,REDIS_TAIL);
311}
312
313void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
314 robj *subject;
315 listTypeIterator *iter;
316 listTypeEntry entry;
317 int inserted = 0;
318
319 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
320 checkType(c,subject,REDIS_LIST)) return;
321
322 if (refval != NULL) {
323 /* Note: we expect refval to be string-encoded because it is *not* the
324 * last argument of the multi-bulk LINSERT. */
eab0e26e 325 redisAssertWithInfo(c,refval,refval->encoding == REDIS_ENCODING_RAW);
e2641e09 326
327 /* We're not sure if this value can be inserted yet, but we cannot
328 * convert the list inside the iterator. We don't want to loop over
329 * the list twice (once to see if the value can be inserted and once
330 * to do the actual insert), so we assume this value can be inserted
331 * and convert the ziplist to a regular list if necessary. */
332 listTypeTryConversion(subject,val);
333
334 /* Seek refval from head to tail */
335 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
336 while (listTypeNext(iter,&entry)) {
337 if (listTypeEqual(&entry,refval)) {
338 listTypeInsert(&entry,val,where);
339 inserted = 1;
340 break;
341 }
342 }
343 listTypeReleaseIterator(iter);
344
345 if (inserted) {
346 /* Check if the length exceeds the ziplist length threshold. */
347 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
348 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
349 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
cea8c5cd 350 signalModifiedKey(c->db,c->argv[1]);
e2641e09 351 server.dirty++;
352 } else {
353 /* Notify client of a failed insert */
354 addReply(c,shared.cnegone);
355 return;
356 }
357 } else {
358 listTypePush(subject,val,where);
cea8c5cd 359 signalModifiedKey(c->db,c->argv[1]);
e2641e09 360 server.dirty++;
361 }
362
b70d3555 363 addReplyLongLong(c,listTypeLength(subject));
e2641e09 364}
365
366void lpushxCommand(redisClient *c) {
75b41de8 367 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 368 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
369}
370
371void rpushxCommand(redisClient *c) {
75b41de8 372 c->argv[2] = tryObjectEncoding(c->argv[2]);
e2641e09 373 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
374}
375
376void linsertCommand(redisClient *c) {
75b41de8 377 c->argv[4] = tryObjectEncoding(c->argv[4]);
e2641e09 378 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
379 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
380 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
381 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
382 } else {
383 addReply(c,shared.syntaxerr);
384 }
385}
386
387void llenCommand(redisClient *c) {
388 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
389 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
b70d3555 390 addReplyLongLong(c,listTypeLength(o));
e2641e09 391}
392
393void lindexCommand(redisClient *c) {
394 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
395 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 396 long index;
e2641e09 397 robj *value = NULL;
398
706b32e0
B
399 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
400 return;
401
e2641e09 402 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
403 unsigned char *p;
404 unsigned char *vstr;
405 unsigned int vlen;
406 long long vlong;
407 p = ziplistIndex(o->ptr,index);
408 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
409 if (vstr) {
410 value = createStringObject((char*)vstr,vlen);
411 } else {
412 value = createStringObjectFromLongLong(vlong);
413 }
414 addReplyBulk(c,value);
415 decrRefCount(value);
416 } else {
417 addReply(c,shared.nullbulk);
418 }
419 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
420 listNode *ln = listIndex(o->ptr,index);
421 if (ln != NULL) {
422 value = listNodeValue(ln);
423 addReplyBulk(c,value);
424 } else {
425 addReply(c,shared.nullbulk);
426 }
427 } else {
428 redisPanic("Unknown list encoding");
429 }
430}
431
432void lsetCommand(redisClient *c) {
433 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
434 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
706b32e0 435 long index;
75b41de8 436 robj *value = (c->argv[3] = tryObjectEncoding(c->argv[3]));
e2641e09 437
706b32e0
B
438 if ((getLongFromObjectOrReply(c, c->argv[2], &index, NULL) != REDIS_OK))
439 return;
440
e2641e09 441 listTypeTryConversion(o,value);
442 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
443 unsigned char *p, *zl = o->ptr;
444 p = ziplistIndex(zl,index);
445 if (p == NULL) {
446 addReply(c,shared.outofrangeerr);
447 } else {
448 o->ptr = ziplistDelete(o->ptr,&p);
449 value = getDecodedObject(value);
450 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
451 decrRefCount(value);
452 addReply(c,shared.ok);
cea8c5cd 453 signalModifiedKey(c->db,c->argv[1]);
e2641e09 454 server.dirty++;
455 }
456 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
457 listNode *ln = listIndex(o->ptr,index);
458 if (ln == NULL) {
459 addReply(c,shared.outofrangeerr);
460 } else {
461 decrRefCount((robj*)listNodeValue(ln));
462 listNodeValue(ln) = value;
463 incrRefCount(value);
464 addReply(c,shared.ok);
cea8c5cd 465 signalModifiedKey(c->db,c->argv[1]);
e2641e09 466 server.dirty++;
467 }
468 } else {
469 redisPanic("Unknown list encoding");
470 }
471}
472
473void popGenericCommand(redisClient *c, int where) {
474 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
475 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
476
477 robj *value = listTypePop(o,where);
478 if (value == NULL) {
479 addReply(c,shared.nullbulk);
480 } else {
481 addReplyBulk(c,value);
482 decrRefCount(value);
483 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 484 signalModifiedKey(c->db,c->argv[1]);
e2641e09 485 server.dirty++;
486 }
487}
488
489void lpopCommand(redisClient *c) {
490 popGenericCommand(c,REDIS_HEAD);
491}
492
493void rpopCommand(redisClient *c) {
494 popGenericCommand(c,REDIS_TAIL);
495}
496
497void lrangeCommand(redisClient *c) {
d51ebef5 498 robj *o;
3c08fdae 499 long start, end, llen, rangelen;
e2641e09 500
706b32e0
B
501 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
502 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
503
e2641e09 504 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
505 || checkType(c,o,REDIS_LIST)) return;
506 llen = listTypeLength(o);
507
508 /* convert negative indexes */
509 if (start < 0) start = llen+start;
510 if (end < 0) end = llen+end;
511 if (start < 0) start = 0;
e2641e09 512
d0a4e24e
PN
513 /* Invariant: start >= 0, so this test will be true when end < 0.
514 * The range is empty when start > end or start >= length. */
e2641e09 515 if (start > end || start >= llen) {
e2641e09 516 addReply(c,shared.emptymultibulk);
517 return;
518 }
519 if (end >= llen) end = llen-1;
520 rangelen = (end-start)+1;
521
522 /* Return the result in form of a multi-bulk reply */
0537e7bf 523 addReplyMultiBulkLen(c,rangelen);
d51ebef5 524 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
525 unsigned char *p = ziplistIndex(o->ptr,start);
526 unsigned char *vstr;
527 unsigned int vlen;
528 long long vlong;
529
530 while(rangelen--) {
531 ziplistGet(p,&vstr,&vlen,&vlong);
532 if (vstr) {
533 addReplyBulkCBuffer(c,vstr,vlen);
534 } else {
535 addReplyBulkLongLong(c,vlong);
536 }
537 p = ziplistNext(o->ptr,p);
538 }
539 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
7cfeb8cc 540 listNode *ln;
541
542 /* If we are nearest to the end of the list, reach the element
543 * starting from tail and going backward, as it is faster. */
544 if (start > llen/2) start -= llen;
545 ln = listIndex(o->ptr,start);
d51ebef5 546
547 while(rangelen--) {
548 addReplyBulk(c,ln->value);
549 ln = ln->next;
550 }
551 } else {
552 redisPanic("List encoding is not LINKEDLIST nor ZIPLIST!");
e2641e09 553 }
e2641e09 554}
555
556void ltrimCommand(redisClient *c) {
557 robj *o;
3c08fdae 558 long start, end, llen, j, ltrim, rtrim;
e2641e09 559 list *list;
560 listNode *ln;
561
706b32e0
B
562 if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != REDIS_OK) ||
563 (getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != REDIS_OK)) return;
564
e2641e09 565 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
566 checkType(c,o,REDIS_LIST)) return;
567 llen = listTypeLength(o);
568
569 /* convert negative indexes */
570 if (start < 0) start = llen+start;
571 if (end < 0) end = llen+end;
572 if (start < 0) start = 0;
e2641e09 573
d0a4e24e
PN
574 /* Invariant: start >= 0, so this test will be true when end < 0.
575 * The range is empty when start > end or start >= length. */
e2641e09 576 if (start > end || start >= llen) {
577 /* Out of range start or start > end result in empty list */
578 ltrim = llen;
579 rtrim = 0;
580 } else {
581 if (end >= llen) end = llen-1;
582 ltrim = start;
583 rtrim = llen-end-1;
584 }
585
586 /* Remove list elements to perform the trim */
587 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
588 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
589 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
590 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
591 list = o->ptr;
592 for (j = 0; j < ltrim; j++) {
593 ln = listFirst(list);
594 listDelNode(list,ln);
595 }
596 for (j = 0; j < rtrim; j++) {
597 ln = listLast(list);
598 listDelNode(list,ln);
599 }
600 } else {
601 redisPanic("Unknown list encoding");
602 }
603 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
cea8c5cd 604 signalModifiedKey(c->db,c->argv[1]);
e2641e09 605 server.dirty++;
606 addReply(c,shared.ok);
607}
608
609void lremCommand(redisClient *c) {
75b41de8
PN
610 robj *subject, *obj;
611 obj = c->argv[3] = tryObjectEncoding(c->argv[3]);
706b32e0 612 long toremove;
3c08fdae 613 long removed = 0;
e2641e09 614 listTypeEntry entry;
615
706b32e0
B
616 if ((getLongFromObjectOrReply(c, c->argv[2], &toremove, NULL) != REDIS_OK))
617 return;
618
e2641e09 619 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
620 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
621
622 /* Make sure obj is raw when we're dealing with a ziplist */
623 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
624 obj = getDecodedObject(obj);
625
626 listTypeIterator *li;
627 if (toremove < 0) {
628 toremove = -toremove;
629 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
630 } else {
631 li = listTypeInitIterator(subject,0,REDIS_TAIL);
632 }
633
634 while (listTypeNext(li,&entry)) {
635 if (listTypeEqual(&entry,obj)) {
636 listTypeDelete(&entry);
637 server.dirty++;
638 removed++;
639 if (toremove && removed == toremove) break;
640 }
641 }
642 listTypeReleaseIterator(li);
643
644 /* Clean up raw encoded object */
645 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
646 decrRefCount(obj);
647
648 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
b70d3555 649 addReplyLongLong(c,removed);
cea8c5cd 650 if (removed) signalModifiedKey(c->db,c->argv[1]);
e2641e09 651}
652
653/* This is the semantic of this command:
654 * RPOPLPUSH srclist dstlist:
ac06fc01
PN
655 * IF LLEN(srclist) > 0
656 * element = RPOP srclist
657 * LPUSH dstlist element
658 * RETURN element
659 * ELSE
660 * RETURN nil
661 * END
e2641e09 662 * END
663 *
664 * The idea is to be able to get an element from a list in a reliable way
665 * since the element is not just returned but pushed against another list
666 * as well. This command was originally proposed by Ezra Zygmuntowicz.
667 */
ac06fc01 668
c1c9d551 669void rpoplpushHandlePush(redisClient *origclient, redisClient *c, robj *dstkey, robj *dstobj, robj *value) {
c47d152c 670 if (!handleClientsWaitingListPush(origclient,dstkey,value)) {
ac06fc01
PN
671 /* Create the list if the key does not exist */
672 if (!dstobj) {
673 dstobj = createZiplistObject();
674 dbAdd(c->db,dstkey,dstobj);
675 } else {
cea8c5cd 676 signalModifiedKey(c->db,dstkey);
ac06fc01
PN
677 }
678 listTypePush(dstobj,value,REDIS_HEAD);
eeb34eff 679 /* Additionally propagate this PUSH operation together with
680 * the operation performed by the command. */
681 {
682 robj **argv = zmalloc(sizeof(robj*)*3);
683 argv[0] = createStringObject("LPUSH",5);
684 argv[1] = dstkey;
685 argv[2] = value;
686 incrRefCount(argv[1]);
687 incrRefCount(argv[2]);
688 alsoPropagate(server.lpushCommand,c->db->id,argv,3,
689 REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
c1c9d551 690 }
ac06fc01 691 }
ac06fc01
PN
692 /* Always send the pushed value to the client. */
693 addReplyBulk(c,value);
694}
695
8a979f03 696void rpoplpushCommand(redisClient *c) {
e2641e09 697 robj *sobj, *value;
698 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
699 checkType(c,sobj,REDIS_LIST)) return;
700
701 if (listTypeLength(sobj) == 0) {
5ba79bda 702 /* This may only happen after loading very old RDB files. Recent
703 * versions of Redis delete keys of empty lists. */
e2641e09 704 addReply(c,shared.nullbulk);
705 } else {
706 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
c1c9d551 707 robj *touchedkey = c->argv[1];
708
e2641e09 709 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
710 value = listTypePop(sobj,REDIS_TAIL);
c1c9d551 711 /* We saved touched key, and protect it, since rpoplpushHandlePush
712 * may change the client command argument vector. */
713 incrRefCount(touchedkey);
714 rpoplpushHandlePush(c,c,c->argv[2],dobj,value);
e2641e09 715
716 /* listTypePop returns an object with its refcount incremented */
717 decrRefCount(value);
718
719 /* Delete the source list when it is empty */
c1c9d551 720 if (listTypeLength(sobj) == 0) dbDelete(c->db,touchedkey);
721 signalModifiedKey(c->db,touchedkey);
722 decrRefCount(touchedkey);
e2641e09 723 server.dirty++;
eeb34eff 724
725 /* Replicate this as a simple RPOP since the LPUSH side is replicated
726 * by rpoplpushHandlePush() call if needed (it may not be needed
727 * if a client is blocking wait a push against the list). */
728 rewriteClientCommandVector(c,2,
729 resetRefCount(createStringObject("RPOP",4)),
730 c->argv[1]);
e2641e09 731 }
732}
733
734/*-----------------------------------------------------------------------------
735 * Blocking POP operations
736 *----------------------------------------------------------------------------*/
737
738/* Currently Redis blocking operations support is limited to list POP ops,
739 * so the current implementation is not fully generic, but it is also not
740 * completely specific so it will not require a rewrite to support new
741 * kind of blocking operations in the future.
742 *
743 * Still it's important to note that list blocking operations can be already
744 * used as a notification mechanism in order to implement other blocking
745 * operations at application level, so there must be a very strong evidence
746 * of usefulness and generality before new blocking operations are implemented.
747 *
748 * This is how the current blocking POP works, we use BLPOP as example:
749 * - If the user calls BLPOP and the key exists and contains a non-empty list
750 * then LPOP is called instead. So BLPOP is semantically the same as LPOP
751 * if there is no need to block.
752 * - If instead BLPOP is called and the key does not exist or the list is
753 * empty we need to block. In order to do so we remove the notification for
754 * new data to read in the client socket (so that we'll not serve new
755 * requests if the blocking request is not served). Also we put the client
756 * in a dictionary (db->blocking_keys) mapping keys to a list of clients
757 * blocking for these keys.
758 * - If a PUSH operation against a key with blocked clients waiting is
759 * performed, we serve the first in the list: basically instead of pushing
760 * the new element inside the list we return it to the (first / oldest)
761 * blocking client, unblock the client, and remove it from the list.
762 *
763 * The above comment and the source code should be enough in order to understand
764 * the implementation and modify / fix it later.
765 */
766
767/* Set a client in blocking mode for the specified key, with the specified
768 * timeout */
ba3b4741 769void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout, robj *target) {
e2641e09 770 dictEntry *de;
771 list *l;
772 int j;
773
e3c51c4b
DJMM
774 c->bpop.keys = zmalloc(sizeof(robj*)*numkeys);
775 c->bpop.count = numkeys;
776 c->bpop.timeout = timeout;
777 c->bpop.target = target;
ba3b4741
DJMM
778
779 if (target != NULL) {
ecf94014 780 incrRefCount(target);
ba3b4741
DJMM
781 }
782
e2641e09 783 for (j = 0; j < numkeys; j++) {
784 /* Add the key in the client structure, to map clients -> keys */
e3c51c4b 785 c->bpop.keys[j] = keys[j];
e2641e09 786 incrRefCount(keys[j]);
787
788 /* And in the other "side", to map keys -> clients */
789 de = dictFind(c->db->blocking_keys,keys[j]);
790 if (de == NULL) {
791 int retval;
792
793 /* For every key we take a list of clients blocked for it */
794 l = listCreate();
795 retval = dictAdd(c->db->blocking_keys,keys[j],l);
796 incrRefCount(keys[j]);
eab0e26e 797 redisAssertWithInfo(c,keys[j],retval == DICT_OK);
e2641e09 798 } else {
c0ba9ebe 799 l = dictGetVal(de);
e2641e09 800 }
801 listAddNodeTail(l,c);
802 }
803 /* Mark the client as a blocked client */
804 c->flags |= REDIS_BLOCKED;
5fa95ad7 805 server.bpop_blocked_clients++;
e2641e09 806}
807
808/* Unblock a client that's waiting in a blocking operation such as BLPOP */
809void unblockClientWaitingData(redisClient *c) {
810 dictEntry *de;
811 list *l;
812 int j;
813
eab0e26e 814 redisAssertWithInfo(c,NULL,c->bpop.keys != NULL);
e2641e09 815 /* The client may wait for multiple keys, so unblock it for every key. */
e3c51c4b 816 for (j = 0; j < c->bpop.count; j++) {
e2641e09 817 /* Remove this client from the list of clients waiting for this key. */
e3c51c4b 818 de = dictFind(c->db->blocking_keys,c->bpop.keys[j]);
eab0e26e 819 redisAssertWithInfo(c,c->bpop.keys[j],de != NULL);
c0ba9ebe 820 l = dictGetVal(de);
e2641e09 821 listDelNode(l,listSearchKey(l,c));
822 /* If the list is empty we need to remove it to avoid wasting memory */
823 if (listLength(l) == 0)
e3c51c4b
DJMM
824 dictDelete(c->db->blocking_keys,c->bpop.keys[j]);
825 decrRefCount(c->bpop.keys[j]);
e2641e09 826 }
ba3b4741 827
e2641e09 828 /* Cleanup the client structure */
e3c51c4b
DJMM
829 zfree(c->bpop.keys);
830 c->bpop.keys = NULL;
c1c9d551 831 if (c->bpop.target) decrRefCount(c->bpop.target);
e3c51c4b 832 c->bpop.target = NULL;
3bcffcbe
PN
833 c->flags &= ~REDIS_BLOCKED;
834 c->flags |= REDIS_UNBLOCKED;
5fa95ad7 835 server.bpop_blocked_clients--;
a4ce7581 836 listAddNodeTail(server.unblocked_clients,c);
e2641e09 837}
838
839/* This should be called from any function PUSHing into lists.
840 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
841 * 'ele' is the element pushed.
842 *
843 * If the function returns 0 there was no client waiting for a list push
844 * against this key.
845 *
846 * If the function returns 1 there was a client waiting for a list push
847 * against this key, the element was passed to this client thus it's not
848 * needed to actually add it to the list and the caller should return asap. */
849int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
850 struct dictEntry *de;
851 redisClient *receiver;
8a88c368
PN
852 int numclients;
853 list *clients;
e2641e09 854 listNode *ln;
8a88c368 855 robj *dstkey, *dstobj;
e2641e09 856
857 de = dictFind(c->db->blocking_keys,key);
858 if (de == NULL) return 0;
c0ba9ebe 859 clients = dictGetVal(de);
8a88c368
PN
860 numclients = listLength(clients);
861
862 /* Try to handle the push as long as there are clients waiting for a push.
863 * Note that "numclients" is used because the list of clients waiting for a
864 * push on "key" is deleted by unblockClient() when empty.
865 *
866 * This loop will have more than 1 iteration when there is a BRPOPLPUSH
867 * that cannot push the target list because it does not contain a list. If
868 * this happens, it simply tries the next client waiting for a push. */
869 while (numclients--) {
870 ln = listFirst(clients);
eab0e26e 871 redisAssertWithInfo(c,key,ln != NULL);
8a88c368
PN
872 receiver = ln->value;
873 dstkey = receiver->bpop.target;
874
c1c9d551 875 /* Protect receiver->bpop.target, that will be freed by
876 * the next unblockClientWaitingData() call. */
877 if (dstkey) incrRefCount(dstkey);
878
8a88c368
PN
879 /* This should remove the first element of the "clients" list. */
880 unblockClientWaitingData(receiver);
8a88c368
PN
881
882 if (dstkey == NULL) {
883 /* BRPOP/BLPOP */
884 addReplyMultiBulkLen(receiver,2);
885 addReplyBulk(receiver,key);
886 addReplyBulk(receiver,ele);
c1c9d551 887 return 1; /* Serve just the first client as in B[RL]POP semantics */
8a88c368 888 } else {
554a5dd2 889 /* BRPOPLPUSH, note that receiver->db is always equal to c->db. */
8a88c368 890 dstobj = lookupKeyWrite(receiver->db,dstkey);
c1c9d551 891 if (!(dstobj && checkType(receiver,dstobj,REDIS_LIST))) {
892 rpoplpushHandlePush(c,receiver,dstkey,dstobj,ele);
8a88c368
PN
893 decrRefCount(dstkey);
894 return 1;
895 }
c1c9d551 896 decrRefCount(dstkey);
8a88c368 897 }
b2a7fd0c 898 }
e2641e09 899
8a88c368 900 return 0;
e2641e09 901}
902
c8a0070a
PN
903int getTimeoutFromObjectOrReply(redisClient *c, robj *object, time_t *timeout) {
904 long tval;
59bd44d1 905
c8a0070a
PN
906 if (getLongFromObjectOrReply(c,object,&tval,
907 "timeout is not an integer or out of range") != REDIS_OK)
59bd44d1 908 return REDIS_ERR;
59bd44d1 909
c8a0070a
PN
910 if (tval < 0) {
911 addReplyError(c,"timeout is negative");
59bd44d1
DJMM
912 return REDIS_ERR;
913 }
914
d1949054 915 if (tval > 0) tval += server.unixtime;
c8a0070a 916 *timeout = tval;
59bd44d1
DJMM
917
918 return REDIS_OK;
e2641e09 919}
920
921/* Blocking RPOP/LPOP */
922void blockingPopGenericCommand(redisClient *c, int where) {
923 robj *o;
924 time_t timeout;
925 int j;
926
c8a0070a 927 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout) != REDIS_OK)
94364d53 928 return;
94364d53 929
e2641e09 930 for (j = 1; j < c->argc-1; j++) {
931 o = lookupKeyWrite(c->db,c->argv[j]);
932 if (o != NULL) {
933 if (o->type != REDIS_LIST) {
934 addReply(c,shared.wrongtypeerr);
935 return;
936 } else {
937 if (listTypeLength(o) != 0) {
c1db214e 938 /* Non empty list, this is like a non normal [LR]POP. */
939 robj *value = listTypePop(o,where);
940 redisAssert(value != NULL);
b2a7fd0c 941
c1db214e 942 addReplyMultiBulkLen(c,2);
943 addReplyBulk(c,c->argv[j]);
944 addReplyBulk(c,value);
945 decrRefCount(value);
946 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[j]);
947 signalModifiedKey(c->db,c->argv[j]);
948 server.dirty++;
949
950 /* Replicate it as an [LR]POP instead of B[LR]POP. */
951 rewriteClientCommandVector(c,2,
952 (where == REDIS_HEAD) ? shared.lpop : shared.rpop,
953 c->argv[j]);
e2641e09 954 return;
955 }
956 }
957 }
958 }
94364d53 959
fb92ecec 960 /* If we are inside a MULTI/EXEC and the list is empty the only thing
961 * we can do is treating it as a timeout (even with timeout 0). */
962 if (c->flags & REDIS_MULTI) {
963 addReply(c,shared.nullmultibulk);
964 return;
965 }
966
e2641e09 967 /* If the list is empty or the key does not exists we must block */
ba3b4741 968 blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
e2641e09 969}
970
971void blpopCommand(redisClient *c) {
972 blockingPopGenericCommand(c,REDIS_HEAD);
973}
974
975void brpopCommand(redisClient *c) {
976 blockingPopGenericCommand(c,REDIS_TAIL);
977}
b2a7fd0c
DJMM
978
979void brpoplpushCommand(redisClient *c) {
ba3b4741 980 time_t timeout;
b2a7fd0c 981
c8a0070a 982 if (getTimeoutFromObjectOrReply(c,c->argv[3],&timeout) != REDIS_OK)
ba3b4741 983 return;
ba3b4741
DJMM
984
985 robj *key = lookupKeyWrite(c->db, c->argv[1]);
986
ba3b4741 987 if (key == NULL) {
ba3b4741 988 if (c->flags & REDIS_MULTI) {
7c25a43a
DJMM
989
990 /* Blocking against an empty list in a multi state
991 * returns immediately. */
d5870d7a 992 addReply(c, shared.nullbulk);
ba3b4741 993 } else {
7c25a43a 994 /* The list is empty and the client blocks. */
ba3b4741
DJMM
995 blockForKeys(c, c->argv + 1, 1, timeout, c->argv[2]);
996 }
ba3b4741 997 } else {
7c25a43a
DJMM
998 if (key->type != REDIS_LIST) {
999 addReply(c, shared.wrongtypeerr);
1000 } else {
1001
1002 /* The list exists and has elements, so
1003 * the regular rpoplpushCommand is executed. */
eab0e26e 1004 redisAssertWithInfo(c,key,listTypeLength(key) > 0);
7c25a43a
DJMM
1005 rpoplpushCommand(c);
1006 }
ba3b4741 1007 }
b2a7fd0c 1008}