Spring 集成聚合器
Spring Integration Aggregator
我想使用聚合器从两条消息中创建一条消息,但我不确定该怎么做。
目前我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。
我的整个项目如下所示:
读入 .zip -> 传递给将其解压缩到目录中的自定义消息处理程序 -> 从该目录读取文件 -> 尝试聚合它们
如果我能在解压缩文件后发送一条带有两个有效载荷的消息,那就太好了,但在阅读后聚合就足够了。
我的解压器是这样的:
public class ZipHandler extends AbstractMessageHandler {
File dat;
File json;
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
byte[] buffer = new byte[1024];
try {
File file = (File) message.getPayload();
ZipFile zip = new ZipFile(file);
for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
.hasMoreElements();) {
ZipEntry ze = entries.nextElement();
String name = ze.getName();
if (name.endsWith(".dat") || name.endsWith(".DAT")) {
InputStream input = zip.getInputStream(ze);
File datFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(datFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.dat = datFile;
fos.close();
} else if (name.endsWith(".json") || name.endsWith(".JSON")) {
InputStream input = zip.getInputStream(ze);
File jsonFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(jsonFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.json = jsonFile;
fos.close();
}
}
zip.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
它获取这些文件并将它们放入两个目录中,我使用 FileReadingMessageSource 从中再次读取它们。
我也想只使用基于注释的符号来解决这个问题,而不是 xml.
编辑:
我只想将 defaultAggregatingMessagegroupProcssor 与基于我称为 header 的 correlationStrategy 和基于消息的 releaseStrategy 一起使用,因为在这种情况下,两个文件应该合并为一个文件。
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor();
return aggregator;
}
@CorrelationStrategy
public String correlateBy(@Header("zipFile") String zip){
return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
return messages.size() == 2;
}
我会说你走对了路。由于您的 zip 中包含多个文件,因此解压缩并将这些文件收集为一条消息并发送以进行进一步处理是正确的要求。
所以,是的,<aggregator>
适合你。只需要确定如何关联和分组它们。
不知道你是如何解压缩它们的,但你确实可以使用 zip 文件名作为 correlationKey
并使用多个文件作为组大小来确定释放组的信号。
欢迎提出更多问题。但首先我需要看看你的 "unzipper"。
更新
首先,基于注释的聚合器配置有点受限,最好在 AggregatingMessageHandler
@Bean
上使用 @ServiceActivator
以更好地控制其选项。
但是,即使是您的选择,您也可以实现您的要求。但是@Aggregator
配置要遵循POJO方法的调用原则:
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public List<File> aggregate(List<File> files){
return files;
}
类似的东西。
我想使用聚合器从两条消息中创建一条消息,但我不确定该怎么做。
目前我正在从一个目录中读取两个文件,并希望将这些消息聚合为一个。
我的整个项目如下所示:
读入 .zip -> 传递给将其解压缩到目录中的自定义消息处理程序 -> 从该目录读取文件 -> 尝试聚合它们
如果我能在解压缩文件后发送一条带有两个有效载荷的消息,那就太好了,但在阅读后聚合就足够了。
我的解压器是这样的:
public class ZipHandler extends AbstractMessageHandler {
File dat;
File json;
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
byte[] buffer = new byte[1024];
try {
File file = (File) message.getPayload();
ZipFile zip = new ZipFile(file);
for (Enumeration<? extends ZipEntry> entries = zip.entries(); entries
.hasMoreElements();) {
ZipEntry ze = entries.nextElement();
String name = ze.getName();
if (name.endsWith(".dat") || name.endsWith(".DAT")) {
InputStream input = zip.getInputStream(ze);
File datFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(datFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.dat = datFile;
fos.close();
} else if (name.endsWith(".json") || name.endsWith(".JSON")) {
InputStream input = zip.getInputStream(ze);
File jsonFile = new File("D:/lrtrans/zipOut"
+ File.separator + name);
FileOutputStream fos = new FileOutputStream(jsonFile);
int len;
while ((len = input.read(buffer)) > 0) {
fos.write(buffer, 0, len);
}
this.json = jsonFile;
fos.close();
}
}
zip.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
它获取这些文件并将它们放入两个目录中,我使用 FileReadingMessageSource 从中再次读取它们。 我也想只使用基于注释的符号来解决这个问题,而不是 xml.
编辑:
我只想将 defaultAggregatingMessagegroupProcssor 与基于我称为 header 的 correlationStrategy 和基于消息的 releaseStrategy 一起使用,因为在这种情况下,两个文件应该合并为一个文件。
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public DefaultAggregatingMessageGroupProcessor aggregate(){
DefaultAggregatingMessageGroupProcessor aggregator = new DefaultAggregatingMessageGroupProcessor();
return aggregator;
}
@CorrelationStrategy
public String correlateBy(@Header("zipFile") String zip){
return zip;
}
@ReleaseStrategy
public boolean isReadytoRelease(List<Message<?>> messages) {
return messages.size() == 2;
}
我会说你走对了路。由于您的 zip 中包含多个文件,因此解压缩并将这些文件收集为一条消息并发送以进行进一步处理是正确的要求。
所以,是的,<aggregator>
适合你。只需要确定如何关联和分组它们。
不知道你是如何解压缩它们的,但你确实可以使用 zip 文件名作为 correlationKey
并使用多个文件作为组大小来确定释放组的信号。
欢迎提出更多问题。但首先我需要看看你的 "unzipper"。
更新
首先,基于注释的聚合器配置有点受限,最好在 AggregatingMessageHandler
@Bean
上使用 @ServiceActivator
以更好地控制其选项。
但是,即使是您的选择,您也可以实现您的要求。但是@Aggregator
配置要遵循POJO方法的调用原则:
@Aggregator(inputChannel = "toAggregatorChannel", outputChannel = "toRouterChannel", discardChannel = "nullChannel")
public List<File> aggregate(List<File> files){
return files;
}
类似的东西。