Как присвоить уникальный идентификатор каждой строке в таблице в API таблиц Flink?
Я использую Flink для вычисления серии операций. Каждая операция создает таблицу, которая используется для следующей операции, а также сохраняется в S3. Это позволяет просматривать данные на каждом промежуточном этапе расчета и видеть эффект каждой операции.
Мне нужно назначить уникальный идентификатор каждой строке в каждой таблице, чтобы, когда этот идентификатор снова появится на следующем шаге (возможно, в другом столбце), я знал, что две строки связаны друг с другом.
Первым очевидным кандидатом на это кажется
ROW_NUMBER()
функция, но:
Кажется, этого нет в API табличных выражений. Мне нужно создавать строки SQL?
Как мне его использовать? Когда я пробую этот запрос:
SELECT *, ROW_NUMBER() OVER (ORDER BY f0) AS rn FROM inp
Я получаю такую ошибку:
org.apache.flink.table.api.ValidationException: Over Agg: The window rank function without order by. please re-check the over window statement.
Всегда ли требуется сортировка таблицы? Это похоже на накладные расходы, которых я бы предпочел избежать.
Следующим вариантом было просто сгенерировать случайный UUID для каждой строки. Но когда я пытаюсь это сделать, один и тот же UUID никогда не используется дважды, поэтому это совершенно бесполезно. Вот пример:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
object Sandbox {
def main(args: Array[String]): Unit = {
val env = StreamTableEnvironment.create(
StreamExecutionEnvironment.getExecutionEnvironment
)
val inp = env.fromValues(1.as("id"))
val out1 = inp.addColumns(uuid().as("u"))
val out2 = out1.addColumns($"u".as("u2"))
env.executeSql("""
CREATE TABLE out1 ( id INTEGER, u VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.executeSql("""
CREATE TABLE out2 ( id INTEGER, u VARCHAR(36), u2 VARCHAR(36) )
WITH ('connector' = 'print')
""")
env.createStatementSet()
.addInsert("out1", out1)
.addInsert("out2", out2)
.execute()
// Equivalent to the createStatementSet method:
out1.executeInsert("out1")
out2.executeInsert("out2")
}
}
На выходе я получаю:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,55da264d-1e15-4c40-94d4-822e1cd5db9c,c9a78f93-580c-456d-9883-08bc998124ed)
Мне нужно, чтобы UUID снова появился в обоих столбцах, например:
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d)
[info] +I(1,4e6008ad-868a-4f95-88b0-38ee7969067d,4e6008ad-868a-4f95-88b0-38ee7969067d)
Полагаю, это связано с этой заметкой в документации:
Эта функция не является детерминированной, что означает, что значение будет пересчитываться для каждой записи.
Как я могу вычислить UUID только один раз и сделать его "конкретным", чтобы одно и то же значение отправлялось обоим
out1
и
out2
?
Я получаю аналогичный результат с пользовательской функцией:
class uuidUdf extends ScalarFunction {
def eval(): String = UUID.randomUUID().toString
}
val out1 = inp.addColumns(call(new uuidUdf()).as("u"))