Apache Flink - включить порядок объединения

Я заметил, что Apache Flink не оптимизирует порядок объединения таблиц. На данный момент он поддерживает указанный пользователем порядок соединения (в основном он принимает запрос буквально). Я полагаю, что Apache Calcite может оптимизировать порядок соединений, но по некоторым причинам эти правила не используются в Apache Flink.

Если, например, у нас есть две таблицы "R" и "S"

private val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
private val fileNumber = 1
tableEnv.registerTableSource("R", getDataSourceR(fileNumber))
tableEnv.registerTableSource("S", getDataSourceS(fileNumber))
private val r = tableEnv.scan("R")
private val s = tableEnv.scan("S")

и мы предполагаем, что 'S' пусто, и мы хотим объединить эти таблицы двумя способами:

val tableOne = r.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
        .join(s.as("x5, x6")).where("x4 === x5 ").select("x1, x6")


val tableTwo = s.as("x1, x2").join(r.as("x3, x4")).where("x2 === x3").select("x1, x4")
          .join(r.as("x5, x6")).where("x4 === x5 ").select("x1, x6")

Если мы хотим посчитать количество строк в tableOne и tableTwo, результат будет нулевым в обоих случаях. Проблема в том, что оценка tableOne займет гораздо больше времени, чем оценка tableTwo.

Есть ли способ, которым мы можем автоматически оптимизировать порядок выполнения объединения или даже включить возможную операцию плановой стоимости, добавив некоторую статистику? Как можно добавить эту статистику?

В документации по этой ссылке написано, что, возможно, необходимо изменить среду таблиц CalciteConfig, но мне не ясно, как это сделать.

Пожалуйста помоги.

1 ответ

Решение

Переупорядочение соединений не включено, поскольку Flink плохо обрабатывает статистику. Изменение порядка соединений без каких-либо точных оценок кардинальности - это в основном азартная игра. Поэтому переупорядочение соединений отключено, и таблицы объединяются в порядке, указанном пользователем. Это дает детерминистическое и контролируемое поведение.

Однако вы можете передать правила оптимизации в оптимизатор, передав TableConfig с CalciteConfig при создании TableEnvironmentт.е. TableEnvironment.getTableEnvironment (env, yourTableConfig). в CalciteConfig Вы можете добавить правила оптимизации к различным этапам оптимизации. Вы, вероятно, хотите добавить JoinCommunteRule а также JoinAssociateRule на этапе логической оптимизации. Вам, вероятно, также придется покопаться в коде, чтобы проверить, как передавать статистику в оптимизатор.

Другие вопросы по тегам