当集群中的应用程序 运行 相同的 IScheduledExecutorService 时,如何防止重复任务?

How to prevent duplicate tasks when run same IScheduledExecutorService on apps in cluster?

我想了解 IScheduledExecutorService 的 hazelcast 方法之间的区别,以防止重复任务。 我有两个带有 HazelcastInstance 的 java 应用程序。我分别拥有带有两个 HazelcastInstances(服务器)的 hazelcast 集群。 我使用 IMap 并想在每个午夜重置 AtomicLong。

    config.getScheduledExecutorConfig("my scheduler")
            .setPoolSize(16)
            .setCapacity(100)
            .setDurability(1);


class DelayedResetTask implements Runnable, HazelcastInstanceAware, Serializable {

    static final long serialVersionUID = -7588380448693010399L;

    private transient HazelcastInstance client;

    @Override
    public void run() {
        final IMap<Long, AtomicLong> map = client.getMap(HazelcastConfiguration.mapName);

        final ILogger logger = client.getLoggingService().getLogger(HazelcastInstance.class);
        logger.info("Show data in cache before reset: " + map.entrySet());
        map.keySet().forEach(key -> map.put(key, new AtomicLong(0)));
        logger.info("Data was reseted: " + map.entrySet());
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.client = hazelcastInstance; }
}

private void resetAtMidnight() {
    final Long midnight = LocalDateTime.now().until(LocalDate.now().plusDays(1).atStartOfDay(), ChronoUnit.MINUTES);

    executor.scheduleAtFixedRate(new DelayedResetTask(), midnight, TimeUnit.DAYS.toMinutes(1), TimeUnit.MINUTES);
}

我不想在每个实例上并行执行此任务。阅读文档后 documentation 我不明白如何在两台服务器上一步执行重置(没有重复任务,没有同时在两台服务器上执行)。
我可以使用什么方法来完成我的任务 scheduleOnAllMembersAtFixedRatescheduleAtFixedRatescheduleOnMembersAtFixedRate。 运行 集群中的应用程序上有相同的 IScheduledExecutorService 时如何防止重复任务?

您只需在集群中 运行 您的代码一次,因为您正在重置的地图可以从任何成员访问。两个成员都访问同一个地图实例,只是条目保存在不同的成员中。您可以使用 scheduleAtFixedRate 到 运行 一次。

另外,您不需要调用IMap#keySet().forEach()来遍历地图中的所有条目。相反,您可以使用 EntryProcessor,如下所示:

    public static class DelayedResetTask implements Runnable, HazelcastInstanceAware, Serializable {

    static final long serialVersionUID = -7588380448693010399L;

    private transient HazelcastInstance client;

    @Override
    public void run() {
        final IMap<Long, AtomicLong> map = client.getMap(HazelcastConfiguration.mapName);

        final ILogger logger = client.getLoggingService().getLogger(HazelcastInstance.class);
        logger.info("Show data in cache before reset: " + map.entrySet());
        map.executeOnEntries(new AbstractEntryProcessor() {
            @Override
            public Object process(Map.Entry entry) {
                entry.setValue(new AtomicLong(0));
                return null;
            }
        });
        logger.info("Data was reseted: " + map.entrySet());
    }

    @Override
    public void setHazelcastInstance(HazelcastInstance hazelcastInstance) { this.client = hazelcastInstance; }