如何在应用程序部署在两个节点上时只读取一次文件
How to read file only once when app deployed on two nodes
我将逐行从 SFTP 位置读取文件:
@Override
public void configure() {
from(sftpLocationUrl)
.routeId("route-name")
.split(body().tokenize("\n"))
.streaming()
.bean(service, "build")
.to(String.format("activemq:%s", queueName));
}
但是这个应用程序将部署在两个节点上,我认为在这种情况下,我可以获得一个不稳定且不可预测的应用程序工作,因为文件的相同行可以被读取两次。
在这种情况下有没有办法避免这种重复?
解决方案是主动被动模式。 “在 active/passive 模式下,您有一个主实例轮询文件,而所有其他实例(从属)都是被动的。为了使这个策略起作用,必须使用某种锁定机制来确保只有持有锁的节点是主节点,而所有其他节点都处于备用状态。”
它可以用 hazelcast、consul 或 zookeper 实现
public class FileConsumerRoute extends RouteBuilder {
private int delay;
private String name;
public FileConsumerRoute(String name, int delay) {
this.name = name;
this.delay = delay;
}
@Override
public void configure() throws Exception {
// read files from the shared directory
from("file:target/inbox" +
"?delete=true")
// setup route policy to be used
.routePolicyRef("myPolicy")
.log(name + " - Received file: ${file:name}")
.delay(delay)
.log(name + " - Done file: ${file:name}")
.to("file:target/outbox");
}}
服务器栏
public class ServerBar {
private Main main;
public static void main(String[] args) throws Exception {
ServerBar bar = new ServerBar();
bar.boot();
}
public void boot() throws Exception {
// setup the hazelcast route policy
ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
// the service names must be same in the foo and bar server
routePolicy.setServiceName("myLock");
routePolicy.setTtl(5);
main = new Main();
// bind the hazelcast route policy to the name myPolicy which we refer to from the route
main.bind("myPolicy", routePolicy);
// add the route and and let the route be named Bar and use a little delay when processing the files
main.addRouteBuilder(new FileConsumerRoute("Bar", 100));
main.run();
}
}
服务器 Foo
public class ServerFoo {
private Main main;
public static void main(String[] args) throws Exception {
ServerFoo foo = new ServerFoo();
foo.boot();
}
public void boot() throws Exception {
// setup the hazelcast route policy
ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
// the service names must be same in the foo and bar server
routePolicy.setServiceName("myLock");
routePolicy.setTtl(5);
main = new Main();
// bind the hazelcast route policy to the name myPolicy which we refer to from the route
main.bind("myPolicy", routePolicy);
// add the route and and let the route be named Bar and use a little delay when processing the files
main.addRouteBuilder(new FileConsumerRoute("Foo", 100));
main.run();
}}
来源:骆驼实战第 2 版
Camel 有一些(实验性的)聚类 功能 - 请参阅 here。
在您的特定情况下,您可以建模一条在开始目录轮询时处于主导地位的路由,从而防止其他节点选择(相同或其他)文件。
我将逐行从 SFTP 位置读取文件:
@Override
public void configure() {
from(sftpLocationUrl)
.routeId("route-name")
.split(body().tokenize("\n"))
.streaming()
.bean(service, "build")
.to(String.format("activemq:%s", queueName));
}
但是这个应用程序将部署在两个节点上,我认为在这种情况下,我可以获得一个不稳定且不可预测的应用程序工作,因为文件的相同行可以被读取两次。 在这种情况下有没有办法避免这种重复?
解决方案是主动被动模式。 “在 active/passive 模式下,您有一个主实例轮询文件,而所有其他实例(从属)都是被动的。为了使这个策略起作用,必须使用某种锁定机制来确保只有持有锁的节点是主节点,而所有其他节点都处于备用状态。”
它可以用 hazelcast、consul 或 zookeper 实现
public class FileConsumerRoute extends RouteBuilder {
private int delay;
private String name;
public FileConsumerRoute(String name, int delay) {
this.name = name;
this.delay = delay;
}
@Override
public void configure() throws Exception {
// read files from the shared directory
from("file:target/inbox" +
"?delete=true")
// setup route policy to be used
.routePolicyRef("myPolicy")
.log(name + " - Received file: ${file:name}")
.delay(delay)
.log(name + " - Done file: ${file:name}")
.to("file:target/outbox");
}}
服务器栏
public class ServerBar {
private Main main;
public static void main(String[] args) throws Exception {
ServerBar bar = new ServerBar();
bar.boot();
}
public void boot() throws Exception {
// setup the hazelcast route policy
ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
// the service names must be same in the foo and bar server
routePolicy.setServiceName("myLock");
routePolicy.setTtl(5);
main = new Main();
// bind the hazelcast route policy to the name myPolicy which we refer to from the route
main.bind("myPolicy", routePolicy);
// add the route and and let the route be named Bar and use a little delay when processing the files
main.addRouteBuilder(new FileConsumerRoute("Bar", 100));
main.run();
}
}
服务器 Foo
public class ServerFoo {
private Main main;
public static void main(String[] args) throws Exception {
ServerFoo foo = new ServerFoo();
foo.boot();
}
public void boot() throws Exception {
// setup the hazelcast route policy
ConsulRoutePolicy routePolicy = new ConsulRoutePolicy();
// the service names must be same in the foo and bar server
routePolicy.setServiceName("myLock");
routePolicy.setTtl(5);
main = new Main();
// bind the hazelcast route policy to the name myPolicy which we refer to from the route
main.bind("myPolicy", routePolicy);
// add the route and and let the route be named Bar and use a little delay when processing the files
main.addRouteBuilder(new FileConsumerRoute("Foo", 100));
main.run();
}}
来源:骆驼实战第 2 版
Camel 有一些(实验性的)聚类 功能 - 请参阅 here。
在您的特定情况下,您可以建模一条在开始目录轮询时处于主导地位的路由,从而防止其他节点选择(相同或其他)文件。