Apache Beam SQLTransform: невозможно вызвать getSchema при отсутствии схемы
Я пытаюсь применить SQLTransform на PCollection<Object>
, Здесь преобразование CustomSource генерирует Pojo во время выполнения. Hence, тип объекта, для которого выполняется SQLTransform, неизвестен во время компиляции.
Pipeline p = Pipeline.create(options);
PCollection<Object> objs = p.apply(new CustomSource());
Schema type = Schema.builder().addInt32Field("c1").addStringField("c2").addDoubleField("c3").build();
PCollectionTuple.of(new TupleTag<>("somedata"), objs).apply(SqlTransform.query("SELECT c1 FROM somedata"))
.setSchema(type, SerializableFunctions.identity(), SerializableFunctions.identity());
p.run().waitUntilFinish();
Я предоставил схему SQLTransform
с setSchema
и все же я получаю ошибку, а именно
java.lang.IllegalStateException: Cannot call getSchema when there is no schema
at org.apache.beam.sdk.values.PCollection.getSchema(PCollection.java:328)
PCollection.java:328
at org.apache.beam.sdk.extensions.sql.impl.schema.BeamPCollectionTable.<init>(BeamPCollectionTable.java:34)
Можно ли генерировать объекты Pojo во время выполнения и запускать на них sqltransforms, предоставляя информацию о схеме для преобразования?
Вот класс CustomSource для справки:
import java.util.HashMap;
import java.util.Map;
import com.beaconinside.messages.PojoGenerator;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import javassist.CannotCompileException;
import javassist.NotFoundException;
public class CustomSource extends PTransform<PBegin, PCollection<Object>> {
Map<String, Class<?>> props;
Class<?> clazz;
String data = "{\"c1\": 1, \"c2\": \"row\", \"c3\": 2.0}";
public CustomSource() throws NotFoundException, CannotCompileException {
props = new HashMap<String, Class<?>>();
props.put("c1", Integer.class);
props.put("c2", String.class);
props.put("c3", Double.class);
clazz = PojoGenerator.generate("net.javaforge.blog.javassist.PojoGenerated", props);
}
@Override
public PCollection<Object> expand(PBegin input) {
return input.apply(Create.of(data)).setCoder(StringUtf8Coder.of()).apply(new SensorSource(clazz, props));
// return input.apply(Create.of(data));
}
}
1 ответ
Я думаю, что ваша setSchema просто установить схему вывода PCollection из SQLTransform
, Вы также должны установить схему на PCollection<Object> objs
,
Приведенный выше ответ верен, что PCollection<Object>
также должен позвонить setSchema
для определения схемы входных данных и функций преобразования объекта строки. Если у вас есть несколько PCollection для создания PCollectionTuple, PCollections должны вызывать setSchema соответственно. PCollectionTuple не требует вызова setSchema, поскольку схема вывода может быть выведена из команды SQL.
Используйте setRowSchema, как показано ниже
PCollection<Row> testApps = PBegin.in(p).apply(Create.of(row1,row2,row3).withCoder(RowCoder.of(appSchema)))
.setRowSchema(appSchema);