`RotatingServerAdvice` 多次不获取文件如何解决?
How to solve if `RotatingServerAdvice` is not fetching file for multiple times?
我正在使用 IntegrationFlow
作为 Sftp 入站 DSL 配置 我正在使用 CustomTriggerAdvice
来处理手动触发。请参阅下面的代码片段以供参考。
我也在使用 RotatingServerAdvice
来处理同一主机中的多个路径。
但是当我启动 Sftp Inbound 时,它第一次从每个路径获取文件,但第二次和以后都不起作用。 Sftp 入站启动但不从路径中获取文件。我无法找出问题所在。有什么我想念的吗?
SftpConfiguration
public IntegrationFlow fileFlow() {
SftpInboundChannelAdapterSpec spec = Sftp
.inboundAdapter(dSF())
.preserveTimestamp(true)
.remoteDirectory(".")
.autoCreateLocalDirectory(true)
.deleteRemoteFiles(false)
.localDirectory(new File(getDestinationLocation()));
return IntegrationFlows
.from(spec, e -> e.id(BEAN_ID)
.autoStartup(false)
.poller(Pollers
.fixedDelay(5000)
.advice(
customRotatingServerAdvice(dSF()),
customTriggerAdvice()
)
)
)
.channel(sftpReceiverChannel())
.handle(sftpInboundMessageHandler())
.get();
}
private MessageChannel sftpReceiverChannel() {
return MessageChannels.direct().get();
}
………………
@Bean
public RotatingServerAdvice customRotatingServerAdvice(
DelegatingSessionFactory<LsEntry> dSF
) {
List<String> pathList = getSourcePathList();
for (String path : pathList) {
keyDirectories.add(new RotationPolicy.KeyDirectory(KEY, path));
}
return new RotatingServerAdvice(
dSF,
keyDirectories
);
}
@Bean
public CustomTriggerAdvice customTriggerAdvice() {
return new CustomTriggerAdvice(customControlChannel(),BEAN_ID);
}
@Bean
public IntegrationFlow customControlBus() {
return IntegrationFlows.from(customControlChannel())
.controlBus()
.get();
}
@Bean
public MessageChannel customControlChannel() {
return MessageChannels.direct().get();
}
CustomTriggerAdvice
public class CustomTriggerAdvice extends AbstractMessageSourceAdvice {
private final MessageChannel controlChannel;
private final String BEAN_ID;
public CustomTriggerAdvice(MessageChannel controlChannel, String beanID) {
this.controlChannel = controlChannel;
this.BEAN_ID = beanID;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null) {
controlChannel.send(new GenericMessage<>("@" + BEAN_ID + ".stop()"));
}
return result;
}
}
使用 MessageChannel 启动 Sftp 入站
@Qualifier("customControlChannel") MessageChannel controlChannel;
public void startSftpInbound(){
controlChannel.send(new GenericMessage<>("@" + beanID + ".start()"));
}
I need the system to start on demand and fetch file completing one cycle. If it is not stopped after that, it will continue polling and won't stop and my system will fall into an infinite loop. Is there any way to get that when the RotatingServerAdvice is completing polling from all servers at least one? Does it throw any event or something like that ?
您可能误解了 afterReceive(@Nullable Message<?> result, MessageSource<?> source)
合约的逻辑。当其中一台服务器未返回任何要轮询的内容时,您无法根据您的要求停止通道适配器。这样你就不会给另一个服务器在下一个轮询周期进行轮询的机会。
我认为您的想法是只对所有服务器进行一次迭代然后停止。可能独立于其中任何一个的结果。看起来最好的停止方式是每次使用 RotatingServerAdvice
和 fair = true
移动到下一个服务器。当您看到 RotationPolicy.getCurrent()
等于列表中的最后一个时,可能会从自定义 afterReceive()
独立于结果执行停止。因此,您可以通过这种方式遍历所有这些,并在下一个轮询周期停止移动第一个。
我正在使用 IntegrationFlow
作为 Sftp 入站 DSL 配置 我正在使用 CustomTriggerAdvice
来处理手动触发。请参阅下面的代码片段以供参考。
我也在使用 RotatingServerAdvice
来处理同一主机中的多个路径。
但是当我启动 Sftp Inbound 时,它第一次从每个路径获取文件,但第二次和以后都不起作用。 Sftp 入站启动但不从路径中获取文件。我无法找出问题所在。有什么我想念的吗?
SftpConfiguration
public IntegrationFlow fileFlow() {
SftpInboundChannelAdapterSpec spec = Sftp
.inboundAdapter(dSF())
.preserveTimestamp(true)
.remoteDirectory(".")
.autoCreateLocalDirectory(true)
.deleteRemoteFiles(false)
.localDirectory(new File(getDestinationLocation()));
return IntegrationFlows
.from(spec, e -> e.id(BEAN_ID)
.autoStartup(false)
.poller(Pollers
.fixedDelay(5000)
.advice(
customRotatingServerAdvice(dSF()),
customTriggerAdvice()
)
)
)
.channel(sftpReceiverChannel())
.handle(sftpInboundMessageHandler())
.get();
}
private MessageChannel sftpReceiverChannel() {
return MessageChannels.direct().get();
}
………………
@Bean
public RotatingServerAdvice customRotatingServerAdvice(
DelegatingSessionFactory<LsEntry> dSF
) {
List<String> pathList = getSourcePathList();
for (String path : pathList) {
keyDirectories.add(new RotationPolicy.KeyDirectory(KEY, path));
}
return new RotatingServerAdvice(
dSF,
keyDirectories
);
}
@Bean
public CustomTriggerAdvice customTriggerAdvice() {
return new CustomTriggerAdvice(customControlChannel(),BEAN_ID);
}
@Bean
public IntegrationFlow customControlBus() {
return IntegrationFlows.from(customControlChannel())
.controlBus()
.get();
}
@Bean
public MessageChannel customControlChannel() {
return MessageChannels.direct().get();
}
CustomTriggerAdvice
public class CustomTriggerAdvice extends AbstractMessageSourceAdvice {
private final MessageChannel controlChannel;
private final String BEAN_ID;
public CustomTriggerAdvice(MessageChannel controlChannel, String beanID) {
this.controlChannel = controlChannel;
this.BEAN_ID = beanID;
}
@Override
public boolean beforeReceive(MessageSource<?> source) {
return true;
}
@Override
public Message<?> afterReceive(Message<?> result, MessageSource<?> source) {
if (result == null) {
controlChannel.send(new GenericMessage<>("@" + BEAN_ID + ".stop()"));
}
return result;
}
}
使用 MessageChannel 启动 Sftp 入站
@Qualifier("customControlChannel") MessageChannel controlChannel;
public void startSftpInbound(){
controlChannel.send(new GenericMessage<>("@" + beanID + ".start()"));
}
I need the system to start on demand and fetch file completing one cycle. If it is not stopped after that, it will continue polling and won't stop and my system will fall into an infinite loop. Is there any way to get that when the RotatingServerAdvice is completing polling from all servers at least one? Does it throw any event or something like that ?
您可能误解了 afterReceive(@Nullable Message<?> result, MessageSource<?> source)
合约的逻辑。当其中一台服务器未返回任何要轮询的内容时,您无法根据您的要求停止通道适配器。这样你就不会给另一个服务器在下一个轮询周期进行轮询的机会。
我认为您的想法是只对所有服务器进行一次迭代然后停止。可能独立于其中任何一个的结果。看起来最好的停止方式是每次使用 RotatingServerAdvice
和 fair = true
移动到下一个服务器。当您看到 RotationPolicy.getCurrent()
等于列表中的最后一个时,可能会从自定义 afterReceive()
独立于结果执行停止。因此,您可以通过这种方式遍历所有这些,并在下一个轮询周期停止移动第一个。