如何制作一个以特定速率处理消息的 GenServer? (每 n 秒)

How do I make a GenServer that processes messages at a specific rate? (every n seconds)

我的一项服务与速率受限的外部 API 通信,因此我想确保每 10 秒发送不超过 1 个调用。

我天真的做法是使用长运行 API 服务,并在每次调用后超时:

def handle_cast({:call_api, data}, state) do
  send_to_external_api(data)
  :timer.sleep(10000)
  {:noreply, state}
end

我不确定是否有正确的方法。

编辑:原始解决方案在 10 秒滴答之间丢弃消息,如 所建议。编辑提供了更合适的解决方案。


编辑

由于 GenServer handle_* 函数实际上并不从队列接收消息,而只是处理它们,我们不能利用模式匹配从进程队列中选择性地每隔 10 秒接收一次消息。

因此,由于我们按照消息到达的顺序接收消息,因此我们需要将内部队列作为 GenServer 状态的一部分。

defmodule RateLimited do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{queue: []})
  end

  def init(state) do
    allow_work()
    {:ok, state}
  end

  def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
    {:noreply, %{state | queue: queue ++ [data]}}
  end

  def handle_info(:work, %{"queue" => [data | queue]} = state) do
      send_to_external_api(data)
    allow_work()

    {:noreply, %{state | queue: queue}}
  end

  defp allow_work() do
    Process.send_after(self(), :work, 10000) # after 10s
  end

  defp send_to_external_api (data) do end
end

所以我们只是将消息从进程队列移动到状态队列,当我们向自己发出 10 秒已经过去的信号时,我们处理头部。

但最后,我们实际上达到了让进程休眠 10 秒相同的结果。您的解决方案似乎更简单并且达到了相同的结果。


解决方案基于

首先,让您的 GenServer 在其状态 (work = true/false) 中存储一个标志。

然后让GenServer在可以工作的时候使用Process.send_after来通知自己。您在 handle_info 中收到信号,您将 work 状态标志设置为 true

现在请注意 handle_cast 函数中状态的模式匹配:它只会在 work 状态标志等于 true 时接收消息。否则,消息将被放入队列中等待。

并且在您将消息发送到外部服务后,您再次 运行 Process.send_after 安排下一个信号和 return 状态,其中 work 标志设置为 false 以防止立即接收下一条消息。

defmodule RateLimited do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, %{work: false})
  end

  def init(state) do
    allow_work()
    {:ok, state}
  end

  def handle_cast({:call_api, data}, %{"work" => true} = state) do
    send_to_external_api(data)
    allow_work()
    {:noreply, %{state | work = false}}
  end

  def handle_info(:work, state) do
    {:noreply, %{state | work = true}}
  end

  defp allow_work() do
    Process.send_after(self(), :work, 10000) # after 10s
  end
end