Apache Flink KafkaSource doesn't set group.id

I have a simple streaming job configured as follows:

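import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.Configuration
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

val config: Configuration = new Configuration()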
config.setString("taskmanager.memory.managed.size", "4g")
config.setString("parallelism.default", "4")
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config)

env
  .fromSource(KafkaSource.builder[String]
    .setBootstrapServers("node1:9093,node2:9093,node3:9093")
    .setTopics("example-topic")
    //.setProperties(kafkaProps) // didn't work
    .setProperty("security.protocol", "SASL_SSL")
    .setProperty("sasl.mechanism", "GSSAPI")
    .setProperty("sasl.kerberos.service.name", "kafka")
    .setProperty("group.id","groupid-test")
    //.setGroupId("groupid-test") // didn't work
    .setStartingOffsets(OffsetsInitializer.earliest)
    .setProperty("partition.discovery.interval.ms", "60000") // discover new partitions every 60 s
    .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
    .build,
    WatermarkStrategy.noWatermarks[String],
    "example-input-topic"
  )
  .print

env.execute("asdasd")

My Flink version is 1.14.2.

My Kafka runs on Cloudera. Kafka version: 2.2.1-cdh6.3.2

I am able to consume records from Kafka, but the group id is not set for the topic. Does anyone have any ideas?

As of Flink 1.14.0, group.id is optional. See https://issues.apache.org/jira/browse/FLINK-24051. You can still set your own value if you want to specify one. The accompanying PR shows how the value was previously set: https://github.com/apache/flink/pull/17052/files#diff-34b4ff8d43271eeac91ba17f29b13322f6e0ff3d15f71003a839aeb780fe30fbL56
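
One thing that may be worth checking, offered as a guess rather than a confirmed diagnosis: as far as I understand the connector, KafkaSource commits offsets back to Kafka only when a checkpoint completes, and its builder turns enable.auto.commit off unless you override it. If that applies here, a job without checkpointing never commits anything, so the group would not show up in tools that list consumer groups even though group.id is set. A minimal sketch under that assumption, reusing the broker, topic, and group names from the question (the object name and the 10 s checkpoint interval are made up for illustration):

```scala
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema
import org.apache.flink.streaming.api.scala._
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical object name, for illustration only.
object GroupIdSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Assumption: offsets are committed on checkpoint completion,
    // so checkpointing has to be enabled. The interval is arbitrary.
    env.enableCheckpointing(10000L)

    val source = KafkaSource.builder[String]
      .setBootstrapServers("node1:9093,node2:9093,node3:9093")
      .setTopics("example-topic")
      // Security settings copied from the question.
      .setProperty("security.protocol", "SASL_SSL")
      .setProperty("sasl.mechanism", "GSSAPI")
      .setProperty("sasl.kerberos.service.name", "kafka")
      // Explicit group id; optional since Flink 1.14.0.
      .setGroupId("groupid-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(classOf[StringDeserializer]))
      .build

    env
      .fromSource(source, WatermarkStrategy.noWatermarks[String], "example-input-topic")
      .print()

    env.execute("group-id-sketch")
  }
}
```

If you would rather not enable checkpointing, setting .setProperty("enable.auto.commit", "true") on the builder should fall back to the Kafka consumer's own periodic commits. In either case the committed offsets are only informational for Flink, which restores its position from its own state.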