Erlang 进程和消息传递架构

erlang processes and message passing architecture

我手头的任务是读取大文件的行,处理它们,return排序结果。

我的算法是:

  1. 从评估工作量的主进程开始(写在文件的第一行)
  2. spawn worker processes:每个worker将使用pread/3读取文件的一部分,处理这部分,并将结果发送给master
  3. master接收所有子结果,排序,return 所以工人之间基本上不需要沟通。

我的问题:

  1. 如何在erlang进程数和核心数之间找到最佳平衡点?因此,如果我为每个处理器核心生成一个进程,我的 cpu?
  2. pread/3如何到达指定行;它是否遍历文件中的所有行? pread/3 是并行文件读取的好计划吗?
  3. 从进程A向B发送一条大消息好还是发送N条小消息好?我在下面 link 中找到了部分答案,但希望进一步阐述
  1. Erlang 进程很便宜。您可以自由(并鼓励)使用比您拥有的内核更多的内核。对于您的问题的实用性可能存在上限(根据行大小,在一个进程中每行加载 1TB 的数据要求有点高)。

    当您不知道时,最简单的方法是让用户决定。这意味着您可以决定派生 N 个工人,并在他们之间分配工作,等待回音。如果您不喜欢 运行s.

    ,请在更改 N 的同时重新 运行 程序

    更棘手的方法是对大量时间进行基准测试,选择您认为有意义的最大值,将其粘贴到池库中(如果您愿意;一些池用于预分配资源,一些用于一个可调整大小的数量),并满足于一个放之四海而皆准的解决方案。

    但说真的,没有那么容易'optimal number of cores'。如果需要,您可以 运行 在 50 个进程以及其中的 65,000 个进程上使用它;如果任务并行得令人尴尬,VM 应该能够利用其中的大部分并使核心饱和。

-

  1. 并行文件读取是一个有趣的问题。它可能会或可能不会更快(正如直接评论所提到的),并且如果每行上的工作足够少以至于读取文件的成本最大,它可能只代表加速。

    棘手的一点实际上是像 pread/2-3 这样的函数需要一个字节偏移量。您的问题措辞使您担心文件的 。因此,您交给工作人员的字节偏移量最终可能会跨越一条线。如果您的块以 this is my line\nhere it goes\n 中的单词 my 结束,一个工作人员将看到自己有一个不完整的行,而另一个将仅报告 my line\n,而缺少之前的 this is.

    通常,这种烦人的事情会导致您让第一个进程拥有文件并对其进行筛选,只是将一些文本交给工作人员进行处理;然后该进程将充当某种协调器。

    此策略的好处在于,如果主进程知道作为消息发送的所有内容,它也知道何时收到所有响应,从而很容易知道何时 return 结果.如果一切都是不相交的,你必须相信启动器和工作人员都会告诉你 "we're all out of work" 作为一组不同的独立消息要知道。

    在实践中,您可能会发现最有帮助的是了解在文件操作方面对硬件寿命有帮助的操作,而不是 "how many people can read the file at once"。只有一个硬盘(或SSD),无论如何所有数据都必须通过它;并行性最终可能会限制那里的访问。

-

  1. 使用对您的程序有意义的消息。性能最高的程序将有许多进程能够在不需要传递消息、通信或获取锁的情况下工作。

    一个更现实的高性能程序将使用非常小的非常少的消息。

    有趣的是您的问题本质上是基于数据的。因此,您可以做一些事情:

    • 确保您以二进制格式阅读文本;大型二进制文件(> 64b)在全局二进制堆上分配,共享并使用引用计数进行 GC
    • 提交关于需要做什么的信息,而不是做这件事的数据;这个需要测量,但是领导进程可以检查文件,记下行结束的位置,然后将字节偏移量交给工作人员,这样他们就可以自己去读取文件;请注意,您最终会读取文件两次,因此如果内存分配不是您的主要开销,这可能会更慢
    • 确保文件是在rawram模式下读取的;其他模式使用中间人进程来读取和转发数据(如果您在集群 Erlang 节点中通过网络读取文件,这将很有用); rawram 模式将文件描述符直接提供给调用进程,速度要快得多。
    • 首先担心编写清晰、可读且正确的程序。只有当它太慢时,你才应该尝试重构和优化它;您可能会在第一次尝试时发现它足够好。

希望对您有所帮助。

P.S。您可以先尝试真正简单的东西:

  1. 任一:

    • 使用 {ok, Bin} = file:read_file(Path) 和拆分行(使用 binary:split(Bin, <<"\n">>, [global]))一次读取整个文件,
    • 使用 {ok, Io} = file:open(File, [read,ram]) 然后在文件描述符上重复使用 file:read_line(Io)
    • 使用 {ok, Io} = file:open(File, [read,raw,{read_ahead,BlockSize}]) 然后在文件描述符上重复使用 file:read_line(Io)
  2. 调用 rpc:pmap({?MODULE, Function}, ExtraArgs, Lines) 到 运行 一切自动并行(每行会产生一个进程)

  3. 对结果调用 lists:sort/1

如果您发现每个步骤有问题,您可以从那里改进每个步骤。