Где я должен поставить onBackPressureBuffer(n) в цепочке подписки RxJava?
Я пытаюсь исправить существующую библиотеку React Native response-native-ble-plx, добавив onBackPressureBuffer() в существующий код Java.
Я знаю, что это уродливо, но у меня нет времени, чтобы подать PR прямо сейчас, и есть нерешенная проблема, которая может решить проблему. Я делаю это потому, что генератор событий работает на частоте 200 Гц. Мне нужен безопасный способ буферизации элементов на нативной стороне, пока они потребляются в своем собственном темпе на стороне JavaScript.
Таким образом, код становится следующим:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
}).onBackpressureBuffer(1000) <---- Here is my modification
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
}).subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});
Моя проблема в том, что даже с этим дополнением я испытываю исключения MissingBackPressure.
Я пробовал onBackPressureDrop(), и у меня точно такое же поведение. Поэтому я предполагаю, что делаю это неправильно, но не могу понять, почему прямо сейчас.
Любая помощь приветствуется.
1 ответ
Как вы уже сказали, вы столкнулись с проблемой react-native
библиотека и приведенный выше код MissingBackpressureException
ранее.
Из ага .onBackpressureDrop()
(жирный рудник):
Дает команду Наблюдателю, который испускает предметы быстрее, чем его наблюдатель может потреблять, чтобы сбрасывать, а не испускать те предметы, которые его наблюдатель не готов наблюдать.
Если количество запросов в нисходящем направлении достигает 0, то Observable будет воздерживаться от вызова {@code onNext}, пока наблюдатель снова не вызовет {@code request(n)} для увеличения количества запросов.
- Обратное давление:
- Оператор распознает обратное давление из нисходящего потока и использует источник {@code Observable} неограниченным образом (т. Е. Не применяя к нему обратное давление).
- Планировщик:
- {@code onBackpressureDrop} не работает по умолчанию для определенного {@link Scheduler}.
Вы можете видеть, что следующие операторы в цепочке .flatMap()
, .doOnUnsubscribe()
а также .subscribe()
,
Из ага .flatMap()
относительно противодавления:
Обратное давление: Оператор учитывает противодавление вниз по течению. Внешний {@code Observable} потребляется в неограниченном режиме (т.е. к нему не применяется противодавление). Внутренние {@code Observable} ожидают противодавления; в случае нарушения оператор может дать сигнал {@code MissingBackpressureException}.
Javadoc .doOnUnsubscribe()
:
Обратное давление: {@code doOnUnsubscribe} не взаимодействует с запросами обратного давления или доставкой значений; поведение противодавления сохраняется между его восходящим и нисходящим потоком.
А также .subscribe()
:
Обратное давление: Оператор потребляет источник {@code Observable} неограниченным образом (т.е. к нему не применяется обратное давление).
Как вы можете видеть ни один из операторов ниже .onBackpressure*()
действительно оказывает противодавление на это. Вам нужно будет добавить оператор, который делает это сразу после .onBackpressure*()
, Одним из таких операторов является .observeOn(Scheduler)
Javadoc .observeOn()
:
Обратное давление: Этот оператор учитывает обратное давление в нисходящем направлении и ожидает его от источника {@code Observable}. Нарушение этого ожидания приведет к {@code MissingBackpressureException}. Это самый распространенный оператор, где появляется исключение; ищите источники в цепочке, которые не поддерживают противодавление, например {@code interval}, {@code timer}, {code PublishSubject} или {@code BehaviorSubject} и применяйте любой из операторов {@code onBackpressureXXX} перед применением применяя {@code Наблюдать за собой}.
Так что работающий код может выглядеть так:
final Subscription subscription = Observable.defer(new Func0<Observable<Observable<byte[]>>>() {
@Override
public Observable<Observable<byte[]>> call() {
int properties = gattCharacteristic.getProperties();
BluetoothGattDescriptor cccDescriptor = gattCharacteristic
.getDescriptor(Characteristic.CLIENT_CHARACTERISTIC_CONFIG_UUID);
NotificationSetupMode setupMode = cccDescriptor != null ? NotificationSetupMode.QUICK_SETUP
: NotificationSetupMode.COMPAT;
if ((properties & BluetoothGattCharacteristic.PROPERTY_NOTIFY) != 0) {
return connection.setupNotification(gattCharacteristic, setupMode);
}
if ((properties & BluetoothGattCharacteristic.PROPERTY_INDICATE) != 0) {
return connection.setupIndication(gattCharacteristic, setupMode);
}
return Observable.error(new CannotMonitorCharacteristicException(gattCharacteristic));
}
})
.flatMap(new Func1<Observable<byte[]>, Observable<byte[]>>() {
@Override
public Observable<byte[]> call(Observable<byte[]> observable) {
return observable;
}
})
.doOnUnsubscribe(new Action0() {
@Override
public void call() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
})
.onBackpressureBuffer(1000) // <---- Here is my modification
.observeOn(Schedulers.trampoline()) // <---- an operator that does backpressure the above
.subscribe(new Observer<byte[]>() {
@Override
public void onCompleted() {
promise.resolve(null);
transactions.removeSubscription(transactionId);
}
@Override
public void onError(Throwable e) {
errorConverter.toError(e).reject(promise);
transactions.removeSubscription(transactionId);
}
@Override
public void onNext(byte[] bytes) {
characteristic.logValue("Notification from", bytes);
WritableArray jsResult = Arguments.createArray();
jsResult.pushNull();
jsResult.pushMap(characteristic.toJSObject(bytes));
jsResult.pushString(transactionId);
sendEvent(Event.ReadEvent, jsResult);
}
});