在 elixir 通道中处理或防止 "race condition" 的惯用方法

The idiomatic way to handle or prevent "race condition" in elixir channels

抱歉这个愚蠢的问题。但我是elixir的新手,来自golang。

我真的很喜欢 elixir 和 FP 风格,我尝试实现一些基本功能,但在 websocket 连接上遇到了竞争条件问题。比如在golang中我用Mutex解决了,但不是FP风格。

初始问题: 几个 WebSocket 连接,等待 5 人一组。然后随机洗牌。在这种情况下,我遇到了 "race condition" 的一些问题。相同的人到不同的组等等

在 Elixir 中解决这个问题的正确惯用方法是什么?

感谢您的帮助。再次抱歉提出一个愚蠢的问题。

我将分享我丑陋的代码片段)

例如,两个并发用户加入了我跟踪的大厅频道,他们是通过在线状态加入的。之后,当一些用户发送 "match" 事件时,我在大厅中找到其他在线用户,直到有多个用户。

defp match(user_id, online_list) when length(online_list) < 1

然后我创建一个房间并发送 room_id 给他们。但是当两个用户发送匹配项时,由于竞争条件,我有两个房间。

我想为一组用户提供一个公共休息室。在 lang such go 中,我可以使用 mutex 和 share。但是我不知道如何在elixir中实现这个逻辑。

现在如果 Alice 调用该事件并且我有足够的用户给 Alice,但是 Bob 同时调用该事件并且他也有足够的用户。但他们需要在一个共同的群体中,而不是在两个不同的群体中。他们相交

def match(user_id) do
    user_id = Integer.to_string(user_id)
    match(user_id, [])
  end

  defp match(user_id, online_list) when length(online_list) < 1 do
    new_online_list =
      RchatWeb.Presence.list("room:lobby")
      |> Map.delete(user_id)
      |> Map.keys()
    match(user_id, new_online_list)
  end

  defp match(first_user_id, online_list) do
    second_user_id = Enum.random(online_list) |> String.to_integer()
    second_room_id = Accounts.get_user_room_id(second_user_id)
    {:ok, room} = cond do
      is_nil(second_room_id) -> create_room()
      true -> {:ok, get_room!(second_room_id)}
    end
    # Try to fix race condition with presence or channels
    {:ok, first_user} =
      Accounts.get_user!(first_user_id)
      |> Accounts.update_user(%{room_id: room.id})
    {:ok, second_user} =
      Accounts.get_user!(second_user_id)
      |> Accounts.update_user(%{room_id: room.id})
    ids = [first_user.id, second_user.id]
    {:ok, room.hash, ids}
  end

Elixir 没有可变状态,所以局部互斥的概念在那里毫无意义。分布式锁可以有一些用例,但这与本地互斥锁有很大不同。

请记住,每个过程始终是顺序的,所以如果我正确理解您的用例,您需要类似的东西:

defmodule Queue do
  use GenServer

  def start_link(_), do: GenServer.start_link(__MODULE__, [])

  def register_and_wait(pid),
    do: GenServer.call(pid, {:register, self()}, :infinity)

  def init(_), do: {:ok, []}

  def handle_call({:register, pid}, from, participants) do
    new_state = [{pid, from} | participants]
    case length(new_state) do
      5 ->
        shuffled = Enum.shuffle(new_state)
        {pids, refs} = Enum.unzip(shuffled)

        for ref <- refs, do: GenServer.reply(ref, {:ready, pid})

        {:noreply, shuffled}

      n when n < 5 ->
        {:reply, {:wait, 5 - n}, new_state}

      _ ->
        {:reply, :overcrowded, participants}
    end
  end
end

现在 register_and_wait/1 将根据需要注册当前进程并根据需要锁定(提防无限锁定)尽可能多的进程(在本例中为 5 个,但可以配置)。只要有足够多的进程注册,然后返回{:ready, shuffled_list_of_pids},就不会出现死锁。