Spring Integration: read and process a file without polling
I am currently trying to write an integration flow that reads a CSV file, processes it in chunks (calling an API for enrichment), and writes it back out as a new CSV. I have an example that works perfectly, except that it polls a directory. What I would like instead is to pass the file path and file name to the integration flow in headers, and have it operate only on that file.
Here is my polling example; it works well apart from the polling.
@Bean
@SuppressWarnings("unchecked")
public IntegrationFlow getUIDsFromTTDandOutputToFile() {
    Gson gson = new GsonBuilder().disableHtmlEscaping().create();
    return IntegrationFlows
            .from(Files.inboundAdapter(new File(inputFilePath))
                            .filter(getFileFilters())
                            .preventDuplicates(true)
                            .autoCreateDirectory(true),
                    c -> c.poller(Pollers.fixedRate(1000)
                            .maxMessagesPerPoll(1)))
            .log(Level.INFO, m -> "TTD UID 2.0 Integration Start")
            .split(Files.splitter())
            .channel(c -> c.executor(Executors.newFixedThreadPool(7)))
            .handle((p, h) -> new CSVUtils().csvColumnSelector((String) p, ttdColNum))
            .channel("chunkingChannel")
            .get();
}

@Bean
@ServiceActivator(inputChannel = "chunkingChannel")
public AggregatorFactoryBean chunker() {
    log.info("Initializing Chunker");
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setReleaseStrategy(new MessageCountReleaseStrategy(batchSize));
    aggregator.setExpireGroupsUponCompletion(true);
    aggregator.setGroupTimeoutExpression(new ValueExpression<>(100L));
    aggregator.setOutputChannelName("chunkingOutput");
    aggregator.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
    aggregator.setSendPartialResultOnExpiry(true);
    aggregator.setCorrelationStrategy(new CorrelationStrategyIml());
    return aggregator;
}

@Bean
public IntegrationFlow enrichFlow() {
    return IntegrationFlows.from("chunkingOutput")
            .handle((p, h) -> gson.toJson(new TradeDeskUIDRequestPayloadBean((Collection<String>) p)))
            .enrichHeaders(eh -> eh.async(false)
                    .header("accept", "application/json")
                    .header("contentType", "application/json")
                    .header("Authorization", "Bearer [TOKEN]"))
            .log(Level.INFO, m -> "Sending request of size " + batchSize + " to: " + TTD_UID_IDENTITY_MAP)
            .handle(Http.outboundGateway(TTD_UID_IDENTITY_MAP)
                    .requestFactory(alliantPooledHttpConnection.get_httpComponentsClientHttpRequestFactory())
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(TradeDeskUIDResponsePayloadBean.class)
                    .extractPayload(true))
            .log(Level.INFO, m -> "Writing response to output file")
            .handle((p, h) -> ((TradeDeskUIDResponsePayloadBean) p).printMappedBodyAsCSV2())
            .handle(Files.outboundAdapter(new File(outputFilePath))
                    .autoCreateDirectory(true)
                    .fileExistsMode(FileExistsMode.APPEND)
                    //.appendNewLine(true)
                    .fileNameGenerator(m -> m.getHeaders().getOrDefault("file_name", "outputFile") + "_out.csv"))
            .get();
}

public class CorrelationStrategyIml implements CorrelationStrategy {

    @Override
    public Object getCorrelationKey(Message<?> message) {
        return message.getHeaders().getOrDefault("", 1);
    }
}

@Component
public class CSVUtils {

    @ServiceActivator
    String csvColumnSelector(String inputStr, Integer colNum) {
        return StringUtils.commaDelimitedListToStringArray(inputStr)[colNum];
    }
}

private FileListFilter<File> getFileFilters() {
    ChainFileListFilter<File> cflf = new ChainFileListFilter<>();
    cflf.addFilter(new LastModifiedFileListFilter(30));
    cflf.addFilter(new AcceptOnceFileListFilter<>());
    cflf.addFilter(new SimplePatternFileListFilter(fileExtention));
    return cflf;
}
If you know the file, there is no reason for any special component from the framework. You just start your flow from a channel and send a message to it with the File object as the payload. That message travels on to the Files.splitter() in your flow and everything works as before.
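For example, here is a minimal sketch of the same flow started from a plain channel instead of the polling inbound adapter. The channel name fileInputChannel and the caller class are my assumptions; the downstream steps are unchanged from your code:

@Bean
public IntegrationFlow getUIDsFromTTDandOutputToFile() {
    return IntegrationFlows
            .from("fileInputChannel") // a DirectChannel the framework auto-creates; no poller
            .log(Level.INFO, m -> "TTD UID 2.0 Integration Start")
            .split(Files.splitter()) // the File payload is still split line by line
            .channel(c -> c.executor(Executors.newFixedThreadPool(7)))
            .handle((p, h) -> new CSVUtils().csvColumnSelector((String) p, ttdColNum))
            .channel("chunkingChannel")
            .get();
}

// Caller side: build the File from the path and name you would have put into
// headers, and send it as the message payload.
@Autowired
@Qualifier("fileInputChannel")
private MessageChannel fileInputChannel;

public void process(String filePath, String fileName) {
    fileInputChannel.send(MessageBuilder.withPayload(new File(filePath, fileName))
            // set "file_name" explicitly so the fileNameGenerator in the
            // output adapter can still resolve it
            .setHeader(FileHeaders.FILENAME, fileName)
            .build());
}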
If you really want a high-level API for this, you can expose a @MessagingGateway as the beginning of that flow, and end users will call your gateway method with the desired file as an argument. The framework creates a message on your behalf and sends it to the flow's input channel for processing.
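A minimal sketch of such a gateway (the interface and method names are hypothetical; the framework generates the implementation behind the interface):

@MessagingGateway
public interface FileProcessingGateway {

    // The File argument becomes the payload of the message sent to fileInputChannel.
    @Gateway(requestChannel = "fileInputChannel")
    void process(File file);
}

Injecting FileProcessingGateway and calling process(new File(filePath, fileName)) is then all the end user has to do.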
See more info in the documentation about gateways, and about starting a DSL definition from an explicit channel:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-channels
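Since you mention passing the file path and file name separately, one variant (still assuming the fileInputChannel name) is to let the gateway build the File payload from its arguments with a SpEL payload expression:

@MessagingGateway
public interface FileProcessingGateway {

    // SpEL builds the payload: #args[0] is the path, #args[1] the file name.
    @Gateway(requestChannel = "fileInputChannel",
             payloadExpression = "new java.io.File(#args[0], #args[1])")
    void process(String filePath, String fileName);
}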