在 RxJava 中超时取消任务
Cancel task on timeout in RxJava
我正在试验 RxJava 和 Java 8 的 CompletableFuture class
并且不太了解如何处理超时情况。
import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;
// ...
Observable<String> doSomethingSlowly() {
CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
// this call may be very slow - if it takes too long,
// we want to time out and cancel it.
return processor.slowExternalCall();
});
return toObservable(task);
}
// ...
doSomethingSlowly()
.single()
.timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
这基本上有效(如果达到三秒超时,"timeout" 已发布)。然而,我还想取消我在 Observable
中包装的未来任务 - 使用以 RxJava 为中心的方法是否可能?
我知道一种选择是使用 task.get(3, TimeUnit.SECONDS)
自己处理超时,但我想知道是否可以在 RxJava.
中完成所有任务处理工作
是的,你可以做到。您可以将 Subscription
添加到 Subscriber
.
这允许您监听取消订阅,如果您明确调用 subscribe().unsubscribe()
或者 Observable
成功完成或出现错误,就会发生这种情况。
如果您在 future 完成之前看到取消订阅,您可以假设这是因为显式 unsubscribe
或超时。
public class FutureTest {
public static void main(String[] args) throws IOException {
doSomethingSlowly()
.timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
.subscribe(System.out::println);
System.in.read(); // keep process alive
}
private static Observable<String> doSomethingSlowly() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return "Something";
});
return toObservable(future);
}
private static <T> Observable<T> toObservable(CompletableFuture<T> future) {
return Observable.create(subscriber -> {
subscriber.add(new Subscription() {
private boolean unsubscribed = false;
@Override
public void unsubscribe() {
if (!future.isDone()){
future.cancel(true);
}
unsubscribed = true;
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
});
future.thenAccept(value -> {
if (!subscriber.isUnsubscribed()){
subscriber.onNext(value);
subscriber.onCompleted();
}
}).exceptionally(throwable -> {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(throwable);
}
return null;
});
});
}
}
我正在试验 RxJava 和 Java 8 的 CompletableFuture class 并且不太了解如何处理超时情况。
import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;
// ...
Observable<String> doSomethingSlowly() {
CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
// this call may be very slow - if it takes too long,
// we want to time out and cancel it.
return processor.slowExternalCall();
});
return toObservable(task);
}
// ...
doSomethingSlowly()
.single()
.timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
这基本上有效(如果达到三秒超时,"timeout" 已发布)。然而,我还想取消我在 Observable
中包装的未来任务 - 使用以 RxJava 为中心的方法是否可能?
我知道一种选择是使用 task.get(3, TimeUnit.SECONDS)
自己处理超时,但我想知道是否可以在 RxJava.
是的,你可以做到。您可以将 Subscription
添加到 Subscriber
.
这允许您监听取消订阅,如果您明确调用 subscribe().unsubscribe()
或者 Observable
成功完成或出现错误,就会发生这种情况。
如果您在 future 完成之前看到取消订阅,您可以假设这是因为显式 unsubscribe
或超时。
public class FutureTest {
public static void main(String[] args) throws IOException {
doSomethingSlowly()
.timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
.subscribe(System.out::println);
System.in.read(); // keep process alive
}
private static Observable<String> doSomethingSlowly() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
return "Something";
});
return toObservable(future);
}
private static <T> Observable<T> toObservable(CompletableFuture<T> future) {
return Observable.create(subscriber -> {
subscriber.add(new Subscription() {
private boolean unsubscribed = false;
@Override
public void unsubscribe() {
if (!future.isDone()){
future.cancel(true);
}
unsubscribed = true;
}
@Override
public boolean isUnsubscribed() {
return unsubscribed;
}
});
future.thenAccept(value -> {
if (!subscriber.isUnsubscribed()){
subscriber.onNext(value);
subscriber.onCompleted();
}
}).exceptionally(throwable -> {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(throwable);
}
return null;
});
});
}
}