使用 curator treeCache 时,如何确保缓存准备就绪?

When using curator treeCache, how can I ensure cache is ready?

使用 curator treeCache 时,如何确保缓存准备就绪?

cache.start()之后,如果我立即调用getCurrentData,它会return null,那么我如何确保缓存准备好?有人可以给我一个例子?谢谢

client = CuratorFrameworkFactory.builder()
             .connectString(connectionString)
             .retryPolicy(new ExponentialBackoffRetry(zkConnectionTimeoutMs, 3))
             .sessionTimeoutMs(zkSessionTimeoutMs)
             .build();
client.start();

cache = new TreeCache(client, rootPath);
cache.start();
ChildData child = cache.getCurrentData(rootPath); // child is null
Thread.sleep(50);   // must sleep for a while
child = cache.getCurrentData(rootPath); // child is ok

来自getCurrentChildren的代码

 public Map<String, ChildData> getCurrentChildren(String fullPath)
{
    TreeNode node = find(fullPath);
    if ( node == null || node.nodeState.get() != NodeState.LIVE )
    {
        return null;
    }
    ConcurrentMap<String, TreeNode> map = node.children.get();
    Map<String, ChildData> result;
    if ( map == null )
    {
        result = ImmutableMap.of();
    }
    else
    {
        ImmutableMap.Builder<String, ChildData> builder = ImmutableMap.builder();
        for ( Map.Entry<String, TreeNode> entry : map.entrySet() )
        {
            TreeNode childNode = entry.getValue();
            ChildData childData = new ChildData(childNode.path, childNode.stat.get(), childNode.data.get());
            // Double-check liveness after retreiving data.
            if ( childNode.nodeState.get() == NodeState.LIVE )
            {
                builder.put(entry.getKey(), childData);
            }
        }
        result = builder.build();
    }

    // Double-check liveness after retreiving children.
    return node.nodeState.get() == NodeState.LIVE ? result : null;
}

您可以看到,当 NodeState 为 PENDING 或 DEAD 或不存在时,它将 return null,当 NodeState 为 LIVE 时,它将 return 一个 Map 实例。因此,当 return 值不为空时,缓存已准备就绪。

您可以为 Treecache 添加一个侦听器并侦听 INITIALIZED 事件。

    Semaphore sem = new Semaphore(0);
    client = CuratorFrameworkFactory.builder()
                                 .connectString(connectionString)
                                 .retryPolicy(new ExponentialBackoffRetry(zkConnectionTimeoutMs, 3))
                                 .sessionTimeoutMs(zkSessionTimeoutMs)
                                 .build();
        client.start();

        cache = new TreeCache(client, rootPath);
                    cache.start();

        TreeCacheListener listener = new TreeCacheListener() {

                                    @Override
                                    public void childEvent(CuratorFramework client, TreeCacheEvent event)
                                            throws Exception {
                                        switch (event.getType()) {
                                        case INITIALIZED: {

                                          sem.release();

                                        }

                                    }

                                };
        cache.getListenable().addListener(listener);
       sem.acquire();
child = cache.getCurrentData(rootPath);