GenServer 上的 Elixir 非阻塞线程?

Elixir non-blocking threads on a GenServer?

我正在尝试完成一项简单的任务,但我遇到了很大的困难。

请假设我有一个GenServer,其中一个回调如下:

  @impl true
  def handle_call(:state, _, state) do
    # Something that would require 10 seconds
    newState = do_job()
    {:reply, newState, newState}
  end

如果我是对的,从客户端调用 GenServer.call(:server, :state) 将阻塞服务器 10 秒,然后新状态将返回给客户端。

好的。我希望服务器在不被阻塞的情况下处理这个任务。我试过使用任务,但 Task.await/2Task.yield/2 阻止了服务器。

我希望服务器不阻塞,10 秒后,在客户端接收结果。这怎么可能?

If I am right, invoking GenServer.call(:server, :state) from a client side would block the server for 10 seconds, and then the new state would be returned to the client.

是的。 Elixir 按照您的指示进行操作,并在这一行中:

newState = do_job()

你是在告诉 elixir 将 do_job() 的 return 值分配给变量 newState。长生不老药可以执行该分配的唯一方法是获取 go_job() 的 return 值...这将需要 10 秒。

I want the server not to block, and after those 10 seconds, receive the result on the client terminal.

一种方法是让 GenServer spawn() 一个新进程来执行 10 秒函数并将客户端的 pid 传递给新进程。当新进程从 10 秒函数获得 return 值时,新进程可以 send() 使用客户端 pid 向客户端发送消息。

这意味着客户端需要调用 handle_call() 而不是 handle_cast() 因为服务器的 handle_cast() 实现没有包含客户端的 from 参数变量pid。另一方面,handle_call() 确实from 参数变量中接收客户端 pid,因此服务器可以将客户端 pid 传递给派生进程。请注意 spawn() return 立即,这意味着 handle_call() 可以 return 立即回复 :working_on_it.

下一个问题是:客户端如何知道 GenServer 生成的新进程何时执行完 10 秒函数?客户端无法知道服务器上的一些无关进程何时完成执行,因此客户端需要在接收中等待,直到消息从派生进程到达。而且,如果客户端正在检查其邮箱中的消息,了解发件人是谁会很有帮助,这意味着 handle_call() 还应该 return 向客户端发送派生进程的 pid。客户端的另一个选择是 轮询 它的邮箱每隔一段时间做其他工作。为此,客户端可以在 after clause 中定义一个短超时的接收,然后在 after clause 中调用一个函数来完成一些客户端工作,然后递归调用包含接收的函数以便函数再次检查邮箱。

现在Task呢?根据 Task docs:

If you are using async tasks, you must await a reply...

那么,如果你必须等待,那么异步任务有什么用呢?答:如果一个进程有至少两个需要执行的长运行ning函数,那么这个进程可以使用Task.async()到运行所有的同时执行函数,而不是执行一个函数并等待它完成,然后执行另一个函数并等待它完成,然后执行另一个函数,等等。

但是,Task 还定义了一个 start() 函数:

start(mod, fun, args)

Starts a task.

This is only used when the task is used for side-effects (i.e. no interest in the returned result) and it should not be linked to the current process.

听起来 Task.start() 完成了我在第一种方法中描述的内容。您需要定义 fun 以便它将 运行 10 秒函数,然后在 10 秒函数执行完毕后将消息发送回客户端(= 副作用).

下面是生成长 运行ning 函数的 GenServer 的一个简单示例,它允许服务器在执行长 运行ning 函数时保持对其他客户端请求的响应:

a.exs:

defmodule Gen1.Server do
  use GenServer

  @impl true
  def init(init_state) do
    {:ok, init_state}
  end

  def long_func({pid, _ref}) do
    Process.sleep 10_000
    result = :dog
    send(pid, {self(), result})
  end

  @impl true
  def handle_call(:go_long, from, state) do
    long_pid = spawn(Gen1.Server, :long_func, [from])
    {:reply, long_pid, state}
  end
  def handle_call(:other, _from, state) do
    {:reply, :other_stuff, state}
  end

end

一个 iex 会话将是客户端:

~/elixir_programs$ iex a.exs
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)

iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}

iex(2)> long_pid = GenServer.call(server_pid, :go_long, 15_000)
#PID<0.100.0>

iex(3)> GenServer.call(server_pid, :other)                       
:other_stuff

iex(4)> receive do                                             
...(4)> {^long_pid, reply} -> reply                            
...(4)> end                                                    
:dog

iex(7)> 

long_pid这样的变量会匹配任何东西。要使 long_pid 仅匹配其当前值,请指定 ^long_pid^ 称为 pin 运算符)。

GenServer 还允许您阻止客户端对 handle_call() 的调用,同时允许服务器继续执行。如果客户端在从服务器获得一些需要的数据之前无法继续,但您希望服务器保持对其他客户端的响应,这将很有用。这是一个例子:

defmodule Gen1.Server do
  use GenServer

  @impl true
  def init(init_state) do
    {:ok, init_state}
  end

  @impl true
  def handle_call(:go_long, from, state) do
    spawn(Gen1.Server, :long_func, [from])
    {:noreply, state}  #The server doesn't send anything to the client, 
                       #so the client's call of handle_call() blocks until 
                       #somebody calls GenServer.reply().
  end

  def long_func(from) do
    Process.sleep 10_000
    result = :dog
    GenServer.reply(from, result) 
  end

end

在 iex 中:

iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}

iex(2)> result = GenServer.call(server_pid, :go_long, 15_000)
...hangs for 10 seconds...   
:dog

iex(3)>