Apache Curator + Spring Boot:简单观察者模式示例

Apache Curator + Spring Boot: Simple Observer pattern example

我正在尝试搭建一个基本的项目结构,其中多个 Spring Boot 应用程序将通过 Apache Curator 共享资源。

我按照文档中的指南进行了配置,但修改节点时没有触发任何事件。

如能提供任何帮助,我将不胜感激。

pom.xml

        <!-- Curator client API; the org.apache.curator.framework.* classes used by the Java code below. -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <!-- Curator recipes; presumably the source of org.apache.curator.framework.recipes.cache.PathChildrenCache. -->
        <!-- Both Curator artifacts are pinned to the same version (2.12.0). -->
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

docker-compose.yaml

# Three ZooKeeper containers (zoo1, zoo2, zoo3) that list each other in ZOO_SERVERS.
# Each container's client port 2181 is published on a different host port:
# zoo1 -> 2181, zoo2 -> 2182, zoo3 -> 2183.
# The Java clients in this post connect to 127.0.0.1:2181, i.e. the port published by zoo1.
version: '3.1'

services:
  zoo1:
    image: zookeeper
    restart: always
    hostname: zoo1
    ports:
      - 2181:2181
    environment:
      # Identifier of this server; it matches the "server.1" entry below.
      ZOO_MY_ID: 1
      # NOTE(review): each service lists its own entry as 0.0.0.0 and its peers by hostname;
      # presumably this is the convention expected by the zookeeper image. Confirm against the image docs.
      ZOO_SERVERS: server.1=0.0.0.0:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo2:
    image: zookeeper
    restart: always
    hostname: zoo2
    ports:
      - 2182:2181
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=0.0.0.0:2888:3888;2181 server.3=zoo3:2888:3888;2181

  zoo3:
    image: zookeeper
    restart: always
    hostname: zoo3
    ports:
      - 2183:2181
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zoo1:2888:3888;2181 server.2=zoo2:2888:3888;2181 server.3=0.0.0.0:2888:3888;2181

生产者(写入节点的一方)

package com.training.zoo.sss;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static java.lang.System.out;

/**
 * Producer side of the example: connects to ZooKeeper through Curator, makes sure the
 * node {@link #ZK_PATH} exists, prints its current content and then overwrites it every
 * 500 ms with {@code "init_<counter>"}.
 *
 * <p>All payloads are encoded and decoded as UTF-8 so that writers and readers agree
 * regardless of the platform default charset.
 *
 * <p>The class implements {@link AutoCloseable} so the Curator client and the scheduler
 * thread can be released. NOTE(review): Spring is expected to call {@code close()} on
 * {@code AutoCloseable} singleton beans when the context shuts down; confirm for the
 * Spring version in use.
 */
@Service
public class Client implements AutoCloseable {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    /** Curator client; kept as a field so that {@link #close()} can release it. */
    private final CuratorFramework client;

    /** Single-threaded scheduler that drives the periodic update. */
    private final ScheduledExecutorService exec;

    /**
     * Starts the Curator client, creates the node if it is missing and schedules the
     * periodic update.
     *
     * @throws Exception if any of the initial ZooKeeper operations fails
     */
    public Client() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();
        client.start();

        Stat stat1 = client.checkExists().creatingParentContainersIfNeeded().forPath(ZK_PATH);
        if (stat1 == null) {
            client.create().forPath(ZK_PATH, "sometdata".getBytes(StandardCharsets.UTF_8));
        }

        byte[] bytes = client.getData().forPath(ZK_PATH);
        out.println(new String(bytes, StandardCharsets.UTF_8));

        // Update value every half second
        final AtomicInteger i = new AtomicInteger(0);
        exec = Executors.newScheduledThreadPool(1);
        exec.scheduleAtFixedRate(() -> {
            // Single atomic increment instead of a separate get() followed by set().
            int value = i.incrementAndGet();
            System.out.println(value);
            try {
                client.setData().forPath(ZK_PATH, ("init_" + value).getBytes(StandardCharsets.UTF_8));
            } catch (Exception e) {
                // Must not propagate: scheduleAtFixedRate cancels all further runs
                // once a run throws.
                e.printStackTrace();
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    /**
     * Stops the periodic update and closes the Curator client.
     */
    @Override
    public void close() {
        exec.shutdownNow();
        client.close();
    }
}

监听者(观察节点变化的一方)

package com.training.bookstore.request;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.stereotype.Service;

/**
 * Observer side of the example: prints an event whenever the node {@link #ZK_PATH}
 * is created, changed or removed.
 *
 * <p>{@link PathChildrenCache} reports changes to the <em>children</em> of the path it is
 * given, not to that path itself. The cache is therefore rooted at the parent of
 * {@link #ZK_PATH}; rooting it at {@code ZK_PATH} would never report {@code setData}
 * calls made on that node. As a consequence, siblings of {@code ZK_PATH} under the same
 * parent produce events as well.
 *
 * <p>The class implements {@link AutoCloseable} so the cache and the Curator client can
 * be released. NOTE(review): Spring is expected to call {@code close()} on
 * {@code AutoCloseable} singleton beans when the context shuts down; confirm for the
 * Spring version in use.
 */
@Service
public class Watcher2 implements AutoCloseable {
    String connectionInfo = "127.0.0.1:2181";
    String ZK_PATH = "/someapp/somemodule/someroute";

    /** Curator client; kept as a field so that {@link #close()} can release it. */
    private final CuratorFramework client;

    /** Cache rooted at the parent of {@link #ZK_PATH}. */
    private final PathChildrenCache watcher;

    /**
     * Starts the Curator client and registers the listener.
     *
     * @throws Exception if the cache cannot be started
     */
    public Watcher2() throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();
        client.start();

        // Watch the parent so that changes to ZK_PATH itself arrive as child events.
        watcher = new PathChildrenCache(
                client, parentOf(ZK_PATH), true    // if cache data
        );

        watcher.getListenable().addListener((client1, event) -> {
            ChildData data = event.getData();
            if (data == null) {
                System.out.println("No data in event[" + event + "]");
            } else {
                // Decode explicitly as UTF-8 rather than with the platform default charset.
                System.out.println("Receive event: "
                        + "type=[" + event.getType() + "]"
                        + ", path=[" + data.getPath() + "]"
                        + ", data=[" + new String(data.getData(), java.nio.charset.StandardCharsets.UTF_8) + "]"
                        + ", stat=[" + data.getStat() + "]");
            }
        });
        watcher.start(PathChildrenCache.StartMode.NORMAL);
        System.out.println("Register zk watcher successfully!");
    }

    /**
     * Closes the cache and then the Curator client; the client is closed even if
     * closing the cache fails.
     *
     * @throws Exception if closing the cache fails
     */
    @Override
    public void close() throws Exception {
        try {
            watcher.close();
        } finally {
            client.close();
        }
    }

    /**
     * Returns the parent of an absolute ZooKeeper path, or {@code "/"} when the path is
     * a direct child of the root.
     */
    private static String parentOf(String path) {
        int lastSlash = path.lastIndexOf('/');
        return lastSlash <= 0 ? "/" : path.substring(0, lastSlash);
    }
}

谢谢

是的,类名 PathChildrenCache 本身就已经说明了问题:它监听的是指定路径的“子节点”。听起来有点奇怪:https://www.youtube.com/watch?v=nZcRU0Op5P4

如果我向 /path1/path2 发布数据,同时监听路径 /path1/path2,那么我监听的真的是 /path1/path2 这个节点吗?剧透:PathChildrenCache 把 path2 当作一个“文件夹”,监听的是它下面的子节点,而不是你以为自己创建的那个节点本身。

解决方法是:如果生产者向下面这个路径写入数据

    // Producer side: PATH is the node the producer writes to.
    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule/whatever";

那么在 Watcher 类中,应将路径设置为该节点的“父节点”:

    // Watcher side: PATH is the parent of the node the producer writes to,
    // because PathChildrenCache reports changes to the children of the path it is given.
    String connectionInfo = "127.0.0.1:2181";
    String PATH = "/someapp/somemodule";

如果您需要监听生产者路径下的各级子节点(子目录),请使用 TreeCache,而不是 PathChildrenCache。