Emitter.onComplete 后缓冲的 Flowable 中的线程中断
Thread interrupts in buffered Flowable after Emitter.onComplete
我有一个Flowable
喜欢:
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
.doOnSubscribe(sub -> {
System.out.println("onSubscribe");
})
.onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
.buffer(1, TimeUnit.SECONDS)
.doOnTerminate(() -> {
System.out.println("onTerminate");
})
.onErrorReturn(error -> {
System.out.println("Error, will cancel scan: " + error);
throw new RuntimeException(error);
})
.subscribe(objs -> /* NIO work here */);
当我在源发射器上调用 onComplete
时,紧随 onNext
调用之后,我在 subscribe
lambda 内部得到一个中断。我认为这是预期的行为: https://github.com/ReactiveX/RxJava/issues/3601 。这似乎是在订阅者处理缓冲区中的下游对象时发生的(我猜如果它是单线程的就没有问题)。
这是一个问题,因为我在这里执行 NIO 工作,它对 Thread.interrupt
上发生的事情有非常具体的要求。这发生在缓冲区完成发送所有工作之前,因此一些 objs
未完全处理。
这与Scheduler
有关吗?我应该使用 IO 吗?我如何 "protect" 在订阅者中执行的工作
我应该在 IO 线程上注册我的订阅者:
.subscribeOn(Schedulers.io(), false)
.subscribe(objs -> /* NIO work here */);
我想我是在假设,因为上游 buffer
,中断被放置在来自 buffer
的线程上,并且这也会对 subscribeOn
选择的任何线程进行。但似乎并非如此。
如果有人能提供更好的解释,我想看看。
我有一个Flowable
喜欢:
Flowable.<String>create(onSubscribe, BackpressureStrategy.DROP)
.doOnSubscribe(sub -> {
System.out.println("onSubscribe");
})
.onBackpressureDrop(sns2 -> LOG.warn("Backpressure, dropping " + Arrays.asList(sns2)))
.buffer(1, TimeUnit.SECONDS)
.doOnTerminate(() -> {
System.out.println("onTerminate");
})
.onErrorReturn(error -> {
System.out.println("Error, will cancel scan: " + error);
throw new RuntimeException(error);
})
.subscribe(objs -> /* NIO work here */);
当我在源发射器上调用 onComplete
时,紧随 onNext
调用之后,我在 subscribe
lambda 内部得到一个中断。我认为这是预期的行为: https://github.com/ReactiveX/RxJava/issues/3601 。这似乎是在订阅者处理缓冲区中的下游对象时发生的(我猜如果它是单线程的就没有问题)。
这是一个问题,因为我在这里执行 NIO 工作,它对 Thread.interrupt
上发生的事情有非常具体的要求。这发生在缓冲区完成发送所有工作之前,因此一些 objs
未完全处理。
这与Scheduler
有关吗?我应该使用 IO 吗?我如何 "protect" 在订阅者中执行的工作
我应该在 IO 线程上注册我的订阅者:
.subscribeOn(Schedulers.io(), false)
.subscribe(objs -> /* NIO work here */);
我想我是在假设,因为上游 buffer
,中断被放置在来自 buffer
的线程上,并且这也会对 subscribeOn
选择的任何线程进行。但似乎并非如此。
如果有人能提供更好的解释,我想看看。