Spark Clustered By/Bucket по набору данных, не использующему память
Недавно я наткнулся здесь на Spark.
Я попытался имитировать это для исходного файла 1.1TB от S3 (уже в паркете). План состоит в том, чтобы полностью избежать перемешивания, так как большинство наборов данных всегда объединяются в столбце "id". Вот что я делаю:
myDf.repartition(20)
.write.partitionBy("day")
.option("mode", "DROPMALFORMED")
.option("compression", "snappy")
.option("path","s3://my-bucket/folder/1year_data_bucketed/").mode("overwrite")
.format("parquet").bucketBy(20,"id").sortBy("id").saveAsTable("myTable1YearBucketed")
В другом кластере EMR я создаю таблицу и получаю к ней доступ.
CREATE TABLE newtable_on_diff_cluster (id string, day date, col1 double, col2 double) USING PARQUET OPTIONS (
path "s3://my-bucket/folder/1year_data_bucketed/"
)
CLUSTERED BY (id) INTO 20 BUCKETS
Создайте фрейм данных scala и объедините его с другой таблицей из тех же 20 блоков столбца id.
val myTableBucketedDf = spark.table("newtable_on_diff_cluster")
val myDimTableBucketedDf = spark.table("another_table_with_same_bucketing")
val joinedOutput = myTableBucketedDf.join(myDimTableBucketedDf, "id")
joinedOutput.show()
Вот мои вопросы:
- Я вижу, что даже с перераспределением в плане объяснения все еще удаляется случайное перемешивание, и это хорошо. Есть ли какие-либо проблемы с использованием перераспределения, раздела, bucketBy вышеописанным способом?
- Вышеупомянутое соединение не похоже на использование памяти в моем кластере EMR от Ganglia. При объединении обычных файлов в формате паркета без группирования они, кажется, работают быстрее в памяти при меньшем количестве дневных разделов. Я не проверял это больше дня. Как именно здесь обрабатывается объединение? Есть ли способ избежать оператора CREATE TABLE sql и использовать метаданные паркета для определения схемы таблицы с использованием scala? Я не хочу повторять имена столбцов, типы данных, когда они действительно доступны в паркете.
- Каково идеальное количество сегментов или размер отдельного файла после сегмента с точки зрения доступной памяти у исполнителя? Если уникальное количество значений в столбце идентификатора находится в диапазоне ~100 ММ, то, если я правильно понимаю, 20 сегментов будут делить каждый сегмент на 5-мм уникальные идентификаторы. Я понимаю, что сортировка здесь не соблюдается из-за того, что Spark для BucketBy создает несколько файлов. Какова рекомендация для перераспределения / размера конечного файла / количества сегментов в этом случае.