Описание тега rx-py
Реактивные расширения для Python
5
ответов
Как отключить длительно работающую программу с использованием rxpython?
Скажите, у меня есть долго работающая функция Python, которая выглядит примерно так? import random import time from rx import Observable def intns(x): y = random.randint(5,10) print(y) print('begin') time.sleep(y) print('end') return x Я хочу иметь …
20 июл '17 в 22:30
1
ответ
RX поток запуска и остановки событий с поздней подпиской
Я пытаюсь сделать тему запуска и остановки событий, когда поздние подписчики получают только выдающиеся стартовые события. то есть. те, у которых не было соответствующего события остановки. Вот некоторый код RxPY: from rx.subjects import ReplaySubje…
27 окт '17 в 10:23
0
ответов
rxpy: buffer_with_time со случайным джиттером?
Я использую buffer_with_time, он принимает аргумент ключевого слова: timespan – Length of each buffer (specified as an integer denoting milliseconds) Тем не менее, отметка времени является фиксированной. Я хочу добавить к нему случайный джиттер. Есл…
17 янв '18 в 03:45
0
ответов
Как добавить много значений в Reactive Subject?
У нас есть s = Subject() объект и набор подписок на его значения. Мы получаем данные в контейнерах (скажем, по 1000 единиц) и в настоящее время выполняем: Observable.from_(data.items).subscribe( lambda x: s.on_next(x)) активировать подписчиков и пер…
06 авг '17 в 05:47
0
ответов
RxPy - Как остановить асинхронную наблюдаемую цепочку / работу
Я новичок в ReactiveX, и я хочу сделать что-то простое, но я не могу найти решение. Я ищу способ остановить асинхронную работу, когда она запланирована в планировщике цикла событий. Ниже приведен пример, показывающий проблему: def compute_1(item): p…
02 апр '18 в 12:33
1
ответ
RxPy передает значение наблюдателю
Есть ли способ передать значение наблюдателю в соответствии с пользовательским вводом (что означает, что передаваемое значение не фиксируется постоянно)? from rx import Observable, Observer def push_five_strings(observer,value): observer.on_next(val…
14 июл '17 в 07:17
1
ответ
Вернуть значение из наблюдаемого через rxpy
Каков элегантный способ преобразования объекта rx.Observable в "нормальный" объект в функции? например: def foo(): return rx.Observable.just('value').subscribe(<some magic here>) >>> print(foo()) # expected: # value # however get: # &…
05 сен '17 в 09:17
1
ответ
Тип Void/Unit для вызова функции on_next без аргументов
Я чувствую, что могу упустить что-то очень очевидное здесь, но в любом случае... При использовании RxPY, есть ли эквивалент C# RX System.Reactive.Unit тип для вызова on_next функция без аргументов? from rx import Observable source = Observable.retur…
26 окт '17 в 14:39
1
ответ
Запуск функции при изменении любого свойства объекта с помощью RxPy
В RxPy есть что-нибудь похожее на INotifyPropertyChanged в.NET Framework упоминается здесь? Я пытаюсь добавить наблюдателя к объекту, чтобы любое свойство объекта изменилось, и будет вызвана функция.
28 сен '16 в 06:41
1
ответ
Наблюдение за нажатием кнопки с помощью RxPY
Как я могу "прослушать" поток событий нажатия клавиш с помощью Python? Я хочу сделать что-то вроде этого: click_stream.map(lambda k: k.key) Но как я могу создать это click_stream?
05 фев '16 в 11:48
0
ответов
Регулирование событий в RxPY
Я читаю видеокадры из видеозахвата OpenCV и хотел бы оставить некоторые, чтобы поддерживать актуальность в случае медленной обработки. Как я могу гарантировать, что кадры не будут накапливаться в памяти, используя следующий код? class ImageSource(Su…
03 фев '19 в 06:24
0
ответов
Как заставить python реактивный подписываться постоянно
Я пытаюсь сделать заметную реакцию на дополнение к списку, но я не могу этого сделать. Он не реагирует, пока я не позвоню подписаться: >>> from rx import Observer >>> alist= [1,2,3] >>> source = Observable.from_list(alist)…
05 фев '19 в 05:50
1
ответ
Как я могу уведомить наблюдателей RxPY в отдельных потоках, используя asyncio?
(Примечание: фон этой проблемы довольно многословен, но внизу есть SSCCE, к которому можно пропустить) Фон Я пытаюсь разработать CLI на основе Python для взаимодействия с веб-сервисом. В моей кодовой базе у меня есть CommunicationService класс, кото…
04 сен '16 в 15:48
0
ответов
Пример буфера RxPY с использованием закрывающей селекторной функции?
Ищем пример использования RxPy, где буферизация выполняется с использованием закрывающей селекторной функции, чтобы решить, когда закрывать буфер. В моем примере я хочу иметь возможность закрыть границы моего буфера на основе значений, видимых в сам…
28 июл '17 в 16:14
2
ответа
Построение двойного бесконечного опроса в RX
Проблема состоит в том, чтобы эмулировать поведение двойной петли с RX: while True: try: token = get_token() while True: try: value = get_value_using_token(token) do_something(value) except: break except: break Было бы чисто, если бы две петли были …
06 апр '16 в 18:21
1
ответ
Веб-сервис Python, подписанный на реактивный источник, вызывает странное поведение в объекте
Я реализовал веб-сервис, используя Falcon. Эта служба хранит конечный автомат (pytransitions), который передается ресурсам службы в конструкторе. Служба работает с Gunicorn. Веб-сервис запускает процесс при запуске с использованием RxPy. Событие вер…
14 май '18 в 06:59
1
ответ
RXPY вводить предметы в наблюдаемую
Этот вопрос касается rxpy. Я пытаюсь построить реактивную систему, которая обрабатывает сообщения от наблюдаемого источника. В дополнение к этому я пытаюсь интегрировать это с системой выборов лидера, основанной на zookeeper. Эта комбинация позволит…
18 апр '17 в 15:11
3
ответа
Как ждать завершения параллельных потоков RxPy
Основываясь на этом превосходном ответе SO, я могу получить несколько задач, работающих параллельно в RxPy, моя проблема в том, как вы ждете, пока они завершатся? Я знаю, используя потоки я могу сделать .join() но, кажется, нет такой опции с Rx Sche…
15 май '17 в 21:33
0
ответов
Как узнать, какую версию RxPY я использую?
Я не могу вспомнить, какую версию RxPY я установил, и очевидное, похоже, не работает: In[33]: import rx In[34]: rx.__version__ Traceback (most recent call last): File "C:\Program Files\Python\2.7\lib\site-packages\IPython\core\interactiveshell.py", …
23 фев '16 в 16:57
2
ответа
Объединение наблюдаемых, когда оба изменяются одновременно
Я пытаюсь интегрировать ReactiveX в мой графический интерфейс с помощью RxPY. Это общий вопрос ReactiveX. Скажем, у меня есть визуализация, которая зависит от нескольких наблюдаемых потоков с помощью combine_latest(stream1, stream2, plot_function), …
19 авг '15 в 19:40