pyspark rdd isCheckPointed() является ложным

Я сталкивался с ошибками stackruer, когда я итеративно добавлял более 500 столбцов в мой фрейм данных pyspark. Итак, я включил контрольные точки. Контрольно-пропускные пункты не помогли. Итак, я создал следующее игрушечное приложение, чтобы проверить, правильно ли работают мои контрольные точки. Все, что я делаю в этом примере, это итеративное создание столбцов путем многократного копирования исходного столбца. Я настаиваю, проверяю и считаю каждые 10 итераций. Я заметил, что мой dataframe.rdd.isCheckpointed() всегда возвращает False. Я могу убедиться, что папки контрольных точек действительно создаются и заполняются на диске. Я работаю на Dataproc на Glcoud.

Вот мой код:

from pyspark import SparkContext, SparkConf
from pyspark import StorageLevel
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
import sys

APP_NAME = "isCheckPointWorking"

spark = SparkSession\
    .builder\
    .appName(APP_NAME)\
    .config("spark.sql.crossJoin.enabled","true")\
    .getOrCreate()

sc = SparkContext.getOrCreate()

#set the checkpoint directory
sc.setCheckpointDir('gs://mybucket/checkpointtest/')

#create a spark dataframe with one column containing numbers 1 through 9
df4 = spark.createDataFrame(pd.DataFrame(np.arange(1,10),columns = ["A"]))
df4.show()

#create a list of new columns to be added to the dataframe
numberList = np.arange(0,40) 
colNewList = ['col'+str(x) for x in numberList]

print(colNewList)

iterCount = 0

for colName in colNewList:

    #copy column A in to the new column
    df4 = df4.withColumn(colName,df4.A)

    if (np.mod(iterCount,10) == 0):           
        df4 = df4.persist(StorageLevel.MEMORY_AND_DISK)      

        df4.checkpoint(eager=True)

        df4.count()    
        #checking if underlying RDD is being checkpointed        
        print("is data frame checkpointed "+str(df4.rdd.isCheckpointed()))

    iterCount +=1

Непонятно, почему df4.rdd.isCheckpointed() каждый раз возвращает False, когда я вижу, что папка контрольной точки заполнена. Какие-нибудь мысли?

1 ответ

Решение

Метод контрольной точки возвращает новый набор данных с контрольной точкой, он не изменяет текущий набор данных.

+ Изменить

df4.checkpoint(eager=True)

к

df4 = df4.checkpoint(eager=True)
Другие вопросы по тегам