Как объединить Spark в массив Кафки

В настоящее время у меня есть следующий df

+-------+--------------------+-----+
|    key|          created_at|count|
+-------+--------------------+-----+
|Bullish|[2017-08-06 08:00...|   12|
|Bearish|[2017-08-06 08:00...|    1|
+-------+--------------------+-----+

Я использую следующее для потоковой передачи данных в Кафку

df.selectExpr("CAST(key AS STRING) AS key", "to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092").option("topic","chart3").option("checkpointLocation", "/tmp/checkpoints2")
  .outputMode("complete")
  .start()

Проблема здесь в том, что для каждой строки в DataFrame он будет записывать в Kafka одну за другой. Мой потребитель получит сообщение по одному.

Есть ли способ объединить все строки в массив и поток в Кафку, так что мой потребитель может получить все данные за один раз.

Спасибо за совет.

1 ответ

Мой потребитель получит сообщение по одному.

Не совсем. Это может зависеть от собственности Кафки. Вы можете указать свои свойства и использовать, например:

props.put("batch.size", 16384);

На заднем плане Spark использует обычный кешированный KafkaProducer. Он будет использовать свойства, которые вы предоставите в опциях при отправке запроса.

Смотрите также Java Doc. Знайте, что это может не правильно масштабироваться

Другие вопросы по тегам