X-Git-Url: https://git.saurik.com/redis.git/blobdiff_plain/92b27fe946004935f0f29277d2a762c63af55ce3..05df7621750eb4bde669141afd0809f747d8f263:/redis.tcl diff --git a/redis.tcl b/redis.tcl index ac9f8601..a1f3f566 100644 --- a/redis.tcl +++ b/redis.tcl @@ -9,18 +9,37 @@ # $r lpush mylist bar # $r lrange mylist 0 -1 # $r close +# +# Non blocking usage example: +# +# proc handlePong {r type reply} { +# puts "PONG $type '$reply'" +# if {$reply ne "PONG"} { +# $r ping [list handlePong] +# } +# } +# +# set r [redis] +# $r blocking 0 +# $r get fo [list handlePong] +# +# vwait forever package provide redis 0.1 namespace eval redis {} set ::redis::id 0 array set ::redis::fd {} +array set ::redis::blocking {} +array set ::redis::callback {} +array set ::redis::state {} ;# State in non-blocking reply reading +array set ::redis::statestack {} ;# Stack of states, for nested mbulks array set ::redis::bulkarg {} array set ::redis::multibulkarg {} # Flag commands requiring last argument as a bulk write operation foreach redis_bulk_cmd { - set setnx rpush lpush lset lrem sadd srem sismember echo getset smove zadd zrem zscore zincrby append zrank zrevrank hget hdel + set setnx rpush lpush lset lrem sadd srem sismember echo getset smove zadd zrem zscore zincrby append zrank zrevrank hget hdel hexists } { set ::redis::bulkarg($redis_bulk_cmd) {} } @@ -40,11 +59,21 @@ proc redis {{server 127.0.0.1} {port 6379}} { fconfigure $fd -translation binary set id [incr ::redis::id] set ::redis::fd($id) $fd + set ::redis::blocking($id) 1 + ::redis::redis_reset_state $id interp alias {} ::redis::redisHandle$id {} ::redis::__dispatch__ $id } proc ::redis::__dispatch__ {id method args} { set fd $::redis::fd($id) + set blocking $::redis::blocking($id) + if {$blocking == 0} { + if {[llength $args] == 0} { + error "Please provide a callback in non-blocking mode" + } + set callback [lindex $args end] + set args [lrange $args 0 end-1] + } if {[info command ::redis::__method__$method] eq {}} { if {[info 
exists ::redis::bulkarg($method)]} { set cmd "$method " @@ -65,15 +94,32 @@ proc ::redis::__dispatch__ {id method args} { append cmd [join $args] ::redis::redis_writenl $fd $cmd } - ::redis::redis_read_reply $fd + if {$blocking} { + ::redis::redis_read_reply $fd + } else { + # Every well formed reply read will pop an element from this + # list and use it as a callback. So pipelining is supported + # in non blocking mode. + lappend ::redis::callback($id) $callback + fileevent $fd readable [list ::redis::redis_readable $fd $id] + } } else { uplevel 1 [list ::redis::__method__$method $id $fd] $args } } +# "blocking" method: store val as the blocking flag of handle id and apply it to the socket with fconfigure -blocking. +proc ::redis::__method__blocking {id fd val} { + set ::redis::blocking($id) $val + fconfigure $fd -blocking $val +} + +# "close" method: close the socket and drop every per-handle array entry (fd, blocking, state, statestack, callback) plus the handle alias; each step is wrapped in catch so a failure is ignored. proc ::redis::__method__close {id fd} { catch {close $fd} catch {unset ::redis::fd($id)} + catch {unset ::redis::blocking($id)} + catch {unset ::redis::state($id)} + catch {unset ::redis::statestack($id)} + catch {unset ::redis::callback($id)} catch {interp alias {} ::redis::redisHandle$id {}} } @@ -129,3 +175,79 @@ proc ::redis::redis_read_reply fd { default {return -code error "Bad protocol, $type as reply type byte"} } } + +# Reset the non-blocking parser state of handle id: empty buf and reply, mbulk and bulk set to -1, empty state stack. +proc ::redis::redis_reset_state id { + set ::redis::state($id) [dict create buf {} mbulk -1 bulk -1 reply {}] + set ::redis::statestack($id) {} +} + +# Pop the oldest queued callback of handle id, run it at global level with the handle command name, type and reply appended, then reset the parser state. +proc ::redis::redis_call_callback {id type reply} { + set cb [lindex $::redis::callback($id) 0] + set ::redis::callback($id) [lrange $::redis::callback($id) 1 end] + uplevel #0 $cb [list ::redis::redisHandle$id $type $reply] + ::redis::redis_reset_state $id +} + +# Read a reply in non-blocking mode. 
+#
+# Installed as the "fileevent readable" handler of the connection socket.
+# Each invocation consumes at most one protocol line or one chunk of bulk
+# payload and advances the parser state kept in ::redis::state($id)
+# (keys: buf, bulk, mbulk, reply). When a reply is complete it is handed
+# to ::redis::redis_call_callback, which also resets the parser state.
+#
+# Arguments:
+#   fd - socket channel of the connection
+#   id - numeric id of the redis handle owning fd
+#
+# Callback types delivered: "reply", "err" (error reply or bad protocol
+# byte) and "eof" (connection closed; the handle is then closed too).
+proc ::redis::redis_readable {fd id} {
+    if {[eof $fd]} {
+        ::redis::redis_call_callback $id eof {}
+        ::redis::__method__close $id $fd
+        return
+    }
+    if {[dict get $::redis::state($id) bulk] == -1} {
+        # Not inside a bulk payload: the next thing to read is a header line.
+        set line [gets $fd]
+        if {$line eq {}} return ;# No complete line available, return
+        # The socket uses binary translation, so the line still ends with
+        # "\r"; "end-1" strips it from the payload.
+        set type [string index $line 0]
+        set payload [string range $line 1 end-1]
+        switch -exact -- $type {
+            : -
+            + {::redis::redis_call_callback $id reply $payload}
+            - {::redis::redis_call_callback $id err $payload}
+            $ {
+                # Bytes to read = declared length + trailing CRLF. The
+                # expression is braced so server-supplied text is treated
+                # as a value and never re-parsed as an expression.
+                set bulklen [expr {$payload + 2}]
+                dict set ::redis::state($id) bulk $bulklen
+                if {$bulklen == 1} {
+                    # We got a $-1, hack the state to play well with this:
+                    # pretend an empty payload was already fully buffered
+                    # and let the bulk branch below complete it.
+                    dict set ::redis::state($id) bulk 2
+                    dict set ::redis::state($id) buf "\r\n"
+                    ::redis::redis_readable $fd $id
+                }
+            }
+            * {
+                dict set ::redis::state($id) mbulk $payload
+                # *-1 (nil) and *0 (empty list) carry no elements, so no
+                # further data will arrive to complete them: reply now.
+                if {$payload <= 0} {
+                    ::redis::redis_call_callback $id reply {}
+                }
+            }
+            default {
+                ::redis::redis_call_callback $id err \
+                    "Bad protocol, $type as reply type byte"
+            }
+        }
+    } else {
+        # Inside a bulk payload: read whatever is still missing.
+        set totlen [dict get $::redis::state($id) bulk]
+        set buflen [string length [dict get $::redis::state($id) buf]]
+        dict append ::redis::state($id) buf [read $fd [expr {$totlen-$buflen}]]
+        # Check if we read a complete bulk reply
+        if {[string length [dict get $::redis::state($id) buf]] == $totlen} {
+            if {[dict get $::redis::state($id) mbulk] == -1} {
+                # Stand-alone bulk reply; "end-2" drops the trailing CRLF.
+                ::redis::redis_call_callback $id reply \
+                    [string range [dict get $::redis::state($id) buf] 0 end-2]
+            } else {
+                # One element of a multi-bulk reply is complete.
+                dict with ::redis::state($id) {
+                    lappend reply [string range $buf 0 end-2]
+                    incr mbulk -1
+                    set bulk -1
+                    # Start the next element with an empty buffer, otherwise
+                    # stale bytes would be counted against its length.
+                    set buf {}
+                }
+                if {[dict get $::redis::state($id) mbulk] == 0} {
+                    ::redis::redis_call_callback $id reply \
+                        [dict get $::redis::state($id) reply]
+                }
+            }
+        }
+    }
+}