Каково влияние 'coalesce' перед 'partitionBy' в этом потоковом запросе?

У меня есть потоковый запрос (Spark Structured Streaming), который получает данные из темы Kafka (два раздела), например:

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "172.29.57.25:9092,172.29.57.30:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(from_json(col("value").cast("string"), schema).as("record")).select("record.*")

Я хочу выполнить простое агрегирование и разбиение по дате / часу, а также сохранить файлы в Parquet в HDFS, например:

val aggregationQuery = df.withColumn("ROP", from_unixtime((col("attributes.START_TIME")/1000), "yyyy-MM-dd HH:mm").cast(TimestampType))
.withColumn("date", to_date(col("ROP")))
.withColumn("hour", hour(col("ROP")))
.withColumn("timestamp", current_timestamp())
.withWatermark("timestamp", "0 minutes")
.groupBy(window(col("timestamp"), "10 seconds"), col("date"), col("hour"))
.agg(count("attributes.RECORDID").as("NumRecords"))
.coalesce(2)

Выход в паркет:

aggregationQuery.writeStream
.format("parquet")
.trigger(Trigger.ProcessingTime("10 seconds"))
.partitionBy("date", "hour")
.option("path", "hdfs://cloudera-cluster:8020/user/spark/proyecto1")
.option("checkpointLocation", "hdfs://cloudera-cluster:8020/user/spark/checkpointfolder")
.outputMode("append")
.start()

В результате я получаю структуру папок, похожую на этот пример:

         user/spark/proyecto1/date=2015-08-18/hour=20

Внутри каждой папки я получаю 2 файла Parquet на триггер в процессе потоковой передачи.

Я хотел бы понять, что операции 'coalesce' и 'partitionBy' делают с моими данными, а также любые риски, связанные с этой конкретной комбинацией.

Кстати, в моем кластере только 2 узла.

2 ответа

Решение
  • coalesce уменьшает параллелизм для всего конвейера до 2. Так как он не создает барьер для анализа, он распространяется обратно, поэтому на практике может быть лучше заменить его на repartition,
  • partitionBy создает структуру каталогов, которые вы видите, со значениями, закодированными в пути. Удаляет соответствующие столбцы из листовых файлов. Поскольку даты и часы имеют низкую мощность, особого риска в этом случае нет.

В совокупности эти два создают наблюдаемую структуру каталогов и ограничивают количество файлов в каждом листовом каталоге не более двух.

Объединение: это уменьшает количество разделов. В этом случае, если n было числом разделов по умолчанию, оно уменьшает его до 2. Он объединяет все разделы вслепую в каждом из ваших узлов в один раздел, что приводит к 2. Это может быть причина, по которой вы получаете 2 файла в папке.

Когда вы используете раздел по нему, он создает n чисел разделов на основе значений, которые у вас есть в столбце. Подобно тому, как каждый уникальный ключ помещается в соответствующий раздел. Если он не используется должным образом, у вас может получиться большое количество разделов, которые создадут накладные расходы в кластере из двух узлов