Erlang,尝试 gen_server: call with many responses
Erlang, try to make gen_server: call with many responses
尝试在项目中使用OTP-style,得到一道OTP-interface题。什么方案比较popular/beautiful?
我有:
- web-server 与
mochiweb
- 一个过程,产生许多 (1000-2000) children。
Children 包含状态 (netflow-speed)。如果需要,处理代理消息到 children 并创建新的 children。
在 mochiweb 中,我有一个页面包含所有演员的速度,乳清是如何制作的:
nf_collector ! {get_abonents_speed, self()},
receive
{abonents_speed_count, AbonentsCount} ->
ok
end,
%% write http header, chunked
%% and while AbonentsCount != 0, receive speed and write http
这是not-opt风格,我怎么理解。解决方案:
- 在API同步函数中获取所有带有速度的请求和return包含所有速度的列表。但是我想马上写给客户。
API-function的一个参数是回调:
nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
- Return迭代器:
get_all_speeds 的结果之一将与 receive-block 一起使用。每次调用它都会 return
{ok, Speed}
,最后它 return {end}
.
get_all_speeds() ->
nf_collector ! {get_abonents_speed, self()},
receive
{abonents_speed_count, AbonentsCount} ->
ok
end,
{ok, fun() ->
create_receive_fun(AbonentsCount)
end}.
create_receive_fun(0)->
{end};
create_receive_fun(Count)->
receive
{abonent_speed, Speed} ->
Speed
end,
{ok, Speed, create_receive_fun(Count-1)}.
从主管那里生成你的 'children':
-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> {ok, {{simple_one_for_one}, [{ch, {ch, start_link, []}, transient, 1000, worker, [ch]}]}}.
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).
从 ch_sup:start_child/1 开始(数据是什么)。
将你的 children 实现为 gen_server:
-module(ch).
-behaviour(gen_server).
-record(?MODULE, {speed}).
...
get_speed(Pid, Timeout) ->
try
gen_server:call(Pid, get, Timeout)
catch
exit:{timeout, _} -> timeout;
exit:{noproc, _} -> died
end
.
...
handle_call(get, _From, St) -> {reply, {ok, St#?MODULE.speed}, St} end.
您现在可以使用主管获取 运行 children 的列表并查询它们,尽管您必须接受 child 在获取列表之间死亡的可能性children 并调用它们,显然 child 可能由于某种原因还活着但没有响应,或者响应错误等
returns 上面的 get_speed/2 函数 {ok, Speed} 或死亡或超时。您可以根据您的应用程序需求进行适当的过滤;列表理解很容易,这里有一些。
只是速度:
[Speed || {ok, Speed} <- [ch:get_speed(Pid, 1000) || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)
]
]].
Pid 和速度元组:
[{Pid, Speed} || {Pid, {ok, Speed}} <-
[{Pid, ch:get_speed(Pid, 1000)} || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)]
]
].
所有结果,包括超时和'died' children 在你到达之前死亡的结果:
[{Pid, Any} || {Pid, Any} <-
[{Pid, ch:get_speed(Pid, 1000)} || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)]
]
].
在大多数情况下,除了速度之外,您几乎肯定不想要任何其他东西,因为您打算如何处理死亡和超时?你希望那些死去的人被主管重生,所以当你知道问题时或多或少地解决了问题,超时,与任何错误一样,是一个单独的问题,以你看到的任何方式处理fit...虽然没有必要将故障修复逻辑与数据检索逻辑混合。
现在,我认为您在 post 中遇到的所有这些问题是每次调用超时 1000,每次调用一个接一个是同步的,因此对于 1000 children 和 1 秒超时,可能需要 1000 秒才能产生任何结果。使时间超时 1ms 可能是答案,但正确地做到这一点有点复杂:
get_speeds() ->
ReceiverPid = self(),
Ref = make_ref(),
Pids = [Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)],
lists:foreach(
fun(Pid) -> spawn(
fun() -> ReceiverPid ! {Ref, ch:get_speed(Pid, 1000)} end
) end,
Pids),
receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.
receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
[];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
Time = os_milliseconds(),
TimeLeft = Timeout - Time + StartTime,
receive
{Ref, acc_timeout} ->
[];
{Ref, {ok, Speed}} ->
[Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
{Ref, _} ->
receive_speeds(Ref, Remaining-1, StartTime, Timeout)
after TimeLeft ->
[]
end
.
os_milliseconds() ->
{OsMegSecs, OsSecs, OsMilSecs} = os:timestamp(),
round(OsMegSecs*1000000 + OsSecs + OsMilSecs/1000)
.
在这里,每个调用都在不同的进程中产生,并收集回复,直到 'master timeout' 或者它们都被接收到。
代码主要是 cut-n-pasted 来自我身边的各种作品,手动编辑和搜索替换,匿名化并删除多余的代码,所以它可能主要是可编译的质量,但我不保证我什么都没弄坏。
尝试在项目中使用OTP-style,得到一道OTP-interface题。什么方案比较popular/beautiful?
我有:
- web-server 与
mochiweb
- 一个过程,产生许多 (1000-2000) children。 Children 包含状态 (netflow-speed)。如果需要,处理代理消息到 children 并创建新的 children。
在 mochiweb 中,我有一个页面包含所有演员的速度,乳清是如何制作的:
nf_collector ! {get_abonents_speed, self()},
receive
{abonents_speed_count, AbonentsCount} ->
ok
end,
%% write http header, chunked
%% and while AbonentsCount != 0, receive speed and write http
这是not-opt风格,我怎么理解。解决方案:
- 在API同步函数中获取所有带有速度的请求和return包含所有速度的列表。但是我想马上写给客户。
API-function的一个参数是回调:
nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
- Return迭代器:
get_all_speeds 的结果之一将与 receive-block 一起使用。每次调用它都会 return
{ok, Speed}
,最后它 return{end}
.
get_all_speeds() ->
nf_collector ! {get_abonents_speed, self()},
receive
{abonents_speed_count, AbonentsCount} ->
ok
end,
{ok, fun() ->
create_receive_fun(AbonentsCount)
end}.
create_receive_fun(0)->
{end};
create_receive_fun(Count)->
receive
{abonent_speed, Speed} ->
Speed
end,
{ok, Speed, create_receive_fun(Count-1)}.
从主管那里生成你的 'children':
-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> {ok, {{simple_one_for_one}, [{ch, {ch, start_link, []}, transient, 1000, worker, [ch]}]}}.
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).
从 ch_sup:start_child/1 开始(数据是什么)。
将你的 children 实现为 gen_server:
-module(ch).
-behaviour(gen_server).
-record(?MODULE, {speed}).
...
get_speed(Pid, Timeout) ->
try
gen_server:call(Pid, get, Timeout)
catch
exit:{timeout, _} -> timeout;
exit:{noproc, _} -> died
end
.
...
handle_call(get, _From, St) -> {reply, {ok, St#?MODULE.speed}, St} end.
您现在可以使用主管获取 运行 children 的列表并查询它们,尽管您必须接受 child 在获取列表之间死亡的可能性children 并调用它们,显然 child 可能由于某种原因还活着但没有响应,或者响应错误等
returns 上面的 get_speed/2 函数 {ok, Speed} 或死亡或超时。您可以根据您的应用程序需求进行适当的过滤;列表理解很容易,这里有一些。
只是速度:
[Speed || {ok, Speed} <- [ch:get_speed(Pid, 1000) || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)
]
]].
Pid 和速度元组:
[{Pid, Speed} || {Pid, {ok, Speed}} <-
[{Pid, ch:get_speed(Pid, 1000)} || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)]
]
].
所有结果,包括超时和'died' children 在你到达之前死亡的结果:
[{Pid, Any} || {Pid, Any} <-
[{Pid, ch:get_speed(Pid, 1000)} || Pid <-
[Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)]
]
].
在大多数情况下,除了速度之外,您几乎肯定不想要任何其他东西,因为您打算如何处理死亡和超时?你希望那些死去的人被主管重生,所以当你知道问题时或多或少地解决了问题,超时,与任何错误一样,是一个单独的问题,以你看到的任何方式处理fit...虽然没有必要将故障修复逻辑与数据检索逻辑混合。
现在,我认为您在 post 中遇到的所有这些问题是每次调用超时 1000,每次调用一个接一个是同步的,因此对于 1000 children 和 1 秒超时,可能需要 1000 秒才能产生任何结果。使时间超时 1ms 可能是答案,但正确地做到这一点有点复杂:
get_speeds() ->
ReceiverPid = self(),
Ref = make_ref(),
Pids = [Pid || {undefined, Pid, worker, [ch]} <-
supervisor:which_children(ch_sup)],
lists:foreach(
fun(Pid) -> spawn(
fun() -> ReceiverPid ! {Ref, ch:get_speed(Pid, 1000)} end
) end,
Pids),
receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.
receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
[];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
Time = os_milliseconds(),
TimeLeft = Timeout - Time + StartTime,
receive
{Ref, acc_timeout} ->
[];
{Ref, {ok, Speed}} ->
[Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
{Ref, _} ->
receive_speeds(Ref, Remaining-1, StartTime, Timeout)
after TimeLeft ->
[]
end
.
os_milliseconds() ->
{OsMegSecs, OsSecs, OsMilSecs} = os:timestamp(),
round(OsMegSecs*1000000 + OsSecs + OsMilSecs/1000)
.
在这里,每个调用都在不同的进程中产生,并收集回复,直到 'master timeout' 或者它们都被接收到。
代码主要是 cut-n-pasted 来自我身边的各种作品,手动编辑和搜索替换,匿名化并删除多余的代码,所以它可能主要是可编译的质量,但我不保证我什么都没弄坏。