Разверните вложенные PCollection с помощью BeamSQL

Попробуйте использовать BeamSQL для удаления вложенного типа PCollection. Предположим, что PCollection, где есть сотрудники и его детали. Здесь детали находятся во вложенной коллекции. Так что, если мы используем BeamSQL как "SELECT PCOLLECTION.details FROM PCOLLECTION" затем получение вложенного типа деталей в виде массива в отдельной PCollection. Однако, когда я хочу получить конкретный столбец из коллекции вложенных типов в качестве подробностей, я получаю сообщение об ошибке, например, не могу найти имя столбца. Пробовал как BeamSQL (похоже на BigQuery SQL) "SELECT X.address FROM PCOLLECTION, Unnest(details) as X" затем получить исключение nullpointer. Используется версия 2.12.0 apache beam.

Оцените кого-то, пожалуйста, помогите в этом.

Ниже приведен пример данных вложенных значений Value (подробности имеют адрес электронной почты, телефонные столбцы. Поэтому для каждой строки 'n' нет списка деталей. Здесь он содержит два списка деталей):

WARNING: printValue:Row:[[Row:[lourdurajan@gmail.com, 9840618047], Row:[lourdurajan@sanmina.com, 9840618047]]]

Вот трассировка стека Java для второго оператора select:

SELECT `X`.`email`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`,
UNNEST(`PCOLLECTION`.`details`) AS `X`
May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(email=[$3])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])
    Uncollect
      LogicalProject(details=[$cor0.details_2])
        LogicalValues(tuples=[[{ 0 }]])

May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..4=[{inputs}], email=[$t3])
  BeamUnnestRel(unnestIndex=[2])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

[WARNING] 
java.lang.NullPointerException
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:171)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:93)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:87)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:111)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
    at com.sanmina.BeamSQLUnnest.main(BeamSQLUnnest.java:217)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

0 ответов

Вы можете добиться этого с помощью BigQueryIO.

String Query ="SELECT `X`.`email`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`,
UNNEST(`PCOLLECTION`.`details`) AS `X`"

BigQueryIO.readTableRows().fromQuery(query).usingStandardSql()
Другие вопросы по тегам