使用 Slick (JDBC) Connector for Alpakka 时使用 Paging SQL Statement 是否有意义
Does it make sense to use Paging SQL Statement when using Slick (JDBC) Connector for Alpakka
我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何在后台工作的 - 我无法使用文档真正找到答案。
考虑一个用例,我想处理从数据库中选择的大量记录。我可以在单个流中简单地使用 SELECT * FROM [TABLE]
吗,或者像 SELECT * FROM [TABLE] LIMIT 0,1000
.
一样为每个页面(一个接一个)启动多个流是否有意义?
我希望/认为 Slick Connector Alpakka 的响应方式只在流需要时才从数据库中获取记录,这样我就可以使用 SELECT * FROM [TABLE]
...
任何人都可以给我一些见解或一些好的文档来通读吗?
考虑 Alpakka 的 Slick.source
方法的源代码:
/**
* Scala API: creates a Source[T, NotUsed] that performs the
* specified query against the (implicitly) specified
* Slick database and streams the results.
* This works for both "typed" Slick queries
* and "plain SQL" queries.
*
* @param streamingQuery The Slick query to execute, which can
* be either a "typed" query or a "plain SQL"
* query produced by one of the Slick "sql..."
* String interpolators
* @param session The database session to use.
*/
def source[T](
streamingQuery: StreamingDBIO[Seq[T], T]
)(implicit session: SlickSession): Source[T, NotUsed] =
Source.fromPublisher(session.db.stream(streamingQuery))
session.db.stream(streamingQuery))
上面的结果是 DatabasePublisher
,它是一个 Reactive Streams Publisher
,它被传递给 Akka Stream 的 Source.fromPublisher
。不要担心为数据子集创建多个流;您可以安全地使用检索 table 中所有行的查询,并将结果 Source
作为单个流处理。
需要注意的一件事是,您可能需要配置一些未在 Alpakka 文档中提及但在 Slick documentation:
中提及的设置
Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(with the desired page size n
) and .transactionally
for proper streaming.
因此,例如,如果您使用的是 PostgreSQL,那么您的 Source
可能如下所示:
val source =
Slick.source(
TableQuery[Items]
.result
.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
)
.transactionally)
TableQuery[Items].result
returns table 中与 Items
关联的所有行。
尽管有文档,我已经成功地将 Slick 的 DatabasePublisher
与 Akka Streams 结合使用,从 PostgreSQL 中的 table 检索和更新数百万行,而无需设置 withStatementParameters
或 transactionally
。在没有这些设置的情况下尝试:
val source = Slick.source(TableQuery[Items].result)
我目前想知道 Alpakka 的 Slick (JDBC) 连接器是如何在后台工作的 - 我无法使用文档真正找到答案。
考虑一个用例,我想处理从数据库中选择的大量记录。我可以在单个流中简单地使用 SELECT * FROM [TABLE]
吗,或者像 SELECT * FROM [TABLE] LIMIT 0,1000
.
我希望/认为 Slick Connector Alpakka 的响应方式只在流需要时才从数据库中获取记录,这样我就可以使用 SELECT * FROM [TABLE]
...
任何人都可以给我一些见解或一些好的文档来通读吗?
考虑 Alpakka 的 Slick.source
方法的源代码:
/**
* Scala API: creates a Source[T, NotUsed] that performs the
* specified query against the (implicitly) specified
* Slick database and streams the results.
* This works for both "typed" Slick queries
* and "plain SQL" queries.
*
* @param streamingQuery The Slick query to execute, which can
* be either a "typed" query or a "plain SQL"
* query produced by one of the Slick "sql..."
* String interpolators
* @param session The database session to use.
*/
def source[T](
streamingQuery: StreamingDBIO[Seq[T], T]
)(implicit session: SlickSession): Source[T, NotUsed] =
Source.fromPublisher(session.db.stream(streamingQuery))
session.db.stream(streamingQuery))
上面的结果是 DatabasePublisher
,它是一个 Reactive Streams Publisher
,它被传递给 Akka Stream 的 Source.fromPublisher
。不要担心为数据子集创建多个流;您可以安全地使用检索 table 中所有行的查询,并将结果 Source
作为单个流处理。
需要注意的一件事是,您可能需要配置一些未在 Alpakka 文档中提及但在 Slick documentation:
中提及的设置Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both
.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n)
(with the desired page sizen
) and.transactionally
for proper streaming.
因此,例如,如果您使用的是 PostgreSQL,那么您的 Source
可能如下所示:
val source =
Slick.source(
TableQuery[Items]
.result
.withStatementParameters(
rsType = ResultSetType.ForwardOnly,
rsConcurrency = ResultSetConcurrency.ReadOnly,
fetchSize = 10
)
.transactionally)
TableQuery[Items].result
returns table 中与 Items
关联的所有行。
尽管有文档,我已经成功地将 Slick 的 DatabasePublisher
与 Akka Streams 结合使用,从 PostgreSQL 中的 table 检索和更新数百万行,而无需设置 withStatementParameters
或 transactionally
。在没有这些设置的情况下尝试:
val source = Slick.source(TableQuery[Items].result)