如何在应用程序部署在两个节点上时只读取一次文件

How to read file only once when app deployed on two nodes

我将逐行从 SFTP 位置读取文件:

@Override
public void configure() {
    from(sftpLocationUrl)
            .routeId("route-name")
            .split(body().tokenize("\n"))
            .streaming()
            .bean(service, "build")
            .to(String.format("activemq:%s", queueName));
}

但是这个应用程序将部署在两个节点上,我认为在这种情况下,我可以获得一个不稳定且不可预测的应用程序工作,因为文件的相同行可以被读取两次。 在这种情况下有没有办法避免这种重复?

解决方案是主动被动模式。 “在 active/passive 模式下,您有一个主实例轮询文件,而所有其他实例(从属)都是被动的。为了使这个策略起作用,必须使用某种锁定机制来确保只有持有锁的节点是主节点,而所有其他节点都处于备用状态。”

它可以用 hazelcast、consul 或 zookeper 实现

public class FileConsumerRoute extends RouteBuilder {

private int delay;
private String name;

public FileConsumerRoute(String name, int delay) {
    this.name = name;
    this.delay = delay;
}

@Override
public void configure() throws Exception {
    // read files from the shared directory
    from("file:target/inbox" +
            "?delete=true")
        // setup route policy to be used
        .routePolicyRef("myPolicy")
        .log(name + " - Received file: ${file:name}")
        .delay(delay)
        .log(name + " - Done file:     ${file:name}")
        .to("file:target/outbox");
}}

服务器栏

public class ServerBar {

private Main main;

public static void main(String[] args) throws Exception {
    ServerBar bar = new ServerBar();
    bar.boot();
}

public void boot() throws Exception {
    // setup the hazelcast route policy
    ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
    // the service names must be same in the foo and bar server
    routePolicy.setServiceName("myLock");
    routePolicy.setTtl(5);

    main = new Main();
    // bind the hazelcast route policy to the name myPolicy which we refer to from the route
    main.bind("myPolicy", routePolicy);
    // add the route and and let the route be named Bar and use a little delay when processing the files
    main.addRouteBuilder(new FileConsumerRoute("Bar", 100));
    main.run();
}

}

服务器 Foo

public class ServerFoo {

private Main main;

public static void main(String[] args) throws Exception {
    ServerFoo foo = new ServerFoo();
    foo.boot();
}

public void boot() throws Exception {
    // setup the hazelcast route policy
    ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
    // the service names must be same in the foo and bar server
    routePolicy.setServiceName("myLock");
    routePolicy.setTtl(5);

    main = new Main();
    // bind the hazelcast route policy to the name myPolicy which we refer to from the route
    main.bind("myPolicy", routePolicy);
    // add the route and and let the route be named Bar and use a little delay when processing the files
    main.addRouteBuilder(new FileConsumerRoute("Foo", 100));
    main.run();
}}

来源:骆驼实战第 2 版

Camel 有一些(实验性的)聚类 功能 - 请参阅 here

在您的特定情况下,您可以建模一条在开始目录轮询时处于主导地位的路由,从而防止其他节点选择(相同或其他)文件。