Как выполнить объединение двух DataFrames с разным количеством столбцов в спарк?

У меня есть 2 DataFrameS следующим образом:

Источник данных

Мне нужен союз, как это:

unionAll Функция не работает, потому что количество и название столбцов разные.

Как я могу это сделать?

22 ответа

Решение

В Scala нужно просто добавить все отсутствующие столбцы как nulls,

import org.apache.spark.sql.functions._

// let df1 and df2 the Dataframes to merge
val df1 = sc.parallelize(List(
  (50, 2),
  (34, 4)
)).toDF("age", "children")

val df2 = sc.parallelize(List(
  (26, true, 60000.00),
  (32, false, 35000.00)
)).toDF("age", "education", "income")

val cols1 = df1.columns.toSet
val cols2 = df2.columns.toSet
val total = cols1 ++ cols2 // union

def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

df1.select(expr(cols1, total):_*).unionAll(df2.select(expr(cols2, total):_*)).show()

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 50|       2|     null|   null|
| 34|       4|     null|   null|
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

Обновить

Оба височных DataFrames будет иметь тот же порядок столбцов, потому что мы отображаем через total в обоих случаях.

df1.select(expr(cols1, total):_*).show()
df2.select(expr(cols2, total):_*).show()

+---+--------+---------+------+
|age|children|education|income|
+---+--------+---------+------+
| 50|       2|     null|  null|
| 34|       4|     null|  null|
+---+--------+---------+------+

+---+--------+---------+-------+
|age|children|education| income|
+---+--------+---------+-------+
| 26|    null|     true|60000.0|
| 32|    null|    false|35000.0|
+---+--------+---------+-------+

Spark 3.1+

      df = df1.unionByName(df2, allowMissingColumns=True)

Результаты теста:

      from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1=[
(1 , '2016-08-29', 1 , 2, 3),
(2 , '2016-08-29', 1 , 2, 3),
(3 , '2016-08-29', 1 , 2, 3)]
df1 = spark.createDataFrame(data1, ['code' , 'date' , 'A' , 'B', 'C'])
data2=[
(5 , '2016-08-29', 1, 2, 3, 4),
(6 , '2016-08-29', 1, 2, 3, 4),
(7 , '2016-08-29', 1, 2, 3, 4)]
df2 = spark.createDataFrame(data2, ['code' , 'date' , 'B', 'C', 'D', 'E'])

df = df1.unionByName(df2, allowMissingColumns=True)
df.show()

#     +----+----------+----+---+---+----+----+
#     |code|      date|   A|  B|  C|   D|   E|
#     +----+----------+----+---+---+----+----+
#     |   1|2016-08-29|   1|  2|  3|null|null|
#     |   2|2016-08-29|   1|  2|  3|null|null|
#     |   3|2016-08-29|   1|  2|  3|null|null|
#     |   5|2016-08-29|null|  1|  2|   3|   4|
#     |   6|2016-08-29|null|  1|  2|   3|   4|
#     |   7|2016-08-29|null|  1|  2|   3|   4|
#     +----+----------+----+---+---+----+----+

Вот моя версия Python:

from pyspark.sql import SparkSession, HiveContext
from pyspark.sql.functions import lit
from pyspark.sql import Row

def customUnion(df1, df2):
    cols1 = df1.columns
    cols2 = df2.columns
    total_cols = sorted(cols1 + list(set(cols2) - set(cols1)))
    def expr(mycols, allcols):
        def processCols(colname):
            if colname in mycols:
                return colname
            else:
                return lit(None).alias(colname)
        cols = map(processCols, allcols)
        return list(cols)
    appended = df1.select(expr(cols1, total_cols)).union(df2.select(expr(cols2, total_cols)))
    return appended

Вот пример использования:

data = [
    Row(zip_code=58542, dma='MIN'),
    Row(zip_code=58701, dma='MIN'),
    Row(zip_code=57632, dma='MIN'),
    Row(zip_code=58734, dma='MIN')
]

firstDF = spark.createDataFrame(data)

data = [
    Row(zip_code='534', name='MIN'),
    Row(zip_code='353', name='MIN'),
    Row(zip_code='134', name='MIN'),
    Row(zip_code='245', name='MIN')
]

secondDF = spark.createDataFrame(data)

customUnion(firstDF,secondDF).show()

Вот код для Python 3.0 с использованием pyspark:

from pyspark.sql import SQLContext
import pyspark
from pyspark.sql.functions import lit

def __orderDFAndAddMissingCols(df, columnsOrderList, dfMissingFields):
    ''' return ordered dataFrame by the columns order list with null in missing columns '''
    if not dfMissingFields:  #no missing fields for the df
        return df.select(columnsOrderList)
    else:
        columns = []
        for colName in columnsOrderList:
            if colName not in dfMissingFields:
                columns.append(colName)
            else:
                columns.append(lit(None).alias(colName))
        return df.select(columns)

def __addMissingColumns(df, missingColumnNames):
    ''' Add missing columns as null in the end of the columns list '''
    listMissingColumns = []
    for col in missingColumnNames:
        listMissingColumns.append(lit(None).alias(col))

    return df.select(df.schema.names + listMissingColumns)

def __orderAndUnionDFs( leftDF, rightDF, leftListMissCols, rightListMissCols):
    ''' return union of data frames with ordered columns by leftDF. '''
    leftDfAllCols = __addMissingColumns(leftDF, leftListMissCols)
    rightDfAllCols = __orderDFAndAddMissingCols(rightDF, leftDfAllCols.schema.names, rightListMissCols)
    return leftDfAllCols.union(rightDfAllCols)

def unionDFs(leftDF,rightDF):
    ''' Union between two dataFrames, if there is a gap of column fields,
     it will append all missing columns as nulls '''
    # Check for None input
    if leftDF == None:
        raise ValueError('leftDF parameter should not be None')
    if rightDF == None:
        raise ValueError('rightDF parameter should not be None')
        #For data frames with equal columns and order- regular union
    if leftDF.schema.names == rightDF.schema.names:
        return leftDF.union(rightDF)
    else: # Different columns
        #Save dataFrame columns name list as set
        leftDFColList = set(leftDF.schema.names)
        rightDFColList = set(rightDF.schema.names)
        # Diff columns between leftDF and rightDF
        rightListMissCols = list(leftDFColList - rightDFColList)
        leftListMissCols = list(rightDFColList - leftDFColList)
        return __orderAndUnionDFs(leftDF, rightDF, leftListMissCols, rightListMissCols)


if __name__ == '__main__':
    sc = pyspark.SparkContext()
    sqlContext = SQLContext(sc)
    leftDF = sqlContext.createDataFrame( [(1, 2, 11), (3, 4, 12)] , ('a','b','d'))
    rightDF = sqlContext.createDataFrame( [(5, 6 , 9), (7, 8, 10)] , ('b','a','c'))

    unionDF = unionDFs(leftDF,rightDF)
    print(unionDF.select(unionDF.schema.names).show())

Очень простой способ сделать это - select столбцы в одном и том же порядке как из кадров данных, так и из использования unionAll

df1.select('code', 'date', 'A', 'B', 'C', lit(None).alias('D'), lit(None).alias('E'))\
   .unionAll(df2.select('code', 'date', lit(None).alias('A'), 'B', 'C', 'D', 'E'))

Мне почему-то кажется, что большинство ответов на python здесь слишком неуклюжи в их написании, если вы просто используете простые lit(None)- обходной путь (это единственный известный мне способ). В качестве альтернативы это может быть полезно:

# df1 and df2 are assumed to be the given dataFrames from the question

# Get the lacking columns for each dataframe and set them to null in the respective dataFrame.
# First do so for df1...
for column in [column for column in df1.columns if column not in df2.columns]:
    df1 = df1.withColumn(column, lit(None))

# ... and then for df2
for column in [column for column in df2.columns if column not in df1.columns]:
    df2 = df2.withColumn(column, lit(None))


После этого просто сделайте union()ты хотел сделать.
Внимание: если ваш порядок столбцов отличаетсяdf1 а также df2 использовать unionByName()!

result = df1.unionByName(df2)

Вот решение pyspark.

Предполагается, что если поле в df1 отсутствует из df2Затем вы добавляете это пропущенное поле в df2 с нулевыми значениями. Однако также предполагается, что если поле существует в обоих кадрах данных, но тип или обнуляемость поля различны, то эти два кадра данных конфликтуют и не могут быть объединены. В этом случае я поднимаю TypeError,

from pyspark.sql.functions import lit

def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = left_types[l_name]
            if l_type != r_type:
                raise TypeError, "Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s"  % (l_name, l_nullable, not(l_nullable))
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = right_types[r_name]
            if r_type != l_type:
                raise TypeError, "Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type)
            else:
                raise TypeError, "Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not(r_nullable))
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))    

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)

Модифицированная версия Альберто Бонсанто, чтобы сохранить исходный порядок столбцов (OP подразумевал, что порядок должен соответствовать исходным таблицам). Так же match часть вызвала предупреждение Intellij.

Вот моя версия:

def unionDifferentTables(df1: DataFrame, df2: DataFrame): DataFrame = {

  val cols1 = df1.columns.toSet
  val cols2 = df2.columns.toSet
  val total = cols1 ++ cols2 // union

  val order = df1.columns ++  df2.columns
  val sorted = total.toList.sortWith((a,b)=> order.indexOf(a) < order.indexOf(b))

  def expr(myCols: Set[String], allCols: List[String]) = {
      allCols.map( {
        case x if myCols.contains(x) => col(x)
        case y => lit(null).as(y)
      })
  }

  df1.select(expr(cols1, sorted): _*).unionAll(df2.select(expr(cols2, sorted): _*))
}

В парке:

df = df1.join(df2, ['each', 'shared', 'col'], how='full')

Эта функция принимает два фрейма данных (df1 и df2) с разными схемами и объединяет их. Сначала нам нужно привести их к той же схеме, добавив все (отсутствующие) столбцы от df1 до df2 и наоборот. Чтобы добавить новый пустой столбец в df, нам нужно указать тип данных.

      import pyspark.sql.functions as F
    
def union_different_schemas(df1, df2):
   # Get a list of all column names in both dfs
   columns_df1 = df1.columns
   columns_df2 = df2.columns
   # Get a list of datatypes of the columns
   data_types_df1 = [i.dataType for i in df1.schema.fields]
   data_types_df2 = [i.dataType for i in df2.schema.fields]
   # We go through all columns in df1 and if they are not in df2, we add 
   # them (and specify the correct datatype too)
   for col, typ in zip(columns_df1, data_types_df1):
      if col not in df2.columns:
         df2 = df2\
            .withColumn(col, F.lit(None).cast(typ))
   # Now df2 has all missing columns from df1, let's do the same for df1
   for col, typ in zip(columns_df2, data_types_df2):
      if col not in df1.columns:
         df1 = df1\
            .withColumn(col, F.lit(None).cast(typ))
   # Now df1 and df2 have the same columns, not necessarily in the same 
   # order, therefore we use unionByName
   combined_df = df1\
      .unionByName(df2)

   return combined_df

У меня была та же проблема, и использование соединения вместо объединения решило мою проблему. Так, например, с python вместо этой строки кода:result = left.union(right), который не будет выполняться для разного количества столбцов, вы должны использовать этот:

result = left.join(right, left.columns if (len(left.columns) < len(right.columns)) else right.columns, "outer")

Обратите внимание, что второй аргумент содержит общие столбцы между двумя DataFrames. Если вы не используете его, в результате появятся дубликаты столбцов, один из которых будет нулевым, а другой - нет. Надеюсь, поможет.

Существует много кратких способов решения этой проблемы с умеренным снижением производительности.

def unionWithDifferentSchema(a: DataFrame, b: DataFrame): DataFrame = {
    sparkSession.read.json(a.toJSON.union(b.toJSON).rdd)
}

Это функция, которая делает свое дело. Использование toJSON для каждого фрейма данных создает json Union. Это сохраняет порядок и тип данных.

Единственный улов в том, что toJSON сравнительно дорогой (однако не так много, вы, вероятно, получите 10-15% замедление). Однако это сохраняет код в чистоте.

Здесь также ответили здесь на версию в Scala, также на версию Pyspark.. ( Spark - объединение / объединение DataFrame с другой схемой (имена столбцов и последовательность) в DataFrame с общей схемой Master) -

Для объединения требуется List of dataframe. При условии, что столбцы с одинаковыми именами во всех dataframe должны иметь одинаковый тип данных.

def unionPro(DFList: List[DataFrame], spark: org.apache.spark.sql.SparkSession): DataFrame = {

    /**
     * This Function Accepts DataFrame with same or Different Schema/Column Order.With some or none common columns
     * Creates a Unioned DataFrame
     */

    import spark.implicits._

    val MasterColList: Array[String] = DFList.map(_.columns).reduce((x, y) => (x.union(y))).distinct

    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
      allCols.toList.map(x => x match {
        case x if myCols.contains(x) => col(x)
        case _                       => lit(null).as(x)
      })
    }

    // Create EmptyDF , ignoring different Datatype in StructField and treating them same based on Name ignoring cases

    val masterSchema = StructType(DFList.map(_.schema.fields).reduce((x, y) => (x.union(y))).groupBy(_.name.toUpperCase).map(_._2.head).toArray)

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(MasterColList.head, MasterColList.tail: _*)

    DFList.map(df => df.select(unionExpr(df.columns, MasterColList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))

  }

Вот образец теста для этого -


    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1, "D1"), ("D", 2, "D2")).toDF("Name", "Sal", "Deptt")
    unionPro(List(aDF, bDF), spark).show

Что дает результат как -

+----+----+----+-----+
|Name|  ID| Sal|Deptt|
+----+----+----+-----+
|   A|   1|null| null|
|   B|   2|null| null|
|   C|null|   1|   D1|
|   D|null|   2|   D2|
+----+----+----+-----+

Моя версия для Java:

    private static Dataset<Row> unionDatasets(Dataset<Row> one, Dataset<Row> another) {
        StructType firstSchema = one.schema();
        List<String> anotherFields = Arrays.asList(another.schema().fieldNames());
        another = balanceDataset(another, firstSchema, anotherFields);
        StructType secondSchema = another.schema();
        List<String> oneFields = Arrays.asList(one.schema().fieldNames());
        one = balanceDataset(one, secondSchema, oneFields);
        return another.unionByName(one);
    }

    private static Dataset<Row> balanceDataset(Dataset<Row> dataset, StructType schema, List<String> fields) {
        for (StructField e : schema.fields()) {
            if (!fields.contains(e.name())) {
                dataset = dataset
                        .withColumn(e.name(),
                                lit(null));
                dataset = dataset.withColumn(e.name(),
                        dataset.col(e.name()).cast(Optional.ofNullable(e.dataType()).orElse(StringType)));
            }
        }
        return dataset;
    }

ПИСПАРК

Версия Scala от Альберто работает отлично. Однако, если вы хотите создать цикл for или какое-то динамическое присваивание переменных, вы можете столкнуться с некоторыми проблемами. Решение поставляется с Pyspark - чистый код:

      from pyspark.sql.functions import *

#defining dataframes
df1 = spark.createDataFrame(
    [
        (1, 'foo','ok'), 
        (2, 'pro','ok')
    ],
    ['id', 'txt','check']
)

df2 = spark.createDataFrame(
    [
        (3, 'yep',13,'mo'), 
        (4, 'bro',11,'re')
        
    ],
    ['id', 'txt','value','more'] 
) 

#retrieving columns
cols1 = df1.columns
cols2 = df2.columns

#getting columns from df1 and df2
total = list(set(cols2) | set(cols1)) 

#defining function for adding nulls (None in case of pyspark)
def addnulls(yourDF): 
  for x in total:
    if not x in yourDF.columns:
      yourDF = yourDF.withColumn(x,lit(None))
  return yourDF

df1 = addnulls(df1)
df2 = addnulls(df2)


#additional sorting for correct unionAll (it concatenates DFs by column number)
df1.select(sorted(df1.columns)).unionAll(df2.select(sorted(df2.columns))).show()

+-----+---+----+---+-----+
|check| id|more|txt|value|
+-----+---+----+---+-----+
|   ok|  1|null|foo| null|
|   ok|  2|null|pro| null|
| null|  3|  mo|yep|   13|
| null|  4|  re|bro|   11|
+-----+---+----+---+-----+
      from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def unionAll(*dfs, fill_by=None):
    clmns = {clm.name.lower(): (clm.dataType, clm.name) for df in dfs for clm in df.schema.fields}
    
    dfs = list(dfs)
    for i, df in enumerate(dfs):
        df_clmns = [clm.lower() for clm in df.columns]
        for clm, (dataType, name) in clmns.items():
            if clm not in df_clmns:
                # Add the missing column
                dfs[i] = dfs[i].withColumn(name, F.lit(fill_by).cast(dataType))
    return reduce(DataFrame.unionByName, dfs)
unionAll(df1, df2).show()
  1. Столбцы без учета регистра
  2. Вернет фактический регистр столбца
  3. Поддержка существующих типов данных
  4. Значение по умолчанию можно настроить
  5. Передавать сразу несколько фреймов данных (например, unionAll(df1, df2, df3, ..., df10))

Еще один универсальный метод объединения списка DataFrame.

def unionFrames(dfs: Seq[DataFrame]): DataFrame = {
    dfs match {
      case Nil => session.emptyDataFrame // or throw an exception?
      case x :: Nil => x
      case _ =>
        //Preserving Column order from left to right DF's column order
        val allColumns = dfs.foldLeft(collection.mutable.ArrayBuffer.empty[String])((a, b) => a ++ b.columns).distinct

        val appendMissingColumns = (df: DataFrame) => {
          val columns = df.columns.toSet
          df.select(allColumns.map(c => if (columns.contains(c)) col(c) else lit(null).as(c)): _*)
        }

        dfs.tail.foldLeft(appendMissingColumns(dfs.head))((a, b) => a.union(appendMissingColumns(b)))
    }

Вот еще один:

def unite(df1: DataFrame, df2: DataFrame): DataFrame = {
    val cols1 = df1.columns.toSet
    val cols2 = df2.columns.toSet
    val total = (cols1 ++ cols2).toSeq.sorted
    val expr1 = total.map(c => {
      if (cols1.contains(c)) c else "NULL as " + c
    })
    val expr2 = total.map(c => {
      if (cols2.contains(c)) c else "NULL as " + c
    })
    df1.selectExpr(expr1:_*).union(
      df2.selectExpr(expr2:_*)
    )
}

Объединение и внешнее объединение для конкатенации Pyspark DataFrame. Это работает для нескольких фреймов данных с разными столбцами.

def union_all(*dfs):
    return reduce(ps.sql.DataFrame.unionAll, dfs)

def outer_union_all(*dfs):

    all_cols = set([])
    for df in dfs:
        all_cols |= set(df.columns) 
    all_cols = list(all_cols)
    print(all_cols)

    def expr(cols, all_cols):

        def append_cols(col):
            if col in cols:
                return col
            else:
                return sqlfunc.lit(None).alias(col)

        cols_ = map(append_cols, all_cols)
        return list(cols_)

    union_df = union_all(*[df.select(expr(df.columns, all_cols)) for df in dfs])
    return union_df

Это моя версия pyspark:

from functools import reduce
from pyspark.sql.functions import lit

def concat(dfs):
    # when the dataframes to combine do not have the same order of columns
    # https://datascience.stackexchange.com/a/27231/15325
    return reduce(lambda df1, df2: df1.union(df2.select(df1.columns)), dfs) 

def union_all(dfs):
    columns = reduce(lambda x, y : set(x).union(set(y)), [ i.columns for i in dfs ]  )

    for i in range(len(dfs)):
        d = dfs[i]
        for c in columns:
            if c not in d.columns:
                d = d.withColumn(c, lit(None))
        dfs[i] = d

    return concat(dfs)

Если вы загружаете из файлов, я думаю, вы могли бы просто использовать функцию чтения со списком файлов.

          # file_paths is list of files with different schema
    df = spark.read.option("mergeSchema", "true").json(file_paths)

В результирующем фрейме данных будут объединенные столбцы.

В качестве альтернативы вы можете использовать полное соединение.

list_of_files = ['test1.parquet', 'test2.parquet']

def merged_frames():
  if list_of_files:
    frames = [spark.read.parquet(df.path) for df in list_of_files]
    if frames:
      df = frames[0]
      if frames[1]:
        var = 1
        for element in range(len(frames)-1):
          result_df = df.join(frames[var], 'primary_key', how='full')
          var += 1
    display(result_df)
Другие вопросы по тегам