如何加快骆驼路线
How to speed up camel routes
我正在开发一项服务 (spring-boot),它获取一个 ID 列表,一个一个地从数据库中获取对象,将这些对象聚合成批次,然后将它们保存在其他地方.目前,聚合后的批量大小约为 50 个对象,大约每秒 10 个对象被发送到聚合路由。现在我正在尝试了解如何加快我的路线或至少了解瓶颈在哪里。
这是将请求并发发送到 direct:findObject
的 bean
@Produce(uri = "direct:findObject")
private ProducerTemplate producerTemplate;
public void send() {
List<Long> listOfIDs = someService.getIDList(); //1000000 ids in the list
listOfIDs.parallelStream().forEach(producerTemplate::sendBody);
}
我的骆驼路线:
from(direct:findObject)
.bean(objectDaoBean)
.marshal().json(JsonLibrary.Gson, MyObject.class)
.to("direct:aggregator");
from("direct:aggregator")
.unmarshal().json(JsonLibrary.Gson, MyObject.class)
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.marshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.to("direct:bulkSave");
from("direct:bulkSave")
.unmarshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.bean(bulkSaveBean);
非常感谢任何关于如何使整个过程更快的帮助或见解。
更新:
我已经通过@Fábio Carmo Santos
提出的更改设法提高了总体性能(至少 x10)
application.properties:
camel.threadpool.pool-size=10
camel.threadpool.max-pool-size=400
camel.threadpool.max-queue-size=-1
更新制作人:
@Produce("direct:splitter")
private ProducerTemplate producerTemplate;
public void sendList() {
List<Long> listOfIDs = getIDList(); //1000000 ids in the list
producerTemplate.sendBody(listOfIDs);
}
修改后的骆驼路线:
from("direct:splitter")
.split(body())
.parallelProcessing()
.bean(new DaoBean()) //fetch an object by id from db here
.to("direct:aggregator");
from("direct:aggregator")
.aggregate(body(), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.to("direct:bulkSave");
from("direct:bulkSave")
.bean(new SaveBean());
之前我在日志中有:
Saving... Batch size 30
Saving... Batch size 45
Saving... Batch size 41
Saving... Batch size 53
Saving... Batch size 47
Saving... Batch size 50
并且总体执行时间至少为数小时(从未等到它完成)
现在更好更快了
Saving... Batch size 499
Saving... Batch size 1000
Saving... Batch size 401
Saving... Batch size 1000
Saving... Batch size 257
Saving... Batch size 1000
当您 getIDList()
时,ID 是否不同?如果不是,您可以区分它们以消除重复的 ID。只有你能决定它是否有意义。
你为什么不把列表发送给 route,让 Camel 帮你拆分呢? Split 中的流媒体选项非常有用。 PS.: 只有在最近的项目中使用这个,我才设法将处理时间从1m30s减少到15s!
也看看 Threading Model。也许,您会想要编辑线程池配置文件。最大默认为 20。
其实处理的方式有很多种。您还可以通过对集合进行分区来对搜索进行分页,然后通过一次从数据库中获取多条记录(假设每次投票 200 条)来丰富您的 Exchange,而不是一条一条地获取,如果需要则使用 AggregationStrategy 或只是拆分搜索结果。
我正在开发一项服务 (spring-boot),它获取一个 ID 列表,一个一个地从数据库中获取对象,将这些对象聚合成批次,然后将它们保存在其他地方.目前,聚合后的批量大小约为 50 个对象,大约每秒 10 个对象被发送到聚合路由。现在我正在尝试了解如何加快我的路线或至少了解瓶颈在哪里。
这是将请求并发发送到 direct:findObject
的 bean@Produce(uri = "direct:findObject")
private ProducerTemplate producerTemplate;
public void send() {
List<Long> listOfIDs = someService.getIDList(); //1000000 ids in the list
listOfIDs.parallelStream().forEach(producerTemplate::sendBody);
}
我的骆驼路线:
from(direct:findObject)
.bean(objectDaoBean)
.marshal().json(JsonLibrary.Gson, MyObject.class)
.to("direct:aggregator");
from("direct:aggregator")
.unmarshal().json(JsonLibrary.Gson, MyObject.class)
.aggregate(constant(true), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.marshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.to("direct:bulkSave");
from("direct:bulkSave")
.unmarshal(new GsonDataFormat(new TypeToken<List<MyObject>>() {}.getType()))
.bean(bulkSaveBean);
非常感谢任何关于如何使整个过程更快的帮助或见解。
更新: 我已经通过@Fábio Carmo Santos
提出的更改设法提高了总体性能(至少 x10)application.properties:
camel.threadpool.pool-size=10
camel.threadpool.max-pool-size=400
camel.threadpool.max-queue-size=-1
更新制作人:
@Produce("direct:splitter")
private ProducerTemplate producerTemplate;
public void sendList() {
List<Long> listOfIDs = getIDList(); //1000000 ids in the list
producerTemplate.sendBody(listOfIDs);
}
修改后的骆驼路线:
from("direct:splitter")
.split(body())
.parallelProcessing()
.bean(new DaoBean()) //fetch an object by id from db here
.to("direct:aggregator");
from("direct:aggregator")
.aggregate(body(), new GroupedBodyAggregationStrategy())
.parallelProcessing()
.completionInterval(TimeUnit.SECONDS.toMillis(5))
.completionSize(1000)
.to("direct:bulkSave");
from("direct:bulkSave")
.bean(new SaveBean());
之前我在日志中有:
Saving... Batch size 30
Saving... Batch size 45
Saving... Batch size 41
Saving... Batch size 53
Saving... Batch size 47
Saving... Batch size 50
并且总体执行时间至少为数小时(从未等到它完成)
现在更好更快了
Saving... Batch size 499
Saving... Batch size 1000
Saving... Batch size 401
Saving... Batch size 1000
Saving... Batch size 257
Saving... Batch size 1000
当您 getIDList()
时,ID 是否不同?如果不是,您可以区分它们以消除重复的 ID。只有你能决定它是否有意义。
你为什么不把列表发送给 route,让 Camel 帮你拆分呢? Split 中的流媒体选项非常有用。 PS.: 只有在最近的项目中使用这个,我才设法将处理时间从1m30s减少到15s!
也看看 Threading Model。也许,您会想要编辑线程池配置文件。最大默认为 20。
其实处理的方式有很多种。您还可以通过对集合进行分区来对搜索进行分页,然后通过一次从数据库中获取多条记录(假设每次投票 200 条)来丰富您的 Exchange,而不是一条一条地获取,如果需要则使用 AggregationStrategy 或只是拆分搜索结果。