Объединение наблюдаемых, когда оба изменяются одновременно

Я пытаюсь интегрировать ReactiveX в мой графический интерфейс с помощью RxPY. Это общий вопрос ReactiveX.

Скажем, у меня есть визуализация, которая зависит от нескольких наблюдаемых потоков с помощью combine_latest(stream1, stream2, plot_function), Это прекрасно работает, когда изменяется один Observable, например, когда пользователь изменяет значение; визуализация обновляется с использованием новых значений. Однако иногда оба Observable обновляются одновременно, например, когда пользователь загружает данные для обоих потоков из одного файла. Технически один Observable будет обновлен раньше другого (в зависимости от того, что будет первым в функции импорта). В результате сюжет будет обновляться дважды, но для всех целей и целей его нужно будет обновлять только один раз.

Некоторые из моих визуализаций являются дорогостоящими для вычисления, поэтому я хочу убедиться, что если оба потока обновляются одновременно, тогда объединенный поток выдает только одно значение. Я могу придумать несколько вариантов:

  1. использование debounce() с небольшим таймаутом (например, 50 мс) в объединенном потоке. Этот подход кажется мне грязным.

  2. Не использовать combine_latest непосредственно. Оберните два потока в новый объект, который также имеет своего рода updating флаг. Если я установлю updating установите флажок True, затем ничего не излучайте, пока я не установлю updating флаг в ложь. Этот подход чувствует себя состоящим, и это разрушает сочетаемость потоков.

  3. Скажите всем визуализациям не обновлять, пока не обновятся все потоки. Опять же, это нарушает инкапсуляцию, потому что визуализация не должна заботиться о том, что происходит вверх по течению. Он должен просто получить новые значения из объединенного потока и сделать красивую картинку.

  4. Сделайте визуализации достаточно детализированными, чтобы одно обновление потока в первую очередь приводило к небольшому снижению производительности. Это невозможно для некоторых визуализаций, таких как визуализация, которая вычисляет сетку на основе точек и размера сетки. Если точки или размер сетки изменяются, то необходимо пересчитать всю сетку.

Есть ли в Rx средство для одновременного обновления нескольких потоков? Я чувствую, что то, что я прошу, это магия.

Для тех, кто создал программы с графическим интерфейсом, используя Rx: есть ли лучшая архитектура, которую я должен использовать для моделей, кроме отправки новых значений через потоки?

Если этот вопрос неясен, пожалуйста, сообщите мне в комментарии, и я постараюсь привести более конкретный пример.

пример

Вот пример программы на Python RxPY:

import rx

stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)

rx.Observable\
  .combine_latest(stream1, stream2, lambda x, y: (x, y))\
  .subscribe(print)

stream1.on_next(3)
stream2.on_next(4)

Это печатает:

(1, 2)
(3, 2)
(3, 4)

Как я могу обновить значения stream1 а также stream2 одновременно, чтобы следующее стало результатом?

(1, 2)
(3, 4)

Другими словами, как я мог изменить combine_latest таким образом, чтобы я мог сказать ему вниз по течению "эй, подождите секунду, пока я обновлю другие потоки, прежде чем вы отправите свое следующее значение"?

2 ответа

Это работает в Rx для C#:

var throttled = source.Publish(hot => hot.Buffer(() => hot.Throttle(dueTime));

dueTime значение здесь TimeSpan в.NET. Это просто говорит о том, что окно времени - это то, что вы хотите иметь бездействие, прежде чем значение, которое оно произвело. Это в основном поглощает значения, произведенные "одновременно" в течение определенного промежутка времени.

source в этом случае будет вашим .combine_latest(...) наблюдаемым.

Я нашел один возможный ответ, но он не самый лучший, и я хотел бы, чтобы другие.

Я обнаружил pausable комбинатор. Передавая поток, который испускает True или False, вы можете контролировать, будет ли последовательность приостановлена. Вот модификация моего примера:

import rx

stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)

pauser = rx.subjects.BehaviorSubject(True)
rx.Observable\
         .combine_latest(stream1, stream2, lambda x, y: (x, y))\
         .pausable(pauser)\
         .subscribe(print)

# Begin updating simultaneously
pauser.on_next(False)
stream1.on_next(3)
stream2.on_next(4)

# Update done, resume combined stream
pauser.on_next(True)

# Prints:
# (1, 2)
# (3, 4)

Чтобы применить к моему GUI, я могу создать BehaviorSubject называется updating в моей модели, которая выдает, обновляется ли вся модель или нет. Например, если stream1 а также stream2 одновременно обновляются, тогда я могу установить updating к Истине. На любую визуализацию, которая стоит дорого, я могу применить значение updating приостановить объединенный поток.

Другие вопросы по тегам