同步处理迭代器的机会性、部分和异步预处理

Opportunistic, partially and asyncronously pre-processing of a syncronously processing iterator

让我们使用 Scala。

我正在尝试找到最好的方法来对迭代器的一些元素进行机会性的、部分的和异步的预计算,否则这些元素将被同步处理。

下图说明了这个问题。

有一个带迭代器和状态的引导线程 (blue)。状态包含必须防止并发访问的可变数据。此外,在迭代器从头开始、按顺序、按顺序处理时,必须更新状态,因为迭代器的元素依赖于前面的元素。而且,依赖的性质是事先不知道的。

与其他元素相比,处理某些元素可能会导致大量开销(2 个数量级),这意味着 一些 元素的计算时间为 1 毫秒,而 一些 元素需要 300 毫秒来计算。如果我可以推测性地预处理下一个 k 元素,这将导致 运行 时间的显着改进。可以在异步线程上进行推测性预处理(而 blue 线程是同步处理),但预处理后的数据必须由 blue[=71] 验证=]线程,预计算的结果当时是否有效。 通常(90%的时间),它应该是有效的。因此,推测性地启动单独的异步线程来预处理迭代器的剩余部分将在 运行 时间内花费 300 毫秒。

我研究了 Scala 的异步库和函数库的比较,以便更好地理解哪种计算模型,或者换句话说,哪种计算描述(哪个库)可能更适合这个处理问题。我在思考通信模式并产生了以下想法:

  1. 阿卡

为采用迭代器的 blue 线程使用 AKKA actor Blue,并且对于每个步骤,它都会向自己发送一条 Step 消息。在 Step 消息上,在开始处理下一个 i 元素之前,它会向 [=10] 之一发送带有第 i+k 元素的 PleasePreprocess(i+k) 消息=] 预处理器演员到位。 Blue 只会 Stepi+1,并且只有当收到 PreprocessingKindlyDone(i+1) 时。

  1. AKKA 流

AFAIK AKKA 流也支持之前的双向背压机制,因此,它可能是一个很好的候选者,可以在不实际使用演员的情况下实现演员所做的事情。

  1. Scala 期货

blue 线程处理 iterator.map(processElement(_)) 中的元素 ˙processElement(e)˙ 时,它还会产生 Futures 进行预处理。但是,如我所见,维护这些预处理 Future 并等待它们的状态需要在纯 Scala 中进行半阻塞实现,因此据我所知,我不会朝这个方向发展。

  1. 使用 Monix

我对 Monix 有一些了解,但无法理解如何使用 Monix 优雅地解决这个问题。我没有看到 blue 线程如何等待 i+1 的结果然后继续。为此,我正在考虑使用类似滑动 window 和 foldLeft(blueThreadAsZero){ (blue, preProc1, preProc2, notYetPreProc) => ... } 的东西,但找不到类似的结构。

可能有一些我没有提到的库可以更好地表达这方面的计算模式。

我希望我已经充分描述了我的问题。感谢您提供 hints/ideas 或代码片段!

我会将处理分为两个步骤,pre-processing 可以是并行的 运行,而依赖的步骤必须是串行的。
然后,您可以从迭代器创建一个数据流,做一个应用预处理步骤的并行映射,并以 fold

结束

我个人会使用 fs2,但同样的方法可以用任何流媒体解决方案来表达,比如 AkkaStreamsMonix ObservablesZIO ZStreams

import fs2.Stream
import cats.effect.IO

val finalState = 
  Stream
    .fromIterator[IO](iterator = ???, chunkSize = ???)
    .parEvalMap(elem => IO(preProcess(elem))
    .compile
    .fold(initialState) {
      case (acc, elem) =>
        computeNewState(acc, elem)
    }

PS: Remember to benchmark to make sure parallelism is actually speeding things up; it may not be worth the hassle.

如果你的蓝线恰好比黄线快,你无论如何都需要阻塞。我不认为你需要任何花哨的库,“vanilla scala”应该这样做(就像它在大多数情况下实际做的那样)。像这样的东西,也许...

def doit[T,R](it: Iterator[T], yellow: T => R, blue: R => R): Future[Seq[R]] = it
    .map { elem => Future(yellow(elem)) }
    .foldLeft(Future.successful(List.empty[R])) { (last, next) => 
       last.flatMap { acc => next.map(blue).map(_ :: acc) }
    }.map(_.reverse)
       

我没有测试或编译它,所以它可能需要一些调整,但从概念上讲,这应该可行:通过迭代器并立即开始预处理,然后折叠以在每个完成的预处理中加入“验证”按顺序。