有没有更好的方法来使用 RxJava 获取两个 Observable 的交集

Is there a better way to get the intersection of two Observables using RxJava

使用 RxJava,我有一个源 Observable,它发出许多我希望与另一个发出相同类型的 Observable 相交的项目。在考虑了多种选择之后,似乎最连贯的结构方式是这样的:

Observable<String> source = ...emits 20 items

Observable.create(subscriber -> {
    source
        .buffer(5)
        .subscribe(things -> {
            tocheck.getMatches(things) //emits 3 matches
                .subscribe(subscriber::onNext, subscriber::onError, () -> {});
        }, subscriber::onError, subscriber::onCompleted));

这里的预期输出是当我订阅生成的 Observable 时,我发出了 12 个项目。由于 getMatches 的约定,我需要缓冲结果。

从表面上看,这似乎可行,但它似乎不是最干净的方法。过滤器似乎不适用于此处,因为出于性能原因我无法 运行 对每个项目进行相交检查。我试过使用 flatMap,但是 getMatches 可观察对象完成了流,而不是来自源可观察对象的完成通知。

有没有更好的方法来构建这个?

编辑: 澄清这种代码风格发生了什么:

Observable<String> source = ...emits 20 items

source
    .buffer(5)
    .flatMap(this::getMatches);  //final observable would emit a total of 12 items

这显然更简洁,但是当我添加一些日志记录时(假设数据大小与原始代码段相同:

source
    .doOnEach(notification -> {
        log.trace("Processing {}", notification.getValue());
    })
    .buffer(5)
    .flatMap(this::getMatches)
    .doOnEach(notification -> {
        log.trace("Processing after match {}", notification.getValue());
    });

我得到了 "Processing" 日志的 20 个实例,然后奇怪的是 "Processing after" 中只有几行日志(我预计是 12 行)。它似乎比应有的时间更早地调用 on complete 。也许我的结构有问题?

看来 AndroidEx 是正确的。我正在使用 Redis Lettuce 反应式 API,但它看起来并不正常。上面添加的代码片段是构造两个 Observable 的交集的正确方法。