Переполнение стека при обработке нескольких столбцов с помощью UDF
У меня есть DataFrame
со многими столбцами str
типа, и я хочу применить функцию ко всем этим столбцам, без переименования их имен или добавления дополнительных столбцов, я попытался с помощью for-in
выполнение цикла withColumn
(см. пример ниже), но обычно, когда я запускаю код, он показывает Stack Overflow
(это редко работает), это DataFrame
совсем не большой, у него всего ~15000 записей.
# df is a DataFrame
def lowerCase(string):
return string.strip().lower()
lowerCaseUDF = udf(lowerCase, StringType())
for (columnName, kind) in df.dtypes:
if(kind == "string"):
df = df.withColumn(columnName, lowerCaseUDF(df[columnName]))
df.select("Tipo_unidad").distinct().show()
Полная ошибка очень длинная, поэтому я решил вставить только несколько строк. Но вы можете найти полный след здесь Complete Trace
Py4JJavaError: Произошла ошибка при вызове o516.showString.: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задание 1 на этапе 2.0 не выполнено 4 раза, последний сбой: потерянное задание 1.3 на этапе 2.0 (TID 38, worker2.mcbo.mood.com.ve): java.lang.StackruError at java.io.ObjectInputStream$BlockDataInputStream.readByte(ObjectInputStream.java:2774)
Я думаю, что эта проблема возникает, потому что этот код запускает много заданий (по одному на каждый столбец типа string
), не могли бы вы показать мне другую альтернативу или что я делаю не так?
1 ответ
Попробуйте что-то вроде этого:
from pyspark.sql.functions import col, lower, trim
exprs = [
lower(trim(col(c))).alias(c) if t == "string" else col(c)
for (c, t) in df.dtypes
]
df.select(*exprs)
Этот подход имеет два основных преимущества перед вашим текущим решением:
- это требует только одной проекции (без растущей линии, которая, скорее всего, ответственна за SO) вместо проекции на столбец строки.
- он работает напрямую только с внутренним представлением без передачи данных в Python (
BatchPythonProcessing
).