Flink SQL: недостаточно памяти для объединения таблиц

Я часто обновляю таблицу MySql. Я хочу сделать снимок для каждого идентификатора, который обновляется за последние 20 секунд, и записать значение в redis. Я использую binlog в качестве потокового ввода и преобразовываю поток данных в таблицу Flink. Я запускаю следующий sql.

SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
   SELECT id, MAX(ts)
   FROM my_tbl
   GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)

Поскольку я знаю, что объединение таблиц приведет к чрезмерному размеру состояния, я установил StreamQueryConfig следующим образом

qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));

Я запускаю задачу в течение одного дня и выхожу из памяти. Как я могу решить эту проблему?

1 ответ

Это также можно решить с помощью соединения с временными окнами вместо обычного соединения с настроенным временем хранения в состоянии ожидания.

Следующий запрос должен помочь.

SELECT id, ts, val
FROM my_tbl m1,
     (SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
      FROM my_tbl
      GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
      m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime

Оконечный предикат соединения (BETWEEN) гарантирует, что состояние автоматически очищается. Поскольку вы используете не точное время обработки, я добавил 5 секунд.

Другие вопросы по тегам