将数据从 CloudSql 流式传输到 Dataflow

Streaming data from CloudSql into Dataflow

我们目前正在探索如何使用 Apache Beam/Google Dataflow 在 Google 云 SQL 数据库 (MySQL) 中处理大量数据存储。

数据库在单个 table 中存储了大约 200GB 的数据。

我们使用 JdbcIO 成功地从数据库中读取了行,但到目前为止,这只有在我们 LIMIT 查询的行数时才有可能。否则我们会运行进入内存问题。我假设默认情况下 SELECT 查询尝试将所有结果行加载到内存中。

这个的惯用方法是什么?批处理 SQL 查询?流式传输结果?

我们尝试调整执行语句的 fetch size,但没有成功。

这就是我们的 JDBC 读取设置的样子:

JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  statementPreparator = statement => statement.setFetchSize(100),
  rowMapper = result => result.getString(1)
)

到目前为止,我还没有找到任何关于来自 sql 的流的资源。

编辑

我将列出我采用的一种视图方法,以便其他人可以学到一些东西(例如如何去做)。为了获得更多上下文,所讨论的数据库 table 的结构确实很糟糕:它有一个包含 JSON 字符串的列,以及 id 列(主键)加上一个 addedmodified 列(均为 TIMESTAMP 类型)。在第一次接近时,它没有进一步的指数。 table 包含 25 个 mio 行。所以这可能更多是数据库问题而不是 Apache Beam/JDBC 问题。尽管如此:

方法 1(上文)- 查询所有内容

基本上是这样的:

val readOptions = JdbcReadOptions(
  connectionOptions = connOpts,
  query = "SELECT data FROM raw_data",
  rowMapper = result => result.getString(1)
)

context
  .jdbcSelect(readOptions)
  .map(/*...*/)

如果我在查询中添加了 LIMIT,这会起作用。但是明显很慢。

方法 2 - 键集分页

val queries = List(
  "SELECT data from raw_data LIMIT 5000 OFFSET 0",
  "SELECT data from raw_data LIMIT 5000 OFFSET 5000",
  "SELECT data from raw_data LIMIT 5000 OFFSET 10000"
  // ...
)

context
  .parallelize(queries)
  .map(query => {
      val connection = DriverManager.getConnection(/* */)
      val statement = connection.prepareStatement(query)
      val result = statement.executeQuery()

      makeIterable(result) // <-- creates a Iterator[String]
  })
  .flatten
  .map(/* processing */)

虽然我很快了解到 LIMIT _ OFFSET _ 组合也从第一行开始扫描,但效果稍好一些。因此每个后续查询都花费了更长的时间,收敛到很长时间。

方法 2.5 - 带排序的键集分页

与上述方法类似,但我们在 added 列上创建了一个索引并将查询更新为

SELECT data FROM raw_data ORDER BY added LIMIT 5000 OFFSET x

这加快了速度,但最终查询时间变长了。

方法 3 - 否 Beam/Dataflow

val connection = DriverManager.getConnection(/* */)
val statement = connection.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
statement.setFetchSize(Integer.MIN_VALUE)

val rs = statement.executeQuery("SELECT data FROM raw_data")

while(rs.next()) {
  writer writeLine rs.getString(1)
}

这会将结果集逐行流回并将行写入文件。 运行 所有 25 条 mio 记录大约需要 2 小时。最后。如果有人能指出如何使用 Beam 实现此解决方案,那就太好了。

顺便说一句:现在我有了原始数据,因为使用 Beam 处理 CSV 文件是一件轻而易举的事。它是大约 80GB 的原始数据,可以在大约 5 分钟内通过自动缩放等运行转换为另一种 CSV 格式。

我认为 JDBCIO 由于其固有的局限性(单个 SELECT)而不能很好地扩展。我不知道来自 MySQL 和 BEAM 的流媒体支持。

您可以将您的数据库转储到数据处理系统更容易处理的内容(例如,csv)。它对你有用吗?

看来MySQL JDBC 驱动程序需要一些特殊措施才能使其不将整个结果集加载到内存中;例如我找到了 this code solving the problem in a different project. JdbcIO will need to do the same, or at least be configurable enough to let a user do it. I filed issue https://issues.apache.org/jira/browse/BEAM-3714.

同时,作为一种解决方法,您可以使用 JdbcIO.readAll() 将您的查询划分为许多较小的查询,例如您可以按一系列 ID 对其进行分区。请注意,它们之间不会强制执行事务一致性 - 就 MySQL 而言,它们将是独立的查询。