当我将 File 目录传递给 messageChannel 和 InboundFileAdapter 从中读取文件时,如何调用集成流?
How can I do integration flow to be invoked when I pass File dir to messageChannel and InboundFileAdapter reading files from it?
我有从特定目录读取文件的集成流程,将其转换为 pojo 并保存在列表中。
配置class:
@Configuration
@ComponentScan
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
@Bean
public MessageChannel fileChannel(){
return new DirectChannel();
}
@Bean
public MessageSource<File> fileMessageSource(){
FileReadingMessageSource readingMessageSource = new FileReadingMessageSource();
CompositeFileListFilter<File> compositeFileListFilter= new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(new SimplePatternFileListFilter("*.csv"));
compositeFileListFilter.addFilter(new AcceptOnceFileListFilter<>());
readingMessageSource.setFilter(compositeFileListFilter);
readingMessageSource.setDirectory(new File("myFiles"));
return readingMessageSource;
}
@Bean
public CSVToOrderTransformer csvToOrderTransformer(){
return new CSVToOrderTransformer();
}
@Bean
public IntegrationFlow convert(){
return IntegrationFlows.from(fileMessageSource(),source -> source.poller(Pollers.fixedDelay(500)))
.channel(fileChannel())
.transform(csvToOrderTransformer())
.handle("loggerOrderList","processOrders")
.channel(MessageChannels.queue())
.get();
}
}
变形金刚:
public class CSVToOrderTransformer {
@Transformer
public List<Order> transform(File file){
List<Order> orders = new ArrayList<>();
Pattern pattern = Pattern.compile("(?m)^(\d*);(WAITING_FOR_PAYMENT|PAYMENT_COMPLETED);(\d*)$");
Matcher matcher = null;
try {
matcher = pattern.matcher(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
while (!matcher.hitEnd()){
if(matcher.find()){
Order order = new Order();
order.setOrderId(Integer.parseInt(matcher.group(1)));
order.setOrderState(matcher.group(2).equals("WAITING_FOR_PAYMENT")? OrderState.WAITING_FOR_PAYMENT:OrderState.PAYMENT_COMPLETED);
order.setOrderCost(Integer.parseInt(matcher.group(3)));
orders.add(order);
}
}
return orders;
}
}
OrderState 枚举:
public enum OrderState {
CANCELED,
WAITING_FOR_PAYMENT,
PAYMENT_COMPLETED
}
订单:
public class Order {
private int orderId;
private OrderState orderState;
private int orderCost;
}
LoggerOrderList 服务:
@Service
public class LoggerOrderList {
private static final Logger LOGGER = LogManager.getLogger(LoggerOrderList.class);
public List<Order> processOrders(List<Order> orderList){
orderList.forEach(LOGGER::info);
return orderList;
}
}
1)当我通过调用网关方法时,如何启动该流程?
2) 如何读取入站通道适配器中传递的消息(在我的例子中是 FileReadingMessageSource)?
FileReadingMessageSource
基于配置目录中提供的轮询。这是流程的开始,不能用在某些逻辑的中间。
你没有解释你的网关是什么,但如果目录作为已发送消息的有效负载传递,你可能希望有类似的逻辑来获取内容。然而,无论如何,这样的逻辑看起来并不适合该消息源。它的目标是一直轮询目录以获取新内容。如果你想要几个目录类似的东西,你可以考虑为提供的目录动态注册流:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-runtime-flows.
否则你需要考虑有一个简单的服务激活器,它会在提供的目录上调用 listFiles()
。仅仅因为没有“等待新内容”功能,滥用就没有意义 FileReeadingMessageSource
我有从特定目录读取文件的集成流程,将其转换为 pojo 并保存在列表中。
配置class:
@Configuration
@ComponentScan
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {
@Bean
public MessageChannel fileChannel(){
return new DirectChannel();
}
@Bean
public MessageSource<File> fileMessageSource(){
FileReadingMessageSource readingMessageSource = new FileReadingMessageSource();
CompositeFileListFilter<File> compositeFileListFilter= new CompositeFileListFilter<>();
compositeFileListFilter.addFilter(new SimplePatternFileListFilter("*.csv"));
compositeFileListFilter.addFilter(new AcceptOnceFileListFilter<>());
readingMessageSource.setFilter(compositeFileListFilter);
readingMessageSource.setDirectory(new File("myFiles"));
return readingMessageSource;
}
@Bean
public CSVToOrderTransformer csvToOrderTransformer(){
return new CSVToOrderTransformer();
}
@Bean
public IntegrationFlow convert(){
return IntegrationFlows.from(fileMessageSource(),source -> source.poller(Pollers.fixedDelay(500)))
.channel(fileChannel())
.transform(csvToOrderTransformer())
.handle("loggerOrderList","processOrders")
.channel(MessageChannels.queue())
.get();
}
}
变形金刚:
public class CSVToOrderTransformer {
@Transformer
public List<Order> transform(File file){
List<Order> orders = new ArrayList<>();
Pattern pattern = Pattern.compile("(?m)^(\d*);(WAITING_FOR_PAYMENT|PAYMENT_COMPLETED);(\d*)$");
Matcher matcher = null;
try {
matcher = pattern.matcher(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
} catch (IOException e) {
e.printStackTrace();
}
while (!matcher.hitEnd()){
if(matcher.find()){
Order order = new Order();
order.setOrderId(Integer.parseInt(matcher.group(1)));
order.setOrderState(matcher.group(2).equals("WAITING_FOR_PAYMENT")? OrderState.WAITING_FOR_PAYMENT:OrderState.PAYMENT_COMPLETED);
order.setOrderCost(Integer.parseInt(matcher.group(3)));
orders.add(order);
}
}
return orders;
}
}
OrderState 枚举:
public enum OrderState {
CANCELED,
WAITING_FOR_PAYMENT,
PAYMENT_COMPLETED
}
订单:
public class Order {
private int orderId;
private OrderState orderState;
private int orderCost;
}
LoggerOrderList 服务:
@Service
public class LoggerOrderList {
private static final Logger LOGGER = LogManager.getLogger(LoggerOrderList.class);
public List<Order> processOrders(List<Order> orderList){
orderList.forEach(LOGGER::info);
return orderList;
}
}
1)当我通过调用网关方法时,如何启动该流程? 2) 如何读取入站通道适配器中传递的消息(在我的例子中是 FileReadingMessageSource)?
FileReadingMessageSource
基于配置目录中提供的轮询。这是流程的开始,不能用在某些逻辑的中间。
你没有解释你的网关是什么,但如果目录作为已发送消息的有效负载传递,你可能希望有类似的逻辑来获取内容。然而,无论如何,这样的逻辑看起来并不适合该消息源。它的目标是一直轮询目录以获取新内容。如果你想要几个目录类似的东西,你可以考虑为提供的目录动态注册流:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-runtime-flows.
否则你需要考虑有一个简单的服务激活器,它会在提供的目录上调用 listFiles()
。仅仅因为没有“等待新内容”功能,滥用就没有意义 FileReeadingMessageSource