尝试解码 Spring Cloud Stream 中的 Avro 消息时出现无法识别的 header 字节错误
Unrecognised header byte error when try to decode an Avro message in Spring Cloud Stream
我正在尝试为我的 Spring Cloud Stream 应用程序编写测试用例。我正在使用 Confluent Schema Registry 和 Avro,所以我需要在从通道轮询后解码消息。这是我的代码:
processor.input()
.send(MessageBuilder.withPayload(InputData).build());
Message<?> message = messageCollector.forChannel(processor.output()).poll();
BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
OutputData outputObject = decoder.decode((byte[]) message.getPayload());
出于某种原因,此代码抛出
org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x08
我不确定这是否是我面临的某种错误,或者我没有遵循正确的方法来解码收到的 avro 消息。我怀疑我需要设置 header 一些东西,但我不太确定如何以及究竟是什么。如果有人能帮助我解决这个问题,我将不胜感激。
P.S:我正在使用 spring-cloud-stream-test-support
用于此测试。
使用测试绑定器时,数据不会被 avro 编码。
测试活页夹非常有限
要使用 avro 正确地进行端到端测试,您应该删除测试绑定器并使用带有嵌入式 kafka 代理的真正的 kafka 绑定器。
其中一个 sample apps 展示了如何操作。
事实证明,这个问题与我尝试解码 Avro 消息的方式有关。通过使用官方 Avro 库,以下代码对我有用:
Decoder decoder = DecoderFactory.get().binaryDecoder((byte[]) message.getPayload(), null);
DatumReader<OutputData> reader = new SpecificDatumReader<>(OutputData.getClassSchema());
RawDataCapsule rawDataCapsule = reader.read(null , decoder);
我正在尝试为我的 Spring Cloud Stream 应用程序编写测试用例。我正在使用 Confluent Schema Registry 和 Avro,所以我需要在从通道轮询后解码消息。这是我的代码:
processor.input()
.send(MessageBuilder.withPayload(InputData).build());
Message<?> message = messageCollector.forChannel(processor.output()).poll();
BinaryMessageDecoder<OutputData> decoder = OutputData.getDecoder();
OutputData outputObject = decoder.decode((byte[]) message.getPayload());
出于某种原因,此代码抛出
org.apache.avro.message.BadHeaderException: Unrecognized header bytes: 0x00 0x08
我不确定这是否是我面临的某种错误,或者我没有遵循正确的方法来解码收到的 avro 消息。我怀疑我需要设置 header 一些东西,但我不太确定如何以及究竟是什么。如果有人能帮助我解决这个问题,我将不胜感激。
P.S:我正在使用 spring-cloud-stream-test-support
用于此测试。
使用测试绑定器时,数据不会被 avro 编码。
测试活页夹非常有限
要使用 avro 正确地进行端到端测试,您应该删除测试绑定器并使用带有嵌入式 kafka 代理的真正的 kafka 绑定器。
其中一个 sample apps 展示了如何操作。
事实证明,这个问题与我尝试解码 Avro 消息的方式有关。通过使用官方 Avro 库,以下代码对我有用:
Decoder decoder = DecoderFactory.get().binaryDecoder((byte[]) message.getPayload(), null);
DatumReader<OutputData> reader = new SpecificDatumReader<>(OutputData.getClassSchema());
RawDataCapsule rawDataCapsule = reader.read(null , decoder);