GenStage:在 GenServer 更新时重试 handle_demand
GenStage: retry handle_demand when GenServer updates
如果我的 GenStage 的 handle_demand/2
方法如下所示:
def handle_demand(demand, _state) when demand > 0 do
case Queue.dequeue do
nil ->
Logger.debug("Queue empty.")
{:noreply, [], []}
{job, updated_queue} -> {:noreply, job, updated_queue}
end
end
当我的 Queue
(一个 GenServer)是 changed/updated 时,如何将它转到 "rerun"?
我的队列模块是这样的:
defmodule Queue do
use GenServer
### client
def start_link(state \ []) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def queue, do: GenServer.call(__MODULE__, :queue)
def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
def dequeue, do: GenServer.call(__MODULE__, :dequeue)
### server
def init(state), do: {:ok, state}
def handle_call(:dequeue, _from, [value | state]) do
{:reply, value, state}
end
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}
def handle_cast({:enqueue, value}, state) do
{:noreply, state ++ [value]}
end
end
当 Queue
发生变化时,为什么要“重新运行”它?这是对 GenStage
的严重滥用。它的发明是为了允许对抗 来自 Queue
的背压,反之亦然。在现实生活中,你要么根本不需要 GenStage
,要么你不想在 Queue
更新时“重新运行”需求,因为它迟早会通过 [=30 杀死它=].
当它处理来自队列的先前负载时,您可能有点“消费者”调用 handle_demand
。 GenStage
的回购有 four incredibly clear examples using different patterns to work with GenStage
. Besides that, there is a great intro to GenStage
in Elixir blog.
只需选择您需要的模式并从上面链接的资源中采用它。
如果我的 GenStage 的 handle_demand/2
方法如下所示:
def handle_demand(demand, _state) when demand > 0 do
case Queue.dequeue do
nil ->
Logger.debug("Queue empty.")
{:noreply, [], []}
{job, updated_queue} -> {:noreply, job, updated_queue}
end
end
当我的 Queue
(一个 GenServer)是 changed/updated 时,如何将它转到 "rerun"?
我的队列模块是这样的:
defmodule Queue do
use GenServer
### client
def start_link(state \ []) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def queue, do: GenServer.call(__MODULE__, :queue)
def enqueue(value), do: GenServer.cast(__MODULE__, {:enqueue, value})
def dequeue, do: GenServer.call(__MODULE__, :dequeue)
### server
def init(state), do: {:ok, state}
def handle_call(:dequeue, _from, [value | state]) do
{:reply, value, state}
end
def handle_call(:dequeue, _from, []), do: {:reply, nil, []}
def handle_call(:queue, _from, state), do: {:reply, state, state}
def handle_cast({:enqueue, value}, state) do
{:noreply, state ++ [value]}
end
end
当 Queue
发生变化时,为什么要“重新运行”它?这是对 GenStage
的严重滥用。它的发明是为了允许对抗 来自 Queue
的背压,反之亦然。在现实生活中,你要么根本不需要 GenStage
,要么你不想在 Queue
更新时“重新运行”需求,因为它迟早会通过 [=30 杀死它=].
当它处理来自队列的先前负载时,您可能有点“消费者”调用 handle_demand
。 GenStage
的回购有 four incredibly clear examples using different patterns to work with GenStage
. Besides that, there is a great intro to GenStage
in Elixir blog.
只需选择您需要的模式并从上面链接的资源中采用它。