Hazelcast EventQueue overloaded updating cache region default-update-timestamps-region

I recently added a new worker thread which drains a queue into a number of different tables and then exits. While it was running I saw a lot of concerning log messages:

2021-05-16 11:25:19.496  WARN 18 --- [Thread-1] com.hazelcast.spi.EventService           : [127.0.0.1]:5701 [dev] [3.12.6] EventQueue overloaded! TopicEvent{name='default-update-timestamps-region', publishTime=1621164319495, publisherAddress=[172.18.0.4]:5701} failed to publish to hz:impl:topicService:default-update-timestamps-region

None of the table entities I'm reading from / writing to are cached, so I'd like to understand why the cache is being flushed on this thread at all, let alone how it's blowing through this EventQueue's limit?

I haven't changed any of the configured defaults (running Hazelcast 3.12.6), so I'm confused as to how this cache could be filling up so quickly?

See rough pseudocode of my new service below:

private void processForever() {
    threadRef = Thread.currentThread();
    synchronized (syncObject) {
        //notify init that we're good to continue
        syncObject.notifyAll();
    }
    while (threadRef == Thread.currentThread()) {
        boolean foundWork = false;
        try {
            foundWork = process();
        } catch (Exception e) {
            log.debug("stack", e);
        }

        long sleep = foundWork ? 1000 : 60000;
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            // restore the interrupt flag so the loop condition can observe it
            Thread.currentThread().interrupt();
        }
    }
}
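One detail worth getting right in a stop-by-interrupt loop like the one above is preserving the thread's interrupt status when `Thread.sleep` is cut short. A minimal, self-contained sketch of that pattern (class and method names are mine, not from the service):

```java
public class SleepLoop {
    // Sleep helper that preserves the interrupt status instead of
    // swallowing InterruptedException; returns false if interrupted.
    static boolean sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag for the caller's loop test
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            // loop exits promptly once the thread is interrupted
            while (!Thread.currentThread().isInterrupted()) {
                sleepQuietly(60_000);
            }
        });
        worker.start();
        worker.interrupt();
        worker.join(2_000);
        System.out.println(worker.isAlive() ? "worker stuck" : "worker exited");
    }
}
```

Without restoring the flag, an interrupted sleep is silently eaten and the loop can only exit once `threadRef` is reassigned.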

private boolean process() {
    try {
        // N.B. this attempts to grab a shared lock on the current tenant and skips if already taken
        return dataGrid.runExclusiveForCurrentTenantOrSkip(LockName.PROCESS, this::processInternal).orElse(true);
    } catch (Exception ex) {
        log.error("error", ex);
        return true;
    }
}
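`runExclusiveForCurrentTenantOrSkip` isn't shown; the real implementation locks across the data grid, but for readers, here is a minimal single-JVM sketch of the "run under a lock or skip" pattern it follows, using `ReentrantLock.tryLock` (all names here are hypothetical):

```java
import java.util.Optional;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class ExclusiveRunner {
    private final ReentrantLock lock = new ReentrantLock();

    // Runs the task if the lock is free; otherwise skips and returns empty,
    // so the caller can treat "someone else is already processing" as a no-op.
    public <T> Optional<T> runExclusiveOrSkip(Supplier<T> task) {
        if (!lock.tryLock()) {
            return Optional.empty(); // another caller holds the lock; skip
        }
        try {
            return Optional.of(task.get());
        } finally {
            lock.unlock();
        }
    }
}
```

The `orElse(true)` in `process()` then maps a skipped run onto "found work", so the loop backs off with the short sleep rather than hammering the lock.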

private boolean processInternal() {
    Long maxSid = sourceQueueRepo.findMaxSid();
    if (maxSid == null) {
        return false;
    }

    Set<Worker> agents = workerRepo.findAllWorkers();
    queueWork(maxSid, agents);

    return true;
}

public void queueWork(Long maxId, Set<Worker> workers) {

    sourceQueueRepo.dedupeByMaxSid(maxId);

    List<SourceQueue> batch = sourceQueueRepo.findAllBySidLessThanEqual(maxId);
    Map<Type, List<SourceQueue>> batched = // Redacted

    for (Worker worker : workers) {
        // Method 'batchInsert' calls a save query (transactional)
        batchInsert(worker, batched.getOrDefault(Type.TYPE_1, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_2, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_3, new ArrayList<>()));
    }

    sourceQueueRepo.deleteByMaxId(maxId);
}

N.B.

I tracked the problem down to the use of HazelcastLocalCacheRegionFactory; my new thread was putting it under significant pressure, and the cache region flooded the EventQueue as it propagated invalidation events.

I chose not to switch over to HazelcastCacheRegionFactory, as that had an unacceptable impact on performance for my scenario.
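For reference, the region factory being switched is selected via a Hibernate property. A minimal Spring Boot `application.properties` sketch showing the two choices (assuming the hazelcast-hibernate integration is on the classpath; the trade-off comments reflect my understanding of the two factories):

```properties
# Local near-cache regions (what I was using): fast reads, but updates
# publish invalidation events on a Hazelcast topic per region.
spring.jpa.properties.hibernate.cache.region.factory_class=com.hazelcast.hibernate.HazelcastLocalCacheRegionFactory

# Distributed IMap-backed regions: no invalidation topics, but every cache
# access is a remote operation, which was too slow for my scenario.
#spring.jpa.properties.hibernate.cache.region.factory_class=com.hazelcast.hibernate.HazelcastCacheRegionFactory
```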

Instead, for @Modifying queries I opted to use NativeQuery objects and invalidate the cached entries by specifying the affected entity via addSynchronizedEntityClass.

For example:


return em.createNativeQuery("DELETE FROM dbo.SourceQueue WHERE id <= :id")
    .unwrap(NativeQuery.class)
    .setParameter("id", id)
    .addSynchronizedEntityClass(SourceQueue.class)
    .executeUpdate();
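Unrelated to the fix above, but for anyone who would rather raise the limit than avoid the events: the queue bound behind the WARN is configurable via Hazelcast properties. A sketch for `hazelcast.xml`, assuming Hazelcast 3.x defaults (capacity 1,000,000, publish timeout 250 ms) and treating this as a stop-gap rather than a root-cause fix:

```xml
<hazelcast xmlns="http://www.hazelcast.com/schema/config">
    <properties>
        <!-- maximum number of pending events before publishers start waiting -->
        <property name="hazelcast.event.queue.capacity">2000000</property>
        <!-- how long (ms) a publisher waits for queue space before the WARN above -->
        <property name="hazelcast.event.queue.timeout.millis">500</property>
    </properties>
</hazelcast>
```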