Flink - InstanceAlreadyExistsException:迁移到 KafkaSource 时

Flink - InstanceAlreadyExistsException: while migrating to the KafkaSource

我正在使用 v1.13.2 的 flink。我正在尝试将 FlinkKafkaConsumer 迁移到 KafkaSource。当我测试新的 KafkaSource 时,出现以下异常:

2022-04-27 12:49:13,206 WARN  org.apache.kafka.common.utils.AppInfoParser                  [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=my-kafka-id-7
    at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
    at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
    at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
    at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:90) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader[=10=](KafkaSource.java:145) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:136) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
    at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:61) 
    ...

以下是 FlinkKafkaConsumer 和 KafkaSource 的配置:


import java.util.Properties;
import java.util.UUID;

/**
 * Factory for Kafka consumer {@link Properties} and Flink sources.
 *
 * NOTE(review): this class name shadows
 * {@code org.apache.kafka.clients.consumer.KafkaConsumer}; consider renaming it
 * (e.g. {@code KafkaConsumerFactory}) to avoid import confusion.
 */
public class KafkaConsumer {

    // Utility class — prevent instantiation.
    private KafkaConsumer() {
    }

    /**
     * Builds consumer properties for the given topic, routing to the matching
     * broker list. Placeholders ({@code "..."}) are kept from the original snippet.
     *
     * @param kafkaTopicName topic used to select the bootstrap servers
     * @return consumer properties (bootstrap.servers, group.id, partition discovery)
     */
    public static Properties getKafkaProp(String kafkaTopicName){
        Properties properties = new Properties();
        String kafkaBrokerServers;
        properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "...");

        // Topic-to-cluster routing table.
        switch (kafkaTopicName)
        {
            case ...:
                kafkaBrokerServers = "1.2.3.4";
                break;
            case ...:
                kafkaBrokerServers = "3.4.5.6";
                break;
            default:
                kafkaBrokerServers = "6.7.7.9";
                break;
        }

        properties.setProperty("bootstrap.servers", kafkaBrokerServers);
        // FIX: the original line was missing its terminating semicolon.
        // WARNING: every source built from these properties shares the same
        // group.id, so the Kafka client derives identical client.id values
        // ("my-kafka-id-N") across sources — the cause of the
        // InstanceAlreadyExistsException JMX warning in the question.
        String kafkaGroupId = "my-kafka-id";
        properties.setProperty("group.id", kafkaGroupId);
        properties.setProperty("partition.discovery.interval.ms", "10000");

        return properties;
    }


    /**
     * Builds the legacy {@link FlinkKafkaConsumer}, starting from the latest offset.
     *
     * @param kafkaTopicName        topic to subscribe to
     * @param deserializationSchema value deserializer
     * @param properties            consumer properties (see {@link #getKafkaProp})
     */
    public static<T>  FlinkKafkaConsumer<T> getKafkaConsumerForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer<>(
                kafkaTopicName,
                deserializationSchema,
                properties);

        consumer.setStartFromLatest();
        return consumer;
    }

    /**
     * Builds the new {@link KafkaSource}, starting from the latest offset.
     *
     * <p>FIX: a unique client-id prefix is set per source so that two sources
     * sharing the same group.id do not register the same JMX MBean
     * ({@code kafka.consumer:type=app-info,id=...}), which triggered the
     * {@code InstanceAlreadyExistsException} warning.
     */
    public static<T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
        return KafkaSource.<T>builder()
                .setTopics(kafkaTopicNames)
                .setProperties(properties)
                // Unique per-source prefix avoids duplicate client.id registration.
                .setClientIdPrefix(UUID.randomUUID().toString())
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }
}


/**
 * Builds the Flink data streams for the two Kafka topics.
 */
public class KafkaStream{

    /**
     * Source stream for topic-1.
     * NOTE(review): {RandomGeneratedString} is a placeholder from the question —
     * the uid should be a *stable* string so state can be restored across restarts.
     */
    public DataStream<KafkaObject> getKafkaStream_1(ExecutionParameters executionParameters) {
        // FIX: qualify getKafkaProp — it is declared on KafkaConsumer, not here.
        KafkaSource<KafkaObject> consumerBinden = KafkaConsumer.getKafkaSourceForFlink("topic-1", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-1"));
        return executionParameters.getFlinkExecutionEnvironment().fromSource(consumerBinden, WatermarkStrategy.noWatermarks(), "topic-1").setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }

    /**
     * Source stream for topic-2.
     */
    public DataStream<KafkaObject> getKafkaStream_2(ExecutionParameters executionParameters) {
        // FIX: qualify getKafkaProp — it is declared on KafkaConsumer, not here.
        KafkaSource<KafkaObject> kafka = KafkaConsumer.getKafkaSourceForFlink("topic-2", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-2"));
        // FIX: the original referenced an undefined variable "consumerRal";
        // the local source built above is named "kafka".
        return executionParameters.getFlinkExecutionEnvironment().fromSource(kafka, WatermarkStrategy.noWatermarks(), "topic-2" ).setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }

}

我还使用以下代码片段创建了 KafkaSource,但没有成功:

// Alternative attempt quoted in the question (did NOT resolve the warning):
// bootstrap.servers and group.id are set explicitly via the builder, but since
// the same group.id is reused for every source, the derived client.id values
// still collide — no client-id prefix is set here.
public static<T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
        return KafkaSource.<T>builder()
                .setBootstrapServers(properties.getProperty("bootstrap.servers"))
                .setTopics(kafkaTopicNames)
                .setGroupId(properties.getProperty("group.id"))
                // setProperties re-applies the same keys set above; harmless but redundant.
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }

可能是什么问题,我该如何解决?


更新:问题出在 client.id。我对不同的主题使用了相同的 client.id。如果有人遇到相同的警告,请尝试设置 setClientIdPrefix:

    public static<T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties)
    {
        return KafkaSource.<T>builder()
                .setTopics(kafkaTopicName)
                .setProperties(properties)
                .setClientIdPrefix(UUID.randomUUID().toString())
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }
}

您列出的错误是警告,并非异常。

根据 InstanceAlreadyExistsException coming from kafka consumer,我怀疑您使用的是相同的 client.id。该线程中的建议是将其更改为唯一名称。