Как запустить задачи в Spark на разных рабочих?

У меня есть следующий код для Spark:

package my.spark;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;

public class ExecutionTest {    
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("ExecutionTest")
                .getOrCreate();

        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        int slices = 2;
        int n = slices;
        List<String> list = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            list.add("" + i);
        }

        JavaRDD<String> dataSet = jsc.parallelize(list, slices);

        dataSet.foreach(str -> {
            System.out.println("value: " + str);
            Thread.sleep(10000);
        });

        System.out.println("done");

        spark.stop();
    }

}

Я запустил главный узел и двух рабочих (все на localhost; Windows), используя команды:

bin\spark-class org.apache.spark.deploy.master.Master

и (два раза):

bin\spark-class org.apache.spark.deploy.worker.Worker spark://<local-ip>:7077

Все началось правильно.

После отправки моей работы с помощью команды:

bin\spark-submit --class my.spark.ExecutionTest --master spark://<local-ip>:7077 file:///<pathToFatJar>/FatJar.jar

Команда запущена, но value: 0 а также value: 1 выходы написаны одним из работников (как показано на Logs > stdout на странице, связанной с работником). Второй работник не имеет ничего в Logs > stdout, Насколько я понял, это означает, что каждую итерацию выполняет один и тот же работник.

Как запустить эти задачи на двух разных работающих рабочих?

1 ответ

Решение

Это возможно, но я не уверен, будет ли это работать правильно каждый раз и везде. Однако во время тестирования каждый раз он работал как положено.

Я проверил свой код, используя хост-компьютер с Windows 10 x64 и 4 виртуальных машины (ВМ): VirtualBox с ядром Debian 9 (растяжение) 4.9.0 x64, сеть только для хоста, Java 1.8.0_144, Apache Spark 2.2.0 для Hadoop 2.7 (spark-2.2.0-bin-hadoop2.7.tar.gz).

Я использовал master и 3 slave на VM и еще один slave на Windows:

  • debian-master - 1 процессор, 1 ГБ оперативной памяти
  • debian-slave1 - 1 процессор, 1 ГБ оперативной памяти
  • debian-slave2 - 1 процессор, 1 ГБ оперативной памяти
  • debian-slave3 - 2 процессора, 1 ГБ оперативной памяти
  • Windows-Slave - 4 процессора, 8 ГБ оперативной памяти

Я отправлял свои работы с компьютера с Windows на мастер, расположенный на виртуальной машине.

Начало такое же, как и раньше:

    SparkSession spark = SparkSession
            .builder()
            .config("spark.cores.max", coresCount) // not necessary
            .appName("ExecutionTest")
            .getOrCreate();

[важный] coresCount важно для разделения - я должен разделить данные, используя количество используемых ядер, а не количество рабочих / исполнителей.

Далее я должен создать JavaSparkContext и RDD. Повторное использование RDD позволяет выполнять несколько раз, вероятно, один и тот же набор работников.

    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    JavaRDD<Integer> rddList
          = jsc.parallelize(
                    IntStream.range(0, coresCount * 2)
                             .boxed().collect(Collectors.toList()))
               .repartition(coresCount);

Я создал rddList который имеет coresCount * 2 элементы. Количество элементов равно coresCount не позволяет работать на всех ассоциированных рабочих (в моем случае). Может быть, coresCount + 1 было бы достаточно, но я не проверял это как coresCount * 2 не так много, как хорошо.

Следующее, что нужно сделать, это запустить команды:

    List<String> hostsList
        = rddList.map(value -> {
                Thread.sleep(3_000);
                return InetAddress.getLocalHost().getHostAddress();
            })
            .distinct()
            .collect();

    System.out.println("-----> hostsList = " + hostsList);

Thread.sleep(3_000) необходимо для правильного распределения задач. 3 секунды мне достаточно. Вероятно, значение может быть меньше, а иногда, возможно, потребуется более высокое значение (я думаю, это значение зависит от того, насколько быстро работники получают задания для выполнения от мастера).

Приведенный выше код будет работать на каждом ядре, связанном с работником, поэтому более одного на каждого работника. Чтобы на каждом работнике выполнялась ровно одна команда, я использовал следующий код:

/* as static field of class */
private static final AtomicBoolean ONE_ON_WORKER = new AtomicBoolean(false);

...

    long nodeCount
        = rddList.map(value -> {
                Thread.sleep(3_000);
                if (ONE_ON_WORKER.getAndSet(true) == false) {
                    System.out.println("Executed on "
                            + InetAddress.getLocalHost().getHostName());
                    return 1;
                } else {
                    return 0;
                }
            })
            .filter(val -> val != 0)
            .count();

    System.out.println("-----> finished using #nodes = " + nodeCount);

И конечно же, в конце концов, остановка:

    spark.stop();
Другие вопросы по тегам