KStream windowed aggregation - 分区问题
KStream windowed aggregation - partition problems
我在 AWS 上有一个简单的集群设置,其中包含一个 kafka 实例和一个 zookeeper。我正在为此写 <String, String>
并努力在 10 秒内汇总这些值 windows。
我收到错误消息:
DEBUG o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.
cluster metadata
#无限前进
代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> dbwriteTable = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate", 10000));
dbwriteTable.toStream().print();
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
其中 DBAggregateInit
和 DBAggregate
被删除以在遇到任何问题时登录到 DEBUG。没有其他功能。
None 这些存根函数曾经被命中。
不确定我在这里错过了什么步骤。如果我 .foreach()
或对该主题进行简单阅读,它似乎工作正常。
FWIW:
当我让 kafka 创建我的主题而不是使用 kafka-topic --create --topic ...
时,我遇到过类似的分区问题。
我认为此类错误是由于我 运行ning zookeeper 和 kafka 作为不同的用户,并且各种数据文件夹中可能存在权限问题。
一旦这两个服务都是 运行 root 并且所有相关数据文件都被删除/重新创建,这些错误就消失了。
我在 AWS 上有一个简单的集群设置,其中包含一个 kafka 实例和一个 zookeeper。我正在为此写 <String, String>
并努力在 10 秒内汇总这些值 windows。
我收到错误消息:
DEBUG o.a.kafka.clients.NetworkClient - Sending metadata request {topics=[kafka_test1-write_aggregate-changelog]} to node 100
DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 6 to Cluster(nodes = [12.34.56.78:9092 (id: 100 rack: null)], partitions = [Partition(topic = kafka_test1-write_aggregate-changelog, partition = 1, leader = 100, replicas = [100,], isr = [100,], Partition(topic = kafka_test1-write_aggregate-changelog, partition = 0, leader = 100, replicas = [100,], isr = [100,]])
DEBUG o.a.k.c.consumer.internals.Fetcher - Attempt to fetch offsets for partition kafka_test1-write_aggregate-changelog-0 failed due to obsolete leadership information, retrying.
cluster metadata
#无限前进
代码:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
KTable<Windowed<String>, String> dbwriteTable = lines.aggregateByKey(
new DBAggregateInit(),
new DBAggregate(),
TimeWindows.of("write_aggregate", 10000));
dbwriteTable.toStream().print();
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
其中 DBAggregateInit
和 DBAggregate
被删除以在遇到任何问题时登录到 DEBUG。没有其他功能。
None 这些存根函数曾经被命中。
不确定我在这里错过了什么步骤。如果我 .foreach()
或对该主题进行简单阅读,它似乎工作正常。
FWIW:
当我让 kafka 创建我的主题而不是使用 kafka-topic --create --topic ...
时,我遇到过类似的分区问题。
我认为此类错误是由于我 运行ning zookeeper 和 kafka 作为不同的用户,并且各种数据文件夹中可能存在权限问题。
一旦这两个服务都是 运行 root 并且所有相关数据文件都被删除/重新创建,这些错误就消失了。