Как разбить Вектор на столбцы - используя PySpark

Контекст: у меня есть DataFrame с 2 столбцами: слово и вектор. Где тип столбца "вектор" VectorUDT,


word    |  vector

assert  | [435,323,324,212...]

И я хочу получить это:

word   |  v1 | v2  | v3 | v4 | v5 | v6 ......

assert | 435 | 5435| 698| 356|....


Как разделить столбец с векторами на несколько столбцов для каждого измерения с помощью pyspark?

заранее спасибо

Одним из возможных подходов является преобразование в и из RDD:

from pyspark.ml.linalg import Vectors

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3])),
    ("require", Vectors.sparse(3, {1: 2}))
]).toDF(["word", "vector"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist())

df.rdd.map(extract).toDF(["word"])  # Vector values will be named _2, _3, ...

## +-------+---+---+---+
## |   word| _2| _3| _4|
## +-------+---+---+---+
## | assert|1.0|2.0|3.0|
## |require|0.0|2.0|0.0|
## +-------+---+---+---+

Альтернативным решением было бы создать UDF:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

    .withColumn("xs", to_array(col("vector")))
    .select(["word"] + [col("xs")[i] for i in range(3)]))

## +-------+-----+-----+-----+
## |   word|xs[0]|xs[1]|xs[2]|
## +-------+-----+-----+-----+
## | assert|  1.0|  2.0|  3.0|
## |require|  0.0|  2.0|  0.0|
## +-------+-----+-----+-----+

Для эквивалента Scala см. Spark Scala: Как преобразовать Dataframe[вектор] в DataFrame [f1: Double,..., fn: Double)].

Чтобы разделить rawPrediction или probability столбцы, созданные после обучения модели PySpark ML в столбцы Pandas, можно разделить следующим образом:

your_pandas_df['probability'].apply(lambda x: pd.Series(x.toArray()))

Гораздо быстрее использовать i_th udf из /questions/6458931/kak-poluchit-dostup-k-elementu-stolbtsa-vectorudt-v-dataframe-spark

Функция извлечения, указанная в решении от zero323 выше, использует toList, который создает объект списка Python, заполняет его объектами Python с плавающей запятой, находит нужный элемент, просматривая список, который затем необходимо преобразовать обратно в java double; повторяется для каждой строки. Использование rdd намного медленнее, чем udf to_array, который также вызывает toList, но оба они намного медленнее, чем udf, который позволяет SparkSQL выполнять большую часть работы.

Код синхронизации, сравнивающий извлечение rdd и to_array udf, предложенный здесь, с i_th udf из 3955864:

from pyspark.context import SparkContext
from pyspark.sql import Row, SQLContext, SparkSession
from pyspark.sql.functions import lit, udf, col
from pyspark.sql.types import ArrayType, DoubleType
import pyspark.sql.dataframe
from pyspark.sql.functions import pandas_udf, PandasUDFType

sc = SparkContext('local[4]', 'FlatTestTime')

spark = SparkSession(sc)
spark.conf.set("spark.sql.execution.arrow.enabled", True)

from pyspark.ml.linalg import Vectors

# copy the two rows in the test dataframe a bunch of times,
# make this small enough for testing, or go for "big data" and be prepared to wait
REPS = 20000

df = sc.parallelize([
    ("assert", Vectors.dense([1, 2, 3]), 1, Vectors.dense([4.1, 5.1])),
    ("require", Vectors.sparse(3, {1: 2}), 2, Vectors.dense([6.2, 7.2])),
] * REPS).toDF(["word", "vector", "more", "vorpal"])

def extract(row):
    return (row.word, ) + tuple(row.vector.toArray().tolist(),) + (row.more,) + tuple(row.vorpal.toArray().tolist(),)

def test_extract():
    return df.rdd.map(extract).toDF(['word', 'vector__0', 'vector__1', 'vector__2', 'more', 'vorpal__0', 'vorpal__1'])

def to_array(col):
    def to_array_(v):
        return v.toArray().tolist()
    return udf(to_array_, ArrayType(DoubleType()))(col)

def test_to_array():
    df_to_array = df.withColumn("xs", to_array(col("vector"))) \
        .select(["word"] + [col("xs")[i] for i in range(3)] + ["more", "vorpal"]) \
        .withColumn("xx", to_array(col("vorpal"))) \
        .select(["word"] + ["xs[{}]".format(i) for i in range(3)] + ["more"] + [col("xx")[i] for i in range(2)])
    return df_to_array

# pack up to_array into a tidy function
def flatten(df, vector, vlen):
    fieldNames = df.schema.fieldNames()
    if vector in fieldNames:
        names = []
        for fieldname in fieldNames:
            if fieldname == vector:
                names.extend([col(vector)[i] for i in range(vlen)])
        return df.withColumn(vector, to_array(col(vector)))\
        return df

def test_flatten():
    dflat = flatten(df, "vector", 3)
    dflat2 = flatten(dflat, "vorpal", 2)
    return dflat2

def ith_(v, i):
        return float(v[i])
    except ValueError:
        return None

ith = udf(ith_, DoubleType())

select = ["word"]
select.extend([ith("vector", lit(i)) for i in range(3)])
select.extend([ith("vorpal", lit(i)) for i in range(2)])

# %% timeit ...
def test_ith():
    return df.select(select)

if __name__ == '__main__':
    import timeit

    # make sure these work as intended

                       setup="from __main__ import test_ith",
                       setup="from __main__ import test_flatten",
                       setup="from __main__ import test_to_array",
                       setup="from __main__ import test_extract",

Полученные результаты:

i_th         0.05964796099999958
flatten      0.4842299350000001
to_array     0.42978780299999997
extract      2.9254476840000017
def splitVecotr(df, new_features=['f1','f2']):
schema = df.schema
cols = df.columns

for col in new_features: # new_features should be the same length as vector column length
    schema = schema.add(col,DoubleType(),True)

return spark.createDataFrame(df.rdd.map(lambda row: [row[i] for i in cols]+row.features.tolist()), schema)

Функция превращает векторный столбец объекта в отдельные столбцы.

