在 elixir 通道中处理或防止 "race condition" 的惯用方法
The idiomatic way to handle or prevent "race condition" in elixir channels
抱歉这个愚蠢的问题。但我是elixir的新手,来自golang。
我真的很喜欢 elixir 和 FP 风格,我尝试实现一些基本功能,但在 websocket 连接上遇到了竞争条件问题。比如在golang中我用Mutex解决了,但不是FP风格。
初始问题:
几个 WebSocket 连接,等待 5 人一组。然后随机洗牌。在这种情况下,我遇到了 "race condition" 的一些问题。相同的人到不同的组等等
在 Elixir 中解决这个问题的正确惯用方法是什么?
感谢您的帮助。再次抱歉提出一个愚蠢的问题。
我将分享我丑陋的代码片段)
例如,两个并发用户加入了我跟踪的大厅频道,他们是通过在线状态加入的。之后,当一些用户发送 "match" 事件时,我在大厅中找到其他在线用户,直到有多个用户。
defp match(user_id, online_list) when length(online_list) < 1
然后我创建一个房间并发送 room_id 给他们。但是当两个用户发送匹配项时,由于竞争条件,我有两个房间。
我想为一组用户提供一个公共休息室。在 lang such go 中,我可以使用 mutex 和 share。但是我不知道如何在elixir中实现这个逻辑。
现在如果 Alice 调用该事件并且我有足够的用户给 Alice,但是 Bob 同时调用该事件并且他也有足够的用户。但他们需要在一个共同的群体中,而不是在两个不同的群体中。他们相交
def match(user_id) do
user_id = Integer.to_string(user_id)
match(user_id, [])
end
defp match(user_id, online_list) when length(online_list) < 1 do
new_online_list =
RchatWeb.Presence.list("room:lobby")
|> Map.delete(user_id)
|> Map.keys()
match(user_id, new_online_list)
end
defp match(first_user_id, online_list) do
second_user_id = Enum.random(online_list) |> String.to_integer()
second_room_id = Accounts.get_user_room_id(second_user_id)
{:ok, room} = cond do
is_nil(second_room_id) -> create_room()
true -> {:ok, get_room!(second_room_id)}
end
# Try to fix race condition with presence or channels
{:ok, first_user} =
Accounts.get_user!(first_user_id)
|> Accounts.update_user(%{room_id: room.id})
{:ok, second_user} =
Accounts.get_user!(second_user_id)
|> Accounts.update_user(%{room_id: room.id})
ids = [first_user.id, second_user.id]
{:ok, room.hash, ids}
end
Elixir 没有可变状态,所以局部互斥的概念在那里毫无意义。分布式锁可以有一些用例,但这与本地互斥锁有很大不同。
请记住,每个过程始终是顺序的,所以如果我正确理解您的用例,您需要类似的东西:
defmodule Queue do
use GenServer
def start_link(_), do: GenServer.start_link(__MODULE__, [])
def register_and_wait(pid),
do: GenServer.call(pid, {:register, self()}, :infinity)
def init(_), do: {:ok, []}
def handle_call({:register, pid}, from, participants) do
new_state = [{pid, from} | participants]
case length(new_state) do
5 ->
shuffled = Enum.shuffle(new_state)
{pids, refs} = Enum.unzip(shuffled)
for ref <- refs, do: GenServer.reply(ref, {:ready, pid})
{:noreply, shuffled}
n when n < 5 ->
{:reply, {:wait, 5 - n}, new_state}
_ ->
{:reply, :overcrowded, participants}
end
end
end
现在 register_and_wait/1
将根据需要注册当前进程并根据需要锁定(提防无限锁定)尽可能多的进程(在本例中为 5 个,但可以配置)。只要有足够多的进程注册,然后返回{:ready, shuffled_list_of_pids}
,就不会出现死锁。
抱歉这个愚蠢的问题。但我是elixir的新手,来自golang。
我真的很喜欢 elixir 和 FP 风格,我尝试实现一些基本功能,但在 websocket 连接上遇到了竞争条件问题。比如在golang中我用Mutex解决了,但不是FP风格。
初始问题: 几个 WebSocket 连接,等待 5 人一组。然后随机洗牌。在这种情况下,我遇到了 "race condition" 的一些问题。相同的人到不同的组等等
在 Elixir 中解决这个问题的正确惯用方法是什么?
感谢您的帮助。再次抱歉提出一个愚蠢的问题。
我将分享我丑陋的代码片段)
例如,两个并发用户加入了我跟踪的大厅频道,他们是通过在线状态加入的。之后,当一些用户发送 "match" 事件时,我在大厅中找到其他在线用户,直到有多个用户。
defp match(user_id, online_list) when length(online_list) < 1
然后我创建一个房间并发送 room_id 给他们。但是当两个用户发送匹配项时,由于竞争条件,我有两个房间。
我想为一组用户提供一个公共休息室。在 lang such go 中,我可以使用 mutex 和 share。但是我不知道如何在elixir中实现这个逻辑。
现在如果 Alice 调用该事件并且我有足够的用户给 Alice,但是 Bob 同时调用该事件并且他也有足够的用户。但他们需要在一个共同的群体中,而不是在两个不同的群体中。他们相交
def match(user_id) do
user_id = Integer.to_string(user_id)
match(user_id, [])
end
defp match(user_id, online_list) when length(online_list) < 1 do
new_online_list =
RchatWeb.Presence.list("room:lobby")
|> Map.delete(user_id)
|> Map.keys()
match(user_id, new_online_list)
end
defp match(first_user_id, online_list) do
second_user_id = Enum.random(online_list) |> String.to_integer()
second_room_id = Accounts.get_user_room_id(second_user_id)
{:ok, room} = cond do
is_nil(second_room_id) -> create_room()
true -> {:ok, get_room!(second_room_id)}
end
# Try to fix race condition with presence or channels
{:ok, first_user} =
Accounts.get_user!(first_user_id)
|> Accounts.update_user(%{room_id: room.id})
{:ok, second_user} =
Accounts.get_user!(second_user_id)
|> Accounts.update_user(%{room_id: room.id})
ids = [first_user.id, second_user.id]
{:ok, room.hash, ids}
end
Elixir 没有可变状态,所以局部互斥的概念在那里毫无意义。分布式锁可以有一些用例,但这与本地互斥锁有很大不同。
请记住,每个过程始终是顺序的,所以如果我正确理解您的用例,您需要类似的东西:
defmodule Queue do
use GenServer
def start_link(_), do: GenServer.start_link(__MODULE__, [])
def register_and_wait(pid),
do: GenServer.call(pid, {:register, self()}, :infinity)
def init(_), do: {:ok, []}
def handle_call({:register, pid}, from, participants) do
new_state = [{pid, from} | participants]
case length(new_state) do
5 ->
shuffled = Enum.shuffle(new_state)
{pids, refs} = Enum.unzip(shuffled)
for ref <- refs, do: GenServer.reply(ref, {:ready, pid})
{:noreply, shuffled}
n when n < 5 ->
{:reply, {:wait, 5 - n}, new_state}
_ ->
{:reply, :overcrowded, participants}
end
end
end
现在 register_and_wait/1
将根据需要注册当前进程并根据需要锁定(提防无限锁定)尽可能多的进程(在本例中为 5 个,但可以配置)。只要有足够多的进程注册,然后返回{:ready, shuffled_list_of_pids}
,就不会出现死锁。