如何在 Spring Integration DSL Tcp 中使用消息头进行路由
How to route using message headers in Spring Integration DSL Tcp
我有 2 个服务器端服务,我想使用消息头将消息路由到它们;远程客户端会把服务标识放入 type 字段。
下面这段服务器端配置代码是正确的做法吗?它抛出了类型转换异常,表明 route() 只能看到有效载荷,而看不到消息头。此外,Spring Integration 手册中的所有示例都只展示了基于有效载荷的路由决策。
/**
 * Common inbound flow for all server-side services (currently two): messages received by the
 * TCP inbound gateway are routed to a service according to the value of the "type" header.
 *
 * NOTE(review): the router lambda is declared as taking a {@code Message<?>}, but the
 * accompanying text reports a cast exception suggesting route() passes only the payload to
 * it. Confirm against the Spring Integration Java DSL documentation before relying on it.
 */
@Bean
public IntegrationFlow serverFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer,
        FeedServer feedServer) {
    return IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .<Message<?>, String>route((m) -> m.getHeaders().get("type", String.class),
                    (routeSpec) -> routeSpec
                            // The key must equal the "type" header value set by the heartbeat
                            // client ("heartbeat"); it was previously misspelled "hearbeat".
                            .subFlowMapping("heartbeat", subflow -> subflow.handle(heartbeatServer::processRequest))
                            .subFlowMapping("feed", subflow -> subflow.handle(feedServer::consumeFeed)))
            .get();
}
客户端配置:
/**
 * Client flow for heartbeats: polls {@code heartbeatClient::send} with a fixed delay of five
 * seconds, adds a "type" header with the value "heartbeat", logs the message, passes it to the
 * gateway built by {@code outboundGateway(clientConnectionFactory)} and finally hands the
 * result to {@code heartbeatClient::receive}.
 */
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    final Duration pollDelay = Duration.ofSeconds(5);
    return IntegrationFlows
            .from(heartbeatClient::send, endpoint -> endpoint.poller(Pollers.fixedDelay(pollDelay)))
            .enrichHeaders(enricher -> enricher.header("type", "heartbeat"))
            .log()
            .handle(outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
}
/**
 * Client flow for feed messages: starts from the {@code FeedClient.MessageGateway} interface,
 * adds a "type" header with the value "feed", logs the message and passes it to the gateway
 * built by {@code outboundGateway(clientConnectionFactory)}.
 */
@Bean
public IntegrationFlow feedClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory) {
    final Class<FeedClient.MessageGateway> gatewayInterface = FeedClient.MessageGateway.class;
    return IntegrationFlows
            .from(gatewayInterface)
            .enrichHeaders(enricher -> enricher.header("type", "feed"))
            .log()
            .handle(outboundGateway(clientConnectionFactory))
            .get();
}
和往常一样,这里是完整的演示项目代码,以及 ClientConfig 和 ServerConfig。
没有通过原始 TCP 发送 headers 的标准方法。您需要以某种方式将它们编码到有效载荷中(并在服务器端提取它们)。
框架提供了一种机制来为您执行此操作,但它需要额外的配置。
具体...
The MapJsonSerializer
uses a Jackson ObjectMapper to convert between a Map and JSON. You can use this serializer in conjunction with a MessageConvertingTcpMessageMapper
and a MapMessageConverter
to transfer selected headers and the payload in JSON.
我会抽时间创建一个示例来说明如何使用它。
当然,您也可以自行实现编码/解码(encoding/decoding)。
编辑
这是使用 JSON 通过 TCP 传送消息 headers 的示例配置...
/**
 * Sample application showing how to carry a message header ("type") over TCP by serializing
 * the selected header together with the payload as JSON, and how to route on that header on
 * the server side.
 */
@SpringBootApplication
public class TcpWithHeadersApplication {

    public static void main(String[] args) {
        SpringApplication.run(TcpWithHeadersApplication.class, args);
    }

    // Client side

    /** Gateway used by the console runner: sends {@code data} with a "type" header. */
    public interface TcpExchanger {

        public String exchange(String data, @Header("type") String type);

    }

    /**
     * Client flow: calls on {@link TcpExchanger} go through a TCP outbound gateway connected
     * to localhost on the configured port, using the shared JSON (de)serializer and mapper.
     */
    @Bean
    public IntegrationFlow client(@Value("${tcp.port:1234}") int port) {
        return IntegrationFlows.from(TcpExchanger.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                        .deserializer(jsonMapping())
                        .serializer(jsonMapping())
                        .mapper(mapper())))
                .get();
    }

    // Server side

    /**
     * Server flow: receives on the configured port, logs the "type" header and routes on it.
     * "upper" upper-cases the String payload, "lower" lower-cases it.
     */
    @Bean
    public IntegrationFlow server(@Value("${tcp.port:1234}") int port) {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(port)
                        .deserializer(jsonMapping())
                        .serializer(jsonMapping())
                        .mapper(mapper())))
                .log(Level.INFO, "exampleLogger", "'Received type header:' + headers['type']")
                .route("headers['type']", r -> r
                        .subFlowMapping("upper",
                                subFlow -> subFlow.transform(String.class, p -> p.toUpperCase()))
                        .subFlowMapping("lower",
                                subFlow -> subFlow.transform(String.class, p -> p.toLowerCase())))
                .get();
    }

    // Common

    /** Mapper whose converter is configured with the single header name "type". */
    @Bean
    public MessageConvertingTcpMessageMapper mapper() {
        MapMessageConverter converter = new MapMessageConverter();
        converter.setHeaderNames("type");
        return new MessageConvertingTcpMessageMapper(converter);
    }

    /** Serializer/deserializer shared by the client and server connection factories. */
    @Bean
    public MapJsonSerializer jsonMapping() {
        return new MapJsonSerializer();
    }

    // Console

    /**
     * Console loop: reads lines from standard input and sends each non-blank line through
     * {@code exchanger}, with type "upper" when the first character is lower case and "lower"
     * otherwise, printing the reply. The loop ends on "quit" (any letter case) or when
     * standard input reaches end-of-file; the application context is then closed.
     */
    @Bean
    @DependsOn("client") // presumably so the client flow is initialized before the runner starts
    public ApplicationRunner runner(TcpExchanger exchanger,
            ConfigurableApplicationContext context) {
        return args -> {
            System.out.println("Enter some text; if it starts with a lower case character,\n"
                    + "it will be uppercased by the server; otherwise it will be lowercased;\n"
                    + "enter 'quit' to end");
            // try-with-resources/finally: release the scanner and the context even if an
            // exchange fails.
            try (Scanner scanner = new Scanner(System.in)) {
                // hasNextLine() guards against NoSuchElementException when stdin is closed.
                while (scanner.hasNextLine()) {
                    String request = scanner.nextLine();
                    // Locale-independent comparison; toLowerCase() in the default locale
                    // does not map "QUIT" to "quit" under e.g. a Turkish locale.
                    if ("quit".equalsIgnoreCase(request)) {
                        break;
                    }
                    if (StringUtils.hasText(request)) {
                        String result = exchanger.exchange(request,
                                Character.isLowerCase(request.charAt(0)) ? "upper" : "lower");
                        System.out.println(result);
                    }
                }
            }
            finally {
                context.close();
            }
        };
    }

}
我有 2 个服务器端服务,我想使用消息头将消息路由到它们;远程客户端会把服务标识放入 type 字段。
下面这段服务器端配置代码是正确的做法吗?它抛出了类型转换异常,表明 route() 只能看到有效载荷,而看不到消息头。此外,Spring Integration 手册中的所有示例都只展示了基于有效载荷的路由决策。
/**
 * Common inbound flow for all server-side services (currently two): messages received by the
 * TCP inbound gateway are routed to a service according to the value of the "type" header.
 *
 * NOTE(review): the router lambda is declared as taking a {@code Message<?>}, but the
 * accompanying text reports a cast exception suggesting route() passes only the payload to
 * it. Confirm against the Spring Integration Java DSL documentation before relying on it.
 */
@Bean
public IntegrationFlow serverFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer,
        FeedServer feedServer) {
    return IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .<Message<?>, String>route((m) -> m.getHeaders().get("type", String.class),
                    (routeSpec) -> routeSpec
                            // The key must equal the "type" header value set by the heartbeat
                            // client ("heartbeat"); it was previously misspelled "hearbeat".
                            .subFlowMapping("heartbeat", subflow -> subflow.handle(heartbeatServer::processRequest))
                            .subFlowMapping("feed", subflow -> subflow.handle(feedServer::consumeFeed)))
            .get();
}
客户端配置:
/**
 * Client flow for heartbeats: polls {@code heartbeatClient::send} with a fixed delay of five
 * seconds, adds a "type" header with the value "heartbeat", logs the message, passes it to the
 * gateway built by {@code outboundGateway(clientConnectionFactory)} and finally hands the
 * result to {@code heartbeatClient::receive}.
 */
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    final Duration pollDelay = Duration.ofSeconds(5);
    return IntegrationFlows
            .from(heartbeatClient::send, endpoint -> endpoint.poller(Pollers.fixedDelay(pollDelay)))
            .enrichHeaders(enricher -> enricher.header("type", "heartbeat"))
            .log()
            .handle(outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
}
/**
 * Client flow for feed messages: starts from the {@code FeedClient.MessageGateway} interface,
 * adds a "type" header with the value "feed", logs the message and passes it to the gateway
 * built by {@code outboundGateway(clientConnectionFactory)}.
 */
@Bean
public IntegrationFlow feedClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory) {
    final Class<FeedClient.MessageGateway> gatewayInterface = FeedClient.MessageGateway.class;
    return IntegrationFlows
            .from(gatewayInterface)
            .enrichHeaders(enricher -> enricher.header("type", "feed"))
            .log()
            .handle(outboundGateway(clientConnectionFactory))
            .get();
}
和往常一样,这里是完整的演示项目代码,以及 ClientConfig 和 ServerConfig。
没有通过原始 TCP 发送 headers 的标准方法。您需要以某种方式将它们编码到有效载荷中(并在服务器端提取它们)。
框架提供了一种机制来为您执行此操作,但它需要额外的配置。
具体...
The MapJsonSerializer uses a Jackson ObjectMapper to convert between a Map and JSON. You can use this serializer in conjunction with a MessageConvertingTcpMessageMapper and a MapMessageConverter to transfer selected headers and the payload in JSON.
我会抽时间创建一个示例来说明如何使用它。
当然,您也可以自行实现编码/解码(encoding/decoding)。
编辑
这是使用 JSON 通过 TCP 传送消息 headers 的示例配置...
/**
 * Sample application showing how to carry a message header ("type") over TCP by serializing
 * the selected header together with the payload as JSON, and how to route on that header on
 * the server side.
 */
@SpringBootApplication
public class TcpWithHeadersApplication {

    public static void main(String[] args) {
        SpringApplication.run(TcpWithHeadersApplication.class, args);
    }

    // Client side

    /** Gateway used by the console runner: sends {@code data} with a "type" header. */
    public interface TcpExchanger {

        public String exchange(String data, @Header("type") String type);

    }

    /**
     * Client flow: calls on {@link TcpExchanger} go through a TCP outbound gateway connected
     * to localhost on the configured port, using the shared JSON (de)serializer and mapper.
     */
    @Bean
    public IntegrationFlow client(@Value("${tcp.port:1234}") int port) {
        return IntegrationFlows.from(TcpExchanger.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", port)
                        .deserializer(jsonMapping())
                        .serializer(jsonMapping())
                        .mapper(mapper())))
                .get();
    }

    // Server side

    /**
     * Server flow: receives on the configured port, logs the "type" header and routes on it.
     * "upper" upper-cases the String payload, "lower" lower-cases it.
     */
    @Bean
    public IntegrationFlow server(@Value("${tcp.port:1234}") int port) {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(port)
                        .deserializer(jsonMapping())
                        .serializer(jsonMapping())
                        .mapper(mapper())))
                .log(Level.INFO, "exampleLogger", "'Received type header:' + headers['type']")
                .route("headers['type']", r -> r
                        .subFlowMapping("upper",
                                subFlow -> subFlow.transform(String.class, p -> p.toUpperCase()))
                        .subFlowMapping("lower",
                                subFlow -> subFlow.transform(String.class, p -> p.toLowerCase())))
                .get();
    }

    // Common

    /** Mapper whose converter is configured with the single header name "type". */
    @Bean
    public MessageConvertingTcpMessageMapper mapper() {
        MapMessageConverter converter = new MapMessageConverter();
        converter.setHeaderNames("type");
        return new MessageConvertingTcpMessageMapper(converter);
    }

    /** Serializer/deserializer shared by the client and server connection factories. */
    @Bean
    public MapJsonSerializer jsonMapping() {
        return new MapJsonSerializer();
    }

    // Console

    /**
     * Console loop: reads lines from standard input and sends each non-blank line through
     * {@code exchanger}, with type "upper" when the first character is lower case and "lower"
     * otherwise, printing the reply. The loop ends on "quit" (any letter case) or when
     * standard input reaches end-of-file; the application context is then closed.
     */
    @Bean
    @DependsOn("client") // presumably so the client flow is initialized before the runner starts
    public ApplicationRunner runner(TcpExchanger exchanger,
            ConfigurableApplicationContext context) {
        return args -> {
            System.out.println("Enter some text; if it starts with a lower case character,\n"
                    + "it will be uppercased by the server; otherwise it will be lowercased;\n"
                    + "enter 'quit' to end");
            // try-with-resources/finally: release the scanner and the context even if an
            // exchange fails.
            try (Scanner scanner = new Scanner(System.in)) {
                // hasNextLine() guards against NoSuchElementException when stdin is closed.
                while (scanner.hasNextLine()) {
                    String request = scanner.nextLine();
                    // Locale-independent comparison; toLowerCase() in the default locale
                    // does not map "QUIT" to "quit" under e.g. a Turkish locale.
                    if ("quit".equalsIgnoreCase(request)) {
                        break;
                    }
                    if (StringUtils.hasText(request)) {
                        String result = exchanger.exchange(request,
                                Character.isLowerCase(request.charAt(0)) ? "upper" : "lower");
                        System.out.println(result);
                    }
                }
            }
            finally {
                context.close();
            }
        };
    }

}