Потоковая передача из CSV-файлов с помощью Spark

Я пытаюсь использовать Spark Streaming для сбора данных из файлов CSV, расположенных на NFS. У меня очень простой код, и до сих пор я запускал его только в spark-shell, но даже там я сталкиваюсь с некоторыми проблемами.

Я запускаю spark-shell с автономным Spark-мастером с 6 рабочими и передаю следующие аргументы в spark-shell:

--master spark: //master.host: 7077 --num-executors 3 --conf spark.cores.max = 10

Это код:

val schema = spark.read.option("header", true).option("mode", "PERMISSIVE").csv("/nfs/files_to_collect/schema/schema.csv").schema
val data = spark.readStream.option("header", true).schema(schema).csv("/nfs/files_to_collect/jobs/jobs*")
val query = data.writeStream.format("console").start()

В этом пути NFS есть 2 файла, каждый размером около 200 МБ. Когда я вызываю writeStream, я получаю следующее предупреждение:

"17/11/13 22:56:31 WARN TaskSetManager: Этап 2 содержит задачу очень большого размера (106402 КБ). Максимальный рекомендуемый размер задачи составляет 100 КБ".

Глядя в основной интерфейс Spark, я вижу, что использовался только один исполнитель - было создано четыре задачи, каждая из которых читала ~50% каждого CSV-файла.

Мои вопросы:

1) Чем больше файлов находится в пути NFS, тем больше памяти требуется драйверу - при 2 файлах происходит сбой, пока я не увеличу его память до 2g. С 4 файлами нужно не менее 8г. Что делает драйвер, что ему нужно так много памяти?

2) Как мне контролировать параллельность чтения файлов CSV? Я заметил, что чем больше файлов, тем больше задач создается, но можно ли это контролировать вручную?

0 ответов

Другие вопросы по тегам