Fan in/fan out 并发与 Monix
Fan in/fan out concurrency with Monix
我正在尝试学习 Scala 并从中获得乐趣,但我 运行 陷入了这个经典问题。让我想起了很多NodeJS早期的嵌套回调地狱
这是我的伪代码程序:
- 获取 S3 存储桶列表的任务。
- 任务一完成后,我想以十个为一组对桶进行批处理。
- 每批次:
- 获取每个桶的区域。
- 过滤掉不在该区域的桶。
- 列出每个桶中的所有对象。
- 打印所有内容
有一次我得出以下类型:Task[Iterator[Task[List[Bucket]]]]
本质上:
外部任务是列出所有 S3 存储桶的初始步骤,然后内部 Iterator/Task/List 正在尝试批处理 return 列出的任务。
我希望有一些方法可以 remove/flatten 外部任务到达 Iterator[Task[List[Bucket]]]
。
当我尝试将我的处理分解为多个步骤时,深度嵌套导致我做了很多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?
在这种特殊情况下,我建议使用 FS2 和 Monix 作为 F:
import cats.implicits._
import monix.eval._, monix.execution._
import fs2._
// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String
// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
// checking region, filtering and listing on batches of 10
bucketRegion(name).flatMap {
case "my-region" => listObject(name)
case _ => Task.pure(List.empty)
}
}
.foldMonoid // combines List[S3Object] together
.compile.lastOrError // turns into Task with result
.map(list => println(s"Result: $list"))
.onErrorHandle { case error: Throwable => println(error) }
.runToFuture // or however you handle it
FS2 下面使用 cats.effect.IO 或 Monix Task,或者任何你想要的,只要它提供 Cats Effect type 类。它构建了一个很好的、功能性的 DSL 来设计数据流,所以你可以在没有 Akka Streams 的情况下使用反应流。
这里有一个小问题,我们一次打印所有结果,如果结果多于内存可以处理,这可能是个坏主意 - 我们可以分批打印(不确定如果这是你想要的或不想要的)或进行过滤和打印单独的批次。
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
bucketRegion(name).map(name -> _)
}
.collect { case (name, "my-region") => name }
.parEvalMap(10) { name =>
listObject(name).map(list => println(s"Result: $list"))
}
.compile
.drain
虽然其中 none 在裸 Monix 中是不可能的,但 FS2 使此类操作更易于编写和维护,因此您应该能够更轻松地实现您的流程。
我正在尝试学习 Scala 并从中获得乐趣,但我 运行 陷入了这个经典问题。让我想起了很多NodeJS早期的嵌套回调地狱
这是我的伪代码程序:
- 获取 S3 存储桶列表的任务。
- 任务一完成后,我想以十个为一组对桶进行批处理。
- 每批次:
- 获取每个桶的区域。
- 过滤掉不在该区域的桶。
- 列出每个桶中的所有对象。
- 打印所有内容
有一次我得出以下类型:Task[Iterator[Task[List[Bucket]]]]
本质上:
外部任务是列出所有 S3 存储桶的初始步骤,然后内部 Iterator/Task/List 正在尝试批处理 return 列出的任务。
我希望有一些方法可以 remove/flatten 外部任务到达 Iterator[Task[List[Bucket]]]
。
当我尝试将我的处理分解为多个步骤时,深度嵌套导致我做了很多嵌套映射。这是正确的做法还是有更好的方法来处理这种嵌套?
在这种特殊情况下,我建议使用 FS2 和 Monix 作为 F:
import cats.implicits._
import monix.eval._, monix.execution._
import fs2._
// use your own types here
type BucketName = String
type BucketRegion = String
type S3Object = String
// use your own implementations as well
val fetchS3Buckets: Task[List[BucketName]] = Task(???)
val bucketRegion: BucketName => Task[BucketRegion] = _ => Task(???)
val listObject: BucketName => Task[List[S3Object]] = _ => Task(???)
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
// checking region, filtering and listing on batches of 10
bucketRegion(name).flatMap {
case "my-region" => listObject(name)
case _ => Task.pure(List.empty)
}
}
.foldMonoid // combines List[S3Object] together
.compile.lastOrError // turns into Task with result
.map(list => println(s"Result: $list"))
.onErrorHandle { case error: Throwable => println(error) }
.runToFuture // or however you handle it
FS2 下面使用 cats.effect.IO 或 Monix Task,或者任何你想要的,只要它提供 Cats Effect type 类。它构建了一个很好的、功能性的 DSL 来设计数据流,所以你可以在没有 Akka Streams 的情况下使用反应流。
这里有一个小问题,我们一次打印所有结果,如果结果多于内存可以处理,这可能是个坏主意 - 我们可以分批打印(不确定如果这是你想要的或不想要的)或进行过滤和打印单独的批次。
Stream.evalSeq(fetchS3Buckets)
.parEvalMap(10) { name =>
bucketRegion(name).map(name -> _)
}
.collect { case (name, "my-region") => name }
.parEvalMap(10) { name =>
listObject(name).map(list => println(s"Result: $list"))
}
.compile
.drain
虽然其中 none 在裸 Monix 中是不可能的,但 FS2 使此类操作更易于编写和维护,因此您应该能够更轻松地实现您的流程。