Переполнение стека при обработке нескольких столбцов с помощью UDF

У меня есть DataFrame со многими столбцами str типа, и я хочу применить функцию ко всем этим столбцам, без переименования их имен или добавления дополнительных столбцов, я попытался с помощью for-in выполнение цикла withColumn (см. пример ниже), но обычно, когда я запускаю код, он показывает Stack Overflow (это редко работает), это DataFrame совсем не большой, у него всего ~15000 записей.

# df is a DataFrame
def lowerCase(string):
    return string.strip().lower()

lowerCaseUDF = udf(lowerCase, StringType())

for (columnName, kind) in df.dtypes:
    if(kind == "string"):
        df = df.withColumn(columnName, lowerCaseUDF(df[columnName]))

df.select("Tipo_unidad").distinct().show()

Полная ошибка очень длинная, поэтому я решил вставить только несколько строк. Но вы можете найти полный след здесь Complete Trace

Py4JJavaError: Произошла ошибка при вызове o516.showString.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 1 на этапе 2.0 не выполнено 4 раза, последний сбой: потерянное задание 1.3 на этапе 2.0 (TID 38, worker2.mcbo.mood.com.ve): java.lang.StackruError at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:2774)

Я думаю, что эта проблема возникает, потому что этот код запускает много заданий (по одному на каждый столбец типа string), не могли бы вы показать мне другую альтернативу или что я делаю не так?

1 ответ

Решение

Попробуйте что-то вроде этого:

from pyspark.sql.functions import col, lower, trim

exprs = [
    lower(trim(col(c))).alias(c) if t == "string" else col(c) 
    for (c, t) in df.dtypes
]

df.select(*exprs)

Этот подход имеет два основных преимущества перед вашим текущим решением:

  • это требует только одной проекции (без растущей линии, которая, скорее всего, ответственна за SO) вместо проекции на столбец строки.
  • он работает напрямую только с внутренним представлением без передачи данных в Python (BatchPythonProcessing).
Другие вопросы по тегам