Fs2 如何折叠然后附加到流
Fs2 how to fold then append to a Stream
我想附加到 Stream
但下一个流依赖于前一个Stream
的折叠结果
我是这样做的,但是 Stream s
被评估了两次
import fs2._
def ints(start: Int) = Stream.iterate(start) { i =>
println(i)
i + 1
}.take(10)
val s = ints(0)
def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[Pure, Int]) = {
s ++ s.fold(init)(f).flatMap(next)
}
val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.toList)
如何实现只计算一次 s
的 foldAppend
方法。
您是否考虑过使用 scala.collection.immutable.Stream
?它已被缓存,因此不会被多次计算。
Brian 的回答是错误的,s
实际上是懒惰的,所以整个流被评估了两次。绑定到 s
的变量是严格的,但 fs2 中的 Stream
是一个惰性流,它只在你 run
它时被评估。
你的主要问题是 Pure
不是像 IO
那样安全实现副作用的 monad。你不应该纯粹 println
。一个有效的例子是:
import cats.effect.IO
import fs2._
def ints(start: Int) = Stream.iterate(start) { i => println(i)
i + 1
}.take(10)
val s = ints(0)
def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[IO, Int]) = {
val result = s.covary[IO].runLog
Stream.eval(result).covary[IO].flatMap {
s =>
Stream.emits(s) ++ Stream.emits(s).fold(init)(f).flatMap(next)
}
}
val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.runLast.unsafeRunSync())
这将评估流一次
终于用 Pull
完成了工作
implicit class StreamSyntax[F[_], A](s: Stream[F, A]) {
def foldAppend[S](init: S)(f: (S, A) => S)(next: S => Stream[F, A]): Stream[F, A] = {
def pullAll(s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
s.pull.unconsChunk.flatMap {
case Some((hd, tl)) =>
Pull.output(hd) *> pullAll(tl)
case None =>
Pull.pure(None)
}
}
def foldChunks(i: S, s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
s.pull.unconsChunk.flatMap {
case Some((hd, tl)) =>
val sum: S = hd.toVector.foldLeft(i)(f)
Pull.output(hd) *> foldChunks(sum, tl)
case None =>
pullAll(next(i))
}
}
foldChunks(init, s).stream
}
}
我想附加到 Stream
但下一个流依赖于前一个Stream
的折叠结果
我是这样做的,但是 Stream s
被评估了两次
import fs2._
def ints(start: Int) = Stream.iterate(start) { i =>
println(i)
i + 1
}.take(10)
val s = ints(0)
def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[Pure, Int]) = {
s ++ s.fold(init)(f).flatMap(next)
}
val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.toList)
如何实现只计算一次 s
的 foldAppend
方法。
您是否考虑过使用 scala.collection.immutable.Stream
?它已被缓存,因此不会被多次计算。
Brian 的回答是错误的,s
实际上是懒惰的,所以整个流被评估了两次。绑定到 s
的变量是严格的,但 fs2 中的 Stream
是一个惰性流,它只在你 run
它时被评估。
你的主要问题是 Pure
不是像 IO
那样安全实现副作用的 monad。你不应该纯粹 println
。一个有效的例子是:
import cats.effect.IO
import fs2._
def ints(start: Int) = Stream.iterate(start) { i => println(i)
i + 1
}.take(10)
val s = ints(0)
def foldAppend(init: Int)(f: (Int, Int) => Int)(next: Int => Stream[IO, Int]) = {
val result = s.covary[IO].runLog
Stream.eval(result).covary[IO].flatMap {
s =>
Stream.emits(s) ++ Stream.emits(s).fold(init)(f).flatMap(next)
}
}
val res = foldAppend(0)((s, i) => s + 1)(ints)
println(res.runLast.unsafeRunSync())
这将评估流一次
终于用 Pull
implicit class StreamSyntax[F[_], A](s: Stream[F, A]) {
def foldAppend[S](init: S)(f: (S, A) => S)(next: S => Stream[F, A]): Stream[F, A] = {
def pullAll(s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
s.pull.unconsChunk.flatMap {
case Some((hd, tl)) =>
Pull.output(hd) *> pullAll(tl)
case None =>
Pull.pure(None)
}
}
def foldChunks(i: S, s: Stream[F, A]): Pull[F, A, Option[(Chunk[A], Stream[F, A])]] = {
s.pull.unconsChunk.flatMap {
case Some((hd, tl)) =>
val sum: S = hd.toVector.foldLeft(i)(f)
Pull.output(hd) *> foldChunks(sum, tl)
case None =>
pullAll(next(i))
}
}
foldChunks(init, s).stream
}
}