用于增长列表的 scalaz 流结构
scalaz stream structure for growing lists
我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。
我有一个起始项 A。我有一个接受 A 和 returns A 列表的函数。
def doSomething(a : A) : List[A]
我有一个以 1 项(起始项)开头的工作队列。当我们处理 (doSomething
) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后)我们 doSomething
上的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将 return Nil for这些项目)。这就是我们知道计算最终会终止的方式。
假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪种整体结构或类型来实现它?
一旦使用单个 "worker" 完成了一个简单的实现,我还想使用多个 worker 来并行处理队列项,例如有一个 5 名工人的池子(每个工人都会将其任务分配给代理来计算doSomething
)所以我需要在这个算法中处理影响(例如工人故障)。
所以 "how?" 的答案是:
import scalaz.stream._
import scalaz.stream.async._
import Process._
def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
val q = unboundedQueue[Int]
val out = unboundedQueue[Int]
q.dequeue
.flatMap(e => emitAll(doSomething(e)))
.observe(out.enqueue)
.to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well
q.enqueueAll(List(3,5,7)).run
q.size.continuous
.filter(0==)
.map(_ => -1)
.to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll
import scalaz._, Scalaz._
val result = out
.dequeue
.takeWhile(_ != -1)
.map(_.point[List])
.foldMonoid.runLast.run.get //run synchronously
结果:
result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
但是,您可能会注意到:
1) 我必须解决终止问题。 akka-stream 也有同样的问题,而且更难解决,因为您无法访问队列,并且没有自然的 back-pressure 来保证队列不会因为 fast-readers 而为空。
2) 我不得不为输出引入另一个队列(并将其转换为 List
),因为工作队列在计算结束时变空了。
所以,这两个库都不太适应这种要求(有限流),但是 scalaz-stream(在删除 scalaz 依赖后将变成 "fs2")足够灵活来实现你的想法.最大的 "but" 是默认情况下它会 运行 顺序。有(至少)两种方法可以让它更快:
1) 将你的 doSomething 分成多个阶段,例如 .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)
,然后在它们之间放置另一个队列(如果阶段花费的时间相同,速度大约快 3 倍)。
2) 并行队列处理。 Akka 有 mapAsync
- 它可以自动并行执行 map
s。 Scalaz-stream 有块 - 你可以将你的 q 分成多个块,比方说 5,然后并行处理块内的每个元素。无论如何,这两种解决方案(akka vs scalaz)都不太适合使用一个队列作为输入和输出。
但是,再一次,它太复杂而且毫无意义,因为有一个经典简单方式:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.flatMap(doSomething)
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
这是并行化的:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.par.flatMap(doSomething).toList
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
所以,是的,我会说 scalaz-stream 和 akka-streams 都不符合您的要求;然而,经典的 Scala 并行集合非常适合。
如果您需要跨多个 JVM 的分布式计算 - 看看 Apache Spark,它的 scala-dsl 使用相同的 map/flatMap/fold 风格。它允许您处理不适合 JVM 内存的大型集合(通过跨 JVM 扩展它们),因此您可以通过使用 RDD 而不是 List
来改进 @tailrec def calculate
。它还将为您提供处理 doSomething
.
中的故障的工具
P.S。所以这就是为什么我不喜欢使用流媒体库来完成此类任务的原因。流式处理更像是来自某些外部系统(如 HttpRequests)的无限流,而不是预定义(甚至大)数据的计算。
P.S.2 如果您需要 reactive-like(不阻塞),您可以使用 Future
(或 scalaz.concurrent.Task
)+ Future.sequence
我有一种预感,我可以(应该?)使用 scalaz-streams 来解决我的问题,就像这样。
我有一个起始项 A。我有一个接受 A 和 returns A 列表的函数。
def doSomething(a : A) : List[A]
我有一个以 1 项(起始项)开头的工作队列。当我们处理 (doSomething
) 每个项目时,它可能会将许多项目添加到同一工作队列的末尾。然而,在某些时候(在数百万个项目之后)我们 doSomething
上的每个后续项目将开始向工作队列添加越来越少的项目,最终不会添加新项目(doSomething 将 return Nil for这些项目)。这就是我们知道计算最终会终止的方式。
假设 scalaz-streams 适用于此,请给我一些提示,说明我应该考虑哪种整体结构或类型来实现它?
一旦使用单个 "worker" 完成了一个简单的实现,我还想使用多个 worker 来并行处理队列项,例如有一个 5 名工人的池子(每个工人都会将其任务分配给代理来计算doSomething
)所以我需要在这个算法中处理影响(例如工人故障)。
所以 "how?" 的答案是:
import scalaz.stream._
import scalaz.stream.async._
import Process._
def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
val q = unboundedQueue[Int]
val out = unboundedQueue[Int]
q.dequeue
.flatMap(e => emitAll(doSomething(e)))
.observe(out.enqueue)
.to(q.enqueue).run.runAsync(_ => ()) //runAsync can process failures, there is `.onFailure` as well
q.enqueueAll(List(3,5,7)).run
q.size.continuous
.filter(0==)
.map(_ => -1)
.to(out.enqueue).once.run.runAsync(_ => ()) //call it only after enqueueAll
import scalaz._, Scalaz._
val result = out
.dequeue
.takeWhile(_ != -1)
.map(_.point[List])
.foldMonoid.runLast.run.get //run synchronously
结果:
result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
但是,您可能会注意到:
1) 我必须解决终止问题。 akka-stream 也有同样的问题,而且更难解决,因为您无法访问队列,并且没有自然的 back-pressure 来保证队列不会因为 fast-readers 而为空。
2) 我不得不为输出引入另一个队列(并将其转换为 List
),因为工作队列在计算结束时变空了。
所以,这两个库都不太适应这种要求(有限流),但是 scalaz-stream(在删除 scalaz 依赖后将变成 "fs2")足够灵活来实现你的想法.最大的 "but" 是默认情况下它会 运行 顺序。有(至少)两种方法可以让它更快:
1) 将你的 doSomething 分成多个阶段,例如 .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3)
,然后在它们之间放置另一个队列(如果阶段花费的时间相同,速度大约快 3 倍)。
2) 并行队列处理。 Akka 有 mapAsync
- 它可以自动并行执行 map
s。 Scalaz-stream 有块 - 你可以将你的 q 分成多个块,比方说 5,然后并行处理块内的每个元素。无论如何,这两种解决方案(akka vs scalaz)都不太适合使用一个队列作为输入和输出。
但是,再一次,它太复杂而且毫无意义,因为有一个经典简单方式:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.flatMap(doSomething)
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
这是并行化的:
@tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
if (l.isEmpty) acc else {
val processed = l.par.flatMap(doSomething).toList
calculate(processed, acc ++ processed)
}
scala> calculate(List(3,5,7), Nil)
res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
所以,是的,我会说 scalaz-stream 和 akka-streams 都不符合您的要求;然而,经典的 Scala 并行集合非常适合。
如果您需要跨多个 JVM 的分布式计算 - 看看 Apache Spark,它的 scala-dsl 使用相同的 map/flatMap/fold 风格。它允许您处理不适合 JVM 内存的大型集合(通过跨 JVM 扩展它们),因此您可以通过使用 RDD 而不是 List
来改进 @tailrec def calculate
。它还将为您提供处理 doSomething
.
P.S。所以这就是为什么我不喜欢使用流媒体库来完成此类任务的原因。流式处理更像是来自某些外部系统(如 HttpRequests)的无限流,而不是预定义(甚至大)数据的计算。
P.S.2 如果您需要 reactive-like(不阻塞),您可以使用 Future
(或 scalaz.concurrent.Task
)+ Future.sequence