Создание пользовательских операторов в rxcpp
Я пытаюсь научиться создавать пользовательские операторы в rxcpp, и я смог создать операторы, как указано здесь. Но я хотел бы узнать, как создавать более общие операторы, реализующие rxo::operator_base и использующие оператор подъема. Есть ли документация, чтобы узнать это на нескольких простых примерах?
2 ответа
Вот способ использования наблюдаемой функции подъема rxcpp v2:
class MyTestOp //: public rxcpp::operators::operator_base<int>
{
public:
MyTestOp(){}
~MyTestOp(){}
rxcpp::subscriber<int> operator() (rxcpp::subscriber<int> s) const {
return rxcpp::make_subscriber<int>([s](const int & next) {
s.on_next(std::move(next + 1));
}, [&s](const std::exception_ptr & e) {
s.on_error(e);
}, [&s]() {
s.on_completed();
});
}
};
int main()
{
auto keys = rxcpp::observable<>::create<int>(
[](rxcpp::subscriber<int> dest){
for (;;) {
int key = std::cin.get();
dest.on_next(key);
}
}).
publish();
keys.lift<int>(MyTestOp()).subscribe([](int key){
std::cout << key << std::endl;
});
// same as use class
//keys.lift<int>([](rxcpp::subscriber<int> s) {
// return rxcpp::make_subscriber<int>([s](const int & next) {
// s.on_next(std::move(next + 1));
// }, [&s](const std::exception_ptr & e) {
// s.on_error(e);
// }, [&s]() {
// s.on_completed();
// });
//}).subscribe([](int key){
// std::cout << key << std::endl;
//});
// run the loop in create
keys.connect();
return 0;
}
Так как он основан на проверке шаблона, вам не нужно наследовать от какого-либо интерфейса, просто реализуйте operator(), как раньше, все будет в порядке.
И я думаю, что автор предпочел бы использовать способ в комментариях.
И, возможно, я должен использовать проверку подписки... в любом случае...
if(!s.isUnsubscribed()) { /*call s.on_xxx*/ }
Я нашел следующий слайд из презентации Кирка 2016 года весьма полезным, хотя он касается rxcppv3, а не v2.
Понятия последовательности
struct observable {
void bind(observer);
};
struct observer {
template<class T>
void next(T);
};
struct lifter {
observer lift(observer);
};
Реализации последовательности
const auto ints = [](auto first, auto last){
return make_observable([=](auto r){ // Define observable::bind
for(auto i = first;; ++i){
r.next(i);
if (i == last) break;
}
});
};
const auto copy_if = [](auto pred){
return make_lifter([=](auto r){
return make_observer(r, [=](auto& r, auto v){ // Define observer::next
if (pred(v)) r.next(v);
});
});
};