如何在 Slick 中将 `FixedSqlAction` 转换为 `StreamingDBIO`?

How do you transform a `FixedSqlAction` into a `StreamingDBIO` in Slick?

我正在使用 Alpakka 和 Slick 模块创建 akka-stream,但遇到类型不匹配问题。

一个分支机构正在获取他们table中的发票总数:

  def getTotal(implicit session: SlickSession) = {
    import session.profile.api._
    val query = TableQuery[Tables.Invoice].length.result
    Slick.source(query)
  }

但结尾行无法编译,因为 Alpakka 需要 StreamingDBIO 但我提供的是 FixedSqlAction[Int,slick.dbio.NoStream,slick.dbio.Effect.Read].

如何从非流媒体结果转移到流媒体结果?

这对你有用吗?

def getTotal() = {
  // Doc Expressions (Scalar values)
  // https://scala-slick.org/doc/3.2.0/queries.html
  val query = TableQuery[Tables.Invoice].length.result

  val res = Await.result(session.db.run(query), 60.seconds)
  println(s"Result: $res")
  res
}

获取 table 的长度会产生单个值,而不是流。因此,获取 Source 来提供流的最简单方法是

def getTotal(implicit session: SlickSession): Source[Int, NotUsed] =
  Source.lazyFuture { () =>
    // Don't actually run the query until the stream has materialized and
    //  demand has reached the source
    val query = TableQuery[Tables.Invoice].length.result
    session.db.run(query)
  }

Alpakka 的 Slick 连接器更倾向于流式传输(包括管理分页等)具有大量结果的查询结果。对于单个结果,将 vanilla Slick 给您的结果的 Future 转换为流就足够了。

如果您想在调用 getTotal 后立即开始执行查询(请注意,无论下游是否运行或需要来自源的数据),您可以

def getTotal(implicit session: SlickSession): Source[Int, NotUsed] = {
  val query = TableQuery[Tables.Invoice].length.result
  Source.future(session.db.run(query))
}