正确管理 Erlang 并发

Properly managing Erlang concurrency

我正在按照以下过程整理一些 Erlang 代码:

- Spawn a number of threads
- Each thread scans an IP address
- Report back to the main thread the results

我有主线程运行这个:

cve_receive() ->
    receive
    Msg ->
            io:fwrite("~s~n", [Msg]),
            cve_receive()
    after 5000 ->
            ok
    end.

它在功能上是正确的并且接收了它应该接收的内容,但我不太确定如何正确管理退出条件。它目前只是超时,但实际情况是所有线程通常在 100 毫秒左右完成,这意味着这张图片中存在毫无意义的超时。真正应该说的是"after all threads terminate".

有惯用的方法吗?

最简单的解决方案是通过在 cve_receive 中保留一个计数器来创建屏障,这样您就可以跟踪已完成的工作进程数。

类似于:

%% First Spawn N work processes
...
%% Then receive N messages.
cve_receive(N).
...
cve_receive(N) ->
    receive
        Msg ->
           io:fwrite("~s~n", [Msg]),
           case N == 1 of
              true  -> all_done;
              false -> cve_receive(N-1)
           end;
        after 5000 ->
           {timeout, N}
    end.        

如果您需要在进程未在超时间隔内完成的情况下执行一些清理,您可以存储进程 ID 列表而不是计数器。 在收到消息和超时时删除 pid 你可以,例如杀死列表中剩余的所有进程。

编辑:如果工作进程崩溃,上面的简单解决方案仍然有效,因为您最终会超时。但是您可以使用 monitor/2 函数并在 cve_receive 函数中处理 DOWN 消息。

为了使@johlo 的回答更加地道:

cve_receive(0) ->
  all_done;
cve_receive(N) ->
  receive
    Msg ->
       io:fwrite("~s~n", [Msg]),
       cve_receive(N-1)
    after 5000 ->
      {timeout, N}
  end.