分布式 Erlang:多路调用超出请求的超时时间
Distributed Erlang: multicall exceeds requested timeout
我们使用分布式 erlang 集群,现在我在网络分裂的情况下测试它。
为了从集群的所有节点获取信息,我使用 gen_server:multicall/4 并定义了超时。我需要的是尽快从可用节点获取信息。所以超时不会太大(大约 3000 毫秒)。
这里调用示例:
Timeout = 3000
Nodes = AllConfiguredNodes
gen_server:multi_call(Nodes, broker, get_score, Timeout)
我预计此调用 returns 会导致超时毫秒。但在净分割的情况下它不会。它等待大约。 8 秒。
我发现 multi_call 请求在发送请求之前在调用 erlang:monitor(process, {Name, Node})
中额外暂停了 5 秒。
我真的不在乎某些节点不回复或忙碌或不可用,我可以使用任何其他节点但是由于这种停止我不得不等到 Erlang VM
尝试与 dead/not 个可用节点建立新连接。
问题是:你知道可以防止这种停机的解决方案吗?或者可能是另一个适合我情况的 RPC。
我不确定我是否完全理解你要解决的问题,但如果是获得所有可以在 X 时间内检索到的答案并忽略其余部分,你可以尝试组合async_call 和 nb_yield.
也许像
somefun() ->
SmallTimeMs = 50,
Nodes = AllConfiguredNodes,
Promises = [rpc:async_call(N, some_mod, some_fun, ArgList) || N <- Nodes],
get_results([], Promises, SmallTimeMs).
get_results(Results, _Promises, _SmallTimeMs) when length(Results) > 1 -> % Replace 1 with whatever is the minimum acceptable number of results
lists:flatten(Results);
get_results(Results, Promises, SmallTimeMs) ->
Rs = get_promises(Promises, SmallTimeMs)
get_results([Results|Rs], Promises, SmallTimeMs)).
get_promise(Promises, WaitMs) ->
[rpc:nb_yield(Key, WaitMs) || Key <- Promises].
有关详细信息,请参阅:http://erlang.org/doc/man/rpc.html#async_call-4。
我的问题解决方法。
我自己实现了使用 gen_server:call
的 multicall
基本思想是在单独的进程中使用 gen_server:call() 调用所有节点。并收集这些调用的结果。通过从调用进程的邮箱接收消息进行收集。
为了控制超时,我在超时到期时计算截止日期,然后将其用作参考点来计算 receive
中 after
的超时。
实施
主要功能是:
multicall(Nodes, Name, Req, Timeout) ->
Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
Results = read_all(Timeout, Refs),
PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
{ PosResults, calc_bad_nodes(Nodes, PosResults) }.
这里的想法是调用所有节点并在一个超时内等待所有结果。
从派生进程调用一个节点。它捕获 gen_server:call
在出现错误时使用的出口。
call_node(Node, Name, Req, Timeout) ->
Ref = make_ref(),
Self = self(),
spawn_link(fun() ->
try
Result = gen_server:call({Name,Node},Req,Timeout),
Self ! { Ref, { ok, { Node, Result } } }
catch
exit:Exit ->
Self ! { Ref, { error, { 'EXIT', Exit } } }
end
end),
Ref.
坏节点计算为在Timout内没有响应的节点
calc_bad_nodes(Nodes, PosResults) ->
{ GoodNodes, _ } = lists:unzip(PosResults),
[ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].
通过超时读取邮箱收集结果
read_all(ReadList, Timeout) ->
Now = erlang:monotonic_time(millisecond),
Deadline = Now + Timeout,
read_all_impl(ReadList, Deadline, []).
执行读取直到没有出现截止日期
read_all_impl([], _, Results) ->
lists:reverse(Results);
read_all_impl([ W | Rest ], expired, Results) ->
R = read(0, W),
read_all_impl(Rest, expired, [R | Results ]);
read_all_impl([ W | Rest ] = L, Deadline, Results) ->
Now = erlang:monotonic_time(millisecond),
case Deadline - Now of
Timeout when Timeout > 0 ->
R = read(Timeout, W),
case R of
{ ok, _ } ->
read_all_impl(Rest, Deadline, [ R | Results ]);
{ error, { read_timeout, _ } } ->
read_all_impl(Rest, expired, [ R | Results ])
end;
Timeout when Timeout =< 0 ->
read_all_impl(L, expired, Results)
end.
一次读取只是从邮箱接收超时。
read(Timeout, Ref) ->
receive
{ Ref, Result } ->
{ ok, Result }
after Timeout ->
{ error, { read_timeout, Timeout } }
end.
进一步改进:
- rpc 模块产生单独的进程以避免迟到的答案的垃圾。所以在这个 multicall 函数中做同样的事情会很有用
infinity
超时可以用显而易见的方式处理
我们使用分布式 erlang 集群,现在我在网络分裂的情况下测试它。
为了从集群的所有节点获取信息,我使用 gen_server:multicall/4 并定义了超时。我需要的是尽快从可用节点获取信息。所以超时不会太大(大约 3000 毫秒)。 这里调用示例:
Timeout = 3000
Nodes = AllConfiguredNodes
gen_server:multi_call(Nodes, broker, get_score, Timeout)
我预计此调用 returns 会导致超时毫秒。但在净分割的情况下它不会。它等待大约。 8 秒。
我发现 multi_call 请求在发送请求之前在调用 erlang:monitor(process, {Name, Node})
中额外暂停了 5 秒。
我真的不在乎某些节点不回复或忙碌或不可用,我可以使用任何其他节点但是由于这种停止我不得不等到 Erlang VM 尝试与 dead/not 个可用节点建立新连接。
问题是:你知道可以防止这种停机的解决方案吗?或者可能是另一个适合我情况的 RPC。
我不确定我是否完全理解你要解决的问题,但如果是获得所有可以在 X 时间内检索到的答案并忽略其余部分,你可以尝试组合async_call 和 nb_yield.
也许像
somefun() ->
SmallTimeMs = 50,
Nodes = AllConfiguredNodes,
Promises = [rpc:async_call(N, some_mod, some_fun, ArgList) || N <- Nodes],
get_results([], Promises, SmallTimeMs).
get_results(Results, _Promises, _SmallTimeMs) when length(Results) > 1 -> % Replace 1 with whatever is the minimum acceptable number of results
lists:flatten(Results);
get_results(Results, Promises, SmallTimeMs) ->
Rs = get_promises(Promises, SmallTimeMs)
get_results([Results|Rs], Promises, SmallTimeMs)).
get_promise(Promises, WaitMs) ->
[rpc:nb_yield(Key, WaitMs) || Key <- Promises].
有关详细信息,请参阅:http://erlang.org/doc/man/rpc.html#async_call-4。
我的问题解决方法。
我自己实现了使用 gen_server:call
的 multicall
基本思想是在单独的进程中使用 gen_server:call() 调用所有节点。并收集这些调用的结果。通过从调用进程的邮箱接收消息进行收集。
为了控制超时,我在超时到期时计算截止日期,然后将其用作参考点来计算 receive
中 after
的超时。
实施
主要功能是:
multicall(Nodes, Name, Req, Timeout) ->
Refs = lists:map(fun(Node) -> call_node(Node, Name, Req, Timeout) end, Nodes),
Results = read_all(Timeout, Refs),
PosResults = [ { Node, Result } || { ok, { ok, { Node, Result } } } <- Results ],
{ PosResults, calc_bad_nodes(Nodes, PosResults) }.
这里的想法是调用所有节点并在一个超时内等待所有结果。
从派生进程调用一个节点。它捕获 gen_server:call
在出现错误时使用的出口。
call_node(Node, Name, Req, Timeout) ->
Ref = make_ref(),
Self = self(),
spawn_link(fun() ->
try
Result = gen_server:call({Name,Node},Req,Timeout),
Self ! { Ref, { ok, { Node, Result } } }
catch
exit:Exit ->
Self ! { Ref, { error, { 'EXIT', Exit } } }
end
end),
Ref.
坏节点计算为在Timout内没有响应的节点
calc_bad_nodes(Nodes, PosResults) ->
{ GoodNodes, _ } = lists:unzip(PosResults),
[ BadNode || BadNode <- Nodes, not lists:member(BadNode, GoodNodes) ].
通过超时读取邮箱收集结果
read_all(ReadList, Timeout) ->
Now = erlang:monotonic_time(millisecond),
Deadline = Now + Timeout,
read_all_impl(ReadList, Deadline, []).
执行读取直到没有出现截止日期
read_all_impl([], _, Results) ->
lists:reverse(Results);
read_all_impl([ W | Rest ], expired, Results) ->
R = read(0, W),
read_all_impl(Rest, expired, [R | Results ]);
read_all_impl([ W | Rest ] = L, Deadline, Results) ->
Now = erlang:monotonic_time(millisecond),
case Deadline - Now of
Timeout when Timeout > 0 ->
R = read(Timeout, W),
case R of
{ ok, _ } ->
read_all_impl(Rest, Deadline, [ R | Results ]);
{ error, { read_timeout, _ } } ->
read_all_impl(Rest, expired, [ R | Results ])
end;
Timeout when Timeout =< 0 ->
read_all_impl(L, expired, Results)
end.
一次读取只是从邮箱接收超时。
read(Timeout, Ref) ->
receive
{ Ref, Result } ->
{ ok, Result }
after Timeout ->
{ error, { read_timeout, Timeout } }
end.
进一步改进:
- rpc 模块产生单独的进程以避免迟到的答案的垃圾。所以在这个 multicall 函数中做同样的事情会很有用
infinity
超时可以用显而易见的方式处理