长生不老药:Genserver.call 未启动 handle_call
Elixir: Genserver.call not initiaing handle_call
我正在实施 Gossip Algorithm
,其中多个参与者同时并行传播八卦。当每个 Actor 都听了 10 次八卦后,系统停止。
现在,我有一个场景,在将八卦发送给它之前,我正在检查接收者 actor 的收听计数。如果 listen count 已经是 10,那么八卦将不会发送给接收者 actor。我正在使用同步调用来获取监听计数。
def get_message(server, msg) do
GenServer.call(server, {:get_message, msg})
end
def handle_call({:get_message, msg}, _from, state) do
listen_count = hd(state)
{:reply, listen_count, state}
end
程序在启动时运行良好,但一段时间后 Genserver.call
停止并出现超时错误,如下所示。经过一番调试,我发现Genserver.call
变成了休眠状态,无法启动相应的handle_call
方法。使用同步调用时是否会出现这种行为?由于所有参与者都是独立的,Genserver.call
方法不应该是 运行 独立而不等待彼此的响应。
02:28:05.634 [error] GenServer #PID<0.81.0> terminating
** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:774: GenServer.call/3
编辑:当 运行 in iex shell.
时,以下代码可以重现错误
defmodule RumourActor do
use GenServer
def start_link(opts) do
{:ok, pid} = GenServer.start_link(__MODULE__,opts)
{pid}
end
def set_message(server, msg, recipient) do
GenServer.cast(server, {:set_message, msg, server, recipient})
end
def get_message(server, msg) do
GenServer.call(server, :get_message)
end
def init(opts) do
state=opts
{:ok,state}
end
def handle_cast({:set_message, msg, server, recipient},state) do
:timer.sleep(5000)
c = RumourActor.get_message(recipient, [])
IO.inspect c
{:noreply,state}
end
def handle_call(:get_message, _from, state) do
count = tl(state)
{:reply, count, state}
end
end
打开 iex shell 并加载上面的模块。使用以下命令启动两个进程:
a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])
通过调用 Dogbert 在评论中提到的死锁条件来产生错误。 运行以下,时差不大
cb = RumourActor.set_message(elem(a,0), [], elem(b,0))
ca = RumourActor.set_message(elem(b,0), [], elem(a,0))
等待 5 秒。会出现错误。
八卦协议是一种处理异步、未知、未配置(随机)网络的方法,这些网络可能会遭受间歇性中断和分区,并且不存在领导者或默认结构。 (请注意,这种情况在现实世界中有些不寻常,带外控制总是以某种方式强加于系统。)
考虑到这一点,让我们将其更改为异步系统(使用 cast
),以便我们遵循八卦式通信概念的精神。
我们需要消息摘要来计算给定消息已被接收的次数,已接收且已经超过幻数的消息摘要(因此我们不会重新发送消息,如果它是太晚了),以及我们系统中注册的进程列表,以便我们知道我们正在向谁广播:
(以下示例是在 Erlang 中编写的,因为自从我停止使用 Elixir 语法后,我就被绊倒了...)
-module(rumor).
-record(s,
{peers = [] :: [pid()],
digest = #{} :: #{message_id(), non_neg_integer()},
dead = sets:new() :: sets:set(message_id())}).
-type message_id() :: zuuid:uuid().
在这里,我使用 UUID, but it could be whatever. An Erlang reference 作为测试用例会很好,但是由于八卦在 Erlang 集群中没有用,并且引用在原始系统之外是不安全的,我只是跳到假设这是一个网络系统。
我们将需要一个接口函数,允许我们告诉进程将新消息注入系统。我们还需要一个接口函数,一旦它已经在系统中,它就会在两个进程之间发送消息。然后我们需要一个内部函数来向所有已知(订阅)的节点广播消息。啊,这意味着我们需要一个问候接口,以便对等进程可以相互通知它们的存在。
我们还需要一种方法让进程告诉自己随着时间的推移继续广播。设置重传间隔多长时间实际上不是一个简单的决定——它与网络拓扑、延迟、可变性等有关似乎反应迟钝,等等——但我们不打算在这里陷入那种疯狂)。在这里,我只是将其设置为 1 秒,因为对于观察系统的人来说,这是一个易于解释的时间间隔。
请注意下面的一切都是异步的。
接口...
insert(Pid, Message) ->
gen_server:cast(Pid, {insert, Message}).
relay(Pid, ID, Message) ->
gen_server:cast(Pid, {relay, ID, Message}).
greet(Pid) ->
gen_server:cast(Pid, {greet, self()}).
make_introduction(Pid, PeerPid) ->
gen_server:cast(Pid, {make_introduction, PeerPid}).
最后一个函数将成为我们作为系统测试人员的方式,使其中一个进程在某个目标 Pid 上调用 greet/1
,以便他们开始构建对等网络。在现实世界中,通常会发生一些稍微不同的事情。
在接收演员表的 gen_server 回调中,我们将得到:
handle_cast({insert, Message}, State) ->
NewState = do_insert(Message, State);
{noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState};
handle_cast({greet, Peer}, State) ->
NewState = do_greet(Peer, State),
{noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
NewState = do_make_introduction(Peer, State),
{noreply, NewState}.
很简单的东西。
上面我提到我们需要一种方法让这个东西告诉自己在延迟后重新发送。为此,我们将在延迟后使用 erlang:send_after/3
向 "redo_relay" 发送一条裸消息,因此我们将需要一个 handle_info/2 来处理它:
handle_info({redo_relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState}.
消息位的实现是有趣的部分,但是 none 这部分非常棘手。请原谅下面的 do_relay/3
-- 它可以更简洁,但我是在浏览器中写的,所以...
do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
MessageID = zuuid:v1(),
NewDigest = maps:put(MessageID, 1, Digest),
ok = broadcast(Message, Peers),
ok = schedule_resend(MessageID, Message),
State#s{digest = NewDigest}.
do_relay(ID,
Message,
State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
case maps:find(ID, Digest) of
{ok, Count} when Count >= 10 ->
NewDigest = maps:remove(ID, Digest),
NewDead = sets:add_element(ID, Dead),
ok = broadcast(Message, Peers),
State#s{digest = NewDigest, dead = NewDead};
{ok, Count} ->
NewDigest = maps:put(ID, Count + 1),
ok = broadcast(ID, Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest};
error ->
case set:is_element(ID, Dead) of
true ->
State;
false ->
NewDigest = maps:put(ID, 1),
ok = broadcast(Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest}
end
end.
broadcast(ID, Message, Peers) ->
Forward = fun(P) -> relay(P, ID, Message),
lists:foreach(Forward, Peers).
schedule_resend(ID, Message) ->
_ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
ok.
现在我们需要社交位...
do_greet(Peer, State = #s{peers = Peers}) ->
case lists:member(Peer, Peers) of
false -> State#s{peers = [Peer | Peers]};
true -> State
end.
do_make_introduction(Peer, State = #s{peers = Peers}) ->
ok = greet(Peer),
do_greet(Peer, State).
那么上面所有可怕的未指定类型的东西都做了什么?
它避免了任何死锁的可能性。死锁在对等系统中如此致命的原因是任何时候你有两个相同的进程(或参与者,或其他)同步通信,您已经创建了一个潜在死锁的教科书案例。
任何时候 A
有一条同步消息指向 B
并且 B
有一条同步消息指向 A
同时你现在有一个死锁。无法创建 相同的 进程,这些进程在不创建潜在死锁的情况下同步调用彼此。在大规模并发系统中,任何可能发生的事情几乎肯定最终都会发生,所以你迟早会 运行 进入这个。
八卦旨在异步是有原因的:它是一种草率的、不可靠的、低效的方式来处理草率的、不可靠的、低效的网络拓扑。尝试进行调用而不是强制转换不仅会破坏八卦式消息中继的目的,而且 还会 将您推入不可能的死锁区域,从而将协议的性质从异步更改为同步。
Genser.call
的 default timeout 为 5000 毫秒。所以可能发生的情况是,actor 的消息队列中充满了数百万条消息,当它到达 call
时,calling actor 已经超时。
您可以使用 try...catch
:
处理超时
try do
c = RumourActor.get_message(recipient, [])
catch
:exit, reason ->
# handle timeout
现在,被调用 actor 最终将收到 call
消息并做出响应,这将作为第一个进程的意外消息出现。您需要使用 handle_info
来处理这个问题。所以一种方法是忽略 catch
块中的错误并从 handle_info
.
发送谣言
此外,如果有许多进程等待超时 5 秒后再继续,这将显着降低性能。可以故意减少超时并处理 handle_info
中的回复。这将减少使用 cast
和处理来自其他进程的回复。
您的阻塞调用需要分成两个非阻塞调用。因此,如果 A 正在对 B 进行阻塞调用,而不是等待回复,A 可以要求 B 在给定地址(A 的地址)上发送其状态并继续。
然后A将单独处理该消息并在必要时回复。
A.fun1():
body of A before blocking call
result = blockingcall()
do things based on result
需要分成:
A.send():
body of A before blocking call
nonblockingcall(A.receive) #A.receive is where B should send results
do other things
A.receive(result):
do things based on result
我正在实施 Gossip Algorithm
,其中多个参与者同时并行传播八卦。当每个 Actor 都听了 10 次八卦后,系统停止。
现在,我有一个场景,在将八卦发送给它之前,我正在检查接收者 actor 的收听计数。如果 listen count 已经是 10,那么八卦将不会发送给接收者 actor。我正在使用同步调用来获取监听计数。
def get_message(server, msg) do
GenServer.call(server, {:get_message, msg})
end
def handle_call({:get_message, msg}, _from, state) do
listen_count = hd(state)
{:reply, listen_count, state}
end
程序在启动时运行良好,但一段时间后 Genserver.call
停止并出现超时错误,如下所示。经过一番调试,我发现Genserver.call
变成了休眠状态,无法启动相应的handle_call
方法。使用同步调用时是否会出现这种行为?由于所有参与者都是独立的,Genserver.call
方法不应该是 运行 独立而不等待彼此的响应。
02:28:05.634 [error] GenServer #PID<0.81.0> terminating
** (stop) exited in: GenServer.call(#PID<0.79.0>, {:get_message, []}, 5000)
** (EXIT) time out
(elixir) lib/gen_server.ex:774: GenServer.call/3
编辑:当 运行 in iex shell.
时,以下代码可以重现错误defmodule RumourActor do
use GenServer
def start_link(opts) do
{:ok, pid} = GenServer.start_link(__MODULE__,opts)
{pid}
end
def set_message(server, msg, recipient) do
GenServer.cast(server, {:set_message, msg, server, recipient})
end
def get_message(server, msg) do
GenServer.call(server, :get_message)
end
def init(opts) do
state=opts
{:ok,state}
end
def handle_cast({:set_message, msg, server, recipient},state) do
:timer.sleep(5000)
c = RumourActor.get_message(recipient, [])
IO.inspect c
{:noreply,state}
end
def handle_call(:get_message, _from, state) do
count = tl(state)
{:reply, count, state}
end
end
打开 iex shell 并加载上面的模块。使用以下命令启动两个进程:
a = RumourActor.start_link(["", 3])
b = RumourActor.start_link(["", 5])
通过调用 Dogbert 在评论中提到的死锁条件来产生错误。 运行以下,时差不大
cb = RumourActor.set_message(elem(a,0), [], elem(b,0))
ca = RumourActor.set_message(elem(b,0), [], elem(a,0))
等待 5 秒。会出现错误。
八卦协议是一种处理异步、未知、未配置(随机)网络的方法,这些网络可能会遭受间歇性中断和分区,并且不存在领导者或默认结构。 (请注意,这种情况在现实世界中有些不寻常,带外控制总是以某种方式强加于系统。)
考虑到这一点,让我们将其更改为异步系统(使用 cast
),以便我们遵循八卦式通信概念的精神。
我们需要消息摘要来计算给定消息已被接收的次数,已接收且已经超过幻数的消息摘要(因此我们不会重新发送消息,如果它是太晚了),以及我们系统中注册的进程列表,以便我们知道我们正在向谁广播:
(以下示例是在 Erlang 中编写的,因为自从我停止使用 Elixir 语法后,我就被绊倒了...)
-module(rumor).
-record(s,
{peers = [] :: [pid()],
digest = #{} :: #{message_id(), non_neg_integer()},
dead = sets:new() :: sets:set(message_id())}).
-type message_id() :: zuuid:uuid().
在这里,我使用 UUID, but it could be whatever. An Erlang reference 作为测试用例会很好,但是由于八卦在 Erlang 集群中没有用,并且引用在原始系统之外是不安全的,我只是跳到假设这是一个网络系统。
我们将需要一个接口函数,允许我们告诉进程将新消息注入系统。我们还需要一个接口函数,一旦它已经在系统中,它就会在两个进程之间发送消息。然后我们需要一个内部函数来向所有已知(订阅)的节点广播消息。啊,这意味着我们需要一个问候接口,以便对等进程可以相互通知它们的存在。
我们还需要一种方法让进程告诉自己随着时间的推移继续广播。设置重传间隔多长时间实际上不是一个简单的决定——它与网络拓扑、延迟、可变性等有关似乎反应迟钝,等等——但我们不打算在这里陷入那种疯狂)。在这里,我只是将其设置为 1 秒,因为对于观察系统的人来说,这是一个易于解释的时间间隔。
请注意下面的一切都是异步的。
接口...
insert(Pid, Message) ->
gen_server:cast(Pid, {insert, Message}).
relay(Pid, ID, Message) ->
gen_server:cast(Pid, {relay, ID, Message}).
greet(Pid) ->
gen_server:cast(Pid, {greet, self()}).
make_introduction(Pid, PeerPid) ->
gen_server:cast(Pid, {make_introduction, PeerPid}).
最后一个函数将成为我们作为系统测试人员的方式,使其中一个进程在某个目标 Pid 上调用 greet/1
,以便他们开始构建对等网络。在现实世界中,通常会发生一些稍微不同的事情。
在接收演员表的 gen_server 回调中,我们将得到:
handle_cast({insert, Message}, State) ->
NewState = do_insert(Message, State);
{noreply, NewState};
handle_cast({relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState};
handle_cast({greet, Peer}, State) ->
NewState = do_greet(Peer, State),
{noreply, NewState};
handle_cast({make_introduction, Peer}, State) ->
NewState = do_make_introduction(Peer, State),
{noreply, NewState}.
很简单的东西。
上面我提到我们需要一种方法让这个东西告诉自己在延迟后重新发送。为此,我们将在延迟后使用 erlang:send_after/3
向 "redo_relay" 发送一条裸消息,因此我们将需要一个 handle_info/2 来处理它:
handle_info({redo_relay, ID, Message}, State) ->
NewState = do_relay(ID, Message, State),
{noreply, NewState}.
消息位的实现是有趣的部分,但是 none 这部分非常棘手。请原谅下面的 do_relay/3
-- 它可以更简洁,但我是在浏览器中写的,所以...
do_insert(Message, State = #s{peers = Peers, digest = Digest}) ->
MessageID = zuuid:v1(),
NewDigest = maps:put(MessageID, 1, Digest),
ok = broadcast(Message, Peers),
ok = schedule_resend(MessageID, Message),
State#s{digest = NewDigest}.
do_relay(ID,
Message,
State = #s{peers = Peers, digest = Digest, dead = Dead}) ->
case maps:find(ID, Digest) of
{ok, Count} when Count >= 10 ->
NewDigest = maps:remove(ID, Digest),
NewDead = sets:add_element(ID, Dead),
ok = broadcast(Message, Peers),
State#s{digest = NewDigest, dead = NewDead};
{ok, Count} ->
NewDigest = maps:put(ID, Count + 1),
ok = broadcast(ID, Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest};
error ->
case set:is_element(ID, Dead) of
true ->
State;
false ->
NewDigest = maps:put(ID, 1),
ok = broadcast(Message, Peers),
ok = schedule_resend(ID, Message),
State#s{digest = NewDigest}
end
end.
broadcast(ID, Message, Peers) ->
Forward = fun(P) -> relay(P, ID, Message),
lists:foreach(Forward, Peers).
schedule_resend(ID, Message) ->
_ = erlang:send_after(1000, self(), {redo_relay, ID, Message}),
ok.
现在我们需要社交位...
do_greet(Peer, State = #s{peers = Peers}) ->
case lists:member(Peer, Peers) of
false -> State#s{peers = [Peer | Peers]};
true -> State
end.
do_make_introduction(Peer, State = #s{peers = Peers}) ->
ok = greet(Peer),
do_greet(Peer, State).
那么上面所有可怕的未指定类型的东西都做了什么?
它避免了任何死锁的可能性。死锁在对等系统中如此致命的原因是任何时候你有两个相同的进程(或参与者,或其他)同步通信,您已经创建了一个潜在死锁的教科书案例。
任何时候 A
有一条同步消息指向 B
并且 B
有一条同步消息指向 A
同时你现在有一个死锁。无法创建 相同的 进程,这些进程在不创建潜在死锁的情况下同步调用彼此。在大规模并发系统中,任何可能发生的事情几乎肯定最终都会发生,所以你迟早会 运行 进入这个。
八卦旨在异步是有原因的:它是一种草率的、不可靠的、低效的方式来处理草率的、不可靠的、低效的网络拓扑。尝试进行调用而不是强制转换不仅会破坏八卦式消息中继的目的,而且 还会 将您推入不可能的死锁区域,从而将协议的性质从异步更改为同步。
Genser.call
的 default timeout 为 5000 毫秒。所以可能发生的情况是,actor 的消息队列中充满了数百万条消息,当它到达 call
时,calling actor 已经超时。
您可以使用 try...catch
:
try do
c = RumourActor.get_message(recipient, [])
catch
:exit, reason ->
# handle timeout
现在,被调用 actor 最终将收到 call
消息并做出响应,这将作为第一个进程的意外消息出现。您需要使用 handle_info
来处理这个问题。所以一种方法是忽略 catch
块中的错误并从 handle_info
.
此外,如果有许多进程等待超时 5 秒后再继续,这将显着降低性能。可以故意减少超时并处理 handle_info
中的回复。这将减少使用 cast
和处理来自其他进程的回复。
您的阻塞调用需要分成两个非阻塞调用。因此,如果 A 正在对 B 进行阻塞调用,而不是等待回复,A 可以要求 B 在给定地址(A 的地址)上发送其状态并继续。 然后A将单独处理该消息并在必要时回复。
A.fun1():
body of A before blocking call
result = blockingcall()
do things based on result
需要分成:
A.send():
body of A before blocking call
nonblockingcall(A.receive) #A.receive is where B should send results
do other things
A.receive(result):
do things based on result