Как присоединиться к двум Spark DataFrame и управлять их колонкой общего доступа?
У меня есть 2 DataFrame, как это:
+--+-----------+
|id|some_string|
+--+-----------+
| a| foo|
| b| bar|
| c| egg|
| d| fog|
+--+-----------+
и это:
+--+-----------+
|id|some_string|
+--+-----------+
| a| hoi|
| b| hei|
| c| hai|
| e| hui|
+--+-----------+
Я хочу присоединиться к ним, чтобы быть таким:
+--+-----------+
|id|some_string|
+--+-----------+
| a| foohoi|
| b| barhei|
| c| egghai|
| d| fog|
| e| hui|
+--+-----------+
итак, колонна some_string
с первого кадра данных добавляется в столбец some_string
со второго кадра данных. Если я использую
df_join = df1.join(df2,on='id',how='outer')
это вернется
+--+-----------+-----------+
|id|some_string|some_string|
+--+-----------+-----------+
| a| foo| hoi|
| b| bar| hei|
| c| egg| hai|
| d| fog| null|
| e| null| hui|
+--+-----------+-----------+
Есть ли способ сделать это?
2 ответа
Вам нужно использовать when
чтобы добиться правильной конкатенации. Кроме того, как вы использовали outer
Присоединиться было почти правильно.
Вы должны проверить, является ли кто-либо из этих двух столбцов Null
или же not Null
а затем сделать concatenation
,
from pyspark.sql.functions import col, when, concat
df1 = sqlContext.createDataFrame([('a','foo'),('b','bar'),('c','egg'),('d','fog')],['id','some_string'])
df2 = sqlContext.createDataFrame([('a','hoi'),('b','hei'),('c','hai'),('e','hui')],['id','some_string'])
df_outer_join=df1.join(df2.withColumnRenamed('some_string','some_string_x'), ['id'], how='outer')
df_outer_join.show()
+---+-----------+-------------+
| id|some_string|some_string_x|
+---+-----------+-------------+
| e| null| hui|
| d| fog| null|
| c| egg| hai|
| b| bar| hei|
| a| foo| hoi|
+---+-----------+-------------+
df_outer_join = df_outer_join.withColumn('some_string_concat',
when(col('some_string').isNotNull() & col('some_string_x').isNotNull(),concat(col('some_string'),col('some_string_x')))
.when(col('some_string').isNull() & col('some_string_x').isNotNull(),col('some_string_x'))
.when(col('some_string').isNotNull() & col('some_string_x').isNull(),col('some_string')))\
.drop('some_string','some_string_x')
df_outer_join.show()
+---+------------------+
| id|some_string_concat|
+---+------------------+
| e| hui|
| d| fog|
| c| egghai|
| b| barhei|
| a| foohoi|
+---+------------------+
Учитывая, что вы хотите выполнить внешнее объединение, вы можете попробовать следующее:
from pyspark.sql.functions import concat, col, lit, when
df_join= df1.join(df2,on='id',how='outer').when(isnull(df1.some_string1), ''). when(isnull(df2.some_string2),'').withColumn('new_column',concat(col('some_string1'),lit(''),col('some_string2'))).select('id','new_column')
(Обратите внимание, что some_string1 и 2 ссылаются на столбцы some_string из фреймов данных df1 и df2. Я бы посоветовал вам называть их по-разному, а не давать одно и то же имя some_string, чтобы вы могли их вызывать)