为 fs2.Stream 的所有元素同时安排计算
Schedule computation concurrently for all elements of the fs2.Stream
我有一个 fs2.Stream
由一些元素(可能是无限的)组成,我想为流的所有元素同时安排一些计算。这是我试过的
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
_ <- fs2.Stream.awakeEvery[IO](1.second)
_ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()
程序输出看起来像
1
1
1
etc...
这不是预期的。我想为原始流的所有元素交错安排计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。
根据@KrzysztofAtłasik 和@LuisMiguelMejíaSuárez 给出的提示,这是我刚刚想出的解决方案:
val originalStream = fs2.Stream.emits(List(1, 2))
val scheduledComputation = originalStream.covary[IO].map({ id =>
fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
@KrzysztofAtłasik 在评论中提出的交错解决方案
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
和 _ <- fs2.Stream.awakeEvery[IO](1.second)
也可以,但它不允许以自己的方式安排每个元素。
要同时安排元素 elementValue
秒,可以执行以下操作:
val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
//id.seconds
fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
val str = for {
id <- Stream.emits(List(1, 5, 7)).covary[IO]
res = timer.sleep(id.second) >> IO(println(id))
} yield res
val stream = str.parEvalMapUnordered(5)(identity)
stream.compile.drain.unsafeRunSync()
或
val stream = Stream.emits(List(1, 5, 7))
.map { id =>
Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
.parJoinUnbounded
stream.compile.drain.unsafeRunSync()
我有一个 fs2.Stream
由一些元素(可能是无限的)组成,我想为流的所有元素同时安排一些计算。这是我试过的
implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)
val stream = for {
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
_ <- fs2.Stream.awakeEvery[IO](1.second)
_ <- fs2.Stream.eval(IO(println(id)))
} yield ()
stream.compile.drain.unsafeRunSync()
程序输出看起来像
1
1
1
etc...
这不是预期的。我想为原始流的所有元素交错安排计算,但不要等到第一个流终止(由于无限调度而永远不会发生)。
根据@KrzysztofAtłasik 和@LuisMiguelMejíaSuárez 给出的提示,这是我刚刚想出的解决方案:
val originalStream = fs2.Stream.emits(List(1, 2))
val scheduledComputation = originalStream.covary[IO].map({ id =>
fs2.Stream.awakeEvery[IO](1.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
@KrzysztofAtłasik 在评论中提出的交错解决方案
id <- fs2.Stream.emits(List(1, 2)).covary[IO]
和 _ <- fs2.Stream.awakeEvery[IO](1.second)
也可以,但它不允许以自己的方式安排每个元素。
要同时安排元素 elementValue
秒,可以执行以下操作:
val scheduleEachElementIndividually = originalStream.covary[IO].map({ id =>
//id.seconds
fs2.Stream.awakeEvery[IO](id.second).evalMap(_ => IO.delay(println(id)))
}).fold(fs2.Stream.empty.covaryAll[IO, Unit])((result, stream) => result.merge(stream)).flatten
val str = for {
id <- Stream.emits(List(1, 5, 7)).covary[IO]
res = timer.sleep(id.second) >> IO(println(id))
} yield res
val stream = str.parEvalMapUnordered(5)(identity)
stream.compile.drain.unsafeRunSync()
或
val stream = Stream.emits(List(1, 5, 7))
.map { id =>
Stream.eval(timer.sleep(id.second) >> IO(println(id))) }
.parJoinUnbounded
stream.compile.drain.unsafeRunSync()