使用 Elixir 去除事件
Debounce events with Elixir
我正在从 MQ 获取事件流到我的 Elixir 消费者中。
在消费者中我需要:
- 按 ID 聚合事件
- 如果 3 分钟内没有该 ID 的新数据,则向下游发送该 ID 的汇总数据。
我的数据集并不大。可能是几百个ID,一天几千条更新。
有什么方法可以使用 GenServer magic 解决这个问题吗?
谢谢!
我会这样做:
每当有新事件发生时:
如果它是具有该 ID 的第一个事件,使用 Process.send_after/3
创建一个超时为 3 分钟的计时器引用,并将事件和计时器存储在状态中。
如果它不是第一个具有该 ID 的事件,则使用 Process.cancel_timer/1
取消存储的计时器引用,按照上一步所述创建一个新计时器,并将新计时器与新事件与旧事件串联。
并且在计时器触发的 handle_info
中,将该 ID 的事件推送到下游并从状态中删除该条目。
下面是上面的简单实现:
defmodule DebouncedEcho do
@timeout 1000
use GenServer
def start_link do
GenServer.start_link __MODULE__, []
end
def init(_) do
{:ok, %{}}
end
def handle_cast({:store, id, event}, state) do
case state[id] do
nil ->
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event], timer: timer})
{:noreply, state}
%{events: events, timer: timer} ->
Process.cancel_timer(timer)
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event | events], timer: timer})
{:noreply, state}
end
end
def handle_info({:timer, id}, state) do
%{events: events} = state[id]
IO.inspect {:flush, id, events}
state = Map.delete(state, id)
{:noreply, state}
end
end
测试:
{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)
输出:
{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}
我正在从 MQ 获取事件流到我的 Elixir 消费者中。
在消费者中我需要:
- 按 ID 聚合事件
- 如果 3 分钟内没有该 ID 的新数据,则向下游发送该 ID 的汇总数据。
我的数据集并不大。可能是几百个ID,一天几千条更新。
有什么方法可以使用 GenServer magic 解决这个问题吗?
谢谢!
我会这样做:
每当有新事件发生时:
如果它是具有该 ID 的第一个事件,使用
Process.send_after/3
创建一个超时为 3 分钟的计时器引用,并将事件和计时器存储在状态中。如果它不是第一个具有该 ID 的事件,则使用
Process.cancel_timer/1
取消存储的计时器引用,按照上一步所述创建一个新计时器,并将新计时器与新事件与旧事件串联。
并且在计时器触发的 handle_info
中,将该 ID 的事件推送到下游并从状态中删除该条目。
下面是上面的简单实现:
defmodule DebouncedEcho do
@timeout 1000
use GenServer
def start_link do
GenServer.start_link __MODULE__, []
end
def init(_) do
{:ok, %{}}
end
def handle_cast({:store, id, event}, state) do
case state[id] do
nil ->
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event], timer: timer})
{:noreply, state}
%{events: events, timer: timer} ->
Process.cancel_timer(timer)
timer = Process.send_after(self, {:timer, id}, @timeout)
state = Map.put(state, id, %{events: [event | events], timer: timer})
{:noreply, state}
end
end
def handle_info({:timer, id}, state) do
%{events: events} = state[id]
IO.inspect {:flush, id, events}
state = Map.delete(state, id)
{:noreply, state}
end
end
测试:
{:ok, server} = DebouncedEcho.start_link
GenServer.cast server, {:store, 1, :foo}
GenServer.cast server, {:store, 1, :bar}
GenServer.cast server, {:store, 2, :foo}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :bar}
:timer.sleep(500)
GenServer.cast server, {:store, 2, :baz}
:timer.sleep(500)
GenServer.cast server, {:store, 1, :baz}
:timer.sleep(2000)
输出:
{:flush, 1, [:bar, :foo]}
{:flush, 2, [:baz, :bar, :foo]}
{:flush, 1, [:baz]}