Apache Beam SQLTransform: невозможно вызвать getSchema при отсутствии схемы

Я пытаюсь применить SQLTransform на PCollection<Object>, Здесь преобразование CustomSource генерирует Pojo во время выполнения. Hence, тип объекта, для которого выполняется SQLTransform, неизвестен во время компиляции.

        Pipeline p = Pipeline.create(options);

        PCollection<Object> objs = p.apply(new CustomSource());

        Schema type = Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
        PCollectionTuple.of(new TupleTag<>("somedata"), objs).apply(SqlTransform.query("SELECT c1 FROM somedata"))
                .setSchema(type, SerializableFunctions.identity(), SerializableFunctions.identity());
        p.run().waitUntilFinish();

Я предоставил схему SQLTransform с setSchema и все же я получаю ошибку, а именно

java.lang.IllegalStateException: Cannot call getSchema when there is no schema
    at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
PCollection.java:328
    at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)

Можно ли генерировать объекты Pojo во время выполнения и запускать на них sqltransforms, предоставляя информацию о схеме для преобразования?

Вот класс CustomSource для справки:

import java.util.HashMap;
import java.util.Map;

import com.beaconinside.messages.PojoGenerator;

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

import javassist.CannotCompileException;
import javassist.NotFoundException;

public class CustomSource extends PTransform<PBegin, PCollection<Object>> {

    Map<String, Class<?>> props;
    Class<?> clazz;
    String data = "{\"c1\": 1, \"c2\": \"row\", \"c3\": 2.0}";

    public CustomSource() throws NotFoundException, CannotCompileException {
        props = new HashMap<String, Class<?>>();
        props.put("c1", Integer.class);
        props.put("c2", String.class);
        props.put("c3", Double.class);
        clazz = PojoGenerator.generate("net.javaforge.blog.javassist.PojoGenerated", props);
    }

    @Override
    public PCollection<Object> expand(PBegin input) {
        return input.apply(Create.of(data)).setCoder(StringUtf8Coder.of()).apply(new SensorSource(clazz, props));
        // return input.apply(Create.of(data));
    }

}

1 ответ

Я думаю, что ваша setSchema просто установить схему вывода PCollection из SQLTransform, Вы также должны установить схему на PCollection<Object> objs,

Приведенный выше ответ верен, что PCollection<Object> также должен позвонить setSchemaдля определения схемы входных данных и функций преобразования объекта строки. Если у вас есть несколько PCollection для создания PCollectionTuple, PCollections должны вызывать setSchema соответственно. PCollectionTuple не требует вызова setSchema, поскольку схема вывода может быть выведена из команды SQL.

Используйте setRowSchema, как показано ниже

PCollection<Row> testApps = PBegin.in(p).apply(Create.of(row1,row2,row3).withCoder(RowCoder.of(appSchema)))
                .setRowSchema(appSchema);
Другие вопросы по тегам