Hazelcast EventQueue overloaded updating cache region default-update-timestamps-region
I recently added a new worker thread that drains a queue into various tables and then exits. While it runs, I see a lot of these worrying log messages:
2021-05-16 11:25:19.496 WARN 18 --- [Thread-1] com.hazelcast.spi.EventService : [127.0.0.1]:5701 [dev] [3.12.6] EventQueue overloaded! TopicEvent{name='default-update-timestamps-region', publishTime=1621164319495, publisherAddress=[172.18.0.4]:5701} failed to publish to hz:impl:topicService:default-update-timestamps-region
None of the table entities I'm reading from/writing to are cached, so I'm wondering why the cache is being flushed on this thread at all, let alone how it's blowing past this EventQueue's limit?
I haven't changed the configured defaults (using Hazelcast 3.12.6), so I'm confused as to how this cache could fill up so quickly?
See the rough pseudocode of my new service below:
private void processForever() {
    threadRef = Thread.currentThread();
    synchronized (syncObject) {
        // notify init that we're good to continue
        syncObject.notifyAll();
    }
    while (threadRef == Thread.currentThread()) {
        boolean foundWork = false;
        try {
            foundWork = process();
        } catch (Exception e) {
            log.debug("stack", e);
        }
        long sleep = foundWork ? 1000 : 60000;
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            // woken early by a producer; loop round and drain the queue
        }
    }
}
private boolean process() {
    try {
        // N.B. this attempts to grab a shared lock on the current tenant and skips if already taken
        return dataGrid.runExclusiveForCurrentTenantOrSkip(LockName.PROCESS, this::processInternal).orElse(true);
    } catch (Exception ex) {
        log.error("error", ex);
        return true;
    }
}
private boolean processInternal() {
    Long maxSid = sourceQueueRepo.findMaxSid();
    if (maxSid == null) {
        return false;
    }
    Set<Worker> agents = workerRepo.findAllWorkers();
    queueWork(maxSid, agents);
    return true;
}
public void queueWork(Long maxId, Set<Worker> workers) {
    sourceQueueRepo.dedupeByMaxSid(maxId);
    List<SourceQueue> batch = sourceQueueRepo.findAllBySidLessThanEqual(maxId);
    Map<Type, List<SourceQueue>> batched = // Redacted
    for (Worker worker : workers) {
        // Method 'batchInsert' calls a save query (transactional)
        batchInsert(worker, batched.getOrDefault(Type.TYPE_1, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_2, new ArrayList<>()));
        batchInsert(worker, batched.getOrDefault(Type.TYPE_3, new ArrayList<>()));
    }
    sourceQueueRepo.deleteByMaxId(maxId);
}
N.B.
- Each query is transactional, with the aim of keeping database transactions short, since other threads contend for the target tables.
- The code that inserts into this queue calls interrupt on this new thread to ensure it drains the queue promptly. Multiple threads call it, so under heavy load there is very little downtime.
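The interrupt-driven wake-up in the second note can be sketched as follows (a minimal, self-contained illustration with invented names — `WakeupSketch`, `TOTAL` — not the actual service code): producers interrupt the worker's long idle sleep so it drains the queue immediately instead of waiting out the full interval.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

public class WakeupSketch {
    static final int TOTAL = 3;
    static final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    static volatile int processed = 0; // written only by the worker thread

    public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
            while (true) {
                Integer item = queue.poll();
                if (item != null) {
                    processed++;          // stand-in for batching the item out
                    continue;
                }
                if (processed >= TOTAL) {
                    return;               // demo only: exit once all work is drained
                }
                try {
                    Thread.sleep(60_000); // long idle sleep, as in processForever()
                } catch (InterruptedException e) {
                    // a producer woke us: loop round and drain the queue
                }
            }
        });
        worker.start();
        for (int i = 0; i < TOTAL; i++) {
            queue.add(i);
            worker.interrupt();           // wake the worker immediately
        }
        worker.join();
        System.out.println("processed=" + processed);
    }
}
```

Because `interrupt()` is called after each insert, the worker never waits the full sleep interval while work is pending, which matches the "very little downtime under heavy load" behaviour above.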
I pinpointed the problem to the use of HazelcastLocalCacheRegionFactory: my new thread was under significant load, and the cache region flooded the EventQueue while propagating events.
I chose not to switch to HazelcastCacheRegionFactory, since for my scenario it had an unacceptable impact on performance.
Instead, for my @Modifying queries I opted to use a NativeQuery object and invalidate the cache entries by specifying the affected entity via addSynchronizedEntityClass.
For example:
return em.createNativeQuery("DELETE FROM dbo.SourceQueue WHERE id <= :id")
        .unwrap(NativeQuery.class)
        .setParameter("id", id)
        .addSynchronizedEntityClass(SourceQueue.class)
        .executeUpdate();
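For reference, the region factory in question is selected by the Hibernate second-level-cache configuration; a typical setup (illustrative property keys for Hibernate with the hazelcast-hibernate integration, not my exact config) looks like:

```properties
hibernate.cache.use_second_level_cache=true
hibernate.cache.use_query_cache=true
hibernate.cache.region.factory_class=com.hazelcast.hibernate.HazelcastLocalCacheRegionFactory
```

The local variant keeps cached data near each member and propagates invalidations over a Hazelcast topic (which is what overloaded the default-update-timestamps-region topic here); HazelcastCacheRegionFactory instead stores regions in a distributed IMap, which avoids the invalidation topic but, in my case, hurt read performance too much.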