Исключение таблицы Flink: совокупность окон может быть определена только для столбца атрибутов времени, но обнаружено TIMESTAMP(6)

Я использую flink 1.12.0. Пытаюсь преобразовать поток данных в таблицу A и запустить запрос sql в таблице A для агрегирования по окну, как показано ниже. Я использую столбец f2 в качестве поля типа данных временной метки.

          EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

    Properties props = new Properties();

    props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
    props.setProperty("schema.registry.url", xxx);
    props.setProperty("group.id", "test");
    props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());

    props.put("client.id", "flink-kafka-example");

    FlinkKafkaConsumer<PlaybackListening> kafkaConsumer = new FlinkKafkaConsumer<>(
            "test-topic",
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                    Avrotest.class, prodSchemaRegistryURL),
            props);

    DataStreamSource<Avrotest> stream =
            env.addSource(kafkaConsumer);
    Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2"));
    Table result =
            tEnv.sqlQuery("SELECT f0, sum(f1),f2 FROM "
                    + tableA + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR) ,f1" );

    
    tEnv.toAppendStream(result,user.class).print();

    env.execute("Flink kafka test");
}

Когда я выполняю приведенный выше код, я получаю

Исключение в потоке "main" org.apache.flink.table.api.TableException: совокупность окон может быть определена только для столбца атрибутов времени, но обнаружено TIMESTAMP(6). по адресу org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50) в org.apache.flink.table.planner.plan.rules.logical.LogicalWatchBindow. scala:81) на org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333) на org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542) на org.apache. .plan.hep.HepPlanner.applyRules(HepPlanner.java:407) в org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243) в org.apache.calcite.plan.hep.HepInstruction$RuleInstance .выполнить (HepInstruction.java:127)

2 ответа

Чтобы использовать табличный API для выполнения окон во время событий для вашего потока данных, вам необходимо сначала назначить временные метки и водяные знаки. Вы должны сделать это перед звонком fromDataStream.

С Кафкой вообще лучше звонить assignTimestampsAndWatermarks прямо на FlinkKafkaConsumer. Смотрите водяные знаки документы , Кафка разъем DOCS , и документы Flink SQL для получения дополнительной информации.

3 шага:

  1. Первое назначение assignTimestampsAndWatermarks

У вас есть несколько типов стратегий.

Например:

       WatermarkStrategy<Row> customTime = WatermarkStrategy
                .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));
  1. В вашем источнике назначьте то, что вы объявляете на шаге 1:
      env.addSource().assignTimestampsAndWatermarks(customTime)
  1. объявить таблицу и установить время строки для поля отметки времени:
      Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"),$("f2").rowtime());
Другие вопросы по тегам