如何在 Spring Integration DSL 中实现简单的 echo socket 服务

How to implement simple echo socket service in Spring Integration DSL

拜托,
你能帮助在 Spring 集成 DSL 中实现一个简单的、回声风格的 Heartbeat TCP 套接字服务吗?更准确地说,如何在客户端和服务器端将 Adapter/Handler/Gateway 插入 IntegrationFlows。 Spring 集成 DSL 和 TCP/IP client/server 通信很难找到实际示例。

我想,我已经掌握了大部分代码,只是将 IntegrationFlow.

中的所有内容整合在一起而已

SI 示例中有一个示例 echo 服务,但它是在 "old" XML 配置中编写的,我真的很难将其转换为通过代码配置。

我的 Heartbeat 服务是一个简单的服务器,等待客户端请求 "status",并以 "OK" 响应。

没有@ServiceActivator,没有@MessageGateways,没有代理,一切都明确而冗长;由客户端的普通 JDK 计划执行程序驱动;服务器和客户端在单独的配置和项目中。

HeartbeatClientConfig

@Configuration
@EnableIntegration
public class HeartbeatClientConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetClientConnectionFactory connectionFactory() {
        TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory("localhost", 7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetClientConnectionFactory connectionFactory,
            MessageChannel inboundChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(inboundChannel); // ???
        heartbeatReceivingMessageAdapter.setClientMode(true);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatClientFlow(
            TcpNetClientConnectionFactory connectionFactory,
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(outboudChannel) // ??????
                .// adapter ???????????
                .// gateway ???????????
                .// handler ???????????
                .get();
    }

    @Bean
    public HeartbeatClient heartbeatClient(
            MessageChannel outboudChannel,
            PollableChannel inboundChannel) {
        return new HeartbeatClient(outboudChannel, inboundChannel);
    }
}

HeartbeatClient

public class HeartbeatClient {
    private final MessageChannel outboudChannel;
    private final PollableChannel inboundChannel;
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public HeartbeatClient(MessageChannel outboudChannel, PollableChannel inboundChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat client...");
        start();
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
            while (true) {
                try {
                    log.info("Sending Heartbeat");
                    outboudChannel.send(new GenericMessage<String>("status"));
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("OK")) {
                            log.info("Heartbeat OK response received");
                        } else {
                            log.error("Unexpected message content from server: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }, 0, 10000, TimeUnit.SECONDS);
    }
}

HeartbeatServerConfig

@Configuration
@EnableIntegration
public class HeartbeatServerConfig {

    @Bean
    public MessageChannel outboudChannel() {
        return new DirectChannel();
    }

    @Bean
    public PollableChannel inboundChannel() {
        return new QueueChannel();
    }

    @Bean
    public TcpNetServerConnectionFactory connectionFactory() {
        TcpNetServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(7777);
        return connectionFactory;
    }

    @Bean
    public TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter(
            TcpNetServerConnectionFactory connectionFactory,
            MessageChannel outboudChannel) {
        TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter = new TcpReceivingChannelAdapter();
        heartbeatReceivingMessageAdapter.setConnectionFactory(connectionFactory);
        heartbeatReceivingMessageAdapter.setOutputChannel(outboudChannel);
        return heartbeatReceivingMessageAdapter;
    }

    @Bean
    public TcpSendingMessageHandler heartbeatSendingMessageHandler(
            TcpNetServerConnectionFactory connectionFactory) {
        TcpSendingMessageHandler heartbeatSendingMessageHandler = new TcpSendingMessageHandler();
        heartbeatSendingMessageHandler.setConnectionFactory(connectionFactory);
        return heartbeatSendingMessageHandler;
    }

    @Bean
    public IntegrationFlow heartbeatServerFlow(
            TcpReceivingChannelAdapter heartbeatReceivingMessageAdapter,
            TcpSendingMessageHandler heartbeatSendingMessageHandler,
            MessageChannel outboudChannel) {
        return IntegrationFlows
                .from(heartbeatReceivingMessageAdapter) // ???????????????
                .handle(heartbeatSendingMessageHandler) // ???????????????
                .get();
    }

    @Bean
    public HeartbeatServer heartbeatServer(
            PollableChannel inboundChannel, 
            MessageChannel outboudChannel) {
        return new HeartbeatServer(inboundChannel, outboudChannel);
    }
}

心跳服务器

public class HeartbeatServer {
    private final PollableChannel inboundChannel;
    private final MessageChannel outboudChannel;
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public HeartbeatServer(PollableChannel inboundChannel, MessageChannel outboudChannel) {
        this.inboundChannel = inboundChannel;
        this.outboudChannel = outboudChannel;
    }

    @EventListener
    public void initializaAfterContextIsReady(ContextRefreshedEvent event) {
        log.info("Starting Heartbeat");
        start();
    }

    public void start() {
        Executors.newSingleThreadExecutor().execute(() -> {
            while (true) {
                try {
                    Message<?> message = inboundChannel.receive(1000);
                    if (message == null) {
                        log.error("Heartbeat timeouted");
                    } else {
                        String messageStr = new String((byte[]) message.getPayload());
                        if (messageStr.equals("status")) {
                            log.info("Heartbeat received");
                            outboudChannel.send(new GenericMessage<>("OK"));
                        } else {
                            log.error("Unexpected message content from client: " + messageStr);
                        }
                    }
                } catch (Exception e) {
                    log.error(e);
                }
            }
        });
    }
}

加分题

为什么可以在 TcpReceivingChannelAdapter(入站适配器)上设置通道而不是 TcpSendingMessageHandler(出站适配器)?

更新
这是完整的项目源代码,如果有人对任何人感兴趣 git 克隆它:
https://bitbucket.org/espinosa/spring-integration-tcp-demo
我会尝试将所有建议的解决方案放在那里。

有了 DSL 就简单多了...

@SpringBootApplication
@EnableScheduling
public class So55154418Application {

    public static void main(String[] args) {
        SpringApplication.run(So55154418Application.class, args);
    }

    @Bean
    public IntegrationFlow server() {
        return IntegrationFlows.from(Tcp.inboundGateway(Tcp.netServer(1234)))
                .transform(Transformers.objectToString())
                .log()
                .handle((p, h) -> "OK")
                .get();
    }

    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .handle((p, h) -> {
                    System.out.println("Received:" + p);
                    return null;
                })
                .get();
    }

    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    public static class Runner {

        private final Gate gateway;

        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            this.gateway.send("foo");
        }

    }

    public interface Gate {

        void send(String out);

    }

}

或者,从 Gate 方法获取回复...

    @Bean
    public IntegrationFlow client() {
        return IntegrationFlows.from(Gate.class)
                .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
                .transform(Transformers.objectToString())
                .get();
    }

    @Bean
    @DependsOn("client")
    public Runner runner(Gate gateway) {
        return new Runner(gateway);
    }

    public static class Runner {

        private final Gate gateway;

        public Runner(Gate gateway) {
            this.gateway = gateway;
        }

        @Scheduled(fixedDelay = 5000)
        public void run() {
            String reply = this.gateway.sendAndReceive("foo"); // null for timeout
            System.out.println("Received:" + reply);
        }

    }

    public interface Gate {

        @Gateway(replyTimeout = 5000)
        String sendAndReceive(String out);

    }

奖金:

消费端点实际上由 2 个 bean 组成;消费者和消息处理程序。渠道继续消费者。参见

编辑

另一种方法,用于客户端的单个 bean...

@Bean
public IntegrationFlow client() {
    return IntegrationFlows.from(() -> "foo", 
                    e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(Tcp.netClient("localhost", 1234)))
            .transform(Transformers.objectToString())
            .handle((p, h) -> {
                System.out.println("Received:" + p);
                return null;
            })
            .get();
}

对于任何感兴趣的人,这是我在 Gary Russell 的帮助下制定的可行解决方案之一。 所有功劳归功于 Gary RussellFull project source code here.

亮点:

  • IntegrationFlows:仅使用入站和出站网关。
  • 不需要适配器或通道;没有 ServiceActivators 或 Message Gate 代理。
  • 不需要ScheduledExecutor或Executors;客户端和服务器代码很重要
  • IntegrationFlows 直接调用客户端 class 和服务器 class 上的方法;我喜欢这种显式连接
  • 拆分客户端class两部分,两种方法:请求产生部分和响应处理部分;这样它可以更好地链接到流。
  • 明确定义clientConnectionFactory/serverConnectionFactory。这样以后可以显式配置更多的东西。

HeartbeatClientConfig

@Bean
public IntegrationFlow heartbeatClientFlow(
        TcpNetClientConnectionFactory clientConnectionFactory,
        HeartbeatClient heartbeatClient) {
    return IntegrationFlows.from(heartbeatClient::send,  e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(5))))
            .handle(Tcp.outboundGateway(clientConnectionFactory))
            .handle(heartbeatClient::receive)
            .get();
}

HeartbeatClient

public class HeartbeatClient {
    private final Logger log = LogManager.getLogger(HeartbeatClient.class);

    public GenericMessage<String> send() {
        log.info("Sending Heartbeat");
        return new GenericMessage<String>("status");
    }

    public Object receive(byte[] payload, MessageHeaders messageHeaders) { // LATER: use transformer() to receive String here
        String messageStr = new String(payload);
        if (messageStr.equals("OK")) {
            log.info("Heartbeat OK response received");
        } else {
            log.error("Unexpected message content from server: " + messageStr);
        }
        return null;
    }
}

HeartbeatServerConfig

@Bean
public IntegrationFlow heartbeatServerFlow(
        TcpNetServerConnectionFactory serverConnectionFactory,
        HeartbeatServer heartbeatServer) {
    return IntegrationFlows
            .from(Tcp.inboundGateway(serverConnectionFactory))
            .handle(heartbeatServer::processRequest)
            .get();
}

心跳服务器

public class HeartbeatServer {
    private final Logger log = LogManager.getLogger(HeartbeatServer.class);

    public Message<String> processRequest(byte[] payload, MessageHeaders messageHeaders) {
        String messageStr = new String(payload);
        if (messageStr.equals("status")) {
            log.info("Heartbeat received");
            return new GenericMessage<>("OK");
        } else {
            log.error("Unexpected message content from client: " + messageStr);
            return null;
        }

    }
}