AWS ElastiCache (Redis) 的 Spring 数据的原子增量
Atomic increment with Spring Data for AWS ElastiCache (Redis)
我们在 ELB(负载均衡器)后面部署了同一应用程序的多个实例。每当完成某项工作时,我们都会对一些元素进行计数,然后想要增加计数器的值。
我们使用 ElastiCache 将这些指标保存在内存中。我们已将其设置为 Redis 实例集群。
我无法理解如何与 ElastiCache 正确交互,以便计数器永远不会错过任何增量(即原子操作)。我知道 INCRBY
似乎是可行的方法,但我不确定如何设置 Spring 数据以便我可以向我的 Master
发出 Redis 命令。事实上,我们的方法甚至不是线程安全的,但这里是代码:
@Slf4j
@Service
@RequiredArgsConstructor
public class MetricServiceImpl implements MetricService {
private final IntegerMetricRepository integerMetricRepository;
private static final BigInteger ZERO = BigInteger.ZERO;
@Override
public long countRealJobs(List<Job> newJobs) {
return newJobs.stream()
.filter(job -> !job.isFake())
.count();
}
@Override
public long countRealDrafts(List<Draft> drafts) {
return drafts.stream()
.filter(draft -> !draft.getString(JsonFields.TITLE.getValue())
.contains("FAKE"))
.count();
}
@Override
public IntegerMetric increment(IntegerMetricType integerMetricType, long amount) {
IntegerMetric metric = getOrInitialize(integerMetricType);
BigInteger newValue = metric.getValue().add(BigInteger.valueOf(amount));
metric.setValue(newValue.max(ZERO)); // smallest possible value is 0
return integerMetricRepository.save(metric);
}
@Override
public BigInteger getValue(IntegerMetricType integerMetricType) {
return getOrInitialize(integerMetricType).getValue();
}
@Override
public IntegerMetric setValue(IntegerMetricType integerMetricType, long amount) {
IntegerMetric metric = getOrInitialize(integerMetricType);
if (amount < 0) { // negatives not allowed
log.info("Tried to set a negative value for an IntegerMetric.");
return metric;
}
metric.setValue(BigInteger.valueOf(amount));
return integerMetricRepository.save(metric);
}
/**
* @param integerMetricType the desired Entity
* @return either the Entity which already existed, or a new one initialized to {@code ZERO}.
*/
private IntegerMetric getOrInitialize(IntegerMetricType integerMetricType) {
return integerMetricRepository.findById(integerMetricType).orElseGet(
() -> integerMetricRepository.save(new IntegerMetric(integerMetricType, ZERO)));
}
}
对于我的 Repository
,似乎我可以发出的唯一相关操作是 get
和 set
的等价物。我如何设置我的代码,以便我可以向我的集群发出实际的 Redis 命令,从而利用我想使用的原语(这里,INCRBY
)的原子性质?
解决方案在于使用 RedisTemplate
。有了那个 class,就可以使用 Redis 原生支持的 "AtomicCounter"(通过 INCRBY
等操作)。
我们在 ELB(负载均衡器)后面部署了同一应用程序的多个实例。每当完成某项工作时,我们都会对一些元素进行计数,然后想要增加计数器的值。
我们使用 ElastiCache 将这些指标保存在内存中。我们已将其设置为 Redis 实例集群。
我无法理解如何与 ElastiCache 正确交互,以便计数器永远不会错过任何增量(即原子操作)。我知道 INCRBY
似乎是可行的方法,但我不确定如何设置 Spring 数据以便我可以向我的 Master
发出 Redis 命令。事实上,我们的方法甚至不是线程安全的,但这里是代码:
@Slf4j
@Service
@RequiredArgsConstructor
public class MetricServiceImpl implements MetricService {
private final IntegerMetricRepository integerMetricRepository;
private static final BigInteger ZERO = BigInteger.ZERO;
@Override
public long countRealJobs(List<Job> newJobs) {
return newJobs.stream()
.filter(job -> !job.isFake())
.count();
}
@Override
public long countRealDrafts(List<Draft> drafts) {
return drafts.stream()
.filter(draft -> !draft.getString(JsonFields.TITLE.getValue())
.contains("FAKE"))
.count();
}
@Override
public IntegerMetric increment(IntegerMetricType integerMetricType, long amount) {
IntegerMetric metric = getOrInitialize(integerMetricType);
BigInteger newValue = metric.getValue().add(BigInteger.valueOf(amount));
metric.setValue(newValue.max(ZERO)); // smallest possible value is 0
return integerMetricRepository.save(metric);
}
@Override
public BigInteger getValue(IntegerMetricType integerMetricType) {
return getOrInitialize(integerMetricType).getValue();
}
@Override
public IntegerMetric setValue(IntegerMetricType integerMetricType, long amount) {
IntegerMetric metric = getOrInitialize(integerMetricType);
if (amount < 0) { // negatives not allowed
log.info("Tried to set a negative value for an IntegerMetric.");
return metric;
}
metric.setValue(BigInteger.valueOf(amount));
return integerMetricRepository.save(metric);
}
/**
* @param integerMetricType the desired Entity
* @return either the Entity which already existed, or a new one initialized to {@code ZERO}.
*/
private IntegerMetric getOrInitialize(IntegerMetricType integerMetricType) {
return integerMetricRepository.findById(integerMetricType).orElseGet(
() -> integerMetricRepository.save(new IntegerMetric(integerMetricType, ZERO)));
}
}
对于我的 Repository
,似乎我可以发出的唯一相关操作是 get
和 set
的等价物。我如何设置我的代码,以便我可以向我的集群发出实际的 Redis 命令,从而利用我想使用的原语(这里,INCRBY
)的原子性质?
解决方案在于使用 RedisTemplate
。有了那个 class,就可以使用 Redis 原生支持的 "AtomicCounter"(通过 INCRBY
等操作)。