Apache Flink создает неверный план
Я создал простое задание для Apache Flink, в котором используется реализация PageRank, предоставляемая Gelly.
Локально, работает внутри IDE, все нормально. Однако я попытался отправить JAR с моим заданием экземпляру Flink, запущенному на моем компьютере, с помощью веб-интерфейса JobManager. Но вместо того, чтобы получить правильный план для Job и выполнить PageRank, Flink представляет и выполняет очень странный план, который учитывает только количество вершин графа.
Я провел некоторое исследование и отладку и обнаружил, что реализация PageRank, предоставляемая Gelly, начинает вычислять количество вершин графа, когда он не предоставляется в качестве параметра алгоритму:
if (numberOfVertices == 0) {
numberOfVertices = network.numberOfVertices();
}
Этот расчет подразумевает встроенную работу. Поскольку операторы ленивы, вычисления не запускаются. На сервере Flink первым делом необходимо получить план работы. Это делается в специальной среде, OptimizerPlanEnvironment
, что обеспечивает следующее result
метод:
public JobExecutionResult execute(String jobName) throws Exception {
Plan plan = createProgramPlan(jobName);
this.optimizerPlan = compiler.compile(plan);
// do not go on with anything now!
throw new ProgramAbortException();
}
Проблема исходит отсюда. Как только ProgramAbortException
выбрасывается, программа возвращает рассчитанный до сих пор план. Но был рассчитан только внутренний план работ, поэтому основной план работ никогда не вычисляется и не выполняется.
Это код, который я использовал:
public class Job {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Double, Double> graph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
graph.run(new PageRank<Long>(0.85, 10)).print();
}
private static class VertexInit implements MapFunction<Long, Double> {
@Override
public Double map(Long value) throws Exception { return 1.0; }
}
}
Если указано количество вершин, выполните, например, graph.run(new PageRank<Long>(0.85, 5, 10))
, нет проблем, план рассчитан правильно и рассчитан PageRank.
У меня вопрос: что я делаю не так? Или это какая-то реальная ошибка во Flink?
1 ответ
Проблема, как вы уже сказали, в том, что network.numberOfVertices
внутренние звонки count
на наборе данных вершин. Это запускает независимое задание Flink, которое вычисляет значение счетчика. Это значение обычно будет получено main
метод. Тем не менее, в случае отправки веб-клиента это не будет работать, из-за OptimizerPlanEnvironment
, который позволяет скомпилировать только одно задание Flink. Поведение аналогично режиму отдельного выполнения, который также не поддерживает активное выполнение плана.
На данный момент это ограничение веб-клиента Flink. Причиной такого поведения является то, что Flink не хочет блокировать поток обработчика канала Netty, который будет необходим для ожидания результата count
операция. Операция блокировки приведет к истощению пула потоков и отключению веб-интерфейса для этого сеанса до тех пор, пока он не будет разблокирован.