Processing a big table with Slick fails with OutOfMemoryError

I'm querying a large MySQL table with Akka Streams and Slick, but it fails with an OutOfMemoryError. It seems that Slick is loading all of the results into memory (the query succeeds if it's limited to a few rows). Why is this happening, and is there a solution?

val dbUrl = "jdbc:mysql://..."

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.alpakka.slick.scaladsl.Slick
import akka.stream.scaladsl.Source
import akka.stream.{ActorMaterializer, Materializer}
import com.typesafe.config.ConfigFactory
import slick.jdbc.GetResult

import scala.concurrent.Await
import scala.concurrent.duration.Duration

val slickDbConfig = s"""
 |profile = "slick.jdbc.MySQLProfile$$"
 |db {
 |  dataSourceClass = "slick.jdbc.DriverDataSource"
 |  properties = {
 |    driver = "com.mysql.jdbc.Driver",
 |    url = "$dbUrl"
 |  }
 |}
 |""".stripMargin

implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val slickSession: SlickSession = SlickSession.forConfig(ConfigFactory.parseString(slickDbConfig))
import slickSession.profile.api._

val responses: Source[String, NotUsed] = Slick.source(
  sql"select my_text from my_table".as(GetResult(r => r.nextString())) // limit 100
)

val future = responses.runForeach((myText: String) =>
  println("my_text: " + myText.length)
)

Await.result(future, Duration.Inf)

From the Slick documentation:

Note: Some database systems may require session parameters to be set in a certain way to support streaming without caching all data at once in memory on the client side. For example, PostgreSQL requires both .withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = n) (with the desired page size n) and .transactionally for proper streaming.
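For reference, the PostgreSQL setup from that note would look like this in Slick (a sketch, assuming a plain-SQL query similar to yours; `1000` is just a placeholder for the desired page size n):

import slick.jdbc._

// PostgreSQL variant: forward-only, read-only result set with a positive
// fetch size, wrapped in a transaction as the Slick docs require.
val pgQuery =
  sql"select my_text from my_table".as(GetResult(r => r.nextString()))
    .withStatementParameters(
      rsType = ResultSetType.ForwardOnly,
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = 1000 // the desired page size n
    )
    .transactionally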

In other words, additional configuration may be needed to prevent the database from loading all of the query results into memory, and that configuration is database-dependent. The MySQL documentation states the following:

By default, ResultSets are completely retrieved and stored in memory. In most cases this is the most efficient way to operate and, due to the design of the MySQL network protocol, is easier to implement. If you are working with ResultSets that have a large number of rows or large values and cannot allocate heap space in your JVM for the memory required, you can tell the driver to stream the results back one row at a time.

To enable this functionality, create a Statement instance in the following manner:

stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY,
              java.sql.ResultSet.CONCUR_READ_ONLY);
stmt.setFetchSize(Integer.MIN_VALUE);

The combination of a forward-only, read-only result set, with a fetch size of Integer.MIN_VALUE serves as a signal to the driver to stream result sets row-by-row.

To set the above configuration in Slick:

import slick.jdbc._

val query =
  sql"select my_text from my_table".as(GetResult(r => r.nextString()))
    .withStatementParameters(
      rsType = ResultSetType.ForwardOnly,
      rsConcurrency = ResultSetConcurrency.ReadOnly,
      fetchSize = Int.MinValue
    )//.transactionally <-- I'm not sure whether you need ".transactionally"

val responses: Source[String, NotUsed] = Slick.source(query)
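
With this statement configuration in place, the rest of the program from your question can run unchanged against the new source; the driver now streams rows one at a time instead of materializing the whole result set:

val future = responses.runForeach((myText: String) =>
  println("my_text: " + myText.length)
)

Await.result(future, Duration.Inf)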