Flink SQL: недостаточно памяти для объединения таблиц
Я часто обновляю таблицу MySql. Я хочу сделать снимок для каждого идентификатора, который обновляется за последние 20 секунд, и записать значение в redis. Я использую binlog в качестве потокового ввода и преобразовываю поток данных в таблицу Flink. Я запускаю следующий sql.
SELECT id, ts, val
FROM my_tbl
WHERE (id, ts) IN
(
SELECT id, MAX(ts)
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id
)
Поскольку я знаю, что объединение таблиц приведет к чрезмерному размеру состояния, я установил StreamQueryConfig следующим образом
qConfig.withIdleStateRetentionTime(Time.seconds(600), Time.seconds(1200));
Я запускаю задачу в течение одного дня и выхожу из памяти. Как я могу решить эту проблему?
1 ответ
Это также можно решить с помощью соединения с временными окнами вместо обычного соединения с настроенным временем хранения в состоянии ожидания.
Следующий запрос должен помочь.
SELECT id, ts, val
FROM my_tbl m1,
(SELECT id, MAX(ts), TUMBLE_PROCTIME(proctime, INTERVAL '20' SECOND) as ptime
FROM my_tbl
GROUP BY TUMBLE(proctime, INTERVAL '20' SECOND), id) m2
WHERE m1.id = m2.id AND m1.ts = m2.ts ANS
m1.proctime BETWEEN m2.ptime - INTERVAL '25' SECOND AND m2.ptime
Оконечный предикат соединения (BETWEEN
) гарантирует, что состояние автоматически очищается. Поскольку вы используете не точное время обработки, я добавил 5 секунд.