FS2 流 运行 直到 InputStream 结束
FS2 stream run till the end of InputStream
我是 FS2 的新手,需要一些设计方面的帮助。我正在尝试设计一个流,它将从底层 InputStream
中提取块,直到结束。这是我尝试过的:
import java.io.{File, FileInputStream, InputStream}
import cats.effect.IO
import cats.effect.IO._
object Fs2 {
def main(args: Array[String]): Unit = {
val is = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.Stream.eval(read(is))
stream.compile.drain.unsafeRunSync()
}
def read(is: InputStream): IO[Array[Byte]] = IO {
val buf = new Array[Byte](4096)
is.read(buf)
println(new String(buf))
buf
}
}
并且程序打印唯一的第一个块。这是有道理的。但我想找到一种方法 "signal" 在哪里停止阅读,在哪里不停止。我的意思是一直调用 read(is)
直到结束。有办法实现吗?
我也试过 repeatEval(read(is))
但它一直在读...我需要介于两者之间的东西。
使用fs2.io.readInputStream
or fs2.io.readInputStreamAsync
。前者阻塞当前线程;后者阻塞了 ExecutionContext
中的一个线程。例如:
val is: InputStream = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.io.readInputStreamAsync(IO(is), 128)
我是 FS2 的新手,需要一些设计方面的帮助。我正在尝试设计一个流,它将从底层 InputStream
中提取块,直到结束。这是我尝试过的:
import java.io.{File, FileInputStream, InputStream}
import cats.effect.IO
import cats.effect.IO._
object Fs2 {
def main(args: Array[String]): Unit = {
val is = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.Stream.eval(read(is))
stream.compile.drain.unsafeRunSync()
}
def read(is: InputStream): IO[Array[Byte]] = IO {
val buf = new Array[Byte](4096)
is.read(buf)
println(new String(buf))
buf
}
}
并且程序打印唯一的第一个块。这是有道理的。但我想找到一种方法 "signal" 在哪里停止阅读,在哪里不停止。我的意思是一直调用 read(is)
直到结束。有办法实现吗?
我也试过 repeatEval(read(is))
但它一直在读...我需要介于两者之间的东西。
使用fs2.io.readInputStream
or fs2.io.readInputStreamAsync
。前者阻塞当前线程;后者阻塞了 ExecutionContext
中的一个线程。例如:
val is: InputStream = new FileInputStream(new File("/tmp/my-file.mf"))
val stream = fs2.io.readInputStreamAsync(IO(is), 128)