Spring 带有 Rabbitmq 消息传递的云流 IntegrationFlow,消费者提供 ASCII 数字作为消息负载
Spring cloud stream IntegrationFlow with Rabbitmq messaging, The consumer giving ASCII numbers as message payload
我正在使用 spring 云流进行消息传递。在消费者部分,我使用 IntegrationFlow 来监听队列。它正在监听并打印来自生产者方的消息。但是格式不同,这就是我现在面临的问题。生产者的内容类型是 application/json 并且 IntegrationFLow 消息负载显示 ASCII 数字。下面给出了为消费者编写的代码
@EnableBinding(UserOperationConsume.class)
public class ConsumerController {
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows
.from(u.userRegistraionProduces())
//.transform(Transformers.toJson()) // not working as expected
//.transform(Transformers.fromJson(UserDTO.class))
.handle(String.class, (payload, headers) -> {
System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118...
return null;
}).get();
}
}
输入界面是,
public interface UserOperationConsume {
@Input
public SubscribableChannel userRegistraionProduces();
}
而消费者 yml 配置是,
server:
port: 8181
spring:
application:
name: nets-alert-service
---
spring:
cloud:
config:
name: notification-service
uri: http://localhost:8888
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
---
spring:
cloud:
stream:
bindings:
userRegistraionProduces:
destination: userOperations
input:
content-type: application/json
我试过Sink.class绑定,那一次我从队列中得到了准确的消息。因此,如果此 IntegrationFlow 配置中有任何错误,请告诉我。因为我是 spring 云流和 IntegrationFlow 的新手。有什么方法可以将此 ascii 转换为精确的字符串吗?
提前致谢
使用 IntegrationFlows.from(channel)
不提供任何转换提示,因此您只需获取原始 byte[]
负载(包含 JSON)。目前尚不清楚您为什么要使用 toJson()
变压器。
您的 .handle(String.class, (payload, headers) -> {...
导致使用一个简单的 ArrayToStringConverter
,这就是您看到每个字节值的原因。
无论如何,您没有正确使用框架。使用...
@StreamListener("userRegistraionProduces")
public void listen(UserDTO dto) {
System.out.println(dto);
}
...框架将为您完成转换。或者...
@StreamListener("userRegistraionProduces")
public void listen(Message<UserDTO> dtoMessage) {
System.out.println(dtoMessage);
}
如果您的制作人在 headers 中传达了额外的信息。
编辑
如果您更喜欢自己进行转换,这很好...
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows.from(u.userRegistraionProduces())
.transform(Transformers.fromJson(UserDTO.class))
.handle((payload, headers) -> {
System.out.println(payload.toString());
return null;
}).get();
}
...因为 Json 转换器可以读取 byte[]
.
我正在使用 spring 云流进行消息传递。在消费者部分,我使用 IntegrationFlow 来监听队列。它正在监听并打印来自生产者方的消息。但是格式不同,这就是我现在面临的问题。生产者的内容类型是 application/json 并且 IntegrationFLow 消息负载显示 ASCII 数字。下面给出了为消费者编写的代码
@EnableBinding(UserOperationConsume.class)
public class ConsumerController {
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows
.from(u.userRegistraionProduces())
//.transform(Transformers.toJson()) // not working as expected
//.transform(Transformers.fromJson(UserDTO.class))
.handle(String.class, (payload, headers) -> {
System.out.println(payload.toString()); // here the output is 123,34,105,100,34,58,49,44,34,110,97,109,101,34,58,34,86,105,115,104,110,117,34,44,34,101,109,97,105,108,34,58,34,118...
return null;
}).get();
}
}
输入界面是,
public interface UserOperationConsume {
@Input
public SubscribableChannel userRegistraionProduces();
}
而消费者 yml 配置是,
server:
port: 8181
spring:
application:
name: nets-alert-service
---
spring:
cloud:
config:
name: notification-service
uri: http://localhost:8888
---
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
---
spring:
cloud:
stream:
bindings:
userRegistraionProduces:
destination: userOperations
input:
content-type: application/json
我试过Sink.class绑定,那一次我从队列中得到了准确的消息。因此,如果此 IntegrationFlow 配置中有任何错误,请告诉我。因为我是 spring 云流和 IntegrationFlow 的新手。有什么方法可以将此 ascii 转换为精确的字符串吗? 提前致谢
使用 IntegrationFlows.from(channel)
不提供任何转换提示,因此您只需获取原始 byte[]
负载(包含 JSON)。目前尚不清楚您为什么要使用 toJson()
变压器。
您的 .handle(String.class, (payload, headers) -> {...
导致使用一个简单的 ArrayToStringConverter
,这就是您看到每个字节值的原因。
无论如何,您没有正确使用框架。使用...
@StreamListener("userRegistraionProduces")
public void listen(UserDTO dto) {
System.out.println(dto);
}
...框架将为您完成转换。或者...
@StreamListener("userRegistraionProduces")
public void listen(Message<UserDTO> dtoMessage) {
System.out.println(dtoMessage);
}
如果您的制作人在 headers 中传达了额外的信息。
编辑
如果您更喜欢自己进行转换,这很好...
@Bean
IntegrationFlow consumerIntgrationFlow(UserOperationConsume u) {
return IntegrationFlows.from(u.userRegistraionProduces())
.transform(Transformers.fromJson(UserDTO.class))
.handle((payload, headers) -> {
System.out.println(payload.toString());
return null;
}).get();
}
...因为 Json 转换器可以读取 byte[]
.