骆驼:文件使用者组件 "bites off more than it can chew",管道因内存不足错误而死

Camel: File consumer component "bites off more than it can chew", pipeline dies from out-of-memory error

我在 Camel 中定义了一个路由,它是这样的:GET 请求进来,在文件系统中创建一个文件。文件使用者拾取它,从外部 Web 服务获取数据,并通过 POST 将生成的消息发送到其他 Web 服务。

下面的简化代码:

    // Update request goes on queue:
    from("restlet:http://localhost:9191/update?restletMethod=post")
    .routeId("Update via POST")
    [...some magic that defines a directory and file name based on request headers...]
    .to("file://cameldest/queue?allowNullBody=true&fileExist=Ignore")

    // Update gets processed
    from("file://cameldest/queue?delay=500&recursive=true&maxDepth=2&sortBy=file:parent;file:modified&preMove=inprogress&delete=true")
    .routeId("Update main route")
    .streamCaching() //otherwise stuff can't be sent to multiple endpoints
    [...enrich message from some web service using http4 component...]
    .multicast()
        .stopOnException()
        .to("direct:sendUpdate", "direct:dependencyCheck", "direct:saveXML")
    .end();

多播中的三个端点只是POST将生成的消息发送到其他网络服务。

当队列(即文件目录 cameldest)相当空时,这一切都工作得很好。文件正在 cameldest/<subdir> 中创建,由文件使用者拾取并移至 cameldest/<subdir>/inprogress,并且内容正在发送到三个传出的 POST 没问题。

但是,一旦传入请求堆积到大约 300,000 个文件,进度就会变慢,最终 管道会因内存不足错误 (超出 GC 开销限制)而失败。

通过增加日志记录,我可以看到文件消费者轮询基本上不会运行,因为它似乎每次都对它看到的所有文件负责,等待它们被完成处理,然后才开始另一轮轮询。除了(我假设)造成资源瓶颈之外,这也干扰了我的排序要求:一旦队列中塞满了数千条等待处理的消息,那么天真地排序更高的新消息 - 如果它们仍然被拾取- 仍在等待那些已经 "started".

的人

现在,我已经尝试了 maxMessagesPerPolleagerMaxMessagesPerPoll 选项。起初他们似乎缓解了这个问题,但经过几轮投票后,我仍然有数千个文件处于 "started" 不确定状态。

唯一可行的是使 delaymaxMessages... 的瓶颈变得如此狭窄,以至于平均处理速度比文件轮询周期更快。

显然,这不是我想要的。我希望我的管道尽可能快地处理文件,但不会更快。我原以为文件消费者会在路由繁忙时等待。

我犯了明显的错误吗?

(我 运行 在带有 XFS 的 Redhat 7 机器上安装了一个稍旧的 Camel 2.14.0,如果这是问题的一部分。)

尝试将来自文件端点的 maxMessagesPerPoll 设置为一个较低的值,以便每次轮询最多只拾取 X 个文件,这也限制了您将在 Camel 应用程序中拥有的飞行消息总数。

您可以在文件组件的 Camel 文档中找到有关该选项的更多信息

除非您真的需要将数据保存为文件,否则我会提出一个替代解决方案。

从您的 restlet 消费者,将每个请求发送到消息队列应用程序,例如 activemq 或 rabbitmq 或类似的东西。您很快就会在该队列中收到大量消息,但这没关系。

然后用队列消费者替换您的文件消费者。这将需要一些时间,但每条消息都应单独处理并发送到您想要的任何地方。我已经用大约 500 000 条消息测试了 rabbitmq,并且运行良好。这也应该减少消费者的负担。

简短的回答是没有答案:Camel 文件组件的 sortBy 选项内存效率太低,无法满足我的用例:

  • 唯一性:如果文件已经存在,我不想将其放入队列。
  • 优先级:应首先处理标记为高优先级的文件。
  • 性能:几十万个文件,甚至几百万个文件应该没有问题。
  • FIFO:(奖励)最旧的文件(按优先级)应该首先被拾取。

问题似乎是,如果我正确阅读 source code and the documentation,所有文件详细信息都在内存中以执行排序,无论是内置语言还是自定义可插入 sorter 用来。文件组件 always 创建一个包含所有详细信息的对象列表,这显然会在经常轮询许多文件时导致 疯狂 数量的垃圾收集开销。

我让我的用例正常工作,大部分情况下,无需使用数据库或编写自定义组件,使用以下步骤:

  • 从父目录 cameldest/queue 上的一个文件使用者移动到 两个使用者,每个目录一个,根本没有排序。
  • 设置来自/cameldest/queue/high/的消费者通过我的实际业务逻辑处理文件。
  • 设置来自 /cameldest/queue/low 的消费者以简单地将文件从 "low" 提升到 "high"(将它们复制过来,即 .to("file://cameldest/queue/high");
  • 重要的是,为了 仅在 high 不忙时从 "low" 提升到 "high" ,请将路由策略附加到 "high" 限制了另一条路由 ,即 "low" 如果 "high"
  • 中有任何正在传输的消息
  • 此外,我在 "high" 中添加了一个 ThrottlingInflightRoutePolicy 以防止它同时进行过多的交换。

想象一下,这就像在机场办理登机手续时,游客被邀请进入商业 class 车道,如果那是空的。

这在负载下就像一个魅力,即使在 "low" 中有数十万个文件排队,新消息(文件)直接放入 "high" 也能在几秒钟内得到处理。

此解决方案未涵盖的唯一要求是有序性:不能保证首先选取较旧的文件,而是随机选取它们。可以想象这样一种情况,稳定的传入文件流可能导致一个特定的文件 X 总是不走运,永远不会被拾取。不过,这种情况发生的可能性非常低。

可能的改进: 目前,允许/暂停将文件从 "low" 升级到 "high" 的阈值设置为 0 消息在 "high"。一方面,这 保证 放入 "high" 的文件将在执行来自 "low" 的另一次升级之前得到处理,另一方面它导致了一点停止启动模式,尤其是在多线程场景中。虽然不是真正的问题,但性能令人印象深刻。


来源:

我的路线定义:

    ThrottlingInflightRoutePolicy trp = new ThrottlingInflightRoutePolicy();
    trp.setMaxInflightExchanges(50);

    SuspendOtherRoutePolicy sorp = new SuspendOtherRoutePolicy("lowPriority");

    from("file://cameldest/queue/low?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("lowPriority")
    .log("Copying over to high priority: ${in.headers."+Exchange.FILE_PATH+"}")
    .to("file://cameldest/queue/high");

    from("file://cameldest/queue/high?delay=500&maxMessagesPerPoll=25&preMove=inprogress&delete=true")
    .routeId("highPriority")
    .routePolicy(trp)
    .routePolicy(sorp)
    .threads(20)
    .log("Before: ${in.headers."+Exchange.FILE_PATH+"}")
    .delay(2000) // This is where business logic would happen
    .log("After: ${in.headers."+Exchange.FILE_PATH+"}")
    .stop();

我的 SuspendOtherRoutePolicy,结构松散,像 ThrottlingInflightRoutePolicy

public class SuspendOtherRoutePolicy extends RoutePolicySupport implements CamelContextAware {

    private CamelContext camelContext;
    private final Lock lock = new ReentrantLock();
    private String otherRouteId;

    public SuspendOtherRoutePolicy(String otherRouteId) {
        super();
        this.otherRouteId = otherRouteId;
    }

    @Override
    public CamelContext getCamelContext() {
        return camelContext;
    }

    @Override
    public void onStart(Route route) {
        super.onStart(route);
        if (camelContext.getRoute(otherRouteId) == null) {
            throw new IllegalArgumentException("There is no route with the id '" + otherRouteId + "'");
        }
    }

    @Override
    public void setCamelContext(CamelContext context) {
        camelContext = context;
    }

    @Override
    public void onExchangeDone(Route route, Exchange exchange) {
        //log.info("Exchange done on route " + route);
        Route otherRoute = camelContext.getRoute(otherRouteId);
        //log.info("Other route: " + otherRoute);
        throttle(route, otherRoute, exchange);
    }

    protected void throttle(Route route, Route otherRoute, Exchange exchange) {
        // this works the best when this logic is executed when the exchange is done
        Consumer consumer = otherRoute.getConsumer();

        int size = getSize(route, exchange);
        boolean stop = size > 0;
        if (stop) {
            try {
                lock.lock();
                stopConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }

        // reload size in case a race condition with too many at once being invoked
        // so we need to ensure that we read the most current size and start the consumer if we are already to low
        size = getSize(route, exchange);
        boolean start = size == 0;
        if (start) {
            try {
                lock.lock();
                startConsumer(size, consumer);
            } catch (Exception e) {
                handleException(e);
            } finally {
                lock.unlock();
            }
        }
    }

    private int getSize(Route route, Exchange exchange) {
        return exchange.getContext().getInflightRepository().size(route.getId());
    }

    private void startConsumer(int size, Consumer consumer) throws Exception {
        boolean started = super.startConsumer(consumer);
        if (started) {
            log.info("Resuming the other consumer " + consumer);
        }
    }

    private void stopConsumer(int size, Consumer consumer) throws Exception {
        boolean stopped = super.stopConsumer(consumer);
        if (stopped) {
            log.info("Suspending the other consumer " + consumer);
        }
    }
}