]> git.saurik.com Git - redis.git/blobdiff - redis.tcl
store the hash iterator on the heap instead of the stack
[redis.git] / redis.tcl
index ef210cbdb87a657c04180fe863b94984ab8a1571..c76166ef0df009d0436970d3cd4475eef34c1197 100644 (file)
--- a/redis.tcl
+++ b/redis.tcl
@@ -9,12 +9,31 @@
 # $r lpush mylist bar
 # $r lrange mylist 0 -1
 # $r close
 # $r lpush mylist bar
 # $r lrange mylist 0 -1
 # $r close
+#
+# Non blocking usage example:
+#
+# proc handlePong {r type reply} {
+#     puts "PONG $type '$reply'"
+#     if {$reply ne "PONG"} {
+#         $r ping [list handlePong]
+#     }
+# }
+# 
+# set r [redis]
+# $r blocking 0
+# $r get fo [list handlePong]
+#
+# vwait forever
 
 package provide redis 0.1
 
 namespace eval redis {}
 set ::redis::id 0
 array set ::redis::fd {}
 
 package provide redis 0.1
 
 namespace eval redis {}
 set ::redis::id 0
 array set ::redis::fd {}
+array set ::redis::blocking {}
+array set ::redis::callback {}
+array set ::redis::state {} ;# State in non-blocking reply reading
+array set ::redis::statestack {} ;# Stack of states, for nested mbulks
 array set ::redis::bulkarg {}
 array set ::redis::multibulkarg {}
 
 array set ::redis::bulkarg {}
 array set ::redis::multibulkarg {}
 
@@ -27,7 +46,7 @@ foreach redis_bulk_cmd {
 
 # Flag commands requiring last argument as a bulk write operation
 foreach redis_multibulk_cmd {
 
 # Flag commands requiring last argument as a bulk write operation
 foreach redis_multibulk_cmd {
-    mset msetnx hset
+    mset msetnx hset hsetnx hmset hmget
 } {
     set ::redis::multibulkarg($redis_multibulk_cmd) {}
 }
 } {
     set ::redis::multibulkarg($redis_multibulk_cmd) {}
 }
@@ -40,11 +59,21 @@ proc redis {{server 127.0.0.1} {port 6379}} {
     fconfigure $fd -translation binary
     set id [incr ::redis::id]
     set ::redis::fd($id) $fd
     fconfigure $fd -translation binary
     set id [incr ::redis::id]
     set ::redis::fd($id) $fd
+    set ::redis::blocking($id) 1
+    ::redis::redis_reset_state $id
     interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
 }
 
 proc ::redis::__dispatch__ {id method args} {
     set fd $::redis::fd($id)
     interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id
 }
 
 proc ::redis::__dispatch__ {id method args} {
     set fd $::redis::fd($id)
+    set blocking $::redis::blocking($id)
+    if {$blocking == 0} {
+        if {[llength $args] == 0} {
+            error "Please provide a callback in non-blocking mode"
+        }
+        set callback [lindex $args end]
+        set args [lrange $args 0 end-1]
+    }
     if {[info command ::redis::__method__$method] eq {}} {
         if {[info exists ::redis::bulkarg($method)]} {
             set cmd "$method "
     if {[info command ::redis::__method__$method] eq {}} {
         if {[info exists ::redis::bulkarg($method)]} {
             set cmd "$method "
@@ -65,15 +94,32 @@ proc ::redis::__dispatch__ {id method args} {
             append cmd [join $args]
             ::redis::redis_writenl $fd $cmd
         }
             append cmd [join $args]
             ::redis::redis_writenl $fd $cmd
         }
-        ::redis::redis_read_reply $fd
+        if {$blocking} {
+            ::redis::redis_read_reply $fd
+        } else {
+            # Every well formed reply read will pop an element from this
+            # list and use it as a callback. So pipelining is supported
+            # in non blocking mode.
+            lappend ::redis::callback($id) $callback
+            fileevent $fd readable [list ::redis::redis_readable $fd $id]
+        }
     } else {
         uplevel 1 [list ::redis::__method__$method $id $fd] $args
     }
 }
 
     } else {
         uplevel 1 [list ::redis::__method__$method $id $fd] $args
     }
 }
 
+proc ::redis::__method__blocking {id fd val} {
+    set ::redis::blocking($id) $val
+    fconfigure $fd -blocking $val
+}
+
 proc ::redis::__method__close {id fd} {
     catch {close $fd}
     catch {unset ::redis::fd($id)}
 proc ::redis::__method__close {id fd} {
     catch {close $fd}
     catch {unset ::redis::fd($id)}
+    catch {unset ::redis::blocking($id)}
+    catch {unset ::redis::state($id)}
+    catch {unset ::redis::statestack($id)}
+    catch {unset ::redis::callback($id)}
     catch {interp alias {} ::redis::redisHandle$id {}}
 }
 
     catch {interp alias {} ::redis::redisHandle$id {}}
 }
 
@@ -129,3 +175,79 @@ proc ::redis::redis_read_reply fd {
         default {return -code error "Bad protocol, $type as reply type byte"}
     }
 }
         default {return -code error "Bad protocol, $type as reply type byte"}
     }
 }
+
+proc ::redis::redis_reset_state id {
+    set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}]
+    set ::redis::statestack($id) {}
+}
+
+proc ::redis::redis_call_callback {id type reply} {
+    set cb [lindex $::redis::callback($id) 0]
+    set ::redis::callback($id) [lrange $::redis::callback($id) 1 end]
+    uplevel #0 $cb [list ::redis::redisHandle$id $type $reply]
+    ::redis::redis_reset_state $id
+}
+
+# Read a reply in non-blocking mode.
+proc ::redis::redis_readable {fd id} {
+    if {[eof $fd]} {
+        redis_call_callback $id eof {}
+        ::redis::__method__close $id $fd
+        return
+    }
+    if {[dict get $::redis::state($id) bulk] == -1} {
+        set line [gets $fd]
+        if {$line eq {}} return ;# No complete line available, return
+        switch -exact -- [string index $line 0] {
+            : -
+            + {redis_call_callback $id reply [string range $line 1 end-1]}
+            - {redis_call_callback $id err [string range $line 1 end-1]}
+            $ {
+                dict set ::redis::state($id) bulk \
+                    [expr [string range $line 1 end-1]+2]
+                if {[dict get $::redis::state($id) bulk] == 1} {
+                    # We got a $-1, hack the state to play well with this.
+                    dict set ::redis::state($id) bulk 2
+                    dict set ::redis::state($id) buf "\r\n"
+                    ::redis::redis_readable $fd $id
+                }
+            }
+            * {
+                dict set ::redis::state($id) mbulk [string range $line 1 end-1]
+                # Handle *-1
+                if {[dict get $::redis::state($id) mbulk] == -1} {
+                    redis_call_callback $id reply {}
+                }
+            }
+            default {
+                redis_call_callback $id err \
+                    "Bad protocol, $type as reply type byte"
+            }
+        }
+    } else {
+        set totlen [dict get $::redis::state($id) bulk]
+        set buflen [string length [dict get $::redis::state($id) buf]]
+        set toread [expr {$totlen-$buflen}]
+        set data [read $fd $toread]
+        set nread [string length $data]
+        dict append ::redis::state($id) buf $data
+        # Check if we read a complete bulk reply
+        if {[string length [dict get $::redis::state($id) buf]] ==
+            [dict get $::redis::state($id) bulk]} {
+            if {[dict get $::redis::state($id) mbulk] == -1} {
+                redis_call_callback $id reply \
+                    [string range [dict get $::redis::state($id) buf] 0 end-2]
+            } else {
+                dict with ::redis::state($id) {
+                    lappend reply [string range $buf 0 end-2]
+                    incr mbulk -1
+                    set bulk -1
+                }
+                if {[dict get $::redis::state($id) mbulk] == 0} {
+                    redis_call_callback $id reply \
+                        [dict get $::redis::state($id) reply]
+                }
+            }
+        }
+    }
+}