RxJava 将 publishsubject 与超时结合起来
RxJava combine publishsubject with timeout
我希望能够订阅 publishsubject 并等待结果,但不要超过 1 分钟。
问题是如果我这样做
publishsubject.timeout(1, TimeUnit.MINUTES).subscribe({result -> ... }, {error -> ... } )
即使在此之前我成功获得结果,我也总是会出错。如何正确实施这种方法?
在下面的示例中,我展示了 timeout
是如何工作的。对于每次发射,都会启动一个新的超时,如果新项目在超时之前到达 运行,则重新设置超时,否则会引发异常。
在示例中,我们可以看到 1、2、3 在控制台打印,它以超时异常结束,因为第 4 项不在 3 之后的 200 毫秒内。
正如我在下面的评论中所说,如果您知道什么时候可以终止,就可以避免这种情况 publishSubject
。例如使用 take
、takeUntil
或在第 3 项之后调用 publishSubject.onComplete()
。
@Test
public void timeout() throws InterruptedException {
PublishSubject<Object> publishSubject = PublishSubject.create();
Observable<Object> timeout = publishSubject.timeout(200, TimeUnit.MILLISECONDS);
timeout
.subscribe(
e -> System.out.println(e),
error -> System.out.println("ERROR: " + error),
() -> System.out.println("complete")
);
sleep(50);
publishSubject.onNext(1);
sleep(150);
publishSubject.onNext(2);
sleep(199);
publishSubject.onNext(3);
sleep(201);
publishSubject.onNext(4);
Thread.sleep(2000);
}
您很可能会遇到超时异常,因为 timeout
要求您的源在指定的时间 window 内继续生成项目或完成。因此,如果您只向 PublishSubject
发出一个 onNext
信号,而不再发出更多信号,您将因缺少第二个 onNext
调用而超时。
因此,如果您想要一件商品,请使用 take
(在 timeout
之前或之后):
publishsubject
.timeout(1, TimeUnit.MINUTES)
.take(1)
.subscribe(result -> { /* ... */ }, error -> { /* ... */ } )
我希望能够订阅 publishsubject 并等待结果,但不要超过 1 分钟。
问题是如果我这样做
publishsubject.timeout(1, TimeUnit.MINUTES).subscribe({result -> ... }, {error -> ... } )
即使在此之前我成功获得结果,我也总是会出错。如何正确实施这种方法?
在下面的示例中,我展示了 timeout
是如何工作的。对于每次发射,都会启动一个新的超时,如果新项目在超时之前到达 运行,则重新设置超时,否则会引发异常。
在示例中,我们可以看到 1、2、3 在控制台打印,它以超时异常结束,因为第 4 项不在 3 之后的 200 毫秒内。
正如我在下面的评论中所说,如果您知道什么时候可以终止,就可以避免这种情况 publishSubject
。例如使用 take
、takeUntil
或在第 3 项之后调用 publishSubject.onComplete()
。
@Test
public void timeout() throws InterruptedException {
PublishSubject<Object> publishSubject = PublishSubject.create();
Observable<Object> timeout = publishSubject.timeout(200, TimeUnit.MILLISECONDS);
timeout
.subscribe(
e -> System.out.println(e),
error -> System.out.println("ERROR: " + error),
() -> System.out.println("complete")
);
sleep(50);
publishSubject.onNext(1);
sleep(150);
publishSubject.onNext(2);
sleep(199);
publishSubject.onNext(3);
sleep(201);
publishSubject.onNext(4);
Thread.sleep(2000);
}
您很可能会遇到超时异常,因为 timeout
要求您的源在指定的时间 window 内继续生成项目或完成。因此,如果您只向 PublishSubject
发出一个 onNext
信号,而不再发出更多信号,您将因缺少第二个 onNext
调用而超时。
因此,如果您想要一件商品,请使用 take
(在 timeout
之前或之后):
publishsubject
.timeout(1, TimeUnit.MINUTES)
.take(1)
.subscribe(result -> { /* ... */ }, error -> { /* ... */ } )