KStream windowed aggregation - 分区问题

KStream windowed aggregation - partition problems

我在 AWS 上有一个简单的集群设置,其中包含一个 kafka 实例和一个 zookeeper。我正在为此写 <String, String> 并努力在 10 秒内汇总这些值 windows。

我收到错误消息:

DEBUG  o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG  org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG  o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.

cluster metadata#无限前进

代码:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

KTable<Windowed<String>, String>  dbwriteTable = lines.aggregateByKey(
            new DBAggregateInit(),
            new DBAggregate(),
            TimeWindows.of("write_aggregate", 10000));

dbwriteTable.toStream().print();

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

其中 DBAggregateInitDBAggregate 被删除以在遇到任何问题时登录到 DEBUG。没有其他功能。

None 这些存根函数曾经被命中。

不确定我在这里错过了什么步骤。如果我 .foreach() 或对该主题进行简单阅读,它似乎工作正常。

FWIW:

当我让 kafka 创建我的主题而不是使用 kafka-topic --create --topic ... 时,我遇到过类似的分区问题。

我认为此类错误是由于我 运行ning zookeeper 和 kafka 作为不同的用户,并且各种数据文件夹中可能存在权限问题。

一旦这两个服务都是 运行 root 并且所有相关数据文件都被删除/重新创建,这些错误就消失了。