Луч SQL - SqlValidatorException: объект 'PCOLLECTION' не найден
Я делаю некоторые эксперименты с Beam SQL. Я получаю PCollection<Row>
из преобразования SampleSource
и передать свой вывод SqlTransform
,
String sql1 = "select c1, c2, c3 from PCOLLECTION where c1 > 1";
Код ниже работает без ошибок.
POutput it = p.apply(new SampleSource()).apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();
Однако, когда я пытаюсь выполнить следующие строки кода, я получаю ошибку во время выполнения.
POutput it = p.apply(new SampleSource());
it.getPipeline().apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();
Детали ошибки
Caused by: org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorException: Object 'PCOLLECTION' not found
Пожалуйста, предоставьте несколько указателей.
1 ответ
Это не работает, потому что вы применяете SqlTransform
к трубопроводу, а не PCollection
,
Вы, вероятно, хотите изменить его следующим образом:
// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());
// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));
...
Как работает Балочный трубопровод с точки зрения высокого уровня:
- создать конвейер;
- применить IO
PTransform
который читает из какого-то источника и производитPColelction
некоторых элементов, которые он читает из источника; - цепочка-применить больше
PTransforms
кPCollection
из предыдущего шага для обработки данных (концептуально, разныеPCollections
будет производиться на каждом этапе); - повторение;
SqlTransform
это нормально PTransform
ожидается, что он будет применен к PCollection
элементов и вывод другого PCollection
в следствии. Запрос, который вы указываете в SqlTransform.create()
применяется к PCollection
, Ожидается, что данные поступят из магического PCOLLECTION
таблица, которая представляет PCollection
что вы применяете SqlTransform
к.
То, что вы делаете в вашем примере, отличается:
- создать конвейер;
- применить источник
PTransform
который производитPOutput
не обязательноPCollection
; - тогда вы игнорируете вывод, если ваш источник, но вместо этого берете оригинальный конвейер и применяете
SqlTransform
прямо к нему;
Так что же происходит, что SqlTransform
в этом случае применяется к "корню" конвейера, а не к PCollection
это исходит из источника. Вместо цепочки PTransforms
применяется один за другим, теперь у вас есть два PTransforms
наносится на корень независимо друг от друга.
Еще одна оговорка заключается в том, что SqlTransform
ожидает, что входные элементы будут Rows
потому что SQL как язык работает только с данными, представленными в виде строк. Есть два способа добиться этого:
- вручную преобразовать элементы, созданные источником
Rows
применяя другойParDo
между источником иSqlTransform
; - использовать Бим
Schema
рамки (например, проверитьPCollection.setSchema()
метод), который позволяет Beam SQL автоматически преобразовывать входные элементы вRows
;