RxPy передает значение наблюдателю

Есть ли способ передать значение наблюдателю в соответствии с пользовательским вводом (что означает, что передаваемое значение не фиксируется постоянно)?

from rx import Observable, Observer

def push_five_strings(observer,value):
        observer.on_next(value)
        #observer.on_next("Alpha")
        observer.on_completed()


class PrintObserver(Observer):

    def on_next(self, value):
        print("Received {0}".format(value))

    def on_completed(self):
        print("Done!")

    def on_error(self, error):
        print("Error Occurred: {0}".format(error))

strings = [("Alpha", "Beta", "Gamma", "Delta", "Epsilon")]
for i in strings:
        push_five_strings(strings) #e.g. getting the values to push in one string at a time from a list of strings
#push_five_strings("Gamma")
#push_five_strings("Alpha")
#push_five_strings("Beta")
#push_five_strings("Delta")

source = Observable.create(push_five_strings)
#source = Observable.from_(["Alpha", "Beta", "Gamma", "Delta", "Epsilon"])
#source = Observable.from_([value])


source.subscribe(PrintObserver())

Я пытался искать, пытаясь понять RxPy, но в сети практически нет примеров...

1 ответ

from rx import Observable, Observer                                                                                      
import sys                                                                                                               

class PrintObserver(Observer):                                                                                           

    def on_next(self, value):                                                                                            
        print("Received {0}".format(value))                                                                              

    def on_completed(self):                                                                                              
        print("Done!")                                                                                                   

    def on_error(self, error):                                                                                           
        print("Error Occurred: {0}".format(error))                                                                       

Observable.from_(sys.stdin).subscribe(PrintObserver())  

Запуск и ввод результатов в:

abc
Received abc

def
Received def

Done!

Остановите ваш поток ввода с Ctrl+D,

Другие вопросы по тегам