Spring 从 kinesis 收到的消息中的云流特殊字符

Spring cloud stream Special Chars in Message received from kinesis

当我使用来自运动流的消息时。我用 headers 等

得到了一些垃圾字符
    @StreamListener(Processor.INPUT)
    public void receive(String message) {       
        System.out.println("Message recieved: "+message);
        throw new RuntimeException("Exception thrown");
    }

    @StreamListener("errorChannel")
    public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {      

        //original paylaod 
        System.out.println("Error Oiginal Message Payload"+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
        System.out.println("Error Original Message Stream channel"+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
    }

应用程序 yml

spring:
  cloud:
    stream:
      bindings:
        input: 
          group: abcd
          destination: stream
          content-type: application/json
          errorChannelEnabled: true
          consumer:
            headerMode: raw

我在 listener 和 errorChannel 都得到了带有垃圾字符的输出

我正在尝试提取 errorChannel 中的原始消息。这是转换字节消息的正确方法吗?

Message recieved: ?contentType "application/json"{"aa":"cc"}

AWS Kinesis 不提供任何 headers 实体。因此,为了在 Spring Cloud Stream 中利用此类功能,我们将 嵌入 headers 到 Kinesis 记录的 body 中。为此,在 Kinesis Binder 中默认 headerModeembeddedHeaders。为了生产者和消费者之间的对称,这个选项不能改变。

框架为目标 @StreamListener 渠道提供 out-of-the-box EmbeddedHeadersChannelInterceptor 并且嵌入 headers 被提取并正确填充到要发送的消息中。

当我们处理 errorChannel 中的错误时,我们确实有一个 errorMessage.getOriginalMessage() 作为 non-transformed - 原始。因此,该消息的 payload 是来自包含嵌入 headers.

的记录 body 的 byte[]

如果您想正确解析它们。你应该使用实用程序:

EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);