Apache beam Проблема схемы SqlTransforms
Я пытаюсь выполнить ETL, который включает загрузку файлов из HDFS, применение преобразований и запись их в Hive. Используя SqlTransforms для выполнения преобразований, следуя этому документу, я сталкиваюсь с проблемой ниже. Можете ли вы помочь?
java.lang.IllegalStateException: Cannot call getSchema when there is no schema
at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
at org.apache.beam.sdk.extensions.sql.SqlTransform.toTableMap(SqlTransform.java:105)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:90)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:77)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:471)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:339)
at org.apache.beam.examples.SqlTest.runSqlTest(SqlTest.java:107)
at org.apache.beam.examples.SqlTest.main(SqlTest.java:167)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
Фрагмент кода:
PCollection<String> data = p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
if(options.getOutput().equals("hive")){
Schema hiveTableSchema = Schema.builder()
.addStringField("eid")
.addStringField("name")
.addStringField("salary")
.addStringField("destination")
.build();
data.apply(ParDo.of(new DoFn<String, Row>() {
@ProcessElement
public void processElement(@Element String input, OutputReceiver<Row> out){
String[] values = input.split(",");
System.out.println(values);
Row row = Row.withSchema(hiveTableSchema)
.addValues(values)
.build();
out.output(row);
}
})).apply(SqlTransform.query("select eid, destination from PCOLLECTION"))
.apply(ParDo.of(new DoFn<Row, HCatRecord>() {
@ProcessElement
public void processElement(@Element Row input, OutputReceiver<HCatRecord> out){
HCatRecord record = new DefaultHCatRecord(input.getFieldCount());
for(int i=0; i < input.getFieldCount(); i++){
record.set(i, input.getString(i));
}
out.output(record);
}
}))
.apply("WriteData", HCatalogIO.write()
.withConfigProperties(configProperties)
.withDatabase("wmrpoc")
.withTable(options.getOutputTableName()));
1 ответ
Похоже, вам нужно установить схему на PCollection
, В прохождении, которое вы связали, есть Create...withCoder()
это обрабатывает это. В вашем случае схема не может быть выведена из вашего ParDo
Единственная информация, которую Beam потенциально может рассмотреть, это то, что он выводит элементы типа Row
но нет никакой информации, доступной ему ParDo
даже придерживается единой схемы для всех выходов. Так что вам нужно позвонить pcollection.setRowSchema()
прежде чем подать заявку SqlTransform
сказать Beam, какую схему вы планируете использовать в своей конверсии ParDo
,
Обновить
И похоже, что большинство из того, что вы делаете раньше HCatalog
будет, вероятно, в конечном итоге сильно упрощен, например, представьте, что вам нужно только указать что-то вроде pipeline.apply(TextIO.readCsvRows(schema)).apply(sqlTransform)...
, На самом деле Beam SQL поддерживает чтение файлов CSV без дополнительной конвертации ParDos
(через TextTableProvider
) но он не подключен к SqlTransform
пока и доступно только через Beam SQL CLI