如何暂停 Spring 云数据流源 class 向 kafka 发送数据?
How to pause the Spring cloud data flow Source class from sending data to kafka?
我正在开发spring云数据流应用程序,以下是代码片段
@Bean
@InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {
return new MessageSource<Product>() {
@SneakyThrows
@Override
public Message<Product> receive() {
System.out.println("calling method");
return MessageBuilder.withPayload(dataAccess.getNext()).build();
}
};
}
在上面的代码中,getNext()
方法将从数据库和 return 对象中获取数据,因此如果数据被完全读取,那么它将 return null
我们不能 return 此 MessageSource 为 null。
那么是否有任何选项可用于在我们需要时暂停和恢复此源连接class?
有人遇到/克服过这种情况吗?
首先,您可以使用 Supplier<Product>
而不是 MessageSource
,您的代码将如下所示:
return () -> dataAccess.getNext();
null
结果在这里是有效的,在这种情况下不会发出任何消息,也没有错误,因为框架正确处理了 null
结果。
当方法调用的结果为 null
时,您仍然可以在 @InboundChannelAdapter
上使用闲置功能。因此,您需要查看 SimpleActiveIdleMessageSourceAdvice
。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice
我正在开发spring云数据流应用程序,以下是代码片段
@Bean
@InboundChannelAdapter(channel = TbeSource.PR1, poller = @Poller(fixedDelay = "2000"))
public MessageSource<Product> getProductSource(ProductBuilder dataAccess) {
return new MessageSource<Product>() {
@SneakyThrows
@Override
public Message<Product> receive() {
System.out.println("calling method");
return MessageBuilder.withPayload(dataAccess.getNext()).build();
}
};
}
在上面的代码中,getNext()
方法将从数据库和 return 对象中获取数据,因此如果数据被完全读取,那么它将 return null
我们不能 return 此 MessageSource 为 null。
那么是否有任何选项可用于在我们需要时暂停和恢复此源连接class?
有人遇到/克服过这种情况吗?
首先,您可以使用 Supplier<Product>
而不是 MessageSource
,您的代码将如下所示:
return () -> dataAccess.getNext();
null
结果在这里是有效的,在这种情况下不会发出任何消息,也没有错误,因为框架正确处理了 null
结果。
当方法调用的结果为 null
时,您仍然可以在 @InboundChannelAdapter
上使用闲置功能。因此,您需要查看 SimpleActiveIdleMessageSourceAdvice
。有关详细信息,请参阅文档:https://docs.spring.io/spring-integration/docs/5.3.4.RELEASE/reference/html/core.html#simpleactiveidlereceivemessageadvice