Присоединяйтесь к DataFrames для создания CartesianProduct в Физическом плане на Spark 1.5.2

Я сталкиваюсь с проблемой производительности при объединении фреймов данных, созданных из файлов avro, с использованием библиотеки spark-avro.

Фреймы данных создаются из файлов размером 120 КБ и имеют общий размер около 1,5 ТБ. Два фрейма данных очень огромны с миллиардами записей.

Объединение для этих двух DataFrames выполняется вечно. Этот процесс выполняется на кластере пряжи с 300 исполнителями с 4 ядрами и 8 ГБ памяти.

Любое понимание этого объединения поможет. Я разместил план объяснения ниже. Я замечаю декартово произведение в физическом плане. Мне интересно, если это вызывает проблему производительности.

Ниже приведен логический план и физический план. (Из-за конфиденциальности я не могу разместить здесь ни одно из имен столбцов или имен файлов)

  == Optimized Logical Plan ==
Limit 21
 Join Inner, [ Join Conditions ]
  Join Inner, [ Join Conditions ]
   Project [ List of columns ]
    Relation [ List of columns ] AvroRelation[ fileName1 ] -- large file - .5 billion records
   InMemoryRelation  [List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None
  Project [ List of Columns ]
   Relation[ List of Columns] AvroRelation[ filename2 ] -- another large file - 800 million records

== Physical Plan ==
Limit 21
 Filter (filter conditions)
  CartesianProduct
   Filter (more filter conditions)
    CartesianProduct
     Project (selecting a few columns and applying a UDF to one column)
      Scan AvroRelation[avro file][ columns in Avro File ]
     InMemoryColumnarTableScan [List of columns ], true, 10000, StorageLevel(true, true, false, true, 1), (Repartition 1, false), None)
   Project [ List of Columns ]
    Scan AvroRelation[Avro File][List of Columns]

Code Generation: true

Код показан ниже.

val customerDateFormat = new SimpleDateFormat ("гггг / мм / дд");

val dates = new RetailDates()
val dataStructures = new DataStructures()

// Reading CSV Format input files -- retailDates
// This DF has 75 records
val retailDatesWithSchema = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", ",")
  .schema(dates.retailDatesSchema)
  .load(datesFile)
  .coalesce(1)
  .cache()

// Create UDF to convert String to Date
val dateUDF: (String => java.sql.Date) = (dateString: String) => new java.sql.Date(customerDateFormat.parse(dateString).getTime())
val stringToDateUDF = udf(dateUDF)

// Reading Avro Format Input Files
// This DF has 500 million records
val userInputDf = sqlContext.read.avro(“customerLocation")
val userDf = userInputDf.withColumn("CAL_DT", stringToDateUDF(col("CAL_DT"))).select(
                      "CAL_DT","USER_ID","USER_CNTRY_ID"
                    )

val userDimDf = sqlContext.read.avro(userDimFiles).select("USER_ID","USER_CNTRY_ID","PRIMARY_USER_ID") // This DF has 800 million records

val retailDatesWithSchemaBroadcast = sc.broadcast(retailDatesWithSchema)
val userDimDfBroadcast = sc.broadcast(userDimDf)

val userAndRetailDates = userDnaSdDf
  .join((retailDatesWithSchemaBroadcast.value).as("retailDates"),
  userDf("CAL_DT") between($"retailDates.WEEK_BEGIN_DATE", $"retailDates.WEEK_END_DATE")
  , "inner")



val userAndRetailDatesAndUserDim = userAndRetailDates
  .join((userDimDfBroadcast.value)
    .withColumnRenamed("USER_ID", "USER_DIM_USER_ID")
    .withColumnRenamed("USER_CNTRY_ID","USER_DIM_COUNTRY_ID")
    .as("userdim")
    , userAndRetailDates("USER_ID") <=> $"userdim.USER_DIM_USER_ID"
      && userAndRetailDates("USER_CNTRY_ID") <=> $"userdim.USER_DIM_COUNTRY_ID"
    , "inner")

userAndRetailDatesAndUserDim.show()

Спасибо, Прасад.

1 ответ

Здесь не так много работы (даже если ваши данные или даже имена столбцов / таблиц являются конфиденциальными, было бы полезно увидеть некоторый код, который мог бы показать, чего вы пытаетесь достичь), но CartesianProduct это определенно проблема. O(N^2) - это то, чего вы действительно хотите избежать в больших наборах данных, и в данном конкретном случае оно затрагивает все слабые места в Spark.

Вообще говоря, если объединение расширяется до явного декартова произведения или эквивалентной операции, это означает, что выражение объединения не основано на равенстве и поэтому не может быть оптимизировано с помощью объединения на основе случайного (или широковещательного + хеширования) соединения (SortMergeJoin, HashJoin).

Редактировать:

В вашем случае, скорее всего, проблема в следующем состоянии:

userDf("CAL_DT") between(
  $"retailDates.WEEK_BEGIN_DATE",  $"retailDates.WEEK_END_DATE")

Было бы лучше вычислить например WEEK_BEGIN_DATE на userDf и присоединиться напрямую

$"userDf.WEEK_BEGIN_DATE" === $"retailDates.WEEK_BEGIN_DATE"

Другое небольшое улучшение - это анализ даты без использования UDF, например, с unix_timestamp функция.

РЕДАКТИРОВАТЬ:

Другая проблема, на которую указывает rchukh, заключается в том, что <=> в Spark <= 1.6 расширяется до декартового произведения - SPARK-11111

Другие вопросы по тегам