Java Spring / Reactor 在底层数据库改变时发送/更新通量

Java Spring / Reactor send / update flux when underlying database changed

我有一个候选人“喜欢”的数据,每次更改“喜欢”数字时我都想发送给客户。我认为这可以使用 Spring Flux 实现吗?但我找不到任何例子。大多数通量示例基于特定时间间隔(例如每秒)。这可能是一种浪费,因为交易量不大,候选人可能在很多分钟内都得不到点赞。 我只想创建订阅“喜欢”更改的仪表板,并在某些候选人“喜欢”数字更改时得到更新。

通过什么途径获得?

这就是我所做的,并且有效,但它基于间隔(5 秒),而不是基于数据变化。

    public Flux<Candidate> subscribeItemChange(String id) {
        return Flux.interval(Duration.ofSeconds(5)).map(t -> candidateService.getCandidateDetail(id));
    }

candidateService.getCandidateDetail 基本上是查询数据库中的某个 id,所以这更像是轮询而不是“更改时更新”。 我想我必须在下面的 candidateService.updateLikes() 上放点东西,但是我应该更新什么?

public class CandidateService {

    public Candidate getCandidateDetail(String id) {
        // query candidate from database
        // select * from candidates where id = :id
        // and return it
    }
    
    
    public void updateLikes(String id, int likesCount) {
        // update candidates set likes_count = :likesCount where id = :id
        // ...
        // I think I need to write something here, but what?
    }
    
}

您可以使用动态接收器,添加类似于以下内容的字段:

private Sinks.Many<Candidate> likesSink = Sinks.many().multicast().onBackpressureBuffer();

...那么您可以:

  • 在您的 updateLikes() 方法中使用 sink.tryEmitNext 以在候选人的点赞更新时发布到接收器;
  • 实施您的 subscribeItemChange() 方法,该方法使用 likesSink.asFlux(),然后可以根据需要对其进行过滤,以仅 return 特定候选人的“喜欢更新”流。

基于@Michael Berry 指南。

    public void updateLikes(String id, int likesCount) {
        Candidate c = getCandidateDetail(id);
        c.setLikesCount(likesCount);

        CandidateDummyDatasource.likesSink.tryEmitNext(c);
    }

订阅者

    public Flux<Candidate> subscribeItemChange(String id) {
        return CandidateDummyDatasource.likesSink.asFlux()
                .filter(c -> c.getId().equals(id))
                .map(data -> candidateService.getCandidateDetail(id));
    }