Kafka 字节数组
Kafka ByteArray
我正在使用 Kafka 发送生产和消费消息。
制作很好,与 <String, ByteArray>
制作人合作。
消费时,我使用下面的代码(取自示例)但我得到的每条记录只有 8 个字节(代码下方的示例输出)。
有没有一种方法可以让消费者简单地将整个消息作为字节数组?
代码:
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Arrays.asList(topic));
int i = 0;
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
System.out 的输出:
offset = 1773133, key = 105906453, value = [B@b8eff39
offset = 1773134, key = 105906453, value = [B@7bb1504
offset = 1773135, key = 105906453, value = [B@67b6c728
offset = 1773136, key = 105906453, value = [B@60b1f9c5
offset = 1773137, key = 105906177, value = [B@1cbab5dd
offset = 1773138, key = 105906177, value = [B@4376907b
offset = 1773139, key = 105906177, value = [B@122880ba
offset = 1773140, key = 105906177, value = [B@7db82ceb
offset = 1773141, key = 105906177, value = [B@34657adc
我不希望必须 assemble 加载这些记录来重新创建消息,因为我相信我遗漏了一些东西并且手动组装可能容易出错。
我认为你应该使用
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), java.util.Arrays.toString(record.value()));
而不是依赖于普通的 array.toString(这给你垃圾而不是实际内容)。数组可能是正确的,你只是错误地调试它。
我正在使用 Kafka 发送生产和消费消息。
制作很好,与 <String, ByteArray>
制作人合作。
消费时,我使用下面的代码(取自示例)但我得到的每条记录只有 8 个字节(代码下方的示例输出)。
有没有一种方法可以让消费者简单地将整个消息作为字节数组?
代码:
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Arrays.asList(topic));
int i = 0;
while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
System.out 的输出:
offset = 1773133, key = 105906453, value = [B@b8eff39
offset = 1773134, key = 105906453, value = [B@7bb1504
offset = 1773135, key = 105906453, value = [B@67b6c728
offset = 1773136, key = 105906453, value = [B@60b1f9c5
offset = 1773137, key = 105906177, value = [B@1cbab5dd
offset = 1773138, key = 105906177, value = [B@4376907b
offset = 1773139, key = 105906177, value = [B@122880ba
offset = 1773140, key = 105906177, value = [B@7db82ceb
offset = 1773141, key = 105906177, value = [B@34657adc
我不希望必须 assemble 加载这些记录来重新创建消息,因为我相信我遗漏了一些东西并且手动组装可能容易出错。
我认为你应该使用
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), java.util.Arrays.toString(record.value()));
而不是依赖于普通的 array.toString(这给你垃圾而不是实际内容)。数组可能是正确的,你只是错误地调试它。