如何暂停 Spring 云数据流源 class 向 kafka 发送数据?

How to pause the Spring cloud data flow Source class from sending data to kafka?

我正在开发spring云数据流应用程序,以下是代码片段

    @Bean
    @InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
    public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {

        return new MessageSource<Product>() {
            @SneakyThrows
            @Override
            public Message<Product> receive() {
                System.out.println("calling method");
                return MessageBuilder.withPayload(dataAccess.getNext()).build();
            }
        };
    }

在上面的代码中,getNext() 方法将从数据库和 return 对象中获取数据,因此如果数据被完全读取,那么它将 return null

我们不能 return 此 MessageSource 为 null。

那么是否有任何选项可用于在我们需要时暂停和恢复此源连接class?

有人遇到/克服过这种情况吗?

首先,您可以使用 Supplier<Product> 而不是 MessageSource,您的代码将如下所示:

return () -> dataAccess.getNext();

null 结果在这里是有效的,在这种情况下不会发出任何消息,也没有错误,因为框架正确处理了 null 结果。

当方法调用的结果为 null 时,您仍然可以在 @InboundChannelAdapter 上使用闲置功能。因此,您需要查看 SimpleActiveIdleMessageSourceAdvice。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice