Kafkacat如何重新发布维护密钥的二进制消息

Kafkacat how to republish a binary message maintaining key

我一直在尝试使用kafkacat在主题中查找消息并将其发布回主题。我们使用 protobuf,因此消息值应该以字节为单位(键可以不同,例如字符串或字节)。但是,我无法发布可以正确反序列化的消息。

如何使用 kafkacat 执行此操作?我也愿意使用其他推荐的工具来执行此操作。


示例尝试:

kafkacat -b <broker> -t <topic> -o -10 -e -c 1 -C -K: > test2.txt
cat test2.txt | kafkacat -b <broker> -t <topic> -P -K:

test2.txt 显示:

21aa7e2f-41a1-4972-9108-3057627d53f0:
Y/<protobuf.class.path>i
aa7e2f-41a1-4972-9108-3057627d53f0A
DIABETEBG_METERBG300"BG300*QP3687WK02000012????

但是当我使用相同的kafkacat消费者命令获取结果时,我只得到最后一行:

DIABETEBG_METERBG300"BG300*QP3687WK02000012????

我认为问题在于消费输出行(可能是有效负载的一部分?)并且生产者将每一行都视为一条新消息。

the producer is treating each line as a new message

没错。

如果你有二进制文件,我建议为此编写代码,因为 kafkacat 假定 UTF8 编码字符串作为输入