clojure core.async 线程块
clojure core.async thread blocks
我有以下代码:
(defn -db-producer-factory [order-ids-chan next-chan]
(thread
(prn "db starting...")
(while true
(do
(prn "db starting2...")
;;
;; issue spot!
;; it stays blocked here-- the orderid doesnt come off the chan
;;
(let [order-id (<!! order-ids-chan)]
(prn "db->" order-id)
(condp = order-id
:finished (>!! next-chan :finished)
:>> (supress-w-nextexc
(->>
; get denorm'd order
(-> (r/-get-order :live order-id)
denorm/order->denormalized)
; put in a map to avoid nils
(hash-map :data)
(>!! next-chan)))))))))
(defn -stats-producer-factory [stats-db-chan next-chan]
(thread
(while true
(do
(let [msg (<!! stats-db-chan)
data (:data msg)]
(when data
(do
(prn "stats-> " (-> data :order :order-id))
(supress-w-nextexc
(q/stats-order-insert (-> data :order)))
(supress-w-nextexc
(q/stats-item-insert (-> data :items)))))
(>!! next-chan msg))))))
(defn -do-orderids [orderids]
(let [finished-chan (chan)
order-ids-chan (chan)
stats-db-chan (chan)
db-producer (-db-producer-factory order-ids-chan stats-db-chan)
stats-producer (-stats-producer-factory stats-db-chan finished-chan)]
(prn "pre-pub")
;; pub ids and finished message
(map #(>!! order-ids-chan %) (conj orderids :finished))
;; wait for finish
(prn "finished? " (<!! finished-chan))
;; allow time for finishing
;(Thread/sleep 3000)
;; close all chans
(map close! [finished-chan order-ids-chan stats-db-chan db-producer stats-producer])
))
该过程通过调用 -do-orderids
启动,例如 (-do-orderids [123])
。
执行的输出是:
"db starting..."
"pre-pub"
"db starting2..."
但随后它阻塞了。为什么它不在 "issue spot" 传递 orderid?
您的程序阻塞是因为 db-producer
在等待订单 ID 时被阻塞,实际上从未在 order-ids-chan
上收到订单 ID。
那是因为map
懒惰。所以,在这个调用中
(map #(>!! order-ids-chan %) (conj orderids :finished))
从未调用映射函数,也从未将订单 ID 放入渠道。
Clojure 经验法则:
Never use map
for side-effects! Use run!
instead.
我认为用 run!
替换该行中的 map
(以及您在频道中调用 close!
的最后一行)应该可以解决问题。
无关:您使用的所有do
形式都是多余的,您可以安全地删除它们并减少嵌套级别。
我有以下代码:
(defn -db-producer-factory [order-ids-chan next-chan]
(thread
(prn "db starting...")
(while true
(do
(prn "db starting2...")
;;
;; issue spot!
;; it stays blocked here-- the orderid doesnt come off the chan
;;
(let [order-id (<!! order-ids-chan)]
(prn "db->" order-id)
(condp = order-id
:finished (>!! next-chan :finished)
:>> (supress-w-nextexc
(->>
; get denorm'd order
(-> (r/-get-order :live order-id)
denorm/order->denormalized)
; put in a map to avoid nils
(hash-map :data)
(>!! next-chan)))))))))
(defn -stats-producer-factory [stats-db-chan next-chan]
(thread
(while true
(do
(let [msg (<!! stats-db-chan)
data (:data msg)]
(when data
(do
(prn "stats-> " (-> data :order :order-id))
(supress-w-nextexc
(q/stats-order-insert (-> data :order)))
(supress-w-nextexc
(q/stats-item-insert (-> data :items)))))
(>!! next-chan msg))))))
(defn -do-orderids [orderids]
(let [finished-chan (chan)
order-ids-chan (chan)
stats-db-chan (chan)
db-producer (-db-producer-factory order-ids-chan stats-db-chan)
stats-producer (-stats-producer-factory stats-db-chan finished-chan)]
(prn "pre-pub")
;; pub ids and finished message
(map #(>!! order-ids-chan %) (conj orderids :finished))
;; wait for finish
(prn "finished? " (<!! finished-chan))
;; allow time for finishing
;(Thread/sleep 3000)
;; close all chans
(map close! [finished-chan order-ids-chan stats-db-chan db-producer stats-producer])
))
该过程通过调用 -do-orderids
启动,例如 (-do-orderids [123])
。
执行的输出是:
"db starting..."
"pre-pub"
"db starting2..."
但随后它阻塞了。为什么它不在 "issue spot" 传递 orderid?
您的程序阻塞是因为 db-producer
在等待订单 ID 时被阻塞,实际上从未在 order-ids-chan
上收到订单 ID。
那是因为map
懒惰。所以,在这个调用中
(map #(>!! order-ids-chan %) (conj orderids :finished))
从未调用映射函数,也从未将订单 ID 放入渠道。
Clojure 经验法则:
Never use
map
for side-effects! Userun!
instead.
我认为用 run!
替换该行中的 map
(以及您在频道中调用 close!
的最后一行)应该可以解决问题。
无关:您使用的所有do
形式都是多余的,您可以安全地删除它们并减少嵌套级别。