Используйте collect() и env.execute() в одном задании

Я пытаюсь написать вычисление во Flink, которое требует двух этапов.

На первом этапе я создаю Graph и получаю его идентификаторы вершин:

List<String> ids = graph.getVertexIds().collect();

На втором этапе я хотел бы использовать эти идентификаторы для запуска SingleSourceShortestPath для каждой вершины.

for (String id: ids){
        System.out.println("Source Id: "+id);
        graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
    }

Он работает локально (в IntelliJ IDE и командной строке, используя ./bin/flink run ...), но когда я отправляю задание на Flink, используя его WebUI, программа просто выполняется до collect() метод и не запускает оставшуюся часть программы (для утверждения и print()).

В чем проблема?

Вот мой код:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.library.SingleSourceShortestPaths;

import java.util.ArrayList;
import java.util.List;

public class Main {
    public static void main(String[] args) throws Exception {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        Edge<String, Double> e1 = new Edge<String, Double>("1", "2", 0.5);
        Edge<String, Double> e2 = new Edge<String, Double>("2", "3", 0.5);
        Edge<String, Double> e3 = new Edge<String, Double>("4", "5", 0.5);
        Edge<String, Double> e4 = new Edge<String, Double>("5", "6", 0.5);
        Edge<String, Double> e5 = new Edge<String, Double>("7", "8", 0.5);


        List<Edge<String, Double>> edgeList = new ArrayList<Edge<String, Double>>();
        edgeList.add(e1);
        edgeList.add(e2);
        edgeList.add(e3);
        edgeList.add(e4);
        edgeList.add(e5);


        Graph<String, String, Double> graph = Graph.fromCollection(edgeList,
                new MapFunction<String, String>() {
                    public String map(String value) {
                        return value;
                    }
                }, env);

        List<String> ids = graph.getVertexIds().collect();

        for (String id: ids){
            System.out.println("Source Id: "+id);
            graph.run( new SingleSourceShortestPaths<String, String>(id, 10)).print();
        }
    }
}

1 ответ

Решение

Основываясь на этой ссылке, преобразования Flink ленивы, что означает, что они не выполняются до тех пор, пока не будет вызвана операция приемника.

Операция приемника в Flink запускает выполнение потока для получения желаемого результата программы, такого как сохранение результата в файловой системе или его печать на стандартный вывод.

Методы, такие как Dataset.collect(), Dataset.Count() а также Dataset.print() являются операциями приемника, которые запускают фактические преобразования данных.

Другие вопросы по тегам