elixir中频率计算的快速并发算法

fast and concurrent algorithm of frequency calculation in elixir

我有两个大列表,它们的项目长度不是恒定的。每个列表都包含数百万个项目。 我想计算 first list 项在 second list!

中的频率

例如:

a = [[c, d], [a, b, e]]
b = [[a, d, c], [e, a, b], [a, d], [c, d, a]]

# expected result of calculate_frequency(a, b) is %{[c, d] => 2, [a, b, e] => 1} Or [{[c, d], 2}, {[a, b, e], 1}]

由于列表很大,我希望同时完成此过程。 所以我写了这个函数:

  def calculate_frequency(items, data_list) do
    items
    |> Task.async_stream(
      fn item ->
        frequency =
          data_list
          |> Enum.reduce(0, fn data_row, acc ->
            if item -- data_row == [] do
              acc + 1
            else
              acc
            end
          end)

        {item, frequency}
      end,
      ordered: false
    )
    |> Enum.reduce([], fn {:ok, merged}, merged_list -> [merged | merged_list] end)
  end

但是这个算法很慢。我应该怎么做才能让它变快?

PS:请不要考虑输入和输出的类型,执行速度很重要。

将一个列表放入 MapSet

遍历第二个列表,查看每个元素是否在 MapSet.

这在列表的长度上是线性的,两个操作应该能够并行化。

我将从规范化您要比较的数据开始,这样一个简单的相等性检查就可以判断两个项目是否如您定义的那样 "equal"。根据您的代码,我猜想 Enum.sort/1 可以解决问题,尽管 MapSet.new/1 或返回地图的函数如果与您的用例匹配可能会比较快。

defp normalize(item) do
  Enum.sort(item)
end

def calculate_frequency(items, data_list) do
  data_list = Enum.map(data_list, &normalize/1)
  items = Enum.map(items, &normalize/1)
end

如果你要从数据列表中获取大部分频率,我会计算数据列表的所有频率。 Elixir 1.10 引入了 Enum.frequencies/1Enum.frequencies_by/2,但如果需要,您可以使用 reduce 来实现。

def calculate_frequency(items, data_list) do
  data_frequencies = Enum.frequencies_by(data_list, &normalize/1) # does map for you

  Map.new(items, &Map.get(data_frequencies, normalize(&1), 0)) # if you want result as map
end

我没有对我的代码或你的代码进行任何基准测试。如果你想做更多异步的事情,你可以用 Task.async_stream/3 替换你的映射,你可以用 Stream.chunk_every/2Task.async_stream/3 的组合替换你的频率调用(与 Enum.frequencies/1 是函数)和 Map.merge/3.

不确定这是否足够快并且肯定不是并发的。它是 O(m + n),其中 mitems 的大小,ndata_list 的大小。我找不到更快的并发方式,因为合并所有子进程的结果也需要时间。

data_list
|> Enum.reduce(%{}, fn(item, counts)-> 
  Map.update(counts, item, 1, &(&1 + 1)) 
end)
|> Map.take(items)

仅供参考,同时做事并不一定意味着并行做事。如果你只有一个 CPU 核心,并发实际上会减慢速度,因为一个 CPU 核心一次只能做一件事。