如何将一些消息从一个 kafka 主题复制到另一个 bash?
How copy some message from one kafka topic to another from bash?
请帮忙
我们有 2 个 kafka 主题。我想从 topic1 开始复制 10 条消息到 topic2。
我会尝试使用 kafka-console-consumer 和 kafka-console-producer
首先我将 topic1 的 10 条消息保存到某个目录:
for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;
然后我尝试使用 kafka-console-producer 将它发送到 topic2:
for (( i=1; i<=10; i++ )); do bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;
我收到错误消息 - 我的服务无法反序列化数据。
我的问题是:
- 我的解决方案行得通吗?
- 为什么我可以收到这个错误?
- 将消息从一个主题复制到另一个主题的最佳方法是什么?
更新:
我是如何解决这个问题的(感谢:Robin Moffatt):
我使用 kafka-mirror 和这个罐子:
https://github.com/opencore/mirrormaker_topic_rename
有了这个,我可以将消息从一个主题 kafka 复制到一个集群上的另一个主题
您可以使用 kafkacat
执行此操作:
kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
kafkacat -b localhost:9092 -P -t target-topic -K:
|
将第一个 kafkacat(-C
消费者)的输出重定向到第二个 kafkacat(-P
生产者)的输入
-c10
表示只消费10条消息
-o beginning
表示从题目开头开始。
请注意,如果您有二进制数据(例如 Avro),这将不起作用。要正确执行此操作,请使用 Replicator 或 MirrorMaker2 和 ByteArrayConverter 之类的东西。
参考:https://rmoff.net/2019/09/29/copying-data-between-kafka-clusters-with-kafkacat/
请帮忙
我们有 2 个 kafka 主题。我想从 topic1 开始复制 10 条消息到 topic2。
我会尝试使用 kafka-console-consumer 和 kafka-console-producer
首先我将 topic1 的 10 条消息保存到某个目录:
for (( i=1; i<=10; i++ )); do bin/kafka-console-consumer.sh --bootstrap-server 1.1.2.3:9092 --group CONSUMER1 --topic TOPIC1 --max-messages 1 > /tmp/_topic/$i.msg; done;
然后我尝试使用 kafka-console-producer 将它发送到 topic2:
for (( i=1; i<=10; i++ )); do bin/kafka-console-producer.sh --broker-list 1.1.2.4:9092 --topic TOPIC2 < /tmp/_topic/$i.msg; done;
我收到错误消息 - 我的服务无法反序列化数据。 我的问题是:
- 我的解决方案行得通吗?
- 为什么我可以收到这个错误?
- 将消息从一个主题复制到另一个主题的最佳方法是什么?
更新: 我是如何解决这个问题的(感谢:Robin Moffatt): 我使用 kafka-mirror 和这个罐子: https://github.com/opencore/mirrormaker_topic_rename 有了这个,我可以将消息从一个主题 kafka 复制到一个集群上的另一个主题
您可以使用 kafkacat
执行此操作:
kafkacat -b localhost:9092 -C -t source-topic -K: -e -o beginning -c10 | \
kafkacat -b localhost:9092 -P -t target-topic -K:
|
将第一个 kafkacat(-C
消费者)的输出重定向到第二个 kafkacat(-P
生产者)的输入-c10
表示只消费10条消息-o beginning
表示从题目开头开始。
请注意,如果您有二进制数据(例如 Avro),这将不起作用。要正确执行此操作,请使用 Replicator 或 MirrorMaker2 和 ByteArrayConverter 之类的东西。
参考:https://rmoff.net/2019/09/29/copying-data-between-kafka-clusters-with-kafkacat/