使用 clojure core.async 管道处理错误
Handling errors with clojure core.async pipeline
我试图了解使用核心处理错误的正确方法是什么。async/pipeline,我的管道如下:
input --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out
其中 xf-run-computation
将执行 http 调用和 return 响应。但是,其中一些响应会 return 出错。处理这些错误的最佳方法是什么?
我的解决方案是将 success-values
和 error-values
中的输出通道拆分,然后将它们合并回一个通道:
(let [[success-values1 error-values1] (split fn-to-split first-out)
[success-values2 error-values2] (split fn-to-split last-out)
errors (merge [error-values1 error-values2])]
(pipeline 4 first-out xf-run-computation input)
(pipeline 4 last-out xf-run-computation success-values1)
[last-out errors])
所以我的函数将 return 最后的结果和错误。
这是一个按照您的建议执行的示例。从(range 10)
开始先过滤掉5的倍数,再过滤掉3的倍数。
(ns tst.clj.core
(:use clj.core
clojure.test )
(:require
[clojure.core.async :as async]
[clojure.string :as str]
)
)
(defn err-3 [x]
"'fail' for multiples of 3"
(if (zero? (mod x 3))
(+ x 300) ; error case
x)) ; non-error
(defn err-5 [x]
"'fail' for multiples of 5"
(if (zero? (mod x 5))
(+ x 500) ; error case
x)) ; non-error
(defn is-ok?
"Returns true if the value is not 'in error' (>=100)"
[x]
(< x 100))
(def ch-0 (async/to-chan (range 10)))
(def ch-1 (async/chan 99))
(def ch-2 (async/chan 99))
(deftest t-2
(let [
_ (async/pipeline 1 ch-1 (map err-5) ch-0)
[ok-chan-1 fail-chan-1] (async/split is-ok? ch-1 99 99)
_ (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
[ok-chan-2 fail-chan-2] (async/split is-ok? ch-2 99 99)
ok-vec-2 (async/<!! (async/into [] ok-chan-2))
fail-vec-1 (async/<!! (async/into [] fail-chan-1))
fail-vec-2 (async/<!! (async/into [] fail-chan-2))
]
(is (= ok-vec-2 [1 2 4 7 8]))
(is (= fail-vec-1 [500 505]))
(is (= fail-vec-2 [303 306 309]))))
而不是 return 错误,我可能会在检测到它们后立即记录它们,然后忘记它们。
一般来说,什么是"the"正确的方法可能取决于您的应用需求,但是根据您的问题描述,我认为您需要考虑三件事:
xf-run-computation
returns 您的业务逻辑将视为错误的数据,
xf-run-computation
抛出异常并且
- 鉴于涉及到 http 调用,
xf-run-computation
中的一些 运行 可能永远不会完成(或未及时完成)。
关于第 3 点,您首先应该考虑的是使用 pipeline-blocking
而不是 pipeline
。
我认为你的问题主要与第 1 点有关。基本思想是 xf-run-computation
的结果需要 return 一个数据结构(比如映射或记录),这很清楚将结果标记为错误或成功,例如{:title nil :body nil :status "error"}
。这会给你一些处理这种情况的选择:
您以后的所有代码都会忽略具有 :status "error"
的输入数据。即,您的 xf-run-computation
将包含类似 (when (not (= (:status input) "error")) (run-computation input))
、
的行
您可以 运行 根据需要对 pipeline
调用和 filter
之间的所有结果进行过滤(请注意 filter
也可以用作管道中的换能器,从而消除了 core.async),
的旧 filter>
和 filter<
功能
你按照你的建议使用 async/split
/ Alan Thompson 在他的回答中显示将错误值过滤到一个单独的错误通道。如果您无论如何都要合并这些值,则没有真正需要为您的第二个管道设置第二个错误通道,您可以简单地重新使用您的错误通道。
对于第 2 点,问题是 xf-run-computation
中的任何异常都发生在另一个线程中,并且不会简单地传播回您的调用代码。但是您可以使用 pipeline
(和 pipeline-blocking
)的 ex-handler
参数。您可以简单地过滤掉所有异常,将结果放在单独的异常通道上,或者尝试捕获它们并将它们变成错误(可能将它们放回结果或另一个错误通道)——后者只有在exception 为您提供了足够的信息,例如一个 id 或允许将异常与导致异常的输入联系起来的东西。您可以在 xf-run-computation
中安排此操作(即 catch
从第三方库抛出的任何异常,如 http 调用)。
对于第 3 点,core.async 中的规范答案是指向 timeout
频道,但这对于 pipeline
没有多大意义。一个更好的主意是确保在您的 http 调用上设置超时,例如http-kit 的 :timeout
选项或 clj-http 的 :socket-timeout
和 :conn-timeout
选项。请注意,这些选项通常会导致超时异常。
我试图了解使用核心处理错误的正确方法是什么。async/pipeline,我的管道如下:
input --> xf-run-computation --> first-out
first-out --> xf-run-computation --> last-out
其中 xf-run-computation
将执行 http 调用和 return 响应。但是,其中一些响应会 return 出错。处理这些错误的最佳方法是什么?
我的解决方案是将 success-values
和 error-values
中的输出通道拆分,然后将它们合并回一个通道:
(let [[success-values1 error-values1] (split fn-to-split first-out)
[success-values2 error-values2] (split fn-to-split last-out)
errors (merge [error-values1 error-values2])]
(pipeline 4 first-out xf-run-computation input)
(pipeline 4 last-out xf-run-computation success-values1)
[last-out errors])
所以我的函数将 return 最后的结果和错误。
这是一个按照您的建议执行的示例。从(range 10)
开始先过滤掉5的倍数,再过滤掉3的倍数。
(ns tst.clj.core
(:use clj.core
clojure.test )
(:require
[clojure.core.async :as async]
[clojure.string :as str]
)
)
(defn err-3 [x]
"'fail' for multiples of 3"
(if (zero? (mod x 3))
(+ x 300) ; error case
x)) ; non-error
(defn err-5 [x]
"'fail' for multiples of 5"
(if (zero? (mod x 5))
(+ x 500) ; error case
x)) ; non-error
(defn is-ok?
"Returns true if the value is not 'in error' (>=100)"
[x]
(< x 100))
(def ch-0 (async/to-chan (range 10)))
(def ch-1 (async/chan 99))
(def ch-2 (async/chan 99))
(deftest t-2
(let [
_ (async/pipeline 1 ch-1 (map err-5) ch-0)
[ok-chan-1 fail-chan-1] (async/split is-ok? ch-1 99 99)
_ (async/pipeline 1 ch-2 (map err-3) ok-chan-1)
[ok-chan-2 fail-chan-2] (async/split is-ok? ch-2 99 99)
ok-vec-2 (async/<!! (async/into [] ok-chan-2))
fail-vec-1 (async/<!! (async/into [] fail-chan-1))
fail-vec-2 (async/<!! (async/into [] fail-chan-2))
]
(is (= ok-vec-2 [1 2 4 7 8]))
(is (= fail-vec-1 [500 505]))
(is (= fail-vec-2 [303 306 309]))))
而不是 return 错误,我可能会在检测到它们后立即记录它们,然后忘记它们。
一般来说,什么是"the"正确的方法可能取决于您的应用需求,但是根据您的问题描述,我认为您需要考虑三件事:
xf-run-computation
returns 您的业务逻辑将视为错误的数据,xf-run-computation
抛出异常并且- 鉴于涉及到 http 调用,
xf-run-computation
中的一些 运行 可能永远不会完成(或未及时完成)。
关于第 3 点,您首先应该考虑的是使用 pipeline-blocking
而不是 pipeline
。
我认为你的问题主要与第 1 点有关。基本思想是 xf-run-computation
的结果需要 return 一个数据结构(比如映射或记录),这很清楚将结果标记为错误或成功,例如{:title nil :body nil :status "error"}
。这会给你一些处理这种情况的选择:
您以后的所有代码都会忽略具有
:status "error"
的输入数据。即,您的xf-run-computation
将包含类似(when (not (= (:status input) "error")) (run-computation input))
、 的行
您可以 运行 根据需要对
pipeline
调用和filter
之间的所有结果进行过滤(请注意filter
也可以用作管道中的换能器,从而消除了 core.async), 的旧 你按照你的建议使用
async/split
/ Alan Thompson 在他的回答中显示将错误值过滤到一个单独的错误通道。如果您无论如何都要合并这些值,则没有真正需要为您的第二个管道设置第二个错误通道,您可以简单地重新使用您的错误通道。
filter>
和 filter<
功能
对于第 2 点,问题是 xf-run-computation
中的任何异常都发生在另一个线程中,并且不会简单地传播回您的调用代码。但是您可以使用 pipeline
(和 pipeline-blocking
)的 ex-handler
参数。您可以简单地过滤掉所有异常,将结果放在单独的异常通道上,或者尝试捕获它们并将它们变成错误(可能将它们放回结果或另一个错误通道)——后者只有在exception 为您提供了足够的信息,例如一个 id 或允许将异常与导致异常的输入联系起来的东西。您可以在 xf-run-computation
中安排此操作(即 catch
从第三方库抛出的任何异常,如 http 调用)。
对于第 3 点,core.async 中的规范答案是指向 timeout
频道,但这对于 pipeline
没有多大意义。一个更好的主意是确保在您的 http 调用上设置超时,例如http-kit 的 :timeout
选项或 clj-http 的 :socket-timeout
和 :conn-timeout
选项。请注意,这些选项通常会导致超时异常。