Построить шаблон Observer/Observable с использованием RxCpp

Я пытаюсь реализовать observer/observable шаблон в Rx-cpp, Это очень интересный урок Rx.Net о том, как кто-то может к этому.

В этом C# Например, есть конкретные interfaces что мы должны переопределить:

public interface IObserver<in T>
{
    void OnCompleted();
    void OnError(Exception error);
    void OnNext(T value);
}


public interface IObservable<out T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

Насколько я понимаю, в Rx-cpp нет такого удобства. Итак, можно ли привести пример заголовка (myObservable.h / myObserver.h), аналогично interfaces выше, что я могу использовать в качестве руководства для определения того же шаблона коммуникации?

Любая помощь высоко ценится, спасибо!

РЕДАКТИРОВАТЬ 1: Благодаря @zentrunix Я пытаюсь сделать классно-ориентированное общение. Пока у меня есть код ниже для наблюдаемой модели. То, что я хочу, - это определить список наблюдателей, которые будут прикреплены к наблюдаемой, и когда OnNext называется эти наблюдатели должны быть уведомлены. Тем не менее, есть недостающие части.

  1. Как я могу subscribe() на этих наблюдателей (Rx::subscribers<int>) когда myObservable::Subscribe() функции называется.
  2. Также как я могу unsubscribe(),
  3. Наконец, как соответствующий o.subscribe(onNext, onEnd); в нескольких onNext наблюдатели это будет? Можно ли построить соответствующий myObserver учебный класс? (снова вдохновлено здесь)
  4. Извините за вопрос, но имеет ли смысл такая организация? До сих пор я работал с архитектурой, представленной в этом руководстве, и поэтому я одержим этой задачей. Я нашел это как способ принять участие в RxCpp, Любые комментарии высоко ценятся. (Опять извините за мое невежество.)

    class myObservable {
    
    private:
    
    std::shared_ptr<std::list<rxcpp::subscriber<int>>> observers;
    
    public:
    
    myObservable() { observers = std::make_shared<std::list<Rx::subscriber<int>>>(); };
    
    Rx::observable<int> Attach(std::shared_ptr<rxcpp::subscriber<int>> out) {
    
        return Rx::observable<>::create<int>([&, out]() {
            auto it = observers->insert(observers->end(), *out);
            it->add([=]() {
                observers->erase(it);
            });
        });
    
    };
    
    void OnNext(int sendItem) {
    
        for (Rx::subscriber<int> observer : *observers) {
            (observer).on_next(sendItem);
        }
    }
    
    void Disposer(Rx::subscriber<int> out) {
    
        observers->erase(std::remove(observers->begin(), observers->end(), &out), observers->end());
    };
    };
    

1 ответ

Очень простой пример в RxCpp ниже. Хотя есть (по крайней мере) одно предостережение: типичный код RxCpp интенсивно использует лямбды, которые мне очень не нравятся.

Я также пытался найти документацию и учебные пособия в Интернете, но не смог найти ни одного. Меня особенно интересуют объяснения о моделях потоков.

Если вы хотите изучить код и документацию Doxygen, на сайте RxCpp GitHub есть много примеров.

#include <iostream>
#include <exception>

#include "rxcpp/rx.hpp"
namespace rx = rxcpp;

static void onNext(int n) { std::cout << "* " << n << "\n"; }
static void onEnd() { std::cout << "* end\n"; }

static void onError(std::exception_ptr ep)
{
  try { std::rethrow_exception(ep); }
  catch (std::exception& e) { std::cout << "* exception " << e.what() << '\n'; }
}

static void observableImpl(rx::subscriber<int> s)
{
  s.on_next(1);
  s.on_next(2);
  s.on_completed();
}

int main()
{
  auto o = rxcpp::observable<>::create<int>(observableImpl);
  std::cout << "*\n";
  o.subscribe(onNext, onEnd);
} 
Другие вопросы по тегам