scalaz-stream 的 inflate 使用示例

Usage example of scalaz-stream's inflate

在以下 scalaz-stream 的用法示例中(取自 documentation), what do I need to change if the input and/or output is a gzipped file? In other words, how do I use compress?

import scalaz.stream._
import scalaz.concurrent.Task

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run

// at the end of the universe...
val u: Unit = converter.run

压缩输出很容易。由于 compress.deflate() 是一个 Process1[ByteVector, ByteVector],因此您需要将其插入您发出 ByteVectors 的管道中(紧接在 text.utf8Encode 之后,即 Process1[String, ByteVector]) :

val converter: Task[Unit] =
  io.linesR("testdata/fahrenheit.txt")
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .pipe(compress.deflate())
    .to(io.fileChunkW("testdata/celsius.zip"))
    .run

对于inflate您不能使用io.linesR读取压缩文件。您需要一个生成 ByteVector 而不是 String 的进程,以便将它们通过管道传输到 inflate。 (您可以为此使用 io.fileChunkR。)下一步是将未压缩的数据解码为 Strings(例如 text.utf8Decode),然后使用 text.lines() 发出一行一行的文字。这样的事情应该可以解决问题:

val converter: Task[Unit] =
  Process.constant(4096).toSource
    .through(io.fileChunkR("testdata/fahrenheit.zip"))
    .pipe(compress.inflate())
    .pipe(text.utf8Decode)
    .pipe(text.lines())
    .filter(s => !s.trim.isEmpty && !s.startsWith("//"))
    .map(line => fahrenheitToCelsius(line.toDouble).toString)
    .intersperse("\n")
    .pipe(text.utf8Encode)
    .to(io.fileChunkW("testdata/celsius.txt"))
    .run