Fan in/fan out 并发与 Monix

Fan in/fan out concurrency with Monix

我正在尝试学习 Scala 并从中获得乐趣,但我 运行 陷入了这个经典问题。让我想起了很多NodeJS早期的嵌套回调地狱

这是我的伪代码程序:

  1. 获取 S3 存储桶列表的任务。
  2. 任务一完成后,我想以十个为一组对桶进行批处理。
  3. 每批次:
  4. 获取每个桶的区域。
  5. 过滤掉不在该区域的桶。
  6. 列出每个桶中的所有对象。
  7. 打印所有内容

有一次我得出以下类型:Task[Iterator[Task[List[Bucket]]]]

本质上:

外部任务是列出所有 S3 存储桶的初始步骤,然后内部 Iterator/Task/List 正在尝试批处理 return 列出的任务。

我希望有一些方法可以 remove/flatten 外部任务到达 Iterator[Task[List[Bucket]]]

当我尝试将我的处理分解为多个步骤时,深度嵌套导致我做了很多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?

在这种特殊情况下,我建议使用 FS2 和 Monix 作为 F:

import cats.implicits._
import monix.eval._, monix.execution._
import fs2._

// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String

// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)

Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    // checking region, filtering and listing on batches of 10
    bucketRegion(name).flatMap {
      case "my-region" => listObject(name)
      case _           => Task.pure(List.empty)
    }
  }
  .foldMonoid // combines List[S3Object] together
  .compile.lastOrError // turns into Task with result
  .map(list => println(s"Result: $list"))
  .onErrorHandle { case error: Throwable => println(error) }
  .runToFuture // or however you handle it

FS2 下面使用 cats.effect.IO 或 Monix Task,或者任何你想要的,只要它提供 Cats Effect type 类。它构建了一个很好的、功能性的 DSL 来设计数据流,所以你可以在没有 Akka Streams 的情况下使用反应流。

这里有一个小问题,我们一次打印所有结果,如果结果多于内存可以处理,这可能是个坏主意 - 我们可以分批打印(不确定如果这是你想要的或不想要的)或进行过滤和打印单独的批次。

Stream.evalSeq(fetchS3Buckets)
  .parEvalMap(10) { name =>
    bucketRegion(name).map(name -> _)
  }
  .collect { case (name, "my-region") => name }
  .parEvalMap(10) { name =>
    listObject(name).map(list => println(s"Result: $list"))
  }
  .compile
  .drain

虽然其中 none 在裸 Monix 中是不可能的,但 FS2 使此类操作更易于编写和维护,因此您应该能够更轻松地实现您的流程。