当我将 File 目录传递给 messageChannel 和 InboundFileAdapter 从中读取文件时,如何调用集成流?

How can I do integration flow to be invoked when I pass File dir to messageChannel and InboundFileAdapter reading files from it?

我有从特定目录读取文件的集成流程,将其转换为 pojo 并保存在列表中。

配置class:

@Configuration
@ComponentScan
@EnableIntegration
@IntegrationComponentScan
public class IntegrationConfig {

    @Bean
    public MessageChannel fileChannel(){
        return new DirectChannel();
    }
    @Bean
    public MessageSource<File> fileMessageSource(){
        FileReadingMessageSource readingMessageSource = new FileReadingMessageSource();
        CompositeFileListFilter<File> compositeFileListFilter= new CompositeFileListFilter<>();
        compositeFileListFilter.addFilter(new SimplePatternFileListFilter("*.csv"));
        compositeFileListFilter.addFilter(new AcceptOnceFileListFilter<>());
        readingMessageSource.setFilter(compositeFileListFilter);
        readingMessageSource.setDirectory(new File("myFiles"));
        return readingMessageSource;
    }

    @Bean
    public CSVToOrderTransformer csvToOrderTransformer(){
        return new CSVToOrderTransformer();
    }
    @Bean
    public IntegrationFlow convert(){
        return IntegrationFlows.from(fileMessageSource(),source -> source.poller(Pollers.fixedDelay(500)))
                .channel(fileChannel())
                .transform(csvToOrderTransformer())
                .handle("loggerOrderList","processOrders")
                .channel(MessageChannels.queue())
                .get();
    }
}

变形金刚:

public class CSVToOrderTransformer {
    @Transformer
    public List<Order> transform(File file){
        List<Order> orders = new ArrayList<>();
        Pattern pattern = Pattern.compile("(?m)^(\d*);(WAITING_FOR_PAYMENT|PAYMENT_COMPLETED);(\d*)$");
        Matcher matcher = null;
        try {
            matcher = pattern.matcher(new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
        while (!matcher.hitEnd()){
            if(matcher.find()){
                Order order = new Order();
                order.setOrderId(Integer.parseInt(matcher.group(1)));
                order.setOrderState(matcher.group(2).equals("WAITING_FOR_PAYMENT")? OrderState.WAITING_FOR_PAYMENT:OrderState.PAYMENT_COMPLETED);
                order.setOrderCost(Integer.parseInt(matcher.group(3)));
                orders.add(order);
            }
        }
        return orders;
    }
}

OrderState 枚举:

public enum OrderState {
    CANCELED,
    WAITING_FOR_PAYMENT,
    PAYMENT_COMPLETED
}

订单:

public class Order {
    private int orderId;
    private OrderState orderState;
    private int orderCost;
}

LoggerOrderList 服务:

@Service
public class LoggerOrderList {
    private static final Logger LOGGER = LogManager.getLogger(LoggerOrderList.class);
    public List<Order> processOrders(List<Order> orderList){
        orderList.forEach(LOGGER::info);
        return orderList;
    }
}

1)当我通过调用网关方法时,如何启动该流程? 2) 如何读取入站通道适配器中传递的消息(在我的例子中是 FileReadingMessageSource)?

FileReadingMessageSource基于配置目录中提供的轮询。这是流程的开始,不能用在某些逻辑的中间。

你没有解释你的网关是什么,但如果目录作为已发送消息的有效负载传递,你可能希望有类似的逻辑来获取内容。然而,无论如何,这样的逻辑看起来并不适合该消息源。它的目标是一直轮询目录以获取新内容。如果你想要几个目录类似的东西,你可以考虑为提供的目录动态注册流:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-runtime-flows.

否则你需要考虑有一个简单的服务激活器,它会在提供的目录上调用 listFiles()。仅仅因为没有“等待新内容”功能,滥用就没有意义 FileReeadingMessageSource