Spring Integration transaction strategy spanning file inbound adapter and queue channel
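I have a directory that is read by an inbound file adapter, which is piped into a priority channel that sorts the files by name. I've created a transaction synchronization factory to move the files once processing has completed. This works fine for the inbound adapter and for all the transformations/aggregations that happen in an additional file writer flow. As soon as I add the PriorityChannel, the transaction seems to complete early, and it is not passed on to the transformation/aggregation logic.
Here is the inbound flow: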
    return IntegrationFlows
            .from(fileReadingMessageSource,
                    c -> c.poller(Pollers.fixedDelay(period)
                            .taskExecutor(taskExecutor)
                            .maxMessagesPerPoll(maxMessagesPerPoll)
                            .transactionSynchronizationFactory(transactionSynchronizationFactory())
                            .transactional(transactionManager())))
            .channel("alphabetically")
            .bridge(s -> s.poller(Pollers.fixedDelay(100)))
            .channel(ApplicationConfiguration.INBOUND_CHANNEL)
            .get();
And the transaction synchronization strategy:
    @Bean
    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
                new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        // After commit: move the file into the processed directory
        syncProcessor.setAfterCommitExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path"
                        + " + T(java.io.File).separator + payload.name))"));
        // After rollback: move the file into the failed directory
        syncProcessor.setAfterRollbackExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundFailedDirectory.path"
                        + " + T(java.io.File).separator + payload.name))"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }
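Any idea how to make this transaction span the priority queue channel as well? Or is there another way to read the files in alphabetical order?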
EDIT1
According to Gary, this should work (the whole example is provided, as requested):
    @Configuration
    class FilePollingIntegrationFlow {

        @Autowired
        public File inboundReadDirectory;

        @Autowired
        private ApplicationContext applicationContext;

        @Bean
        public IntegrationFlow inboundFileIntegration(
                @Value("${inbound.file.poller.fixed.delay}") long period,
                @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
                TaskExecutor taskExecutor,
                MessageSource<File> fileReadingMessageSource) {
            return IntegrationFlows
                    .from(fileReadingMessageSource,
                            c -> c.poller(Pollers.fixedDelay(period)
                                    .taskExecutor(taskExecutor)
                                    .maxMessagesPerPoll(maxMessagesPerPoll)
                                    .transactionSynchronizationFactory(transactionSynchronizationFactory())
                                    .transactional(transactionManager())))
                    .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                    .get();
        }

        @Bean
        TaskExecutor taskExecutor(@Value("${inbound.file.poller.thread.pool.size}") int poolSize) {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setCorePoolSize(poolSize);
            return taskExecutor;
        }

        @Bean
        PseudoTransactionManager transactionManager() {
            return new PseudoTransactionManager();
        }

        @Bean
        TransactionSynchronizationFactory transactionSynchronizationFactory() {
            ExpressionParser parser = new SpelExpressionParser();
            ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor =
                    new ExpressionEvaluatingTransactionSynchronizationProcessor();
            syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
            // After commit: move the file into the processed directory
            syncProcessor.setAfterCommitExpression(parser.parseExpression(
                    "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path"
                            + " + T(java.io.File).separator + payload.name))"));
            // After rollback: move the file into the failed directory
            syncProcessor.setAfterRollbackExpression(parser.parseExpression(
                    "payload.renameTo(new java.io.File(@inboundFailedDirectory.path"
                            + " + T(java.io.File).separator + payload.name))"));
            return new DefaultTransactionSynchronizationFactory(syncProcessor);
        }

        @Bean
        public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
            FileReadingMessageSource source = new FileReadingMessageSource();
            source.setDirectory(this.inboundReadDirectory);
            source.setScanner(directoryScanner);
            source.setAutoCreateDirectory(true);
            return source;
        }

        @Bean
        public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
            DirectoryScanner scanner = new RecursiveDirectoryScanner();
            CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
                    Arrays.asList(new AcceptOnceFileListFilter<>(),
                            new RegexPatternFileListFilter(regex),
                            new AlphabeticalFileListFilter()));
            scanner.setFilter(filter);
            return scanner;
        }

        // Accepts every file and only reorders the list by file name
        private class AlphabeticalFileListFilter implements FileListFilter<File> {

            @Override
            public List<File> filterFiles(File[] files) {
                List<File> list = Arrays.asList(files);
                list.sort(Comparator.comparing(File::getName));
                return list;
            }
        }
    }
    @Configuration
    public class FilePollingConfiguration {

        @Bean(name = "inboundReadDirectory")
        public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
            return makeDirectory(path);
        }

        @Bean(name = "inboundProcessedDirectory")
        public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
            return makeDirectory(path);
        }

        @Bean(name = "inboundFailedDirectory")
        public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
            return makeDirectory(path);
        }

        @Bean(name = "inboundOutDirectory")
        public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
            return makeDirectory(path);
        }

        // Creates the directory (and any missing parents) if it does not exist yet
        private File makeDirectory(String path) {
            File file = new File(path);
            file.mkdirs();
            return file;
        }
    }
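Having done this and removed the PriorityChannel, the transaction still doesn't seem to work the way I expected. With this flow, the file is not available in the Http outbound gateway. Any idea why?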
    @Component
    public class MessageProcessingIntegrationFlow {

        public static final String OUTBOUND_FILENAME_GENERATOR = "outboundFilenameGenerator.handler";
        public static final String FILE_WRITING_MESSAGE_HANDLER = "fileWritingMessageHandler";

        @Autowired
        public File inboundOutDirectory;

        @Bean
        public IntegrationFlow writeToFile(@Value("${api.base.uri}") URI uri,
                @Value("${out.filename.dateFormat}") String dateFormat,
                @Value("${out.filename.suffix}") String filenameSuffix) {
            return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
                    // Correlate files by the first 17 characters of the file name
                    .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID,
                            m -> ((String) m.getHeaders().get(FileHeaders.FILENAME)).substring(0, 17)))
                    .aggregate(a -> a.groupTimeout(2000)
                            .sendPartialResultOnExpiry(true))
                    // Build a multipart body from the aggregated files
                    .transform(m -> {
                        MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
                        //noinspection unchecked
                        ((List<File>) m).forEach(f -> body.add("documents", new FileSystemResource((File) f)));
                        return body;
                    })
                    .handle(Http.outboundGateway(uri)
                            .httpMethod(HttpMethod.POST)
                            .expectedResponseType(byte[].class))
                    .handle(Files.outboundGateway(inboundOutDirectory)
                            .autoCreateDirectory(true)
                            .fileNameGenerator(m -> m.getHeaders().get(FileHeaders.FILENAME) + "_"
                                    + DateTimeFormatter.ofPattern(dateFormat).format(LocalDateTime.now())
                                    + filenameSuffix))
                    .log(LoggingHandler.Level.INFO)
                    .get();
        }
    }
You can't switch threads with Spring transactions; the transaction is bound to the thread.
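The thread binding can be illustrated without Spring at all. The following is only an analogy, assuming a plain `ThreadLocal` as a stand-in for the per-thread state that Spring's `TransactionSynchronizationManager` keeps; the class name `ThreadBoundContextDemo` and the marker value `tx-1` are made up for this sketch. The worker thread plays the role of the second poller that reads from a queue or priority channel.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextDemo {

    // Hypothetical stand-in for the per-thread state a transaction manager keeps.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Binds a marker on the calling thread, then reads it on that thread and on a worker thread. */
    static String[] readOnBothThreads() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1");
        try {
            String sameThread = CURRENT_TX.get();
            // The worker stands in for the poller behind a queue channel.
            Callable<String> read = CURRENT_TX::get;
            String otherThread = worker.submit(read).get();
            return new String[] {sameThread, otherThread};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            CURRENT_TX.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = readOnBothThreads();
        System.out.println("same thread: " + seen[0] + ", other thread: " + seen[1]);
    }
}
```

The value set on the calling thread is simply not visible on the worker. In the same way, once a message is handed to a channel that is drained by a different thread, the work downstream no longer runs inside the poller's transaction.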
You can use a custom FileListFilter in the message source and sort the files there.
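The sorting step such a filter performs can be sketched in plain Java. This is a minimal sketch, not Spring Integration API: the class `FileNameOrdering` and the method `sortedByName` are hypothetical names, and the body could be called from a `filterFiles(File[] files)` implementation. It copies the array first, because a list returned by `Arrays.asList` is backed by the array, so sorting that list also reorders the caller's array.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class FileNameOrdering {

    /** Returns a new list ordered by file name; the input array is left untouched. */
    static List<File> sortedByName(File[] files) {
        List<File> sorted = new ArrayList<>(Arrays.asList(files));
        // String ordering is case-sensitive: upper-case names sort before lower-case ones.
        sorted.sort(Comparator.comparing(File::getName));
        return sorted;
    }

    public static void main(String[] args) {
        File[] files = {new File("in/b.txt"), new File("in/c.txt"), new File("in/a.txt")};
        for (File file : sortedByName(files)) {
            System.out.println(file.getName());
        }
    }
}
```

Because the ordering happens before any message is emitted, no extra channel or thread hand-off is needed to get alphabetical processing.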
Thanks to Gary Russell, I came up with the following solution:
    @Bean
    public IntegrationFlow inboundFileIntegration(
            @Value("${inbound.file.poller.fixed.delay}") long period,
            @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll,
            @Value("${inbound.file.poller.thread.pool.size}") int poolSize,
            MessageSource<File> fileReadingMessageSource) {
        return IntegrationFlows
                .from(fileReadingMessageSource,
                        c -> c.poller(Pollers.fixedDelay(period)
                                .taskExecutor(Executors.newFixedThreadPool(poolSize))
                                .maxMessagesPerPoll(maxMessagesPerPoll)))
                .channel("alphabetically")
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();
    }
The advice, together with its endpoint spec:
    @Bean
    public Advice fileMoveAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        // On success, move the files to the processed directory; on failure, to the failed directory
        advice.setOnSuccessExpression(new FunctionExpression<Message<?>>(
                m -> renameMultiValueMapFiles(m, this.inboundProcessedDirectory)));
        advice.setOnFailureExpression(new FunctionExpression<Message<?>>(
                m -> renameMultiValueMapFiles(m, this.inboundFailedDirectory)));
        return advice;
    }

    @Bean
    public Consumer<GenericEndpointSpec<HttpRequestExecutingMessageHandler>> outboundSpec() {
        return new Consumer<GenericEndpointSpec<HttpRequestExecutingMessageHandler>>() {
            @Override
            public void accept(GenericEndpointSpec<HttpRequestExecutingMessageHandler> spec) {
                spec.advice(fileMoveAdvice(), retryAdvice());
            }
        };
    }

    @SneakyThrows(IOException.class)
    private boolean renameMultiValueMapFiles(Message<?> m, File directory) {
        MultiValueMap<String, Resource> files = (MultiValueMap<String, Resource>) m.getPayload();
        List<File> list = new ArrayList<>();
        // no lambda to avoid ThrowsFunction type
        for (List<Resource> l : files.values()) {
            for (Resource v : l) {
                list.add(v.getFile());
            }
        }
        list.forEach(v -> v.renameTo(new File(directory.getPath(), v.getName())));
        return true;
    }
And the spec added to the handle call:
    .handle(Http.outboundGateway(uri)
                    .httpMethod(HttpMethod.POST)
                    .expectedResponseType(byte[].class), this.advices.outboundSpec())
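One thing to watch in the rename helper above: `File.renameTo` only returns `false` when the move fails, and that return value is discarded, so a file that could not be moved goes unnoticed. A possible alternative, sketched here under assumptions, is `java.nio.file.Files.move`, which throws on failure. The class `FileMover` and the method `moveInto` are hypothetical names, and replacing an existing target file is an assumption about the desired behavior, not something stated in the original solution.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

final class FileMover {

    /** Moves the file into targetDir under its current name and fails loudly if that is not possible. */
    static Path moveInto(Path file, Path targetDir) {
        try {
            // Create the target directory (and parents) if it is missing.
            Files.createDirectories(targetDir);
            // Assumption: an existing file with the same name may be overwritten.
            return Files.move(file, targetDir.resolve(file.getFileName()),
                    StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException("Could not move " + file + " to " + targetDir, e);
        }
    }
}
```

Inside the advice this could be called as `FileMover.moveInto(v.toPath(), directory.toPath())` for each collected `File`, so a failed move surfaces as an exception instead of a silently ignored `false`.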