Модульное тестирование Flink SQL: как назначить водяной знак?

Я пишу модульный тест для SQL-оператора Flink, который использует match_recognize. Я настраиваю тестовые данные вот так

Table data = tEnv.fromValues(DataTypes.ROW(
  DataTypes.FIELD("event_time", DataTypes.TIMESTAMP(3)),
  DataTypes.FIELD("foobar", DataTypes.STRING()),
  ....
  ),
  row(...),
  row(...)
);

У меня два вопроса,

  • Как мне указать event_time как поле для водяных знаков? (с указанием времени строки)
  • Менее важно, дать создаваемой таблице содержательное имя?

ВЕРСИЯ FLINK: 1.11

1 ответ

Решение

Вы столкнулись с текущим ограничением API таблиц: невозможно определить водяные знаки и атрибуты времени строки в сочетании с forValuesметод; нужен коннектор. Есть несколько способов обойти это:

1. Используйте csv соединитель, который вы складываете со своим VALUES, как показано в этом примере.

2. Используйте встроенный коннектор DataGen. Поскольку вы составляете модульный тест для CEP, я полагаю, что вам нужна определенная степень контроля над генерируемыми данными, поэтому, вероятно, это не жизнеспособный вариант. В любом случае, я думал об этом упомянуть.

Примечание. Использование синтаксиса SQL DDL - рекомендуемый способ создания таблиц из Flink 1.10. Это упростит обе задачи, которые вы пытаетесь сделать (например, определение водяного знака и присвоение имени вашей таблице):

tEnv.executeSql("CREATE TABLE table_name (\n" +
                "             event_time TIMESTAMP(3),\n" +
                "             foobar STRING \n" +
                "             WATERMARK FOR event_time AS event_time\n" +
                ") WITH (...)"
);

Table data = tEnv.from("table_name");

Водяной знак объявлен как вычисляемый столбец, и вы можете использовать несколько стратегий водяных знаков. Пожалуйста, проверьте эту страницу документации для получения более подробной информации.

Другие вопросы по тегам