Создание пользовательских операторов в rxcpp

Я пытаюсь научиться создавать пользовательские операторы в rxcpp, и я смог создать операторы, как указано здесь. Но я хотел бы узнать, как создавать более общие операторы, реализующие rxo::operator_base и использующие оператор подъема. Есть ли документация, чтобы узнать это на нескольких простых примерах?

2 ответа

Вот способ использования наблюдаемой функции подъема rxcpp v2:

class MyTestOp //: public rxcpp::operators::operator_base<int>
{
public:
    MyTestOp(){}
    ~MyTestOp(){}

    rxcpp::subscriber<int> operator() (rxcpp::subscriber<int> s) const {
        return rxcpp::make_subscriber<int>([s](const int & next) {
            s.on_next(std::move(next + 1));
        },  [&s](const std::exception_ptr & e) {
            s.on_error(e);
        },  [&s]() {
            s.on_completed();
        });
    }
};


int main()
{
    auto keys = rxcpp::observable<>::create<int>(
        [](rxcpp::subscriber<int> dest){
            for (;;) {
                int key = std::cin.get();
                dest.on_next(key);
            }
        }).
    publish();

    keys.lift<int>(MyTestOp()).subscribe([](int key){ 
        std::cout << key << std::endl;
    });

    // same as use class
    //keys.lift<int>([](rxcpp::subscriber<int> s) { 
    //    return rxcpp::make_subscriber<int>([s](const int & next) {
    //        s.on_next(std::move(next + 1));
    //    },  [&s](const std::exception_ptr & e) {
    //        s.on_error(e);
    //    },  [&s]() {
    //        s.on_completed();
    //    });
    //}).subscribe([](int key){ 
    //    std::cout << key << std::endl;
    //});

    // run the loop in create
    keys.connect();

    return 0;
}

Так как он основан на проверке шаблона, вам не нужно наследовать от какого-либо интерфейса, просто реализуйте operator(), как раньше, все будет в порядке.

И я думаю, что автор предпочел бы использовать способ в комментариях.

И, возможно, я должен использовать проверку подписки... в любом случае...

if(!s.isUnsubscribed()) { /*call s.on_xxx*/ }

Я нашел следующий слайд из презентации Кирка 2016 года весьма полезным, хотя он касается rxcppv3, а не v2.

Понятия последовательности

      struct observable {
    void bind(observer);
};

struct observer {
    template<class T>
    void next(T);
};

struct lifter {
    observer lift(observer);
};

Реализации последовательности

      const auto ints = [](auto first, auto last){
  return make_observable([=](auto r){ // Define observable::bind
    for(auto i = first;; ++i){
      r.next(i);
      if (i == last) break;
    }
  });
};

const auto copy_if = [](auto pred){
  return make_lifter([=](auto r){
    return make_observer(r, [=](auto& r, auto v){ // Define observer::next
        if (pred(v)) r.next(v);
    });
  });
};
Другие вопросы по тегам