将 Doobie 流从数据库保存到文件
Save Doobie stream from database to file
Doobie 的 select 查询返回一个 fs2.Stream[doobie.ConnectionIO, String]
。如果我们需要将其写入文件，显而易见的做法是调用 stream.compile.toList.transact(transactor)
，然后将得到的列表保存到文件。
有没有办法在不将结果转换为列表的情况下以流方式保存结果?
诀窍是使用 Async[doobie.ConnectionIO].liftIO(IO(...))
将 cats.effect.IO 操作转换为 doobie.ConnectionIO
。这样就可以将文件操作与数据库操作很好地结合起来。下面是将结果流式传输到文件的完整示例程序。
package com.example
import java.io.BufferedWriter
import better.files.File
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream
/** Sample application: fills the `example` table and then streams its
  * `string_value` column into `./example.txt` without first collecting the
  * rows into a list.
  */
object Example extends IOApp {

  /** Creates and populates the table, then runs the streaming export. */
  override def run(args: List[String]): IO[ExitCode] = {
    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver", // driver classname
      "jdbc:postgresql:example_db", // connect URL (driver-specific)
      "postgres", // user
      "" // password
    )

    val dropTable = sql"drop table if exists example".update.run
    val createTable =
      sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
    val insertRows = Update[String]("insert into example (string_value) values (?)")
      .updateMany(List("one", "two", "three", "four", "five"))

    // Each statement is transacted separately, in this order.
    val setup =
      dropTable.transact(xa) *> createTable.transact(xa) *> insertRows.transact(xa).void

    val select: Stream[doobie.ConnectionIO, String] =
      sql"select string_value from example".query[String].stream

    // The file writes are part of the same ConnectionIO program as the query,
    // so a single `transact` runs both.
    val output = writeToFile(select).compile.drain.transact(xa)

    setup *> output.as(ExitCode.Success)
  }

  /** Suspends `thunk` in `IO` and lifts it into `ConnectionIO`. */
  private def liftToConnection[A](thunk: => A): doobie.ConnectionIO[A] =
    Async[doobie.ConnectionIO].liftIO(IO(thunk))

  /** Writes the elements of `result`, separated by newlines, to
    * `./example.txt`, emitting one `Unit` per chunk written.
    */
  private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
    val lines = result.intersperse("\n")
    Stream.resource(writer("./example.txt")).flatMap { out =>
      lines.chunks.evalMap { chunk =>
        liftToConnection(chunk.foreach(s => out.write(s)))
      }
    }
  }

  /** A buffered writer for `path`, closed when the resource is released. */
  private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
    val acquire = liftToConnection(File(path).newBufferedWriter)
    def release(w: BufferedWriter): doobie.ConnectionIO[Unit] = liftToConnection(w.close())
    Resource.make(acquire)(release)
  }
}
我想这就是您要找的:
import cats.effect.IO
import doobie.implicits._
import doobie.util.transactor.Transactor
import fs2.text
import fs2.io.file.{Files, Path}
/** Streams the `string_value` column of the `example` table into a file using
  * fs2's file API.
  *
  * NOTE(review): `xa` is not defined in this snippet; it is expected to be a
  * `Transactor[IO]` supplied by the surrounding scope.
  */
object Example {

  /** Runs the query and writes the values, UTF-8 encoded and separated by
    * newlines, to `path-to-file`.
    *
    * @return an `IO` that completes once the whole stream has been written
    */
  def queryToFile: IO[Unit] =
    sql"select string_value from example"
      .query[String]
      .stream
      .transact(xa)
      // Without a separator the values would be written back to back as one
      // undelimited string.
      .intersperse("\n")
      .through(text.utf8.encode[IO])
      .through(Files[IO].writeAll(Path("path-to-file")))
      .compile
      .drain
}
Doobie 的 select 查询返回一个 fs2.Stream[doobie.ConnectionIO, String]
。如果我们需要将其写入文件，显而易见的做法是调用 stream.compile.toList.transact(transactor)
，然后将得到的列表保存到文件。
有没有办法在不将结果转换为列表的情况下以流方式保存结果?
诀窍是使用 Async[doobie.ConnectionIO].liftIO(IO(...))
将 cats.effect.IO 操作转换为 doobie.ConnectionIO
。这样就可以将文件操作与数据库操作很好地结合起来。下面是将结果流式传输到文件的完整示例程序。
package com.example
import java.io.BufferedWriter
import better.files.File
import cats.effect._
import cats.implicits._
import doobie._
import doobie.implicits._
import fs2.Stream
/** Demo program that seeds the `example` table and exports its
  * `string_value` column to `./example.txt` as a stream.
  */
object Example extends IOApp {

  /** Prepares the table, then streams the query result into the file. */
  override def run(args: List[String]): IO[ExitCode] = {
    val xa = Transactor.fromDriverManager[IO](
      "org.postgresql.Driver", // driver classname
      "jdbc:postgresql:example_db", // connect URL (driver-specific)
      "postgres", // user
      "" // password
    )

    val drop = sql"drop table if exists example".update.run
    val create =
      sql"create table if not exists example (id serial primary key, string_value text not null)".update.run
    val insert = Update[String]("insert into example (string_value) values (?)")
      .updateMany(List("one", "two", "three", "four", "five"))

    // Drop, create and insert each run through their own `transact` call.
    val setup: IO[Unit] =
      drop
        .transact(xa)
        .flatMap(_ => create.transact(xa))
        .flatMap(_ => insert.transact(xa))
        .map(_ => ())

    val select: Stream[doobie.ConnectionIO, String] =
      sql"select string_value from example".query[String].stream

    val output = writeToFile(select).compile.drain.transact(xa)

    setup.flatMap(_ => output).map(_ => ExitCode.Success)
  }

  /** Sends `result` to `./example.txt` with a newline between elements.
    *
    * The returned stream emits one `Unit` for every chunk that was written.
    */
  private def writeToFile(result: Stream[doobie.ConnectionIO, String]): Stream[doobie.ConnectionIO, Unit] = {
    val connIO = Async[doobie.ConnectionIO]
    val separated = result.intersperse("\n")
    Stream.resource(writer("./example.txt")).flatMap { sink =>
      separated.chunks.evalMap { block =>
        // The write is suspended in IO and lifted so it can be sequenced with
        // the database actions.
        connIO.liftIO(IO(block.foreach(line => sink.write(line))))
      }
    }
  }

  /** Opens a buffered writer on `path` as a resource that closes it on release. */
  private def writer(path: String): Resource[doobie.ConnectionIO, BufferedWriter] = {
    val connIO = Async[doobie.ConnectionIO]
    val open = connIO.liftIO(IO(File(path).newBufferedWriter))
    Resource.make(open) { handle =>
      connIO.liftIO(IO(handle.close()))
    }
  }
}
我想这就是您要找的:
import cats.effect.IO
import doobie.implicits._
import doobie.util.transactor.Transactor
import fs2.text
import fs2.io.file.{Files, Path}
/** Streams the `string_value` column of the `example` table into a file using
  * fs2's file API.
  *
  * NOTE(review): `xa` is not defined in this snippet; it is expected to be a
  * `Transactor[IO]` supplied by the surrounding scope.
  */
object Example {

  /** Runs the query and writes the values, UTF-8 encoded and separated by
    * newlines, to `path-to-file`.
    *
    * @return an `IO` that completes once the whole stream has been written
    */
  def queryToFile: IO[Unit] =
    sql"select string_value from example"
      .query[String]
      .stream
      .transact(xa)
      // Without a separator the values would be written back to back as one
      // undelimited string.
      .intersperse("\n")
      .through(text.utf8.encode[IO])
      .through(Files[IO].writeAll(Path("path-to-file")))
      .compile
      .drain
}