Flink Table,Create table Ошибка типа массива «ValidationException»

Я создал таблицу flink, которая содержит поля типа данных, и тип ошибки не совпадает。 Я хочу знать, как создать временную таблицу, содержащую тип массива в таблице flink.

      public class FlinkConnectorClickhouse {
    public static void main(String[] args) throws Exception {
        // create environments of both APIs
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // create a DataStream
        DataStream<Order> dataStream = env.fromCollection(Arrays.asList(
                new Order(2L, "pen", 1, Arrays.asList("name01", "name02", "name03"), Arrays.asList(1, 2, 3)));
        Table inputTable = tableEnv.fromDataStream(dataStreamMap, $("user").as("user_a"), $("product"), $("amount"), $("name_list"), $("id_list"));

        // register the Table object as a view and query it
        tableEnv.createTemporaryView("InputTable", inputTable);

        tableEnv.executeSql("CREATE TABLE sink_table (\n" +
                "    `user_a` BIGINT,\n" +
                "    `product` VARCHAR,\n" +
                "    `amount` BIGINT,\n" +
                "    `name_list` ARRAY<STRING>,\n" +
                "    `id_list` ARRAY<INT>,\n" +
                "    PRIMARY KEY (user_a) NOT ENFORCED /* 如果指定 pk,进入 upsert 模式 */\n" +
                ") WITH (\n" +
                ")");

        TableResult resultTable = tableEnv.executeSql("INSERT INTO sink_table SELECT user_a, product, amount,name_list,id_list FROM InputTable");
        env.execute();
    }
    public static class Order {
        private Long user;
        private String product;
        private Integer amount;
        private List<String> name_list;
        private List<Integer> id_list;
}

1 ответ

                  ") WITH (\n" +
            ")");

вероятно, где что-то идет не так. Где у вас разъем для раковины? Кафка? Это улей? Это обычная база данных с JDBC?

Вы найдете список в разделе «Соединители API таблиц» здесь: https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/. Убедитесь, что вы следуете правильной политике упаковки / развертывания / управления JAR.

Если вы новичок, вас может заинтересовать тот, который идет в комплекте с Flink:

      ) WITH (
  'connector' = 'filesystem',           -- required: specify the connector
  'path' = 'file:///path/to/whatever',  -- required: path to a directory
  'format' = 'csv'                     -- required: file system connector requires to specify a format,
)

или этот:

      WITH (
  'connector' = 'print'
);

или этот:

      WITH (
  'connector' = 'blackhole'
);
Другие вопросы по тегам