RxJava concatMap, который объединяет только самый последний выпуск

У меня есть система, которая синхронизирует данные удаленно, когда они меняются.

Flowable испускает элементы, когда приходят новые данные, и я хочу обработать эти данные локально. Если Flowable испускает другой элемент во время обработки, я хочу подождать, пока это не будет сделано, прежде чем будет обработан следующий элемент. я могу использовать concatMap за это.

Тем не менее, я хотел бы добавить оптимизацию, где, если Flowable испускает много элементов, пока обрабатывается первый элемент, я хочу, чтобы испускался только последний элемент, а не все. я могу использовать switchMap за это.

Проблема в том, что я не хочу, чтобы текущая обработка удалялась при отправке нового элемента. Мне нужна текущая обработка, чтобы закончить, а затем самая последняя эмиссия из Flowable должен быть обработан.

Есть идеи, как это сделать? Я возился с publish и buffer с сигналом открытия и закрытия, но это, кажется, всегда заканчивается тупиком.

Я думаю, что я могу сделать что-то вроде buffer по таймеру и filter в выдаваемом списке не является пустым, и map это последний элемент в списке, но моя система менее реактивна (реагирует на срабатывание таймера, а не изменения данных), и используются дополнительные ресурсы (потому что таймер постоянно срабатывает, даже если восходящий поток не излучал).

0 ответов

Другие вопросы по тегам