在 Scala 中处理多个并发流的惯用方式
Idiomatic way to handle multiple concurrent streams in Scala
我有一个流列表,在调用它们时 next()
将随机休眠一段时间,然后从不同的来源读取一个字符。
我正在尝试编写一个消费者,它将在 EOF
之前一直调用这些流,并在运行时构建这些流的通用字典。
到目前为止,我正在为字典使用 ConcurrentHashMap
并为每个流使用者简单地创建一个新线程。
虽然我的解决方案有效,但它看起来很天真,我想知道是否有更好的流媒体库使用,例如 monix
或 fs2
根据问题的描述和随后的评论,我假设存在多个 Iterator[Char]
来源:
val allSources : Iterable[Iterator[Char]] = ???
问题是:如何从这些迭代器中并发收集String
个值,形成String到count的映射。
基于流的解决方案
首先我们需要将每个迭代器转换为基于分隔符的字符串值迭代器:
trait Word {
val data : String
}
object EmptyWord extends Word {
override val data = ""
}
case class PartialWord(val data : String) extends Word
case class WholeWord(val data : String) extends Word
val appendToWord : Char => (Word, Char) => Word =
(separator) => (originalWord, appendChar) => originalWord match {
case PartialWord(d) =>
if(appendChar == separator)
WholeWord(d)
else
PartialWord(d + appendChar)
case _ => PartialWord(appendChar.toString)
}
val isWholeWord : Word => Boolean = (_ : Word) match {
case _ : WholeWord => true
case _ => false
}
//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] =
(_ : Iterator[Char])
.scanLeft(EmptyWord)(appendToWord(' '))
.filter(isWholeWord)
.map(_.data)
我们现在可以将所有迭代器转换为字符串,我们可以将所有迭代器组合成一个迭代器:
val allWordSource : Iterator[String] =
allSources.map(convertCharIterator)
.reduceOption( _ ++ _)
.getOrElse(Iterator.empty[String])
此迭代器现在可以作为计算您的计数的 akka 流的来源:
val addToCounter : (Map[String, Int], String) => Map[String, Int] =
(counter, word) =>
counter.updated(word, counter.getOrElse(word, 0) + 1)
val counter : Future[Map[String, Int]] =
Source
.fromIterator( () => allWordSource)
.runFold(Map.empty[String, Int])(addToCounter)
我有一个流列表,在调用它们时 next()
将随机休眠一段时间,然后从不同的来源读取一个字符。
我正在尝试编写一个消费者,它将在 EOF
之前一直调用这些流,并在运行时构建这些流的通用字典。
到目前为止,我正在为字典使用 ConcurrentHashMap
并为每个流使用者简单地创建一个新线程。
虽然我的解决方案有效,但它看起来很天真,我想知道是否有更好的流媒体库使用,例如 monix
或 fs2
根据问题的描述和随后的评论,我假设存在多个 Iterator[Char]
来源:
val allSources : Iterable[Iterator[Char]] = ???
问题是:如何从这些迭代器中并发收集String
个值,形成String到count的映射。
基于流的解决方案
首先我们需要将每个迭代器转换为基于分隔符的字符串值迭代器:
trait Word {
val data : String
}
object EmptyWord extends Word {
override val data = ""
}
case class PartialWord(val data : String) extends Word
case class WholeWord(val data : String) extends Word
val appendToWord : Char => (Word, Char) => Word =
(separator) => (originalWord, appendChar) => originalWord match {
case PartialWord(d) =>
if(appendChar == separator)
WholeWord(d)
else
PartialWord(d + appendChar)
case _ => PartialWord(appendChar.toString)
}
val isWholeWord : Word => Boolean = (_ : Word) match {
case _ : WholeWord => true
case _ => false
}
//using space as separator
val convertCharIterator : Iterator[Char] => Iterator[String] =
(_ : Iterator[Char])
.scanLeft(EmptyWord)(appendToWord(' '))
.filter(isWholeWord)
.map(_.data)
我们现在可以将所有迭代器转换为字符串,我们可以将所有迭代器组合成一个迭代器:
val allWordSource : Iterator[String] =
allSources.map(convertCharIterator)
.reduceOption( _ ++ _)
.getOrElse(Iterator.empty[String])
此迭代器现在可以作为计算您的计数的 akka 流的来源:
val addToCounter : (Map[String, Int], String) => Map[String, Int] =
(counter, word) =>
counter.updated(word, counter.getOrElse(word, 0) + 1)
val counter : Future[Map[String, Int]] =
Source
.fromIterator( () => allWordSource)
.runFold(Map.empty[String, Int])(addToCounter)