pyspark rdd isCheckPointed() является ложным
Я сталкивался с ошибками stackruer, когда я итеративно добавлял более 500 столбцов в мой фрейм данных pyspark. Итак, я включил контрольные точки. Контрольно-пропускные пункты не помогли. Итак, я создал следующее игрушечное приложение, чтобы проверить, правильно ли работают мои контрольные точки. Все, что я делаю в этом примере, это итеративное создание столбцов путем многократного копирования исходного столбца. Я настаиваю, проверяю и считаю каждые 10 итераций. Я заметил, что мой dataframe.rdd.isCheckpointed() всегда возвращает False. Я могу убедиться, что папки контрольных точек действительно создаются и заполняются на диске. Я работаю на Dataproc на Glcoud.
Вот мой код:
from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys
APP_NAME = "isCheckPointWorking"
spark = SparkSession\
.builder\
.appName(APP_NAME)\
.config("spark.sql.crossJoin.enabled","true")\
.getOrCreate()
sc = SparkContext.getOrCreate()
#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')
#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()
#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40)
colNewList = ['col'+str(x) for x in numberList]
print(colNewList)
iterCount = 0
for colName in colNewList:
#copy column A in to the new column
df4 = df4.withColumn(colName,df4.A)
if (np.mod(iterCount,10) == 0):
df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)
df4.checkpoint(eager=True)
df4.count()
#checking if underlying RDD is being checkpointed
print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))
iterCount +=1
Непонятно, почему df4.rdd.isCheckpointed() каждый раз возвращает False, когда я вижу, что папка контрольной точки заполнена. Какие-нибудь мысли?
1 ответ
Метод контрольной точки возвращает новый набор данных с контрольной точкой, он не изменяет текущий набор данных.
+ Изменить
df4.checkpoint(eager=True)
к
df4 = df4.checkpoint(eager=True)