Как запустить задачи в Spark на разных рабочих?
У меня есть следующий код для Spark:
package my.spark;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SparkSession;
public class ExecutionTest {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("ExecutionTest")
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
int slices = 2;
int n = slices;
List<String> list = new ArrayList<>(n);
for (int i = 0; i < n; i++) {
list.add("" + i);
}
JavaRDD<String> dataSet = jsc.parallelize(list, slices);
dataSet.foreach(str -> {
System.out.println("value: " + str);
Thread.sleep(10000);
});
System.out.println("done");
spark.stop();
}
}
Я запустил главный узел и двух рабочих (все на localhost; Windows), используя команды:
bin\spark-class org.apache.spark.deploy.master.Master
и (два раза):
bin\spark-class org.apache.spark.deploy.worker.Worker spark://<local-ip>:7077
Все началось правильно.
После отправки моей работы с помощью команды:
bin\spark-submit --class my.spark.ExecutionTest --master spark://<local-ip>:7077 file:///<pathToFatJar>/FatJar.jar
Команда запущена, но value: 0
а также value: 1
выходы написаны одним из работников (как показано на Logs > stdout
на странице, связанной с работником). Второй работник не имеет ничего в Logs > stdout
, Насколько я понял, это означает, что каждую итерацию выполняет один и тот же работник.
Как запустить эти задачи на двух разных работающих рабочих?
1 ответ
Это возможно, но я не уверен, будет ли это работать правильно каждый раз и везде. Однако во время тестирования каждый раз он работал как положено.
Я проверил свой код, используя хост-компьютер с Windows 10 x64 и 4 виртуальных машины (ВМ): VirtualBox с ядром Debian 9 (растяжение) 4.9.0 x64, сеть только для хоста, Java 1.8.0_144, Apache Spark 2.2.0 для Hadoop 2.7 (spark-2.2.0-bin-hadoop2.7.tar.gz).
Я использовал master и 3 slave на VM и еще один slave на Windows:
- debian-master - 1 процессор, 1 ГБ оперативной памяти
- debian-slave1 - 1 процессор, 1 ГБ оперативной памяти
- debian-slave2 - 1 процессор, 1 ГБ оперативной памяти
- debian-slave3 - 2 процессора, 1 ГБ оперативной памяти
- Windows-Slave - 4 процессора, 8 ГБ оперативной памяти
Я отправлял свои работы с компьютера с Windows на мастер, расположенный на виртуальной машине.
Начало такое же, как и раньше:
SparkSession spark = SparkSession
.builder()
.config("spark.cores.max", coresCount) // not necessary
.appName("ExecutionTest")
.getOrCreate();
[важный] coresCount
важно для разделения - я должен разделить данные, используя количество используемых ядер, а не количество рабочих / исполнителей.
Далее я должен создать JavaSparkContext и RDD. Повторное использование RDD позволяет выполнять несколько раз, вероятно, один и тот же набор работников.
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
JavaRDD<Integer> rddList
= jsc.parallelize(
IntStream.range(0, coresCount * 2)
.boxed().collect(Collectors.toList()))
.repartition(coresCount);
Я создал rddList
который имеет coresCount * 2
элементы. Количество элементов равно coresCount
не позволяет работать на всех ассоциированных рабочих (в моем случае). Может быть, coresCount + 1
было бы достаточно, но я не проверял это как coresCount * 2
не так много, как хорошо.
Следующее, что нужно сделать, это запустить команды:
List<String> hostsList
= rddList.map(value -> {
Thread.sleep(3_000);
return InetAddress.getLocalHost().getHostAddress();
})
.distinct()
.collect();
System.out.println("-----> hostsList = " + hostsList);
Thread.sleep(3_000)
необходимо для правильного распределения задач. 3 секунды мне достаточно. Вероятно, значение может быть меньше, а иногда, возможно, потребуется более высокое значение (я думаю, это значение зависит от того, насколько быстро работники получают задания для выполнения от мастера).
Приведенный выше код будет работать на каждом ядре, связанном с работником, поэтому более одного на каждого работника. Чтобы на каждом работнике выполнялась ровно одна команда, я использовал следующий код:
/* as static field of class */
private static final AtomicBoolean ONE_ON_WORKER = new AtomicBoolean(false);
...
long nodeCount
= rddList.map(value -> {
Thread.sleep(3_000);
if (ONE_ON_WORKER.getAndSet(true) == false) {
System.out.println("Executed on "
+ InetAddress.getLocalHost().getHostName());
return 1;
} else {
return 0;
}
})
.filter(val -> val != 0)
.count();
System.out.println("-----> finished using #nodes = " + nodeCount);
И конечно же, в конце концов, остановка:
spark.stop();