Infinispan CacheEntryCreated 没有在 primaryOnly=true 的 Rebalance 上触发?

Infinispan CacheEntryCreated not firing on Rebalance for primaryOnly=true?

我们最近开始在嵌入式模式下使用 Infinispan 运行ning,我们目前使用的是版本 13.0.0。我们想要的是一种集群存储,它可以保证在任何给定时间只有一个进程在使用给定的缓存键。即使新节点加入我们的集群或旧节点离开,我们也希望这种保证成立。

我们认为可以做到这一点的方法之一是使用 @Listener(primaryOnly = true, observation = Listener.Observation.POST) 设置缓存侦听器,以激活我们希望成为 运行ning 的代码。这种技术适用于在我们的缓存中首次创建密钥。问题是,一旦存储密钥作为其主要位置的节点离开集群,新的主要所有者节点似乎没有触发任何事件,通知它现在已经从另一个节点“接管”了这个密钥。

我的问题是,是否有任何方法可以知道某个节点已成为密钥的主要所有者,因为之前的主要所有者节点已离开集群?

为了说明问题,我完成了以下项目:https://github.com/radiosphere/infinispan-test。您可以克隆它并签出标签 Whosebug-1 。完成此操作后,请执行以下操作:

  1. 打开三个终端windows和运行./start-server.sh 8080,./start-server.sh 8081./start-server.sh 8082.
  2. 运行 ./set-key.sh 8080 a 1
  3. 在其中一个终端 windows 中,您现在会看到 Entry created
  4. 终止您获得 Entry created 日志的进程。
  5. 现在我希望在另一个 window 中看到一个事件,但我没有看到任何新事件。

CacheEntryCreatedEvent仅在将键插入缓存时触发。将键从一个节点移动到另一个节点不会触发任何事件。

在任何情况下,节点首先作为备份所有者接收条目,然后才成为密钥的主要所有者,因此节点成为密钥的主要所有者在语义上与“创建”非常不同。

如果你听 TopologyChangeEvents 并遍历本地节点以前不是主要所有者但现在是主要所有者的所有条目,你可以得到你想要的:

@Listener(sync = false)
public class TopologyUpdateListener {
    @TopologyChanged
    public void onTopologyChange(TopologyChangedEvent<?, ?> e) throws InterruptedException {
       if (!e.isPre()) {
          Cache<?, ?> cache = e.getCache();
          Address localAddress = cache.getAdvancedCache().getRpcManager().getAddress();
          Set<Integer> oldPrimarySegments =
                e.getReadConsistentHashAtStart().getPrimarySegmentsForOwner(localAddress);
          Set<Integer> newPrimarySegments =
                e.getReadConsistentHashAtEnd().getPrimarySegmentsForOwner(localAddress);
          IntSet diff = IntSets.mutableCopyFrom(newPrimarySegments);
          diff.removeAll(oldPrimarySegments);
          cache.getAdvancedCache().withFlags(Flag.CACHE_MODE_LOCAL)
               .entrySet().stream()
               .filterKeySegments(diff)
               .forEach(entry -> { ... });
       }
    }
}

在内部 Infinispan 线程上同步侦听器 运行,因此它们永远不会阻塞。 stream() 方法确实会阻塞,因此为简单起见,我将侦听器设置为异步,但使用您自己的执行程序来更好地控制几乎总是一个好主意。

您也可以将 stream().filterSegments(diff) 替换为 localPublisher(diff) 并在不阻塞的情况下处理条目,但 CacheCollection.localPublisher() 方法是实验性的,我们计划在 14.0 中将其替换为其他方法。