Processing a stream of messages from an HTTP server in Clojure
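I'm looking for an idiomatic way to do the following. I have an HTTP server that responds to a particular GET request with a stream of messages. Because the response never terminates, the call blocks forever when I use clj-http/get (I'm using LightTable). I'd like to set up a callback or a core.async-style channel to do something with each incoming message. Even just writing the stream to a file would be a good first step for me. Any pointers? Here is the call: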
(require '[clj-http.client :as client])
(def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
(client/get url)
You have to change the date to today's date for the data to stream.
Thanks!
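(ns asyncfun.core
  (:require [clojure.core.async :as async
             :refer [<! >!! go chan]]
            [clj-http.client :as client]))
(def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
(def out-chan (chan))
;; Print the response as soon as it arrives, without blocking the caller.
(go (println (<! out-chan)))
;; client/get blocks, so run it on its own thread. With {:as :stream} it returns
;; once the headers arrive, and :body is an unread InputStream.
(async/thread (>!! out-chan (client/get url {:as :stream})))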
I threw this code together in a few minutes. I think core.async is what you're looking for.
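To write the stream to a file, a simple approach is clojure.java.io/copy, which takes an input stream (such as the one returned by (:body (client/get some-url {:as :stream}))) and an output stream, and copies from one to the other. Something like: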
(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.java.io :as io]))
(with-open [in-stream (:body (client/get "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt" {:as :stream}))
            out-stream (->> "streamoutput.txt"
                            io/as-file
                            io/output-stream)]
  (io/copy in-stream out-stream))
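To try the io/copy step without network access, here is a minimal sketch that swaps the HTTP body for an in-memory ByteArrayInputStream with made-up contents. The helper copy-to-file! and the vars body and out-file are hypothetical names for illustration only. With the real, non-terminating stream, io/copy only returns when the server closes the connection, so in that case you would probably want to run the copy inside a future rather than on the REPL thread.

```clojure
(require '[clojure.java.io :as io])

(defn copy-to-file!
  "Hypothetical helper: copy everything from input stream `in` into the file
  at `path`, closing both streams afterwards. Blocks until `in` hits EOF."
  [^java.io.InputStream in path]
  (with-open [in  in
              out (io/output-stream (io/file path))]
    (io/copy in out)))

;; In-memory stand-in for (:body (client/get url {:as :stream})).
;; The contents are made up.
(def body
  (java.io.ByteArrayInputStream. (.getBytes "time\tprice\nt1\t1.0\n" "UTF-8")))

;; A fresh temporary file, so the sketch does not depend on the working dir.
(def out-file (java.io.File/createTempFile "streamoutput" ".txt"))

(copy-to-file! body (.getPath out-file))

(println (slurp out-file))
```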
This gave me a few thousand lines of tab-separated values within a few seconds. Now, to process them line by line with core.async, we probably want to use reader and line-seq:
(ns http-stream
  (:require [clj-http.client :as client]
            [clojure.core.async :as async]
            [clojure.java.io :as io]
            [clojure.string :as str]))
(defn trades-chan
  "Open the URL as a stream of trades information. Return a channel of the trades, represented as strings."
  [dump-url]
  (let [lines (-> dump-url
                  (client/get {:as :stream})
                  :body
                  io/reader
                  line-seq)]   ;; A lazy seq of each line in the stream.
    ;; Return a channel which outputs the lines. to-chan walks the seq inside a
    ;; go block, so each network read blocks a go thread; core.async 1.2+ also
    ;; offers to-chan!!, which does the same work on a dedicated thread.
    (async/to-chan lines)))
;; Example: print the first 250 lines.
(let [a (trades-chan "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")]
  (async/go-loop [takes 250]
    (when (< 0 takes)
      (println (async/<! a))
      (recur (dec takes)))))
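The reason line-seq works on a response that never ends is that it is lazy: taking a finite prefix reads only as many lines as it needs. A minimal sketch that shows this without the network follows. The function endless-stream is hypothetical (not part of clj-http): a daemon thread writes "line 0", "line 1", ... into a java.io pipe forever. Closing the reader closes the pipe, which makes the writer's next write throw an IOException and stop.

```clojure
(require '[clojure.java.io :as io])

(defn endless-stream
  "Hypothetical stand-in for a non-terminating HTTP body. A daemon thread
  writes \"line 0\", \"line 1\", ... into a pipe until the reader closes it."
  ^java.io.InputStream []
  (let [in     (java.io.PipedInputStream.)
        out    (java.io.PipedOutputStream. in)
        writer (fn []
                 (try
                   (doseq [i (range)]
                     (.write out ^bytes (.getBytes (str "line " i "\n") "UTF-8"))
                     (.flush out))
                   ;; Thrown once the reading side closes the pipe: just stop.
                   (catch java.io.IOException _ nil)))]
    (doto (Thread. ^Runnable writer)
      (.setDaemon true)
      (.start))
    in))

;; take + doall realizes exactly three lines; with-open then closes the
;; reader (and the pipe), which makes the writer thread exit.
(def first-lines
  (with-open [rdr (io/reader (endless-stream))]
    (doall (take 3 (line-seq rdr)))))

(println first-lines) ; prints (line 0 line 1 line 2)
```

The doall matters: without it, the lazy seq would escape with-open and the reader would already be closed when the lines were finally realized.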
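Now you're basically up and running, but I noticed that the stream always starts with a description of the columns:
time price quantity board source buyer seller initiator
You can use this to improve things a bit. In particular, it's enough information to build a transducer for trades-chan that turns each trade into a more convenient format, such as a map. We probably also want a way to stop taking elements and close the connection at some point. I'm not very familiar with core.async myself, but this seems to work: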
(defn trades-chan
  "Open the URL as a tab-separated values stream of trades.
  Returns a core.async channel of the trades, represented as maps.
  Closes the HTTP stream on channel close!"
  [dump-url]
  (let [stream (-> dump-url
                   (client/get {:as :stream})
                   :body)
        lines (-> stream
                  io/reader
                  line-seq)                                  ;; A lazy seq of each line in the stream.
        fields (map keyword (str/split (first lines) #"\t")) ;; (:time :price :quantity ...)
        ;; A transducer that splits each line on tabs and zips it into a map keyed by fields.
        transducer (map (comp #(zipmap fields %) #(str/split % #"\t")))
        output-chan (async/chan 50 transducer)]
    ;; Reading the stream blocks, so use a real thread instead of a go block.
    (async/thread
      (loop [my-lines (rest lines)]                         ;; Skip the header line.
        (if (and (seq my-lines)                             ;; Stop at end of stream (nil can't be put),
                 (async/>!! output-chan (first my-lines)))  ;; or when the put fails because the chan is closed.
          (recur (rest my-lines))
          (do (async/close! output-chan)
              (.close stream)))))                           ;; Close the HTTP stream.
    output-chan))
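The stop-and-close idea in trades-chan can be exercised without the network. The sketch below is a stand-in under stated assumptions: a hypothetical rows-chan takes an already-open reader instead of a URL, builds the same header-based transducer, reads on async/thread, and delivers a promise once the reader is closed. The input rows are made up (only three of the column names from the real header are used). One subtlety: core.async's close! docstring says that blocked puts remain blocked until a taker releases them, so if the producer is already waiting on a full buffer when the consumer calls close!, it will not notice until the channel is drained. The consumer below therefore drains after closing.

```clojure
(require '[clojure.core.async :as async]
         '[clojure.java.io :as io]
         '[clojure.string :as str])

(defn rows-chan
  "Hypothetical sketch of trades-chan without the HTTP call. Reads
  tab-separated lines from `rdr`, keys each row by the header line, and
  delivers `closed` once the reader has been closed."
  [^java.io.BufferedReader rdr closed]
  (let [lines  (line-seq rdr)
        fields (map keyword (str/split (first lines) #"\t"))
        xf     (map (comp #(zipmap fields %) #(str/split % #"\t")))
        ch     (async/chan 1 xf)]
    (async/thread
      (loop [more (rest lines)]
        (if (and (seq more) (async/>!! ch (first more)))
          (recur (rest more))
          ;; Input exhausted, or the consumer closed the channel.
          (do (async/close! ch)
              (.close rdr)
              (deliver closed true)))))
    ch))

;; Made-up input: a short header plus four fake rows.
(def sample
  (str "time\tprice\tquantity\n"
       "t1\t1.0\t10\n" "t2\t2.0\t20\n" "t3\t3.0\t30\n" "t4\t4.0\t40\n"))

(def closed (promise))
(def ch (rows-chan (io/reader (java.io.StringReader. sample)) closed))

;; Take two rows, then stop.
(def taken [(async/<!! ch) (async/<!! ch)])
(async/close! ch)
;; Release any put that was already blocked when close! was called.
(loop [] (when (async/<!! ch) (recur)))

(println taken @closed)
```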
I think the approach above is reasonable and gives a good introduction to combining clj-http with core.async. However, if you are not tied to clj-http, I would strongly recommend the http-kit library, which is better suited to asynchronous response handling. With http-kit, you can write the call with a callback as follows.
user> (require '[clojure.java.io :as io]
               '[org.httpkit.client :as h])
nil
user> (def url "http://hopey.netfonds.no/tradedump.php?date=20150508&paper=AAPL.O&csv_format=txt")
#'user/url
user> (h/get url {:as :stream}
             (fn [{:keys [status body]}]
               (if (= status 200)
                 (with-open [out (io/output-stream "/tmp/output.txt")]
                   (io/copy body out)))))
#<core$promise$reify__6363@373b22df: :pending>
The final h/get call returns immediately, and its callback fn writes the response body to the file /tmp/output.txt asynchronously.