flink kafkaproducer 在检查点恢复时以 exactly once 模式发送重复消息

flink kafkaproducer send duplicate message in exactly once mode when checkpoint restore

我正在写一个案例来测试flink的两步提交,下面是概述。

sink kafka 正好是一次 kafka 生产者。 sink step 是 mysql 下沉扩展 two step commitsink compare 是 mysql sink extend two step commit,这个 sink 偶尔会抛出一个异常来模拟检查点失败。

当检查点失败并恢复时,我发现 mysql 两步提交可以正常工作,但是 kafka 消费者将读取上次成功的偏移量,而 kafka 生产者即使完成了也会产生消息在此检查点失败之前

在这种情况下如何避免重复消息?

感谢您的帮助。

环境:

kafka生产者代码:

        dataStreamReduce.addSink(new FlinkKafkaProducer<>(
                "flink_output",
                new KafkaSerializationSchema<Tuple4<String, String, String, Long>>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(Tuple4<String, String, String, Long> element, @Nullable Long timestamp) {
                        UUID uuid = UUID.randomUUID();
                        JSONObject jsonObject = new JSONObject();
                        jsonObject.put("uuid", uuid.toString());
                        jsonObject.put("key1", element.f0);
                        jsonObject.put("key2", element.f1);
                        jsonObject.put("key3", element.f2);
                        jsonObject.put("indicate", element.f3);
                        return new ProducerRecord<>("flink_output", jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));
                    }
                },
                kafkaProps,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        )).name("sink kafka");

检查点设置:

        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(10000);
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
        executionEnvironment.getCheckpointConfig().setPreferCheckpointForRecovery(true);

mysql下沉:

dataStreamReduce.addSink(
                new TwoPhaseCommitSinkFunction<Tuple4<String, String, String, Long>,
                        Connection, Void>
                        (new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE) {

                    int count = 0;
                    Connection connection;

                    @Override
                    protected void invoke(Connection transaction, Tuple4<String, String, String, Long> value, Context context) throws Exception {
                        if (count > 10) {
                            throw new Exception("compare test exception.");
                        }
                        PreparedStatement ps = transaction.prepareStatement(
                                " insert into test_two_step_compare(slot_time, key1, key2, key3, indicate) " +
                                        " values(?, ?, ?, ?, ?) " +
                                        " ON DUPLICATE KEY UPDATE indicate = indicate + values(indicate) "
                        );
                        ps.setString(1, context.timestamp().toString());
                        ps.setString(2, value.f0);
                        ps.setString(3, value.f1);
                        ps.setString(4, value.f1);
                        ps.setLong(5, value.f3);
                        ps.execute();
                        ps.close();
                        count += 1;
                    }

                    @Override
                    protected Connection beginTransaction() throws Exception {
                        LOGGER.error("compare in begin transaction");
                        try {
                            if (connection.isClosed()) {
                                throw new Exception("mysql connection closed");
                            }
                        }catch (Exception e) {
                            LOGGER.error("mysql connection is error: " + e.toString());
                            LOGGER.error("reconnect mysql connection");
                            String jdbcURI = "jdbc:mysql://";
                            Class.forName("com.mysql.jdbc.Driver");
                            Connection connection = DriverManager.getConnection(jdbcURI);
                            connection.setAutoCommit(false);
                            this.connection = connection;
                        }
                        return this.connection;
                    }

                    @Override
                    protected void preCommit(Connection transaction) throws Exception {
                        LOGGER.error("compare in pre Commit");
                    }

                    @Override
                    protected void commit(Connection transaction) {
                        LOGGER.error("compare in commit");
                        try {
                            transaction.commit();
                        } catch (Exception e) {
                            LOGGER.error("compare Commit error: " + e.toString());
                        }
                    }

                    @Override
                    protected void abort(Connection transaction) {
                        LOGGER.error("compare in abort");
                        try {
                            transaction.rollback();
                        } catch (Exception e) {
                            LOGGER.error("compare abort error." + e.toString());
                        }
                    }

                    @Override
                    protected void recoverAndCommit(Connection transaction) {
                        super.recoverAndCommit(transaction);
                        LOGGER.error("compare in recover And Commit");
                    }

                    @Override
                    protected void recoverAndAbort(Connection transaction) {
                        super.recoverAndAbort(transaction);
                        LOGGER.error("compare in recover And Abort");
                    }
                })
                .setParallelism(1).name("sink compare");

我不太确定我是否正确理解了问题:

When checkpoint is failed and restore, I find mysql two step commit will work fine, but kafka producer will read offset from last success and produce message even he was done it before this checkpoint failed.

Kafka 生产者未读取任何数据。所以,我假设你的整个管道重新读取旧的偏移量并产生重复项。如果是,你需要了解Flink是如何保证exactly once的。

  1. 创建定期检查点以在出现故障时保持一致的状态。
  2. 这些检查点包含检查点时最后一次成功读取记录的偏移量。
  3. 恢复后,Flink 将重新读取存储在最后一个成功检查点中的偏移量的所有记录。因此,将重放在最后一个检查点和失败之间生成的相同记录。
  4. 重放的记录会恢复到失败前的状态
  5. 它将产生 重复的 输出,源自重放的输入记录。
  6. 接收器有责任确保没有重复有效地写入目标系统。

最后一点,有两种选择:

  • 仅在写入检查点时输出数据,这样目标中就不会出现有效的重复项。这种天真的方法非常通用(独立于接收器),但将检查点间隔添加到延迟中。
  • 让接收器对输出进行重复数据删除。

后一个选项用于Kafka sink。它使用 Kafka 事务来删除重复数据。为避免在消费者端出现重复,您需要确保它未读取 uncommitted data as mentioned in the documentation。还要确保您的事务超时足够大,不会在失败和恢复之间丢弃数据。