如何配置 Alpakka Slick 以启用从 Java 中的数据库流式传输?

How can I configure Alpakka Slick to enable streaming from my database in Java?

我正在尝试使用 Akka Streams 和 Postgres 在 Java 应用程序中使用 Slick 流式传输查询结果:

Source mySource = Slick.source(
  slickSession,
  "SELECT * from message where started_instant is null",
  (this::createQueueItemFromSlickRow));

Slick documentation it is clear that in order to stream with Postgres, additional parameters need to be set. However, I can only find Scala examples, and there is no information in the Alpakka documentation关于如何配置这个。

实际上,我无法处理流开始后进入数据库的任何新对象。

如您所述,Slick documentation 注释如下:

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.

Alpakka's Slick support recently exposed the PreparedStatement in its Java DSL1,如果你不能使用Scala API,你可以用它来设置上述参数。例如:

PreparedStatement statement =
  connection.prepareStatement(
    "MY SQL STATEMENT",
    ResultSet.TYPE_FORWARD_ONLY, // java.sql.ResultSet
    ResultSet.CONCUR_READ_ONLY);

查看 SlickTest.java class 和上面的拉取请求以获取更多详细信息。

1此更改是即将发布的 Alpakka 2.0.2 版本的一部分。 (2.0.1 版是撰写本文时的最新版本。)