并行聚合不适用于列表 .length > 8

Parallel Aggregate is not working on lists .length > 8

我正在编写一个小型练习应用程序,用于计算 stringsseq 中唯一字母(包括 Unicode)的数量,我正在为此使用 aggregate,当我尝试 运行 并行

这是我的代码:

class Frequency(seq: Seq[String]) {
  type FreqMap = Map[Char, Int]

  def calculate() = {
    val freqMap: FreqMap = Map[Char, Int]()
    val pattern = "(\p{L}+)".r
    val seqop: (FreqMap, String) => FreqMap = (fm, s) => {
      s.toLowerCase().foldLeft(freqMap){(fm, c) =>
        c match {
          case pattern(char) => fm.get(char) match {
            case None => fm+((char, 1))
            case Some(i) => fm.updated(char, i+1)
          }
          case _ => fm
        }
      }
    }

    val reduce: (FreqMap, FreqMap) => FreqMap =
      (m1, m2) => {
        m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
      }

    seq.par.aggregate(freqMap)(seqop, reduce)

  }
}

然后是使用该

的代码
object Frequency extends App {

  val text = List("abc", "abc", "abc", "abc", "abc", "abc", "abc", "abc", "abc");

  def frequency(seq: Seq[String]):Map[Char, Int] = {
    new Frequency(seq).calculate()
  }

  Console println frequency(seq=text)
}

虽然我提供了 "abc" 9 次,结果是 Map(a -> 8, b -> 8, c -> 8),因为它是任何数量的 "abc" > 8

我查看了 this,我似乎正确地使用了聚合

有什么建议可以让它发挥作用吗?

您正在丢弃 seqop 中已收集的结果(第一个 fm)。您需要将这些添加到您正在计算的新结果中,例如像这样:

def calculate() = {
    val freqMap: FreqMap = Map[Char, Int]()
    val pattern = "(\p{L}+)".r
    val reduce: (FreqMap, FreqMap) => FreqMap =
      (m1, m2) => {
        m1 ++ m2.map { case (k, v) => k -> (v + m1.getOrElse(k, 0)) }
      }
    val seqop: (FreqMap, String) => FreqMap = (fm, s) => {
      val res = s.toLowerCase().foldLeft(freqMap){(fm, c) =>
        c match {
          case pattern(char) => fm.get(char) match {
            case None => fm+((char, 1))
            case Some(i) => fm.updated(char, i+1)
          }
          case _ => fm
        }
      }

      // I'm reusing your existing combinator function here:
      reduce(res,fm)
    }

    seq.par.aggregate(freqMap)(seqop, reduce)
}

根据并行集合如何划分工作,您会丢弃其中的一些工作。在您的情况下 (9x "abc") 它将事物分为 8 个并行 seqop 操作,这意味着您只丢弃一个结果集。这取决于数字,如果你 运行 说 17x "abc" 它 运行s 在 13 个并行操作中,丢弃 4 个结果集(无论如何在我的机器上 - 我不熟悉底层代码以及它如何划分工作,这可能取决于使用的 ExecutionContext/Threadpool 和随后的 CPUs/cores 数量等等)。

通常并行集合是顺序集合的替代品,这意味着如果你放弃 .par 你应该仍然得到相同的结果,尽管通常更慢。如果您使用原始代码执行此操作,您会得到结果 1,这告诉您这不是并行化问题。这是一个很好的方法来测试你在使用这些时是否做对了。

最后但同样重要的是:这对我来说比平时更难发现,因为您使用了相同的变量名两次,然后又使用了阴影 fm。不这样做会使代码更具可读性,并且更容易发现诸如此类的错误。