неправильный результат в Apache flink full external join

У меня есть 2 потока данных, которые были созданы из 2 таблиц, например:

      Table orderRes1 = ste.sqlQuery(
               "SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
                       " Group by orderId, userId");

Table orderRes2 = ste.sqlQuery(
               "SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
                       " Group by orderId, userId");

DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
               filter(order-> order.f0);

DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
               filter(order-> order.f0);

Мне интересно выполнить полное внешнее соединение в этих двух потоках, и я использовал как orderRes1.fullOuterJoin(orderRes2, $(exp)), так и sql-запрос, содержащий подход полного внешнего соединения, как показано ниже:

        Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
                $("userId"), $("price"));
  
  Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
                $("userId"), $("price"));

 Table result = ste.sqlQuery(
                "SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
                        " COALESCE(bidTbl.userId,askTbl.orderId)," +
                        " COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
                        " COALESCE(askTbl.askTotalPrice,0) as askTotalPrice, " + 
                        " FROM " +
                        " (SELECT orderId, userId," +
                        " SUM(price) AS bidTotalPrice " +
                        " FROM " + bidOrdr +
                        " Group by orderId, userId) bidTbl full outer JOIN " +
                        " (SELECT orderId, userId," +
                        " SUM(price) AS askTotalPrice" +
                        " FROM " + askOrdr +
                        " Group by orderId, userId) askTbl " +
                        " ON (bidTbl.orderId = askTbl.orderId" +
                        " AND bidTbl.userId= askTbl.userId) ") ;

 DataStream<Tuple2<Boolean, Row>> =  ste.toRetractStream(result, Row.class).filter(order -> order.f0);

Однако результат в некоторых случаях неверен: представьте, что пользователь A продает по цене B 3 раза, после этого пользователь B продает A 2 раза, во второй раз результат:

7> (правда, 123, a, 300.0,0.0)

7> (правда, 123, a, 300.0,200.0)

10> (правда, 123, b, 0,0,300,0)

10> (правда, 123, b, 200.0,300.0)

вторая и четвертая строки являются ожидаемым результатом потока, но он также сгенерирует 1-ю и 3-ю строки. Стоит упомянуть, что coGroup - другое решение, но я не хочу использовать оконное управление в этом сценарии, а не оконное решение доступно только в ограниченных потоках (DataSet).

Подсказка: orderId и userId будут повторяться в обоих потоках, и я хочу создать 2 строки в каждом действии, содержащее: orderId, userId1, bidTotalPrice, askTotalPrice И orderId, userId2, bidTotalPrice, askTotalPrice

1 ответ

Нечто подобное следует ожидать от потоковых запросов (или, другими словами, от запросов, выполняемых в динамических таблицах). В отличие от традиционной базы данных, где входные отношения к запросу остаются статичными во время выполнения запроса, входные данные для потокового запроса постоянно обновляются, поэтому результат также должен постоянно обновляться.

Если я понимаю настройку здесь, «неправильные» результаты в строках 1 и 3 верны до тех пор, пока соответствующие строки из orderRes2обрабатываются. Если эти строки не поступят, строки 1 и 3 останутся правильными.

Что вы должны ожидать, так это в конечном итоге правильный результат, включая, при необходимости, отзыв. Вы можете уменьшить количество промежуточных результатов, включив мини-пакетную агрегацию.

Эта ветка списка рассылки дает больше информации. Если я неправильно понял вашу ситуацию, приведите воспроизводимый пример, иллюстрирующий проблему.

Другие вопросы по тегам