Как объединить Spark в массив Кафки
В настоящее время у меня есть следующий df
+-------+--------------------+-----+
| key| created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...| 12|
|Bearish|[2017-08-06 08:00...| 1|
+-------+--------------------+-----+
Я использую следующее для потоковой передачи данных в Кафку
df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
.outputMode("complete")
.start()
Проблема здесь в том, что для каждой строки в DataFrame он будет записывать в Kafka одну за другой. Мой потребитель получит сообщение по одному.
Есть ли способ объединить все строки в массив и поток в Кафку, так что мой потребитель может получить все данные за один раз.
Спасибо за совет.
1 ответ
Мой потребитель получит сообщение по одному.
Не совсем. Это может зависеть от собственности Кафки. Вы можете указать свои свойства и использовать, например:
props.put("batch.size", 16384);
На заднем плане Spark использует обычный кешированный KafkaProducer. Он будет использовать свойства, которые вы предоставите в опциях при отправке запроса.
Смотрите также Java Doc. Знайте, что это может не правильно масштабироваться