Spring 从 kinesis 收到的消息中的云流特殊字符
Spring cloud stream Special Chars in Message received from kinesis
当我使用来自运动流的消息时。我用 headers 等
得到了一些垃圾字符
@StreamListener(Processor.INPUT)
public void receive(String message) {
System.out.println("Message recieved: "+message);
throw new RuntimeException("Exception thrown");
}
@StreamListener("errorChannel")
public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {
//original paylaod
System.out.println("Error Oiginal Message Payload"+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
System.out.println("Error Original Message Stream channel"+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
}
应用程序 yml
spring:
cloud:
stream:
bindings:
input:
group: abcd
destination: stream
content-type: application/json
errorChannelEnabled: true
consumer:
headerMode: raw
我在 listener 和 errorChannel 都得到了带有垃圾字符的输出
我正在尝试提取 errorChannel 中的原始消息。这是转换字节消息的正确方法吗?
Message recieved: ?contentType "application/json"{"aa":"cc"}
AWS Kinesis 不提供任何 headers 实体。因此,为了在 Spring Cloud Stream 中利用此类功能,我们将 嵌入 headers 到 Kinesis 记录的 body 中。为此,在 Kinesis Binder 中默认 headerMode
为 embeddedHeaders
。为了生产者和消费者之间的对称,这个选项不能改变。
框架为目标 @StreamListener
渠道提供 out-of-the-box EmbeddedHeadersChannelInterceptor
并且嵌入 headers 被提取并正确填充到要发送的消息中。
当我们处理 errorChannel
中的错误时,我们确实有一个 errorMessage.getOriginalMessage()
作为 non-transformed - 原始。因此,该消息的 payload
是来自包含嵌入 headers.
的记录 body 的 byte[]
如果您想正确解析它们。你应该使用实用程序:
EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);
当我使用来自运动流的消息时。我用 headers 等
得到了一些垃圾字符 @StreamListener(Processor.INPUT)
public void receive(String message) {
System.out.println("Message recieved: "+message);
throw new RuntimeException("Exception thrown");
}
@StreamListener("errorChannel")
public void transform(ErrorMessage errorMessage) throws UnsupportedEncodingException {
//original paylaod
System.out.println("Error Oiginal Message Payload"+new String((byte[])errorMessage.getOriginalMessage().getPayload(), "UTF-8"));
System.out.println("Error Original Message Stream channel"+errorMessage.getOriginalMessage().getHeaders().get("aws_receivedStream"));
}
应用程序 yml
spring:
cloud:
stream:
bindings:
input:
group: abcd
destination: stream
content-type: application/json
errorChannelEnabled: true
consumer:
headerMode: raw
我在 listener 和 errorChannel 都得到了带有垃圾字符的输出
我正在尝试提取 errorChannel 中的原始消息。这是转换字节消息的正确方法吗?
Message recieved: ?contentType "application/json"{"aa":"cc"}
AWS Kinesis 不提供任何 headers 实体。因此,为了在 Spring Cloud Stream 中利用此类功能,我们将 嵌入 headers 到 Kinesis 记录的 body 中。为此,在 Kinesis Binder 中默认 headerMode
为 embeddedHeaders
。为了生产者和消费者之间的对称,这个选项不能改变。
框架为目标 @StreamListener
渠道提供 out-of-the-box EmbeddedHeadersChannelInterceptor
并且嵌入 headers 被提取并正确填充到要发送的消息中。
当我们处理 errorChannel
中的错误时,我们确实有一个 errorMessage.getOriginalMessage()
作为 non-transformed - 原始。因此,该消息的 payload
是来自包含嵌入 headers.
byte[]
如果您想正确解析它们。你应该使用实用程序:
EmbeddedHeaderUtils.extractHeaders((Message<byte[]>) message, true);