Flink - InstanceAlreadyExistsException while migrating to the KafkaSource
I'm using Flink v1.13.2 and am trying to migrate from FlinkKafkaConsumer to KafkaSource. When I test the new KafkaSource, I get the following exception:
2022-04-27 12:49:13,206 WARN org.apache.kafka.common.utils.AppInfoParser [] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=my-kafka-id-7
at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:436) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1855) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:955) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:890) ~[?:?]
at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:320) ~[?:?]
at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[?:?]
at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:814) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:666) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.<init>(KafkaPartitionSplitReader.java:90) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.kafka.source.KafkaSource.lambda$createReader$0(KafkaSource.java:145) ~[blob_p-6ddb91cddeeec769ea1c062230f823a348757e9f-2bcb4f9bd83a2a5e043053d1ac91ca90:?]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.createSplitFetcher(SplitFetcherManager.java:136) ~[flink-table-blink_2.11-1.13.2.jar:1.13.2]
at org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager.addSplits(SingleThreadFetcherManager.java:61)
...
Here are the FlinkKafkaConsumer and KafkaConsumer configurations:
import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class KafkaConsumer {

    private KafkaConsumer() {
    }

    public static Properties getKafkaProp(String kafkaTopicName) {
        Properties properties = new Properties();
        String kafkaBrokerServers;
        properties.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "...");
        switch (kafkaTopicName) {
            case ...:
                kafkaBrokerServers = "1.2.3.4";
                break;
            case ...:
                kafkaBrokerServers = "3.4.5.6";
                break;
            default:
                kafkaBrokerServers = "6.7.7.9";
                break;
        }
        properties.setProperty("bootstrap.servers", kafkaBrokerServers);
        String kafkaGroupId = "my-kafka-id";
        properties.setProperty("group.id", kafkaGroupId);
        properties.setProperty("partition.discovery.interval.ms", "10000");
        return properties;
    }

    public static <T> FlinkKafkaConsumer<T> getKafkaConsumerForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
        FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer<>(
                kafkaTopicName,
                deserializationSchema,
                properties);
        consumer.setStartFromLatest();
        return consumer;
    }

    public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
        return KafkaSource.<T>builder()
                .setTopics(kafkaTopicNames)
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }
}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaStream {

    public DataStream<KafkaObject> getKafkaStream_1(ExecutionParameters executionParameters) {
        KafkaSource<KafkaObject> consumerBinden = KafkaConsumer.getKafkaSourceForFlink("topic-1", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-1"));
        return executionParameters.getFlinkExecutionEnvironment().fromSource(consumerBinden, WatermarkStrategy.noWatermarks(), "topic-1").setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }

    public DataStream<KafkaObject> getKafkaStream_2(ExecutionParameters executionParameters) {
        KafkaSource<KafkaObject> kafka = KafkaConsumer.getKafkaSourceForFlink("topic-2", new KafkaEntitySerialization<>(KafkaObject.class), KafkaConsumer.getKafkaProp("topic-2"));
        return executionParameters.getFlinkExecutionEnvironment().fromSource(kafka, WatermarkStrategy.noWatermarks(), "topic-2").setParallelism(15).uid({RandomGeneratedString}).disableChaining();
    }
}
I also tried creating the KafkaSource with the following snippet, without success:
public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicNames, DeserializationSchema<T> deserializationSchema, Properties properties) {
    return KafkaSource.<T>builder()
            .setBootstrapServers(properties.getProperty("bootstrap.servers"))
            .setTopics(kafkaTopicNames)
            .setGroupId(properties.getProperty("group.id"))
            .setProperties(properties)
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}
What could be the problem, and how can I fix it?
Update: the problem was the client.id. I was using the same client.id for different topics. If anyone runs into the same warning, try setting setClientIdPrefix:
public static <T> KafkaSource<T> getKafkaSourceForFlink(String kafkaTopicName, DeserializationSchema<T> deserializationSchema, Properties properties) {
    return KafkaSource.<T>builder()
            .setTopics(kafkaTopicName)
            .setProperties(properties)
            .setClientIdPrefix(UUID.randomUUID().toString())
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(deserializationSchema)
            .build();
}
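Why this helps: each KafkaConsumer registers a JMX app-info MBean named kafka.consumer:type=app-info,id=<client.id> (that is the my-kafka-id-7 in the warning above), so two sources sharing the same client.id collide on registration. A distinct prefix per source keeps the MBean names unique.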
The error you listed is a warning, not an exception.
Based on InstanceAlreadyExistsException coming from kafka consumer, I suspect you're using the same client.id. The suggestion in that thread is to change it to a unique name.
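For example, here is a minimal sketch of that suggestion, reusing the setClientIdPrefix builder method from the update above; the helper name and the per-topic prefix scheme are made up for illustration:

import java.util.Properties;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;

public class UniqueClientIdExample {

    // Hypothetical helper: derive a distinct client.id prefix per topic so each
    // KafkaSource registers its JMX app-info MBean under a different name.
    public static <T> KafkaSource<T> kafkaSourceWithUniqueClientId(String topic, DeserializationSchema<T> deserializationSchema, Properties properties) {
        return KafkaSource.<T>builder()
                .setTopics(topic)
                .setProperties(properties)
                // One prefix per topic; the split reader appends a subtask
                // suffix, so client.ids also stay unique across parallel readers.
                .setClientIdPrefix("my-kafka-id-" + topic)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(deserializationSchema)
                .build();
    }
}

A random UUID prefix (as in the update) works too; a topic-based prefix has the advantage of keeping metric names stable across restarts.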