用于增长列表的 scalaz 流结构

scalaz stream structure for growing lists

我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。

我有一个起始项 A。我有一个接受 A 和 returns A 列表的函数。

def doSomething(a : A) : List[A]

我有一个以 1 项(起始项)开头的工作队列。当我们处理 (doSomething) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后)我们 doSomething 上的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将 return Nil for这些项目)。这就是我们知道计算最终会终止的方式。

假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪种整体结构或类型来实现它?

一旦使用单个 "worker" 完成了一个简单的实现,我还想使用多个 worker 来并行处理队列项,例如有一个 5 名工人的池子(每个工人都会将其任务分配给代理来计算doSomething)所以我需要在这个算法中处理影响(例如工人故障)。

所以 "how?" 的答案是:

import scalaz.stream._
import scalaz.stream.async._
import Process._

def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)

val q = unboundedQueue[Int]
val out = unboundedQueue[Int]

q.dequeue
 .flatMap(e => emitAll(doSomething(e)))
 .observe(out.enqueue)
 .to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well

q.enqueueAll(List(3,5,7)).run
q.size.continuous
 .filter(0==)
 .map(_ => -1)
 .to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll

import scalaz._, Scalaz._
val result = out
  .dequeue
  .takeWhile(_ != -1)
  .map(_.point[List])
  .foldMonoid.runLast.run.get //run synchronously

结果:

result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

但是,您可能会注意到:

1) 我必须解决终止问题。 akka-stream 也有同样的问题,而且更难解决,因为您无法访问队列,并且没有自然的 back-pressure 来保证队列不会因为 fast-readers 而为空。

2) 我不得不为输出引入另一个队列(并将其转换为 List),因为工作队列在计算结束时变空了。

所以,这两个库都不太适应这种要求(有限流),但是 scalaz-stream(在删除 scalaz 依赖后将变成 "fs2")足够灵活来实现你的想法.最大的 "but" 是默认情况下它会 运行 顺序。有(至少)两种方法可以让它更快:

1) 将你的 doSomething 分成多个阶段,例如 .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3),然后在它们之间放置另一个队列(如果阶段花费的时间相同,速度大约快 3 倍)。

2) 并行队列处理。 Akka 有 mapAsync - 它可以自动并行执行 maps。 Scalaz-stream 有块 - 你可以将你的 q 分成多个块,比方说 5,然后并行处理块内的每个元素。无论如何,这两种解决方案(akka vs scalaz)都不太适合使用一个队列作为输入和输出。

但是,再一次,它太复杂而且毫无意义,因为有一个经典简单方式:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.flatMap(doSomething) 
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

这是并行化的:

@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] = 
  if (l.isEmpty) acc else { 
    val processed = l.par.flatMap(doSomething).toList
    calculate(processed, acc ++ processed) 
  }

scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)

所以,是的,我会说 scalaz-stream 和 akka-streams 都不符合您的要求;然而,经典的 Scala 并行集合非常适合。

如果您需要跨多个 JVM 的分布式计算 - 看看 Apache Spark,它的 scala-dsl 使用相同的 map/flatMap/fold 风格。它允许您处理不适合 JVM 内存的大型集合(通过跨 JVM 扩展它们),因此您可以通过使用 RDD 而不是 List 来改进 @tailrec def calculate。它还将为您提供处理 doSomething.

中的故障的工具

P.S。所以这就是为什么我不喜欢使用流媒体库来完成此类任务的原因。流式处理更像是来自某些外部系统(如 HttpRequests)的无限流,而不是预定义(甚至大)数据的计算。

P.S.2 如果您需要 reactive-like(不阻塞),您可以使用 Future(或 scalaz.concurrent.Task)+ Future.sequence