Планирование и обработка тайм-аута с помощью rxcpp

Я новичок в использовании rxcpp и пытаюсь собрать что-то функциональное вместе в следующем сценарии:

У меня есть один источник данных, который будет извлекать команды из отдельного источника, код, который я пишу, будет извлекать эти команды в наблюдаемый rxcpp. Особое условие заключается в том, что если в течение определенного периода времени не было получено ни одной команды, функция onError подписчиков будет запущена вместо onNext, но тайм-аут может произойти только до получения первой команды. После получения первой команды тайм-аут не может произойти, независимо от того, сколько времени потребуется для получения каких-либо дальнейших команд.

Я пытаюсь сделать это примерно так:

auto timeout = rxcpp::observable<>::timer(std::chrono::steady_clock::now() + timeout,
                             rxcpp::observe_on_event_loop()).map([](int val) // Note, converts the value type of the timer observable and converts timeouts to errors
{
    std::cout << "TIMED OUT!" << std::endl;
    throw std::runtime_error("timeout");
    return command_type();
});
auto commands = timeout.amb(rxcpp::observe_on_event_loop(), createCommandSource(event_loop_scheduler, ...));

У меня проблема в том, что тайм-аут происходит до того, как какие-либо команды будут получены, даже если они вставляются задолго до того, как истечет тайм-аут. Я экспериментировал с тайм-аутами от 1000 мс до 5000 мс, и это не имеет значения. Если я удалю код тайм-аута, команда получится немедленно, однако. Я подозреваю, что, скорее всего, я просто неправильно понял, как использовать планировщики в rxcpp, поэтому мне интересно, как это можно сделать.

1 ответ

Решение

Я написал простой createCommandSource. Это сработало для меня:

#include "rxcpp/rx.hpp"
using namespace rxcpp;
using namespace rxcpp::sources;
using namespace rxcpp::util;

using namespace std;

struct command_type {};

int main()
{
    auto eventloop = rxcpp::observe_on_event_loop();
    auto createCommandSource = [=]() {
        return rxcpp::observable<>::interval(std::chrono::seconds(1), eventloop).map([](long) {return command_type(); });
    };
    auto timeout = rxcpp::observable<>::timer(eventloop.now() + std::chrono::seconds(2), eventloop).map([](long ) // Note, converts the value type of the timer observable and converts timeouts to errors
    {
        std::cout << "TIMED OUT!" << std::endl;
        throw std::runtime_error("timeout");
        return command_type();
    });
    auto commands = timeout.amb(eventloop, createCommandSource().take(5));

    commands
        .as_blocking().subscribe(
        [](command_type) {printf("command\n"); },
        [](std::exception_ptr) {printf("execption\n"); });

    std::this_thread::sleep_for(std::chrono::seconds(2));

    return 0;
}

Теперь для управления тайм-аутами вы можете использовать оператор.timeout(), предоставляя в качестве параметра длительность интервала:

createCommandSource(event_loop_scheduler, ...).timeout((std::chrono::seconds(2));

Насколько я понимаю, этот оператор обеспечивает время ожидания между созданием потока и первой командой, любой парой команд и между последней командой и событием on_complete() потока команд.

Другие вопросы по тегам