Преобразование Spark DF тоже Pandas DF и другой способ - Производительность

Попытка конвертировать Spark DF с 8-метровыми записями в Pandas DF

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sourcePandas = srcDF.select("*").toPandas()

Занимает почти 2 минуты

И другой способ от Панд до Spark DF

finalDF = spark.createDataFrame(sourcePandas)

занимает слишком много времени и никогда не заканчивается.

источникПанды

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 42 columns):
CONSIGNMENT_PK     10 non-null int32
CERTIFICATE_NO     10 non-null object
ACTOR_NAME         10 non-null object
GENERATOR_FK       10 non-null int32
TRANSPORTER_FK     10 non-null int32
RECEIVER_FK        10 non-null int32
REC_POST_CODE      0 non-null object
WASTEDESC          10 non-null object
WASTE_FK           10 non-null int32
GEN_LICNUM         0 non-null object
VOLUME             10 non-null int32
MEASURE            10 non-null object
WASTE_TYPE         10 non-null object
WASTE_ADD          0 non-null object
CONTAMINENT1_FK    0 non-null float64
CONTAMINENT2_FK    0 non-null float64
CONTAMINENT3_FK    0 non-null float64
CONTAMINENT4_FK    0 non-null float64
TREATMENT_FK       10 non-null int32
ANZSICODE_FK       10 non-null int32
VEH1_REGNO         10 non-null object
VEH1_LICNO         0 non-null object
VEH2_REGNO         0 non-null object
VEH2_LICNO         0 non-null object
GEN_SIGNEE         0 non-null object
GEN_DATE           10 non-null datetime64[ns]
TRANS_SIGNEE       0 non-null object
TRANS_DATE         10 non-null datetime64[ns]
REC_SIGNEE         0 non-null object
REC_DATE           10 non-null datetime64[ns]
DATECREATED        10 non-null datetime64[ns]
DISCREPANCY        0 non-null object
APPROVAL_NUMBER    0 non-null object
TR_TYPE            10 non-null object
REC_WASTE_FK       10 non-null int32
REC_WASTE_TYPE     10 non-null object
REC_VOLUME         10 non-null int32
REC_MEASURE        10 non-null object
DATE_RECEIVED      10 non-null datetime64[ns]
DATE_SCANNED       0 non-null datetime64[ns]
HAS_IMAGE          10 non-null object
LASTMODIFIED       10 non-null datetime64[ns]
dtypes: datetime64[ns](7), float64(4), int32(10), object(21)
memory usage: 3.0+ KB

srcDF

|-- CONSIGNMENT_PK: integer (nullable = true)
 |-- CERTIFICATE_NO: string (nullable = true)
 |-- ACTOR_NAME: string (nullable = true)
 |-- GENERATOR_FK: integer (nullable = true)
 |-- TRANSPORTER_FK: integer (nullable = true)
 |-- RECEIVER_FK: integer (nullable = true)
 |-- REC_POST_CODE: string (nullable = true)
 |-- WASTEDESC: string (nullable = true)
 |-- WASTE_FK: integer (nullable = true)
 |-- GEN_LICNUM: string (nullable = true)
 |-- VOLUME: integer (nullable = true)
 |-- MEASURE: string (nullable = true)
 |-- WASTE_TYPE: string (nullable = true)
 |-- WASTE_ADD: string (nullable = true)
 |-- CONTAMINENT1_FK: integer (nullable = true)
 |-- CONTAMINENT2_FK: integer (nullable = true)
 |-- CONTAMINENT3_FK: integer (nullable = true)
 |-- CONTAMINENT4_FK: integer (nullable = true)
 |-- TREATMENT_FK: integer (nullable = true)
 |-- ANZSICODE_FK: integer (nullable = true)
 |-- VEH1_REGNO: string (nullable = true)
 |-- VEH1_LICNO: string (nullable = true)
 |-- VEH2_REGNO: string (nullable = true)
 |-- VEH2_LICNO: string (nullable = true)
 |-- GEN_SIGNEE: string (nullable = true)
 |-- GEN_DATE: timestamp (nullable = true)
 |-- TRANS_SIGNEE: string (nullable = true)
 |-- TRANS_DATE: timestamp (nullable = true)
 |-- REC_SIGNEE: string (nullable = true)
 |-- REC_DATE: timestamp (nullable = true)
 |-- DATECREATED: timestamp (nullable = true)
 |-- DISCREPANCY: string (nullable = true)
 |-- APPROVAL_NUMBER: string (nullable = true)
 |-- TR_TYPE: string (nullable = true)
 |-- REC_WASTE_FK: integer (nullable = true)
 |-- REC_WASTE_TYPE: string (nullable = true)
 |-- REC_VOLUME: integer (nullable = true)
 |-- REC_MEASURE: string (nullable = true)
 |-- DATE_RECEIVED: timestamp (nullable = true)
 |-- DATE_SCANNED: timestamp (nullable = true)
 |-- HAS_IMAGE: string (nullable = true)
 |-- LASTMODIFIED: timestamp (nullable = true)

Размер кластера

https://i.stack.imgur.com/rJp87.png

1 ответ

Сбор в pandas и повторное распараллеливание с кластером будет иметь высокий уровень памяти примерно в 2 раза выше стоимости хранения pandas DF. Десять строк вашего фрейма данных составляют 3 КБ, поэтому 8 МБ будут ~2,5 ГБ. Удвоение для достижения максимальной отметки приводит нас к ~5G. Память драйвера искры по умолчанию составляет 1 ГБ, что слишком мало для того, что вы хотите сделать, в результате чего JVM перебивает GC:

  1. Нажмите свойство приложения spark.driver.memory до 8G и он должен работать.

  2. Подумайте, почему вы хотите передать все эти данные водителю. Можете ли вы использовать UDF pandas в GROUPED_MAP?

Другие вопросы по тегам