Erlang,尝试 gen_server: call with many responses

Erlang, try to make gen_server: call with many responses

尝试在项目中使用OTP-style,得到一道OTP-interface题。什么方案比较popular/beautiful?

我有:

  1. web-server 与 mochiweb
  2. 一个过程,产生许多 (1000-2000) children。 Children 包含状态 (netflow-speed)。如果需要,处理代理消息到 children 并创建新的 children。

在 mochiweb 中,我有一个页面包含所有演员的速度,乳清是如何制作的:

    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
%% write http header, chunked
%% and while AbonentsCount != 0,  receive speed and write http

这是not-opt风格,我怎么理解。解决方案:

  1. 在API同步函数中获取所有带有速度的请求和return包含所有速度的列表。但是我想马上写给客户。
  2. API-function的一个参数是回调:

    nf_collector:get_all_speeds(fun (Speed) -> Resp:write_chunk(templater(Speed)) end)
    
  3. Return迭代器: get_all_speeds 的结果之一将与 receive-block 一起使用。每次调用它都会 return {ok, Speed},最后它 return {end}.

get_all_speeds() ->
    nf_collector ! {get_abonents_speed, self()},
    receive
        {abonents_speed_count, AbonentsCount} ->
            ok
    end,
    {ok, fun() -> 
        create_receive_fun(AbonentsCount)
    end}.

create_receive_fun(0)->
    {end};

create_receive_fun(Count)->
        receive
            {abonent_speed, Speed} ->
                Speed
        end,
        {ok, Speed, create_receive_fun(Count-1)}.

从主管那里生成你的 'children':

-module(ch_sup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_child/1]).
start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) -> {ok, {{simple_one_for_one}, [{ch, {ch, start_link, []}, transient, 1000, worker, [ch]}]}}.
start_child(Data) -> supervisor:start_child(?MODULE, [Data]).

从 ch_sup:start_child/1 开始(数据是什么)。

将你的 children 实现为 gen_server:

-module(ch).
-behaviour(gen_server).
-record(?MODULE, {speed}).

...

get_speed(Pid, Timeout) ->
    try
        gen_server:call(Pid, get, Timeout)
    catch
        exit:{timeout, _} -> timeout;
        exit:{noproc, _} -> died
    end
.

...

handle_call(get, _From, St) -> {reply, {ok, St#?MODULE.speed}, St} end.

您现在可以使用主管获取 运行 children 的列表并查询它们,尽管您必须接受 child 在获取列表之间死亡的可能性children 并调用它们,显然 child 可能由于某种原因还活着但没有响应,或者响应错误等

returns 上面的 get_speed/2 函数 {ok, Speed} 或死亡或超时。您可以根据您的应用程序需求进行适当的过滤;列表理解很容易,这里有一些。

只是速度:

[Speed || {ok, Speed} <- [ch:get_speed(Pid, 1000) || Pid <-
    [Pid || {undefined, Pid, worker, [ch]} <-
        supervisor:which_children(ch_sup)
        ]
    ]].

Pid 和速度元组:

[{Pid, Speed} || {Pid, {ok, Speed}} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

所有结果,包括超时和'died' children 在你到达之前死亡的结果:

[{Pid, Any} || {Pid, Any} <-
    [{Pid, ch:get_speed(Pid, 1000)} || Pid <-
        [Pid || {undefined, Pid, worker, [ch]} <-
                supervisor:which_children(ch_sup)]
        ]
    ].

在大多数情况下,除了速度之外,您几乎肯定不想要任何其他东西,因为您打算如何处理死亡和超时?你希望那些死去的人被主管重生,所以当你知道问题时或多或少地解决了问题,超时,与任何错误一样,是一个单独的问题,以你看到的任何方式处理fit...虽然没有必要将故障修复逻辑与数据检索逻辑混合。

现在,我认为您在 post 中遇到的所有这些问题是每次调用超时 1000,每次调用一个接一个是同步的,因此对于 1000 children 和 1 秒超时,可能需要 1000 秒才能产生任何结果。使时间超时 1ms 可能是答案,但正确地做到这一点有点复杂:

get_speeds() ->
    ReceiverPid = self(),
    Ref = make_ref(),
    Pids = [Pid || {undefined, Pid, worker, [ch]} <-
            supervisor:which_children(ch_sup)],
    lists:foreach(
        fun(Pid) -> spawn(
            fun() -> ReceiverPid ! {Ref, ch:get_speed(Pid, 1000)} end
            ) end,
        Pids),
    receive_speeds(Ref, length(Pids), os_milliseconds(), 1000)
.

receive_speeds(_Ref, 0, _StartTime, _Timeout) ->
    [];
receive_speeds(Ref, Remaining, StartTime, Timeout) ->
    Time = os_milliseconds(),
    TimeLeft = Timeout - Time + StartTime,
    receive
        {Ref, acc_timeout} ->
            [];
        {Ref, {ok, Speed}} ->
            [Speed | receive_speeds(Ref, Remaining-1, StartTime, Timeout)];
        {Ref, _} ->
            receive_speeds(Ref, Remaining-1, StartTime, Timeout)
    after TimeLeft ->
        []
    end
.

os_milliseconds() ->
    {OsMegSecs, OsSecs, OsMilSecs} = os:timestamp(),
    round(OsMegSecs*1000000 + OsSecs + OsMilSecs/1000)
.

在这里,每个调用都在不同的进程中产生,并收集回复,直到 'master timeout' 或者它们都被接收到。

代码主要是 cut-n-pasted 来自我身边的各种作品,手动编辑和搜索替换,匿名化并删除多余的代码,所以它可能主要是可编译的质量,但我不保证我什么都没弄坏。