为什么 Kafka Mirrormaker 目标主题包含原始消息的一半?

Why does Kafka Mirrormaker target topic contain half of original messages?

我想从 Kafka 集群中的主题复制所有消息。所以我 运行 Kafka Mirrormaker 但是它似乎只从源集群复制了大约一半的消息(我检查了源主题中没有消费者滞后)。我在源集群中有 2 个代理,这与此有什么关系吗?

这是源集群配置:

log.retention.ms=1814400000
transaction.state.log.replication.factor=2
offsets.topic.replication.factor=2
auto.create.topics.enable=true
default.replication.factor=2
min.insync.replicas=1
num.io.threads=8
num.network.threads=5
num.partitions=1
num.replica.fetchers=2
replica.lag.time.max.ms=30000
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
socket.send.buffer.bytes=102400
unclean.leader.election.enable=true
zookeeper.session.timeout.ms=18000

源主题有 4 个分区,未压缩。 Mirrormaker 配置为:

bootstrap.servers=broker1:9092,broker2:9092
group.id=picturesGroup3
auto.offset.reset=earliest
bootstrap.servers=localhost:9092
max.in.flight.requests.per.connection=1
retries=2000000000
acks=all
max.block.ms=2000000000

以下是 Kafdrop 关于源集群主题的统计数据:

Partition First Offset Last Offset Size Leader Node Replica Nodes In-sync Replica Nodes Offline Replica Nodes Preferred Leader Under-replicated
0 13659 17768 4109 1 1 1 Yes No
1 13518 17713 4195 2 2 2 Yes No
2 13664 17913 4249 1 1 1 Yes No
3 13911 18072 4161 2 2 2 Yes No

这些是 Mirrormaker 运行 之后目标主题的统计数据:

Partition First Offset Last Offset Size Leader Node Replica Nodes In-sync Replica Nodes Offline Replica Nodes Preferred Leader Under-replicated
0 2132 4121 1989 1 1 1 Yes No
1 2307 4217 1910 1 1 1 Yes No
2 2379 4294 1915 1 1 1 Yes No
3 2218 4083 1865 1 1 1 Yes No

如您所见,根据大小列,大约只有一半的源消息位于目标主题中。我做错了什么?

我意识到问题的发生是因为我正在将数据从具有 2 个代理的集群复制到具有 1 个代理的集群。所以我假设 Mirrormaker1 只是从原始集群的一个代理复制了数据。当我将目标集群配置为有 2 个代理时,所有消息都被复制到它。


关于 @OneCricketeer 使用 Mirrormaker2 的建议,这也有效,但是我花了一段时间才得到正确的配置文件:

clusters = source, dest

source.bootstrap.servers = sourcebroker1:9092,sourcebroker2:9092
dest.bootstrap.servers = destbroker1:9091,destbroker2:9092
topics = .*
groups = mm2topic
source->dest.enabled = true
offsets.topic.replication.factor=1
offset.storage.replication.factor=1
auto.offset.reset=latest

此外,在这个KafkaConnect项目的connect容器中可以找到Mirrormaker2(进入容器,在/kafka/bin目录下会有connect-mirror-maker.sh可执行文件)。

Mirrormaker2 解决方案的主要缺点是它将为目标集群中的主题添加前缀(在我的例子中,新名称需要更改应用程序代码)。在 Mirrormaker2 配置中无法更改前缀,因此唯一的方法是按照 here.

中的说明实现自定义 Java class