为什么 Apache Curator 不触发所有更新?
Why doesn't Apache Curator fire all updates?
创建空白 /test/a
路径后,请运行 针对您的 Zookeeper 服务器执行以下操作。
import static java.lang.String.valueOf;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryForever;
public class CacheUpdateTest {
static final String connectString = "127.0.0.1:2181,127.0.0.1:2191,127.0.0.1:2201";
static volatile boolean stop = false;
public static void main(String[] args) throws Exception {
new Listener().start();
Thread.sleep(1000);
new Updater().start();
}
private static class Listener extends Thread {
@SuppressWarnings("resource")
@Override
public void run() {
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
client.start();
PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getData() == null || event.getData().getData() == null) return;
int newI = Integer.parseInt(new String(event.getData().getData()));
System.err.println("Sensed update: " + newI);
}
});
try {
cache.start(StartMode.BUILD_INITIAL_CACHE);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class Updater extends Thread {
@Override
public void run() {
try {
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
client.start();
for (int i = 0; i < 10; i++) {
// Thread.sleep(100);
System.out.println("Updated child: " + i);
client.setData().forPath("/test/a", valueOf(i).getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
如果我取消注释 Thread.sleep(100) 行,我通常会得到以下输出
Updated child: 0
Sensed update: 0
Updated child: 1
Sensed update: 1
Updated child: 2
Sensed update: 2
Updated child: 3
Sensed update: 3
Updated child: 4
Sensed update: 4
Updated child: 5
Sensed update: 5
Updated child: 6
Sensed update: 6
Updated child: 7
Sensed update: 7
Updated child: 8
Sensed update: 8
Updated child: 9
Sensed update: 9
我在评论时得到以下输出
Updated child: 0
Updated child: 1
Sensed update: 1 --> Missed 0
Updated child: 2
Updated child: 3
Updated child: 4
Sensed update: 3 --> Missed 2
Updated child: 5
Updated child: 6
Sensed update: 5 --> Missed 4
Updated child: 7
Updated child: 8
Sensed update: 7 --> Missed 6
Updated child: 9
Sensed update: 9 --> Missed 8
为什么我不是总能收到所有通知?为什么我没有错过第一个?
Curator 是一个库,旨在简化 Apache Zookeeper 的使用。
PathChildrenCache
的工作方式是使用 ZK Watchers。
一个 Watcher 创建了一个一次性的 Watch。如果 Watcher 收到有关更改(或它订阅的任何其他操作)的通知,则该 Watch 将被该 Watcher 使用,并且它必须再次创建一个新的 Watch 才能在将来继续收到通知。
在您的例子中,PathChildrenCache
正在查找节点中的更改。它的工作方式是等到它收到来自 ZK 的通知并重新创建 Watch 以继续寻找进一步的变化。
由于一切都是异步的,因此在您收到更改通知之前,数据可能已更改多次。这就是为什么当您在 Updater 中设置延迟时您可以看到所有更改,因为缓存有足够的时间检测更改并在调用新的 setData
之前重新创建 Watch。当您省略睡眠时,事情发生得如此之快以至于缓存错过了一些事件。
如需进一步阅读,请查看官方 documentation 关于观察者的内容,主要是此部分:
Because watches are one time triggers and there is latency between getting the event and sending a new request to get a watch you cannot reliably see every change that happens to a node in ZooKeeper. Be prepared to handle the case where the znode changes multiple times between getting the event and setting the watch again. (You may not care, but at least realize it may happen.)
创建空白 /test/a
路径后,请运行 针对您的 Zookeeper 服务器执行以下操作。
import static java.lang.String.valueOf;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryForever;
public class CacheUpdateTest {
static final String connectString = "127.0.0.1:2181,127.0.0.1:2191,127.0.0.1:2201";
static volatile boolean stop = false;
public static void main(String[] args) throws Exception {
new Listener().start();
Thread.sleep(1000);
new Updater().start();
}
private static class Listener extends Thread {
@SuppressWarnings("resource")
@Override
public void run() {
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
client.start();
PathChildrenCache cache = new PathChildrenCache(client, "/test", true);
cache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (event.getData() == null || event.getData().getData() == null) return;
int newI = Integer.parseInt(new String(event.getData().getData()));
System.err.println("Sensed update: " + newI);
}
});
try {
cache.start(StartMode.BUILD_INITIAL_CACHE);
Thread.sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static class Updater extends Thread {
@Override
public void run() {
try {
CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectString).retryPolicy(new RetryForever(100)).build();
client.start();
for (int i = 0; i < 10; i++) {
// Thread.sleep(100);
System.out.println("Updated child: " + i);
client.setData().forPath("/test/a", valueOf(i).getBytes());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
如果我取消注释 Thread.sleep(100) 行,我通常会得到以下输出
Updated child: 0
Sensed update: 0
Updated child: 1
Sensed update: 1
Updated child: 2
Sensed update: 2
Updated child: 3
Sensed update: 3
Updated child: 4
Sensed update: 4
Updated child: 5
Sensed update: 5
Updated child: 6
Sensed update: 6
Updated child: 7
Sensed update: 7
Updated child: 8
Sensed update: 8
Updated child: 9
Sensed update: 9
我在评论时得到以下输出
Updated child: 0
Updated child: 1
Sensed update: 1 --> Missed 0
Updated child: 2
Updated child: 3
Updated child: 4
Sensed update: 3 --> Missed 2
Updated child: 5
Updated child: 6
Sensed update: 5 --> Missed 4
Updated child: 7
Updated child: 8
Sensed update: 7 --> Missed 6
Updated child: 9
Sensed update: 9 --> Missed 8
为什么我不是总能收到所有通知?为什么我没有错过第一个?
Curator 是一个库,旨在简化 Apache Zookeeper 的使用。
PathChildrenCache
的工作方式是使用 ZK Watchers。
一个 Watcher 创建了一个一次性的 Watch。如果 Watcher 收到有关更改(或它订阅的任何其他操作)的通知,则该 Watch 将被该 Watcher 使用,并且它必须再次创建一个新的 Watch 才能在将来继续收到通知。
在您的例子中,PathChildrenCache
正在查找节点中的更改。它的工作方式是等到它收到来自 ZK 的通知并重新创建 Watch 以继续寻找进一步的变化。
由于一切都是异步的,因此在您收到更改通知之前,数据可能已更改多次。这就是为什么当您在 Updater 中设置延迟时您可以看到所有更改,因为缓存有足够的时间检测更改并在调用新的 setData
之前重新创建 Watch。当您省略睡眠时,事情发生得如此之快以至于缓存错过了一些事件。
如需进一步阅读,请查看官方 documentation 关于观察者的内容,主要是此部分:
Because watches are one time triggers and there is latency between getting the event and sending a new request to get a watch you cannot reliably see every change that happens to a node in ZooKeeper. Be prepared to handle the case where the znode changes multiple times between getting the event and setting the watch again. (You may not care, but at least realize it may happen.)