Как присоединиться к двум Spark DataFrame и управлять их колонкой общего доступа?

У меня есть 2 DataFrame, как это:

+--+-----------+
|id|some_string|
+--+-----------+
| a|        foo|
| b|        bar|
| c|        egg|
| d|        fog|
+--+-----------+

и это:

+--+-----------+
|id|some_string|
+--+-----------+
| a|        hoi|
| b|        hei|
| c|        hai|
| e|        hui|
+--+-----------+

Я хочу присоединиться к ним, чтобы быть таким:

+--+-----------+
|id|some_string|
+--+-----------+
| a|     foohoi|
| b|     barhei|
| c|     egghai|
| d|        fog|
| e|        hui|
+--+-----------+

итак, колонна some_string с первого кадра данных добавляется в столбец some_string со второго кадра данных. Если я использую

df_join = df1.join(df2,on='id',how='outer')

это вернется

+--+-----------+-----------+
|id|some_string|some_string|
+--+-----------+-----------+
| a|        foo|        hoi|
| b|        bar|        hei|
| c|        egg|        hai|
| d|        fog|       null|
| e|       null|        hui|
+--+-----------+-----------+

Есть ли способ сделать это?

2 ответа

Вам нужно использовать when чтобы добиться правильной конкатенации. Кроме того, как вы использовали outer Присоединиться было почти правильно.

Вы должны проверить, является ли кто-либо из этих двух столбцов Null или же not Null а затем сделать concatenation,

from pyspark.sql.functions import col, when, concat
df1 = sqlContext.createDataFrame([('a','foo'),('b','bar'),('c','egg'),('d','fog')],['id','some_string'])
df2 = sqlContext.createDataFrame([('a','hoi'),('b','hei'),('c','hai'),('e','hui')],['id','some_string'])
df_outer_join=df1.join(df2.withColumnRenamed('some_string','some_string_x'), ['id'], how='outer')
df_outer_join.show()
+---+-----------+-------------+
| id|some_string|some_string_x|
+---+-----------+-------------+
|  e|       null|          hui|
|  d|        fog|         null|
|  c|        egg|          hai|
|  b|        bar|          hei|
|  a|        foo|          hoi|
+---+-----------+-------------+
df_outer_join = df_outer_join.withColumn('some_string_concat',
                                         when(col('some_string').isNotNull() & col('some_string_x').isNotNull(),concat(col('some_string'),col('some_string_x')))
                                         .when(col('some_string').isNull() & col('some_string_x').isNotNull(),col('some_string_x'))
                                         .when(col('some_string').isNotNull() & col('some_string_x').isNull(),col('some_string')))\
                              .drop('some_string','some_string_x')


df_outer_join.show()
+---+------------------+
| id|some_string_concat|
+---+------------------+
|  e|               hui|
|  d|               fog|
|  c|            egghai|
|  b|            barhei|
|  a|            foohoi|
+---+------------------+

Учитывая, что вы хотите выполнить внешнее объединение, вы можете попробовать следующее:

from pyspark.sql.functions import concat, col, lit, when


df_join= df1.join(df2,on='id',how='outer').when(isnull(df1.some_string1), ''). when(isnull(df2.some_string2),'').withColumn('new_column',concat(col('some_string1'),lit(''),col('some_string2'))).select('id','new_column')

(Обратите внимание, что some_string1 и 2 ссылаются на столбцы some_string из фреймов данных df1 и df2. Я бы посоветовал вам называть их по-разному, а не давать одно и то же имя some_string, чтобы вы могли их вызывать)

Другие вопросы по тегам