FLINK SQL JOIN с НЕ СУЩЕСТВУЮЩИМ ключевым словом и окном HOP
Я пытаюсь реализовать этот вариант использования с помощью flink sql.
Пользователи, которые щелкнули товар 2 раза (ItemBuyPopOpen), но не купили его (ItemBuySuccess) в течение 24 часов
Первый подход не сработал из-за следующего исключения.Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before
Второй подход - интервальное соединение. Но я думаю, что этот подход не подходит для моего случая использования. Потому что я хочу выдавать результат каждые 5 минут с интервалом в 1 день. Также НЕ СУЩЕСТВУЕТ решение с интервалом в 1 день. Я считаю неправильным решением. Потому что, когда я запускаю событие ItemBuyPopOpen 2 раза, если в этот момент нет данных ItemBuySuccess, он выдаст результат.
Третий подход может быть действителен из планировщика таблиц Flink, но график заданий неверен.
Какое истинное решение этого варианта использования?
Подход 1
streamTableEnvironment.createTemporaryView("ItemBuyPopupOpen", popupOpenStream, $("timeStamp").rowtime(), $("token"), $("itemId"));
streamTableEnvironment.createTemporaryView("ItemBuySuccess", successStream, $("timeStamp").rowtime(), $("token"), $("itemId"));
Table tableR = streamTableEnvironment.
sqlQuery(
"SELECT po.token, po.itemId, count(po.itemId) as cnt, HOP_END(po.`timeStamp`, INTERVAL '5' MINUTE , INTERVAL '1' DAY) " +
" FROM ItemBuyPopupOpen po LEFT JOIN ItemBuySuccess s ON po.token = s.token " +
" WHERE s.token is null AND po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132 ) " +
" GROUP BY po.token, po.itemId, HOP(po.`timeStamp`, INTERVAL '5' MINUTE , INTERVAL '1' DAY) " +
" HAVING COUNT(po.itemId) = 2 "
);
Подход 2
Table tableR = streamTableEnvironment.
sqlQuery(
"SELECT po.token, po.itemId, count(po.itemId) as poCnt" +
" FROM ItemBuyPopupOpen po " +
" WHERE po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132 ) AND NOT EXISTS ( " +
" SELECT s.token, s.itemId " +
" FROM ItemBuySuccess s " +
" WHERE s.token = po.token AND s.itemId = po.itemId AND po.`timeStamp` BETWEEN s.`timeStamp` - INTERVAL '24' HOUR AND s.`timeStamp` " +
" ) GROUP BY po.token, po.itemId having count(po.itemId) = 2 "
);
Подход 3
Table tableR = streamTableEnvironment.
sqlQuery(
"SELECT po.token, po.itemId, count(po.itemId) as cnt " +
" FROM ItemBuyPopupOpen po LEFT JOIN ItemBuySuccess s ON po.token = s.token " +
" WHERE s.token is null AND po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132 ) " +
" AND po.`timeStamp` BETWEEN s.`timeStamp` - INTERVAL '24' HOUR AND s.`timeStamp` " +
" GROUP BY po.token, po.itemId " +
" HAVING COUNT(po.itemId) = 2 "
);