我该如何改进这个 Clojure Component+async 示例?

How can I improve this Clojure Component+async example?

我想弄清楚如何最好地创建异步组件,或以组件友好的方式容纳异步代码。这是我能想到的最好的办法了,而且……感觉不太对劲。

The Gist:取词,uppercase 他们和 reverse 他们,最后 print 他们。

问题 1:我无法让 system 停在最后。我希望看到 println 个人 c-chan 停下来,但没有。

问题 2:如何正确注入 deps。进入 producer/consumer fns?我的意思是,它们不是组件,我认为它们应该 不是 组件,因为它们没有合理的生命周期。

问题 3:我如何惯用地处理 async/pipeline - 创建名为 a>bb>c 的副作用? pipeline 应该是一个组件吗?

(ns pipelines.core
  (:require [clojure.core.async :as async
             :refer [go >! <! chan pipeline-blocking close!]]
            [com.stuartsierra.component :as component]))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
  (pipeline-blocking 4
                     b>
                     (map clojure.string/upper-case)
                     a>))
(defn b>c [b> c>]
  (pipeline-blocking 4
                     c>
                     (map (comp (partial apply str)
                                reverse))
                     b>))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
  (doseq [word ["apple" "banana" "carrot"]]
    (go (>! a> word))))

(defn consumer [c>]
  (go (while true
        (println "Your Word Is: " (<! c>)))))



;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
  (let [c-chan (reify component/Lifecycle
                 (start [this]
                   (println "starting chan: " this)
                   (chan 1))
                 (stop [this]
                   (println "stopping chan: " this)
                   (close! this)))]
    (-> (component/system-map
         :a> c-chan
         :b> c-chan
         :c> c-chan)
        (component/using {}))))


;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_      (reset! system (component/start (pipeline-system {})))
      _      (a>b (:a> @system) (:b> @system))
      _      (b>c (:b> @system) (:c> @system))
      _      (producer (:a> @system))
      _      (consumer (:c> @system))
      _      (component/stop @system)])

编辑:

我开始考虑以下问题,但我不太确定它是否正确关闭...

(extend-protocol component/Lifecycle
  clojure.core.async.impl.channels.ManyToManyChannel
  (start [this]
    this)
  (stop [this]
    (close! this)))

我稍微重写了您的示例以使其 可重新加载:

可重载管道

(ns pipeline
  (:require [clojure.core.async :as ca :refer [>! <!]]
            [clojure.string :as s]))

(defn upverse [from to]
  (ca/pipeline-blocking 4
                        to
                        (map (comp s/upper-case
                                   s/reverse))
                        from))
(defn produce [ch xs]
  (doseq [word xs]
    (ca/go (>! ch word))))

(defn consume [ch]
  (ca/go-loop []
              (when-let [word (<! ch)]
                (println "your word is:" word)
                (recur))))

(defn start-engine []
  (let [[from to] [(ca/chan) (ca/chan)]]
    (upverse to from)
    (consume from)
    {:stop (fn []
             (ca/close! to)
             (ca/close! from)
             (println "engine is stopped"))
     :process (partial produce to)}))

这样你就可以(start-engine)并用它来处理单词序列:

REPL 时间

boot.user=> (require '[pipeline])

boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine

运行它

boot.user=> ((engine :process) ["apple" "banana" "carrot"])

your word is: TORRAC
your word is: ANANAB
your word is: ELPPA

boot.user=> ((engine :process) ["do" "what" "makes" "sense"])

your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

停止它

boot.user=> ((:stop engine))
engine is stopped

;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil

状态管理

根据您打算如何使用此管道,可能根本不需要像 Component 这样的状态管理框架:无需添加任何内容"just in case",在这种情况下启动和停止管道是一个问题调用两个函数。

但是,如果在具有更多状态的更大应用程序中使用此管道,您绝对可以从状态管理库中受益。

我是 not a fan of Component 主要是因为它需要完整的应用程序购买(这使它成为一个 框架 ),但我确实尊重使用它的其他人。

坐骑

如果应用很小,我建议不要使用任何特定的东西:例如,您可以将此管道与其他管道/逻辑组合在一起,然后从 -main 开始,但如果应用是任何更大的和有更多不相关的状态,你需要做的就是向它添加 mount

(defstate engine :start (start-engine)
                 :stop ((:stop engine)))

开始管道

boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}

运行它

boot.user=> ((engine :process) ["do" "what" "makes" "sense"])

your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW

停止

boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}

这是一个 gist with a full example,其中包括 build.boot

您可以通过 boot repl

下载并使用它

[编辑]:回答评论

如果您已经对 Component 着迷,这应该可以帮助您入门:

(defrecord WordEngine []
  component/Lifecycle

  (start [component]
    (merge component (start-engine)))

  (stop [component]
    ((:stop component))
    (assoc component :process nil :stop nil)))

这将在开始时创建一个 WordEngine 对象,该对象将具有 :process 方法 .

您将无法像调用普通的 Clojure 函数那样调用它:即从 REPL 或任何名称空间仅通过 :require 调用它,除非您传递对整个系统的引用不推荐。

所以为了调用它,这个 WordEngine 需要插入一个组件系统,并注入另一个组件,然后可以解构 :process 函数并调用它。