Spring Integration DSL 在使用 Executor 时的自定义错误通道问题
Spring Integration DSL Custom Error Channel Issue with Executor
你好,我有一个文件监听器,它会并行读取文件(一次读取多个文件)。
package com.example.demo.flow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.concurrent.Executors;
/**
* Created by muhdk on 03/01/2020.
*/
@Component
@Slf4j
public class TestFlow {
@Bean
public StandardIntegrationFlow errorChannelHandler() {
return IntegrationFlows.from("testChannel")
.handle(o -> {
log.info("Handling error....{}", o);
}).get();
}
@Bean
public IntegrationFlow testFile() {
IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
.errorChannel("testChannel")))
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.transform(o -> {
throw new RuntimeException("Failing on purpose");
}).handle(o -> {
});
return testChannel.get();
}
}
这样配置后,异常不会被发送到我的自定义错误通道。
但如果我删除下面这一行:
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
异常就会进入错误通道。
我怎样才能在使用 Executor 的情况下,让异常同样进入我的自定义错误通道?
看起来在使用 Executor 并行处理多条消息时,轮询器(poller)上配置的 errorChannel 不起作用,我不知道为什么。
我做了如下修改:
/**
 * Builds the revised file-polling flow.
 *
 * <p>The inbound adapter watches {@code d:/input-files/} with a fixed-delay poller (delay
 * value 5000, at most 10 messages per poll). Before the hand-off to the executor channel
 * (fixed pool of 5 threads), each message gets its {@code MessageHeaders.ERROR_CHANNEL}
 * header set to {@code "testChannel"}. The transformer then unconditionally throws.
 *
 * @return the assembled flow
 */
@Bean
public IntegrationFlow testFile() {
    return IntegrationFlows
            .from(Files.inboundAdapter(new File("d:/input-files/")),
                    endpoint -> endpoint.poller(
                            Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)))
            // The error channel travels with the message as a header instead of living on the poller.
            .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
            .transform(payload -> {
                // Deliberate failure used to exercise the error path.
                throw new RuntimeException("Failing on purpose");
            })
            .handle(message -> {
                // Intentionally empty terminal handler.
            })
            .get();
}
关键是新增的这一行:
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
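其余部分保持不变,这样就可以正常工作了。原因是:消息经过 Executor 通道后会在另一个线程上处理,该线程上的异常不再传播回轮询器的错误处理器,而是按消息的 errorChannel 头(未设置时使用默认的全局 errorChannel)来路由。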
你好,我有一个文件监听器,它会并行读取文件(一次读取多个文件)。
package com.example.demo.flow;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.*;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.dsl.Files;
import org.springframework.stereotype.Component;
import java.io.File;
import java.util.concurrent.Executors;
/**
* Created by muhdk on 03/01/2020.
*/
@Component
@Slf4j
public class TestFlow {
@Bean
public StandardIntegrationFlow errorChannelHandler() {
return IntegrationFlows.from("testChannel")
.handle(o -> {
log.info("Handling error....{}", o);
}).get();
}
@Bean
public IntegrationFlow testFile() {
IntegrationFlowBuilder testChannel = IntegrationFlows.from(Files.inboundAdapter(new File("d:/input-files/")),
e -> e.poller(Pollers.fixedDelay(5000L).maxMessagesPerPoll(5)
.errorChannel("testChannel")))
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
.transform(o -> {
throw new RuntimeException("Failing on purpose");
}).handle(o -> {
});
return testChannel.get();
}
}
这样配置后,异常不会被发送到我的自定义错误通道。
但如果我删除下面这一行:
.channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
异常就会进入错误通道。
我怎样才能在使用 Executor 的情况下,让异常同样进入我的自定义错误通道?
看起来在使用 Executor 并行处理多条消息时,轮询器(poller)上配置的 errorChannel 不起作用,我不知道为什么。
我做了如下修改:
/**
 * Builds the revised file-polling flow.
 *
 * <p>The inbound adapter watches {@code d:/input-files/} with a fixed-delay poller (delay
 * value 5000, at most 10 messages per poll). Before the hand-off to the executor channel
 * (fixed pool of 5 threads), each message gets its {@code MessageHeaders.ERROR_CHANNEL}
 * header set to {@code "testChannel"}. The transformer then unconditionally throws.
 *
 * @return the assembled flow
 */
@Bean
public IntegrationFlow testFile() {
    return IntegrationFlows
            .from(Files.inboundAdapter(new File("d:/input-files/")),
                    endpoint -> endpoint.poller(
                            Pollers.fixedDelay(5000L).maxMessagesPerPoll(10)))
            // The error channel travels with the message as a header instead of living on the poller.
            .enrichHeaders(headers -> headers.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
            .channel(MessageChannels.executor(Executors.newFixedThreadPool(5)))
            .transform(payload -> {
                // Deliberate failure used to exercise the error path.
                throw new RuntimeException("Failing on purpose");
            })
            .handle(message -> {
                // Intentionally empty terminal handler.
            })
            .get();
}
关键是新增的这一行:
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "testChannel"))
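其余部分保持不变,这样就可以正常工作了。原因是:消息经过 Executor 通道后会在另一个线程上处理,该线程上的异常不再传播回轮询器的错误处理器,而是按消息的 errorChannel 头(未设置时使用默认的全局 errorChannel)来路由。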