Spring 集成 Java DSL 流 Splitter/Aggregator 处理所有行后删除文件
Spring Integration Java DSL flow Splitter/Aggregator delete file after processing all lines
使用 Spring 集成 Java DSL,我构建了一个流程,我在其中使用 FileSplitter
同步处理文件。在将 File
中的每一行转换为 Message
进行后续处理后,我已经能够使用 AbstractFilePayloadTransformer
上的 setDeleteFiles
标志删除文件,如下所示:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
这工作正常,但速度很慢。所以我尝试在上面的 .split
之后添加一个 ExecutorChannel
,像这样:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
但是上述删除标志不允许流程在完全读取文件之前成功完成删除文件。
如果删除该标志,我可能会耗尽从 S3 同步文件的本地文件系统。
上面我可以介绍什么来 a) 完全处理每个文件和 b) 完成后从本地文件系统删除文件? 换句话说,有没有办法了解文件完全处理的确切时间(当它的行已通过池中的线程异步处理时)?
如果你很好奇,这是我对 FileToInputStreamTransformer
的暗示:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
更新
那么下游流中的东西如何知道要请求什么?
顺便说一下,如果我正确地听从了你的建议,当我用上面的 new FileSplitter(true, true)
更新 .split
时,我得到
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker
at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
FileSplitter
有 markers
option 正是为了这个目的:
Set to true to emit start/end of file marker messages before and after the file data. Markers are messages with FileSplitter.FileMarker
payloads (with START
and END
values in the mark property). Markers might be used when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. The END
marker includes a line count. Default: false
. When true
, apply-sequence
is false
by default.
您可以在下游流程中使用它来确定是否已经可以删除文件。
谢谢 Artem。
我确实设法解决了这个问题,但也许是以更重量级的方式。
引入 ExecutorChannel
引起了实施调整的连锁反应,包括:关闭 AbtractFilePayloadTransformer
上的 setDeleteFiles
标志,更新 JPA @Entity
,RunStats
和这样的存储库,以捕获文件处理状态以及整个 运行 的处理状态。将处理状态更新放在一起让流程知道何时从本地文件系统删除文件(即,当它们被完全处理时)以及 return /stats/{run}
端点中的状态,以便用户可以知道何时一个运行完成了。
以下是我的实施片段(如果有人好奇的话)...
class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
public class RunStatsHandler extends AbstractMessageHandler {
private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
private final RunStatsRepository runStatsRepository;
public RunStatsHandler(RunStatsRepository runStatsRepository) {
this.runStatsRepository = runStatsRepository;
}
// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
if (runStats != null) {
File compressedFile = (File) message.getPayload();
String compressedFileName = compressedFile.getCanonicalPath();
LongAdder lineCount = new LongAdder();
// Streams and Scanner implement java.lang.AutoCloseable
InputStream fs = new FileInputStream(compressedFile);
InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
try (Scanner sc = new Scanner(gzfs, "UTF-8")) {
while (sc.hasNextLine()) {
sc.nextLine();
lineCount.increment();
}
// note that Scanner suppresses exceptions
if (sc.ioException() != null) {
log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
"exception", sc.ioException().getMessage()));
throw sc.ioException();
}
runStats.addFile(compressedFileName, token, lineCount.longValue());
runStatsRepository.updateRunStats(runStats);
log.info("file.lineCount",
ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
}
}
}
}
更新流程
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public MessageChannel runStatsChannel() {
DirectChannel wiretapChannel = new DirectChannel();
wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
DirectChannel loggingChannel = new DirectChannel();
loggingChannel.addInterceptor(new WireTap(wiretapChannel));
return loggingChannel;
}
遗憾的是,我无法共享 RunStats
和 repo 实现。
使用 Spring 集成 Java DSL,我构建了一个流程,我在其中使用 FileSplitter
同步处理文件。在将 File
中的每一行转换为 Message
进行后续处理后,我已经能够使用 AbstractFilePayloadTransformer
上的 setDeleteFiles
标志删除文件,如下所示:
@Bean
protected IntegrationFlow s3ChannelFlow() {
// do not exhaust filesystem w/ files downloaded from S3
FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
transformer.setDeleteFiles(true);
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.channel(StatsUtil.createRunStatsChannel(runStatsRepository))
.transform(transformer)
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
这工作正常,但速度很慢。所以我尝试在上面的 .split
之后添加一个 ExecutorChannel
,像这样:
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
但是上述删除标志不允许流程在完全读取文件之前成功完成删除文件。
如果删除该标志,我可能会耗尽从 S3 同步文件的本地文件系统。
上面我可以介绍什么来 a) 完全处理每个文件和 b) 完成后从本地文件系统删除文件? 换句话说,有没有办法了解文件完全处理的确切时间(当它的行已通过池中的线程异步处理时)?
如果你很好奇,这是我对 FileToInputStreamTransformer
的暗示:
public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
更新
那么下游流中的东西如何知道要请求什么?
顺便说一下,如果我正确地听从了你的建议,当我用上面的 new FileSplitter(true, true)
更新 .split
时,我得到
2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)
FileSplitter
有 markers
option 正是为了这个目的:
Set to true to emit start/end of file marker messages before and after the file data. Markers are messages with
FileSplitter.FileMarker
payloads (withSTART
andEND
values in the mark property). Markers might be used when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. TheEND
marker includes a line count. Default:false
. Whentrue
,apply-sequence
isfalse
by default.
您可以在下游流程中使用它来确定是否已经可以删除文件。
谢谢 Artem。
我确实设法解决了这个问题,但也许是以更重量级的方式。
引入 ExecutorChannel
引起了实施调整的连锁反应,包括:关闭 AbtractFilePayloadTransformer
上的 setDeleteFiles
标志,更新 JPA @Entity
,RunStats
和这样的存储库,以捕获文件处理状态以及整个 运行 的处理状态。将处理状态更新放在一起让流程知道何时从本地文件系统删除文件(即,当它们被完全处理时)以及 return /stats/{run}
端点中的状态,以便用户可以知道何时一个运行完成了。
以下是我的实施片段(如果有人好奇的话)...
class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
@Override
// @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
protected InputStream transformFile(File payload) throws Exception {
return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
}
}
public class RunStatsHandler extends AbstractMessageHandler {
private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
private static final int BUFFER_SIZE = 64 * 1024; // 64 kB
private final RunStatsRepository runStatsRepository;
public RunStatsHandler(RunStatsRepository runStatsRepository) {
this.runStatsRepository = runStatsRepository;
}
// Memory efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
if (runStats != null) {
File compressedFile = (File) message.getPayload();
String compressedFileName = compressedFile.getCanonicalPath();
LongAdder lineCount = new LongAdder();
// Streams and Scanner implement java.lang.AutoCloseable
InputStream fs = new FileInputStream(compressedFile);
InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
try (Scanner sc = new Scanner(gzfs, "UTF-8")) {
while (sc.hasNextLine()) {
sc.nextLine();
lineCount.increment();
}
// note that Scanner suppresses exceptions
if (sc.ioException() != null) {
log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
"exception", sc.ioException().getMessage()));
throw sc.ioException();
}
runStats.addFile(compressedFileName, token, lineCount.longValue());
runStatsRepository.updateRunStats(runStats);
log.info("file.lineCount",
ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
}
}
}
}
更新流程
@Bean
protected IntegrationFlow s3ChannelFlow() {
// @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
// @formatter:off
return IntegrationFlows
.from(s3Channel())
.enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
.channel(runStatsChannel())
.channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
.transform(new FileToInputStreamTransformer())
.split(new FileSplitter())
.transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
.publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
.get();
// @formatter:on
}
@Bean
public MessageChannel runStatsChannel() {
DirectChannel wiretapChannel = new DirectChannel();
wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
DirectChannel loggingChannel = new DirectChannel();
loggingChannel.addInterceptor(new WireTap(wiretapChannel));
return loggingChannel;
}
遗憾的是,我无法共享 RunStats
和 repo 实现。