Apache beam Проблема схемы SqlTransforms

Я пытаюсь выполнить ETL, который включает загрузку файлов из HDFS, применение преобразований и запись их в Hive. Используя SqlTransforms для выполнения преобразований, следуя этому документу, я сталкиваюсь с проблемой ниже. Можете ли вы помочь?

java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:105)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:90)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:77)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:339)
    at org.apache.beam.examples.SqlTest.runSqlTest(SqlTest.java:107)
    at org.apache.beam.examples.SqlTest.main(SqlTest.java:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

Фрагмент кода:

PCollection<String> data = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));

    if(options.getOutput().equals("hive")){
        Schema hiveTableSchema = Schema.builder()
                .addStringField("eid")
                .addStringField("name")
                .addStringField("salary")
                .addStringField("destination")
                .build();
          data.apply(ParDo.of(new DoFn<String, Row>() {
              @ProcessElement
              public void processElement(@Element String input, OutputReceiver<Row> out){
                  String[] values = input.split(",");
                  System.out.println(values);
                  Row row = Row.withSchema(hiveTableSchema)
                                .addValues(values)
                                .build();
                  out.output(row);
              }
          })).apply(SqlTransform.query("select eid, destination from PCOLLECTION"))

                .apply(ParDo.of(new DoFn<Row, HCatRecord>() {
                    @ProcessElement
                    public void processElement(@Element Row input, OutputReceiver<HCatRecord> out){
                        HCatRecord record = new DefaultHCatRecord(input.getFieldCount());
                        for(int i=0; i < input.getFieldCount(); i++){
                            record.set(i, input.getString(i));
                        }
                        out.output(record);
                    }
                        }))
                .apply("WriteData", HCatalogIO.write()
                        .withConfigProperties(configProperties)
                        .withDatabase("wmrpoc")
                        .withTable(options.getOutputTableName()));

1 ответ

Похоже, вам нужно установить схему на PCollection, В прохождении, которое вы связали, есть Create...withCoder() это обрабатывает это. В вашем случае схема не может быть выведена из вашего ParDo Единственная информация, которую Beam потенциально может рассмотреть, это то, что он выводит элементы типа Row но нет никакой информации, доступной ему ParDo даже придерживается единой схемы для всех выходов. Так что вам нужно позвонить pcollection.setRowSchema() прежде чем подать заявку SqlTransform сказать Beam, какую схему вы планируете использовать в своей конверсии ParDo,

Обновить

И похоже, что большинство из того, что вы делаете раньше HCatalog будет, вероятно, в конечном итоге сильно упрощен, например, представьте, что вам нужно только указать что-то вроде pipeline.apply(TextIO.readCsvRows(schema)).apply(sqlTransform)..., На самом деле Beam SQL поддерживает чтение файлов CSV без дополнительной конвертации ParDos (через TextTableProvider) но он не подключен к SqlTransform пока и доступно только через Beam SQL CLI

Другие вопросы по тегам