中断 concatMap 中的单个可观察对象
Interrupt a single observable in concatMap
我使用 concatMap
通过长时间的 运行 操作一次处理一个项目流。在某些时候我需要 "interrupt" 这么长的 运行 操作,但只针对当前项目:
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
// Now I need to interrupt whichever longRunningOperation in
// progress, but I don't want to interrupt the whole stream.
// In other words, I want to force it to move onto the next
// integer.
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer));
}
我试图通过保留 "inner" 流的一次性并在正确的时间处理它来解决这个问题。但这没有用。内部流已处理,但 concatMap
从未继续处理项目 3。测试只是挂起(因为外部可观察者也从未 completes/terminates/disposes)
Disposable disposable = Disposables.empty();
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
disposable.dispose();
test.awaitTerminalEvent();
System.out.println("terminal event");
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer))
.doOnSubscribe(disposable -> {
// save disposable so we can "interrupt" later
System.out.println("Saving disposable for " + integer);
Example.this.disposable = disposable;
});
}
即使这确实有效,但依赖副作用似乎有点老套。实现此目标的最佳方法是什么?
我的问题几乎与 相同。我可以使用 PublishSubject
到 "interrupt"
private PublishSubject interrupter;
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
interrupter.onComplete();
test.awaitTerminalEvent();
System.out.println("terminal event");
}
Observable<String> doLongRunningOperation(final Integer integer) {
interrupter = PublishSubject.create();
return Observable
.just("Start working on " + integer,
"Still working on " + integer,
"Almost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("Finally for " + integer))
.takeUntil(interrupter);
}
您只需要实现 semaphore-kind 可以触发停止事件的 Observable。
实现这个信号量,例如:
PublishSubject<Boolean> semaphore = PublishSubject.create()
像这样修改在 .concatMap() 中创建的 Observable:
.concatMap(string ->
Observable.just(string)
.delay(2, TimeUnit.SECONDS)
.takeUntil(semaphore)) //Here is the stop trigger
当您需要取消当前处理的项目时——只需调用:
semaphore.onNext(true);
我使用 concatMap
通过长时间的 运行 操作一次处理一个项目流。在某些时候我需要 "interrupt" 这么长的 运行 操作,但只针对当前项目:
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
// Now I need to interrupt whichever longRunningOperation in
// progress, but I don't want to interrupt the whole stream.
// In other words, I want to force it to move onto the next
// integer.
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer));
}
我试图通过保留 "inner" 流的一次性并在正确的时间处理它来解决这个问题。但这没有用。内部流已处理,但 concatMap
从未继续处理项目 3。测试只是挂起(因为外部可观察者也从未 completes/terminates/disposes)
Disposable disposable = Disposables.empty();
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
disposable.dispose();
test.awaitTerminalEvent();
System.out.println("terminal event");
}
Observable<String> doLongRunningOperation(final Integer integer) {
return Observable
.just("\tStart working on " + integer,
"\tStill working on " + integer,
"\tAlmost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("\tfinally for " + integer))
.doOnSubscribe(disposable -> {
// save disposable so we can "interrupt" later
System.out.println("Saving disposable for " + integer);
Example.this.disposable = disposable;
});
}
即使这确实有效,但依赖副作用似乎有点老套。实现此目标的最佳方法是什么?
我的问题几乎与 PublishSubject
到 "interrupt"
private PublishSubject interrupter;
@Test
public void main() throws InterruptedException {
TestObserver<String> test = Observable.just(1, 2, 3, 4, 5)
.concatMap(this::doLongRunningOperation)
.test();
Thread.sleep(10000);
System.out.println("interrupt NOW");
interrupter.onComplete();
test.awaitTerminalEvent();
System.out.println("terminal event");
}
Observable<String> doLongRunningOperation(final Integer integer) {
interrupter = PublishSubject.create();
return Observable
.just("Start working on " + integer,
"Still working on " + integer,
"Almost done working on " + integer)
// delay each item by 2 seconds
.concatMap(string -> Observable.just(string).delay(2, TimeUnit.SECONDS))
.doOnNext(System.out::println)
.doFinally(() -> System.out.println("Finally for " + integer))
.takeUntil(interrupter);
}
您只需要实现 semaphore-kind 可以触发停止事件的 Observable。
实现这个信号量,例如:
PublishSubject<Boolean> semaphore = PublishSubject.create()
像这样修改在 .concatMap() 中创建的 Observable:
.concatMap(string ->
Observable.just(string)
.delay(2, TimeUnit.SECONDS)
.takeUntil(semaphore)) //Here is the stop trigger
当您需要取消当前处理的项目时——只需调用:
semaphore.onNext(true);