Camel - 使用 Camel Bindy 从 FTP 中使用来自本地磁盘的 CSV 丰富 CSV
Camel - Enrich CSV from FTP with CSV from local disk using Camel Bindy
目标是通过将两个 CSV 文件与
使用骆驼 3.0.0。一个位于 FTP 服务器上,另一个位于磁盘上。如何将轮询丰富模式与使用 Bindy 数据格式解组磁盘上的 CSV 结合使用?
示例代码(为简单起见,FTP 端点替换为文件端点):
@Component
public class EnricherRoute extends RouteBuilder {
@Override
public void configure() {
from("file://data?fileName=part_1.csv&scheduler=quartz2&scheduler.cron=0+0+0/1+*+*+?")
.unmarshal().bindy(BindyType.Csv, Record.class)
.pollEnrich("file://data?fileName=part_2.csv", new ReportAggregationStrategy())
.marshal().bindy(BindyType.Csv, Record.class)
.to("file://reports?fileName=report_${date:now:yyyyMMdd}.csv");
}
}
此示例中的问题在于,在 ReportAggregationStrategy
中,resource
(来自 data/part_2.csv
,见下文)未解组。如何解组 data/part_2.csv
?
public class ReportAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange original, Exchange resource) {
final List<Record> originalRecords = original.getIn().getBody(List.class);
final List<Record> resourceRecords = resource.getIn().getBody(List.class); // Results in errors!
...
}
}
您可以使用 direct endpoint 封装丰富内容并在那里进行解组。
from("file://data?fileName=part_1.csv&scheduler=quartz2&scheduler.cron=0+0+0/1+*+*+?")
.unmarshal().bindy(BindyType.Csv, Record.class)
.enrich("direct:enrich_record", new ReportAggregationStrategy())
.marshal().bindy(BindyType.Csv, Record.class)
.to("file://reports?fileName=report_${date:now:yyyyMMdd}.csv");
from("direct:enrich_record")
.pollEnrich("file://data?fileName=part_2.csv")
.unmarshal().bindy(BindyType.Csv, Record.class);
目标是通过将两个 CSV 文件与 使用骆驼 3.0.0。一个位于 FTP 服务器上,另一个位于磁盘上。如何将轮询丰富模式与使用 Bindy 数据格式解组磁盘上的 CSV 结合使用?
示例代码(为简单起见,FTP 端点替换为文件端点):
@Component
public class EnricherRoute extends RouteBuilder {
@Override
public void configure() {
from("file://data?fileName=part_1.csv&scheduler=quartz2&scheduler.cron=0+0+0/1+*+*+?")
.unmarshal().bindy(BindyType.Csv, Record.class)
.pollEnrich("file://data?fileName=part_2.csv", new ReportAggregationStrategy())
.marshal().bindy(BindyType.Csv, Record.class)
.to("file://reports?fileName=report_${date:now:yyyyMMdd}.csv");
}
}
此示例中的问题在于,在 ReportAggregationStrategy
中,resource
(来自 data/part_2.csv
,见下文)未解组。如何解组 data/part_2.csv
?
public class ReportAggregationStrategy implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange original, Exchange resource) {
final List<Record> originalRecords = original.getIn().getBody(List.class);
final List<Record> resourceRecords = resource.getIn().getBody(List.class); // Results in errors!
...
}
}
您可以使用 direct endpoint 封装丰富内容并在那里进行解组。
from("file://data?fileName=part_1.csv&scheduler=quartz2&scheduler.cron=0+0+0/1+*+*+?")
.unmarshal().bindy(BindyType.Csv, Record.class)
.enrich("direct:enrich_record", new ReportAggregationStrategy())
.marshal().bindy(BindyType.Csv, Record.class)
.to("file://reports?fileName=report_${date:now:yyyyMMdd}.csv");
from("direct:enrich_record")
.pollEnrich("file://data?fileName=part_2.csv")
.unmarshal().bindy(BindyType.Csv, Record.class);