Apache Flink создает неверный план

Я создал простое задание для Apache Flink, в котором используется реализация PageRank, предоставляемая Gelly.

Локально, работает внутри IDE, все нормально. Однако я попытался отправить JAR с моим заданием экземпляру Flink, запущенному на моем компьютере, с помощью веб-интерфейса JobManager. Но вместо того, чтобы получить правильный план для Job и выполнить PageRank, Flink представляет и выполняет очень странный план, который учитывает только количество вершин графа.

Я провел некоторое исследование и отладку и обнаружил, что реализация PageRank, предоставляемая Gelly, начинает вычислять количество вершин графа, когда он не предоставляется в качестве параметра алгоритму:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

Этот расчет подразумевает встроенную работу. Поскольку операторы ленивы, вычисления не запускаются. На сервере Flink первым делом необходимо получить план работы. Это делается в специальной среде, OptimizerPlanEnvironment, что обеспечивает следующее result метод:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

Проблема исходит отсюда. Как только ProgramAbortException выбрасывается, программа возвращает рассчитанный до сих пор план. Но был рассчитан только внутренний план работ, поэтому основной план работ никогда не вычисляется и не выполняется.

Это код, который я использовал:

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}

Если указано количество вершин, выполните, например, graph.run(new PageRank<Long>(0.85, 5, 10)), нет проблем, план рассчитан правильно и рассчитан PageRank.

У меня вопрос: что я делаю не так? Или это какая-то реальная ошибка во Flink?

1 ответ

Решение

Проблема, как вы уже сказали, в том, что network.numberOfVertices внутренние звонки count на наборе данных вершин. Это запускает независимое задание Flink, которое вычисляет значение счетчика. Это значение обычно будет получено main метод. Тем не менее, в случае отправки веб-клиента это не будет работать, из-за OptimizerPlanEnvironment, который позволяет скомпилировать только одно задание Flink. Поведение аналогично режиму отдельного выполнения, который также не поддерживает активное выполнение плана.

На данный момент это ограничение веб-клиента Flink. Причиной такого поведения является то, что Flink не хочет блокировать поток обработчика канала Netty, который будет необходим для ожидания результата count операция. Операция блокировки приведет к истощению пула потоков и отключению веб-интерфейса для этого сеанса до тех пор, пока он не будет разблокирован.

Другие вопросы по тегам