Spring Integration: read and process a file without polling

I am currently trying to write an integration flow that reads a CSV file, processes it in chunks (calling an API for enrichment), and then writes it back out as a new CSV. I have an example that works perfectly, except that it polls a directory. What I would like to do is pass a file path and file name to the integration flow in the headers, and then operate only on that file.

Here is my polling example code, which works well apart from the polling itself.

    @Bean
    @SuppressWarnings("unchecked")
    public IntegrationFlow getUIDsFromTTDandOutputToFile() {

        Gson gson = new GsonBuilder().disableHtmlEscaping().create();

        return IntegrationFlows
                .from(Files.inboundAdapter(new File(inputFilePath))
                        .filter(getFileFilters())
                        .preventDuplicates(true)
                        .autoCreateDirectory(true),
                        c -> c.poller(Pollers.fixedRate(1000)
                                .maxMessagesPerPoll(1)))
                .log(Level.INFO, m -> "TTD UID 2.0 Integration Start")
                .split(Files.splitter())
                .channel(c -> c.executor(Executors.newFixedThreadPool(7)))
                .handle((p, h) -> new CSVUtils().csvColumnSelector((String) p, ttdColNum))
                .channel("chunkingChannel")
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "chunkingChannel")
    public AggregatorFactoryBean chunker() {
        log.info("Initializing Chunker");
        AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
        aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(batchSize));
        aggregator.setExpireGroupsUponCompletion(true);
        aggregator.setGroupTimeoutExpression(new ValueExpression<>(100L));
        aggregator.setOutputChannelName("chunkingOutput");
        aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        aggregator.setSendPartialResultOnExpiry(true);
        aggregator.setCorrelationStrategy(new CorrelationStrategyIml());
        return aggregator;
    }

    @Bean
    public IntegrationFlow enrichFlow() {
        return IntegrationFlows.from("chunkingOutput")
                .handle((p, h) -> gson.toJson(new TradeDeskUIDRequestPayloadBean((Collection<String>) p)))
                .enrichHeaders(eh -> eh.async(false)
                        .header("accept", "application/json")
                        .header("contentType", "application/json")
                        .header("Authorization", "Bearer [TOKEN]"))
                .log(Level.INFO, m -> "Sending request of size " + batchSize + " to: " + TTD_UID_IDENTITY_MAP)
                .handle(Http.outboundGateway(TTD_UID_IDENTITY_MAP)
                        .requestFactory(
                                alliantPooledHttpConnection.get_httpComponentsClientHttpRequestFactory())
                        .httpMethod(HttpMethod.POST)
                        .expectedResponseType(TradeDeskUIDResponsePayloadBean.class)
                        .extractPayload(true))
                .log(Level.INFO, m -> "Writing response to output file")
                .handle((p, h) -> ((TradeDeskUIDResponsePayloadBean) p).printMappedBodyAsCSV2())
                .handle(Files.outboundAdapter(new File(outputFilePath))
                        .autoCreateDirectory(true)
                        .fileExistsMode(FileExistsMode.APPEND)
                        //.appendNewLine(true)
                        .fileNameGenerator(m -> m.getHeaders().getOrDefault("file_name", "outputFile") + "_out.csv"))
                .get();
    }

    public class CorrelationStrategyIml implements CorrelationStrategy {

        @Override
        public Object getCorrelationKey(Message<?> message) {
            return message.getHeaders().getOrDefault("", 1);
        }
    }

    @Component
    public class CSVUtils {

        @ServiceActivator
        String csvColumnSelector(String inputStr, Integer colNum) {
            return StringUtils.commaDelimitedListToStringArray(inputStr)[colNum];
        }
    }

    private FileListFilter<File> getFileFilters() {
        ChainFileListFilter<File> cflf = new ChainFileListFilter<>();
        cflf.addFilter(new LastModifiedFileListFilter(30));
        cflf.addFilter(new AcceptOnceFileListFilter<>());
        cflf.addFilter(new SimplePatternFileListFilter(fileExtention));
        return cflf;
    }

If you know the file, there is no reason for any special component from the framework. You just start your flow from a channel and send a message to that channel with the `File` object as the payload. The message will travel on to the splitter in your flow, and everything will work as before.
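As a sketch, the start of the flow above could be changed like this; the channel name `fileInputChannel` and the sending helper are illustrative assumptions, not names from the original post:

```java
// The flow now starts from an explicit channel instead of Files.inboundAdapter(...):
@Bean
public IntegrationFlow getUIDsFromTTDandOutputToFile() {
    return IntegrationFlows
            .from("fileInputChannel") // replaces the polling inbound adapter
            .log(Level.INFO, m -> "TTD UID 2.0 Integration Start")
            .split(Files.splitter())
            .channel(c -> c.executor(Executors.newFixedThreadPool(7)))
            .handle((p, h) -> new CSVUtils().csvColumnSelector((String) p, ttdColNum))
            .channel("chunkingChannel")
            .get();
}

// Application code then sends the known file into the flow directly:
@Autowired
@Qualifier("fileInputChannel")
private MessageChannel fileInputChannel;

public void process(File file) {
    fileInputChannel.send(MessageBuilder.withPayload(file)
            .setHeader(FileHeaders.FILENAME, file.getName())
            .build());
}
```

Setting the `FileHeaders.FILENAME` header keeps the `fileNameGenerator` in the outbound adapter working the same way it did with the polling adapter.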

If you really want a high-level API for this, you can expose a `@MessagingGateway` as the beginning of that flow, and the end user will call your gateway method with the desired file as an argument. The framework will create a message on your behalf and send it to the flow's message channel for processing.
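A minimal sketch of such a gateway, assuming the flow starts from a channel named `fileInputChannel` (both the interface and channel names are illustrative):

```java
// The framework generates a proxy for this interface; calling process(file)
// wraps the File in a Message and sends it to the requestChannel.
@MessagingGateway
public interface FileProcessingGateway {

    @Gateway(requestChannel = "fileInputChannel")
    void process(File file);
}
```

The caller then just injects the gateway and invokes it, e.g. `gateway.process(new File(filePath, fileName));` — no polling involved.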

See more information about gateways in the documentation:

https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#integration-flow-as-gateway

And the DSL definition starting from an explicit channel:

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels