Где я должен поставить onBackPressureBuffer(n) в цепочке подписки RxJava?

Я пытаюсь исправить существующую библиотеку React Native response-native-ble-plx, добавив onBackPressureBuffer() в существующий код Java.

Я знаю, что это уродливо, но у меня нет времени, чтобы подать PR прямо сейчас, и есть нерешенная проблема, которая может решить проблему. Я делаю это потому, что генератор событий работает на частоте 200 Гц. Мне нужен безопасный способ буферизации элементов на нативной стороне, пока они потребляются в своем собственном темпе на стороне JavaScript.

Таким образом, код становится следующим:

       final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
            @Override
            public Observable<Observable<byte[]>> call() {
                int properties = gattCharacteristic.getProperties();
                BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                        .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
                NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                        : NotificationSetupMode.COMPAT;
                if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
                    return connection.setupNotification(gattCharacteristic, setupMode);
                }

                if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
                    return connection.setupIndication(gattCharacteristic, setupMode);
                }

                return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
            }
        }).onBackpressureBuffer(1000)  <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
            @Override
            public Observable<byte[]> call(Observable<byte[]> observable) {
                return observable;
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }
        }).subscribe(new Observer<byte[]>() {
            @Override
            public void onCompleted() {
                promise.resolve(null);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onError(Throwable e) {
                errorConverter.toError(e).reject(promise);
                transactions.removeSubscription(transactionId);
            }

            @Override
            public void onNext(byte[] bytes) {
                characteristic.logValue("Notification from", bytes);
                WritableArray jsResult = Arguments.createArray();
                jsResult.pushNull();
                jsResult.pushMap(characteristic.toJSObject(bytes));
                jsResult.pushString(transactionId);
                sendEvent(Event.ReadEvent, jsResult);
            }
        });

Моя проблема в том, что даже с этим дополнением я испытываю исключения MissingBackPressure.

Я пробовал onBackPressureDrop(), и у меня точно такое же поведение. Поэтому я предполагаю, что делаю это неправильно, но не могу понять, почему прямо сейчас.

Любая помощь приветствуется.

1 ответ

Решение

Как вы уже сказали, вы столкнулись с проблемой react-native библиотека и приведенный выше код MissingBackpressureException ранее.

Из ага .onBackpressureDrop() (жирный рудник):

Дает команду Наблюдателю, который испускает предметы быстрее, чем его наблюдатель может потреблять, чтобы сбрасывать, а не испускать те предметы, которые его наблюдатель не готов наблюдать.

Если количество запросов в нисходящем направлении достигает 0, то Observable будет воздерживаться от вызова {@code onNext}, пока наблюдатель снова не вызовет {@code request(n)} для увеличения количества запросов.

Обратное давление:
Оператор распознает обратное давление из нисходящего потока и использует источник {@code Observable} неограниченным образом (т. Е. Не применяя к нему обратное давление).
Планировщик:
{@code onBackpressureDrop} не работает по умолчанию для определенного {@link Scheduler}.

Вы можете видеть, что следующие операторы в цепочке .flatMap(), .doOnUnsubscribe() а также .subscribe(),

Из ага .flatMap() относительно противодавления:

Обратное давление:
Оператор учитывает противодавление вниз по течению. Внешний {@code Observable} потребляется в неограниченном режиме (т.е. к нему не применяется противодавление). Внутренние {@code Observable} ожидают противодавления; в случае нарушения оператор может дать сигнал {@code MissingBackpressureException}.

Javadoc .doOnUnsubscribe():

Обратное давление:
{@code doOnUnsubscribe} не взаимодействует с запросами обратного давления или доставкой значений; поведение противодавления сохраняется между его восходящим и нисходящим потоком.

А также .subscribe():

Обратное давление:
Оператор потребляет источник {@code Observable} неограниченным образом (т.е. к нему не применяется обратное давление).

Как вы можете видеть ни один из операторов ниже .onBackpressure*() действительно оказывает противодавление на это. Вам нужно будет добавить оператор, который делает это сразу после .onBackpressure*(), Одним из таких операторов является .observeOn(Scheduler)

Javadoc .observeOn():

Обратное давление: Этот оператор учитывает обратное давление в нисходящем направлении и ожидает его от источника {@code Observable}. Нарушение этого ожидания приведет к {@code MissingBackpressureException}. Это самый распространенный оператор, где появляется исключение; ищите источники в цепочке, которые не поддерживают противодавление, например {@code interval}, {@code timer}, {code PublishSubject} или {@code BehaviorSubject} и применяйте любой из операторов {@code onBackpressureXXX} перед применением применяя {@code Наблюдать за собой}.

Так что работающий код может выглядеть так:

final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
    @Override
    public Observable<Observable<byte[]>> call() {
        int properties = gattCharacteristic.getProperties();
        BluetoothGattDescriptor cccDescriptor = gattCharacteristic
                .getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
        NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
                : NotificationSetupMode.COMPAT;
        if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
            return connection.setupNotification(gattCharacteristic, setupMode);
        }

        if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
            return connection.setupIndication(gattCharacteristic, setupMode);
        }

        return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
    }
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
    @Override
    public Observable<byte[]> call(Observable<byte[]> observable) {
        return observable;
    }
})
.doOnUnsubscribe(new Action0() {
    @Override
    public void call() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
    @Override
    public void onCompleted() {
        promise.resolve(null);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onError(Throwable e) {
        errorConverter.toError(e).reject(promise);
        transactions.removeSubscription(transactionId);
    }

    @Override
    public void onNext(byte[] bytes) {
        characteristic.logValue("Notification from", bytes);
        WritableArray jsResult = Arguments.createArray();
        jsResult.pushNull();
        jsResult.pushMap(characteristic.toJSObject(bytes));
        jsResult.pushString(transactionId);
        sendEvent(Event.ReadEvent, jsResult);
    }
});
Другие вопросы по тегам