Луч Apache: агрегация SQL не дает результатов для неограниченного / ограниченного соединения
Я работаю над конвейером лучей apache для запуска функции агрегации SQL. Ссылка: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159. Пример здесь работает нормально. Однако, когда я заменяю источник фактическим неограниченным источником и выполняю агрегирование, я не вижу результатов. Шаги в моем конвейере:
- Чтение ограниченных данных из источника и преобразование в набор строк.
- Чтение неограниченных данных JSON из источника веб-сокета.
- Присвойте метку времени каждому исходному потоку через DoFn.
- Преобразовать неограниченный JSON в неограниченную коллекцию строк
- Применить окно к коллекции строк
- Применить оператор SQL.
- Выведите результат sql.
Обычный оператор SQL выполняется и выводит результаты. Тем не менее, когда я использую группу в SQL, нет вывода.
SELECT
o1.detectedCount,
o1.sensor se,
o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
Результаты непрерывны и как показано ниже.
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":1,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
2019-07-19 20:43:11 INFO ConsoleSink:27 - {
"detectedCount":0,
"se":"3a002f000647363432323230",
"sa":"3a002f000647363432323230"
}
Результаты не отображаются вообще, когда я изменяю sql на
SELECT
COUNT(o1.detectedCount) o2.sensor sa
FROM SENSOR o1
LEFT JOIN AREA o2
on o1.sensor = o2.sensor
GROUP BY o2.sensor
Есть ли что-то, что я делаю неправильно в этой реализации. Любые указатели были бы действительно полезны.
2 ответа
ВЫБЕРИТЕ COUNT(o1.detectedCount) как число,o2.sensor,sa ОТ ДАТЧИКА o1 СЛЕДУЮЩАЯ ОБЪЕДИНЕННАЯ ОБЛАСТЬ o2 на o1.sensor = o2.sensor GROUP BY, sa,o1.sensor,o2.sensor
Некоторые предложения появляются при чтении вашего кода:
- Расширьте окно, чтобы разрешить опоздание и выдать ранее поступившие данные.
.apply("windowing", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(2)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(2))))
.withAllowedLateness(Duration.standardMinutes(10))
.discardingFiredPanes());
Попробуйте удалить
join
и проверьте, если без него у вас есть выход в окно,Попробуйте добавить больше времени в окно. потому что иногда слишком мало, чтобы перетасовать данные между рабочими. и присоединенные потоки не излучаются одновременно.
outputWithTimestamp
выведет строки в другой временной отметке, а затем они могут быть отброшены, если вы не допустите опоздание. Прочитайте документы дляoutputWithTimestamp
этот API немного рискован.
Если у элементов input {@link PCollection} есть временные метки, выходная временная метка для каждого элемента не должна быть раньше временной метки входного элемента минус значение {@link getAllowedTimestampSkew()}. Если выходная временная метка раньше этого времени, преобразование при выполнении выдает {@link IllegalArgumentException}. Используйте {@link withAllowedTimestampSkew(Duration)}, чтобы обновить разрешенный перекос.
ПРЕДУПРЕЖДЕНИЕ. Использование {@link #withAllowedTimestampSkew(Duration)} разрешает отправку элементов за водяным знаком. Эти элементы считаются запоздалыми, и если за {@link Window#withAllowedLateness(Duration) разрешено опоздание} нисходящего потока {@link PCollection}, можно молча отбросить.