;; Provenance: recovered from the redis.git patch (git.saurik.com, blobdiff
;; c9a111acf47cb5bb2138d1f699253f87d68e53e8..e59229a2d540b00566f44c8f29764de3ae89b5be)
;; that adds client-libraries/clojure/src/redis/internal.clj.

(ns redis.internal
  (:import [java.io InputStream
                    OutputStream
                    Reader
                    InputStreamReader
                    BufferedReader]
           [java.net Socket]))


;; Code points of the two line-terminator characters used by the protocol.
(def *cr* 0x0d)
(def *lf* 0x0a)
(defn- cr? [c] (= c *cr*))
(defn- lf? [c] (= c *lf*))

;; Small type-hinted wrappers around Java interop calls.
(defn- uppercase [#^String s] (.toUpperCase s))
(defn- trim [#^String s] (.trim s))
(defn- parse-int [#^String s] (Integer/parseInt s))
;; Named make-char-array (not char-array) so that it does not collide with
;; clojure.core/char-array on Clojure versions that provide one.
(defn- make-char-array [len] (make-array Character/TYPE len))

(def *default-host* "127.0.0.1")
(def *default-port* 6379)
(def *default-db* 0)
(def *default-timeout* 5)


(defstruct server :host :port :db :timeout :socket)

;; The server currently in effect. Rebound by with-server*; the :dynamic
;; metadata is ignored by Clojure versions where every var is rebindable and
;; required by the versions where it is not.
(def #^{:dynamic true} *server*
     (struct-map server
       :host     *default-host*
       :port     *default-port*
       :db       *default-db*
       :timeout  *default-timeout* ;; not yet used
       :socket   nil))

(defn connect-to-server
  "Create a Socket connected to server.

  server is a map with at least :host and :port. The returned socket has
  TCP_NODELAY enabled. The :timeout entry is not applied."
  [server]
  (let [{:keys [host port]} server
        socket (Socket. #^String host #^Integer port)]
    (doto socket
      (.setTcpNoDelay true))))

(defn with-server*
  "Call func (a function of no arguments) with *server* bound to the default
  server merged with server-spec, plus a freshly opened :socket. The socket
  is closed when func returns or throws. Returns the result of func."
  [server-spec func]
  (let [server (merge *server* server-spec)]
    (with-open [#^Socket socket (connect-to-server server)]
      (binding [*server* (assoc server :socket socket)]
        (func)))))

(defn socket*
  "Return the socket of the currently bound *server*; throws if there is
  none."
  []
  (or (:socket *server*)
      (throw (Exception. "Not connected to a Redis server"))))

(defn send-command
  "Send a command string to server"
  [#^String cmd]
  (let [out (.getOutputStream #^Socket (socket*))
        bytes (.getBytes cmd)]
    (.write out bytes)))


(defn read-crlf
  "Read a CR+LF combination from Reader.

  Consumes exactly two characters and throws unless they are CR followed by
  LF. Returns nil."
  [#^Reader reader]
  (let [cr (.read reader)
        lf (.read reader)]
    (when-not
        (and (cr? cr)
             (lf? lf))
      (throw (Exception. "Error reading CR/LF")))
    nil))

(defn read-line-crlf
  "Read from reader until exactly a CR+LF combination is
  found. Returns the line read without trailing CR+LF.

  This is used instead of Reader.readLine() method since it tries to
  read either a CR, a LF or a CR+LF, which we don't want in this
  case.

  Throws if EOF is reached first, or if a CR is not immediately followed
  by a LF."
  [#^Reader reader]
  (loop [line []
         c (.read reader)]
    (when (< c 0)
      (throw (Exception. "Error reading line: EOF reached before CR/LF sequence")))
    (if (cr? c)
      (let [following (.read reader)]
        (if (lf? following)
          (apply str line)
          (throw (Exception. "Error reading line: Missing LF"))))
      (recur (conj line (char c))
             (.read reader)))))

(defn- read-fully
  "Fill the first length slots of cbuf from reader, issuing as many read
  calls as needed. Throws if EOF is reached before length chars are read."
  [#^Reader reader #^chars cbuf length]
  (loop [offset 0]
    (when (< offset length)
      (let [nread (.read reader cbuf (int offset) (int (- length offset)))]
        (when (< nread 0)
          (throw (Exception. "Could not read correct number of bytes")))
        (recur (+ offset nread))))))

;;
;; Reply dispatching
;;

(defn reply-type
  "Consume the first character of a reply from reader and return it as a
  char. It is the dispatch value for parse-reply. Throws on EOF."
  ([#^BufferedReader reader]
     (let [c (.read reader)]
       ;; (char -1) would fail with an unrelated range error; report EOF.
       (when (< c 0)
         (throw (Exception. "Error reading reply: EOF reached before reply type")))
       (char c))))

;; Parses the remainder of a reply; dispatches on the reply's first character.
(defmulti parse-reply reply-type :default :unknown)

(defn read-reply
  "Read and parse one reply.

  With no arguments, reads from the input stream of the current socket; with
  a BufferedReader, reads from it."
  ([]
     ;; NOTE(review): a new BufferedReader is created for every call; anything
     ;; it buffers beyond the current reply is not seen by later calls.
     (let [input-stream (.getInputStream #^Socket (socket*))
           reader (BufferedReader. (InputStreamReader. input-stream))]
       (read-reply reader)))
  ([#^BufferedReader reader]
     (parse-reply reader)))

;; The dispatch function has already consumed the type character, so it
;; cannot be included in the message here.
(defmethod parse-reply :unknown
  [#^BufferedReader reader]
  (throw (Exception. "Unknown reply type:")))

;; Error reply: the rest of the line is the message; always throws.
(defmethod parse-reply \-
  [#^BufferedReader reader]
  (let [error (read-line-crlf reader)]
    (throw (Exception. (str "Server error: " error)))))

;; Single-line reply: returns the rest of the line.
(defmethod parse-reply \+
  [#^BufferedReader reader]
  (read-line-crlf reader))

;; Bulk reply: a length line, then that much data, then CR+LF.
;; Returns nil for a negative length.
(defmethod parse-reply \$
  [#^BufferedReader reader]
  (let [line (read-line-crlf reader)
        length (parse-int line)]
    (if (< length 0)
      nil
      ;; NOTE(review): length is used as a number of chars here. If the server
      ;; reports it in bytes, payloads that decode to fewer chars than bytes
      ;; will not be read correctly -- confirm against the protocol.
      (let [#^chars cbuf (make-char-array length)]
        ;; One read call may return before the whole payload has arrived.
        (read-fully reader cbuf length)
        (read-crlf reader) ;; CRLF
        (String. cbuf)))))

;; Multi-bulk reply: a count line followed by that many nested replies.
;; Returns a vector of the parsed replies, or nil for a negative count.
(defmethod parse-reply \*
  [#^BufferedReader reader]
  (let [line (read-line-crlf reader)
        n (parse-int line)]
    (if (< n 0)
      nil
      (loop [i n
             replies []]
        (if (zero? i)
          replies
          (recur (dec i) (conj replies (read-reply reader))))))))

;; Integer reply: the rest of the line, trimmed, parsed as an int.
(defmethod parse-reply \:
  [#^BufferedReader reader]
  (parse-int (trim (read-line-crlf reader))))


(defn str-join
  "Join elements in sequence with separator"
  [separator sequence]
  (apply str (interpose separator sequence)))


(defn inline-command
  "Create a string for an inline command: name and args separated by single
  spaces, terminated by CR+LF."
  [name & args]
  (let [cmd (str-join " " (conj args name))]
    (str cmd "\r\n")))

(defn bulk-command
  "Create a string for a bulk command.

  The last argument is the payload. The command line carries name, the
  remaining arguments and the payload length, and is followed by the payload
  and CR+LF."
  [name & args]
  (let [#^String data (str (last args))
        ;; send-command transmits (.getBytes cmd), so the length must be the
        ;; number of encoded bytes, not the number of characters.
        data-length (count (.getBytes data))
        args* (concat (butlast args) [data-length])
        cmd (apply inline-command name args*)]
    (str cmd data "\r\n")))


(defn- sort-command-args-to-string
  "Translate SORT option keywords and their operands into the option string.

  :by and :get take one operand, :limit takes two, :alpha/:asc/:desc take
  none. Throws on any other option."
  [args]
  (loop [arg-strings []
         args args]
    (if (empty? args)
      (str-join " " arg-strings)
      (let [type (first args)
            args (rest args)]
        (condp = type
          :by (let [pattern (first args)]
                (recur (conj arg-strings "BY" pattern)
                       (rest args)))
          :limit (let [start (first args)
                       end (second args)]
                   (recur (conj arg-strings "LIMIT" start end)
                          (drop 2 args)))
          :get (let [pattern (first args)]
                 (recur (conj arg-strings "GET" pattern)
                        (rest args)))
          :alpha (recur (conj arg-strings "ALPHA") args)
          :asc  (recur (conj arg-strings "ASC") args)
          :desc (recur (conj arg-strings "DESC") args)
          (throw (Exception. (str "Error parsing SORT arguments: Unknown argument: " type))))))))

(defn sort-command
  "Create a string for a SORT command.

  name must be \"SORT\". The first of args is the key; the rest are options
  as accepted by sort-command-args-to-string."
  [name & args]
  (when-not (= name "SORT")
    (throw (Exception. "Sort command name must be 'SORT'")))
  (let [key (first args)
        arg-string (sort-command-args-to-string (rest args))
        cmd (str "SORT " key)]
    (if (empty? arg-string)
      (str cmd "\r\n")
      (str cmd " " arg-string "\r\n"))))


;; Command type keyword -> symbol of the function that builds the request.
(def command-fns {:inline 'inline-command
                  :bulk   'bulk-command
                  :sort   'sort-command})


(defn parse-params
  "Return a restructuring of params, which is of form:
     [arg* (& more)?]
  into
     [(arg1 arg2 ..) more]"
  [params]
  (let [[args tail] (split-with #(not= % '&) params)]
    [args (last tail)]))

(defmacro defcommand
  "Define a function for Redis command name with parameters
  params. Type is one of :inline, :bulk or :sort (the keys of
  command-fns), which determines how the command string is constructed.

  The optional reply-fn is applied to the parsed reply; without it the
  reply is returned unchanged."
  ([name params type]
     `(defcommand ~name ~params ~type (fn [reply#] reply#)))
  ([name params type reply-fn]
     (let [command (uppercase (str name))
           command-fn (type command-fns)
           [command-params
            command-params-rest] (parse-params params)]
       ;; Fail when the command is defined rather than when it is first called.
       (when-not command-fn
         (throw (Exception. (str "Unknown command type: " type))))
       `(defn ~name
          ~params
          (let [request# (apply ~command-fn
                                ~command
                                ~@command-params
                                ~command-params-rest)]
            (send-command request#)
            (~reply-fn (read-reply)))))))


(defmacro defcommands
  "Expand each command-def (a sequence of defcommand arguments) into a
  defcommand form."
  [& command-defs]
  `(do ~@(map (fn [command-def]
                `(defcommand ~@command-def)) command-defs)))