Акселерометр RxJava Противодавление

Возникли проблемы с противодавлением. Использование субъекта публикации для получения события датчика при отправке и необходимость сохранения данных в базе данных при подписке на предмет в транзакции.

Я пытался использовать оператор.window(100), чтобы я мог выполнять массовую вставку всякий раз, когда получаю 100 событий датчика подряд, но я могу получить только один элемент в.subscribe()

Не хочу отбрасывать события, используя оператор буфера. Какой правильный способ справиться с этим?

@Override
public void onSensorChanged(SensorEvent sevent) {

    Sensor sensor = sevent.sensor;

    switch (sensor.getType()) {
        case Sensor.TYPE_ACCELEROMETER:
            sensorEventPublishSubject.onNext(sevent);
            break;
    }
}

sensorEventPublishSubject
            .map(event ->
                    new AccModel(
                            event.values[0],
                            event.values[1],
                            event.values[2],
                            event.accuracy                           
                    )
            )
            .window(100)
            .subscribe(
                    new Action1<Observable<AccModel>>() {
                        @Override
                        public void call(Observable<AccModel> accModelObservable) {
                            //insert in db
                        }
                    }
            );

1 ответ

У вас есть два варианта, в зависимости от того, что вы хотите сделать с событием onError.

Во-первых, ваше решение использует .window правильно, просто он генерирует Observable, и вы будете получать один Observable на каждые 100 событий, и что Observable будет, когда вы подпишетесь на него, воспроизвести эти 100 событий. Кроме того, в случае ошибки он также будет последовательно воспроизводить событие ошибки (AFAIK).

Если вас не волнует событие ошибки в последовательности, есть решение с .buffer(100)перед которым стоит поставить onErrorReturn() или же onErrorResumeNext() который вы будете использовать для преобразования onError событие в onNext, Это потому, что в случае onError, buffer Оператор немедленно распространяет его, поэтому вы теряете события во временном буфере (<100).

Другие вопросы по тегам