]>
Commit | Line | Data |
---|---|---|
e59229a2 | 1 | (ns redis.internal |
2 | (:import [java.io InputStream | |
3 | OutputStream | |
4 | Reader | |
5 | InputStreamReader | |
6 | BufferedReader] | |
7 | [java.net Socket])) | |
8 | ||
9 | ||
10 | ||
;; Character codes of the two line-terminator characters that the reader
;; helpers in this file compare against.
(def *cr* 0x0d) ; carriage return
(def *lf* 0x0a) ; line feed

(defn- cr?
  "True when c is equal to the carriage-return code held in *cr*."
  [c]
  (= *cr* c))

(defn- lf?
  "True when c is equal to the line-feed code held in *lf*."
  [c]
  (= *lf* c))
15 | ||
(defn- uppercase
  "Return s converted to upper case."
  [#^String s]
  (. s (toUpperCase)))

(defn- trim
  "Return s with leading and trailing whitespace removed."
  [#^String s]
  (. s (trim)))

(defn- parse-int
  "Parse the decimal string s with Integer/parseInt."
  [#^String s]
  (. Integer (parseInt s)))

(defn- char-array
  "Allocate a primitive char array holding len elements."
  [len]
  (make-array (. Character TYPE) len))
20 | ||
;; Default connection settings used to build the root value of *server*.
(def *default-host* "127.0.0.1")
(def *default-port* 6379)
(def *default-db* 0)
(def *default-timeout* 5)


;; Description of one server connection; :socket is nil until connected.
(defstruct server :host :port :db :timeout :socket)

;; Root server description. with-server* rebinds this var with `binding`.
;; Clojure 1.3 and later refuse to rebind a var that does not carry
;; :dynamic metadata, while older versions simply ignore the key, so the
;; explicit #^{...} map keeps the file working on both.
(def #^{:dynamic true} *server*
  (struct-map server
    :host     *default-host*
    :port     *default-port*
    :db       *default-db*
    :timeout  *default-timeout* ;; not yet used
    :socket   nil))
35 | ||
(defn connect-to-server
  "Open a java.net.Socket to the :host and :port of server, switch on
  TCP no-delay and keep-alive, and return the socket. The :timeout entry
  of server is not applied here."
  [server]
  (let [host   (:host server)
        port   (:port server)
        socket (Socket. #^String host #^Integer port)]
    (.setTcpNoDelay socket true)
    (.setKeepAlive socket true)
    socket))
e59229a2 | 44 | |
(defn with-server*
  "Merge server-spec over the current *server*, connect to the result and
  call func (a function of no arguments) with *server* rebound to that
  description plus its open :socket. The socket is closed by with-open
  when func finishes; the value of func is returned."
  [server-spec func]
  (let [spec (merge *server* server-spec)]
    (with-open [#^Socket conn (connect-to-server spec)]
      (let [connected (assoc spec :socket conn)]
        (binding [*server* connected]
          (func))))))
51 | ||
(defn socket*
  "Return the :socket of the currently bound *server*.
  Throws an Exception when no socket is attached."
  []
  (if-let [sock (:socket *server*)]
    sock
    (throw (Exception. "Not connected to a Redis server"))))
55 | ||
(defn send-command
  "Write the command string cmd to the output stream of the current
  socket. The string is encoded with the platform default charset
  (String.getBytes with no argument)."
  [#^String cmd]
  (let [#^Socket sock      (socket*)
        #^OutputStream out (.getOutputStream sock)]
    (.write out (.getBytes cmd))))
62 | ||
63 | ||
(defn read-crlf
  "Consume two characters from reader and verify that they are CR followed
  by LF. Returns nil on success and throws an Exception otherwise."
  [#^Reader reader]
  (let [first-code  (.read reader)
        second-code (.read reader)]
    ;; Both characters are consumed before either one is checked.
    (if (and (cr? first-code) (lf? second-code))
      nil
      (throw (Exception. "Error reading CR/LF")))))
74 | ||
(defn read-line-crlf
  "Consume characters from reader up to and including the first CR+LF pair
  and return everything before that pair as a string.

  Reader.readLine() is not used because it also accepts a lone CR or a
  lone LF as a terminator, which is not wanted here.

  Throws an Exception when the stream ends before a CR is seen, or when
  the character after a CR is not LF."
  [#^Reader reader]
  (let [buf (StringBuilder.)]
    (loop [c (.read reader)]
      (cond
        (< c 0)
        (throw (Exception. "Error reading line: EOF reached before CR/LF sequence"))

        (cr? c)
        (if (lf? (.read reader))
          (str buf)
          (throw (Exception. "Error reading line: Missing LF")))

        :else
        (do
          (.append buf (char c))
          (recur (.read reader)))))))
94 | ||
95 | ;; | |
96 | ;; Reply dispatching | |
97 | ;; | |
98 | ||
99 | ||
100 | ||
(defn reply-type
  "Consume one character from reader and return it as a char. It is used
  as the dispatch function of parse-reply, so the returned character
  selects the reply parser.

  Throws an Exception when the stream is already at EOF. Without the
  check, the -1 returned by .read would be handed to `char`, which fails
  with an unrelated out-of-range error."
  ([#^BufferedReader reader]
     (let [c (.read reader)]
       (when (< c 0)
         (throw (Exception. "Error reading reply: EOF reached before reply type")))
       (char c))))
e59229a2 | 104 | |
;; Reply parser. Dispatches on the value returned by reply-type, i.e. the
;; first character read from the reader, which reply-type consumes. A
;; dispatch value with no method of its own is handled by the :unknown
;; method.
(defmulti parse-reply reply-type :default :unknown)
106 | ||
(defn read-reply
  "Read and parse one reply.

  With no arguments, wraps the input stream of the current socket in a
  fresh BufferedReader and parses from that. With a reader, parses the
  next reply from it via parse-reply."
  ([]
     ;; NOTE(review): a new BufferedReader is created on every call; if it
     ;; buffers more than one reply, the read-ahead is presumably lost when
     ;; this reader is dropped -- confirm against how callers use it.
     (let [#^Socket sock (socket*)]
       (read-reply (-> sock
                       .getInputStream
                       InputStreamReader.
                       BufferedReader.))))
  ([#^BufferedReader reader]
     (parse-reply reader)))
114 | ||
;; Fallback method: reached when the dispatch character has no parser.
;; Always throws; the reader argument is not used.
(defmethod parse-reply :unknown
  [#^BufferedReader reader]
  (let [message "Unknown reply type:"]
    (throw (Exception. message))))
118 | ||
;; Error reply: the rest of the line is the error text. It is never
;; returned; it is rethrown as an Exception prefixed with "Server error: ".
(defmethod parse-reply \-
  [#^BufferedReader reader]
  (throw (Exception. (str "Server error: " (read-line-crlf reader)))))
123 | ||
;; Single-line reply: returns the remainder of the line, without the
;; trailing CR+LF, as a string.
(defmethod parse-reply \+
  [#^BufferedReader reader]
  (let [status (read-line-crlf reader)]
    status))
127 | ||
;; Bulk reply: a length line, then that many characters of payload, then
;; CR+LF. Returns the payload as a String, or nil when the announced
;; length is negative.
;;
;; Throws an Exception when the stream ends before the whole payload has
;; been read, or (from read-crlf) when the payload is not followed by CR+LF.
;;
;; NOTE(review): the count is applied to characters read from a Reader; if
;; the server's length is a byte count, multi-byte payloads would be
;; misread -- confirm against the protocol and the reader's charset.
(defmethod parse-reply \$
  [#^BufferedReader reader]
  (let [length (parse-int (read-line-crlf reader))]
    (when-not (< length 0)
      (let [#^chars cbuf (char-array length)]
        ;; One .read call may deliver fewer characters than requested when
        ;; the rest of the payload has not arrived yet, so keep reading
        ;; until the buffer is full. Only a real EOF (-1) is an error.
        (loop [offset 0]
          (when (< offset length)
            (let [nread (.read reader cbuf offset (- length offset))]
              (when (< nread 0)
                (throw (Exception. "Could not read correct number of bytes")))
              (recur (+ offset nread)))))
        (read-crlf reader) ;; CRLF
        (String. cbuf)))))
141 | ||
;; Multi-bulk reply: a count line followed by that many nested replies,
;; each read from the same reader with read-reply. Returns a vector of the
;; parsed replies in arrival order, or nil when the count is negative.
(defmethod parse-reply \*
  [#^BufferedReader reader]
  (let [n (parse-int (read-line-crlf reader))]
    (when-not (< n 0)
      (reduce (fn [collected _]
                (conj collected (read-reply reader)))
              []
              (range n)))))
153 | ||
;; Integer reply: the trimmed remainder of the line parsed as a number.
;; Parsed with Long/parseLong rather than Integer/parseInt so that values
;; outside the 32-bit range do not raise NumberFormatException.
(defmethod parse-reply \:
  [#^BufferedReader reader]
  (let [#^String line (trim (read-line-crlf reader))]
    (Long/parseLong line)))
159 | ||
160 | ||
161 | ||
(defn str-join
  "Return one string made of the elements of sequence (each converted with
  str) with separator placed between neighbouring elements."
  [separator sequence]
  (let [sb (StringBuilder.)]
    (doseq [piece (interpose separator sequence)]
      (.append sb (str piece)))
    (str sb)))
166 | ||
167 | ||
(defn inline-command
  "Build the string for an inline command: name followed by args, all
  separated by single spaces and terminated with CR+LF."
  [name & args]
  (let [words (cons name args)]
    (apply str (concat (interpose " " words) ["\r\n"]))))
173 | ||
(defn bulk-command
  "Create a string for a bulk command.

  The last element of args is the payload. The command line is built with
  inline-command from name, the remaining args and the payload length;
  the payload follows on its own line terminated with CR+LF.

  The length is the number of bytes of the payload in the platform default
  charset -- the same encoding send-command applies -- not the number of
  characters, so payloads with multi-byte characters are announced with
  the size that is actually written to the socket."
  [name & args]
  (let [#^String data (str (last args))
        data-length   (count (.getBytes data))
        args*         (concat (butlast args) [data-length])
        cmd           (apply inline-command name args*)]
    (str cmd data "\r\n")))
182 | ||
183 | ||
(defn- sort-command-args-to-string
  "Translate a flat sequence of SORT options into the space-separated
  option string. Recognised options: :by pattern, :limit start end,
  :get pattern, and the flags :alpha, :asc and :desc. Throws an Exception
  on any other option."
  [args]
  (loop [tokens    []
         remaining args]
    (if (seq remaining)
      (let [option (first remaining)
            tail   (rest remaining)]
        (cond
          ;; options followed by one value
          (= :by option)
          (recur (conj tokens "BY" (first tail)) (rest tail))

          (= :get option)
          (recur (conj tokens "GET" (first tail)) (rest tail))

          ;; option followed by two values
          (= :limit option)
          (recur (conj tokens "LIMIT" (first tail) (second tail))
                 (drop 2 tail))

          ;; bare flags
          (= :alpha option) (recur (conj tokens "ALPHA") tail)
          (= :asc option)   (recur (conj tokens "ASC") tail)
          (= :desc option)  (recur (conj tokens "DESC") tail)

          :else
          (throw (Exception. (str "Error parsing SORT arguments: Unknown argument: " option)))))
      (apply str (interpose " " tokens)))))
207 | ||
(defn sort-command
  "Build the string for a SORT command. name must be \"SORT\"; the first of
  args is the key and the rest are options understood by
  sort-command-args-to-string. The result ends with CR+LF."
  [name & args]
  (if (not= name "SORT")
    (throw (Exception. "Sort command name must be 'SORT'"))
    (let [sort-key (first args)
          options  (sort-command-args-to-string (rest args))
          ;; the separating space is only emitted when options are present
          suffix   (if (empty? options) "" (str " " options))]
      (str "SORT " sort-key suffix "\r\n"))))
218 | ||
219 | ||
;; Maps a command type keyword to the symbol of the function that builds
;; the command string. defcommand splices these symbols into the code it
;; generates, so they are syntax-quoted: that namespace-qualifies them and
;; the generated code resolves them no matter which namespace expands the
;; macro.
(def command-fns {:inline `inline-command
                  :bulk   `bulk-command
                  :sort   `sort-command})
223 | ||
224 | ||
(defn parse-params
  "Split a parameter vector of the form
     [arg* (& more)?]
  into a two-element vector
     [(arg1 arg2 ..) more]
  where more is nil when params has no & part."
  [params]
  (let [positional? (fn [sym] (not= sym '&))
        positional  (take-while positional? params)
        trailing    (drop-while positional? params)]
    [positional (last trailing)]))
233 | ||
(defmacro defcommand
  "Define a function for Redis command name with parameters params.

  The generated function builds a command string from the upper-cased
  name and its arguments, sends it with send-command, reads one reply
  with read-reply and returns the result of applying reply-fn to it.

  type determines how the command string is constructed and must be a key
  of command-fns (:inline, :bulk or :sort). A trailing `& more` parameter
  in params is spread into the command arguments. When reply-fn is not
  given, the reply is returned unchanged.

  Throws an Exception at macro-expansion time when type is not a key of
  command-fns."
  ([name params type] `(defcommand ~name ~params ~type (fn [reply#] reply#)))
  ([name params type reply-fn]
     (let [command    (uppercase (str name))
           command-fn (get command-fns type)
           [command-params
            command-params-rest] (parse-params params)]
       ;; Fail here instead of generating (apply nil ...), which would only
       ;; surface when the generated function is first called.
       (when-not command-fn
         (throw (Exception. (str "Unknown command type: " type))))
       `(defn ~name
          ~params
          (let [request# (apply ~command-fn
                                ~command
                                ~@command-params
                                ~command-params-rest)]
            (send-command request#)
            (~reply-fn (read-reply)))))))
255 | ||
256 | ||
(defmacro defcommands
  "Expand into a single `do` form holding one defcommand call per element
  of command-defs; each element is the argument list for defcommand."
  [& command-defs]
  (cons 'do
        (for [command-def command-defs]
          `(defcommand ~@command-def))))
261 | ||
262 | ||
263 |