flatMap 是如何深入工作的?
How flatMap works in depth?
我对 flatMap 如何控制它的 "child" 线程很感兴趣,例如下面的代码工作正常:
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
.sequential();
}
并且此代码在被调用 128 次后停止(即 flowable 的默认 maxConcurent):
private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
return mPlcIntervalFlowable.onBackpressureLatest()
.subscribeOn(Schedulers.single())
.publish();
}
订阅:
addDisposable(mGetPlcUpdatesChanelUseCase.execute()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(plcDto -> Timber.d("plcReceiver"),
Timber::e));
用例:
public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {
private final PlcRepository mPlcRepository;
public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
mPlcRepository = plcRepository;
}
@Override
public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
return mPlcRepository.getUpdatesChannel();
}
@Override
public boolean isParamsRequired() {
return false;
}
}
回购方法
@Override
public Flowable<PlcDto> getUpdatesChannel() {
return mPlcCore.getPlcConnectableFlowable()
.map(mPlcInfoTopPlcDtoTransformer::transform);
}
PlcCore 方法
public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
return mConnectableFlowable;
}
而 mConnectableFlowable 是:
mConnectableFlowable = createConnectablePlcFlowable();
mConnectableFlowable.connect();
据我所知,mDataPackageFlowable 创建一次,然后执行,每次它为其 child 创建新的 "thread",在执行 128 次后,它只会阻止所有后续执行。
所以主要有3个问题:
1) flatMap 是否控制 child 个线程?
2) 为什么它在新线程上执行每个新的 "request"?(也许不是,然后告诉我)
3) 在什么情况下我们会失去对 child 个线程的控制。
免责声明:英语是我的第二语言,如果有什么不清楚的地方可以问我,我会尽量补充说明。
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.sequental()
这个组合不起作用,它实际上去除了 128 次 flatMap 调用限制,但没有清除导致内存泄漏和 OOM 异常的旧 innersubscription。请改用某种地图。
观察者链需要订阅才能正常工作。当您使用 interval()
生成数据时,您提供了一个 "hot" 可观察值,它会自行发出值。 "cold" observable 仅在订阅发生时才会发出值。
128 是 flatMap()
在暂停之前缓冲的条目数。如果有订阅,那么 flatMap()
会向下游发出内部 observable 产生的值,并且不会停止。
根据 javadoc,flatMap()
本身并不在特定的调度程序上运行。这意味着它不会在特定线程上操纵其订阅。如果您想控制 flatMap()
调用的可观察对象中正在完成的工作,那么您可以使用显式调度:
observable
.flatMap( value -> fun(value).subscribeOn( myScheduler ) )
.subscribe();
例如,myScheduler
可能是 Schedulers.io()
,它会在需要时创建线程。或者,它可以是您提供的具有固定线程数的 Executor
。我经常使用只分配了一个或两个或 48 个线程的 Executor
s 来控制 flatMap()
的扇出。
您还可以向 flatMap()
提供并行参数,告诉它要维护的最大订阅数。当 flatMap()
达到最大值时,它将缓冲请求,直到它订阅的观察者链完成。
parallel()
运算符做类似的事情,但它将传入事件拆分出来,在单独的线程上发出它们。同样,javadoc 具有出色的描述和精美的图片。
总是有可能失去对线程的控制。当您使用 RxJava 运算符时,请阅读它的文档。您需要了解两个方面。第一个区域是操作员在哪个调度程序上工作。如果它说它不在特定的调度程序上运行,那么它不会直接影响线程的选择或线程的使用方式。如果它声明它使用特定的调度程序,那么您需要了解该调度程序的工作原理;总会有另一个版本的操作员允许您提供自己选择的调度程序。
您必须了解的第二个方面是背压。您需要了解背压的含义及其应用方式。每当您跨过线程边界时,这一点尤其重要,例如使用 observeOn()
或 subscribeOn()
.
我对 flatMap 如何控制它的 "child" 线程很感兴趣,例如下面的代码工作正常:
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.flatMap((Function<Long, Publisher<PlcDataPackage>>) aLong -> mDataPackageFlowable)
.sequential();
}
并且此代码在被调用 128 次后停止(即 flowable 的默认 maxConcurent):
private ConnectableFlowable<PlcDataPackage> createConnectablePlcFlowable() {
return mPlcIntervalFlowable.onBackpressureLatest()
.subscribeOn(Schedulers.single())
.publish();
}
订阅:
addDisposable(mGetPlcUpdatesChanelUseCase.execute()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(plcDto -> Timber.d("plcReceiver"),
Timber::e));
用例:
public class GetPlcUpdatesChanelUseCase extends UseCase<PlcDto, Object> {
private final PlcRepository mPlcRepository;
public GetPlcUpdatesChanelUseCase(PlcRepository plcRepository) {
mPlcRepository = plcRepository;
}
@Override
public Flowable<PlcDto> buildFlowable(Optional<Object> optional) {
return mPlcRepository.getUpdatesChannel();
}
@Override
public boolean isParamsRequired() {
return false;
}
}
回购方法
@Override
public Flowable<PlcDto> getUpdatesChannel() {
return mPlcCore.getPlcConnectableFlowable()
.map(mPlcInfoTopPlcDtoTransformer::transform);
}
PlcCore 方法
public ConnectableFlowable<PlcDataPackage> getPlcConnectableFlowable() {
return mConnectableFlowable;
}
而 mConnectableFlowable 是:
mConnectableFlowable = createConnectablePlcFlowable();
mConnectableFlowable.connect();
据我所知,mDataPackageFlowable 创建一次,然后执行,每次它为其 child 创建新的 "thread",在执行 128 次后,它只会阻止所有后续执行。
所以主要有3个问题:
1) flatMap 是否控制 child 个线程?
2) 为什么它在新线程上执行每个新的 "request"?(也许不是,然后告诉我)
3) 在什么情况下我们会失去对 child 个线程的控制。
免责声明:英语是我的第二语言,如果有什么不清楚的地方可以问我,我会尽量补充说明。
private Flowable<PlcDataPackage> createIntervalPlcFlowable() {
return Flowable.interval(1, TimeUnit.SECONDS, Schedulers.computation())
.onBackpressureLatest()
.parallel()
.runOn(Schedulers.computation())
.sequental()
这个组合不起作用,它实际上去除了 128 次 flatMap 调用限制,但没有清除导致内存泄漏和 OOM 异常的旧 innersubscription。请改用某种地图。
观察者链需要订阅才能正常工作。当您使用 interval()
生成数据时,您提供了一个 "hot" 可观察值,它会自行发出值。 "cold" observable 仅在订阅发生时才会发出值。
128 是 flatMap()
在暂停之前缓冲的条目数。如果有订阅,那么 flatMap()
会向下游发出内部 observable 产生的值,并且不会停止。
flatMap()
本身并不在特定的调度程序上运行。这意味着它不会在特定线程上操纵其订阅。如果您想控制 flatMap()
调用的可观察对象中正在完成的工作,那么您可以使用显式调度:
observable
.flatMap( value -> fun(value).subscribeOn( myScheduler ) )
.subscribe();
例如,myScheduler
可能是 Schedulers.io()
,它会在需要时创建线程。或者,它可以是您提供的具有固定线程数的 Executor
。我经常使用只分配了一个或两个或 48 个线程的 Executor
s 来控制 flatMap()
的扇出。
您还可以向 flatMap()
提供并行参数,告诉它要维护的最大订阅数。当 flatMap()
达到最大值时,它将缓冲请求,直到它订阅的观察者链完成。
parallel()
运算符做类似的事情,但它将传入事件拆分出来,在单独的线程上发出它们。同样,javadoc 具有出色的描述和精美的图片。
总是有可能失去对线程的控制。当您使用 RxJava 运算符时,请阅读它的文档。您需要了解两个方面。第一个区域是操作员在哪个调度程序上工作。如果它说它不在特定的调度程序上运行,那么它不会直接影响线程的选择或线程的使用方式。如果它声明它使用特定的调度程序,那么您需要了解该调度程序的工作原理;总会有另一个版本的操作员允许您提供自己选择的调度程序。
您必须了解的第二个方面是背压。您需要了解背压的含义及其应用方式。每当您跨过线程边界时,这一点尤其重要,例如使用 observeOn()
或 subscribeOn()
.