在 Scala 中处理多个并发流的惯用方式

Idiomatic way to handle multiple concurrent streams in Scala

我有一个流列表,在调用它们时 next() 将随机休眠一段时间,然后从不同的来源读取一个字符。

我正在尝试编写一个消费者,它将在 EOF 之前一直调用这些流,并在运行时构建这些流的通用字典。

到目前为止,我正在为字典使用 ConcurrentHashMap 并为每个流使用者简单地创建一个新线程。

虽然我的解决方案有效,但它看起来很天真,我想知道是否有更好的流媒体库使用,例如 monixfs2

根据问题的描述和随后的评论,我假设存在多个 Iterator[Char] 来源:

val allSources : Iterable[Iterator[Char]] = ???

问题是:如何从这些迭代器中并发收集String个值,形成String到count的映射。

基于流的解决方案

首先我们需要将每个迭代器转换为基于分隔符的字符串值迭代器:

trait Word {
  val data : String
}

object EmptyWord extends Word {
  override val data = ""
}

case class PartialWord(val data : String) extends Word

case class WholeWord(val data : String) extends Word

val appendToWord : Char => (Word, Char) => Word = 
  (separator) => (originalWord, appendChar) => originalWord match {
    case PartialWord(d) => 
      if(appendChar == separator)
        WholeWord(d)
      else
        PartialWord(d + appendChar)
    case _ => PartialWord(appendChar.toString)
  }

val isWholeWord : Word => Boolean = (_ : Word) match {
  case _ : WholeWord => true
  case _             => false
}

//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] = 
  (_ : Iterator[Char])
    .scanLeft(EmptyWord)(appendToWord(' '))
    .filter(isWholeWord)
    .map(_.data)

我们现在可以将所有迭代器转换为字符串,我们可以将所有迭代器组合成一个迭代器:

val allWordSource : Iterator[String] = 
  allSources.map(convertCharIterator)
            .reduceOption( _ ++ _)
            .getOrElse(Iterator.empty[String])

此迭代器现在可以作为计算您的计数的 akka 流的来源:

val addToCounter : (Map[String, Int], String) => Map[String, Int] = 
  (counter, word) => 
    counter.updated(word, counter.getOrElse(word, 0) + 1)

val counter : Future[Map[String, Int]] = 
  Source
    .fromIterator( () => allWordSource)
    .runFold(Map.empty[String, Int])(addToCounter)