Apache Camel 使用新行令牌拆分并使用聚合策略
Apache Camel split with new line token and use aggregation stategy
我有以下路线:
from("file:/home/tmp/test?move=.done")
.routeId("file")
.split(body().tokenize("\n"),new GroupedBodyAggregationStrategy())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getMessage().setHeader("Test", "test");
System.out.println(exchange.getIn().getBody());
}
})
正在使用的文件是:
1234,56676,2345
234,213,412
124,423,5236
我希望只有一个交换在拆分后到达处理器,并且它的主体包含上述所有行,但是它的行为就像我根本不使用聚合策略一样。 3 个交换到达处理器,每个交换与对应于单行的主体。此外,无论我使用 GroupedBodyAggregationStrategy 还是自定义策略,它总是一样的。我做错了什么?
看这里enter link description here
你看到的是:
from("file:/home/tmp/test?move=.done")
.routeId("file")
.split(body().tokenize("\n"),new GroupedBodyAggregationStrategy())
// instructions on each message from the split (.ie each row here)
.end()
// instructions on aggregated result
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getMessage().setHeader("Test", "test");
System.out.println(exchange.getIn().getBody());
}
})
我有以下路线:
from("file:/home/tmp/test?move=.done")
.routeId("file")
.split(body().tokenize("\n"),new GroupedBodyAggregationStrategy())
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getMessage().setHeader("Test", "test");
System.out.println(exchange.getIn().getBody());
}
})
正在使用的文件是:
1234,56676,2345
234,213,412
124,423,5236
我希望只有一个交换在拆分后到达处理器,并且它的主体包含上述所有行,但是它的行为就像我根本不使用聚合策略一样。 3 个交换到达处理器,每个交换与对应于单行的主体。此外,无论我使用 GroupedBodyAggregationStrategy 还是自定义策略,它总是一样的。我做错了什么?
看这里enter link description here
你看到的是:
from("file:/home/tmp/test?move=.done")
.routeId("file")
.split(body().tokenize("\n"),new GroupedBodyAggregationStrategy())
// instructions on each message from the split (.ie each row here)
.end()
// instructions on aggregated result
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getMessage().setHeader("Test", "test");
System.out.println(exchange.getIn().getBody());
}
})