Spring 引导集成 - 在连接初始化时发送问候语
Spring Boot Integration - Send greet on connection initialization
我正在使用 Spring 引导集成框架编写一个简单的 TCP 服务器应用程序,它必须在连接建立时向客户端发送问候语。工作流程应该如下:
- 客户端连接到服务器
- 服务器向客户端发送问候消息"Hello client"
- 客户端向服务器发送消息"Any string"
- 服务器响应 "OK"
- ...
目前第 1、3 和 4 步正在运行,但我在第 2 步上失败了。
到目前为止我的代码:
@SpringBootApplication
@EnableIntegration
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
// Create listener on port 1234
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
return tcpNetServerConnectionFactory;
}
// Inbound channel
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
// Outbound channel
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
// Inbound gateway
@Bean
public TcpInboundGateway tcpInboundGateway(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel, MessageChannel replyChannel) {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(serverConnectionFactory);
tcpInboundGateway.setRequestChannel(requestChannel);
tcpInboundGateway.setReplyChannel(replyChannel);
return tcpInboundGateway;
}
// Send reply for incoming message -> working
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public Message<String> processMessage(Message<String> message) {
Message reply = MessageBuilder
.withPayload("OK")
.setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
.build();
return reply;
}
// Send greeting -> not working
@Bean
public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
return tcpConnectionEvent -> {
if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
Message<String> message = MessageBuilder
.withPayload("Hello client")
.setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
.build();
replyChannel.send(message);
}
};
}
}
如果我使用
连接到服务器
nc -C localhost 1234
连接已建立,但我在日志中收到以下错误:
Failed to publish TcpConnectionOpenEvent [source=TcpNetConnection:localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264], [factory=serverConnectionFactory, connectionId=localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264] OPENED:Dispatcher has no subscribers for channel 'application.replyChannel'
如果我向服务器发送一个字符串,他会按预期回复 "OK"。
我缺少什么才能让这条问候语正常工作?
解决方案
感谢 Gary Russel 的评论,我找到了解决方案。入站网关必须 "split" 进入 Inbound/Outbound 通道适配器对。这是完整的工作示例:
@SpringBootApplication
@EnableIntegration
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
// Create listener on port 1234
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
return tcpNetServerConnectionFactory;
}
// Inbound channel
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
// Outbound channel
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
// Inbound channel adapter
@Bean
public TcpReceivingChannelAdapter receivingChannelAdapter(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel) {
TcpReceivingChannelAdapter tcpReceivingChannelAdapter = new TcpReceivingChannelAdapter();
tcpReceivingChannelAdapter.setConnectionFactory(serverConnectionFactory);
tcpReceivingChannelAdapter.setOutputChannel(requestChannel);
return tcpReceivingChannelAdapter;
}
// Outbound channel adapter
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public TcpSendingMessageHandler tcpSendingMessageHandler(AbstractServerConnectionFactory serverConnectionFactory) {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(serverConnectionFactory);
return tcpSendingMessageHandler;
}
// Send reply for incoming message -> working
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public Message<String> processMessage(Message<String> message) {
Message<String> reply = MessageBuilder
.withPayload("OK")
.setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
.build();
return reply;
}
// Send greeting -> now working
@Bean
public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
return tcpConnectionEvent -> {
if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
Message<String> message = MessageBuilder
.withPayload("Hello client")
.setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
.build();
replyChannel.send(message);
}
};
}
}
现在,客户端在建立连接时收到问候 "Hello client",在每次发送消息时收到 "OK" 回复。
replyChannel.send(message);
你不能那样做;当第一个请求进入时,回复通道被连接起来。
在任何情况下,您都不能使用那样的网关,回复通道用于回复请求而不是发送一些任意消息。
您必须使用一对 inbound/outbound 通道适配器而不是网关来启用任意双向通信。
我正在使用 Spring 引导集成框架编写一个简单的 TCP 服务器应用程序,它必须在连接建立时向客户端发送问候语。工作流程应该如下:
- 客户端连接到服务器
- 服务器向客户端发送问候消息"Hello client"
- 客户端向服务器发送消息"Any string"
- 服务器响应 "OK"
- ...
目前第 1、3 和 4 步正在运行,但我在第 2 步上失败了。 到目前为止我的代码:
@SpringBootApplication
@EnableIntegration
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
// Create listener on port 1234
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
return tcpNetServerConnectionFactory;
}
// Inbound channel
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
// Outbound channel
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
// Inbound gateway
@Bean
public TcpInboundGateway tcpInboundGateway(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel, MessageChannel replyChannel) {
TcpInboundGateway tcpInboundGateway = new TcpInboundGateway();
tcpInboundGateway.setConnectionFactory(serverConnectionFactory);
tcpInboundGateway.setRequestChannel(requestChannel);
tcpInboundGateway.setReplyChannel(replyChannel);
return tcpInboundGateway;
}
// Send reply for incoming message -> working
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public Message<String> processMessage(Message<String> message) {
Message reply = MessageBuilder
.withPayload("OK")
.setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
.build();
return reply;
}
// Send greeting -> not working
@Bean
public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
return tcpConnectionEvent -> {
if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
Message<String> message = MessageBuilder
.withPayload("Hello client")
.setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
.build();
replyChannel.send(message);
}
};
}
}
如果我使用
连接到服务器nc -C localhost 1234
连接已建立,但我在日志中收到以下错误:
Failed to publish TcpConnectionOpenEvent [source=TcpNetConnection:localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264], [factory=serverConnectionFactory, connectionId=localhost:37656:1234:187cfbc2-7e5d-4f4e-97de-1a3b55a4e264] OPENED:Dispatcher has no subscribers for channel 'application.replyChannel'
如果我向服务器发送一个字符串,他会按预期回复 "OK"。
我缺少什么才能让这条问候语正常工作?
解决方案
感谢 Gary Russel 的评论,我找到了解决方案。入站网关必须 "split" 进入 Inbound/Outbound 通道适配器对。这是完整的工作示例:
@SpringBootApplication
@EnableIntegration
public class ExampleApp {
public static void main(String[] args) {
SpringApplication.run(ExampleApp.class, args);
}
// Create listener on port 1234
@Bean
public AbstractServerConnectionFactory serverConnectionFactory() {
TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(1234);
return tcpNetServerConnectionFactory;
}
// Inbound channel
@Bean
public MessageChannel requestChannel() {
return new DirectChannel();
}
// Outbound channel
@Bean
public MessageChannel replyChannel() {
return new DirectChannel();
}
// Inbound channel adapter
@Bean
public TcpReceivingChannelAdapter receivingChannelAdapter(AbstractServerConnectionFactory serverConnectionFactory, MessageChannel requestChannel) {
TcpReceivingChannelAdapter tcpReceivingChannelAdapter = new TcpReceivingChannelAdapter();
tcpReceivingChannelAdapter.setConnectionFactory(serverConnectionFactory);
tcpReceivingChannelAdapter.setOutputChannel(requestChannel);
return tcpReceivingChannelAdapter;
}
// Outbound channel adapter
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public TcpSendingMessageHandler tcpSendingMessageHandler(AbstractServerConnectionFactory serverConnectionFactory) {
TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
tcpSendingMessageHandler.setConnectionFactory(serverConnectionFactory);
return tcpSendingMessageHandler;
}
// Send reply for incoming message -> working
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public Message<String> processMessage(Message<String> message) {
Message<String> reply = MessageBuilder
.withPayload("OK")
.setHeader(IpHeaders.CONNECTION_ID, message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class))
.build();
return reply;
}
// Send greeting -> now working
@Bean
public ApplicationListener<TcpConnectionEvent> listener(MessageChannel replyChannel) {
return tcpConnectionEvent -> {
if (tcpConnectionEvent instanceof TcpConnectionOpenEvent) {
Message<String> message = MessageBuilder
.withPayload("Hello client")
.setHeader(IpHeaders.CONNECTION_ID, tcpConnectionEvent.getConnectionId())
.build();
replyChannel.send(message);
}
};
}
}
现在,客户端在建立连接时收到问候 "Hello client",在每次发送消息时收到 "OK" 回复。
replyChannel.send(message);
你不能那样做;当第一个请求进入时,回复通道被连接起来。
在任何情况下,您都不能使用那样的网关,回复通道用于回复请求而不是发送一些任意消息。
您必须使用一对 inbound/outbound 通道适配器而不是网关来启用任意双向通信。