我该如何改进这个 Clojure Component+async 示例?
How can I improve this Clojure Component+async example?
我想弄清楚如何最好地创建异步组件,或以组件友好的方式容纳异步代码。这是我能想到的最好的办法了,而且……感觉不太对劲。
The Gist:取词,uppercase
他们和 reverse
他们,最后 print
他们。
问题 1:我无法让 system
停在最后。我希望看到 println
个人 c-chan
停下来,但没有。
问题 2:如何正确注入 deps。进入 producer
/consumer
fns?我的意思是,它们不是组件,我认为它们应该 不是 组件,因为它们没有合理的生命周期。
问题 3:我如何惯用地处理 async/pipeline
- 创建名为 a>b
和 b>c
的副作用? pipeline
应该是一个组件吗?
(ns pipelines.core
(:require [clojure.core.async :as async
:refer [go >! <! chan pipeline-blocking close!]]
[com.stuartsierra.component :as component]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
(pipeline-blocking 4
b>
(map clojure.string/upper-case)
a>))
(defn b>c [b> c>]
(pipeline-blocking 4
c>
(map (comp (partial apply str)
reverse))
b>))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
(doseq [word ["apple" "banana" "carrot"]]
(go (>! a> word))))
(defn consumer [c>]
(go (while true
(println "Your Word Is: " (<! c>)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
(let [c-chan (reify component/Lifecycle
(start [this]
(println "starting chan: " this)
(chan 1))
(stop [this]
(println "stopping chan: " this)
(close! this)))]
(-> (component/system-map
:a> c-chan
:b> c-chan
:c> c-chan)
(component/using {}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_ (reset! system (component/start (pipeline-system {})))
_ (a>b (:a> @system) (:b> @system))
_ (b>c (:b> @system) (:c> @system))
_ (producer (:a> @system))
_ (consumer (:c> @system))
_ (component/stop @system)])
编辑:
我开始考虑以下问题,但我不太确定它是否正确关闭...
(extend-protocol component/Lifecycle
clojure.core.async.impl.channels.ManyToManyChannel
(start [this]
this)
(stop [this]
(close! this)))
我稍微重写了您的示例以使其 可重新加载:
可重载管道
(ns pipeline
(:require [clojure.core.async :as ca :refer [>! <!]]
[clojure.string :as s]))
(defn upverse [from to]
(ca/pipeline-blocking 4
to
(map (comp s/upper-case
s/reverse))
from))
(defn produce [ch xs]
(doseq [word xs]
(ca/go (>! ch word))))
(defn consume [ch]
(ca/go-loop []
(when-let [word (<! ch)]
(println "your word is:" word)
(recur))))
(defn start-engine []
(let [[from to] [(ca/chan) (ca/chan)]]
(upverse to from)
(consume from)
{:stop (fn []
(ca/close! to)
(ca/close! from)
(println "engine is stopped"))
:process (partial produce to)}))
这样你就可以(start-engine)
并用它来处理单词序列:
REPL 时间
boot.user=> (require '[pipeline])
boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine
运行它
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
your word is: TORRAC
your word is: ANANAB
your word is: ELPPA
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
停止它
boot.user=> ((:stop engine))
engine is stopped
;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil
状态管理
根据您打算如何使用此管道,可能根本不需要像 Component 这样的状态管理框架:无需添加任何内容"just in case",在这种情况下启动和停止管道是一个问题调用两个函数。
但是,如果在具有更多状态的更大应用程序中使用此管道,您绝对可以从状态管理库中受益。
我是 not a fan of Component 主要是因为它需要完整的应用程序购买(这使它成为一个 框架 ),但我确实尊重使用它的其他人。
坐骑
如果应用很小,我建议不要使用任何特定的东西:例如,您可以将此管道与其他管道/逻辑组合在一起,然后从 -main
开始,但如果应用是任何更大的和有更多不相关的状态,你需要做的就是向它添加 mount:
(defstate engine :start (start-engine)
:stop ((:stop engine)))
开始管道
boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}
运行它
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
停止
boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}
这是一个 gist with a full example,其中包括 build.boot
。
您可以通过 boot repl
下载并使用它
[编辑]:回答评论
如果您已经对 Component 着迷,这应该可以帮助您入门:
(defrecord WordEngine []
component/Lifecycle
(start [component]
(merge component (start-engine)))
(stop [component]
((:stop component))
(assoc component :process nil :stop nil)))
这将在开始时创建一个 WordEngine
对象,该对象将具有 :process
方法 .
您将无法像调用普通的 Clojure 函数那样调用它:即从 REPL 或任何名称空间仅通过 :require
调用它,除非您传递对整个系统的引用不推荐。
所以为了调用它,这个 WordEngine
需要插入一个组件系统,并注入另一个组件,然后可以解构 :process
函数并调用它。
我想弄清楚如何最好地创建异步组件,或以组件友好的方式容纳异步代码。这是我能想到的最好的办法了,而且……感觉不太对劲。
The Gist:取词,uppercase
他们和 reverse
他们,最后 print
他们。
问题 1:我无法让 system
停在最后。我希望看到 println
个人 c-chan
停下来,但没有。
问题 2:如何正确注入 deps。进入 producer
/consumer
fns?我的意思是,它们不是组件,我认为它们应该 不是 组件,因为它们没有合理的生命周期。
问题 3:我如何惯用地处理 async/pipeline
- 创建名为 a>b
和 b>c
的副作用? pipeline
应该是一个组件吗?
(ns pipelines.core
(:require [clojure.core.async :as async
:refer [go >! <! chan pipeline-blocking close!]]
[com.stuartsierra.component :as component]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PIPELINES
(defn a>b [a> b>]
(pipeline-blocking 4
b>
(map clojure.string/upper-case)
a>))
(defn b>c [b> c>]
(pipeline-blocking 4
c>
(map (comp (partial apply str)
reverse))
b>))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PRODUCER / CONSUMER
(defn producer [a>]
(doseq [word ["apple" "banana" "carrot"]]
(go (>! a> word))))
(defn consumer [c>]
(go (while true
(println "Your Word Is: " (<! c>)))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; SYSTEM
(defn pipeline-system [config-options]
(let [c-chan (reify component/Lifecycle
(start [this]
(println "starting chan: " this)
(chan 1))
(stop [this]
(println "stopping chan: " this)
(close! this)))]
(-> (component/system-map
:a> c-chan
:b> c-chan
:c> c-chan)
(component/using {}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; RUN IT!
(def system (atom nil))
(let [_ (reset! system (component/start (pipeline-system {})))
_ (a>b (:a> @system) (:b> @system))
_ (b>c (:b> @system) (:c> @system))
_ (producer (:a> @system))
_ (consumer (:c> @system))
_ (component/stop @system)])
编辑:
我开始考虑以下问题,但我不太确定它是否正确关闭...
(extend-protocol component/Lifecycle
clojure.core.async.impl.channels.ManyToManyChannel
(start [this]
this)
(stop [this]
(close! this)))
我稍微重写了您的示例以使其 可重新加载:
可重载管道
(ns pipeline
(:require [clojure.core.async :as ca :refer [>! <!]]
[clojure.string :as s]))
(defn upverse [from to]
(ca/pipeline-blocking 4
to
(map (comp s/upper-case
s/reverse))
from))
(defn produce [ch xs]
(doseq [word xs]
(ca/go (>! ch word))))
(defn consume [ch]
(ca/go-loop []
(when-let [word (<! ch)]
(println "your word is:" word)
(recur))))
(defn start-engine []
(let [[from to] [(ca/chan) (ca/chan)]]
(upverse to from)
(consume from)
{:stop (fn []
(ca/close! to)
(ca/close! from)
(println "engine is stopped"))
:process (partial produce to)}))
这样你就可以(start-engine)
并用它来处理单词序列:
REPL 时间
boot.user=> (require '[pipeline])
boot.user=> (def engine (pipeline/start-engine))
#'boot.user/engine
运行它
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
your word is: TORRAC
your word is: ANANAB
your word is: ELPPA
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
停止它
boot.user=> ((:stop engine))
engine is stopped
;; engine would not process anymore
boot.user=> ((engine :process) ["apple" "banana" "carrot"])
nil
状态管理
根据您打算如何使用此管道,可能根本不需要像 Component 这样的状态管理框架:无需添加任何内容"just in case",在这种情况下启动和停止管道是一个问题调用两个函数。
但是,如果在具有更多状态的更大应用程序中使用此管道,您绝对可以从状态管理库中受益。
我是 not a fan of Component 主要是因为它需要完整的应用程序购买(这使它成为一个 框架 ),但我确实尊重使用它的其他人。
坐骑
如果应用很小,我建议不要使用任何特定的东西:例如,您可以将此管道与其他管道/逻辑组合在一起,然后从 -main
开始,但如果应用是任何更大的和有更多不相关的状态,你需要做的就是向它添加 mount:
(defstate engine :start (start-engine)
:stop ((:stop engine)))
开始管道
boot.user=> (mount/start)
{:started ["#'pipeline/engine"]}
运行它
boot.user=> ((engine :process) ["do" "what" "makes" "sense"])
your word is: OD
your word is: SEKAM
your word is: ESNES
your word is: TAHW
停止
boot.user=> (mount/stop)
engine is stopped
{:stopped ["#'pipeline/engine"]}
这是一个 gist with a full example,其中包括 build.boot
。
您可以通过 boot repl
[编辑]:回答评论
如果您已经对 Component 着迷,这应该可以帮助您入门:
(defrecord WordEngine []
component/Lifecycle
(start [component]
(merge component (start-engine)))
(stop [component]
((:stop component))
(assoc component :process nil :stop nil)))
这将在开始时创建一个 WordEngine
对象,该对象将具有 :process
方法 .
您将无法像调用普通的 Clojure 函数那样调用它:即从 REPL 或任何名称空间仅通过 :require
调用它,除非您传递对整个系统的引用不推荐。
所以为了调用它,这个 WordEngine
需要插入一个组件系统,并注入另一个组件,然后可以解构 :process
函数并调用它。