Как присвоить уникальный идентификатор каждой строке в таблице в API таблиц Flink?

Я использую Flink для вычисления серии операций. Каждая операция создает таблицу, которая используется для следующей операции, а также сохраняется в S3. Это позволяет просматривать данные на каждом промежуточном этапе расчета и видеть эффект каждой операции.

Мне нужно назначить уникальный идентификатор каждой строке в каждой таблице, чтобы, когда этот идентификатор снова появится на следующем шаге (возможно, в другом столбце), я знал, что две строки связаны друг с другом.

Первым очевидным кандидатом на это кажется ROW_NUMBER() функция, но:

  1. Кажется, этого нет в API табличных выражений. Мне нужно создавать строки SQL?

  2. Как мне его использовать? Когда я пробую этот запрос:

    SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp

    Я получаю такую ​​ошибку:

    org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.

  3. Всегда ли требуется сортировка таблицы? Это похоже на накладные расходы, которых я бы предпочел избежать.

Следующим вариантом было просто сгенерировать случайный UUID для каждой строки. Но когда я пытаюсь это сделать, один и тот же UUID никогда не используется дважды, поэтому это совершенно бесполезно. Вот пример:

      import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Sandbox {
  def main(args: Array[String]): Unit = {

    val env = StreamTableEnvironment.create(
      StreamExecutionEnvironment.getExecutionEnvironment
    )

    val inp = env.fromValues(1.as("id"))
    val out1 = inp.addColumns(uuid().as("u"))
    val out2 = out1.addColumns($"u".as("u2"))

    env.executeSql("""
      CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.executeSql("""
      CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
      WITH ('connector' = 'print')
    """)

    env.createStatementSet()
      .addInsert("out1", out1)
      .addInsert("out2", out2)
      .execute()

    // Equivalent to the createStatementSet method:
    out1.executeInsert("out1")
    out2.executeInsert("out2")
  }
}

На выходе я получаю:

      [info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)

Мне нужно, чтобы UUID снова появился в обоих столбцах, например:

      [info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)

Полагаю, это связано с этой заметкой в документации:

Эта функция не является детерминированной, что означает, что значение будет пересчитываться для каждой записи.

Как я могу вычислить UUID только один раз и сделать его "конкретным", чтобы одно и то же значение отправлялось обоим out1 и out2?

Я получаю аналогичный результат с пользовательской функцией:

          class uuidUdf extends ScalarFunction {
      def eval(): String = UUID.randomUUID().toString
    }

    val out1 = inp.addColumns(call(new uuidUdf()).as("u"))

0 ответов

Другие вопросы по тегам