]> git.saurik.com Git - redis.git/blame - client-libraries/clojure/src/redis/internal.clj
redis-cli now accepts a -r (repeat) switch. Still there is a memory leaks to fix
[redis.git] / client-libraries / clojure / src / redis / internal.clj
CommitLineData
e59229a2 1(ns redis.internal
2 (:import [java.io InputStream
3 OutputStream
4 Reader
5 InputStreamReader
6 BufferedReader]
7 [java.net Socket]))
8
9
10
;; Character codes of the two bytes that terminate every protocol line.
(def *cr* 0x0d)
(def *lf* 0x0a)

(defn- cr?
  "True when the character code equals carriage return."
  [code]
  (= *cr* code))

(defn- lf?
  "True when the character code equals line feed."
  [code]
  (= *lf* code))
15
(defn- uppercase
  "Upper-case version of text."
  [#^String text]
  (. text toUpperCase))

(defn- trim
  "text with leading and trailing whitespace removed."
  [#^String text]
  (. text trim))

(defn- parse-int
  "Parse text as a base-10 32-bit integer; throws NumberFormatException
  when it is not one."
  [#^String text]
  (. Integer parseInt text))

(defn- char-array
  "Allocate a primitive char array with size elements."
  [size]
  (make-array (. Character TYPE) size))
20
;; Connection defaults, used for every key a caller leaves out of its
;; server spec.
(def *default-host* "127.0.0.1")
(def *default-port* 6379)
(def *default-db* 0)
(def *default-timeout* 5)


;; Description of a server connection. :socket is nil until a connection
;; has been opened.
(defstruct server :host :port :db :timeout :socket)

;; The server in effect for the current thread; rebound by with-server*.
;; Fields are given positionally, in the order declared by the struct.
(def *server* (struct server
                      *default-host*
                      *default-port*
                      *default-db*
                      *default-timeout* ;; not yet used
                      nil))
35
(defn connect-to-server
  "Open a TCP socket to the :host and :port of server and return it with
  TCP_NODELAY and SO_KEEPALIVE switched on. The :timeout entry of server
  is not applied to the socket."
  [server]
  (let [#^String host (:host server)
        #^Integer port (:port server)
        socket (Socket. host port)]
    (.setTcpNoDelay socket true)
    (.setKeepAlive socket true)
    socket))
e59229a2 44
(defn with-server*
  "Call the zero-argument function func with *server* bound to the
  current *server* merged with server-spec, plus a freshly opened
  :socket. The socket is closed when func returns or throws. Returns
  whatever func returns."
  [server-spec func]
  (let [spec (merge *server* server-spec)]
    (with-open [#^Socket conn (connect-to-server spec)]
      (binding [*server* (assoc spec :socket conn)]
        (func)))))
51
(defn socket*
  "Return the socket of the currently bound *server*; throws an
  Exception when there is none."
  []
  (if-let [sock (:socket *server*)]
    sock
    (throw (Exception. "Not connected to a Redis server"))))
55
(defn send-command
  "Write the command string cmd to the socket of the current server.
  The string is encoded with the platform default charset."
  [#^String cmd]
  (let [#^Socket sock (socket*)
        #^OutputStream out (.getOutputStream sock)]
    (.write out (.getBytes cmd))))
62
63
(defn read-crlf
  "Consume the next two characters of reader, which must be CR followed
  by LF. Returns nil; throws an Exception on anything else."
  [#^Reader reader]
  (let [first-code (.read reader)
        second-code (.read reader)]
    (if (and (cr? first-code) (lf? second-code))
      nil
      (throw (Exception. "Error reading CR/LF")))))
74
(defn read-line-crlf
  "Read characters from reader up to a CR+LF pair and return them as a
  string, without the CR+LF.

  Reader.readLine() is not used because it also accepts a lone CR or a
  lone LF as a line end, which is not wanted here. A CR that is not
  followed by LF, or end of input before the terminator, raises an
  Exception."
  [#^Reader reader]
  (let [buffer (StringBuilder.)]
    (loop [code (.read reader)]
      (cond
        (< code 0)
        (throw (Exception. "Error reading line: EOF reached before CR/LF sequence"))

        (cr? code)
        (if (lf? (.read reader))
          (.toString buffer)
          (throw (Exception. "Error reading line: Missing LF")))

        :else
        (do
          (.append buffer (char code))
          (recur (.read reader)))))))
94
95;;
96;; Reply dispatching
97;;
98
99
100
(defn reply-type
  "Consume one character from reader and return it; it is the marker
  that selects how the rest of the reply is parsed."
  [#^BufferedReader reader]
  (let [code (.read reader)]
    (char code)))

;; Dispatches on the marker character; markers without a method go to
;; the :unknown method.
(defmulti parse-reply reply-type :default :unknown)
106
(defn read-reply
  "Read and parse one reply.

  With no argument, a new BufferedReader is wrapped around the input
  stream of the current server's socket and the reply is read from it.
  With a reader, the reply is parsed from that reader."
  ([]
     (let [#^Socket sock (socket*)
           stream (.getInputStream sock)
           decoder (InputStreamReader. stream)]
       (read-reply (BufferedReader. decoder))))
  ([#^BufferedReader reader]
     (parse-reply reader)))
114
;; Marker with no dedicated method: the reply cannot be parsed.
(defmethod parse-reply :unknown
  [#^BufferedReader reader]
  (throw (Exception. "Unknown reply type:")))

;; Error reply: the rest of the line is the server's message, raised as
;; an Exception.
(defmethod parse-reply \-
  [#^BufferedReader reader]
  (throw (Exception. (str "Server error: " (read-line-crlf reader)))))

;; Single-line reply: the rest of the line is returned as a string.
(defmethod parse-reply \+
  [#^BufferedReader reader]
  (let [status (read-line-crlf reader)]
    status))
127
;; Bulk reply: a length line, then that many characters of payload, then
;; CR+LF. A negative length means "no value" and yields nil.
;;
;; A single Reader.read(cbuf, off, len) call may return fewer characters
;; than requested when the rest of the payload has not arrived yet, so
;; the payload is read in a loop until the buffer is full. Only end of
;; input before the payload is complete is treated as an error.
;;
;; NOTE: the length is applied to characters decoded by the reader, so a
;; payload whose encoding uses more than one byte per character will not
;; line up with a length the server counted in bytes.
(defmethod parse-reply \$
  [#^BufferedReader reader]
  (let [line (read-line-crlf reader)
        length (parse-int line)]
    (when-not (< length 0)
      (let [#^chars cbuf (char-array length)]
        (loop [offset 0]
          (when (< offset length)
            (let [nread (.read reader cbuf offset (- length offset))]
              (if (< nread 0)
                (throw (Exception. "Could not read correct number of bytes"))
                (recur (+ offset nread))))))
        (read-crlf reader) ;; trailing CRLF after the payload
        (String. cbuf)))))
141
;; Multi-bulk reply: a count line followed by that many nested replies.
;; Returns a vector of the parsed replies in arrival order, or nil when
;; the count is negative.
(defmethod parse-reply \*
  [#^BufferedReader reader]
  (let [n (parse-int (read-line-crlf reader))]
    (when-not (neg? n)
      (reduce (fn [replies _] (conj replies (read-reply reader)))
              []
              (range n)))))
153
;; Integer reply: the rest of the line, trimmed, parsed as a number.
;;
;; Values that fit in 32 bits are parsed as before. Redis integer
;; replies may exceed that range, so a value that fails the 32-bit
;; parse is retried as a 64-bit long instead of failing. Text that is
;; not a number at all still raises NumberFormatException.
(defmethod parse-reply \:
  [#^BufferedReader reader]
  (let [#^String line (trim (read-line-crlf reader))]
    (try
     (parse-int line)
     (catch NumberFormatException e
       (Long/parseLong line)))))
159
160
161
(defn str-join
  "Concatenate the string form of every element of sequence, placing
  separator between neighbouring elements."
  [separator sequence]
  (let [out (StringBuilder.)]
    (doseq [piece (interpose separator sequence)]
      (.append out (str piece)))
    (.toString out)))
166
167
(defn inline-command
  "Build the request string for an inline command: name and args joined
  by single spaces and terminated by CR+LF."
  [name & args]
  (str (str-join " " (cons name args)) "\r\n"))
173
(defn bulk-command
  "Create a string for a bulk command.

  The last element of args is the payload. The request is an inline
  command line made of name, the remaining args and the payload length,
  followed by the payload itself and CR+LF.

  The length is the number of bytes the payload occupies in the platform
  default charset (the charset send-command encodes with), not its
  number of characters, so payloads containing multi-byte characters
  are framed correctly."
  [name & args]
  (let [#^String data (str (last args))
        data-length (alength (.getBytes data))
        args* (concat (butlast args) [data-length])
        cmd (apply inline-command name args*)]
    (str cmd data "\r\n")))
182
183
(defn- sort-command-args-to-string
  "Translate the keyword options of a SORT call into the option text of
  the request. :by and :get consume one following value, :limit
  consumes two, and :alpha, :asc and :desc consume none. The resulting
  tokens are joined by single spaces. Any other option raises an
  Exception."
  [args]
  (loop [tokens []
         remaining args]
    (if (seq remaining)
      (let [option (first remaining)
            tail (rest remaining)]
        (cond
          (= :by option)
          (recur (conj tokens "BY" (first tail)) (rest tail))

          (= :limit option)
          (recur (conj tokens "LIMIT" (first tail) (second tail))
                 (drop 2 tail))

          (= :get option)
          (recur (conj tokens "GET" (first tail)) (rest tail))

          (= :alpha option)
          (recur (conj tokens "ALPHA") tail)

          (= :asc option)
          (recur (conj tokens "ASC") tail)

          (= :desc option)
          (recur (conj tokens "DESC") tail)

          :else
          (throw (Exception. (str "Error parsing SORT arguments: Unknown argument: " option)))))
      (str-join " " tokens))))
207
(defn sort-command
  "Build the request string for SORT. name must be \"SORT\"; the first
  of args is the key and the remaining args are keyword options, which
  are rendered after the key. The request ends with CR+LF."
  [name & args]
  (when (not= name "SORT")
    (throw (Exception. "Sort command name must be 'SORT'")))
  (let [options (sort-command-args-to-string (rest args))
        base (str "SORT " (first args))
        suffix (if (empty? options) "" (str " " options))]
    (str base suffix "\r\n")))
218
219
;; Maps a command type to the symbol of the function that builds its
;; request string. defcommand splices these symbols into the functions
;; it generates, so they are namespace-qualified: that way they resolve
;; to the builders in this namespace no matter which namespace the
;; macro is expanded in.
(def command-fns {:inline 'redis.internal/inline-command
                  :bulk   'redis.internal/bulk-command
                  :sort   'redis.internal/sort-command})
223
224
(defn parse-params
  "Split a parameter vector of the form
     [arg* (& more)?]
  into a pair
     [(arg1 arg2 ..) more]
  where more is nil when params has no rest parameter."
  [params]
  (let [not-amp? (fn [sym] (not= sym '&))
        positional (take-while not-amp? params)
        from-amp (drop-while not-amp? params)]
    [positional (last from-amp)]))
233
(defmacro defcommand
  "Define a function for Redis command name with parameters
  params. Type is a key of command-fns (:inline, :bulk or :sort) and
  determines how the command string is constructed.

  The generated function builds the request from its arguments, sends
  it, reads one reply and returns the result of calling reply-fn on that
  reply. Without reply-fn the reply is returned unchanged.

  Throws IllegalArgumentException at macro-expansion time when type is
  not a key of command-fns."
  ([name params type]
     `(defcommand ~name ~params ~type (fn [reply#] reply#)))
  ([name params type reply-fn]
     (let [command (uppercase (str name))
           command-fn (get command-fns type)
           [command-params
            command-params-rest] (parse-params params)]
       ;; Fail while the command is being defined rather than when the
       ;; generated function is first called with a nil builder.
       (when-not command-fn
         (throw (IllegalArgumentException.
                 (str "Unknown command type " type " for command " name))))
       `(defn ~name
          ~params
          ;; The rest parameter (or nil when there is none) is the final
          ;; argument to apply, so its elements are spread into the call.
          (let [request# (apply ~command-fn
                                ~command
                                ~@command-params
                                ~command-params-rest)]
            (send-command request#)
            (~reply-fn (read-reply)))))))
255
256
(defmacro defcommands
  "Expand into a do form holding one defcommand per element of
  command-defs; each element is the argument list of a defcommand call."
  [& command-defs]
  (cons 'do
        (for [spec command-defs]
          `(defcommand ~@spec))))
261
262
263