Использование аккумулятора с pyspark структурированной потоковой передачей

У меня есть потребитель Kafka, написанный на PySpark, который использует структурированную потоковую передачу.

Как я могу использовать простой аккумулятор в моем потребителе?

Вот фрагмент из моего кода:

spark = SparkSession.builder.appName("myapp").getOrCreate()
lines_df = spark.readStream.format("kafka").options("kafka.bootstrap...").options("subscribe"...).load()
accum = spark.sparkContext.accumulator(0)
func_accum = udf(lambda x: accum.add(1))
res_df = lines_df.select(func.col("key"), func.col("Value"))
query = res_df.writeStream.outputMode(...).start()
query.awaitTermination()

Буду признателен за любой пример того, как запустить и отобразить аккумулятор

Спасибо

0 ответов

Другие вопросы по тегам