Apache Curator + Spring Boot:简单观察者模式示例
Apache Curator + Spring Boot: Simple Observer pattern example
我正在尝试启动一个基本的项目结构,其中多个 spring 启动应用程序将使用 apache 管理器共享资源。
我遵循文档中指定的指南,但更改节点不会触发任何事件
请提供任何帮助,我们将不胜感激
pom.xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
docker-compose.yaml
version: '3.1'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo2:
image: zookeeper
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo3:
image: zookeeper
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
创作者
package com.training.zoo.sss;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;
@Service
public class Client {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Client() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
if (stat1 == null) {
client.create().forPath(ZK_PATH, "sometdata".getBytes());
}
byte[] bytes = client.getData().forPath(ZK_PATH);
out.println(new String(bytes, StandardCharsets.UTF_8));
// Update value every half second
final AtomicInteger i = new AtomicInteger(0);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
exec.scheduleAtFixedRate(new Runnable(){
@Override
public void run(){
i.set(i.get()+1);
System.out.println(i);
try {
client.setData().forPath(ZK_PATH, ("init_" + i ).getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
听众
package com.training.bookstore.request;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;
@Service
public class Watcher2 {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Watcher2() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
PathChildrenCache watcher = new PathChildrenCache(
client, ZK_PATH, true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(PathChildrenCache.StartMode.NORMAL);
System.out.println("Register zk watcher successfully!");
}
}
谢谢
是的,class 名称 PathChildrenCache 是一个死的赠品。
听起来很奇怪https://www.youtube.com/watch?v=nZcRU0Op5P4
如果我要发布到 /path1/path2
我正在听路径 /path1/path2
我真的在听 path1
或 path2
吗?
剧透警报:您正在收听 path2
这是一个文件夹,而不是您认为自己创建的节点
解决方法是
如果生产者在指定路径上生产
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule/whatever";
在 Watcher class 中将路径设置为该节点的“父节点”
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule";
如果您需要收听制作人路径的 subnodes/subfolders,
而不是使用 PathChildrenCache
使用 TreeCache
我正在尝试启动一个基本的项目结构,其中多个 spring 启动应用程序将使用 apache 管理器共享资源。
我遵循文档中指定的指南,但更改节点不会触发任何事件
请提供任何帮助,我们将不胜感激
pom.xml
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
docker-compose.yaml
version: '3.1'
services:
zoo1:
image: zookeeper
restart: always
hostname: zoo1
ports:
- 2181:2181
environment:
ZOO_MY_ID: 1
ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo2:
image: zookeeper
restart: always
hostname: zoo2
ports:
- 2182:2181
environment:
ZOO_MY_ID: 2
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181
zoo3:
image: zookeeper
restart: always
hostname: zoo3
ports:
- 2183:2181
environment:
ZOO_MY_ID: 3
ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181
创作者
package com.training.zoo.sss;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static java.lang.System.out;
@Service
public class Client {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Client() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
if (stat1 == null) {
client.create().forPath(ZK_PATH, "sometdata".getBytes());
}
byte[] bytes = client.getData().forPath(ZK_PATH);
out.println(new String(bytes, StandardCharsets.UTF_8));
// Update value every half second
final AtomicInteger i = new AtomicInteger(0);
ScheduledExecutorService exec = Executors.newScheduledThreadPool(1);
exec.scheduleAtFixedRate(new Runnable(){
@Override
public void run(){
i.set(i.get()+1);
System.out.println(i);
try {
client.setData().forPath(ZK_PATH, ("init_" + i ).getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}, 0, 500, TimeUnit.MILLISECONDS);
}
}
听众
package com.training.bookstore.request;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;
@Service
public class Watcher2 {
String connectionInfo = "127.0.0.1:2181";
String ZK_PATH = "/someapp/somemodule/someroute";
public Watcher2() throws Exception {
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.builder()
.connectString(connectionInfo)
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();
PathChildrenCache watcher = new PathChildrenCache(
client, ZK_PATH, true // if cache data
);
watcher.getListenable().addListener((client1, event) -> {
ChildData data = event.getData();
if (data == null) {
System.out.println("No data in event[" + event + "]");
} else {
System.out.println("Receive event: "
+ "type=[" + event.getType() + "]"
+ ", path=[" + data.getPath() + "]"
+ ", data=[" + new String(data.getData()) + "]"
+ ", stat=[" + data.getStat() + "]");
}
});
watcher.start(PathChildrenCache.StartMode.NORMAL);
System.out.println("Register zk watcher successfully!");
}
}
谢谢
是的,class 名称 PathChildrenCache 是一个死的赠品。 听起来很奇怪https://www.youtube.com/watch?v=nZcRU0Op5P4
如果我要发布到 /path1/path2
我正在听路径 /path1/path2
我真的在听 path1
或 path2
吗?
剧透警报:您正在收听 path2
这是一个文件夹,而不是您认为自己创建的节点
解决方法是 如果生产者在指定路径上生产
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule/whatever";
在 Watcher class 中将路径设置为该节点的“父节点”
String connectionInfo = "127.0.0.1:2181";
String PATH = "/someapp/somemodule";
如果您需要收听制作人路径的 subnodes/subfolders,
而不是使用 PathChildrenCache
使用 TreeCache