Как связать две функции, возвращающие наблюдаемые?

Я написал одну функцию, которая возвращает пару QDateTime как наблюдаемый, как этот:

rxcpp::observable<std::tuple<QDateTime, QDateTime>> experimentOne(const QDateTimeAxis * const axis 
{
   return rxcpp::observable<>::create<std::tuple<QDateTime, QDateTime>>(
     [axis](rxcpp::subscriber<std::tuple<QDateTime, QDateTime>> s) {

       auto rangeCallback = [s](QDateTime minv, QDateTime maxv) {

          if (s.is_subscribed()) {

              // send to the subscriber
              s.on_next(std::make_tuple<QDateTime, QDateTime>(std::move(minv), std::move(maxv)));
          }

       };

       QObject::connect(axis, &QDateTimeAxis::rangeChanged, rangeCallback);
   }); 
}

Таким образом, с помощью этой функции я могу подписаться на изменение диапазона дат на оси QChart,

Я написал также другую функцию, которая, учитывая две даты, возвращает наблюдаемую со значениями из базы данных sqlite, как показано ниже

rxcpp::observable<std::tuple<double, double>> Database::getValueRange(const std::string& table, unsigned long start, unsigned long end)
{

   return rxcpp::observable<>::create<std::tuple<double, double>>(
      [this, table, start, end](rxcpp::subscriber<std::tuple<double, double>> s) {

    // get the prepared statement for the query 1, i.e. ohlcv values
    // within a date range
    sqlite3_stmt *stmt = this->m_query3_stms[table].get();

    // bind first parameter, the start timestamp
    int rc = sqlite3_bind_int64(stmt, 1, start);
    checkSqliteCode(rc, m_db.get());

    // bind the second parameter, the end timestamp
    rc = sqlite3_bind_int64(stmt, 2, end);
    checkSqliteCode(rc, m_db.get());

    // step through the query results
    while ( sqlite3_step(stmt)==SQLITE_ROW && s.is_subscribed() ) {

        // extract name values from the current result row
        float minv = sqlite3_column_double(stmt, 0);
        float maxv = sqlite3_column_double(stmt, 1);

        // send to the subscriber
        s.on_next(std::make_tuple<double, double>(minv, maxv));
    }

    // reset the statement for reuse
    sqlite3_reset(stmt);

    // send complete to the subscriber
    s.on_completed();

   });
}

Как я могу передать значения из первых функций (две даты) в качестве входных данных для второй функции в идиоматической форме в RxCpp? В некотором смысле в конце конвейера я могу подписаться на значения, поступающие из БД на основе входных дат?

1 ответ

Решение

Канонический способ создания нового диапазона значений для каждой новой пары значений дат состоит в использовании карты, за которой следует один из операторов выравнивания

auto valueRanges = experimentOne(/*params*/).
    map(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2){
      return getValueRange(/*params*/).
          map(rxcpp::util::apply_to([=](double db1, double db2){ 
              return std::make_tuple(d1, d2, db1, db2); 
          }));
    })).
    /* switch_on_next() or merge() or concat() */
    /* this will produce std::tuple< QDateTime, QDateTime, double, double>
  • switch_on_next отменит предыдущий диапазон значений и начнет новый диапазон значений.
  • merge произведет все диапазоны значений как можно скорее.
  • concat будет производить диапазоны значений по одному, по порядку.

в случае если диапазоны значений выполняются в разных потоках, необходимо передать потокобезопасную координацию merge так что диапазоны значений чередуются безопасно.

Чтобы выбрать определенный диапазон, используйте filter(), Если вы хотите разделить диапазоны на отдельные выражения, используйте publish() поделиться диапазонами в первую очередь.

auto hotValueRanges = valueRanges.
    publish().ref_count();

auto aDateRange = hotValueRanges.
    filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
        return isADesiredDate(d1, d2);
    })).
    subscribe(/*use the range*/);

auto anotherDateRange = hotValueRanges.
    filter(rxcpp::util::apply_to([](QDateTime d1, QDateTime d2, double, double){
        return isAnotherDesiredDate(d1, d2);
    })).
    subscribe(/*use the range*/);
Другие вопросы по тегам