DataStream Flink中从JDBC源读取数据的问题

Questions for reading data from JDBC source in DataStream Flink

我正在启动一个新的 Flink 应用程序,以允许我的公司执行大量报告。我们有一个现有的遗留系统,我们需要的大部分数据保存在 SQL 服务器数据库中。在开始使用新部署的 Kafka 流中的更多数据之前,我们需要先使用这些数据库中的数据。

我花了很多时间阅读 Flink 书籍和网页,但我有一些简单的问题和假设,希望你能帮助我取得进步。

首先,我想使用 DataStream API 这样我们既可以使用历史数据也可以使用实时数据。我不认为我想使用 DataSet API 但我也没有看到使用 SQL/Table api 的意义,因为我更愿意在 Java [=38 中编写我的函数=].我需要维护自己的状态,DataStream 键控函数似乎是可行的方法。

现在我正在尝试针对我们的生产数据库实际编写代码,我需要能够使用 SQL 查询读取数据“流”——似乎没有 JDBC 源连接器,所以我认为我必须自己调用 JDBC,然后可能使用 env.fromElements() 创建数据源。显然这是一个“有界”数据集,但我还打算如何加载历史数据呢?将来我还想包括一个 Kafka 流,它只有几周的数据,所以我想我有时需要将来自 SQL Server/Snowflake 数据库的数据与实时流合并来自卡夫卡流。什么是最佳实践,因为我没有看到讨论这个的例子。

通过从 JDBC 源检索数据,我还看到了一些使用 StreamingTableEnvironment 的示例 - 我是否打算以某种方式使用它来查询来自 JDBC 连接的数据到我的 DataStream 函数ETC?同样,我想在 Java 而不是某些 Flink SQL 中编写我的函数。如果我只使用 DataStream API,最好使用 StreamingTableEnvironment 来查询 JDBC 数据吗?

可以使用以下方法从数据库读取数据并创建数据流:

  1. 您可以使用 RichParallelSourceFunction,您可以在其中对数据库执行自定义查询并从中获取数据流。可以在 RichParallelSourceFunction class.

    的扩展中触发带有 JDBC 驱动程序的 SQL
  2. 使用 Table DataStream API - 可以通过创建 JDBC 目录然后将其转换为流来查询数据库

  3. 对此的替代方案,也许是更昂贵的解决方案 - 您可以使用 Flink CDC connectors,它为 Apache Flink 提供源连接器,使用变更数据捕获从不同的数据库获取变更(CDC )

然后您可以将 Kafka 添加为源并获取数据流。

因此,您的管道可能如下所示: 您将两个源都转换为数据流,您可以使用例如协处理函数加入这些流,这也将使您有可能维护状态并在业务逻辑中使用它。最后,使用 Sink 函数将最终输出汇入数据库、Kafka 甚至 AWS S3 存储桶。