将 Doobie 流从数据库保存到文件

Save Doobie stream from database to file

Doobie 的 select 查询返回一个 fs2.Stream[doobie.ConnectionIO, String]。如果我们需要将其写入文件，显而易见的做法是调用 stream.compile.toList.transact(transactor)，然后将得到的列表保存到文件。

有没有办法在不将结果转换为列表的情况下以流方式保存结果?

诀窍是使用 Async[doobie.ConnectionIO].liftIO(IO(...)) 将 cats.effect.IO 操作转换为 doobie.ConnectionIO。这样就可以把文件操作与数据库操作很好地组合在一起。下面是将查询结果以流方式写入文件的完整示例程序。

package com.example

import java.io.BufferedWriter

import better.files.File
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream


/**
  * Demo application: recreates and seeds the `example` table, then streams the
  * `string_value` column into `./example.txt` without first collecting the rows
  * into a list.
  */
object Example extends IOApp {

  /**
    * Runs the table setup followed by the streaming export.
    *
    * @param args command-line arguments (not used)
    * @return `ExitCode.Success` once setup and export have both completed
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val transactor = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver",      // JDBC driver class
      "jdbc:postgresql:example_db", // JDBC connection URL
      "postgres",                   // database user
      ""                            // database password
    )

    val dropTable = sql"drop table if exists example".update.run
    val createTable =
      sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
    val seedRows = Update[String]("insert into example (string_value) values (?)")
      .updateMany(List("one", "two", "three", "four", "five"))

    // Each statement goes through its own `transact` call, one after another.
    val prepareTable: IO[Unit] =
      dropTable
        .transact(transactor)
        .flatMap(_ => createTable.transact(transactor))
        .flatMap(_ => seedRows.transact(transactor))
        .map(_ => ())

    val rows: Stream[doobie.ConnectionIO, String] =
      sql"select string_value from example".query[String].stream
    // The file writes are part of the ConnectionIO program, so a single
    // `transact` runs the query and the export together.
    val exportRows: IO[Unit] = writeToFile(rows).compile.drain.transact(transactor)

    prepareTable.flatMap(_ => exportRows).map(_ => ExitCode.Success)
  }

  /**
    * Writes the given rows to `./example.txt`, separated by `"\n"`, one chunk at a time.
    *
    * @param result stream of rows to persist
    * @return a stream emitting one `Unit` per chunk written
    */
  private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
    val separated = result.intersperse("\n")
    for {
      out <- Stream.resource(writer("./example.txt"))
      _ <- separated.chunks.evalMap { chunk =>
        inConnection(chunk.foreach(line => out.write(line)))
      }
    } yield ()
  }

  /**
    * Opens a buffered writer on `path` as a resource that closes it on release.
    *
    * @param path location of the file to write
    * @return a resource yielding the open writer
    */
  private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
    val open: doobie.ConnectionIO[BufferedWriter] = inConnection(File(path).newBufferedWriter)
    Resource.make(open)(handle => inConnection(handle.close()))
  }

  /** Suspends `action` in `IO` and lifts it into `ConnectionIO`. */
  private def inConnection[A](action: => A): doobie.ConnectionIO[A] =
    Async[doobie.ConnectionIO].liftIO(IO(action))
}

我想这就是您要找的:

import cats.effect.IO
import doobie.implicits._
import doobie.util.transactor.Transactor
import fs2.text
import fs2.io.file.{Files, Path}

/**
  * Streams a query result straight to a file using fs2's file API, so the
  * result is never collected into a list.
  */
object Example {

  /**
    * Writes every `string_value` of the `example` table to the file
    * `path-to-file`, UTF-8 encoded, with rows separated by `"\n"`.
    *
    * NOTE(review): `xa` is not defined in this snippet; it is assumed to be a
    * `Transactor[IO]` supplied by the surrounding code. Confirm before use.
    *
    * @return an `IO` that completes once the whole result has been written
    */
  def queryToFile: IO[Unit] =
    sql"select string_value from example"
      .query[String]
      .stream
      .transact(xa)
      // Separate rows with a newline; without it the values are written
      // back-to-back with no delimiter between them.
      .intersperse("\n")
      .through(text.utf8.encode[IO])
      .through(Files[IO].writeAll(Path("path-to-file")))
      .compile
      .drain
}