RxJava 将 publishsubject 与超时结合起来

RxJava combine publishsubject with timeout

我希望能够订阅 publishsubject 并等待结果,但不要超过 1 分钟。

问题是如果我这样做

publishsubject.timeout(1, TimeUnit.MINUTES).subscribe({result -> ... }, {error -> ... } )

即使在此之前我成功获得结果,我也总是会出错。如何正确实施这种方法?

在下面的示例中,我展示了 timeout 是如何工作的。对于每次发射,都会启动一个新的超时,如果新项目在超时之前到达 运行,则重新设置超时,否则会引发异常。

在示例中,我们可以看到 1、2、3 在控制台打印,它以超时异常结束,因为第 4 项不在 3 之后的 200 毫秒内。

正如我在下面的评论中所说,如果您知道什么时候可以终止,就可以避免这种情况 publishSubject。例如使用 taketakeUntil 或在第 3 项之后调用 publishSubject.onComplete()

    @Test
    public void timeout() throws InterruptedException {
        PublishSubject<Object> publishSubject = PublishSubject.create();

        Observable<Object> timeout = publishSubject.timeout(200, TimeUnit.MILLISECONDS);

        timeout
                .subscribe(
                        e -> System.out.println(e),
                        error -> System.out.println("ERROR: " + error),
                        () -> System.out.println("complete")
                );

        sleep(50);
        publishSubject.onNext(1);
        sleep(150);
        publishSubject.onNext(2);
        sleep(199);
        publishSubject.onNext(3);
        sleep(201);
        publishSubject.onNext(4);

        Thread.sleep(2000);
    }

您很可能会遇到超时异常,因为 timeout 要求您的源在指定的时间 window 内继续生成项目或完成。因此,如果您只向 PublishSubject 发出一个 onNext 信号,而不再发出更多信号,您将因缺少第二个 onNext 调用而超时。

因此,如果您想要一件商品,请使用 take(在 timeout 之前或之后):

publishsubject
   .timeout(1, TimeUnit.MINUTES)
   .take(1)
   .subscribe(result -> { /* ... */ }, error -> { /* ... */ } )