Ошибка преобразования вложенных классов в DataStream

Я использую flink 1.13. Я пытаюсь преобразовать результаты таблицы в поток данных следующим образом, но продолжаю получать ошибку.

      public class HybridTrial {
  public static class Address {
    public String street;
    public String houseNumber;

    public Address() {}

    public Address(String street, String houseNumber) {
      this.street = street;
      this.houseNumber = houseNumber;
    }
  }

  public static class User {
    public String name;

    public Integer score;

    public LocalDateTime event_time;

    public Address address;

    // default constructor for DataStream API
    public User() {}

    // fully assigning constructor for Table API
    public User(String name, Integer score, LocalDateTime event_time, Address address) {
      this.name = name;
      this.score = score;
      this.event_time = event_time;
      this.address = address;
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<User> dataStream =
        env.fromElements(
                new User("Alice", 4, LocalDateTime.now(), new Address()),
                new User("Bob", 6, LocalDateTime.now(), new Address("NBC", "204")),
                new User("Alice", 10, LocalDateTime.now(), new Address("ABC", "1033")))
            .assignTimestampsAndWatermarks(
                WatermarkStrategy.<User>forBoundedOutOfOrderness(Duration.ofSeconds(60)));

    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    Table table =
        tableEnv.fromDataStream(
            dataStream, Schema.newBuilder().build());

    table.printSchema();

    Table t = table.select($("*"));

    DataStream<User> dsRow = tableEnv.toDataStream(t,User.class);
    dsRow.print();

    env.execute();
  }
}

Я получил следующую ошибку:

      Exception in thread "main" org.apache.flink.table.api.ValidationException: Column types of query result and sink for registered table 'default_catalog.default_database.Unregistered_DataStream_Sink_1' do not match.
Cause: Incompatible types for sink column 'event_time' at position 2.

Query schema: [name: STRING, score: INT, event_time: RAW('java.time.LocalDateTime', '...'), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
Sink schema:  [name: STRING, score: INT, event_time: TIMESTAMP(9), address: *flinkSqlExperiments.HybridTrial$Address<`street` STRING, `houseNumber` STRING>*]
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.createSchemaMismatchException(DynamicSinkUtils.java:437)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.validateSchemaAndApplyImplicitCast(DynamicSinkUtils.java:256)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertSinkToRel(DynamicSinkUtils.java:198)
    at org.apache.flink.table.planner.connectors.DynamicSinkUtils.convertExternalToRel(DynamicSinkUtils.java:143)

Я также пробовал настраивать преобразование из DataStream в таблицу, но при преобразовании из таблицы в DataStream все еще возникала ошибка. Я застрял, поэтому приветствуется любая помощь.

2 ответа

Автоматическое извлечение типов на основе отражения в DataStream не так мощно, как в API таблиц. Это также связано с проблемами обратной совместимости состояний в API DataStream.

В event_time поле это GenericType в DataStream API, что приводит к RAWв API таблиц. У вас есть следующие возможности:

  • Дать должное в fromElements
  • Отменить TypeInformation с использованием DataType в fromDataStream

Моя проблема была решена путем регистрации POJO, используя метод ниже.

env.getConfig().registerPojoType(YourClass.class);

Вы можете использовать любой определяемый пользователем DTO и зарегистрироваться как POJO.

Другие вопросы по тегам