неправильный результат в Apache flink full external join
У меня есть 2 потока данных, которые были созданы из 2 таблиц, например:
Table orderRes1 = ste.sqlQuery(
"SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
" Group by orderId, userId");
Table orderRes2 = ste.sqlQuery(
"SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
" Group by orderId, userId");
DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
filter(order-> order.f0);
DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
filter(order-> order.f0);
Мне интересно выполнить полное внешнее соединение в этих двух потоках, и я использовал как orderRes1.fullOuterJoin(orderRes2, $(exp)), так и sql-запрос, содержащий подход полного внешнего соединения, как показано ниже:
Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
$("userId"), $("price"));
Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
$("userId"), $("price"));
Table result = ste.sqlQuery(
"SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
" COALESCE(bidTbl.userId,askTbl.orderId)," +
" COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
" COALESCE(askTbl.askTotalPrice,0) as askTotalPrice, " +
" FROM " +
" (SELECT orderId, userId," +
" SUM(price) AS bidTotalPrice " +
" FROM " + bidOrdr +
" Group by orderId, userId) bidTbl full outer JOIN " +
" (SELECT orderId, userId," +
" SUM(price) AS askTotalPrice" +
" FROM " + askOrdr +
" Group by orderId, userId) askTbl " +
" ON (bidTbl.orderId = askTbl.orderId" +
" AND bidTbl.userId= askTbl.userId) ") ;
DataStream<Tuple2<Boolean, Row>> = ste.toRetractStream(result, Row.class).filter(order -> order.f0);
Однако результат в некоторых случаях неверен: представьте, что пользователь A продает по цене B 3 раза, после этого пользователь B продает A 2 раза, во второй раз результат:
7> (правда, 123, a, 300.0,0.0)
7> (правда, 123, a, 300.0,200.0)
10> (правда, 123, b, 0,0,300,0)
10> (правда, 123, b, 200.0,300.0)
вторая и четвертая строки являются ожидаемым результатом потока, но он также сгенерирует 1-ю и 3-ю строки. Стоит упомянуть, что coGroup - другое решение, но я не хочу использовать оконное управление в этом сценарии, а не оконное решение доступно только в ограниченных потоках (DataSet).
Подсказка: orderId и userId будут повторяться в обоих потоках, и я хочу создать 2 строки в каждом действии, содержащее: orderId, userId1, bidTotalPrice, askTotalPrice И orderId, userId2, bidTotalPrice, askTotalPrice
1 ответ
Нечто подобное следует ожидать от потоковых запросов (или, другими словами, от запросов, выполняемых в динамических таблицах). В отличие от традиционной базы данных, где входные отношения к запросу остаются статичными во время выполнения запроса, входные данные для потокового запроса постоянно обновляются, поэтому результат также должен постоянно обновляться.
Если я понимаю настройку здесь, «неправильные» результаты в строках 1 и 3 верны до тех пор, пока соответствующие строки из
orderRes2
обрабатываются. Если эти строки не поступят, строки 1 и 3 останутся правильными.
Что вы должны ожидать, так это в конечном итоге правильный результат, включая, при необходимости, отзыв. Вы можете уменьшить количество промежуточных результатов, включив мини-пакетную агрегацию.
Эта ветка списка рассылки дает больше информации. Если я неправильно понял вашу ситуацию, приведите воспроизводимый пример, иллюстрирующий проблему.