Использование аккумулятора с pyspark структурированной потоковой передачей
У меня есть потребитель Kafka, написанный на PySpark, который использует структурированную потоковую передачу.
Как я могу использовать простой аккумулятор в моем потребителе?
Вот фрагмент из моего кода:
spark = SparkSession.builder.appName("myapp").getOrCreate()
lines_df = spark.readStream.format("kafka").options("kafka.bootstrap...").options("subscribe"...).load()
accum = spark.sparkContext.accumulator(0)
func_accum = udf(lambda x: accum.add(1))
res_df = lines_df.select(func.col("key"), func.col("Value"))
query = res_df.writeStream.outputMode(...).start()
query.awaitTermination()
Буду признателен за любой пример того, как запустить и отобразить аккумулятор
Спасибо