RxCpp: время жизни наблюдателя, если используется наблюдающая (rxcpp:: наблюдаемая_нон_руда ())

Как правильно подождать, пока все наблюдатели on_completed будут вызваны, если наблюдатели будут использовать наблюдаемую (rxcpp:: meet_on_new_thread()):

Например:

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        // ...
        s.on_completed();
    };
    auto values = rxcpp::observable<>::create<int>(generator).publish();
    auto s1 = values.observe_on(rxcpp::observe_on_new_thread())
                    .subscribe([&](int) { slow_function(foo); }));

    auto lifetime = rxcpp::composite_subscription();
    lifetime.add([&](){ wrapper.log("unsubscribe");  });
    auto s2 = values.ref_count().as_blocking().subscribe(lifetime);

    // hope to call something here to wait for the completion of
    // s1's on_completed function
}

// the program usually crashes here when foo goes out of scope because 
// the slow_function(foo) is still working on foo.  I also noticed that
// s1's on_completed never got called.

Мой вопрос заключается в том, как ждать, пока s1 on_completed не будет завершен, без необходимости устанавливать и опрашивать некоторые переменные.

Мотивация использования наблюдателя () заключается в том, что обычно значения имеют несколько наблюдателей, и я бы хотел, чтобы каждый наблюдатель работал одновременно. Возможно, есть разные способы достижения одной и той же цели, я открыт для всех ваших предложений.

1 ответ

Решение

Слияние двух позволит одной подписке блокировать ждать окончания обоих.

{
    Foo foo;
    auto generator = [&](rxcpp::subscriber<int> s)
    {
        s.on_next(1);
        s.on_next(2);
        // ...
        s.on_completed();
    };

    auto values = rxcpp::observable<>::create<int>(generator).publish();

    auto work = values.
        observe_on(rxcpp::observe_on_new_thread()).
        tap([&](int c) {
            slow_function(foo);
        }).
        finally([](){printf("s1 completed\n");}).
        as_dynamic();

    auto start = values.
        ref_count().
        finally([](){printf("s2 completed\n");}).
        as_dynamic();

    // wait for all to finish
    rxcpp::observable<>::from(work, start).
        merge(rxcpp::observe_on_new_thread()).
        as_blocking().subscribe();
}

Несколько баллов.

поток должен возвращать тот же тип, чтобы слияние работало. если вы объединяете потоки разных типов, вместо этого используйте comb_latest.

Порядок наблюдаемых в observable<>::from() важен, стартовый поток имеет ref_count, поэтому он должен называться последним, чтобы следующее слияние подписалось на работу перед запуском генератора.

У слияния есть два потока, вызывающих его. Это требует использования поточно-ориентированной координации. rxcpp является платным за использование. по умолчанию операторы предполагают, что все вызовы поступают из одного потока. любой оператор, который получает вызовы из нескольких потоков, должен получить поточно-ориентированную координацию, которую оператор использует для навязывания потокобезопасного управления состоянием и выходных вызовов.

При желании один и тот же экземпляр координатора может быть использован для обоих.

Другие вопросы по тегам