Как установить setCheckpoint в pyspark
Я не знаю много искры. В верхней части кода у меня есть
from pysaprk.sql import SparkSession
import pyspark.sql.function as f
spark = SparkSession.bulder.appName(‘abc’).getOrCreate()
H = sqlContext.read.parquet(‘path to hdfs file’)
H имеет около 30 миллионов записей и будет использоваться в цикле. Так я написала
H.persist().count()
У меня есть список из 50 строк L = [s1,s2,…,s50]
каждый из которых используется для построения небольшого фрейма данных из H, который должен располагаться друг над другом. Я создал пустой фрейм данных Z
schema = StructType([define the schema here])
Z = spark.createDataFrame([],schema)
Затем идет цикл
for st in L:
K = process H using st
Z = Z.union(H)
где К имеет максимум 20 рядов. Когда L имеет только 2 или 3 элемента, этот код работает. Но для длины L = 50 это никогда не заканчивается. Сегодня я узнал, что я могу использовать контрольные точки. Поэтому я создал путь hadoop и прямо над тем местом, где начинается цикл, я написал:
SparkContext.setCheckpointDir(dirName=‘path/to/checkpoint/dir’)
Но я получаю следующую ошибку: missing 1 required positional argument: ‘self’
, Мне нужно знать, как исправить ошибку и как изменить цикл, чтобы включить контрольную точку.
1 ответ
Создать объект для SparkContext
и тогда вам не нужно указывать self
параметр. Также удалите имя параметра, который не нужен.
Код как ниже работает:
from pyspark import SparkConf
from pyspark.context import SparkContext
sc = SparkContext.getOrCreate(SparkConf())
sc.setCheckpointDir(‘path/to/checkpoint/dir’)