Spark Streaming - читайте и пишите в теме Кафки
Я использую Spark Streaming для обработки данных между двумя очередями Kafka, но я не могу найти хороший способ писать на Kafka из Spark. Я попробовал это:
input.foreachRDD(rdd =>
rdd.foreachPartition(partition =>
partition.foreach {
case x: String => {
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
println(x)
val producer = new KafkaProducer[String, String](props)
val message = new ProducerRecord[String, String]("output", null, x)
producer.send(message)
}
}
)
)
и он работает как задумано, но создание нового KafkaProducer для каждого сообщения явно невозможно в реальном контексте, и я пытаюсь обойти это.
Я хотел бы сохранить ссылку на один экземпляр для каждого процесса и обращаться к нему, когда мне нужно отправить сообщение. Как я могу написать Кафке из Spark Streaming?
7 ответов
Мой первый совет - попытаться создать новый экземпляр в foreachPartition и измерить, достаточно ли он быстр для ваших нужд (создание официальной документации предполагает создание тяжелых объектов в foreachPartition).
Другой вариант - использовать пул объектов, как показано в этом примере:
Однако я обнаружил, что это трудно реализовать при использовании контрольных точек.
Другая версия, которая хорошо работает для меня, - это фабрика, описанная в следующем сообщении в блоге, вам просто нужно проверить, обеспечивает ли она достаточный параллелизм для ваших нужд (см. Раздел комментариев):
Да, к сожалению, Spark (1.x, 2.x) не позволяет понять, как эффективно писать в Kafka.
Я бы предложил следующий подход:
- Используйте (и повторно) один
KafkaProducer
экземпляр на процесс исполнителя /JVM.
Вот высокоуровневая настройка для этого подхода:
- Во-первых, вы должны "обернуть" Кафки
KafkaProducer
потому что, как вы упомянули, он не сериализуем. Упаковка позволяет вам отправить его исполнителям. Ключевой идеей здесь является использованиеlazy val
так что вы откладываете создание экземпляра производителя до его первого использования, что фактически является обходным решением, так что вам не нужно беспокоиться оKafkaProducer
не быть сериализуемым. - Вы "отправляете" упакованного производителя каждому исполнителю с помощью широковещательной переменной.
- В рамках вашей реальной логики обработки вы получаете доступ к упакованному производителю через широковещательную переменную и используете ее для записи результатов обработки обратно в Kafka.
Фрагменты кода ниже работают с Spark Streaming начиная с Spark 2.0.
Шаг 1: упаковка KafkaProducer
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
class MySparkKafkaProducer[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
/* This is the key idea that allows us to work around running into
NotSerializableExceptions. */
lazy val producer = createProducer()
def send(topic: String, key: K, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, key, value))
def send(topic: String, value: V): Future[RecordMetadata] =
producer.send(new ProducerRecord[K, V](topic, value))
}
object MySparkKafkaProducer {
import scala.collection.JavaConversions._
def apply[K, V](config: Map[String, Object]): MySparkKafkaProducer[K, V] = {
val createProducerFunc = () => {
val producer = new KafkaProducer[K, V](config)
sys.addShutdownHook {
// Ensure that, on executor JVM shutdown, the Kafka producer sends
// any buffered messages to Kafka before shutting down.
producer.close()
}
producer
}
new MySparkKafkaProducer(createProducerFunc)
}
def apply[K, V](config: java.util.Properties): MySparkKafkaProducer[K, V] = apply(config.toMap)
}
Шаг 2: Используйте широковещательную переменную, чтобы дать каждому исполнителю собственную оболочку KafkaProducer
пример
import org.apache.kafka.clients.producer.ProducerConfig
val ssc: StreamingContext = {
val sparkConf = new SparkConf().setAppName("spark-streaming-kafka-example").setMaster("local[2]")
new StreamingContext(sparkConf, Seconds(1))
}
ssc.checkpoint("checkpoint-directory")
val kafkaProducer: Broadcast[MySparkKafkaProducer[Array[Byte], String]] = {
val kafkaProducerConfig = {
val p = new Properties()
p.setProperty("bootstrap.servers", "broker1:9092")
p.setProperty("key.serializer", classOf[ByteArraySerializer].getName)
p.setProperty("value.serializer", classOf[StringSerializer].getName)
p
}
ssc.sparkContext.broadcast(MySparkKafkaProducer[Array[Byte], String](kafkaProducerConfig))
}
Шаг 3: Пишите из Spark Streaming в Kafka, повторно используя ту же упаковку KafkaProducer
экземпляр (для каждого исполнителя)
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.RecordMetadata
val stream: DStream[String] = ???
stream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
val metadata: Stream[Future[RecordMetadata]] = partitionOfRecords.map { record =>
kafkaProducer.value.send("my-output-topic", record)
}.toStream
metadata.foreach { metadata => metadata.get() }
}
}
Надеюсь это поможет.
С искрой>= 2.2
Операции чтения и записи возможны на Kafka с использованием API структурированной потоковой передачи.
Построить стрим из темы Кафка
// Subscribe to a topic and read messages from the earliest to latest offsets
val ds= spark
.readStream // use `read` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("subscribe", "source-topic1")
.option("startingOffsets", "earliest")
.option("endingOffsets", "latest")
.load()
Прочитайте ключ и значение и примените схему для обоих, для простоты мы делаем преобразование их обоих в String
тип.
val dsStruc = ds.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
поскольку dsStruc
есть схема, она принимает все операции типа SQL, такие как filter
, agg
, select
.. и т. д.
Написать стрим в тему Кафки
dsStruc
.writeStream // use `write` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("topic", "target-topic1")
.start()
Дополнительная конфигурация для интеграции Kafka для чтения или записи
Ключевые артефакты для добавления в приложение
"org.apache.spark" % "spark-core_2.11" % 2.2.0,
"org.apache.spark" % "spark-streaming_2.11" % 2.2.0,
"org.apache.spark" % "spark-sql-kafka-0-10_2.11" % 2.2.0,
Существует потоковый Kafka Writer, поддерживаемый Cloudera (на самом деле произошедший от Spark JIRA [1]). Он в основном создает производителя на раздел, который амортизирует время, затрачиваемое на создание "тяжелых" объектов над (мы надеемся, большим) набором элементов.
Писатель может быть найден здесь: https://github.com/cloudera/spark-kafka-writer
У меня была такая же проблема, и я нашел этот пост.
Автор решает проблему, создавая 1 продюсера на исполнителя. Вместо того, чтобы отправлять самого продюсера, он посылает только "рецепт", как создать продюсера в исполнителе путем его трансляции.
val kafkaSink = sparkContext.broadcast(KafkaSink(conf))
Он использует обертку, которая лениво создает производителя:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, value: String): Unit = producer.send(new ProducerRecord(topic, value))
}
object KafkaSink {
def apply(config: Map[String, Object]): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
Оболочка является сериализуемой, потому что производитель Kafka инициализируется непосредственно перед первым использованием на исполнителе. Драйвер сохраняет ссылку на оболочку, а оболочка отправляет сообщения, используя производителя каждого исполнителя:
dstream.foreachRDD { rdd =>
rdd.foreach { message =>
kafkaSink.value.send("topicName", message)
}
}
Почему это невозможно? По сути, каждый раздел каждого RDD будет работать независимо (и вполне может работать на другом узле кластера), поэтому вам придется переделывать соединение (и любую синхронизацию) в начале задачи каждого раздела. Если накладные расходы слишком велики, вы должны увеличить размер пакета в вашем StreamingContext
до тех пор, пока это не станет приемлемым (очевидно, что это требует затрат времени).
(Если вы не обрабатываете тысячи сообщений в каждом разделе, уверены ли вы, что вообще нуждаетесь в потоковом воспроизведении? Вы лучше справились бы с автономным приложением?)
Это может быть то, что вы хотите сделать. Вы в основном создаете одного производителя для каждого раздела записей.
input.foreachRDD(rdd =>
rdd.foreachPartition(
partitionOfRecords =>
{
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)
partitionOfRecords.foreach
{
case x:String=>{
println(x)
val message=new ProducerRecord[String, String]("output",null,x)
producer.send(message)
}
}
})
)
надеюсь, это поможет
Со Spark < 2.2
Поскольку нет прямого способа писать сообщения в Kafka из Spark Streaming
Создайте KafkaSinkWritter
import java.util.Properties
import org.apache.kafka.clients.producer._
import org.apache.spark.sql.ForeachWriter
class KafkaSink(topic:String, servers:String) extends ForeachWriter[(String, String)] {
val kafkaProperties = new Properties()
kafkaProperties.put("bootstrap.servers", servers)
kafkaProperties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
kafkaProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val results = new scala.collection.mutable.HashMap[String, String]
var producer: KafkaProducer[String, String] = _
def open(partitionId: Long,version: Long): Boolean = {
producer = new KafkaProducer(kafkaProperties)
true
}
def process(value: (String, String)): Unit = {
producer.send(new ProducerRecord(topic, value._1 + ":" + value._2))
}
def close(errorOrNull: Throwable): Unit = {
producer.close()
}
}
Пишите сообщения с помощью SinkWriter
val topic = "<topic2>"
val brokers = "<server:ip>"
val writer = new KafkaSink(topic, brokers)
val query =
streamingSelectDF
.writeStream
.foreach(writer)
.outputMode("update")
.trigger(ProcessingTime("25 seconds"))
.start()
Ссылка на ссылку