Луч Apache: агрегация SQL не дает результатов для неограниченного / ограниченного соединения

Я работаю над конвейером лучей apache для запуска функции агрегации SQL. Ссылка: https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslJoinTest.java#L159. Пример здесь работает нормально. Однако, когда я заменяю источник фактическим неограниченным источником и выполняю агрегирование, я не вижу результатов. Шаги в моем конвейере:

  1. Чтение ограниченных данных из источника и преобразование в набор строк.
  2. Чтение неограниченных данных JSON из источника веб-сокета.
  3. Присвойте метку времени каждому исходному потоку через DoFn.
  4. Преобразовать неограниченный JSON в неограниченную коллекцию строк
  5. Применить окно к коллекции строк
  6. Применить оператор SQL.
  7. Выведите результат sql.

Обычный оператор SQL выполняется и выводит результаты. Тем не менее, когда я использую группу в SQL, нет вывода.

SELECT 
  o1.detectedCount,
  o1.sensor se,
  o2.sensor sa
FROM SENSOR o1 
  LEFT JOIN AREA o2 
  on o1.sensor = o2.sensor

Результаты непрерывны и как показано ниже.

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":1,
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

2019-07-19 20:43:11 INFO ConsoleSink:27 - {
                                           "detectedCount":0,            
                                           "se":"3a002f000647363432323230",
                                           "sa":"3a002f000647363432323230"
                                          }

Результаты не отображаются вообще, когда я изменяю sql на

SELECT
  COUNT(o1.detectedCount) o2.sensor sa
FROM SENSOR o1
  LEFT JOIN AREA o2
  on o1.sensor = o2.sensor
GROUP BY o2.sensor

Есть ли что-то, что я делаю неправильно в этой реализации. Любые указатели были бы действительно полезны.

2 ответа

ВЫБЕРИТЕ COUNT(o1.detectedCount) как число,o2.sensor,sa ОТ ДАТЧИКА o1 СЛЕДУЮЩАЯ ОБЪЕДИНЕННАЯ ОБЛАСТЬ o2 на o1.sensor = o2.sensor GROUP BY, sa,o1.sensor,o2.sensor

Некоторые предложения появляются при чтении вашего кода:

  1. Расширьте окно, чтобы разрешить опоздание и выдать ранее поступившие данные.
 .apply("windowing", Window.<Row>into(FixedWindows.of(Duration.standardSeconds(2)))
                            .triggering(AfterWatermark.pastEndOfWindow()
                                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                                            .plusDelayOf(Duration.standardSeconds(1)))
                                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                                            .plusDelayOf(Duration.standardSeconds(2))))
                            .withAllowedLateness(Duration.standardMinutes(10))
                            .discardingFiredPanes());

  1. Попробуйте удалить join и проверьте, если без него у вас есть выход в окно,

  2. Попробуйте добавить больше времени в окно. потому что иногда слишком мало, чтобы перетасовать данные между рабочими. и присоединенные потоки не излучаются одновременно.

  3. outputWithTimestamp выведет строки в другой временной отметке, а затем они могут быть отброшены, если вы не допустите опоздание. Прочитайте документы для outputWithTimestampэтот API немного рискован.

Если у элементов input {@link PCollection} есть временные метки, выходная временная метка для каждого элемента не должна быть раньше временной метки входного элемента минус значение {@link getAllowedTimestampSkew()}. Если выходная временная метка раньше этого времени, преобразование при выполнении выдает {@link IllegalArgumentException}. Используйте {@link withAllowedTimestampSkew(Duration)}, чтобы обновить разрешенный перекос.

ПРЕДУПРЕЖДЕНИЕ. Использование {@link #withAllowedTimestampSkew(Duration)} разрешает отправку элементов за водяным знаком. Эти элементы считаются запоздалыми, и если за {@link Window#withAllowedLateness(Duration) разрешено опоздание} нисходящего потока {@link PCollection}, можно молча отбросить.

Другие вопросы по тегам