Добавьте в столбец данных новый столбец с пользовательскими значениями. (Pyspark)
Три значения массива A1, получаемые из некоторой функции -
A1 = [1,2,3,4]
A1 = [5,6,7,8]
A1 = [1,3,4,1]
Мой фрейм данных, в котором я хочу добавить новый столбец со значениями моего массива -
+---+---+-----+
| x1| x2| x3|
+---+---+-----+
| 1| A| 3.0|
| 2| B|-23.0|
| 3| C| -4.0|
+---+---+-----+
Я пробовал вот так (предположим, что 'df' - мой фрейм данных) -
for i in range(0, 2):
df = df.withColumn("x4", array(lit(A1[0]), lit(A1[1]), lit(A1[2]))
Но проблема с этим кодом в том, что он обновляет столбец с последним значением массива 'A1' следующим образом:
+---+---+-----+---------+
| x1| x2| x3| x4|
+---+---+-----+---------+
| 1| A| 3.0|[1,3,4,1]|
| 2| B|-23.0|[1,3,4,1]|
| 3| C| -4.0|[1,3,4,1]|
+---+---+-----+---------+
Но я хочу вот так -
+---+---+-----+---------+
| x1| x2| x3| x4|
+---+---+-----+---------+
| 1| A| 3.0|[1,2,3,4]|
| 2| B|-23.0|[5,6,7,8]|
| 3| C| -4.0|[1,3,4,1]|
+---+---+-----+---------+
Что мне нужно добавить в моем коде?
4 ответа
Итак, после того, как я сломал себе голову, я обнаружил, что этого нельзя сделать с помощью функции cCumn столбца pyspark, поскольку он создаст столбец, но все из одной строки. А также я не могу использовать udf
потому что мой новый столбец не зависит ни от какого предыдущего столбца существующего кадра данных.
Итак, я сделал что-то вроде этого - предположим, вы получаете разные значения массива A1 внутри цикла for (в моем случае это сценарий)
f_array = []
for i in range(0,10):
f_array.extend([(i, A1)])
# Creating a new df for my array.
df1 = spark.createDataFrame(data = f_array, schema = ["id", "x4"])
df1.show()
+---+---------+
| id| x4|
+---+---------+
| 0|[1,2,3,4]|
| 1|[5,6,7,8]|
| 2|[1,3,4,1]|
+---+---------+
# Suppose no columns matches to our df then creating one extra column named `id` as present in our `df1`. This is used for joining both the dataframes.
df = df.withColumn('id', monotonically_increasing_id())
df.show()
+---+---+---+-----+
| id| x1| x2| x3|
+---+---+---+-----+
| 0| 1| A| 3.0|
| 1| 2| B|-23.0|
| 2| 3| C| -4.0|
+---+---+---+-----+
# Now join both the dataframes using common column `id`.
df = df.join(df1, df.id == df1.id).drop(df.id).drop(df1.id)
df.show()
+---+---+---+------------+
| x1| x2| x3| x4|
+---+---+---+------------+
| 1| A| 3|[1, 2, 3, 4]|
| 2| B|-23|[5, 6, 7, 8]|
| 3| C| -4|[1, 3, 4, 1]|
+---+---+---+------------+
Как насчет:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('test').getOrCreate()
df=spark.createDataFrame(data=[(1,'A',3),(2,'B',-23),(3,'C',-4)],schema=['x1','x2','x3'])
+---+---+---+
| x1| x2| x3|
+---+---+---+
| 1| A| 3|
| 2| B|-23|
| 3| C| -4|
+---+---+---+
mydict = {1:[1,2,3,4] , 2:[5,6,7,8], 3:[1,3,4,1]}
def addExtraColumn(df,mydict):
names = df.schema.names
count=1
mylst=[]
for row in df.rdd.collect():
RW=row.asDict()
rowLst=[]
for name in names:
rowLst.append(RW[name])
rowLst.append(mydict[count])
count=count+1
mylst.append(rowLst)
return mylst
newlst = addExtraColumn(df,mydict)
df1 = spark.sparkContext.parallelize(newlst).toDF(['x1','x2','x3','x4'])
df1.show()
+---+---+---+------------+
| x1| x2| x3| x4|
+---+---+---+------------+
| 1| A| 3|[1, 2, 3, 4]|
| 2| B|-23|[5, 6, 7, 8]|
| 3| C| -4|[1, 3, 4, 1]|
+---+---+---+------------+
Глядя на ваш код, я думаю, что значение A1 зависит как минимум от одного из столбцов x1, x2 или x3.
Следовательно, вы не можете определить новые столбцы с помощью A1, но с помощью функции, которая будет принимать в качестве параметров столбцы, необходимые для определения A1.
Это просто предположение, но, может быть, вам нужен словарь, A = {1:[1,2,3,4] , 2:[5,6,7,8], 3:[1,3,4,1],}
и использовать его в UDF с вашим withColumn
,
Это работает:
from pyspark.sql import SparkSession
import pandas as pd
spark = SparkSession.builder.appName('test').getOrCreate()
df=spark.createDataFrame(data=[(1,'A',3),(2,'B',-23),(3,'C',-4)],schema=['x1','x2','x3'])
+---+---+---+
| x1| x2| x3|
+---+---+---+
| 1| A| 3|
| 2| B|-23|
| 3| C| -4|
+---+---+---+
конвертировать df в список
mylst = df.toPandas().values.tolist()
создать словарь
mydict = {1:[1,2,3,4] , 2:[5,6,7,8], 3:[1,3,4,1]}
добавить список со словарными элементами
count =1
for x in mylst:
x.append(mydict[count])
count = count + 1
преобразовать добавленный список в датафрейм
sc = spark.sparkContext
df1 = sc.parallelize(mylst).toDF(['x1','x2','x3','x4'])
df1.show()
+---+---+---+------------+
| x1| x2| x3| x4|
+---+---+---+------------+
| 1| A| 3|[1, 2, 3, 4]|
| 2| B|-23|[5, 6, 7, 8]|
| 3| C| -4|[1, 3, 4, 1]|
+---+---+---+------------+