Kafkacat如何重新发布维护密钥的二进制消息
Kafkacat how to republish a binary message maintaining key
我一直在尝试使用kafkacat在主题中查找消息并将其发布回主题。我们使用 protobuf,因此消息值应该以字节为单位(键可以不同,例如字符串或字节)。但是,我无法发布可以正确反序列化的消息。
如何使用 kafkacat 执行此操作?我也愿意使用其他推荐的工具来执行此操作。
示例尝试:
kafkacat -b <broker> -t <topic> -o -10 -e -c 1 -C -K: > test2.txt
cat test2.txt | kafkacat -b <broker> -t <topic> -P -K:
test2.txt 显示:
21aa7e2f-41a1-4972-9108-3057627d53f0:
Y/<protobuf.class.path>i
aa7e2f-41a1-4972-9108-3057627d53f0A
DIABETEBG_METERBG300"BG300*QP3687WK02000012????
但是当我使用相同的kafkacat消费者命令获取结果时,我只得到最后一行:
DIABETEBG_METERBG300"BG300*QP3687WK02000012????
我认为问题在于消费输出行(可能是有效负载的一部分?)并且生产者将每一行都视为一条新消息。
the producer is treating each line as a new message
没错。
如果你有二进制文件,我建议为此编写代码,因为 kafkacat 假定 UTF8 编码字符串作为输入
我一直在尝试使用kafkacat在主题中查找消息并将其发布回主题。我们使用 protobuf,因此消息值应该以字节为单位(键可以不同,例如字符串或字节)。但是,我无法发布可以正确反序列化的消息。
如何使用 kafkacat 执行此操作?我也愿意使用其他推荐的工具来执行此操作。
示例尝试:
kafkacat -b <broker> -t <topic> -o -10 -e -c 1 -C -K: > test2.txt
cat test2.txt | kafkacat -b <broker> -t <topic> -P -K:
test2.txt 显示:
21aa7e2f-41a1-4972-9108-3057627d53f0:
Y/<protobuf.class.path>i
aa7e2f-41a1-4972-9108-3057627d53f0A
DIABETEBG_METERBG300"BG300*QP3687WK02000012????
但是当我使用相同的kafkacat消费者命令获取结果时,我只得到最后一行:
DIABETEBG_METERBG300"BG300*QP3687WK02000012????
我认为问题在于消费输出行(可能是有效负载的一部分?)并且生产者将每一行都视为一条新消息。
the producer is treating each line as a new message
没错。
如果你有二进制文件,我建议为此编写代码,因为 kafkacat 假定 UTF8 编码字符串作为输入