如何测试 Elixir GenStage Consumer?
How to test the Elixir GenStage Consumer?
我找到了一些关于如何测试生产者的资源,但是我找不到任何显示如何测试消费者的资源。
在生产者中,我创建了一个虚拟消费者并且一切正常,但是在消费者中我正在努力测试。
defmodule DataProducer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, 0, name: __MODULE__)
end
# {:queue.new, demand, size}
def init(counter) do
{:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(demand, state) do
events = Enum.to_list(state..state + demand + 1)
# Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
{:noreply, events, (state + demand)}
end
end
生产者测试:
defmodule DataProducerTest do
use ExUnit.Case
test "check the results" do
{:ok, stage} = DataProducer.start_link([])
{:ok, _cons} = TestConsumer.start_link(stage)
assert_receive {:received, events}
GenStage.stop(stage)
end
end
defmodule TestConsumer do
def start_link(producer) do
GenStage.start_link(__MODULE__, {producer, self()})
end
def init({producer, owner}) do
{:consumer, owner, subscribe_to: [producer]}
end
def handle_events(events, _from, owner) do
send(owner, {:received, events})
{:noreply, [], owner}
end
end
消费者:
defmodule DataConsumer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, :any_state)
end
def init(state) do
{:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
def handle_events(events, _from, state) do
for event <- events do
# :timer.sleep(250)
Logger.info inspect( {self(), event, state} )
end
{:noreply, [], state}
end
end
先谢谢你。
没有理由在这里使用 ex_mock
。如果您让生产者成为您的消费者订阅这样的论点会容易得多:
defmodule DataConsumer do
use GenStage
def start_link(producer) do
GenStage.start_link(__MODULE__, producer)
end
def init(producer) do
{:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
end
那么你可以 TestProducer
:
defmodule TestProducer
use GenStage
def notify(pid, event) do
GenServer.cast(pid, {:notify, event})
end
def start_link do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
def handle_cast({:notify, event}, state) do
{:noreply, [event], state}
end
end
并在您的测试中订阅它并断言预期结果:
defmodule DataConsumerTest do
use ExUnit.Case
test "consumes events" do
{:ok, pid} = TestProducer.start_link()
DataConsumer.start_link(pid)
TestProducer.notify(%{data: :event_data})
# assert thing you expected to happen happens
end
end
TLDR; 如果您在代码库中与许多不同的消费者一起工作,那么 manual/test 事件生产者是必须的。消费者并不真正关心生产者为生产事件做了什么,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够从任何生产者接收事件,并且您可以向他们发送其在测试中寻找的正确事件。
消费者测试中:
test "should behave like consumer" do
{:ok, producer} = DummyProducer.start_link(1)
{:ok, consumer} = Consumer.start_link(producer)
Process.register self, :test
assert_receive {:called_back, 10}
end
现在DummyProducer
defmodule DummyProducer do
use GenStage
def start_link(demand) do
GenStage.start_link(__MODULE__, demand)
end
def init(demand) do
{:producer, demand}
end
def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1)
Process.send_after(self(), {:stop, demand}, 1)
{:noreply, events, demand + counter}
end
def handle_info({:stop, demand}, state) do
send :test, {:called_back, demand}
{:stop, :normal, demand}
end
end
我觉得,
测试消费者的重点是检查消费者是否可以发送需求并坚持订阅中分配的最大需求。
我找到了一些关于如何测试生产者的资源,但是我找不到任何显示如何测试消费者的资源。
在生产者中,我创建了一个虚拟消费者并且一切正常,但是在消费者中我正在努力测试。
defmodule DataProducer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, 0, name: __MODULE__)
end
# {:queue.new, demand, size}
def init(counter) do
{:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(demand, state) do
events = Enum.to_list(state..state + demand + 1)
# Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
{:noreply, events, (state + demand)}
end
end
生产者测试:
defmodule DataProducerTest do
use ExUnit.Case
test "check the results" do
{:ok, stage} = DataProducer.start_link([])
{:ok, _cons} = TestConsumer.start_link(stage)
assert_receive {:received, events}
GenStage.stop(stage)
end
end
defmodule TestConsumer do
def start_link(producer) do
GenStage.start_link(__MODULE__, {producer, self()})
end
def init({producer, owner}) do
{:consumer, owner, subscribe_to: [producer]}
end
def handle_events(events, _from, owner) do
send(owner, {:received, events})
{:noreply, [], owner}
end
end
消费者:
defmodule DataConsumer do
use GenStage
def start_link([]) do
GenStage.start_link(__MODULE__, :any_state)
end
def init(state) do
{:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
def handle_events(events, _from, state) do
for event <- events do
# :timer.sleep(250)
Logger.info inspect( {self(), event, state} )
end
{:noreply, [], state}
end
end
先谢谢你。
没有理由在这里使用 ex_mock
。如果您让生产者成为您的消费者订阅这样的论点会容易得多:
defmodule DataConsumer do
use GenStage
def start_link(producer) do
GenStage.start_link(__MODULE__, producer)
end
def init(producer) do
{:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
end
end
那么你可以 TestProducer
:
defmodule TestProducer
use GenStage
def notify(pid, event) do
GenServer.cast(pid, {:notify, event})
end
def start_link do
GenStage.start_link(__MODULE__, :ok)
end
def init(:ok) do
{:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
end
def handle_demand(_demand, state) do
{:noreply, [], state}
end
def handle_cast({:notify, event}, state) do
{:noreply, [event], state}
end
end
并在您的测试中订阅它并断言预期结果:
defmodule DataConsumerTest do
use ExUnit.Case
test "consumes events" do
{:ok, pid} = TestProducer.start_link()
DataConsumer.start_link(pid)
TestProducer.notify(%{data: :event_data})
# assert thing you expected to happen happens
end
end
TLDR; 如果您在代码库中与许多不同的消费者一起工作,那么 manual/test 事件生产者是必须的。消费者并不真正关心生产者为生产事件做了什么,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够从任何生产者接收事件,并且您可以向他们发送其在测试中寻找的正确事件。
消费者测试中:
test "should behave like consumer" do
{:ok, producer} = DummyProducer.start_link(1)
{:ok, consumer} = Consumer.start_link(producer)
Process.register self, :test
assert_receive {:called_back, 10}
end
现在DummyProducer
defmodule DummyProducer do
use GenStage
def start_link(demand) do
GenStage.start_link(__MODULE__, demand)
end
def init(demand) do
{:producer, demand}
end
def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1)
Process.send_after(self(), {:stop, demand}, 1)
{:noreply, events, demand + counter}
end
def handle_info({:stop, demand}, state) do
send :test, {:called_back, demand}
{:stop, :normal, demand}
end
end
我觉得,
测试消费者的重点是检查消费者是否可以发送需求并坚持订阅中分配的最大需求。