Модульное тестирование Flink SQL: как назначить водяной знак?
Я пишу модульный тест для SQL-оператора Flink, который использует match_recognize. Я настраиваю тестовые данные вот так
Table data = tEnv.fromValues(DataTypes.ROW(
DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("foobar", DataTypes.STRING()),
....
),
row(...),
row(...)
);
У меня два вопроса,
- Как мне указать event_time как поле для водяных знаков? (с указанием времени строки)
- Менее важно, дать создаваемой таблице содержательное имя?
ВЕРСИЯ FLINK: 1.11
1 ответ
Вы столкнулись с текущим ограничением API таблиц: невозможно определить водяные знаки и атрибуты времени строки в сочетании с
forValues
метод; нужен коннектор. Есть несколько способов обойти это:
1. Используйте
csv
соединитель, который вы складываете со своим
VALUES
, как показано в этом примере.
2. Используйте встроенный коннектор DataGen. Поскольку вы составляете модульный тест для CEP, я полагаю, что вам нужна определенная степень контроля над генерируемыми данными, поэтому, вероятно, это не жизнеспособный вариант. В любом случае, я думал об этом упомянуть.
Примечание. Использование синтаксиса SQL DDL - рекомендуемый способ создания таблиц из Flink 1.10. Это упростит обе задачи, которые вы пытаетесь сделать (например, определение водяного знака и присвоение имени вашей таблице):
tEnv.executeSql("CREATE TABLE table_name (\n" +
" event_time TIMESTAMP(3),\n" +
" foobar STRING \n" +
" WATERMARK FOR event_time AS event_time\n" +
") WITH (...)"
);
Table data = tEnv.from("table_name");
Водяной знак объявлен как вычисляемый столбец, и вы можете использовать несколько стратегий водяных знаков. Пожалуйста, проверьте эту страницу документации для получения более подробной информации.