Kafka 字节数组

Kafka ByteArray

我正在使用 Kafka 发送生产和消费消息。

制作很好,与 <String, ByteArray> 制作人合作。

消费时,我使用下面的代码(取自示例)但我得到的每条记录只有 8 个字节(代码下方的示例输出)。

有没有一种方法可以让消费者简单地将整个消息作为字节数组?

代码:

props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);

consumer.subscribe(Arrays.asList(topic));
int i = 0;

while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    for (ConsumerRecord<String, byte[]> record : records)
        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}

System.out 的输出:

offset = 1773133, key = 105906453, value = [B@b8eff39
offset = 1773134, key = 105906453, value = [B@7bb1504
offset = 1773135, key = 105906453, value = [B@67b6c728
offset = 1773136, key = 105906453, value = [B@60b1f9c5
offset = 1773137, key = 105906177, value = [B@1cbab5dd
offset = 1773138, key = 105906177, value = [B@4376907b
offset = 1773139, key = 105906177, value = [B@122880ba
offset = 1773140, key = 105906177, value = [B@7db82ceb
offset = 1773141, key = 105906177, value = [B@34657adc

我不希望必须 assemble 加载这些记录来重新创建消息,因为我相信我遗漏了一些东西并且手动组装可能容易出错。

我认为你应该使用

System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), java.util.Arrays.toString(record.value()));

而不是依赖于普通的 array.toString(这给你垃圾而不是实际内容)。数组可能是正确的,你只是错误地调试它。