如何测试 Elixir GenStage Consumer?

How to test the Elixir GenStage Consumer?

我找到了一些关于如何测试生产者的资源,但是我找不到任何显示如何测试消费者的资源。

在生产者中,我创建了一个虚拟消费者并且一切正常,但是在消费者中我正在努力测试。

defmodule DataProducer do
      use GenStage

      def start_link([]) do
        GenStage.start_link(__MODULE__, 0, name: __MODULE__)
      end

      # {:queue.new, demand, size}
      def init(counter) do
        {:producer, counter, dispatcher: GenStage.BroadcastDispatcher}
      end

      def handle_demand(demand, state) do 
        events = Enum.to_list(state..state + demand + 1)
        # Logger.info "demand is: #{inspect(demand)}, state is #{inspect(state)}"
        {:noreply, events, (state + demand)}
      end
    end

生产者测试:

 defmodule DataProducerTest do
      use ExUnit.Case

      test "check the results" do
        {:ok, stage} = DataProducer.start_link([])
        {:ok, _cons} = TestConsumer.start_link(stage)
        assert_receive {:received, events}
        GenStage.stop(stage)
      end

    end

    defmodule TestConsumer do
      def start_link(producer) do
        GenStage.start_link(__MODULE__, {producer, self()})
      end
      def init({producer, owner}) do
        {:consumer, owner, subscribe_to: [producer]}
      end
      def handle_events(events, _from, owner) do
        send(owner, {:received, events})
        {:noreply, [], owner}
      end
    end

消费者:

defmodule DataConsumer do
  use GenStage
  def start_link([]) do
    GenStage.start_link(__MODULE__, :any_state)
  end
  def init(state) do
    {:consumer, state, subscribe_to: [{DataProducer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
  def handle_events(events, _from, state) do
    for event <- events do
      # :timer.sleep(250)
      Logger.info inspect( {self(), event, state} )
    end
    {:noreply, [], state}
  end
end

先谢谢你。

没有理由在这里使用 ex_mock。如果您让生产者成为您的消费者订阅这样的论点会容易得多:

defmodule DataConsumer do
  use GenStage

  def start_link(producer) do
    GenStage.start_link(__MODULE__, producer)
  end

  def init(producer) do
    {:consumer, state, subscribe_to: [{producer, selector: fn n -> n > 50 && n < 100 end, max_demand: 10}]}
  end
end

那么你可以 TestProducer:

defmodule TestProducer
  use GenStage

  def notify(pid, event) do
    GenServer.cast(pid, {:notify, event})
  end

  def start_link do
    GenStage.start_link(__MODULE__, :ok)
  end

  def init(:ok) do
    {:producer, :ok, dispatcher: GenStage.BroadcastDispatcher}
  end

  def handle_demand(_demand, state) do
    {:noreply, [], state}
  end

  def handle_cast({:notify, event}, state) do
    {:noreply, [event], state}
  end
end

并在您的测试中订阅它并断言预期结果:

defmodule DataConsumerTest do
  use ExUnit.Case

  test "consumes events" do
    {:ok, pid} = TestProducer.start_link()
    DataConsumer.start_link(pid)
    TestProducer.notify(%{data: :event_data})

    # assert thing you expected to happen happens
  end
end

TLDR; 如果您在代码库中与许多不同的消费者一起工作,那么 manual/test 事件生产者是必须的。消费者并不真正关心生产者为生产事件做了什么,只关心它可以订阅和消费它们。因此,您的测试只需要确保消费者能够从任何生产者接收事件,并且您可以向他们发送其在测试中寻找的正确事件。

消费者测试中:

 test "should behave like consumer" do
    {:ok, producer} = DummyProducer.start_link(1)
    {:ok, consumer} = Consumer.start_link(producer)
    Process.register self, :test
    assert_receive {:called_back, 10}
  end

现在DummyProducer

defmodule DummyProducer do
  use GenStage

  def start_link(demand) do
    GenStage.start_link(__MODULE__, demand)
  end

  def init(demand) do
    {:producer, demand}
  end

  def handle_demand(demand, counter) when demand > 0 do
    events = Enum.to_list(counter..counter+demand-1)
    Process.send_after(self(), {:stop, demand}, 1)
    {:noreply, events, demand + counter}
  end

  def handle_info({:stop, demand}, state) do
    send :test, {:called_back, demand}
    {:stop, :normal, demand}
  end
end

我觉得,

测试消费者的重点是检查消费者是否可以发送需求并坚持订阅中分配的最大需求。