FLINK SQL JOIN с НЕ СУЩЕСТВУЮЩИМ ключевым словом и окном HOP

Я пытаюсь реализовать этот вариант использования с помощью flink sql.

Пользователи, которые щелкнули товар 2 раза (ItemBuyPopOpen), но не купили его (ItemBuySuccess) в течение 24 часов

Первый подход не сработал из-за следующего исключения.Caused by: org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before

Второй подход - интервальное соединение. Но я думаю, что этот подход не подходит для моего случая использования. Потому что я хочу выдавать результат каждые 5 минут с интервалом в 1 день. Также НЕ СУЩЕСТВУЕТ решение с интервалом в 1 день. Я считаю неправильным решением. Потому что, когда я запускаю событие ItemBuyPopOpen 2 раза, если в этот момент нет данных ItemBuySuccess, он выдаст результат.

Третий подход может быть действителен из планировщика таблиц Flink, но график заданий неверен.

Какое истинное решение этого варианта использования?

Подход 1

streamTableEnvironment.createTemporaryView("ItemBuyPopupOpen", popupOpenStream, $("timeStamp").rowtime(), $("token"), $("itemId"));
streamTableEnvironment.createTemporaryView("ItemBuySuccess", successStream, $("timeStamp").rowtime(), $("token"), $("itemId"));
        Table tableR = streamTableEnvironment.
                sqlQuery(
                        "SELECT po.token, po.itemId, count(po.itemId) as cnt, HOP_END(po.`timeStamp`, INTERVAL '5' MINUTE , INTERVAL '1' DAY) "  +
                                " FROM ItemBuyPopupOpen po  LEFT JOIN  ItemBuySuccess s  ON po.token = s.token " +
                                " WHERE s.token is null AND po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132 ) " +
                                " GROUP BY po.token, po.itemId, HOP(po.`timeStamp`, INTERVAL '5' MINUTE , INTERVAL '1' DAY) " +
                                " HAVING COUNT(po.itemId) = 2 "
                );


Подход 2

        Table tableR = streamTableEnvironment.
                sqlQuery(
                        "SELECT po.token, po.itemId, count(po.itemId) as poCnt" +
                                " FROM ItemBuyPopupOpen po " +
                                " WHERE po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132  ) AND NOT EXISTS ( " +
                                "     SELECT s.token, s.itemId " +
                                "     FROM ItemBuySuccess  s " +
                                "     WHERE s.token = po.token AND s.itemId = po.itemId AND po.`timeStamp` BETWEEN s.`timeStamp` - INTERVAL '24' HOUR AND s.`timeStamp` " +
                                " ) GROUP BY po.token, po.itemId having count(po.itemId) = 2 "
                );

Подход 3

       Table tableR = streamTableEnvironment.
                sqlQuery(
                        "SELECT po.token, po.itemId, count(po.itemId) as cnt " +
                                " FROM ItemBuyPopupOpen po  LEFT JOIN  ItemBuySuccess s  ON po.token = s.token " +
                                " WHERE s.token is null AND po.itemId IN ( 517, 6016, 26754, 26573, 29302, 24883, 26015, 25373, 20543, 16132 ) " +
                                " AND po.`timeStamp` BETWEEN s.`timeStamp` - INTERVAL '24' HOUR AND s.`timeStamp` " +
                                " GROUP BY po.token, po.itemId " +
                                " HAVING COUNT(po.itemId) = 2 "
                );

0 ответов

Другие вопросы по тегам