Flink - 运行 使用 JdbcDynamicTableSource 对 JDBC 数据库进行连续查询

Flink - Run a continuous query on JDBC database using JdbcDynamicTableSource

我正在使用 Flink 从 postgresql 数据库读取数据,该数据库会不断更新新数据。目前,我可以使用 Flink 的 JdbcCatalog.

从该数据库进行一次性查询

我想运行对该数据库进行连续查询,但是由于sql源不是无限输入,我的查询运行s 一次并停止。据我所知,我可以:

  1. 运行 以暴力方式重复这些查询,可能使用 Iterations.
  2. 使用 JdbcDynamicTableSource,因为连续查询意味着要在动态 table 秒内完成。

第二个解决方案将是理想的。但是,我不知道如何使用 运行 对 JdbcDynamicTableSource 的连续查询。我能够使用所需的连接选项实例化 JdbcDynamicTableSource,但是我应该如何 运行 对其进行连续查询以生成动态 table?本质上,我如何使用 JdbcDynamicTableSource?

任何 advice/code 样本将不胜感激!

我看到有 4 种方法可以解决您的问题:

    1. 将更新日志数据捕获 (CDC) 与 Debezium. CDC will look at your postgres' WAL an produce a stream of changes. Some Flink connectors are already available to interpret it, and build a Table from it 之类的东西一起使用。这应该是您的首选方式,但我相信它需要对您的 postgres 实例具有一些管理员权限。
    1. Use postgres's LISTEN/NOTIFY, pipe it to a message queue, interpret it in Flink with some Deduplication。不过,这种技术看起来既复杂又脆弱。
    1. 使用 Kafka Connect's JDBC Connector,配置为轮询您的 table,incrementing.column.name 设置为递增的主键,或您使用触发器更新的上次更改时间戳。不过你需要 Kafka Streams。不是实时的,但您可以将轮询间隔减少到每秒(确保轮询列上有索引)。

这里的冰山是发生故障时发生的情况。我认为1)和3)应该没问题。此外,还有一些性能问题:1) 会减慢您对 postgres 的写入速度(来自复制 I/O 开销),3) 可能会减慢您对 postgres 的读取速度(来自不断轮询)

所有解决方案都涉及 Kafka 或消息队列。您也可以尝试 4):

  • 实施 3) 并在 Flink SourceFunction 中自己轮询数据库。请务必使用 Stateful Source Function 并将您的查询偏移量作为 ValueState,以便在出现故障时它可以在正确的偏移量处重新启动。处理重复查询的一些想法:将该源的并行度设置为 1,或者轮询以并行度为模的键。

编辑:5. 就像 1. 中的 Debezium CDC,but packaged up as source in Flink。这可能是最好的选择。