Listening for repartition event?

I'm experimenting with Hazelcast's distributed map to essentially spread elements across multiple instances of the same application. The idea is that one application starts first and thus populates the map. Naturally, its local key set is then the entire map.

When another instance joins the cluster, the map gets repartitioned so that both instances hold roughly half of the map's entries (I assume) as their local key set.

Config hzConfig = new Config("hz");
HazelcastInstance hzInstance = HazelcastInstanceFactory.newHazelcastInstance(hzConfig);
IMap<Long, Long> test = hzInstance.getMap("test");
LocalListener listener = new LocalListener();
test.addLocalEntryListener(listener);
test.addPartitionLostListener(listener);

I assumed repartitioning would trigger the local entry listener, but apparently it doesn't trigger any listener at all.

The concrete use case is that the first application populates the map, and as other applications join, the entries get spread across them, and each application must do something for every entry in its local key set. Rather than periodically checking the local key set, I'd prefer to react to events such as added/removed on the local key set.

However, repartitioning happens when an instance joins or leaves the cluster (always, I hope), yet I can't seem to listen for it, which defeats the purpose.

With the configuration above and the listener below, I have a test application that puts a random long into the map every 10 seconds.

private static class LocalListener implements EntryAddedListener<Long, Long>, EntryRemovedListener<Long, Long>,
        EntryUpdatedListener<Long, Long>, EntryEvictedListener<Long, Long>, MapClearedListener, MapPartitionLostListener {
    @Override
    public void entryAdded(EntryEvent<Long, Long> event) {
        LOG.info("An entry was added to the local set: {}", event.getValue());
    }
    @Override
    public void entryRemoved(EntryEvent<Long, Long> event) {
        LOG.info("An entry was removed from the local set: {}", event.getValue());
    }
    @Override
    public void entryEvicted(EntryEvent<Long, Long> event) {
        LOG.info("An entry was evicted from the local set: {}", event.getValue());
    }
    @Override
    public void entryUpdated(EntryEvent<Long, Long> event) {
        LOG.info("An entry was updated in the local set: {}", event.getValue());
    }
    @Override
    public void mapCleared(MapEvent event) {
        LOG.info("The map was cleared: {}", event);
    }
    @Override
    public void partitionLost(MapPartitionLostEvent event) {
        LOG.info("A partition was lost: {}", event);
    }
}
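The put loop itself isn't shown above; a minimal sketch of what could produce the "Adding new entry" log lines might look like this (the scheduler setup is my assumption, not part of the original post):

```java
// Assumed test driver: every 10 seconds, put a random long into the map.
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(() -> {
    long value = ThreadLocalRandom.current().nextLong();
    LOG.info("Adding new entry {}", value);
    test.put(value, value); // key == value, matching the IMap<Long, Long> type
}, 0, 10, TimeUnit.SECONDS);
```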

Output of the first test instance:

15:43:47.718 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -1012665372499231549
15:43:47.858 [hz.hz.event-4] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -1012665372499231549
15:43:57.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -5501878816285329759
15:43:57.717 [hz.hz.event-1] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -5501878816285329759

Then I start a second instance, which joins the cluster.

Instance 1 output:

INFO: [172.20.20.7]:5701 [dev] [3.9.3] Re-partitioning cluster data... Migration queue size: 271
15:44:12.137 [hz.hz.event-4] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -642323604672752630
jan 10, 2019 3:44:12 PM com.hazelcast.internal.partition.impl.MigrationThread
INFO: [172.20.20.7]:5701 [dev] [3.9.3] All migration tasks have been completed, queues are empty.
15:44:17.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -2929992218325845758
15:44:17.718 [hz.hz.event-2] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -2929992218325845758
15:44:27.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -7717112084150209257
15:44:27.717 [hz.hz.event-3] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -7717112084150209257
15:44:37.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -3756253634059275245
15:44:37.717 [hz.hz.event-3] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -3756253634059275245
15:44:47.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry 9175632974694161488

Instance 2 output:

15:44:12.131 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -642323604672752630
15:44:22.130 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -785281121378041075
15:44:22.136 [hz.hz.event-1] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -785281121378041075
15:44:32.130 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry 3465608643988715362
15:44:32.132 [hz.hz.event-1] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: 3465608643988715362
15:44:42.131 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry 1474484225334222922
15:44:42.133 [hz.hz.event-1] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: 1474484225334222922
15:44:47.719 [hz.hz.event-1] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: 9175632974694161488
15:44:52.130 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -4535267276695561636
15:44:52.131 [hz.hz.event-2] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -4535267276695561636

Then I shut down the second instance to trigger another repartition.

Instance 1 output:

INFO: [172.20.20.7]:5701 [dev] [3.9.3] Partition balance is ok, no need to re-partition cluster data... 
jan 10, 2019 3:45:03 PM com.hazelcast.spi.impl.operationservice.impl.InvocationMonitor
INFO: [172.20.20.7]:5701 [dev] [3.9.3] Invocations:1 timeouts:0 backup-timeouts:1
15:45:07.716 [pool-1-thread-1] INFO com.example.playground.DistributedMapTests - Adding new entry -4645280647407966219
15:45:07.716 [hz.hz.event-5] INFO com.example.playground.DistributedMapTests - An entry was added to the local set: -4645280647407966219

As expected, when the application is alone it owns all the entries, and when another instance joins, repartitioning occurs. However, the second instance doesn't know it now has more elements in its local key set until something else happens.

Also, when the second instance leaves, for some reason no repartitioning occurs, so I don't know what happened to the entries that were in its local key set.

So, TL;DR: I'd like to know how to listen for repartition events. Or perhaps there is an alternative in Hazelcast for this kind of thing?

Update:

From further testing, even though the log says partition balance is OK, the entries of the instance that left the cluster do come back to the remaining instance. I assume that message means there is no need to redistribute them across different members, since only one is left.

Listening for repartition events was the wrong idea. Repartitioning happens when the cluster membership changes, so listening for membership changes and entry changes covers all the needed events.
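For the membership-change side, the cluster API exposes a MembershipListener; a sketch against the 3.x API:

```java
// Register a cluster membership listener (Hazelcast 3.x API).
hzInstance.getCluster().addMembershipListener(new MembershipListener() {
    @Override
    public void memberAdded(MembershipEvent event) {
        LOG.info("Member joined: {}", event.getMember());
        // a repartition will follow; re-scan the local key set here if needed
    }
    @Override
    public void memberRemoved(MembershipEvent event) {
        LOG.info("Member left: {}", event.getMember());
    }
    @Override
    public void memberAttributeChanged(MemberAttributeEvent event) {
        // attribute changes don't affect partitioning
    }
});
```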

Local entry listeners are only triggered by user operations. Repartitioning happens whenever a member is added to or removed from the cluster. If your cluster size is still greater than 1, migrations will take place, and you can use migration listeners.
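In Hazelcast 3.x, a migration listener is registered on the PartitionService; roughly:

```java
// Register a migration listener to observe partitions moving between members.
hzInstance.getPartitionService().addMigrationListener(new MigrationListener() {
    @Override
    public void migrationStarted(MigrationEvent event) {
        LOG.info("Migration of partition {} started: {} -> {}",
                event.getPartitionId(), event.getOldOwner(), event.getNewOwner());
    }
    @Override
    public void migrationCompleted(MigrationEvent event) {
        LOG.info("Migration of partition {} completed", event.getPartitionId());
    }
    @Override
    public void migrationFailed(MigrationEvent event) {
        LOG.warn("Migration of partition {} failed", event.getPartitionId());
    }
});
```

Note that these events are per partition, not per entry, so after a migration you would still need to inspect the local key set yourself.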

OTOH, if a member leaves the cluster ungracefully, the remaining members will recover its data from their own backups. That is probably what you are seeing.

However, I don't understand why you need to listen for migration events and act on them. Is this related to your business logic? Otherwise, you shouldn't need to care where the keys reside; just use the key to send any business logic to your cluster. If I'm missing something, please elaborate.
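Sending work to wherever a key lives can be done with a distributed executor; a sketch (the executor name and task are hypothetical, and the task must be Serializable):

```java
IExecutorService executor = hzInstance.getExecutorService("exec");
// Runs the task on whichever member currently owns someKey,
// so the caller never needs to know where the key resides.
executor.executeOnKeyOwner((Runnable & Serializable) () -> {
    // business logic for this key runs on the owning member
}, someKey);
```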