rxcpp: вложенный цикл while или аналогичная "классическая" императивная структура для программы

У меня есть устройство, которое передает некоторые события. Я хочу использовать реактивные расширения для моделирования следующего поведения:

  1. Определить, когда пользователь подключает ключ (моя программа проверяет наличие событий для подключенного ключа).
  2. Начните захватывать поток данных из ключа после того, как ключ подключен.
  3. Уметь определять, когда ключ был отключен, и возвращаться к 1. Затем, если пользователь снова подключит ключ, я хочу перейти к 2. В состоянии, когда я передаю данные в потоковом режиме, если нажата клавиатура, тогда программа завершает работу.

Я знаю, как ждать подключения ключа (1.):

auto waitForDongle = events.take_while([](auto const & event) {
      return event == DongleConnected
      }).subscribe([](auto) {});

И я знаю, как захватить поток (2.):

auto streamMotionData = events.take_while([](auto const &) { return !keyboardPressed(); })
    .map([](auto const & evt) -> boost::optional<std::vector<double>> {
            ...
            return data;
        }).subscribe([](vector<double> const &) { ...});

Моя проблема в том, что я не знаю, как объединить потоки, чтобы вернуться к 1. и позже 2. Я просто знаю, как сделать это один раз и последовательно. Но я хочу поведение, описанное выше.

1 ответ

Решение

Это связано с распространенным примером перетаскивания UX в Rx. В этом случае пресс представляет собой соединение с ключом, а выпуск - это удаление ключа.

Это решение требует, чтобы только один ключ мог быть подключен за раз (удаление ожидается до следующего подключения). Там должно быть больше информации в случае, чтобы позволить нескольким соединениям перекрываться.

Вот суть решения. Вся программа ниже.

    auto DatasFromConnectedDongle = DongleConnects. // when connected
        map([=](DongleEvent const & event){
            assert(event == DongleEvent::Connected);
            cout << "Connected - " << flush;
            return DongleDatas. // return all the datas
                take_until(DongleRemoves). // stop when removed
                finally([](){
                    cout << "- Removed" << endl;
                });
        }).
        switch_on_next(). // only listen to datas from the most recent connected dongle
        take_until(Exits); // stop everything when key is pressed

Я в конечном итоге с помощью repeat(), но только для получения данных тестового события.

#include <rxcpp/rx.hpp>

namespace Rx {
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::operators;
using namespace rxcpp::util;
}
using namespace Rx;

#include <cassert>
using namespace std;
using namespace std::chrono;

int main()
{
    //
    // test code
    //

    auto keyboardPressed = [](){
        return false;
    };

    enum class DongleEvent {
        Connected,
        Removed,
        Data,
        Other
    };
    auto events = from(
            DongleEvent::Data, DongleEvent::Other, 
            DongleEvent::Connected, DongleEvent::Data, 
            DongleEvent::Other, DongleEvent::Other, 
            DongleEvent::Data, DongleEvent::Removed, 
            DongleEvent::Other, DongleEvent::Data).
        repeat(5). // send the above events five times over
        zip(take_at<0>(), interval(milliseconds(200))). // pace our test data
        publish().
        ref_count(); // publish and ref_count make the events sharable

    //
    // the solution
    //

    // fires when connected
    auto DongleConnects = events.
        filter([](DongleEvent const & event) {
            return event == DongleEvent::Connected;
        });

    // fires when data arrives
    auto DongleDatas = events.
        filter([](DongleEvent const & event) {
            return event == DongleEvent::Data;
        });

    // fires when removed    
    auto DongleRemoves = events.
        filter([](DongleEvent const & event) {
            return event == DongleEvent::Removed;
        });

    // fires when key pressed    
    auto Exits = interval(milliseconds(200)).
        filter([=](long){
            return keyboardPressed();
        });

    auto DatasFromConnectedDongle = DongleConnects.
        map([=](DongleEvent const & event){
            assert(event == DongleEvent::Connected);
            cout << "Connected - " << flush;
            return DongleDatas. // return all the datas
                take_until(DongleRemoves). // stop when removed
                finally([](){
                    cout << "- Removed" << endl;
                });
        }).
        switch_on_next(). // only listen to datas from the most recent connected dongle
        take_until(Exits); // stop everything when key is pressed

    DatasFromConnectedDongle.subscribe([](DongleEvent const & event){
        assert(event == DongleEvent::Data);
        cout << "Data " << flush;
    });

    return 0;
}

производит

~/source/rxcpp/Rx/v2/examples/dongle (master)$ cmake .
...
~/source/rxcpp/Rx/v2/examples/dongle (master)$ make
Scanning dependencies of target dongle
[ 50%] Building CXX object CMakeFiles/dongle.dir/main.cpp.o
[100%] Linking CXX executable dongle
[100%] Built target dongle
~/source/rxcpp/Rx/v2/examples/dongle (master)$ ./dongle 
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
Connected - Data Data - Removed
~/source/rxcpp/Rx/v2/examples/dongle (master)$ 
Другие вопросы по тегам