如何最好地关闭 clojure core.async 进程管道
How to best shut down a clojure core.async pipeline of processes
我有一个 clojure 处理应用程序,它是一个通道管道。每个处理步骤都异步进行计算(即使用 http-kit 或其他东西发出 http 请求),并将结果放在输出通道上。这样下一步就可以从该通道读取并进行计算。
我的主要功能是这样的
(defn -main [args]
(-> file/tmp-dir
(schedule/scheduler)
(search/searcher)
(process/resultprocessor)
(buy/buyer)
(report/reporter)))
目前,调度程序步骤驱动管道(它没有输入通道),并为链提供工作负载。
当我在 REPL 中 运行 时:
(-main "some args")
由于调度程序的无限性,它基本上 运行 永远存在。更改此体系结构以便我可以从 REPL 关闭整个系统的最佳方法是什么?关闭每个通道是否意味着系统终止?
广播频道有帮助吗?
您可以让您的调度程序 alts!
/ alts!!
在终止通道和管道的输入通道上:
(def kill-channel (async/chan))
(defn scheduler [input output-ch kill-ch]
(loop []
(let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]]
:priority true)]
(if-not (= p kill-ch)
(recur))))
在 kill-channel
上输入一个值将终止循环。
从技术上讲,您还可以使用 output-ch
来控制进程(放入关闭的通道 return false
),但我通常会发现显式终止通道更干净,至少对于顶部 -水平管道。
为了让事情同时更优雅和更方便地使用(在 REPL 和生产中),您可以使用 Stuart Sierra's component,启动调度程序循环(在单独的线程上)和 assoc
在组件的 start
方法中将 kill 通道连接到您的组件,然后在组件的 stop
方法中 close!
kill 通道(从而终止循环)。
我建议使用 https://github.com/stuartsierra/component 之类的东西来处理系统设置。它确保您可以轻松地在 REPL 中启动和停止系统。使用该库,您可以将其设置为每个处理步骤都是一个组件,并且每个组件将处理其 start
和 stop
协议中通道的设置和拆卸。此外,您可以为每个要实现的组件创建一个 IStream
协议,并让每个组件依赖于实现该协议的组件。它为您带来了一些非常简单的模块化。
您最终会得到一个如下所示的系统:
(component/system-map
:scheduler (schedule/new-scheduler file/tmp-dir)
:searcher (component/using (search/searcher)
{:in :scheduler})
:processor (component/using (process/resultprocessor)
{:in :searcher})
:buyer (component/using (buy/buyer)
{:in :processor})
:report (component/using (report/reporter)
{:in :buyer}))
这种方法的一个好处是,如果组件也依赖于通道,您可以轻松添加它们。例如,如果每个组件使用内部 mult
上的 tap
创建其输出通道,您可以仅通过将处理器作为依赖项的日志记录组件为处理器添加记录器。
:processor (component/using (process/resultprocessor)
{:in :searcher})
:processor-logger (component/using (log/logger)
{:in processor})
我建议您也观看他的 talk 以了解其工作原理。
您应该考虑使用 Stuart Sierra's reloaded workflow, which depends on modelling your 'pipeline' elements as components,这样您就可以将逻辑单例建模为 'classes',这意味着您可以控制每个单例的构建和销毁 (start/stop) 逻辑他们。
我有一个 clojure 处理应用程序,它是一个通道管道。每个处理步骤都异步进行计算(即使用 http-kit 或其他东西发出 http 请求),并将结果放在输出通道上。这样下一步就可以从该通道读取并进行计算。
我的主要功能是这样的
(defn -main [args]
(-> file/tmp-dir
(schedule/scheduler)
(search/searcher)
(process/resultprocessor)
(buy/buyer)
(report/reporter)))
目前,调度程序步骤驱动管道(它没有输入通道),并为链提供工作负载。
当我在 REPL 中 运行 时:
(-main "some args")
由于调度程序的无限性,它基本上 运行 永远存在。更改此体系结构以便我可以从 REPL 关闭整个系统的最佳方法是什么?关闭每个通道是否意味着系统终止?
广播频道有帮助吗?
您可以让您的调度程序 alts!
/ alts!!
在终止通道和管道的输入通道上:
(def kill-channel (async/chan))
(defn scheduler [input output-ch kill-ch]
(loop []
(let [[v p] (async/alts!! [kill-ch [out-ch (preprocess input)]]
:priority true)]
(if-not (= p kill-ch)
(recur))))
在 kill-channel
上输入一个值将终止循环。
从技术上讲,您还可以使用 output-ch
来控制进程(放入关闭的通道 return false
),但我通常会发现显式终止通道更干净,至少对于顶部 -水平管道。
为了让事情同时更优雅和更方便地使用(在 REPL 和生产中),您可以使用 Stuart Sierra's component,启动调度程序循环(在单独的线程上)和 assoc
在组件的 start
方法中将 kill 通道连接到您的组件,然后在组件的 stop
方法中 close!
kill 通道(从而终止循环)。
我建议使用 https://github.com/stuartsierra/component 之类的东西来处理系统设置。它确保您可以轻松地在 REPL 中启动和停止系统。使用该库,您可以将其设置为每个处理步骤都是一个组件,并且每个组件将处理其 start
和 stop
协议中通道的设置和拆卸。此外,您可以为每个要实现的组件创建一个 IStream
协议,并让每个组件依赖于实现该协议的组件。它为您带来了一些非常简单的模块化。
您最终会得到一个如下所示的系统:
(component/system-map
:scheduler (schedule/new-scheduler file/tmp-dir)
:searcher (component/using (search/searcher)
{:in :scheduler})
:processor (component/using (process/resultprocessor)
{:in :searcher})
:buyer (component/using (buy/buyer)
{:in :processor})
:report (component/using (report/reporter)
{:in :buyer}))
这种方法的一个好处是,如果组件也依赖于通道,您可以轻松添加它们。例如,如果每个组件使用内部 mult
上的 tap
创建其输出通道,您可以仅通过将处理器作为依赖项的日志记录组件为处理器添加记录器。
:processor (component/using (process/resultprocessor)
{:in :searcher})
:processor-logger (component/using (log/logger)
{:in processor})
我建议您也观看他的 talk 以了解其工作原理。
您应该考虑使用 Stuart Sierra's reloaded workflow, which depends on modelling your 'pipeline' elements as components,这样您就可以将逻辑单例建模为 'classes',这意味着您可以控制每个单例的构建和销毁 (start/stop) 逻辑他们。