RxJava concatMap, который объединяет только самый последний выпуск
У меня есть система, которая синхронизирует данные удаленно, когда они меняются.
Flowable
испускает элементы, когда приходят новые данные, и я хочу обработать эти данные локально. Если Flowable
испускает другой элемент во время обработки, я хочу подождать, пока это не будет сделано, прежде чем будет обработан следующий элемент. я могу использовать concatMap
за это.
Тем не менее, я хотел бы добавить оптимизацию, где, если Flowable
испускает много элементов, пока обрабатывается первый элемент, я хочу, чтобы испускался только последний элемент, а не все. я могу использовать switchMap
за это.
Проблема в том, что я не хочу, чтобы текущая обработка удалялась при отправке нового элемента. Мне нужна текущая обработка, чтобы закончить, а затем самая последняя эмиссия из Flowable
должен быть обработан.
Есть идеи, как это сделать? Я возился с publish
и buffer
с сигналом открытия и закрытия, но это, кажется, всегда заканчивается тупиком.
Я думаю, что я могу сделать что-то вроде buffer
по таймеру и filter
в выдаваемом списке не является пустым, и map
это последний элемент в списке, но моя система менее реактивна (реагирует на срабатывание таймера, а не изменения данных), и используются дополнительные ресурсы (потому что таймер постоянно срабатывает, даже если восходящий поток не излучал).