Flink、Kafka 和 JDBC sink

Flink, Kafka and JDBC sink

我有一个 Flink 1.11 作业,它使用来自 Kafka 主题的消息,对它们进行键控,过滤它们(keyBy 后跟自定义 ProcessFunction),并通过 JDBC 接收器将它们保存到数据库中(如此处所述) : https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/jdbc.html)

Kafka 消费者使用这些选项进行初始化:

properties.setProperty("auto.offset.reset", "earliest")
kafkaConsumer = new FlinkKafkaConsumer(topic, deserializer, properties)
kafkaConsumer.setStartFromGroupOffsets()
kafkaConsumer.setCommitOffsetsOnCheckpoints(true)

集群上启用了检查点。

我想要实现的是保证将所有过滤后的数据保存到数据库中,即使数据库已关闭,比方说,6 小时,或者在保存到数据库和工作需要时出现编程错误进行更新、重新部署和重新启动。

要做到这一点,Kafka 偏移量的任何检查点都应该意味着

  1. 从 Kafka 读取的数据处于 Flink 算子状态,等待过滤/传递到接收器中,并将作为 Flink 算子检查点的一部分进行检查点,或
  2. 从 Kafka 读取的数据已经提交到数据库中。

在查看 JdbcSink 的实现时,我发现它并没有真正保留任何内部状态 checkpointed/restored - 相反,它的检查点是写入数据库。现在,如果此写入在检查点期间失败,并且 Kafka 偏移量确实得到保存,我将处于“丢失”数据的情况 - 从 Kafka 的后续读取将从提交的偏移量恢复,以及数据库运行时正在运行的任何数据写入失败现在不再从 Kafka 读取,也不在数据库中。

所以有没有一种方法可以在完整管道(Kafka -> Flink -> DB)无法执行时停止推进 Kafka 偏移量 - 或者这里的解决方案(在 1.13 之前的世界中)可能是创建我自己的GenericJdbcSinkFunction 的实现将保持一些 ValueState 直到数据库写入成功?

我可以看到 3 个选项:

  1. 使用您的 Flink 版本试用 JDBC 1.13 连接器。它很有可能会起作用。
  2. 如果这不能立即生效,请检查是否可以将其移植到 1.11。应该不会有太大的变化。
  3. 通过扩展 TwoPhaseCommitSinkFunction 或使用 CheckpointedFunctionCheckpointListener 实现您自己的 SinkFunction 来编写您自己的两阶段提交接收器。基本上,您在成功检查点后创建一个新事务并使用 notifyCheckpointCompleted.
  4. 提交它