Java Spring / Reactor 在底层数据库改变时发送/更新通量
Java Spring / Reactor send / update flux when underlying database changed
我有一个候选人“喜欢”的数据,每次更改“喜欢”数字时我都想发送给客户。我认为这可以使用 Spring Flux 实现吗?但我找不到任何例子。大多数通量示例基于特定时间间隔(例如每秒)。这可能是一种浪费,因为交易量不大,候选人可能在很多分钟内都得不到点赞。
我只想创建订阅“喜欢”更改的仪表板,并在某些候选人“喜欢”数字更改时得到更新。
通过什么途径获得?
这就是我所做的,并且有效,但它基于间隔(5 秒),而不是基于数据变化。
public Flux<Candidate> subscribeItemChange(String id) {
return Flux.interval(Duration.ofSeconds(5)).map(t -> candidateService.getCandidateDetail(id));
}
candidateService.getCandidateDetail
基本上是查询数据库中的某个 id,所以这更像是轮询而不是“更改时更新”。
我想我必须在下面的 candidateService.updateLikes()
上放点东西,但是我应该更新什么?
public class CandidateService {
public Candidate getCandidateDetail(String id) {
// query candidate from database
// select * from candidates where id = :id
// and return it
}
public void updateLikes(String id, int likesCount) {
// update candidates set likes_count = :likesCount where id = :id
// ...
// I think I need to write something here, but what?
}
}
您可以使用动态接收器,添加类似于以下内容的字段:
private Sinks.Many<Candidate> likesSink = Sinks.many().multicast().onBackpressureBuffer();
...那么您可以:
- 在您的
updateLikes()
方法中使用 sink.tryEmitNext
以在候选人的点赞更新时发布到接收器;
- 实施您的
subscribeItemChange()
方法,该方法使用 likesSink.asFlux()
,然后可以根据需要对其进行过滤,以仅 return 特定候选人的“喜欢更新”流。
基于@Michael Berry 指南。
public void updateLikes(String id, int likesCount) {
Candidate c = getCandidateDetail(id);
c.setLikesCount(likesCount);
CandidateDummyDatasource.likesSink.tryEmitNext(c);
}
订阅者
public Flux<Candidate> subscribeItemChange(String id) {
return CandidateDummyDatasource.likesSink.asFlux()
.filter(c -> c.getId().equals(id))
.map(data -> candidateService.getCandidateDetail(id));
}
我有一个候选人“喜欢”的数据,每次更改“喜欢”数字时我都想发送给客户。我认为这可以使用 Spring Flux 实现吗?但我找不到任何例子。大多数通量示例基于特定时间间隔(例如每秒)。这可能是一种浪费,因为交易量不大,候选人可能在很多分钟内都得不到点赞。 我只想创建订阅“喜欢”更改的仪表板,并在某些候选人“喜欢”数字更改时得到更新。
通过什么途径获得?
这就是我所做的,并且有效,但它基于间隔(5 秒),而不是基于数据变化。
public Flux<Candidate> subscribeItemChange(String id) {
return Flux.interval(Duration.ofSeconds(5)).map(t -> candidateService.getCandidateDetail(id));
}
candidateService.getCandidateDetail
基本上是查询数据库中的某个 id,所以这更像是轮询而不是“更改时更新”。
我想我必须在下面的 candidateService.updateLikes()
上放点东西,但是我应该更新什么?
public class CandidateService {
public Candidate getCandidateDetail(String id) {
// query candidate from database
// select * from candidates where id = :id
// and return it
}
public void updateLikes(String id, int likesCount) {
// update candidates set likes_count = :likesCount where id = :id
// ...
// I think I need to write something here, but what?
}
}
您可以使用动态接收器,添加类似于以下内容的字段:
private Sinks.Many<Candidate> likesSink = Sinks.many().multicast().onBackpressureBuffer();
...那么您可以:
- 在您的
updateLikes()
方法中使用sink.tryEmitNext
以在候选人的点赞更新时发布到接收器; - 实施您的
subscribeItemChange()
方法,该方法使用likesSink.asFlux()
,然后可以根据需要对其进行过滤,以仅 return 特定候选人的“喜欢更新”流。
基于@Michael Berry 指南。
public void updateLikes(String id, int likesCount) {
Candidate c = getCandidateDetail(id);
c.setLikesCount(likesCount);
CandidateDummyDatasource.likesSink.tryEmitNext(c);
}
订阅者
public Flux<Candidate> subscribeItemChange(String id) {
return CandidateDummyDatasource.likesSink.asFlux()
.filter(c -> c.getId().equals(id))
.map(data -> candidateService.getCandidateDetail(id));
}