GenServer 上的 Elixir 非阻塞线程?
Elixir non-blocking threads on a GenServer?
我正在尝试完成一项简单的任务,但我遇到了很大的困难。
请假设我有一个GenServer
,其中一个回调如下:
@impl true
def handle_call(:state, _, state) do
# Something that would require 10 seconds
newState = do_job()
{:reply, newState, newState}
end
如果我是对的,从客户端调用 GenServer.call(:server, :state)
将阻塞服务器 10 秒,然后新状态将返回给客户端。
好的。我希望服务器在不被阻塞的情况下处理这个任务。我试过使用任务,但 Task.await/2
和 Task.yield/2
阻止了服务器。
我希望服务器不阻塞,10 秒后,在客户端接收结果。这怎么可能?
If I am right, invoking GenServer.call(:server, :state) from a client
side would block the server for 10 seconds, and then the new state
would be returned to the client.
是的。 Elixir 按照您的指示进行操作,并在这一行中:
newState = do_job()
你是在告诉 elixir 将 do_job()
的 return 值分配给变量 newState
。长生不老药可以执行该分配的唯一方法是获取 go_job()
的 return 值...这将需要 10 秒。
I want the server not to block, and after those 10 seconds, receive
the result on the client terminal.
一种方法是让 GenServer spawn()
一个新进程来执行 10 秒函数并将客户端的 pid 传递给新进程。当新进程从 10 秒函数获得 return 值时,新进程可以 send()
使用客户端 pid 向客户端发送消息。
这意味着客户端需要调用 handle_call()
而不是 handle_cast()
因为服务器的 handle_cast()
实现没有包含客户端的 from
参数变量pid。另一方面,handle_call()
确实 在 from
参数变量中接收客户端 pid,因此服务器可以将客户端 pid 传递给派生进程。请注意 spawn()
return 立即,这意味着 handle_call()
可以 return 立即回复 :working_on_it
.
下一个问题是:客户端如何知道 GenServer 生成的新进程何时执行完 10 秒函数?客户端无法知道服务器上的一些无关进程何时完成执行,因此客户端需要在接收中等待,直到消息从派生进程到达。而且,如果客户端正在检查其邮箱中的消息,了解发件人是谁会很有帮助,这意味着 handle_call()
还应该 return 向客户端发送派生进程的 pid。客户端的另一个选择是 轮询 它的邮箱每隔一段时间做其他工作。为此,客户端可以在 after clause 中定义一个短超时的接收,然后在 after clause
中调用一个函数来完成一些客户端工作,然后递归调用包含接收的函数以便函数再次检查邮箱。
现在Task
呢?根据 Task docs:
If you are using async tasks, you must await a reply...
那么,如果你必须等待,那么异步任务有什么用呢?答:如果一个进程有至少两个需要执行的长运行ning函数,那么这个进程可以使用Task.async()
到运行所有的同时执行函数,而不是执行一个函数并等待它完成,然后执行另一个函数并等待它完成,然后执行另一个函数,等等。
但是,Task 还定义了一个 start() 函数:
start(mod, fun, args)
Starts a task.
This is only used when the task is used for side-effects (i.e. no
interest in the returned result) and it should not be linked to the
current process.
听起来 Task.start()
完成了我在第一种方法中描述的内容。您需要定义 fun
以便它将 运行 10 秒函数,然后在 10 秒函数执行完毕后将消息发送回客户端(= 副作用).
下面是生成长 运行ning 函数的 GenServer 的一个简单示例,它允许服务器在执行长 运行ning 函数时保持对其他客户端请求的响应:
a.exs:
defmodule Gen1.Server do
use GenServer
@impl true
def init(init_state) do
{:ok, init_state}
end
def long_func({pid, _ref}) do
Process.sleep 10_000
result = :dog
send(pid, {self(), result})
end
@impl true
def handle_call(:go_long, from, state) do
long_pid = spawn(Gen1.Server, :long_func, [from])
{:reply, long_pid, state}
end
def handle_call(:other, _from, state) do
{:reply, :other_stuff, state}
end
end
一个 iex 会话将是客户端:
~/elixir_programs$ iex a.exs
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}
iex(2)> long_pid = GenServer.call(server_pid, :go_long, 15_000)
#PID<0.100.0>
iex(3)> GenServer.call(server_pid, :other)
:other_stuff
iex(4)> receive do
...(4)> {^long_pid, reply} -> reply
...(4)> end
:dog
iex(7)>
像long_pid
这样的变量会匹配任何东西。要使 long_pid
仅匹配其当前值,请指定 ^long_pid
(^
称为 pin 运算符)。
GenServer 还允许您阻止客户端对 handle_call()
的调用,同时允许服务器继续执行。如果客户端在从服务器获得一些需要的数据之前无法继续,但您希望服务器保持对其他客户端的响应,这将很有用。这是一个例子:
defmodule Gen1.Server do
use GenServer
@impl true
def init(init_state) do
{:ok, init_state}
end
@impl true
def handle_call(:go_long, from, state) do
spawn(Gen1.Server, :long_func, [from])
{:noreply, state} #The server doesn't send anything to the client,
#so the client's call of handle_call() blocks until
#somebody calls GenServer.reply().
end
def long_func(from) do
Process.sleep 10_000
result = :dog
GenServer.reply(from, result)
end
end
在 iex 中:
iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}
iex(2)> result = GenServer.call(server_pid, :go_long, 15_000)
...hangs for 10 seconds...
:dog
iex(3)>
我正在尝试完成一项简单的任务,但我遇到了很大的困难。
请假设我有一个GenServer
,其中一个回调如下:
@impl true
def handle_call(:state, _, state) do
# Something that would require 10 seconds
newState = do_job()
{:reply, newState, newState}
end
如果我是对的,从客户端调用 GenServer.call(:server, :state)
将阻塞服务器 10 秒,然后新状态将返回给客户端。
好的。我希望服务器在不被阻塞的情况下处理这个任务。我试过使用任务,但 Task.await/2
和 Task.yield/2
阻止了服务器。
我希望服务器不阻塞,10 秒后,在客户端接收结果。这怎么可能?
If I am right, invoking GenServer.call(:server, :state) from a client side would block the server for 10 seconds, and then the new state would be returned to the client.
是的。 Elixir 按照您的指示进行操作,并在这一行中:
newState = do_job()
你是在告诉 elixir 将 do_job()
的 return 值分配给变量 newState
。长生不老药可以执行该分配的唯一方法是获取 go_job()
的 return 值...这将需要 10 秒。
I want the server not to block, and after those 10 seconds, receive the result on the client terminal.
一种方法是让 GenServer spawn()
一个新进程来执行 10 秒函数并将客户端的 pid 传递给新进程。当新进程从 10 秒函数获得 return 值时,新进程可以 send()
使用客户端 pid 向客户端发送消息。
这意味着客户端需要调用 handle_call()
而不是 handle_cast()
因为服务器的 handle_cast()
实现没有包含客户端的 from
参数变量pid。另一方面,handle_call()
确实 在 from
参数变量中接收客户端 pid,因此服务器可以将客户端 pid 传递给派生进程。请注意 spawn()
return 立即,这意味着 handle_call()
可以 return 立即回复 :working_on_it
.
下一个问题是:客户端如何知道 GenServer 生成的新进程何时执行完 10 秒函数?客户端无法知道服务器上的一些无关进程何时完成执行,因此客户端需要在接收中等待,直到消息从派生进程到达。而且,如果客户端正在检查其邮箱中的消息,了解发件人是谁会很有帮助,这意味着 handle_call()
还应该 return 向客户端发送派生进程的 pid。客户端的另一个选择是 轮询 它的邮箱每隔一段时间做其他工作。为此,客户端可以在 after clause 中定义一个短超时的接收,然后在 after clause
中调用一个函数来完成一些客户端工作,然后递归调用包含接收的函数以便函数再次检查邮箱。
现在Task
呢?根据 Task docs:
If you are using async tasks, you must await a reply...
那么,如果你必须等待,那么异步任务有什么用呢?答:如果一个进程有至少两个需要执行的长运行ning函数,那么这个进程可以使用Task.async()
到运行所有的同时执行函数,而不是执行一个函数并等待它完成,然后执行另一个函数并等待它完成,然后执行另一个函数,等等。
但是,Task 还定义了一个 start() 函数:
start(mod, fun, args)
Starts a task.
This is only used when the task is used for side-effects (i.e. no interest in the returned result) and it should not be linked to the current process.
听起来 Task.start()
完成了我在第一种方法中描述的内容。您需要定义 fun
以便它将 运行 10 秒函数,然后在 10 秒函数执行完毕后将消息发送回客户端(= 副作用).
下面是生成长 运行ning 函数的 GenServer 的一个简单示例,它允许服务器在执行长 运行ning 函数时保持对其他客户端请求的响应:
a.exs:
defmodule Gen1.Server do
use GenServer
@impl true
def init(init_state) do
{:ok, init_state}
end
def long_func({pid, _ref}) do
Process.sleep 10_000
result = :dog
send(pid, {self(), result})
end
@impl true
def handle_call(:go_long, from, state) do
long_pid = spawn(Gen1.Server, :long_func, [from])
{:reply, long_pid, state}
end
def handle_call(:other, _from, state) do
{:reply, :other_stuff, state}
end
end
一个 iex 会话将是客户端:
~/elixir_programs$ iex a.exs
Erlang/OTP 20 [erts-9.3] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:10] [hipe] [kernel-poll:false]
Interactive Elixir (1.6.6) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}
iex(2)> long_pid = GenServer.call(server_pid, :go_long, 15_000)
#PID<0.100.0>
iex(3)> GenServer.call(server_pid, :other)
:other_stuff
iex(4)> receive do
...(4)> {^long_pid, reply} -> reply
...(4)> end
:dog
iex(7)>
像long_pid
这样的变量会匹配任何东西。要使 long_pid
仅匹配其当前值,请指定 ^long_pid
(^
称为 pin 运算符)。
GenServer 还允许您阻止客户端对 handle_call()
的调用,同时允许服务器继续执行。如果客户端在从服务器获得一些需要的数据之前无法继续,但您希望服务器保持对其他客户端的响应,这将很有用。这是一个例子:
defmodule Gen1.Server do
use GenServer
@impl true
def init(init_state) do
{:ok, init_state}
end
@impl true
def handle_call(:go_long, from, state) do
spawn(Gen1.Server, :long_func, [from])
{:noreply, state} #The server doesn't send anything to the client,
#so the client's call of handle_call() blocks until
#somebody calls GenServer.reply().
end
def long_func(from) do
Process.sleep 10_000
result = :dog
GenServer.reply(from, result)
end
end
在 iex 中:
iex(1)> {:ok, server_pid} = GenServer.start_link(Gen1.Server, [])
{:ok, #PID<0.93.0>}
iex(2)> result = GenServer.call(server_pid, :go_long, 15_000)
...hangs for 10 seconds...
:dog
iex(3)>