如何制作一个以特定速率处理消息的 GenServer? (每 n 秒)
How do I make a GenServer that processes messages at a specific rate? (every n seconds)
我的一项服务与速率受限的外部 API 通信,因此我想确保每 10 秒发送不超过 1 个调用。
我天真的做法是使用长运行 API 服务,并在每次调用后超时:
def handle_cast({:call_api, data}, state) do
send_to_external_api(data)
:timer.sleep(10000)
{:noreply, state}
end
我不确定是否有正确的方法。
编辑:原始解决方案在 10 秒滴答之间丢弃消息,如 所建议。编辑提供了更合适的解决方案。
编辑
由于 GenServer handle_* 函数实际上并不从队列接收消息,而只是处理它们,我们不能利用模式匹配从进程队列中选择性地每隔 10 秒接收一次消息。
因此,由于我们按照消息到达的顺序接收消息,因此我们需要将内部队列作为 GenServer 状态的一部分。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{queue: []})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
{:noreply, %{state | queue: queue ++ [data]}}
end
def handle_info(:work, %{"queue" => [data | queue]} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | queue: queue}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
defp send_to_external_api (data) do end
end
所以我们只是将消息从进程队列移动到状态队列,当我们向自己发出 10 秒已经过去的信号时,我们处理头部。
但最后,我们实际上达到了让进程休眠 10 秒相同的结果。您的解决方案似乎更简单并且达到了相同的结果。
解决方案基于
首先,让您的 GenServer 在其状态 (work = true/false) 中存储一个标志。
然后让GenServer在可以工作的时候使用Process.send_after
来通知自己。您在 handle_info
中收到信号,您将 work
状态标志设置为 true
。
现在请注意 handle_cast
函数中状态的模式匹配:它只会在 work
状态标志等于 true 时接收消息。否则,消息将被放入队列中等待。
并且在您将消息发送到外部服务后,您再次 运行 Process.send_after
安排下一个信号和 return 状态,其中 work
标志设置为 false
以防止立即接收下一条消息。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{work: false})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"work" => true} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | work = false}}
end
def handle_info(:work, state) do
{:noreply, %{state | work = true}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
end
我的一项服务与速率受限的外部 API 通信,因此我想确保每 10 秒发送不超过 1 个调用。
我天真的做法是使用长运行 API 服务,并在每次调用后超时:
def handle_cast({:call_api, data}, state) do
send_to_external_api(data)
:timer.sleep(10000)
{:noreply, state}
end
我不确定是否有正确的方法。
编辑:原始解决方案在 10 秒滴答之间丢弃消息,如
编辑
由于 GenServer handle_* 函数实际上并不从队列接收消息,而只是处理它们,我们不能利用模式匹配从进程队列中选择性地每隔 10 秒接收一次消息。
因此,由于我们按照消息到达的顺序接收消息,因此我们需要将内部队列作为 GenServer 状态的一部分。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{queue: []})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"queue" => queue} = state) do
{:noreply, %{state | queue: queue ++ [data]}}
end
def handle_info(:work, %{"queue" => [data | queue]} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | queue: queue}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
defp send_to_external_api (data) do end
end
所以我们只是将消息从进程队列移动到状态队列,当我们向自己发出 10 秒已经过去的信号时,我们处理头部。
但最后,我们实际上达到了让进程休眠 10 秒相同的结果。您的解决方案似乎更简单并且达到了相同的结果。
解决方案基于
首先,让您的 GenServer 在其状态 (work = true/false) 中存储一个标志。
然后让GenServer在可以工作的时候使用Process.send_after
来通知自己。您在 handle_info
中收到信号,您将 work
状态标志设置为 true
。
现在请注意 handle_cast
函数中状态的模式匹配:它只会在 work
状态标志等于 true 时接收消息。否则,消息将被放入队列中等待。
并且在您将消息发送到外部服务后,您再次 运行 Process.send_after
安排下一个信号和 return 状态,其中 work
标志设置为 false
以防止立即接收下一条消息。
defmodule RateLimited do
use GenServer
def start_link do
GenServer.start_link(__MODULE__, %{work: false})
end
def init(state) do
allow_work()
{:ok, state}
end
def handle_cast({:call_api, data}, %{"work" => true} = state) do
send_to_external_api(data)
allow_work()
{:noreply, %{state | work = false}}
end
def handle_info(:work, state) do
{:noreply, %{state | work = true}}
end
defp allow_work() do
Process.send_after(self(), :work, 10000) # after 10s
end
end