Spring Integration Java DSL flow Splitter/Aggregator delete file after processing all lines

Using the Spring Integration Java DSL, I've built a flow where I'm processing files synchronously with a FileSplitter. I've been able to use the setDeleteFiles flag on an AbstractFilePayloadTransformer to delete files after converting each line in the File into a Message for subsequent processing, like so:

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // do not exhaust filesystem w/ files downloaded from S3
    FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
    transformer.setDeleteFiles(true);

    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
        .transform(transformer)
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

This works fine, but it's slow. So I tried adding an ExecutorChannel after the .split above, like this:

.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
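
For context, here is roughly where that line sits in the flow above (a sketch only, reusing the same beans as the earlier flow):

// sketch: the s3ChannelFlow above, with the executor hand-off added right after the split
return IntegrationFlows
    .from(s3Channel())
    .channel(StatsUtil.createRunStatsChannel(runStatsRepository))
    .transform(transformer)
    .split(new FileSplitter())
    .channel(c -> c.executor(Executors.newFixedThreadPool(10))) // lines are now processed on pool threads
    .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
    .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
    .get();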

But then the delete flag above causes files to be deleted before they've been completely read, so the flow can no longer finish processing them successfully.

If I remove the flag, I risk exhausting the local file system where files are synced from S3.

What can I introduce above to a) process each file completely and b) delete it from the local file system once done? In other words, is there a way to know exactly when a file has been fully processed (when its lines have been handled asynchronously via threads in the pool)?

In case you're curious, here's my implementation of FileToInputStreamTransformer:

public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    @Override
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}

UPDATE

So how does something in the downstream flow know what to ask for?

Incidentally, if I'm following your advice correctly, when I update the .split above to use new FileSplitter(true, true), I get

2015-10-20 14:26:45,288 [pool-6-thread-1] org.springframework.integration.handler.LoggingHandler ERROR org.springframework.integration.transformer.MessageTransformationException: failed to transform message; nested exception is java.lang.IllegalArgumentException: 'json' argument must be an instance of: [class java.lang.String, class [B, class java.io.File, class java.net.URL, class java.io.InputStream, class java.io.Reader] , but gotten: class org.springframework.integration.file.splitter.FileSplitter$FileMarker
    at org.springframework.integration.transformer.AbstractTransformer.transform(AbstractTransformer.java:44)

The FileSplitter has a markers option exactly for this purpose:

Set to true to emit start/end of file marker messages before and after the file data. Markers are messages with FileSplitter.FileMarker payloads (with START and END values in the mark property). Markers might be used when sequentially processing files in a downstream flow where some lines are filtered. They enable the downstream processing to know when a file has been completely processed. The END marker includes a line count. Default: false. When true, apply-sequence is false by default.

You can use it in the downstream flow to determine whether the file can already be deleted.
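
A minimal sketch of how the markers might be used downstream (it assumes the FileHeaders.ORIGINAL_FILE header set by the AbstractFilePayloadTransformer is still present after the split, and it routes FileMarker payloads away from the JSON transformer, which is what the exception in the update above is complaining about):

.split(new FileSplitter(true, true))            // iterator mode plus START/END markers
.channel(c -> c.executor(Executors.newFixedThreadPool(10)))
.<Object, Boolean>route(p -> p instanceof FileSplitter.FileMarker, r -> r
    // line payloads continue on to the existing JSON transform / persistence sub-flow
    .subFlowMapping(false, sf -> sf
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow())))
    // marker payloads: once the END marker arrives, every line has been read and the file can go
    .subFlowMapping(true, sf -> sf
        .<FileSplitter.FileMarker>filter(m -> m.getMark() == FileSplitter.FileMarker.Mark.END)
        .handle(m -> {
            File original = m.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
            if (original != null) {
                original.delete();
            }
        })))

Since the splitter only emits the END marker after it has exhausted the underlying stream, every line should already be in flight as a Message by the time the delete runs, even with the executor hand-off in between.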

Thanks, Artem.

I did manage to resolve the issue, though perhaps in a more heavyweight way than necessary.

Introducing the ExecutorChannel set off a chain reaction of implementation adjustments, including: turning off the setDeleteFiles flag on the AbstractFilePayloadTransformer, and updating a JPA @Entity, RunStats, and its repository to capture the processing status of each file as well as of the run as a whole. Taken together, those processing-status updates let the flow know when files can be deleted from the local file system (i.e., when they have been fully processed) and let a /stats/{run} endpoint return the status so a user can tell when a run is complete.

Here are snippets from my implementation, in case anyone's curious...

class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    @Override
    // @see http://java-performance.info/java-io-bufferedinputstream-and-java-util-zip-gzipinputstream/
    protected InputStream transformFile(File payload) throws Exception {
        return new GZIPInputStream(new FileInputStream(payload), BUFFER_SIZE);
    }
}

public class RunStatsHandler extends AbstractMessageHandler {

    private final SplunkSlf4jLogger log = new SplunkSlf4jLogger(LoggerFactory.getLogger(getClass()));
    private static final int BUFFER_SIZE = 64 * 1024; // 64 kB

    private final RunStatsRepository runStatsRepository;

    public RunStatsHandler(RunStatsRepository runStatsRepository) {
        this.runStatsRepository = runStatsRepository;
    }

    // Memory-efficient routine, @see http://www.baeldung.com/java-read-lines-large-file
    @Override
    protected void handleMessageInternal(Message<?> message) throws Exception {
        RunStats runStats = message.getHeaders().get(RunStats.RUN, RunStats.class);
        String token = message.getHeaders().get(RunStats.FILE_TOKEN, String.class);
        if (runStats != null) {
            File compressedFile = (File) message.getPayload();
            String compressedFileName = compressedFile.getCanonicalPath();
            LongAdder lineCount = new LongAdder();
            // Streams and Scanner implement java.lang.AutoCloseable
            try (InputStream fs = new FileInputStream(compressedFile);
                    InputStream gzfs = new GZIPInputStream(fs, BUFFER_SIZE);
                    Scanner sc = new Scanner(gzfs, "UTF-8")) {
                while (sc.hasNextLine()) {
                    sc.nextLine();
                    lineCount.increment();
                }
                // note that Scanner suppresses exceptions
                if (sc.ioException() != null) {
                    log.warn("file.lineCount", ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName,
                            "exception", sc.ioException().getMessage()));
                    throw sc.ioException();
                }
                runStats.addFile(compressedFileName, token, lineCount.longValue());
                runStatsRepository.updateRunStats(runStats);
                log.info("file.lineCount",
                        ImmutableMap.of("run", runStats.getRun(), "file", compressedFileName, "lineCount", lineCount.intValue()));
            }
        }
    }
}

Updated flow

@Bean
protected IntegrationFlow s3ChannelFlow() {
    // @see http://docs.spring.io/spring-integration/reference/html/files.html#file-reading
    // @formatter:off
    return IntegrationFlows
        .from(s3Channel())
        .enrichHeaders(h -> h.headerFunction(RunStats.FILE_TOKEN, f -> UUID.randomUUID().toString()))
        .channel(runStatsChannel())
        .channel(c -> c.executor(Executors.newFixedThreadPool(persistencePoolSize)))
        .transform(new FileToInputStreamTransformer())
        .split(new FileSplitter())
        .transform(new JsonToObjectViaTypeHeaderTransformer(new Jackson2JsonObjectMapper(objectMapper), typeSupport))
        .publishSubscribeChannel(p -> p.subscribe(persistenceSubFlow()))
        .get();
    // @formatter:on
}

@Bean
public MessageChannel runStatsChannel() {
    DirectChannel wiretapChannel = new DirectChannel();
    wiretapChannel.subscribe(new RunStatsHandler(runStatsRepository));
    DirectChannel loggingChannel = new DirectChannel();
    loggingChannel.addInterceptor(new WireTap(wiretapChannel));
    return loggingChannel;
}

Unfortunately, I can't share the RunStats and repo implementations.
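
Purely as an illustration (the names and fields below are hypothetical, not the actual implementation), a RunStats entity along these lines could track per-file line counts and completion for a run:

// Hypothetical sketch only; the real RunStats @Entity and repository are not shared above.
@Entity
public class RunStats {

    public static final String RUN = "runStats";          // assumed header name
    public static final String FILE_TOKEN = "fileToken";  // assumed header name

    @Id
    private String run;

    // line counts keyed by file name, recorded as each file finishes
    @ElementCollection
    private Map<String, Long> lineCountsByFile = new HashMap<>();

    // tokens of files that have been fully processed
    @ElementCollection
    private Set<String> completedFileTokens = new HashSet<>();

    public void addFile(String fileName, String token, long lineCount) {
        lineCountsByFile.put(fileName, lineCount);
        completedFileTokens.add(token);
    }

    public String getRun() {
        return run;
    }
}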