我应该把 onBackPressureBuffer(n) 放在 RxJava 订阅链的什么地方?

Where should I put onBackPressureBuffer(n) in a RxJava subscription chain?

我正在尝试修补现有的 React Native 库 react-native-ble-plx adding onBackPressureBuffer() in existing Java code

我知道这很丑陋,但我现在没有时间提交 PR,有一个 pending issue 可以解决问题。 我这样做是因为事件发射器以 200Hz 的频率工作。我需要一种安全的方式来缓冲本机端的项目,同时它们在 JavaScript 端以自己的速度消耗。

所以代码变成了下面这样:

       final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
            @Override
            public Observable<Observable<byte[]>> call() {
                int properties = gattCharacteristic.getProperties();
                BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                        .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
                NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                        : NotificationSetupMode.COMPAT;
                if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                    return connection.setupNotification(gattCharacteristic, setupMode);
                }

                if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                    return connection.setupIndication(gattCharacteristic, setupMode);
                }

                return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
            }
        }).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> call(Observable<byte[]> observable) {
                return observable;
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }
        }).subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onError(Throwable e) {
                errorConverter.toError(e).reject(promise);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onNext(byte[] bytes) {
                characteristic.logValue("Notification from", bytes);
                WritableArray jsResult = Arguments.createArray();
                jsResult.pushNull();
                jsResult.pushMap(characteristic.toJSObject(bytes));
                jsResult.pushString(transactionId);
                sendEvent(Event.ReadEvent, jsResult);
            }
        });

我的问题是,即使添加了那个,我仍然遇到 MissingBackPressure 异常。

我试过 onBackPressureDrop() 并且我有完全相同的行为。所以我假设我做错了,但现在无法弄清楚为什么。

感谢任何帮助。

正如您所说,您正面临 react-native 库的问题,并且上面的代码之前确实抛出 MissingBackpressureException

来自 .onBackpressureDrop() 的 Javadoc(加粗我的):

Instructs an Observable that is emitting items faster than its observer can consume them to discard, rather than emit, those items that its observer is not prepared to observe.

If the downstream request count hits 0 then the Observable will refrain from calling {@code onNext} until the observer invokes {@code request(n)} again to increase the request count.

Backpressure:
The operator honors backpressure from downstream and consumes the source {@code Observable} in an unbounded manner (i.e., not applying backpressure to it).
Scheduler:
{@code onBackpressureDrop} does not operate by default on a particular {@link Scheduler}.

您可以看到链中的下一个运算符是 .flatMap().doOnUnsubscribe().subscribe()

来自 .flatMap() 关于背压的 Javadoc:

Backpressure:
The operator honors backpressure from downstream. The outer {@code Observable} is consumed in unbounded mode (i.e., no backpressure is applied to it). The inner {@code Observable}s are expected to honor backpressure; if violated, the operator may signal {@code MissingBackpressureException}.

Javadoc .doOnUnsubscribe():

Backpressure:
{@code doOnUnsubscribe} does not interact with backpressure requests or value delivery; backpressure behavior is preserved between its upstream and its downstream.

.subscribe()

Backpressure:
The operator consumes the source {@code Observable} in an unbounded manner (i.e., no backpressure is applied to it).

如您所见,.onBackpressure*() 下面的运算符 none 确实对其施加了背压。您需要添加一个在 .onBackpressure*() 之后立即执行此操作的运算符。此类运算符之一是 .observeOn(Scheduler)

Javadoc .observeOn():

Backpressure: This operator honors backpressure from downstream and expects it from the source {@code Observable}. Violating this expectation will lead to {@code MissingBackpressureException}. This is the most common operator where the exception pops up; look for sources up the chain that don't support backpressure, such as {@code interval}, {@code timer}, {code PublishSubject} or {@code BehaviorSubject} and apply any of the {@code onBackpressureXXX} operators before applying applying {@code observeOn} itself.

因此可行的代码可能如下所示:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
    @Override
    public Observable<Observable<byte[]>> call() {
        int properties = gattCharacteristic.getProperties();
        BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
        NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                : NotificationSetupMode.COMPAT;
        if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
            return connection.setupNotification(gattCharacteristic, setupMode);
        }

        if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
            return connection.setupIndication(gattCharacteristic, setupMode);
        }

        return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
    }
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> call(Observable<byte[]> observable) {
        return observable;
    }
})
.doOnUnsubscribe(new Action0() {
    @Override
    public void call() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
    @Override
    public void onCompleted() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onError(Throwable e) {
        errorConverter.toError(e).reject(promise);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onNext(byte[] bytes) {
        characteristic.logValue("Notification from", bytes);
        WritableArray jsResult = Arguments.createArray();
        jsResult.pushNull();
        jsResult.pushMap(characteristic.toJSObject(bytes));
        jsResult.pushString(transactionId);
        sendEvent(Event.ReadEvent, jsResult);
    }
});