骆驼:文件使用者组件 "bites off more than it can chew",管道因内存不足错误而死
Camel: File consumer component "bites off more than it can chew", pipeline dies from out-of-memory error
我在 Camel 中定义了一个路由,它是这样的:GET 请求进来,在文件系统中创建一个文件。文件使用者拾取它,从外部 Web 服务获取数据,并通过 POST 将生成的消息发送到其他 Web 服务。
下面的简化代码:
// Update request goes on queue:
from("restlet:http://localhost:9191/update?restletMethod=post")
.routeId("Update via POST")
[...some magic that defines a directory and file name based on request headers...]
.to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")
// Update gets processed
from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
.routeId("Update main route")
.streamCaching() //otherwise stuff can't be sent to multiple endpoints
[...enrich message from some web service using http4 component...]
.multicast()
.stopOnException()
.to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
.end();
多播中的三个端点只是POST将生成的消息发送到其他网络服务。
当队列(即文件目录 cameldest
)相当空时,这一切都工作得很好。文件正在 cameldest/<subdir>
中创建,由文件使用者拾取并移至 cameldest/<subdir>/inprogress
,并且内容正在发送到三个传出的 POST 没问题。
但是,一旦传入请求堆积到大约 300,000 个文件,进度就会变慢,最终 管道会因内存不足错误 (超出 GC 开销限制)而失败。
通过增加日志记录,我可以看到文件消费者轮询基本上不会运行,因为它似乎每次都对它看到的所有文件负责,等待它们被完成处理,然后才开始另一轮轮询。除了(我假设)造成资源瓶颈之外,这也干扰了我的排序要求:一旦队列中塞满了数千条等待处理的消息,那么天真地排序更高的新消息 - 如果它们仍然被拾取- 仍在等待那些已经 "started".
的人
现在,我已经尝试了 maxMessagesPerPoll
和 eagerMaxMessagesPerPoll
选项。起初他们似乎缓解了这个问题,但经过几轮投票后,我仍然有数千个文件处于 "started" 不确定状态。
唯一可行的是使 delay
和 maxMessages...
的瓶颈变得如此狭窄,以至于平均处理速度比文件轮询周期更快。
显然,这不是我想要的。我希望我的管道尽可能快地处理文件,但不会更快。我原以为文件消费者会在路由繁忙时等待。
我犯了明显的错误吗?
(我 运行 在带有 XFS 的 Redhat 7 机器上安装了一个稍旧的 Camel 2.14.0,如果这是问题的一部分。)
尝试将来自文件端点的 maxMessagesPerPoll 设置为一个较低的值,以便每次轮询最多只拾取 X 个文件,这也限制了您将在 Camel 应用程序中拥有的飞行消息总数。
您可以在文件组件的 Camel 文档中找到有关该选项的更多信息
除非您真的需要将数据保存为文件,否则我会提出一个替代解决方案。
从您的 restlet 消费者,将每个请求发送到消息队列应用程序,例如 activemq 或 rabbitmq 或类似的东西。您很快就会在该队列中收到大量消息,但这没关系。
然后用队列消费者替换您的文件消费者。这将需要一些时间,但每条消息都应单独处理并发送到您想要的任何地方。我已经用大约 500 000 条消息测试了 rabbitmq,并且运行良好。这也应该减少消费者的负担。
简短的回答是没有答案:Camel 文件组件的 sortBy
选项内存效率太低,无法满足我的用例:
- 唯一性:如果文件已经存在,我不想将其放入队列。
- 优先级:应首先处理标记为高优先级的文件。
- 性能:几十万个文件,甚至几百万个文件应该没有问题。
- FIFO:(奖励)最旧的文件(按优先级)应该首先被拾取。
问题似乎是,如果我正确阅读 source code and the documentation,所有文件详细信息都在内存中以执行排序,无论是内置语言还是自定义可插入 sorter
用来。文件组件 always 创建一个包含所有详细信息的对象列表,这显然会在经常轮询许多文件时导致 疯狂 数量的垃圾收集开销。
我让我的用例正常工作,大部分情况下,无需使用数据库或编写自定义组件,使用以下步骤:
- 从父目录
cameldest/queue
上的一个文件使用者移动到 两个使用者,每个目录一个,根本没有排序。
- 仅设置来自
/cameldest/queue/high/
的消费者通过我的实际业务逻辑处理文件。
- 设置来自
/cameldest/queue/low
的消费者以简单地将文件从 "low" 提升到 "high"(将它们复制过来,即 .to("file://cameldest/queue/high");
)
- 重要的是,为了 仅在 high 不忙时从 "low" 提升到 "high" ,请将路由策略附加到 "high" 限制了另一条路由 ,即 "low" 如果 "high"
中有任何正在传输的消息
- 此外,我在 "high" 中添加了一个
ThrottlingInflightRoutePolicy
以防止它同时进行过多的交换。
想象一下,这就像在机场办理登机手续时,游客被邀请进入商业 class 车道,如果那是空的。
这在负载下就像一个魅力,即使在 "low" 中有数十万个文件排队,新消息(文件)直接放入 "high" 也能在几秒钟内得到处理。
此解决方案未涵盖的唯一要求是有序性:不能保证首先选取较旧的文件,而是随机选取它们。可以想象这样一种情况,稳定的传入文件流可能导致一个特定的文件 X 总是不走运,永远不会被拾取。不过,这种情况发生的可能性非常低。
可能的改进: 目前,允许/暂停将文件从 "low" 升级到 "high" 的阈值设置为 0 消息在 "high"。一方面,这 保证 放入 "high" 的文件将在执行来自 "low" 的另一次升级之前得到处理,另一方面它导致了一点停止启动模式,尤其是在多线程场景中。虽然不是真正的问题,但性能令人印象深刻。
来源:
我的路线定义:
ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
trp.setMaxInflightExchanges(50);
SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");
from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("lowPriority")
.log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
.to("file://cameldest/queue/high");
from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("highPriority")
.routePolicy(trp)
.routePolicy(sorp)
.threads(20)
.log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
.delay(2000) // This is where business logic would happen
.log("After: ${in.headers."+Exchange.FILE_PATH+"}")
.stop();
我的 SuspendOtherRoutePolicy
,结构松散,像 ThrottlingInflightRoutePolicy
public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
private String otherRouteId;
public SuspendOtherRoutePolicy(String otherRouteId) {
super();
this.otherRouteId = otherRouteId;
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void onStart(Route route) {
super.onStart(route);
if (camelContext.getRoute(otherRouteId) == null) {
throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
}
}
@Override
public void setCamelContext(CamelContext context) {
camelContext = context;
}
@Override
public void onExchangeDone(Route route, Exchange exchange) {
//log.info("Exchange done on route " + route);
Route otherRoute = camelContext.getRoute(otherRouteId);
//log.info("Other route: " + otherRoute);
throttle(route, otherRoute, exchange);
}
protected void throttle(Route route, Route otherRoute, Exchange exchange) {
// this works the best when this logic is executed when the exchange is done
Consumer consumer = otherRoute.getConsumer();
int size = getSize(route, exchange);
boolean stop = size > 0;
if (stop) {
try {
lock.lock();
stopConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
// reload size in case a race condition with too many at once being invoked
// so we need to ensure that we read the most current size and start the consumer if we are already to low
size = getSize(route, exchange);
boolean start = size == 0;
if (start) {
try {
lock.lock();
startConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
}
private int getSize(Route route, Exchange exchange) {
return exchange.getContext().getInflightRepository().size(route.getId());
}
private void startConsumer(int size, Consumer consumer) throws Exception {
boolean started = super.startConsumer(consumer);
if (started) {
log.info("Resuming the other consumer " + consumer);
}
}
private void stopConsumer(int size, Consumer consumer) throws Exception {
boolean stopped = super.stopConsumer(consumer);
if (stopped) {
log.info("Suspending the other consumer " + consumer);
}
}
}
我在 Camel 中定义了一个路由,它是这样的:GET 请求进来,在文件系统中创建一个文件。文件使用者拾取它,从外部 Web 服务获取数据,并通过 POST 将生成的消息发送到其他 Web 服务。
下面的简化代码:
// Update request goes on queue:
from("restlet:http://localhost:9191/update?restletMethod=post")
.routeId("Update via POST")
[...some magic that defines a directory and file name based on request headers...]
.to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")
// Update gets processed
from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
.routeId("Update main route")
.streamCaching() //otherwise stuff can't be sent to multiple endpoints
[...enrich message from some web service using http4 component...]
.multicast()
.stopOnException()
.to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
.end();
多播中的三个端点只是POST将生成的消息发送到其他网络服务。
当队列(即文件目录 cameldest
)相当空时,这一切都工作得很好。文件正在 cameldest/<subdir>
中创建,由文件使用者拾取并移至 cameldest/<subdir>/inprogress
,并且内容正在发送到三个传出的 POST 没问题。
但是,一旦传入请求堆积到大约 300,000 个文件,进度就会变慢,最终 管道会因内存不足错误 (超出 GC 开销限制)而失败。
通过增加日志记录,我可以看到文件消费者轮询基本上不会运行,因为它似乎每次都对它看到的所有文件负责,等待它们被完成处理,然后才开始另一轮轮询。除了(我假设)造成资源瓶颈之外,这也干扰了我的排序要求:一旦队列中塞满了数千条等待处理的消息,那么天真地排序更高的新消息 - 如果它们仍然被拾取- 仍在等待那些已经 "started".
的人现在,我已经尝试了 maxMessagesPerPoll
和 eagerMaxMessagesPerPoll
选项。起初他们似乎缓解了这个问题,但经过几轮投票后,我仍然有数千个文件处于 "started" 不确定状态。
唯一可行的是使 delay
和 maxMessages...
的瓶颈变得如此狭窄,以至于平均处理速度比文件轮询周期更快。
显然,这不是我想要的。我希望我的管道尽可能快地处理文件,但不会更快。我原以为文件消费者会在路由繁忙时等待。
我犯了明显的错误吗?
(我 运行 在带有 XFS 的 Redhat 7 机器上安装了一个稍旧的 Camel 2.14.0,如果这是问题的一部分。)
尝试将来自文件端点的 maxMessagesPerPoll 设置为一个较低的值,以便每次轮询最多只拾取 X 个文件,这也限制了您将在 Camel 应用程序中拥有的飞行消息总数。
您可以在文件组件的 Camel 文档中找到有关该选项的更多信息
除非您真的需要将数据保存为文件,否则我会提出一个替代解决方案。
从您的 restlet 消费者,将每个请求发送到消息队列应用程序,例如 activemq 或 rabbitmq 或类似的东西。您很快就会在该队列中收到大量消息,但这没关系。
然后用队列消费者替换您的文件消费者。这将需要一些时间,但每条消息都应单独处理并发送到您想要的任何地方。我已经用大约 500 000 条消息测试了 rabbitmq,并且运行良好。这也应该减少消费者的负担。
简短的回答是没有答案:Camel 文件组件的 sortBy
选项内存效率太低,无法满足我的用例:
- 唯一性:如果文件已经存在,我不想将其放入队列。
- 优先级:应首先处理标记为高优先级的文件。
- 性能:几十万个文件,甚至几百万个文件应该没有问题。
- FIFO:(奖励)最旧的文件(按优先级)应该首先被拾取。
问题似乎是,如果我正确阅读 source code and the documentation,所有文件详细信息都在内存中以执行排序,无论是内置语言还是自定义可插入 sorter
用来。文件组件 always 创建一个包含所有详细信息的对象列表,这显然会在经常轮询许多文件时导致 疯狂 数量的垃圾收集开销。
我让我的用例正常工作,大部分情况下,无需使用数据库或编写自定义组件,使用以下步骤:
- 从父目录
cameldest/queue
上的一个文件使用者移动到 两个使用者,每个目录一个,根本没有排序。 - 仅设置来自
/cameldest/queue/high/
的消费者通过我的实际业务逻辑处理文件。 - 设置来自
/cameldest/queue/low
的消费者以简单地将文件从 "low" 提升到 "high"(将它们复制过来,即.to("file://cameldest/queue/high");
) - 重要的是,为了 仅在 high 不忙时从 "low" 提升到 "high" ,请将路由策略附加到 "high" 限制了另一条路由 ,即 "low" 如果 "high" 中有任何正在传输的消息
- 此外,我在 "high" 中添加了一个
ThrottlingInflightRoutePolicy
以防止它同时进行过多的交换。
想象一下,这就像在机场办理登机手续时,游客被邀请进入商业 class 车道,如果那是空的。
这在负载下就像一个魅力,即使在 "low" 中有数十万个文件排队,新消息(文件)直接放入 "high" 也能在几秒钟内得到处理。
此解决方案未涵盖的唯一要求是有序性:不能保证首先选取较旧的文件,而是随机选取它们。可以想象这样一种情况,稳定的传入文件流可能导致一个特定的文件 X 总是不走运,永远不会被拾取。不过,这种情况发生的可能性非常低。
可能的改进: 目前,允许/暂停将文件从 "low" 升级到 "high" 的阈值设置为 0 消息在 "high"。一方面,这 保证 放入 "high" 的文件将在执行来自 "low" 的另一次升级之前得到处理,另一方面它导致了一点停止启动模式,尤其是在多线程场景中。虽然不是真正的问题,但性能令人印象深刻。
来源:
我的路线定义:
ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
trp.setMaxInflightExchanges(50);
SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");
from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("lowPriority")
.log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
.to("file://cameldest/queue/high");
from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
.routeId("highPriority")
.routePolicy(trp)
.routePolicy(sorp)
.threads(20)
.log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
.delay(2000) // This is where business logic would happen
.log("After: ${in.headers."+Exchange.FILE_PATH+"}")
.stop();
我的 SuspendOtherRoutePolicy
,结构松散,像 ThrottlingInflightRoutePolicy
public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {
private CamelContext camelContext;
private final Lock lock = new ReentrantLock();
private String otherRouteId;
public SuspendOtherRoutePolicy(String otherRouteId) {
super();
this.otherRouteId = otherRouteId;
}
@Override
public CamelContext getCamelContext() {
return camelContext;
}
@Override
public void onStart(Route route) {
super.onStart(route);
if (camelContext.getRoute(otherRouteId) == null) {
throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
}
}
@Override
public void setCamelContext(CamelContext context) {
camelContext = context;
}
@Override
public void onExchangeDone(Route route, Exchange exchange) {
//log.info("Exchange done on route " + route);
Route otherRoute = camelContext.getRoute(otherRouteId);
//log.info("Other route: " + otherRoute);
throttle(route, otherRoute, exchange);
}
protected void throttle(Route route, Route otherRoute, Exchange exchange) {
// this works the best when this logic is executed when the exchange is done
Consumer consumer = otherRoute.getConsumer();
int size = getSize(route, exchange);
boolean stop = size > 0;
if (stop) {
try {
lock.lock();
stopConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
// reload size in case a race condition with too many at once being invoked
// so we need to ensure that we read the most current size and start the consumer if we are already to low
size = getSize(route, exchange);
boolean start = size == 0;
if (start) {
try {
lock.lock();
startConsumer(size, consumer);
} catch (Exception e) {
handleException(e);
} finally {
lock.unlock();
}
}
}
private int getSize(Route route, Exchange exchange) {
return exchange.getContext().getInflightRepository().size(route.getId());
}
private void startConsumer(int size, Consumer consumer) throws Exception {
boolean started = super.startConsumer(consumer);
if (started) {
log.info("Resuming the other consumer " + consumer);
}
}
private void stopConsumer(int size, Consumer consumer) throws Exception {
boolean stopped = super.stopConsumer(consumer);
if (stopped) {
log.info("Suspending the other consumer " + consumer);
}
}
}