]> git.saurik.com Git - redis.git/blob - src/t_list.c
Move creating socket/bind+listen on socket to separate functions
[redis.git] / src / t_list.c
1 #include "redis.h"
2
3 /*-----------------------------------------------------------------------------
4 * List API
5 *----------------------------------------------------------------------------*/
6
7 /* Check the argument length to see if it requires us to convert the ziplist
8 * to a real list. Only check raw-encoded objects because integer encoded
9 * objects are never too long. */
10 void listTypeTryConversion(robj *subject, robj *value) {
11 if (subject->encoding != REDIS_ENCODING_ZIPLIST) return;
12 if (value->encoding == REDIS_ENCODING_RAW &&
13 sdslen(value->ptr) > server.list_max_ziplist_value)
14 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
15 }
16
17 void listTypePush(robj *subject, robj *value, int where) {
18 /* Check if we need to convert the ziplist */
19 listTypeTryConversion(subject,value);
20 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
21 ziplistLen(subject->ptr) >= server.list_max_ziplist_entries)
22 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
23
24 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
25 int pos = (where == REDIS_HEAD) ? ZIPLIST_HEAD : ZIPLIST_TAIL;
26 value = getDecodedObject(value);
27 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),pos);
28 decrRefCount(value);
29 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
30 if (where == REDIS_HEAD) {
31 listAddNodeHead(subject->ptr,value);
32 } else {
33 listAddNodeTail(subject->ptr,value);
34 }
35 incrRefCount(value);
36 } else {
37 redisPanic("Unknown list encoding");
38 }
39 }
40
41 robj *listTypePop(robj *subject, int where) {
42 robj *value = NULL;
43 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
44 unsigned char *p;
45 unsigned char *vstr;
46 unsigned int vlen;
47 long long vlong;
48 int pos = (where == REDIS_HEAD) ? 0 : -1;
49 p = ziplistIndex(subject->ptr,pos);
50 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
51 if (vstr) {
52 value = createStringObject((char*)vstr,vlen);
53 } else {
54 value = createStringObjectFromLongLong(vlong);
55 }
56 /* We only need to delete an element when it exists */
57 subject->ptr = ziplistDelete(subject->ptr,&p);
58 }
59 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
60 list *list = subject->ptr;
61 listNode *ln;
62 if (where == REDIS_HEAD) {
63 ln = listFirst(list);
64 } else {
65 ln = listLast(list);
66 }
67 if (ln != NULL) {
68 value = listNodeValue(ln);
69 incrRefCount(value);
70 listDelNode(list,ln);
71 }
72 } else {
73 redisPanic("Unknown list encoding");
74 }
75 return value;
76 }
77
78 unsigned long listTypeLength(robj *subject) {
79 if (subject->encoding == REDIS_ENCODING_ZIPLIST) {
80 return ziplistLen(subject->ptr);
81 } else if (subject->encoding == REDIS_ENCODING_LINKEDLIST) {
82 return listLength((list*)subject->ptr);
83 } else {
84 redisPanic("Unknown list encoding");
85 }
86 }
87
88 /* Initialize an iterator at the specified index. */
89 listTypeIterator *listTypeInitIterator(robj *subject, int index, unsigned char direction) {
90 listTypeIterator *li = zmalloc(sizeof(listTypeIterator));
91 li->subject = subject;
92 li->encoding = subject->encoding;
93 li->direction = direction;
94 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
95 li->zi = ziplistIndex(subject->ptr,index);
96 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
97 li->ln = listIndex(subject->ptr,index);
98 } else {
99 redisPanic("Unknown list encoding");
100 }
101 return li;
102 }
103
104 /* Clean up the iterator. */
105 void listTypeReleaseIterator(listTypeIterator *li) {
106 zfree(li);
107 }
108
109 /* Stores pointer to current the entry in the provided entry structure
110 * and advances the position of the iterator. Returns 1 when the current
111 * entry is in fact an entry, 0 otherwise. */
112 int listTypeNext(listTypeIterator *li, listTypeEntry *entry) {
113 /* Protect from converting when iterating */
114 redisAssert(li->subject->encoding == li->encoding);
115
116 entry->li = li;
117 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
118 entry->zi = li->zi;
119 if (entry->zi != NULL) {
120 if (li->direction == REDIS_TAIL)
121 li->zi = ziplistNext(li->subject->ptr,li->zi);
122 else
123 li->zi = ziplistPrev(li->subject->ptr,li->zi);
124 return 1;
125 }
126 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
127 entry->ln = li->ln;
128 if (entry->ln != NULL) {
129 if (li->direction == REDIS_TAIL)
130 li->ln = li->ln->next;
131 else
132 li->ln = li->ln->prev;
133 return 1;
134 }
135 } else {
136 redisPanic("Unknown list encoding");
137 }
138 return 0;
139 }
140
141 /* Return entry or NULL at the current position of the iterator. */
142 robj *listTypeGet(listTypeEntry *entry) {
143 listTypeIterator *li = entry->li;
144 robj *value = NULL;
145 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
146 unsigned char *vstr;
147 unsigned int vlen;
148 long long vlong;
149 redisAssert(entry->zi != NULL);
150 if (ziplistGet(entry->zi,&vstr,&vlen,&vlong)) {
151 if (vstr) {
152 value = createStringObject((char*)vstr,vlen);
153 } else {
154 value = createStringObjectFromLongLong(vlong);
155 }
156 }
157 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
158 redisAssert(entry->ln != NULL);
159 value = listNodeValue(entry->ln);
160 incrRefCount(value);
161 } else {
162 redisPanic("Unknown list encoding");
163 }
164 return value;
165 }
166
167 void listTypeInsert(listTypeEntry *entry, robj *value, int where) {
168 robj *subject = entry->li->subject;
169 if (entry->li->encoding == REDIS_ENCODING_ZIPLIST) {
170 value = getDecodedObject(value);
171 if (where == REDIS_TAIL) {
172 unsigned char *next = ziplistNext(subject->ptr,entry->zi);
173
174 /* When we insert after the current element, but the current element
175 * is the tail of the list, we need to do a push. */
176 if (next == NULL) {
177 subject->ptr = ziplistPush(subject->ptr,value->ptr,sdslen(value->ptr),REDIS_TAIL);
178 } else {
179 subject->ptr = ziplistInsert(subject->ptr,next,value->ptr,sdslen(value->ptr));
180 }
181 } else {
182 subject->ptr = ziplistInsert(subject->ptr,entry->zi,value->ptr,sdslen(value->ptr));
183 }
184 decrRefCount(value);
185 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
186 if (where == REDIS_TAIL) {
187 listInsertNode(subject->ptr,entry->ln,value,AL_START_TAIL);
188 } else {
189 listInsertNode(subject->ptr,entry->ln,value,AL_START_HEAD);
190 }
191 incrRefCount(value);
192 } else {
193 redisPanic("Unknown list encoding");
194 }
195 }
196
197 /* Compare the given object with the entry at the current position. */
198 int listTypeEqual(listTypeEntry *entry, robj *o) {
199 listTypeIterator *li = entry->li;
200 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
201 redisAssert(o->encoding == REDIS_ENCODING_RAW);
202 return ziplistCompare(entry->zi,o->ptr,sdslen(o->ptr));
203 } else if (li->encoding == REDIS_ENCODING_LINKEDLIST) {
204 return equalStringObjects(o,listNodeValue(entry->ln));
205 } else {
206 redisPanic("Unknown list encoding");
207 }
208 }
209
210 /* Delete the element pointed to. */
211 void listTypeDelete(listTypeEntry *entry) {
212 listTypeIterator *li = entry->li;
213 if (li->encoding == REDIS_ENCODING_ZIPLIST) {
214 unsigned char *p = entry->zi;
215 li->subject->ptr = ziplistDelete(li->subject->ptr,&p);
216
217 /* Update position of the iterator depending on the direction */
218 if (li->direction == REDIS_TAIL)
219 li->zi = p;
220 else
221 li->zi = ziplistPrev(li->subject->ptr,p);
222 } else if (entry->li->encoding == REDIS_ENCODING_LINKEDLIST) {
223 listNode *next;
224 if (li->direction == REDIS_TAIL)
225 next = entry->ln->next;
226 else
227 next = entry->ln->prev;
228 listDelNode(li->subject->ptr,entry->ln);
229 li->ln = next;
230 } else {
231 redisPanic("Unknown list encoding");
232 }
233 }
234
235 void listTypeConvert(robj *subject, int enc) {
236 listTypeIterator *li;
237 listTypeEntry entry;
238 redisAssert(subject->type == REDIS_LIST);
239
240 if (enc == REDIS_ENCODING_LINKEDLIST) {
241 list *l = listCreate();
242 listSetFreeMethod(l,decrRefCount);
243
244 /* listTypeGet returns a robj with incremented refcount */
245 li = listTypeInitIterator(subject,0,REDIS_TAIL);
246 while (listTypeNext(li,&entry)) listAddNodeTail(l,listTypeGet(&entry));
247 listTypeReleaseIterator(li);
248
249 subject->encoding = REDIS_ENCODING_LINKEDLIST;
250 zfree(subject->ptr);
251 subject->ptr = l;
252 } else {
253 redisPanic("Unsupported list conversion");
254 }
255 }
256
257 /*-----------------------------------------------------------------------------
258 * List Commands
259 *----------------------------------------------------------------------------*/
260
261 void pushGenericCommand(redisClient *c, int where) {
262 robj *lobj = lookupKeyWrite(c->db,c->argv[1]);
263 if (lobj == NULL) {
264 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
265 addReply(c,shared.cone);
266 return;
267 }
268 lobj = createZiplistObject();
269 dbAdd(c->db,c->argv[1],lobj);
270 } else {
271 if (lobj->type != REDIS_LIST) {
272 addReply(c,shared.wrongtypeerr);
273 return;
274 }
275 if (handleClientsWaitingListPush(c,c->argv[1],c->argv[2])) {
276 touchWatchedKey(c->db,c->argv[1]);
277 addReply(c,shared.cone);
278 return;
279 }
280 }
281 listTypePush(lobj,c->argv[2],where);
282 addReplyLongLong(c,listTypeLength(lobj));
283 touchWatchedKey(c->db,c->argv[1]);
284 server.dirty++;
285 }
286
287 void lpushCommand(redisClient *c) {
288 pushGenericCommand(c,REDIS_HEAD);
289 }
290
291 void rpushCommand(redisClient *c) {
292 pushGenericCommand(c,REDIS_TAIL);
293 }
294
295 void pushxGenericCommand(redisClient *c, robj *refval, robj *val, int where) {
296 robj *subject;
297 listTypeIterator *iter;
298 listTypeEntry entry;
299 int inserted = 0;
300
301 if ((subject = lookupKeyReadOrReply(c,c->argv[1],shared.czero)) == NULL ||
302 checkType(c,subject,REDIS_LIST)) return;
303
304 if (refval != NULL) {
305 /* Note: we expect refval to be string-encoded because it is *not* the
306 * last argument of the multi-bulk LINSERT. */
307 redisAssert(refval->encoding == REDIS_ENCODING_RAW);
308
309 /* We're not sure if this value can be inserted yet, but we cannot
310 * convert the list inside the iterator. We don't want to loop over
311 * the list twice (once to see if the value can be inserted and once
312 * to do the actual insert), so we assume this value can be inserted
313 * and convert the ziplist to a regular list if necessary. */
314 listTypeTryConversion(subject,val);
315
316 /* Seek refval from head to tail */
317 iter = listTypeInitIterator(subject,0,REDIS_TAIL);
318 while (listTypeNext(iter,&entry)) {
319 if (listTypeEqual(&entry,refval)) {
320 listTypeInsert(&entry,val,where);
321 inserted = 1;
322 break;
323 }
324 }
325 listTypeReleaseIterator(iter);
326
327 if (inserted) {
328 /* Check if the length exceeds the ziplist length threshold. */
329 if (subject->encoding == REDIS_ENCODING_ZIPLIST &&
330 ziplistLen(subject->ptr) > server.list_max_ziplist_entries)
331 listTypeConvert(subject,REDIS_ENCODING_LINKEDLIST);
332 touchWatchedKey(c->db,c->argv[1]);
333 server.dirty++;
334 } else {
335 /* Notify client of a failed insert */
336 addReply(c,shared.cnegone);
337 return;
338 }
339 } else {
340 listTypePush(subject,val,where);
341 touchWatchedKey(c->db,c->argv[1]);
342 server.dirty++;
343 }
344
345 addReplyUlong(c,listTypeLength(subject));
346 }
347
348 void lpushxCommand(redisClient *c) {
349 pushxGenericCommand(c,NULL,c->argv[2],REDIS_HEAD);
350 }
351
352 void rpushxCommand(redisClient *c) {
353 pushxGenericCommand(c,NULL,c->argv[2],REDIS_TAIL);
354 }
355
356 void linsertCommand(redisClient *c) {
357 if (strcasecmp(c->argv[2]->ptr,"after") == 0) {
358 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_TAIL);
359 } else if (strcasecmp(c->argv[2]->ptr,"before") == 0) {
360 pushxGenericCommand(c,c->argv[3],c->argv[4],REDIS_HEAD);
361 } else {
362 addReply(c,shared.syntaxerr);
363 }
364 }
365
366 void llenCommand(redisClient *c) {
367 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.czero);
368 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
369 addReplyUlong(c,listTypeLength(o));
370 }
371
372 void lindexCommand(redisClient *c) {
373 robj *o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk);
374 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
375 int index = atoi(c->argv[2]->ptr);
376 robj *value = NULL;
377
378 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
379 unsigned char *p;
380 unsigned char *vstr;
381 unsigned int vlen;
382 long long vlong;
383 p = ziplistIndex(o->ptr,index);
384 if (ziplistGet(p,&vstr,&vlen,&vlong)) {
385 if (vstr) {
386 value = createStringObject((char*)vstr,vlen);
387 } else {
388 value = createStringObjectFromLongLong(vlong);
389 }
390 addReplyBulk(c,value);
391 decrRefCount(value);
392 } else {
393 addReply(c,shared.nullbulk);
394 }
395 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
396 listNode *ln = listIndex(o->ptr,index);
397 if (ln != NULL) {
398 value = listNodeValue(ln);
399 addReplyBulk(c,value);
400 } else {
401 addReply(c,shared.nullbulk);
402 }
403 } else {
404 redisPanic("Unknown list encoding");
405 }
406 }
407
408 void lsetCommand(redisClient *c) {
409 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr);
410 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
411 int index = atoi(c->argv[2]->ptr);
412 robj *value = c->argv[3];
413
414 listTypeTryConversion(o,value);
415 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
416 unsigned char *p, *zl = o->ptr;
417 p = ziplistIndex(zl,index);
418 if (p == NULL) {
419 addReply(c,shared.outofrangeerr);
420 } else {
421 o->ptr = ziplistDelete(o->ptr,&p);
422 value = getDecodedObject(value);
423 o->ptr = ziplistInsert(o->ptr,p,value->ptr,sdslen(value->ptr));
424 decrRefCount(value);
425 addReply(c,shared.ok);
426 touchWatchedKey(c->db,c->argv[1]);
427 server.dirty++;
428 }
429 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
430 listNode *ln = listIndex(o->ptr,index);
431 if (ln == NULL) {
432 addReply(c,shared.outofrangeerr);
433 } else {
434 decrRefCount((robj*)listNodeValue(ln));
435 listNodeValue(ln) = value;
436 incrRefCount(value);
437 addReply(c,shared.ok);
438 touchWatchedKey(c->db,c->argv[1]);
439 server.dirty++;
440 }
441 } else {
442 redisPanic("Unknown list encoding");
443 }
444 }
445
446 void popGenericCommand(redisClient *c, int where) {
447 robj *o = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk);
448 if (o == NULL || checkType(c,o,REDIS_LIST)) return;
449
450 robj *value = listTypePop(o,where);
451 if (value == NULL) {
452 addReply(c,shared.nullbulk);
453 } else {
454 addReplyBulk(c,value);
455 decrRefCount(value);
456 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
457 touchWatchedKey(c->db,c->argv[1]);
458 server.dirty++;
459 }
460 }
461
462 void lpopCommand(redisClient *c) {
463 popGenericCommand(c,REDIS_HEAD);
464 }
465
466 void rpopCommand(redisClient *c) {
467 popGenericCommand(c,REDIS_TAIL);
468 }
469
470 void lrangeCommand(redisClient *c) {
471 robj *o, *value;
472 int start = atoi(c->argv[2]->ptr);
473 int end = atoi(c->argv[3]->ptr);
474 int llen;
475 int rangelen, j;
476 listTypeEntry entry;
477
478 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
479 || checkType(c,o,REDIS_LIST)) return;
480 llen = listTypeLength(o);
481
482 /* convert negative indexes */
483 if (start < 0) start = llen+start;
484 if (end < 0) end = llen+end;
485 if (start < 0) start = 0;
486
487 /* Invariant: start >= 0, so this test will be true when end < 0.
488 * The range is empty when start > end or start >= length. */
489 if (start > end || start >= llen) {
490 addReply(c,shared.emptymultibulk);
491 return;
492 }
493 if (end >= llen) end = llen-1;
494 rangelen = (end-start)+1;
495
496 /* Return the result in form of a multi-bulk reply */
497 addReplySds(c,sdscatprintf(sdsempty(),"*%d\r\n",rangelen));
498 listTypeIterator *li = listTypeInitIterator(o,start,REDIS_TAIL);
499 for (j = 0; j < rangelen; j++) {
500 redisAssert(listTypeNext(li,&entry));
501 value = listTypeGet(&entry);
502 addReplyBulk(c,value);
503 decrRefCount(value);
504 }
505 listTypeReleaseIterator(li);
506 }
507
508 void ltrimCommand(redisClient *c) {
509 robj *o;
510 int start = atoi(c->argv[2]->ptr);
511 int end = atoi(c->argv[3]->ptr);
512 int llen;
513 int j, ltrim, rtrim;
514 list *list;
515 listNode *ln;
516
517 if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.ok)) == NULL ||
518 checkType(c,o,REDIS_LIST)) return;
519 llen = listTypeLength(o);
520
521 /* convert negative indexes */
522 if (start < 0) start = llen+start;
523 if (end < 0) end = llen+end;
524 if (start < 0) start = 0;
525
526 /* Invariant: start >= 0, so this test will be true when end < 0.
527 * The range is empty when start > end or start >= length. */
528 if (start > end || start >= llen) {
529 /* Out of range start or start > end result in empty list */
530 ltrim = llen;
531 rtrim = 0;
532 } else {
533 if (end >= llen) end = llen-1;
534 ltrim = start;
535 rtrim = llen-end-1;
536 }
537
538 /* Remove list elements to perform the trim */
539 if (o->encoding == REDIS_ENCODING_ZIPLIST) {
540 o->ptr = ziplistDeleteRange(o->ptr,0,ltrim);
541 o->ptr = ziplistDeleteRange(o->ptr,-rtrim,rtrim);
542 } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
543 list = o->ptr;
544 for (j = 0; j < ltrim; j++) {
545 ln = listFirst(list);
546 listDelNode(list,ln);
547 }
548 for (j = 0; j < rtrim; j++) {
549 ln = listLast(list);
550 listDelNode(list,ln);
551 }
552 } else {
553 redisPanic("Unknown list encoding");
554 }
555 if (listTypeLength(o) == 0) dbDelete(c->db,c->argv[1]);
556 touchWatchedKey(c->db,c->argv[1]);
557 server.dirty++;
558 addReply(c,shared.ok);
559 }
560
561 void lremCommand(redisClient *c) {
562 robj *subject, *obj = c->argv[3];
563 int toremove = atoi(c->argv[2]->ptr);
564 int removed = 0;
565 listTypeEntry entry;
566
567 subject = lookupKeyWriteOrReply(c,c->argv[1],shared.czero);
568 if (subject == NULL || checkType(c,subject,REDIS_LIST)) return;
569
570 /* Make sure obj is raw when we're dealing with a ziplist */
571 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
572 obj = getDecodedObject(obj);
573
574 listTypeIterator *li;
575 if (toremove < 0) {
576 toremove = -toremove;
577 li = listTypeInitIterator(subject,-1,REDIS_HEAD);
578 } else {
579 li = listTypeInitIterator(subject,0,REDIS_TAIL);
580 }
581
582 while (listTypeNext(li,&entry)) {
583 if (listTypeEqual(&entry,obj)) {
584 listTypeDelete(&entry);
585 server.dirty++;
586 removed++;
587 if (toremove && removed == toremove) break;
588 }
589 }
590 listTypeReleaseIterator(li);
591
592 /* Clean up raw encoded object */
593 if (subject->encoding == REDIS_ENCODING_ZIPLIST)
594 decrRefCount(obj);
595
596 if (listTypeLength(subject) == 0) dbDelete(c->db,c->argv[1]);
597 addReplySds(c,sdscatprintf(sdsempty(),":%d\r\n",removed));
598 if (removed) touchWatchedKey(c->db,c->argv[1]);
599 }
600
601 /* This is the semantic of this command:
602 * RPOPLPUSH srclist dstlist:
603 * IF LLEN(srclist) > 0
604 * element = RPOP srclist
605 * LPUSH dstlist element
606 * RETURN element
607 * ELSE
608 * RETURN nil
609 * END
610 * END
611 *
612 * The idea is to be able to get an element from a list in a reliable way
613 * since the element is not just returned but pushed against another list
614 * as well. This command was originally proposed by Ezra Zygmuntowicz.
615 */
616 void rpoplpushcommand(redisClient *c) {
617 robj *sobj, *value;
618 if ((sobj = lookupKeyWriteOrReply(c,c->argv[1],shared.nullbulk)) == NULL ||
619 checkType(c,sobj,REDIS_LIST)) return;
620
621 if (listTypeLength(sobj) == 0) {
622 addReply(c,shared.nullbulk);
623 } else {
624 robj *dobj = lookupKeyWrite(c->db,c->argv[2]);
625 if (dobj && checkType(c,dobj,REDIS_LIST)) return;
626 value = listTypePop(sobj,REDIS_TAIL);
627
628 /* Add the element to the target list (unless it's directly
629 * passed to some BLPOP-ing client */
630 if (!handleClientsWaitingListPush(c,c->argv[2],value)) {
631 /* Create the list if the key does not exist */
632 if (!dobj) {
633 dobj = createZiplistObject();
634 dbAdd(c->db,c->argv[2],dobj);
635 }
636 listTypePush(dobj,value,REDIS_HEAD);
637 }
638
639 /* Send the element to the client as reply as well */
640 addReplyBulk(c,value);
641
642 /* listTypePop returns an object with its refcount incremented */
643 decrRefCount(value);
644
645 /* Delete the source list when it is empty */
646 if (listTypeLength(sobj) == 0) dbDelete(c->db,c->argv[1]);
647 touchWatchedKey(c->db,c->argv[1]);
648 server.dirty++;
649 }
650 }
651
652 /*-----------------------------------------------------------------------------
653 * Blocking POP operations
654 *----------------------------------------------------------------------------*/
655
/* Currently Redis blocking operations support is limited to list POP ops,
 * so the current implementation is not fully generic, but it is also not
 * completely specific, so it will not require a rewrite to support new
 * kinds of blocking operations in the future.
 *
 * Still, it's important to note that list blocking operations can already be
 * used as a notification mechanism in order to implement other blocking
 * operations at application level, so there must be very strong evidence
 * of usefulness and generality before new blocking operations are implemented.
 *
 * This is how the current blocking POP works, using BLPOP as an example:
 * - If the user calls BLPOP and the key exists and contains a non-empty list,
 *   then LPOP is called instead. So BLPOP is semantically the same as LPOP
 *   if there is no need to block.
 * - If instead BLPOP is called and the key does not exist or the list is
 *   empty, we need to block. In order to do so we remove the notification for
 *   new data to read in the client socket (so that we'll not serve new
 *   requests if the blocking request is not served). We also put the client
 *   in a dictionary (db->blocking_keys) mapping keys to a list of clients
 *   blocking for these keys.
 * - If a PUSH operation against a key with blocked clients waiting is
 *   performed, we serve the first in the list: basically, instead of pushing
 *   the new element inside the list we return it to the (first / oldest)
 *   blocking client, unblock the client, and remove it from the list.
 *
 * The above comment and the source code should be enough in order to understand
 * the implementation and modify / fix it later.
 */
684
685 /* Set a client in blocking mode for the specified key, with the specified
686 * timeout */
687 void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
688 dictEntry *de;
689 list *l;
690 int j;
691
692 c->blocking_keys = zmalloc(sizeof(robj*)*numkeys);
693 c->blocking_keys_num = numkeys;
694 c->blockingto = timeout;
695 for (j = 0; j < numkeys; j++) {
696 /* Add the key in the client structure, to map clients -> keys */
697 c->blocking_keys[j] = keys[j];
698 incrRefCount(keys[j]);
699
700 /* And in the other "side", to map keys -> clients */
701 de = dictFind(c->db->blocking_keys,keys[j]);
702 if (de == NULL) {
703 int retval;
704
705 /* For every key we take a list of clients blocked for it */
706 l = listCreate();
707 retval = dictAdd(c->db->blocking_keys,keys[j],l);
708 incrRefCount(keys[j]);
709 redisAssert(retval == DICT_OK);
710 } else {
711 l = dictGetEntryVal(de);
712 }
713 listAddNodeTail(l,c);
714 }
715 /* Mark the client as a blocked client */
716 c->flags |= REDIS_BLOCKED;
717 server.blpop_blocked_clients++;
718 }
719
720 /* Unblock a client that's waiting in a blocking operation such as BLPOP */
721 void unblockClientWaitingData(redisClient *c) {
722 dictEntry *de;
723 list *l;
724 int j;
725
726 redisAssert(c->blocking_keys != NULL);
727 /* The client may wait for multiple keys, so unblock it for every key. */
728 for (j = 0; j < c->blocking_keys_num; j++) {
729 /* Remove this client from the list of clients waiting for this key. */
730 de = dictFind(c->db->blocking_keys,c->blocking_keys[j]);
731 redisAssert(de != NULL);
732 l = dictGetEntryVal(de);
733 listDelNode(l,listSearchKey(l,c));
734 /* If the list is empty we need to remove it to avoid wasting memory */
735 if (listLength(l) == 0)
736 dictDelete(c->db->blocking_keys,c->blocking_keys[j]);
737 decrRefCount(c->blocking_keys[j]);
738 }
739 /* Cleanup the client structure */
740 zfree(c->blocking_keys);
741 c->blocking_keys = NULL;
742 c->flags &= (~REDIS_BLOCKED);
743 server.blpop_blocked_clients--;
744 /* We want to process data if there is some command waiting
745 * in the input buffer. Note that this is safe even if
746 * unblockClientWaitingData() gets called from freeClient() because
747 * freeClient() will be smart enough to call this function
748 * *after* c->querybuf was set to NULL. */
749 if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
750 }
751
752 /* This should be called from any function PUSHing into lists.
753 * 'c' is the "pushing client", 'key' is the key it is pushing data against,
754 * 'ele' is the element pushed.
755 *
756 * If the function returns 0 there was no client waiting for a list push
757 * against this key.
758 *
759 * If the function returns 1 there was a client waiting for a list push
760 * against this key, the element was passed to this client thus it's not
761 * needed to actually add it to the list and the caller should return asap. */
762 int handleClientsWaitingListPush(redisClient *c, robj *key, robj *ele) {
763 struct dictEntry *de;
764 redisClient *receiver;
765 list *l;
766 listNode *ln;
767
768 de = dictFind(c->db->blocking_keys,key);
769 if (de == NULL) return 0;
770 l = dictGetEntryVal(de);
771 ln = listFirst(l);
772 redisAssert(ln != NULL);
773 receiver = ln->value;
774
775 addReplySds(receiver,sdsnew("*2\r\n"));
776 addReplyBulk(receiver,key);
777 addReplyBulk(receiver,ele);
778 unblockClientWaitingData(receiver);
779 return 1;
780 }
781
782 /* Blocking RPOP/LPOP */
783 void blockingPopGenericCommand(redisClient *c, int where) {
784 robj *o;
785 time_t timeout;
786 int j;
787
788 for (j = 1; j < c->argc-1; j++) {
789 o = lookupKeyWrite(c->db,c->argv[j]);
790 if (o != NULL) {
791 if (o->type != REDIS_LIST) {
792 addReply(c,shared.wrongtypeerr);
793 return;
794 } else {
795 if (listTypeLength(o) != 0) {
796 /* If the list contains elements fall back to the usual
797 * non-blocking POP operation */
798 robj *argv[2], **orig_argv;
799 int orig_argc;
800
801 /* We need to alter the command arguments before to call
802 * popGenericCommand() as the command takes a single key. */
803 orig_argv = c->argv;
804 orig_argc = c->argc;
805 argv[1] = c->argv[j];
806 c->argv = argv;
807 c->argc = 2;
808
809 /* Also the return value is different, we need to output
810 * the multi bulk reply header and the key name. The
811 * "real" command will add the last element (the value)
812 * for us. If this souds like an hack to you it's just
813 * because it is... */
814 addReplySds(c,sdsnew("*2\r\n"));
815 addReplyBulk(c,argv[1]);
816 popGenericCommand(c,where);
817
818 /* Fix the client structure with the original stuff */
819 c->argv = orig_argv;
820 c->argc = orig_argc;
821 return;
822 }
823 }
824 }
825 }
826 /* If the list is empty or the key does not exists we must block */
827 timeout = strtol(c->argv[c->argc-1]->ptr,NULL,10);
828 if (timeout > 0) timeout += time(NULL);
829 blockForKeys(c,c->argv+1,c->argc-2,timeout);
830 }
831
832 void blpopCommand(redisClient *c) {
833 blockingPopGenericCommand(c,REDIS_HEAD);
834 }
835
836 void brpopCommand(redisClient *c) {
837 blockingPopGenericCommand(c,REDIS_TAIL);
838 }