Преобразование Spark DF тоже Pandas DF и другой способ - Производительность
Попытка конвертировать Spark DF с 8-метровыми записями в Pandas DF
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
sourcePandas = srcDF.select("*").toPandas()
Занимает почти 2 минуты
И другой способ от Панд до Spark DF
finalDF = spark.createDataFrame(sourcePandas)
занимает слишком много времени и никогда не заканчивается.
источникПанды
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10 entries, 0 to 9
Data columns (total 42 columns):
CONSIGNMENT_PK 10 non-null int32
CERTIFICATE_NO 10 non-null object
ACTOR_NAME 10 non-null object
GENERATOR_FK 10 non-null int32
TRANSPORTER_FK 10 non-null int32
RECEIVER_FK 10 non-null int32
REC_POST_CODE 0 non-null object
WASTEDESC 10 non-null object
WASTE_FK 10 non-null int32
GEN_LICNUM 0 non-null object
VOLUME 10 non-null int32
MEASURE 10 non-null object
WASTE_TYPE 10 non-null object
WASTE_ADD 0 non-null object
CONTAMINENT1_FK 0 non-null float64
CONTAMINENT2_FK 0 non-null float64
CONTAMINENT3_FK 0 non-null float64
CONTAMINENT4_FK 0 non-null float64
TREATMENT_FK 10 non-null int32
ANZSICODE_FK 10 non-null int32
VEH1_REGNO 10 non-null object
VEH1_LICNO 0 non-null object
VEH2_REGNO 0 non-null object
VEH2_LICNO 0 non-null object
GEN_SIGNEE 0 non-null object
GEN_DATE 10 non-null datetime64[ns]
TRANS_SIGNEE 0 non-null object
TRANS_DATE 10 non-null datetime64[ns]
REC_SIGNEE 0 non-null object
REC_DATE 10 non-null datetime64[ns]
DATECREATED 10 non-null datetime64[ns]
DISCREPANCY 0 non-null object
APPROVAL_NUMBER 0 non-null object
TR_TYPE 10 non-null object
REC_WASTE_FK 10 non-null int32
REC_WASTE_TYPE 10 non-null object
REC_VOLUME 10 non-null int32
REC_MEASURE 10 non-null object
DATE_RECEIVED 10 non-null datetime64[ns]
DATE_SCANNED 0 non-null datetime64[ns]
HAS_IMAGE 10 non-null object
LASTMODIFIED 10 non-null datetime64[ns]
dtypes: datetime64[ns](7), float64(4), int32(10), object(21)
memory usage: 3.0+ KB
srcDF
|-- CONSIGNMENT_PK: integer (nullable = true)
|-- CERTIFICATE_NO: string (nullable = true)
|-- ACTOR_NAME: string (nullable = true)
|-- GENERATOR_FK: integer (nullable = true)
|-- TRANSPORTER_FK: integer (nullable = true)
|-- RECEIVER_FK: integer (nullable = true)
|-- REC_POST_CODE: string (nullable = true)
|-- WASTEDESC: string (nullable = true)
|-- WASTE_FK: integer (nullable = true)
|-- GEN_LICNUM: string (nullable = true)
|-- VOLUME: integer (nullable = true)
|-- MEASURE: string (nullable = true)
|-- WASTE_TYPE: string (nullable = true)
|-- WASTE_ADD: string (nullable = true)
|-- CONTAMINENT1_FK: integer (nullable = true)
|-- CONTAMINENT2_FK: integer (nullable = true)
|-- CONTAMINENT3_FK: integer (nullable = true)
|-- CONTAMINENT4_FK: integer (nullable = true)
|-- TREATMENT_FK: integer (nullable = true)
|-- ANZSICODE_FK: integer (nullable = true)
|-- VEH1_REGNO: string (nullable = true)
|-- VEH1_LICNO: string (nullable = true)
|-- VEH2_REGNO: string (nullable = true)
|-- VEH2_LICNO: string (nullable = true)
|-- GEN_SIGNEE: string (nullable = true)
|-- GEN_DATE: timestamp (nullable = true)
|-- TRANS_SIGNEE: string (nullable = true)
|-- TRANS_DATE: timestamp (nullable = true)
|-- REC_SIGNEE: string (nullable = true)
|-- REC_DATE: timestamp (nullable = true)
|-- DATECREATED: timestamp (nullable = true)
|-- DISCREPANCY: string (nullable = true)
|-- APPROVAL_NUMBER: string (nullable = true)
|-- TR_TYPE: string (nullable = true)
|-- REC_WASTE_FK: integer (nullable = true)
|-- REC_WASTE_TYPE: string (nullable = true)
|-- REC_VOLUME: integer (nullable = true)
|-- REC_MEASURE: string (nullable = true)
|-- DATE_RECEIVED: timestamp (nullable = true)
|-- DATE_SCANNED: timestamp (nullable = true)
|-- HAS_IMAGE: string (nullable = true)
|-- LASTMODIFIED: timestamp (nullable = true)
Размер кластера
1 ответ
Сбор в pandas и повторное распараллеливание с кластером будет иметь высокий уровень памяти примерно в 2 раза выше стоимости хранения pandas DF. Десять строк вашего фрейма данных составляют 3 КБ, поэтому 8 МБ будут ~2,5 ГБ. Удвоение для достижения максимальной отметки приводит нас к ~5G. Память драйвера искры по умолчанию составляет 1 ГБ, что слишком мало для того, что вы хотите сделать, в результате чего JVM перебивает GC:
Нажмите свойство приложения
spark.driver.memory
до 8G и он должен работать.Подумайте, почему вы хотите передать все эти данные водителю. Можете ли вы использовать UDF pandas в GROUPED_MAP?