ValidationException при использовании таблиц AggregateFunction и ResultTypeQueryable

Я использую локальный кластер Flink 1.6, настроенный на использование flink-table баночка (то есть баночка моей программы не включает flink-table). Со следующим кодом

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;

import java.util.ArrayList;
import java.util.List;

public class JMain {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(execEnv);

        tableEnv.registerFunction("enlist", new Enlister());

        DataSource<Tuple2<String, String>> source = execEnv.fromElements(
                new Tuple2<>("a", "1"),
                new Tuple2<>("a", "2"),
                new Tuple2<>("b", "3")
        );

        Table table = tableEnv.fromDataSet(source, "a, b")
                .groupBy("a")
                .select("enlist(a, b)");

        tableEnv.toDataSet(table, Row.class)
                .print();
    }

    public static class Enlister
            extends AggregateFunction<List<String>, ArrayList<String>>
            implements ResultTypeQueryable<List<String>>
    {
        @Override
        public ArrayList<String> createAccumulator() {
            return new ArrayList<>();
        }

        @Override
        public List<String> getValue(ArrayList<String> acc) {
            return acc;
        }

        @SuppressWarnings("unused")
        public void accumulate(ArrayList<String> acc, String a, String b) {
            acc.add(a + ":" + b);
        }

        @SuppressWarnings("unused")
        public void merge(ArrayList<String> acc, Iterable<ArrayList<String>> it) {
            for (ArrayList<String> otherAcc : it) {
                acc.addAll(otherAcc);
            }
        }

        @SuppressWarnings("unused")
        public void resetAccumulator(ArrayList<String> acc) {
            acc.clear();
        }

        @Override
        public TypeInformation<List<String>> getProducedType() {
            return TypeInformation.of(new TypeHint<List<String>>(){});
        }
    }
}

Я получаю это странное исключение:

org.apache.flink.table.api.ValidationException: Expression Enlister(List('a, 'b)) failed on input check: Given parameters do not match any signature. 
Actual: (java.lang.String, java.lang.String) 
Expected: (java.lang.String, java.lang.String)

Однако, если я не реализую ResultTypeQueryableЯ получаю ожидаемый результат:

Starting execution of program
[b:3]
[a:1, a:2]
Program execution finished
Job with JobID 20497bd3efe44fab0092a05a8eb7d9de has finished.
Job Runtime: 270 ms
Accumulator Results: 
- 56e0e5a9466b84ae44431c9c4b7aad71 (java.util.ArrayList) [2 elements]

Мой фактический вариант использования требует ResultTypeQueryableпотому что в противном случае я получаю это исключение:

The return type of function ... could not be determined automatically,
due to type erasure. You can give type information hints by using the
returns(...) method on the result of the transformation call,
or by letting your function implement the 'ResultTypeQueryable' interface

В любом случае, я могу это исправить?

2 ответа

Решение

Я пытался воспроизвести проблему в маленькой программе, но не смог, это происходит только в моем более крупном проекте. К сожалению, переопределение getResultType() а также getAccumulatorType() не помогло, я получил это исключение в этом случае:

java.lang.IndexOutOfBoundsException
    at org.apache.flink.api.java.typeutils.TupleTypeInfoBase.getTypeAt(TupleTypeInfoBase.java:199)
    at org.apache.flink.api.java.typeutils.RowTypeInfo.getTypeAt(RowTypeInfo.java:179)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.isSortKey(Keys.java:444)
    at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:150)
    at org.apache.flink.api.java.operators.SortPartitionOperator.<init>(SortPartitionOperator.java:75)
    at org.apache.flink.api.java.DataSet.sortPartition(DataSet.java:1414)

Я на самом деле тоже получил это исключение даже без переопределения. Единственное, что сработало для меня, было по существу:

String[] fieldNames = new String[] {
        "result"
};
TypeInformation<?>[] types = new TypeInformation[] {
        TypeInformation.of(new TypeHint<List<String>>(){})
};

tableEnv.toDataSet(table, Types.ROW(fieldNames, types))...

Внедрение ResultTypeQueryable не правильно в этом случае. Исключение вводит в заблуждение. Вместо переопределения getResultType() а также getAccumulatorType(), Причиной этого является то, что дженерики обычно вызывают проблемы (из-за стирания типа Java) при генерации информации о типе для сериализаторов.

Другие вопросы по тегам