Луч SQL - SqlValidatorException: объект 'PCOLLECTION' не найден

Я делаю некоторые эксперименты с Beam SQL. Я получаю PCollection<Row> из преобразования SampleSource и передать свой вывод SqlTransform,

String sql1 = "select c1, c2, c3 from PCOLLECTION where c1 > 1";

Код ниже работает без ошибок.

POutput it = p.apply(new SampleSource()).apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

Однако, когда я пытаюсь выполнить следующие строки кода, я получаю ошибку во время выполнения.

POutput it = p.apply(new SampleSource());
it.getPipeline().apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

Детали ошибки

Caused by: org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorException: Object 'PCOLLECTION' not found

Пожалуйста, предоставьте несколько указателей.

1 ответ

Это не работает, потому что вы применяете SqlTransform к трубопроводу, а не PCollection,

Вы, вероятно, хотите изменить его следующим образом:


// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());

// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));

...

Как работает Балочный трубопровод с точки зрения высокого уровня:

  • создать конвейер;
  • применить IO PTransform который читает из какого-то источника и производит PColelction некоторых элементов, которые он читает из источника;
  • цепочка-применить больше PTransforms к PCollection из предыдущего шага для обработки данных (концептуально, разные PCollections будет производиться на каждом этапе);
  • повторение;

SqlTransform это нормально PTransformожидается, что он будет применен к PCollection элементов и вывод другого PCollection в следствии. Запрос, который вы указываете в SqlTransform.create() применяется к PCollection, Ожидается, что данные поступят из магического PCOLLECTION таблица, которая представляет PCollection что вы применяете SqlTransform к.

То, что вы делаете в вашем примере, отличается:

  • создать конвейер;
  • применить источник PTransform который производит POutput не обязательно PCollection;
  • тогда вы игнорируете вывод, если ваш источник, но вместо этого берете оригинальный конвейер и применяете SqlTransform прямо к нему;

Так что же происходит, что SqlTransform в этом случае применяется к "корню" конвейера, а не к PCollection это исходит из источника. Вместо цепочки PTransforms применяется один за другим, теперь у вас есть два PTransforms наносится на корень независимо друг от друга.

Еще одна оговорка заключается в том, что SqlTransform ожидает, что входные элементы будут Rowsпотому что SQL как язык работает только с данными, представленными в виде строк. Есть два способа добиться этого:

  • вручную преобразовать элементы, созданные источником Rows применяя другой ParDo между источником и SqlTransform;
  • использовать Бим Schema рамки (например, проверить PCollection.setSchema() метод), который позволяет Beam SQL автоматически преобразовывать входные элементы в Rows;
Другие вопросы по тегам