clojure jdbc -> async channel -> csv file... why am i not lazy?
I'm trying to get a better understanding of core.async and channels.
The task at hand is to issue a jdbc select statement against a database and stream the results onto an async channel.
From this channel I'd like a single dedicated thread to take each result and write it to a csv file using clojure.data.csv.
When I run the program below, it doesn't appear to happen lazily... I get no output to the terminal, then everything appears at once, and my csv file has 50 rows. I'm hoping someone can help me understand why.
Thanks in advance.
(ns db-async-test.core-test
(:require [clojure.java.jdbc :as j]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.core.async :as async :refer [>! <! >!! <!! chan thread]]
[clojure.string :as str]
[while-let.core :refer [while-let]]))
(defn db->chan
  "Given input channel ch, sql select, and db-spec connection info, put db
  hash-maps onto ch in a separate thread. Through back pressure I'm hoping to
  populate the channel lazily as a consumer does downstream processing."
  [ch {:keys [sql db-spec]}]
(println "starting fetch...")
(let [
row-count (atom 0) ; For state on rows
db-connection (j/get-connection db-spec)
statement (j/prepare-statement
db-connection
sql {
:result-type :forward-only ;; you need this to be lazy
:fetch-size 3 ;; also this
:max-rows 0
:concurrency :read-only})
row-fn (fn[d] (do
(>!! ch d)
;; everything below is just for printing to stdout and
;; trying to understand where my non-lazy bottleneck is.
(swap! row-count inc)
(when (zero? (mod @row-count 5))
(do
#_(Thread/sleep 2000 )
(println "\tFetched " @row-count " rows.")
(flush)
))))]
(thread
(j/query db-connection [statement]
{:as-arrays? false
:result-set-fn vec
:row-fn row-fn
})
      ;; as producer we finished populating the chan, now close it in this
      ;; same thread.
(println "producer closing channel... (hopefully you have written rows by now...")
(async/close! ch))))
(defn chan->csv
  "With input channel ch and output file csv-file, read values off ch and write
  to csv file in a separate thread."
  [ch csv-file]
(thread
(println "starting csv write...")
(def row-count (atom 0))
(with-open [^java.io.Writer writer (io/writer csv-file :append false :encoding "UTF-8")]
(while-let [data (<!! ch)]
(swap! row-count inc)
(csv/write-csv writer [data] :quote? (fn[x] false) )
(when (zero? (mod @row-count 2))
(do
#_(Thread/sleep 2000 )
(println "Wrote " @row-count " rows.")
(.flush writer)
(flush)))
))))
(def config {:db-spec {:classname "org.postgresql.Driver"
:subprotocol "postgres"
:subname "//my-database-host:5432/mydb"
:user "me"
:password "****"}
:sql "select row_id, giant_xml_column::text as xml_column from public.big_toasty_table limit 50"})
;; main sorta thing
(do
(def ch (chan 1))
(db->chan ch config)
;; could pipeline with transducers/etc at some point.
(chan->csv ch "./test.csv"))
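The back-pressure hope in db->chan's docstring is well founded: with (chan 1), >!! blocks as soon as the one-slot buffer is full, so a slow consumer throttles the producer. A standalone sketch of that blocking behavior (illustrative, not part of the program above):

(let [c (chan 1)]
  (thread
    (dotimes [i 5]
      (>!! c i)            ; blocks while the buffer is full
      (println "put" i))
    (async/close! c))
  (Thread/sleep 1000)      ; stall the consumer; the producer waits
  (while-let [v (<!! c)]
    (println "took" v)))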
Here's some output, with my comments explaining when/how it arrives:
db-async-test.core-test>
;; this happens pretty quick when i run:
starting fetch...
starting csv write...
;; then it waits 30 seconds, and spits out all the output below... it's not
;; "streaming" through lazily?
Wrote 2 rows.
Fetched 5 rows.
Wrote 4 rows.
Wrote 6 rows.
Wrote 8 rows.
...clip...
Wrote 44 rows.
Wrote 46 rows.
Wrote 48 rows.
Fetched 50 rows.
producer closing channel... (hopefully you have written rows by now...
Wrote 50 rows.
OK, I think I have something that works for me.
My main fix was to swap out org.clojure/java.jdbc in my project.clj and replace it with funcool/clojure.jdbc.
What funcool/clojure.jdbc gives me is access to result-set->lazy-seq.
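For reference, the dependency swap in project.clj looks roughly like the sketch below (the version numbers are illustrative assumptions, not from the original post):

;; project.clj (sketch; versions are assumptions, use current releases)
(defproject db-async-test "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.8.0"]
                 ;; removed: [org.clojure/java.jdbc "0.6.1"]
                 [funcool/clojure.jdbc "0.9.0"]       ; lazy cursors
                 [org.postgresql/postgresql "9.4.1212"]
                 [org.clojure/core.async "0.2.395"]
                 [org.clojure/data.csv "0.1.3"]
                 [while-let "0.2.0"]])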
The new ns:
(ns db-async-test.core-test
(:require [jdbc.core :as j]
[while-let.core :refer [while-let]]
[clojure.java.io :as io]
[clojure.data.csv :as csv]
[clojure.core.async :as a :refer [>!! <!! chan thread]]
[clojure.string :as str]))
The relevant code is below. As far as I can tell, this streams the contents through the async channel. If I'm patient, I should be able to use it to play around with async reducers/transducers, and hopefully handle reasonably large volumes (especially with map-style transformations).
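For instance, the channel itself can carry a transducer so rows are transformed in-flight as the reader puts them on. A minimal sketch, assuming rows arrive as maps with a :giant_xml_value key (that key name is a guess, not from the original code):

;; Sketch: a transducer on the channel transforms each row in-flight,
;; here replacing the giant xml string with its length before the
;; writer ever sees it.
(def xf (map #(update % :giant_xml_value count)))
(def ch (chan 1 xf))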
The function for the reader thread:
(defn db->chan
  "Put db hash-maps onto ch."
  [ch {:keys [sql db-spec]}]
(println "starting reader thread...")
(let [
row-count (atom 0) ; For state on rows
row-fn (fn[r] (do (>!! ch r)
;; everything below is just for printing to stdout
(swap! row-count inc)
(when (zero? (mod @row-count 100))
(println "Fetched " @row-count " rows."))))]
(with-open [conn (j/connection db-spec)]
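      ;; running the cursor inside a transaction matters here: the
      ;; PostgreSQL driver only streams rows per the fetch size when
      ;; auto-commit is off (likely why the first version realized
      ;; everything up front).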
(j/atomic conn
(with-open [cursor (j/fetch-lazy conn sql)]
(doseq [row (j/cursor->lazyseq cursor)]
(row-fn row)))))
(a/close! ch)))
The function for the writer thread:
(defn chan->csv
  "Read values off ch and write to csv file."
  [ch csv-file]
  (println "starting writer thread...")
  ;; a local atom is more idiomatic than def-ing a var inside the function
  (let [row-count (atom 0)]
    (with-open [^java.io.Writer writer (io/writer csv-file
                                                  :append false :encoding "UTF-8")]
      (while-let [data (<!! ch)]
        (swap! row-count inc)
        (csv/write-csv writer [data] :quote? (fn [x] false))
        (when (zero? (mod @row-count 100))
          (println "Wrote " @row-count " rows."))))))
I put the thread wrappers down here instead of inside the individual functions:
(def config {:db-spec {:subprotocol "postgresql"
:subname "//mydbhost:5432/mydb"
:user "me"
:password "*****"}
:sql "select row_id, giant_xml_value::text from some_table"})
(do
(def ch (chan 1))
(thread (db->chan ch config))
(thread (chan->csv ch "./test.csv")))
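One coordination note: thread returns a channel that closes when its body finishes, so the caller can block until the csv is fully written. A minimal sketch (an addition, not from the original post):

;; Sketch: wait for the writer thread to drain the channel before
;; moving on; taking from the returned channel blocks until the
;; thread body has finished.
(let [ch     (chan 1)
      _      (thread (db->chan ch config))
      writer (thread (chan->csv ch "./test.csv"))]
  (<!! writer))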
Output below; it looks like both threads are working at the same time, streaming data onto the channel and popping it off into the csv. Also, even with the giant_xml_column, my system never used a huge amount of memory. (The interleaved lines below are just the two threads printing to stdout simultaneously.)
starting fetch...
starting csv write...
Fetched 100 Wrote rows.
100
rows.
Fetched 200Wrote rows.200
rows.
Fetched 300 rows.
Wrote
...clip....
6000 rows.
Fetched 6100 rows.
Wrote 6100 rows.
Fetched 6200 rows.
Wrote 6200 rows.
Fetched 6300 rows.
Wrote
6300 rows.
Fetched 6400Wrote rows.6400
rows.
Fetched 6500 rows.Wrote
6500 rows.