使用 spring 集成批量发送数据?

Sending data in batches using spring integration?

我正在尝试根据按照下面定义的流程收到的通知从 GCP 读取文件:

文件 reader - 将数据反序列化为 collection 并发送以进行路由。

我是de-searializingobjects的collection中的数据,发送到路由器做进一步处理。由于我无法控制文件大小,我正在考虑一些批处理 reader 过程的方法。

目前,file-reader 服务激活器 return 是整个 Collection 的反序列化 objects。

问题:

更新方法:

   public <S> Collection<S> readData(DataInfo dataInfo, Class<S> clazz) {
        Resource gcpResource = context.getResource("classpath://data.json")
        var tempDataSet = new HashSet<S>();
        AtomicInteger pivot = new AtomicInteger();
        try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gcpResource.getInputStream()))) {
            bufferedReader.lines().map((dataStr) -> {
                try {
                    var data = deserializeData(dataStr, clazz);
                    return data;
                } catch (JsonProcessingException ex) {
                    throw new CustomException("PARSER-1001", "Error occurred while parsing", ex);
                }
            }).forEach(data -> {
                        if (BATCH_SIZE == pivot.get()) {
                            //When the size in tempDataSet reached BATCH_SIZE send the data in routing channel and reset the pivot
                            var message = MessageBuilder.withPayload(tempDataSet.clone())
                                    .setHeader(AppConstants.EVENT_HEADER_KEY, eventType)
                                    .build();
                            routingChannel.send(message);
                            pivot.set(0);
                            tempDataSet.removeAll(tempDataSet);
                        } else {
                            pivot.addAndGet(1);
                            tempDataSet.add(data);
                        }
                    });
            return tempDataSet;
        } catch (Exception ex) {
            throw new CustomException("PARSER-1002", "Error occurred while parsing", ex);
        }
    }

如果批量大小为 100 而我们收到 1010 objects。将创建 11 个批次,其中 10 个有 100 个,最后一个有 10 个 objects。

如果我使用拆分器并将流传递给它,它会等待整个流完成然后发送收集的 collection 还是我们可以使用它实现与以前的方法接近的东西?

不确定问题是什么,但我会选择 FileSplitter + Aggregator 解决方案。第一个正是用于流式文件读取用例。第二个允许您缓冲传入的消息,直到它们达到某种条件,因此它可以向下游发出单个消息。该消息确实可以将集合作为有效负载。

这是他们的文档供您参考:

https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator

https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter