Набор данных контрольных точек Spark Scala, показывающий.isCheckpointed = false после действия, но каталоги записаны

Кажется, есть несколько сообщений по этому вопросу, но ни одна из них не отвечает тому, что я понимаю.

Следующий код выполняется на DataBricks:

spark.sparkContext.setCheckpointDir("/dbfs/FileStore/checkpoint/cp1/loc7")
val checkpointDir = spark.sparkContext.getCheckpointDir.get
val ds = spark.range(10).repartition(2)
ds.cache()
ds.checkpoint()
ds.count()
ds.rdd.isCheckpointed  

Добавлено улучшение сортов:

...
val ds2 = ds.checkpoint(eager=true)
println(ds2.queryExecution.toRdd.toDebugString)
...

возвращает:

(2) MapPartitionsRDD[307] at toRdd at command-1835760423149753:13 []
 |  MapPartitionsRDD[305] at checkpoint at command-1835760423149753:12 []
 |  ReliableCheckpointRDD[306] at checkpoint at command-1835760423149753:12 []
 checkpointDir: String = dbfs:/dbfs/FileStore/checkpoint/cp1/loc10/86cc77b5-27c3-4049-9136-503ddcab0fa9
 ds: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 ds2: org.apache.spark.sql.Dataset[Long] = [id: bigint]
 res53: Boolean = false

Вопрос 1:

Обе команды ds.rdd.isCheckpointed или ds2.rdd.isCheckpointed возвращают значение False, хотя с подсчетом у меня не ленивая ситуация. Почему, когда, в частности,../loc 7 и 10 пишутся с (частичными) файлами? Также мы можем видеть, что ReliableCheckPoint!

Не очень хорошо объяснил всю концепцию. Пытаюсь разобраться с этим.

Вопрос 2 - дополнительный вопрос:

Кеш действительно необходим или нет в последних версиях Spark 2.4? Новая ветка на ds, если она не кэшируется, вызовет ли она пересчет или это лучше? Кажется странным, что данные контрольных точек не будут использоваться, или мы можем сказать, что Spark действительно не знает, что лучше?

Из High Performance Spark у меня складывается смешанное впечатление, что проверка наведения не очень рекомендуется, но опять же это так.

1 ответ

Решение

TL; DR: Вы не проверяете объект, который фактически проверен:

ds2.queryExecution.toRdd.dependencies(0).rdd.isCheckpointed
// Boolean = true

ds.rdd.isCheckpointed или ds2.rdd.isCheckpointed оба возвращают False

Это ожидаемое поведение. Указываемый контрольный объект - это не преобразованный RDD (который является результатом дополнительных преобразований, необходимых для преобразования во внешнее представление), на который вы ссылаетесь, а внутренний объект RDD (фактически, как вы видите выше, это даже не самый последний внутренний СДР, но его родитель).

Кроме того, в первом случае вы просто используете неправильный Dataset возражать вообще - как объяснено в связанном ответе Dataset.checkpoint возвращает новый Dataset

хотя с подсчетом у меня не ленивая ситуация

Это не имеет особого смысла. По умолчанию checkpoint реализация eager следовательно, это сила оценивает. Даже если бы не это, Dataset.count не правильный способ форсировать оценку.

Кеш действительно нужен или нет с последней версией

Как вы можете видеть в связанном источнике, Dataset.checkpoint использования RDD.checkpoint внутренне, поэтому применяется то же правило. Однако вы уже выполняете отдельное действие для форсирования контрольной точки, поэтому дополнительное кэширование, особенно с учетом стоимости Dataset настойчивость, может быть излишним.

Конечно, если вы сомневаетесь, вы можете рассмотреть бенчмаркинг в конкретном контексте.

Другие вопросы по тегам