Каково влияние 'coalesce' перед 'partitionBy' в этом потоковом запросе?
У меня есть потоковый запрос (Spark Structured Streaming), который получает данные из темы Kafka (два раздела), например:
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092")
.option("subscribe", "mytopic")
.load()
.select(from_json(col("value").cast("string"), schema).as("record")).select("record.*")
Я хочу выполнить простое агрегирование и разбиение по дате / часу, а также сохранить файлы в Parquet в HDFS, например:
val aggregationQuery = df.withColumn("ROP", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
.withColumn("date", to_date(col("ROP")))
.withColumn("hour", hour(col("ROP")))
.withColumn("timestamp", current_timestamp())
.withWatermark("timestamp", "0 minutes")
.groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
.agg(count("attributes.RECORDID").as("NumRecords"))
.coalesce(2)
Выход в паркет:
aggregationQuery.writeStream
.format("parquet")
.trigger(Trigger.ProcessingTime("10 seconds"))
.partitionBy("date", "hour")
.option("path", "hdfs://cloudera-cluster:8020/user/spark/proyecto1")
.option("checkpointLocation", "hdfs://cloudera-cluster:8020/user/spark/checkpointfolder")
.outputMode("append")
.start()
В результате я получаю структуру папок, похожую на этот пример:
user/spark/proyecto1/date=2015-08-18/hour=20
Внутри каждой папки я получаю 2 файла Parquet на триггер в процессе потоковой передачи.
Я хотел бы понять, что операции 'coalesce' и 'partitionBy' делают с моими данными, а также любые риски, связанные с этой конкретной комбинацией.
Кстати, в моем кластере только 2 узла.
2 ответа
coalesce
уменьшает параллелизм для всего конвейера до 2. Так как он не создает барьер для анализа, он распространяется обратно, поэтому на практике может быть лучше заменить его наrepartition
,partitionBy
создает структуру каталогов, которые вы видите, со значениями, закодированными в пути. Удаляет соответствующие столбцы из листовых файлов. Поскольку даты и часы имеют низкую мощность, особого риска в этом случае нет.
В совокупности эти два создают наблюдаемую структуру каталогов и ограничивают количество файлов в каждом листовом каталоге не более двух.
Объединение: это уменьшает количество разделов. В этом случае, если n было числом разделов по умолчанию, оно уменьшает его до 2. Он объединяет все разделы вслепую в каждом из ваших узлов в один раздел, что приводит к 2. Это может быть причина, по которой вы получаете 2 файла в папке.
Когда вы используете раздел по нему, он создает n чисел разделов на основе значений, которые у вас есть в столбце. Подобно тому, как каждый уникальный ключ помещается в соответствующий раздел. Если он не используется должным образом, у вас может получиться большое количество разделов, которые создадут накладные расходы в кластере из двух узлов