使用 spring 集成批量发送数据?
Sending data in batches using spring integration?
我正在尝试根据按照下面定义的流程收到的通知从 GCP 读取文件:
文件 reader - 将数据反序列化为 collection 并发送以进行路由。
我是de-searializingobjects的collection中的数据,发送到路由器做进一步处理。由于我无法控制文件大小,我正在考虑一些批处理 reader 过程的方法。
目前,file-reader 服务激活器 return 是整个 Collection
的反序列化 objects。
问题:
如果我收到一个更大的文件,即有 200k 条记录,我想将其分批发送到 header 值路由器而不是
collection 共 200k objects.
如果我将 file-reader 转换为拆分器并添加聚合器
之后通知 -> file-reader -> 聚合器 -> 路由器。
我仍然需要 return 所有 objects 的 collection 而不是迭代器。
我不想将所有记录加载到 collection。
更新方法:
public <S> Collection<S> readData(DataInfo dataInfo, Class<S> clazz) {
Resource gcpResource = context.getResource("classpath://data.json")
var tempDataSet = new HashSet<S>();
AtomicInteger pivot = new AtomicInteger();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gcpResource.getInputStream()))) {
bufferedReader.lines().map((dataStr) -> {
try {
var data = deserializeData(dataStr, clazz);
return data;
} catch (JsonProcessingException ex) {
throw new CustomException("PARSER-1001", "Error occurred while parsing", ex);
}
}).forEach(data -> {
if (BATCH_SIZE == pivot.get()) {
//When the size in tempDataSet reached BATCH_SIZE send the data in routing channel and reset the pivot
var message = MessageBuilder.withPayload(tempDataSet.clone())
.setHeader(AppConstants.EVENT_HEADER_KEY, eventType)
.build();
routingChannel.send(message);
pivot.set(0);
tempDataSet.removeAll(tempDataSet);
} else {
pivot.addAndGet(1);
tempDataSet.add(data);
}
});
return tempDataSet;
} catch (Exception ex) {
throw new CustomException("PARSER-1002", "Error occurred while parsing", ex);
}
}
如果批量大小为 100 而我们收到 1010 objects。将创建 11 个批次,其中 10 个有 100 个,最后一个有 10 个 objects。
如果我使用拆分器并将流传递给它,它会等待整个流完成然后发送收集的 collection 还是我们可以使用它实现与以前的方法接近的东西?
不确定问题是什么,但我会选择 FileSplitter
+ Aggregator
解决方案。第一个正是用于流式文件读取用例。第二个允许您缓冲传入的消息,直到它们达到某种条件,因此它可以向下游发出单个消息。该消息确实可以将集合作为有效负载。
这是他们的文档供您参考:
https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter
我正在尝试根据按照下面定义的流程收到的通知从 GCP 读取文件:
文件 reader - 将数据反序列化为 collection 并发送以进行路由。
我是de-searializingobjects的collection中的数据,发送到路由器做进一步处理。由于我无法控制文件大小,我正在考虑一些批处理 reader 过程的方法。
目前,file-reader 服务激活器 return 是整个 Collection
的反序列化 objects。
问题:
如果我收到一个更大的文件,即有 200k 条记录,我想将其分批发送到 header 值路由器而不是 collection 共 200k objects.
如果我将 file-reader 转换为拆分器并添加聚合器 之后通知 -> file-reader -> 聚合器 -> 路由器。 我仍然需要 return 所有 objects 的 collection 而不是迭代器。
我不想将所有记录加载到 collection。
更新方法:
public <S> Collection<S> readData(DataInfo dataInfo, Class<S> clazz) {
Resource gcpResource = context.getResource("classpath://data.json")
var tempDataSet = new HashSet<S>();
AtomicInteger pivot = new AtomicInteger();
try (BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gcpResource.getInputStream()))) {
bufferedReader.lines().map((dataStr) -> {
try {
var data = deserializeData(dataStr, clazz);
return data;
} catch (JsonProcessingException ex) {
throw new CustomException("PARSER-1001", "Error occurred while parsing", ex);
}
}).forEach(data -> {
if (BATCH_SIZE == pivot.get()) {
//When the size in tempDataSet reached BATCH_SIZE send the data in routing channel and reset the pivot
var message = MessageBuilder.withPayload(tempDataSet.clone())
.setHeader(AppConstants.EVENT_HEADER_KEY, eventType)
.build();
routingChannel.send(message);
pivot.set(0);
tempDataSet.removeAll(tempDataSet);
} else {
pivot.addAndGet(1);
tempDataSet.add(data);
}
});
return tempDataSet;
} catch (Exception ex) {
throw new CustomException("PARSER-1002", "Error occurred while parsing", ex);
}
}
如果批量大小为 100 而我们收到 1010 objects。将创建 11 个批次,其中 10 个有 100 个,最后一个有 10 个 objects。
如果我使用拆分器并将流传递给它,它会等待整个流完成然后发送收集的 collection 还是我们可以使用它实现与以前的方法接近的东西?
不确定问题是什么,但我会选择 FileSplitter
+ Aggregator
解决方案。第一个正是用于流式文件读取用例。第二个允许您缓冲传入的消息,直到它们达到某种条件,因此它可以向下游发出单个消息。该消息确实可以将集合作为有效负载。
这是他们的文档供您参考:
https://docs.spring.io/spring-integration/docs/current/reference/html/file.html#file-splitter