无法在 GenServer 中重现 "start/loop" 行为

Can't reproduce "start/loop" behaviour in GenServer

我正在尝试使用一个进程作为同步机制,它可以按顺序接收消息但仍能正常运行。我已经设法用简单的过程实现了我的问题的简化版本,但是我没能用 GenServer 实现同样的效果。

简化版是这样的:

defmodule Fun do
  def start_link do
    spawn(fn -> loop(:initiated) end)
  end

  def loop(state) do
    receive do
      :join when state == :initiated ->
        IO.inspect("Handling join")
        loop(:initiated)

      :finish when state == :initiated ->
        IO.inspect("Finishing")
        loop(:finishing)

      :cleanup when state == :finishing ->
        IO.inspect("Cleaning up...")
    end
  end
end

以上只会在state:initiated时处理:join:finish消息,只有在:finish时才执行:cleanup退出] 已经收到。在这里,我试图利用邮件卡在邮箱中的优势,直到它们可以匹配。

它是这样工作的:

iex(1)> pid = Fun.start_link
#PID<0.140.0>
iex(2)> send(pid, :join)
"Handling join"
:join
iex(3)> send(pid, :join)
"Handling join"
:join
iex(4)> send(pid, :cleanup) # No `IO.inspect` at this point
:cleanup
iex(5)> send(pid, :finish) # Two `IO.inspect`s once `:finish` received
"Finishing"
:finish
"Cleaning up..."

我试图用 GenServer 重现相同的行为:

defmodule FunServer do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:ok, :initiated}
  end

  def handle_info(:join, msg) when msg == :initiated do
    IO.inspect("Handling join [From GenServer]")
    {:noreply, :initiated}
  end

  def handle_info(:finish, msg) when msg == :initiated do
    IO.inspect("Finishing [From GenServer]")
    {:noreply, :finishing}
  end

  def handle_info(:cleanup, msg) when msg == :finishing do
    IO.inspect("Cleaning up [From GenServer]")
    {:stop, :normal, msg}
  end
end

鉴于我将此 GenServer 的应用程序配置为 :temporary worker 它的工作方式如下:

iex(1)> send(FunServer, :join)
"Handling join [From GenServer]"
:join
iex(2)> send(FunServer, :cleanup)
:cleanup
iex(3)>
07:11:17.383 [error] GenServer FunServer terminating
** (FunctionClauseError) no function clause matching in 
FunServer.handle_info/2
    (what_the_beam) lib/fun_server.ex:22: FunServer.handle_info(:cleanup, :initiated)
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: :cleanup
State: :initiated

我试过使用 handle_cast 回调,以及不同格式的参数,例如:

handle_info(:cleanup, :finishing)

handle_cast(:cleanup, :finishing)

而不是 where msg == :finishing,但其中 none 对我有用。

我正在使用 Elixir 1.5 和 Erlang 20。

在您的原始代码中,当状态为 :initiated 时,您的 receive 会忽略 :cleanup 消息。这是一种不好的做法,可能会导致瓶颈,因为任何此类消息都会永远留在进程的收件箱中,耗尽内存并减慢未来的 receive 块,因为 receive(通常)需要时间成正比到进程收件箱中的邮件数量。

GenServer 通过强制您按照收到消息的顺序处理消息来正确处理这种情况。为了忽略消息,您需要显式添加一个什么都不做的 handle_info 。您可以简单地添加此子句,当状态为 :initiated:

时,该子句将忽略 :cleanup
def handle_info(:cleanup, :initiated), do: {:noreply, :initiated}

您也可以通过在所有现有 handle_info:

之后添加此子句来忽略任何其他消息
def handle_info(_message, state), do: {:noreply, state}

I've tried using handle_cast callbacks, and also different format of arguments, like:

handle_info(:cleanup, :finishing)

...

如果您仔细阅读错误消息中的堆栈跟踪,问题是 :gen_server 试图调用 FunServer.handle_info(:cleanup, :initiated)handle_info 中没有定义子句来处理它。 (:cleanup, :finishing).

没有问题

您 运行 遇到的问题称为常规 erlang 进程的选择性接收。这是乔·阿姆斯特朗 (Joe Armstrong) 书中的一句话:

    receive works as follows:
    ...
    2. Take the first message in the mailbox and try to match it 
    against Pattern1, Pattern2, and so on. If the match succeeds, 
    the message is removed from the mailbox, and the expressions 
    following the pattern are evaluated.
    3. If none of the patterns in the receive statement matches the 
    first message in the mailbox, then the first message is removed 
    from the mailbox and put into a “save queue.” The second message
    in the mailbox is then tried. This procedure is repeated until a 
    matching message is found or until all the messages in the mail- 
    box have been examined.
    4. If none of the messages in the mailbox matches, then the process 
    is suspended and will be rescheduled for execution the next time 
    a new message is put in the mailbox. Note that when a new message 
    arrives, the messages in the save queue are not rematched; only 
    the new message is matched.
    5. As soon as a message has been matched, then all messages that 
    have been put into the save queue are reentered into the mailbox 
    in the order in which they arrived at the process. If a timer 
    was set, it is cleared.
    6. If the timer elapses when we are waiting for a message, then 
    evaluate the expressions ExpressionsTimeout and put any saved 
    messages back into the mailbox in the order in which they 
    arrived at the process.

gen_server 不是这样工作的。它需要您的回调模块来匹配一条消息,或者当您发现由于 gen_server 实现未找到分派方式而抛出错误时。如果您希望 gen_server 实现与上面概述的接收逻辑相匹配,则必须手动执行。一种简单的方法是将在给定状态下无法匹配的所有消息累积到某种列表中,然后在每次成功匹配后将它们重新发送到 self()。为此,您的状态不能再只是一个简单的原子,因为您需要自己组织保存的队列。

顺便说一句,之前在 erlang 的上下文中也有人问过同样的问题。响应者有一个建议与我描述的一一对应。因此,如果您需要具体代码,这里是该问题的 link:How do you do selective receives in gen_servers?

使用 :gen_statem(或 E​​lixir 包装器库,在十六进制上找到 gen_state_machine)可以更轻松地完成您想要实现的事情。您可以使用 "postpone" 功能模拟选择性接收,这会将消息放回内部缓冲区,直到机器状态发生变化,此时推迟的消息将按照收到的顺序进行处理.

还有其他一些不错的技巧,比如能够生成 "internal" 消息,这些消息在处理其他任何事情之前都放在邮箱的头部。由于您的示例是 FSM 的一个非常明确的案例,我建议您走这条路,而不是在 GenServer 中重新发明它。