clojure core.async 线程块

clojure core.async thread blocks

我有以下代码:

(defn -db-producer-factory [order-ids-chan next-chan]
  (thread
    (prn "db starting...")
    (while true
      (do
        (prn "db starting2...")
        ;;
        ;; issue spot!
        ;; it stays blocked here-- the orderid doesnt come off the chan
        ;;
        (let [order-id (<!! order-ids-chan)]
          (prn "db->" order-id)
          (condp = order-id
            :finished (>!! next-chan :finished)
            :>> (supress-w-nextexc
                  (->>
                    ; get denorm'd order
                    (-> (r/-get-order :live order-id)
                        denorm/order->denormalized)
                    ; put in a map to avoid nils
                    (hash-map :data)
                    (>!! next-chan)))))))))

(defn -stats-producer-factory [stats-db-chan next-chan]
  (thread
    (while true
      (do
        (let [msg (<!! stats-db-chan)
              data (:data msg)]
          (when data
            (do
              (prn "stats-> " (-> data :order :order-id))
              (supress-w-nextexc
                (q/stats-order-insert (-> data :order)))
              (supress-w-nextexc
                (q/stats-item-insert (-> data :items)))))
          (>!! next-chan msg))))))

(defn -do-orderids [orderids]
  (let [finished-chan (chan)
        order-ids-chan (chan)
        stats-db-chan (chan)
        db-producer (-db-producer-factory order-ids-chan stats-db-chan)
        stats-producer (-stats-producer-factory stats-db-chan finished-chan)]

    (prn "pre-pub")
    ;; pub ids and finished message
    (map #(>!! order-ids-chan %) (conj orderids :finished))
    ;; wait for finish
    (prn "finished? " (<!! finished-chan))
    ;; allow time for finishing
    ;(Thread/sleep 3000)
    ;; close all chans
    (map close! [finished-chan order-ids-chan stats-db-chan db-producer stats-producer])
    ))

该过程通过调用 -do-orderids 启动,例如 (-do-orderids [123])

执行的输出是:

"db starting..."
"pre-pub"
"db starting2..."

但随后它阻塞了。为什么它不在 "issue spot" 传递 orderid?

您的程序阻塞是因为 db-producer 在等待订单 ID 时被阻塞,实际上从未在 order-ids-chan 上收到订单 ID。

那是因为map懒惰。所以,在这个调用中

(map #(>!! order-ids-chan %) (conj orderids :finished))

从未调用映射函数,也从未将订单 ID 放入渠道。

Clojure 经验法则:

Never use map for side-effects! Use run! instead.

我认为用 run! 替换该行中的 map(以及您在频道中调用 close! 的最后一行)应该可以解决问题。

无关:您使用的所有do形式都是多余的,您可以安全地删除它们并减少嵌套级别。