如何在 Spring Integration DSL 中实现简单的 echo socket 服务
How to implement simple echo socket service in Spring Integration DSL
请问,
能否帮我用 Spring Integration DSL 实现一个简单的、echo 风格的 Heartbeat TCP 套接字服务?更准确地说,我想知道如何在客户端和服务器端把 Adapter/Handler/Gateway 接入 IntegrationFlows。关于 Spring Integration DSL 与 TCP/IP 客户端/服务器通信的实际示例很难找到。
我想我已经写好了大部分代码,只差在 IntegrationFlow 中把所有部分组装到一起。
SI 示例中有一个示例 echo 服务,但它是在 "old" XML 配置中编写的,我真的很难将其转换为通过代码配置。
我的 Heartbeat 服务是一个简单的服务器,等待客户端请求 "status",并以 "OK" 响应。
没有 @ServiceActivator,没有 @MessagingGateway,没有代理,一切都明确而冗长;由客户端的普通 JDK 定时执行器(scheduled executor)驱动;服务器和客户端位于各自独立的配置和项目中。
HeartbeatClientConfig
/**
 * Client-side Spring configuration from the question. It declares the two channels, the TCP
 * client connection factory, the receiving adapter, the sending handler and the
 * {@code HeartbeatClient} as beans.
 *
 * NOTE(review): {@code heartbeatClientFlow} is unfinished on purpose (the "???" placeholders
 * are what the question asks about), so this class does not compile as written.
 */
@Configuration
@EnableIntegration
public class HeartbeatClientConfig {
// DirectChannel bean; passed to heartbeatClientFlow and to HeartbeatClient below.
@Bean
public MessageChannel outboudChannel() {
return new DirectChannel();
}
// QueueChannel bean; passed to HeartbeatClient below.
@Bean
public PollableChannel inboundChannel() {
return new QueueChannel();
}
// TCP client connection factory pointing at localhost:7777.
@Bean
public TcpNetClientConnectionFactory connectionFactory() {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
return connectionFactory;
}
// Receiving adapter: uses the connection factory above, forwards to the injected
// inboundChannel and is switched to client mode.
// NOTE(review): the parameter is typed MessageChannel; presumably it resolves to the
// inboundChannel bean by parameter name - confirm.
@Bean
public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
TcpNetClientConnectionFactory connectionFactory,
MessageChannel inboundChannel) {
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
heartbeatReceivingMessageAdapter.setClientMode(true);
return heartbeatReceivingMessageAdapter;
}
// Sending handler bound to the same connection factory as the receiving adapter.
@Bean
public TcpSendingMessageHandler heartbeatSendingMessageHandler(
TcpNetClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
return heartbeatSendingMessageHandler;
}
// Unfinished flow definition: starts from outboudChannel; the remaining steps are the
// open question (the placeholder lines below are not valid Java).
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory connectionFactory,
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
TcpSendingMessageHandler heartbeatSendingMessageHandler,
MessageChannel outboudChannel) {
return IntegrationFlows
.from(outboudChannel) // ??????
.// adapter ???????????
.// gateway ???????????
.// handler ???????????
.get();
}
// The client bean receives both channels through its constructor.
@Bean
public HeartbeatClient heartbeatClient(
MessageChannel outboudChannel,
PollableChannel inboundChannel) {
return new HeartbeatClient(outboudChannel, inboundChannel);
}
}
HeartbeatClient
/**
 * Heartbeat client from the question: on a fixed schedule it sends a "status" request on the
 * outbound channel and then waits for the reply on the inbound channel.
 */
public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * @param outboudChannel channel the "status" requests are sent to
     * @param inboundChannel channel the replies are read from
     */
    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the heartbeat schedule when a context-refreshed event is delivered. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    /**
     * Schedules one heartbeat round every 10 seconds, starting immediately.
     *
     * The task must return after each round: the previous version looped forever inside the
     * scheduled task, so it sent requests back to back and the schedule never fired a second
     * time. The previous period of 10000 SECONDS (about 2.8 hours) is replaced by 10 seconds.
     */
    public void start() {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::sendHeartbeat, 0, 10, TimeUnit.SECONDS);
    }

    /** Performs a single request/reply round and logs the outcome. */
    private void sendHeartbeat() {
        try {
            log.info("Sending Heartbeat");
            outboudChannel.send(new GenericMessage<String>("status"));
            Message<?> message = inboundChannel.receive(1000);
            if (message == null) {
                log.error("Heartbeat timeouted");
                return;
            }
            // Decode with an explicit charset instead of the platform default.
            String messageStr = new String((byte[]) message.getPayload(),
                    java.nio.charset.StandardCharsets.UTF_8);
            if ("OK".equals(messageStr)) {
                log.info("Heartbeat OK response received");
            } else {
                log.error("Unexpected message content from server: " + messageStr);
            }
        } catch (Exception e) {
            // Catch everything here: an exception escaping a scheduleAtFixedRate task
            // suppresses all subsequent executions. Pass the throwable so the stack trace
            // is logged.
            log.error("Heartbeat round failed", e);
        }
    }
}
HeartbeatServerConfig
/**
 * Server-side Spring configuration from the question. It declares the two channels, the TCP
 * server connection factory on port 7777, the receiving adapter, the sending handler, the
 * flow that connects adapter and handler, and the {@code HeartbeatServer} bean.
 */
@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    /** DirectChannel bean; used as the receiving adapter's output and by HeartbeatServer. */
    @Bean
    public MessageChannel outboudChannel() {
        final DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }

    /** QueueChannel bean; handed to HeartbeatServer. */
    @Bean
    public PollableChannel inboundChannel() {
        final QueueChannel queueChannel = new QueueChannel();
        return queueChannel;
    }

    /** TCP server connection factory on port 7777. */
    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        return new TcpNetServerConnectionFactory(7777);
    }

    /** Receiving adapter bound to the server connection factory, forwarding to outboudChannel. */
    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(outboudChannel);
        return adapter;
    }

    /** Sending handler bound to the same server connection factory. */
    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        final TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    /**
     * Flow that starts at the receiving adapter and ends in the sending handler. The "???"
     * markers are the question author's own doubts about this wiring.
     */
    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        final IntegrationFlow flow = IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???
                .handle(heartbeatSendingMessageHandler) // ???
                .get();
        return flow;
    }

    /** The server bean receives both channels through its constructor. */
    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel,
            MessageChannel outboudChannel) {
        final HeartbeatServer server = new HeartbeatServer(inboundChannel, outboudChannel);
        return server;
    }
}
HeartbeatServer
/**
 * Heartbeat server from the question: a background worker polls the inbound channel and
 * answers every "status" request with "OK" on the outbound channel.
 */
public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * @param inboundChannel channel the requests are read from
     * @param outboudChannel channel the "OK" replies are sent to
     */
    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the polling worker when a context-refreshed event is delivered. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    /** Launches a single worker thread that handles requests until it is interrupted. */
    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            // Stop when the worker is interrupted so the loop can be shut down; the
            // previous while (true) could never terminate.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    handleNextRequest();
                } catch (Exception e) {
                    // Keep serving after a failed round; pass the throwable so the stack
                    // trace is logged.
                    log.error("Heartbeat request handling failed", e);
                }
            }
        });
    }

    /** Waits for one request (receive timeout 1000) and replies "OK" to a "status" request. */
    private void handleNextRequest() {
        Message<?> message = inboundChannel.receive(1000);
        if (message == null) {
            log.error("Heartbeat timeouted");
            return;
        }
        // Decode with an explicit charset instead of the platform default.
        String messageStr = new String((byte[]) message.getPayload(),
                java.nio.charset.StandardCharsets.UTF_8);
        if ("status".equals(messageStr)) {
            log.info("Heartbeat received");
            outboudChannel.send(new GenericMessage<>("OK"));
        } else {
            log.error("Unexpected message content from client: " + messageStr);
        }
    }
}
加分题
为什么可以在 TcpReceivingChannelAdapter(入站适配器)上设置通道,却不能在 TcpSendingMessageHandler(出站适配器)上设置通道?
更新
这是完整的项目源代码,感兴趣的话可以用 git 克隆:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
我会尝试将所有建议的解决方案放在那里。
有了 DSL 就简单多了...
/**
 * Self-contained demo that hosts both sides of the exchange in one application: a TCP server
 * flow on port 1234 and a client flow that a scheduled {@code Runner} triggers through the
 * {@code Gate} gateway interface.
 */
@SpringBootApplication
@EnableScheduling
public class So55154418Application {

    public static void main(String[] args) {
        SpringApplication.run(So55154418Application.class, args);
    }

    /** Server flow: converts each request to a String, logs it and replies "OK". */
    @Bean
    public IntegrationFlow server() {
        final IntegrationFlow serverFlow = IntegrationFlows
                .from(Tcp.inboundGateway(Tcp.netServer(1234)))
                .transform(Transformers.objectToString())
                .log()
                .handle((payload, headers) -> {
                    return "OK";
                })
                .get();
        return serverFlow;
    }

    /**
     * Client flow: messages sent through {@code Gate} go to localhost:1234; the reply is
     * converted to a String and printed. The last handler returns null, so nothing is
     * produced after it.
     */
    @Bean
    public IntegrationFlow client() {
        final IntegrationFlow clientFlow = IntegrationFlows
                .from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((payload, headers) -> {
                    System.out.println("Received:" + payload);
                    return null;
                })
                .get();
        return clientFlow;
    }

    /** Creates the scheduled runner once the "client" bean is available. */
    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        final Runner scheduledRunner = new Runner(gateway);
        return scheduledRunner;
    }

    /** Sends "foo" through the gateway with a fixed delay of 5000 between runs. */
    public static class Runner {

        private final Gate gate;

        public Runner(Gate gateway) {
            this.gate = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            gate.send("foo");
        }
    }

    /** Gateway interface the client flow starts from; {@code send} returns nothing. */
    public interface Gate {

        void send(String out);
    }
}
或者,从 Gate 方法获取回复...
/**
 * Client flow variant without a terminal handler: requests entering through {@code Gate} are
 * sent to localhost:1234 and the reply is converted to a String.
 */
@Bean
public IntegrationFlow client() {
    final String host = "localhost";
    final int port = 1234;
    final IntegrationFlow clientFlow = IntegrationFlows
            .from(Gate.class)
            .handle(Tcp.outboundGateway(Tcp.netClient(host, port)))
            .transform(Transformers.objectToString())
            .get();
    return clientFlow;
}
/** Creates the scheduled runner once the "client" bean is available. */
@Bean
@DependsOn("client")
public Runner runner(Gate gateway) {
    final Runner scheduledRunner = new Runner(gateway);
    return scheduledRunner;
}
/** Scheduled driver: sends "foo" through the gateway and prints whatever comes back. */
public static class Runner {

    private final Gate gate;

    public Runner(Gate gateway) {
        this.gate = gateway;
    }

    /** Runs with a fixed delay of 5000 between invocations. */
    @Scheduled(fixedDelay = 5000)
    public void run() {
        // null for timeout
        final String answer = gate.sendAndReceive("foo");
        System.out.println("Received:" + answer);
    }
}
/** Gateway interface the client flow starts from; used here for request/reply. */
public interface Gate {
// Sends `out` and returns the reply as a String. The caller in this snippet notes that a
// null result means the reply timed out (replyTimeout = 5000, presumably milliseconds - confirm).
@Gateway(replyTimeout = 5000)
String sendAndReceive(String out);
}
加分题解答:
消费端点(consuming endpoint)实际上由两个 bean 组成:消费者(consumer)和消息处理器(message handler)。通道是设置在消费者上的,而不是设置在消息处理器上。参见 Spring Integration 参考文档(原文此处的链接已丢失)。
编辑
另一种方法,用于客户端的单个 bean...
/**
 * Single-bean client: a polled supplier emits "foo" with a fixed delay of 5 seconds, the
 * message is sent to localhost:1234, and the reply is converted to a String and printed.
 */
@Bean
public IntegrationFlow client() {
    final Duration pollDelay = Duration.ofSeconds(5);
    final IntegrationFlow clientFlow = IntegrationFlows
            .from(() -> "foo", spec -> spec.poller(Pollers.fixedDelay(pollDelay)))
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
            .transform(Transformers.objectToString())
            .handle((payload, headers) -> {
                System.out.println("Received:" + payload);
                return null;
            })
            .get();
    return clientFlow;
}
对于任何感兴趣的人,这是我在 Gary Russell 的帮助下制定的可行解决方案之一。所有功劳归功于 Gary Russell。完整的项目源代码见上文给出的仓库链接。
亮点:
- IntegrationFlows:仅使用入站和出站网关。
- 不需要适配器或通道;没有 ServiceActivators 或 Message Gate 代理。
- 不需要 ScheduledExecutor 或 Executors;客户端和服务器代码因此大为简化
- IntegrationFlows 直接调用客户端 class 和服务器 class 上的方法;我喜欢这种显式连接
- 将客户端 class 拆分为两部分(两个方法):请求产生部分和响应处理部分;这样可以更好地串接到流中。
- 明确定义clientConnectionFactory/serverConnectionFactory。这样以后可以显式配置更多的东西。
HeartbeatClientConfig
/**
 * Client flow: polls {@code heartbeatClient.send()} with a fixed delay of 5 seconds, sends
 * the produced message through the outbound gateway and hands the reply to
 * {@code heartbeatClient.receive(...)}.
 */
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    final Duration pollDelay = Duration.ofSeconds(5);
    final IntegrationFlow flow = IntegrationFlows
            .from(heartbeatClient::send, spec -> spec.poller(Pollers.fixedDelay(pollDelay)))
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
    return flow;
}
HeartbeatClient
/**
 * Client half of the heartbeat exchange, split into a request-producing method and a
 * reply-handling method so each can be referenced directly from the integration flow.
 */
public class HeartbeatClient {
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * Produces the next heartbeat request.
     *
     * @return a message whose payload is "status"
     */
    public GenericMessage<String> send() {
        log.info("Sending Heartbeat");
        return new GenericMessage<String>("status");
    }

    /**
     * Checks the server's reply and logs whether it was the expected "OK".
     *
     * @param payload raw reply bytes
     * @param messageHeaders headers of the reply message (not used)
     * @return always null
     */
    public Object receive(byte[] payload, MessageHeaders messageHeaders) { // LATER: use transformer() to receive String here
        // Decode with an explicit charset instead of the platform default, so the comparison
        // does not depend on the JVM's locale settings.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if ("OK".equals(messageStr)) {
            log.info("Heartbeat OK response received");
        } else {
            log.error("Unexpected message content from server: " + messageStr);
        }
        return null;
    }
}
HeartbeatServerConfig
/**
 * Server flow: each request arriving at the inbound gateway is passed to
 * {@code heartbeatServer.processRequest(...)}.
 */
@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer) {
    final IntegrationFlow flow = IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .handle(heartbeatServer::processRequest)
            .get();
    return flow;
}
HeartbeatServer
/** Server half of the heartbeat exchange: turns a "status" request into an "OK" reply. */
public class HeartbeatServer {
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * Handles one request.
     *
     * @param payload raw request bytes
     * @param messageHeaders headers of the request message (not used)
     * @return a message with payload "OK" for a "status" request, otherwise null
     */
    public Message<String> processRequest(byte[] payload, MessageHeaders messageHeaders) {
        // Decode with an explicit charset instead of the platform default, so the comparison
        // does not depend on the JVM's locale settings.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if ("status".equals(messageStr)) {
            log.info("Heartbeat received");
            return new GenericMessage<>("OK");
        }
        log.error("Unexpected message content from client: " + messageStr);
        return null;
    }
}
请问,
能否帮我用 Spring Integration DSL 实现一个简单的、echo 风格的 Heartbeat TCP 套接字服务?更准确地说,我想知道如何在客户端和服务器端把 Adapter/Handler/Gateway 接入 IntegrationFlows。关于 Spring Integration DSL 与 TCP/IP 客户端/服务器通信的实际示例很难找到。
我想我已经写好了大部分代码,只差在 IntegrationFlow 中把所有部分组装到一起。
SI 示例中有一个示例 echo 服务,但它是在 "old" XML 配置中编写的,我真的很难将其转换为通过代码配置。
我的 Heartbeat 服务是一个简单的服务器,等待客户端请求 "status",并以 "OK" 响应。
没有 @ServiceActivator,没有 @MessagingGateway,没有代理,一切都明确而冗长;由客户端的普通 JDK 定时执行器(scheduled executor)驱动;服务器和客户端位于各自独立的配置和项目中。
HeartbeatClientConfig
/**
 * Client-side Spring configuration from the question. It declares the two channels, the TCP
 * client connection factory, the receiving adapter, the sending handler and the
 * {@code HeartbeatClient} as beans.
 *
 * NOTE(review): {@code heartbeatClientFlow} is unfinished on purpose (the "???" placeholders
 * are what the question asks about), so this class does not compile as written.
 */
@Configuration
@EnableIntegration
public class HeartbeatClientConfig {
// DirectChannel bean; passed to heartbeatClientFlow and to HeartbeatClient below.
@Bean
public MessageChannel outboudChannel() {
return new DirectChannel();
}
// QueueChannel bean; passed to HeartbeatClient below.
@Bean
public PollableChannel inboundChannel() {
return new QueueChannel();
}
// TCP client connection factory pointing at localhost:7777.
@Bean
public TcpNetClientConnectionFactory connectionFactory() {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
return connectionFactory;
}
// Receiving adapter: uses the connection factory above, forwards to the injected
// inboundChannel and is switched to client mode.
// NOTE(review): the parameter is typed MessageChannel; presumably it resolves to the
// inboundChannel bean by parameter name - confirm.
@Bean
public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
TcpNetClientConnectionFactory connectionFactory,
MessageChannel inboundChannel) {
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
heartbeatReceivingMessageAdapter.setClientMode(true);
return heartbeatReceivingMessageAdapter;
}
// Sending handler bound to the same connection factory as the receiving adapter.
@Bean
public TcpSendingMessageHandler heartbeatSendingMessageHandler(
TcpNetClientConnectionFactory connectionFactory) {
TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
return heartbeatSendingMessageHandler;
}
// Unfinished flow definition: starts from outboudChannel; the remaining steps are the
// open question (the placeholder lines below are not valid Java).
@Bean
public IntegrationFlow heartbeatClientFlow(
TcpNetClientConnectionFactory connectionFactory,
TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
TcpSendingMessageHandler heartbeatSendingMessageHandler,
MessageChannel outboudChannel) {
return IntegrationFlows
.from(outboudChannel) // ??????
.// adapter ???????????
.// gateway ???????????
.// handler ???????????
.get();
}
// The client bean receives both channels through its constructor.
@Bean
public HeartbeatClient heartbeatClient(
MessageChannel outboudChannel,
PollableChannel inboundChannel) {
return new HeartbeatClient(outboudChannel, inboundChannel);
}
}
HeartbeatClient
/**
 * Heartbeat client from the question: on a fixed schedule it sends a "status" request on the
 * outbound channel and then waits for the reply on the inbound channel.
 */
public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * @param outboudChannel channel the "status" requests are sent to
     * @param inboundChannel channel the replies are read from
     */
    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the heartbeat schedule when a context-refreshed event is delivered. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    /**
     * Schedules one heartbeat round every 10 seconds, starting immediately.
     *
     * The task must return after each round: the previous version looped forever inside the
     * scheduled task, so it sent requests back to back and the schedule never fired a second
     * time. The previous period of 10000 SECONDS (about 2.8 hours) is replaced by 10 seconds.
     */
    public void start() {
        Executors.newSingleThreadScheduledExecutor()
                .scheduleAtFixedRate(this::sendHeartbeat, 0, 10, TimeUnit.SECONDS);
    }

    /** Performs a single request/reply round and logs the outcome. */
    private void sendHeartbeat() {
        try {
            log.info("Sending Heartbeat");
            outboudChannel.send(new GenericMessage<String>("status"));
            Message<?> message = inboundChannel.receive(1000);
            if (message == null) {
                log.error("Heartbeat timeouted");
                return;
            }
            // Decode with an explicit charset instead of the platform default.
            String messageStr = new String((byte[]) message.getPayload(),
                    java.nio.charset.StandardCharsets.UTF_8);
            if ("OK".equals(messageStr)) {
                log.info("Heartbeat OK response received");
            } else {
                log.error("Unexpected message content from server: " + messageStr);
            }
        } catch (Exception e) {
            // Catch everything here: an exception escaping a scheduleAtFixedRate task
            // suppresses all subsequent executions. Pass the throwable so the stack trace
            // is logged.
            log.error("Heartbeat round failed", e);
        }
    }
}
HeartbeatServerConfig
/**
 * Server-side Spring configuration from the question. It declares the two channels, the TCP
 * server connection factory on port 7777, the receiving adapter, the sending handler, the
 * flow that connects adapter and handler, and the {@code HeartbeatServer} bean.
 */
@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    /** DirectChannel bean; used as the receiving adapter's output and by HeartbeatServer. */
    @Bean
    public MessageChannel outboudChannel() {
        final DirectChannel directChannel = new DirectChannel();
        return directChannel;
    }

    /** QueueChannel bean; handed to HeartbeatServer. */
    @Bean
    public PollableChannel inboundChannel() {
        final QueueChannel queueChannel = new QueueChannel();
        return queueChannel;
    }

    /** TCP server connection factory on port 7777. */
    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        return new TcpNetServerConnectionFactory(7777);
    }

    /** Receiving adapter bound to the server connection factory, forwarding to outboudChannel. */
    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(connectionFactory);
        adapter.setOutputChannel(outboudChannel);
        return adapter;
    }

    /** Sending handler bound to the same server connection factory. */
    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        final TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(connectionFactory);
        return handler;
    }

    /**
     * Flow that starts at the receiving adapter and ends in the sending handler. The "???"
     * markers are the question author's own doubts about this wiring.
     */
    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        final IntegrationFlow flow = IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???
                .handle(heartbeatSendingMessageHandler) // ???
                .get();
        return flow;
    }

    /** The server bean receives both channels through its constructor. */
    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel,
            MessageChannel outboudChannel) {
        final HeartbeatServer server = new HeartbeatServer(inboundChannel, outboudChannel);
        return server;
    }
}
HeartbeatServer
/**
 * Heartbeat server from the question: a background worker polls the inbound channel and
 * answers every "status" request with "OK" on the outbound channel.
 */
public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * @param inboundChannel channel the requests are read from
     * @param outboudChannel channel the "OK" replies are sent to
     */
    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    /** Starts the polling worker when a context-refreshed event is delivered. */
    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    /** Launches a single worker thread that handles requests until it is interrupted. */
    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            // Stop when the worker is interrupted so the loop can be shut down; the
            // previous while (true) could never terminate.
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    handleNextRequest();
                } catch (Exception e) {
                    // Keep serving after a failed round; pass the throwable so the stack
                    // trace is logged.
                    log.error("Heartbeat request handling failed", e);
                }
            }
        });
    }

    /** Waits for one request (receive timeout 1000) and replies "OK" to a "status" request. */
    private void handleNextRequest() {
        Message<?> message = inboundChannel.receive(1000);
        if (message == null) {
            log.error("Heartbeat timeouted");
            return;
        }
        // Decode with an explicit charset instead of the platform default.
        String messageStr = new String((byte[]) message.getPayload(),
                java.nio.charset.StandardCharsets.UTF_8);
        if ("status".equals(messageStr)) {
            log.info("Heartbeat received");
            outboudChannel.send(new GenericMessage<>("OK"));
        } else {
            log.error("Unexpected message content from client: " + messageStr);
        }
    }
}
加分题
为什么可以在 TcpReceivingChannelAdapter(入站适配器)上设置通道,却不能在 TcpSendingMessageHandler(出站适配器)上设置通道?
更新
这是完整的项目源代码,感兴趣的话可以用 git 克隆:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
我会尝试将所有建议的解决方案放在那里。
有了 DSL 就简单多了...
/**
 * Self-contained demo that hosts both sides of the exchange in one application: a TCP server
 * flow on port 1234 and a client flow that a scheduled {@code Runner} triggers through the
 * {@code Gate} gateway interface.
 */
@SpringBootApplication
@EnableScheduling
public class So55154418Application {

    public static void main(String[] args) {
        SpringApplication.run(So55154418Application.class, args);
    }

    /** Server flow: converts each request to a String, logs it and replies "OK". */
    @Bean
    public IntegrationFlow server() {
        final IntegrationFlow serverFlow = IntegrationFlows
                .from(Tcp.inboundGateway(Tcp.netServer(1234)))
                .transform(Transformers.objectToString())
                .log()
                .handle((payload, headers) -> {
                    return "OK";
                })
                .get();
        return serverFlow;
    }

    /**
     * Client flow: messages sent through {@code Gate} go to localhost:1234; the reply is
     * converted to a String and printed. The last handler returns null, so nothing is
     * produced after it.
     */
    @Bean
    public IntegrationFlow client() {
        final IntegrationFlow clientFlow = IntegrationFlows
                .from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((payload, headers) -> {
                    System.out.println("Received:" + payload);
                    return null;
                })
                .get();
        return clientFlow;
    }

    /** Creates the scheduled runner once the "client" bean is available. */
    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        final Runner scheduledRunner = new Runner(gateway);
        return scheduledRunner;
    }

    /** Sends "foo" through the gateway with a fixed delay of 5000 between runs. */
    public static class Runner {

        private final Gate gate;

        public Runner(Gate gateway) {
            this.gate = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            gate.send("foo");
        }
    }

    /** Gateway interface the client flow starts from; {@code send} returns nothing. */
    public interface Gate {

        void send(String out);
    }
}
或者,从 Gate 方法获取回复...
/**
 * Client flow variant without a terminal handler: requests entering through {@code Gate} are
 * sent to localhost:1234 and the reply is converted to a String.
 */
@Bean
public IntegrationFlow client() {
    final String host = "localhost";
    final int port = 1234;
    final IntegrationFlow clientFlow = IntegrationFlows
            .from(Gate.class)
            .handle(Tcp.outboundGateway(Tcp.netClient(host, port)))
            .transform(Transformers.objectToString())
            .get();
    return clientFlow;
}
/** Creates the scheduled runner once the "client" bean is available. */
@Bean
@DependsOn("client")
public Runner runner(Gate gateway) {
    final Runner scheduledRunner = new Runner(gateway);
    return scheduledRunner;
}
/** Scheduled driver: sends "foo" through the gateway and prints whatever comes back. */
public static class Runner {

    private final Gate gate;

    public Runner(Gate gateway) {
        this.gate = gateway;
    }

    /** Runs with a fixed delay of 5000 between invocations. */
    @Scheduled(fixedDelay = 5000)
    public void run() {
        // null for timeout
        final String answer = gate.sendAndReceive("foo");
        System.out.println("Received:" + answer);
    }
}
/** Gateway interface the client flow starts from; used here for request/reply. */
public interface Gate {
// Sends `out` and returns the reply as a String. The caller in this snippet notes that a
// null result means the reply timed out (replyTimeout = 5000, presumably milliseconds - confirm).
@Gateway(replyTimeout = 5000)
String sendAndReceive(String out);
}
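加分题解答:
消费端点(consuming endpoint)实际上由两个 bean 组成:消费者(consumer)和消息处理器(message handler)。通道是设置在消费者上的,而不是设置在消息处理器上。参见 Spring Integration 参考文档(原文此处的链接已丢失)。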
编辑
另一种方法,用于客户端的单个 bean...
/**
 * Single-bean client: a polled supplier emits "foo" with a fixed delay of 5 seconds, the
 * message is sent to localhost:1234, and the reply is converted to a String and printed.
 */
@Bean
public IntegrationFlow client() {
    final Duration pollDelay = Duration.ofSeconds(5);
    final IntegrationFlow clientFlow = IntegrationFlows
            .from(() -> "foo", spec -> spec.poller(Pollers.fixedDelay(pollDelay)))
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
            .transform(Transformers.objectToString())
            .handle((payload, headers) -> {
                System.out.println("Received:" + payload);
                return null;
            })
            .get();
    return clientFlow;
}
对于任何感兴趣的人,这是我在 Gary Russell 的帮助下制定的可行解决方案之一。所有功劳归功于 Gary Russell。完整的项目源代码见上文给出的仓库链接。
亮点:
- IntegrationFlows:仅使用入站和出站网关。
- 不需要适配器或通道;没有 ServiceActivators 或 Message Gate 代理。
- 不需要 ScheduledExecutor 或 Executors;客户端和服务器代码因此大为简化
- IntegrationFlows 直接调用客户端 class 和服务器 class 上的方法;我喜欢这种显式连接
- 将客户端 class 拆分为两部分(两个方法):请求产生部分和响应处理部分;这样可以更好地串接到流中。
- 明确定义clientConnectionFactory/serverConnectionFactory。这样以后可以显式配置更多的东西。
HeartbeatClientConfig
/**
 * Client flow: polls {@code heartbeatClient.send()} with a fixed delay of 5 seconds, sends
 * the produced message through the outbound gateway and hands the reply to
 * {@code heartbeatClient.receive(...)}.
 */
@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    final Duration pollDelay = Duration.ofSeconds(5);
    final IntegrationFlow flow = IntegrationFlows
            .from(heartbeatClient::send, spec -> spec.poller(Pollers.fixedDelay(pollDelay)))
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
    return flow;
}
HeartbeatClient
/**
 * Client half of the heartbeat exchange, split into a request-producing method and a
 * reply-handling method so each can be referenced directly from the integration flow.
 */
public class HeartbeatClient {
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    /**
     * Produces the next heartbeat request.
     *
     * @return a message whose payload is "status"
     */
    public GenericMessage<String> send() {
        log.info("Sending Heartbeat");
        return new GenericMessage<String>("status");
    }

    /**
     * Checks the server's reply and logs whether it was the expected "OK".
     *
     * @param payload raw reply bytes
     * @param messageHeaders headers of the reply message (not used)
     * @return always null
     */
    public Object receive(byte[] payload, MessageHeaders messageHeaders) { // LATER: use transformer() to receive String here
        // Decode with an explicit charset instead of the platform default, so the comparison
        // does not depend on the JVM's locale settings.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if ("OK".equals(messageStr)) {
            log.info("Heartbeat OK response received");
        } else {
            log.error("Unexpected message content from server: " + messageStr);
        }
        return null;
    }
}
HeartbeatServerConfig
/**
 * Server flow: each request arriving at the inbound gateway is passed to
 * {@code heartbeatServer.processRequest(...)}.
 */
@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer) {
    final IntegrationFlow flow = IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .handle(heartbeatServer::processRequest)
            .get();
    return flow;
}
HeartbeatServer
/** Server half of the heartbeat exchange: turns a "status" request into an "OK" reply. */
public class HeartbeatServer {
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    /**
     * Handles one request.
     *
     * @param payload raw request bytes
     * @param messageHeaders headers of the request message (not used)
     * @return a message with payload "OK" for a "status" request, otherwise null
     */
    public Message<String> processRequest(byte[] payload, MessageHeaders messageHeaders) {
        // Decode with an explicit charset instead of the platform default, so the comparison
        // does not depend on the JVM's locale settings.
        String messageStr = new String(payload, java.nio.charset.StandardCharsets.UTF_8);
        if ("status".equals(messageStr)) {
            log.info("Heartbeat received");
            return new GenericMessage<>("OK");
        }
        log.error("Unexpected message content from client: " + messageStr);
        return null;
    }
}