将 Apache Camel CSV 与拆分器模式相结合
Combine Apache Camel CSV with the splitter pattern
我有一个很大的 CSV 文件,我想用一个速率限制来处理它。 Splitter Pattern provides exactly what I'm looking for except I can't quite figure out how out how to combine it with the CSV Component.
从拆分器文档中,您可以处理 CSV,例如:
from("file:inbox")
.split().tokenize("\n", 1000).streaming()
.to("activemq:queue:order");
但理想情况下,我想利用 Apache Camel CSV 组件来处理混搭,例如:
from("file:inbox")
.unmarshal().csv().split()
.streaming().parallelProcessing()
.throttle(requestsPerSecond)
.bean(new ValidateProcess(), "validate")
.marshal().csv().to("file:outbox");
我知道上面的代码是完全错误的,但希望它传达了我正在努力实现的目标。这完全可行吗?
所以出于某种原因我之前无法弄清楚这一点,我认为我的类路径有问题,没有获取对 org.apache.camel:camel-csv
的依赖。一旦我解决了这个问题,一切都很好。
这是我最终得到的结果:
final CsvDataFormat csv = new CsvDataFormat(";");
csv.setLazyLoad(true);
csv.setSkipFirstLine(true);
from(in).unmarshal(csv).split(body()).streaming().parallelProcessing()
.bean(validator, "validateNumber")
.filter(header(ValidateProcess.Valid).isEqualTo(true))
.throttle(tps).bean(validator, "validate")
.marshal().csv()
.to(out).log("done.").end();
基本上我想流式处理包含数字的 CSV
与速率限制为 50 TPS 的 API
并将结果输出到 csv 文件。
我有一个很大的 CSV 文件,我想用一个速率限制来处理它。 Splitter Pattern provides exactly what I'm looking for except I can't quite figure out how out how to combine it with the CSV Component.
从拆分器文档中,您可以处理 CSV,例如:
from("file:inbox")
.split().tokenize("\n", 1000).streaming()
.to("activemq:queue:order");
但理想情况下,我想利用 Apache Camel CSV 组件来处理混搭,例如:
from("file:inbox")
.unmarshal().csv().split()
.streaming().parallelProcessing()
.throttle(requestsPerSecond)
.bean(new ValidateProcess(), "validate")
.marshal().csv().to("file:outbox");
我知道上面的代码是完全错误的,但希望它传达了我正在努力实现的目标。这完全可行吗?
所以出于某种原因我之前无法弄清楚这一点,我认为我的类路径有问题,没有获取对 org.apache.camel:camel-csv
的依赖。一旦我解决了这个问题,一切都很好。
这是我最终得到的结果:
final CsvDataFormat csv = new CsvDataFormat(";");
csv.setLazyLoad(true);
csv.setSkipFirstLine(true);
from(in).unmarshal(csv).split(body()).streaming().parallelProcessing()
.bean(validator, "validateNumber")
.filter(header(ValidateProcess.Valid).isEqualTo(true))
.throttle(tps).bean(validator, "validate")
.marshal().csv()
.to(out).log("done.").end();
基本上我想流式处理包含数字的 CSV
与速率限制为 50 TPS 的 API
并将结果输出到 csv 文件。