在 Elixir 中测试异步代码
Testing asynchronous code in Elixir
我想测试一个正在使用 Task.async
的函数
为了让我的测试通过,我需要让它在断言之前休眠 100 毫秒,否则测试进程会在异步任务执行之前被杀死。
有没有更好的方法?
已编辑,添加代码示例:
我想测试的代码(大致):
def search(params) do
RateLimiter.rate_limit(fn ->
parsed_params = ExTwitter.Parser.parse_request_params(params)
json = ExTwitter.API.Base.request(:get, "1.1/search/tweets.json", parsed_params)
Task.async(fn -> process_search_output(json) end)
new_max_id(json)
end)
end
以及我已经编写的测试(仅适用于睡眠调用)
test "processes and store tweets" do
with_mock ExTwitter.API.Base, [request: fn(_,_,_) -> json_fixture end] do
with_mock TwitterRateLimiter, [rate_limit: fn(fun) -> fun.() end] do
TSearch.search([q: "my query"])
:timer.sleep(100)
# assertions
assert called TStore.store("some tweet from my fixtures")
assert called TStore.store("another one")
end
end
end
由于问题比较笼统,所以我在这里给出大致的答案。通常的技术是监视进程并等待 down 消息。像这样:
task = Task.async(fn -> "foo" end)
ref = Process.monitor(task.pid)
assert_receive {:DOWN, ^ref, :process, _, :normal}, 500
一些重要的事情:
元组的第五个元素是退出原因。我断言任务出口是 :normal
。如果您期望另一个出口,请相应地更改它。
assert_receive
中的第二个值是超时。考虑到您目前有 100 毫秒的睡眠时间,500 毫秒听起来很合理。
当我无法使用涉及assert_receive
的José方法时,我使用一个小帮手反复做断言/睡眠,直到断言通过或最终超时。
这是辅助模块
defmodule TimeHelper do
def wait_until(fun), do: wait_until(500, fun)
def wait_until(0, fun), do: fun.()
def wait_until(timeout, fun) defo
try do
fun.()
rescue
ExUnit.AssertionError ->
:timer.sleep(10)
wait_until(max(0, timeout - 10), fun)
end
end
end
在前面的例子中可以这样使用:
TSearch.search([q: "my query"])
wait_until fn ->
assert called TStore.store("some tweet from my fixtures")
assert called TStore.store("another one")
end
我想测试一个正在使用 Task.async
为了让我的测试通过,我需要让它在断言之前休眠 100 毫秒,否则测试进程会在异步任务执行之前被杀死。
有没有更好的方法?
已编辑,添加代码示例:
我想测试的代码(大致):
def search(params) do
RateLimiter.rate_limit(fn ->
parsed_params = ExTwitter.Parser.parse_request_params(params)
json = ExTwitter.API.Base.request(:get, "1.1/search/tweets.json", parsed_params)
Task.async(fn -> process_search_output(json) end)
new_max_id(json)
end)
end
以及我已经编写的测试(仅适用于睡眠调用)
test "processes and store tweets" do
with_mock ExTwitter.API.Base, [request: fn(_,_,_) -> json_fixture end] do
with_mock TwitterRateLimiter, [rate_limit: fn(fun) -> fun.() end] do
TSearch.search([q: "my query"])
:timer.sleep(100)
# assertions
assert called TStore.store("some tweet from my fixtures")
assert called TStore.store("another one")
end
end
end
由于问题比较笼统,所以我在这里给出大致的答案。通常的技术是监视进程并等待 down 消息。像这样:
task = Task.async(fn -> "foo" end)
ref = Process.monitor(task.pid)
assert_receive {:DOWN, ^ref, :process, _, :normal}, 500
一些重要的事情:
元组的第五个元素是退出原因。我断言任务出口是
:normal
。如果您期望另一个出口,请相应地更改它。assert_receive
中的第二个值是超时。考虑到您目前有 100 毫秒的睡眠时间,500 毫秒听起来很合理。
当我无法使用涉及assert_receive
的José方法时,我使用一个小帮手反复做断言/睡眠,直到断言通过或最终超时。
这是辅助模块
defmodule TimeHelper do
def wait_until(fun), do: wait_until(500, fun)
def wait_until(0, fun), do: fun.()
def wait_until(timeout, fun) defo
try do
fun.()
rescue
ExUnit.AssertionError ->
:timer.sleep(10)
wait_until(max(0, timeout - 10), fun)
end
end
end
在前面的例子中可以这样使用:
TSearch.search([q: "my query"])
wait_until fn ->
assert called TStore.store("some tweet from my fixtures")
assert called TStore.store("another one")
end