GenStage:在 GenServer 更新时重试 handle_demand

GenStage: retry handle_demand when GenServer updates

如果我的 GenStage 的 handle_demand/2 方法如下所示:

def handle_demand(demand, _state) when demand > 0 do
  case Queue.dequeue do
    nil ->
      Logger.debug("Queue empty.")
      {:noreply, [], []}
    {job, updated_queue} -> {:noreply, job, updated_queue}
  end
end

当我的 Queue(一个 GenServer)是 changed/updated 时,如何将它转到 "rerun"?

我的队列模块是这样的:

defmodule Queue do
  use GenServer

  ### client

  def start_link(state \ []) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def queue, do: GenServer.call(__MODULE__, :queue)

  def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})

  def dequeue, do: GenServer.call(__MODULE__, :dequeue)

  ### server

  def init(state), do: {:ok, state}

  def handle_call(:dequeue, _from, [value | state]) do
    {:reply, value, state}
  end

  def handle_call(:dequeue, _from, []), do: {:reply, nil, []}

  def handle_call(:queue, _from, state), do: {:reply, state, state}

  def handle_cast({:enqueue, value}, state) do
    {:noreply, state ++ [value]}
  end
end

Queue 发生变化时,为什么要“重新运行”它?这是对 GenStage 的严重滥用。它的发明是为了允许对抗 来自 Queue 的背压,反之亦然。在现实生活中,你要么根本不需要 GenStage,要么你不想在 Queue 更新时“重新运行”需求,因为它迟早会通过 [=30 杀死它=].

当它处理来自队列的先前负载时,您可能有点“消费者”调用 handle_demandGenStage 的回购有 four incredibly clear examples using different patterns to work with GenStage. Besides that, there is a great intro to GenStage in Elixir blog.

只需选择您需要的模式并从上面链接的资源中采用它。