Spark Структурированная потоковая кафка avro Производитель

У меня есть датафрейм, скажем:

val someDF = Seq(
  (8, "bat"),
  (64, "mouse"),
  (-27, "horse")
).toDF("number", "word")

Я хочу отправить этот фрейм данных в тему kafka, используя сериализацию avro и используя реестр схемы. Я верю, что я почти у цели, но я не могу пройти мимо задачи "Не сериализуемая ошибка". Я понимаю, что есть приемник для kafka, но он не связывается с реестром схемы, что является обязательным требованием.

object Holder extends Serializable{
  def prop(): java.util.Properties = {
    val props = new Properties()
    props.put("schema.registry.url", schemaRegistryURL)
    props.put("key.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
    props.put("value.serializer", classOf[KafkaAvroSerializer].getCanonicalName)
    props.put("schema.registry.url", schemaRegistryURL)
    props.put("bootstrap.servers", brokers)
    props
  }

  def vProps(props: java.util.Properties): kafka.utils.VerifiableProperties = {
    val vProps = new kafka.utils.VerifiableProperties(props)
  vProps
  }

  def messageSchema(vProps: kafka.utils.VerifiableProperties): org.apache.avro.Schema = {
    val ser = new KafkaAvroEncoder(vProps)
    val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(subjectValueName)
    val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
    messageSchema
  }

  def avroRecord(messageSchema: org.apache.avro.Schema): org.apache.avro.generic.GenericData.Record = {
    val avroRecord = new GenericData.Record(messageSchema)
    avroRecord
  }

  def ProducerRecord(avroRecord:org.apache.avro.generic.GenericData.Record): org.apache.kafka.clients.producer.ProducerRecord[org.apache.avro.generic.GenericRecord,org.apache.avro.generic.GenericRecord] = {
    val record = new ProducerRecord[GenericRecord, GenericRecord](topicWrite, avroRecord)
    record
  }

  def producer(props: java.util.Properties): KafkaProducer[GenericRecord, GenericRecord] = {
    val producer = new KafkaProducer[GenericRecord, GenericRecord](props)
    producer
  }
}

val prod:  (String, String) => String = (
  number: String,
  word: String,
   ) => {
  val prop = Holder.prop()
  val vProps = Holder.vProps(prop)
  val mSchema = Holder.messageSchema(vProps)
  val aRecord = Holder.avroRecord(mSchema)
  aRecord.put("number", number)
  aRecord.put("word", word)
  val record = Holder.ProducerRecord(aRecord)
  val producer = Holder.producer(prop)
  producer.send(record)
  "sent"
}

val prodUDF: org.apache.spark.sql.expressions.UserDefinedFunction =
  udf((
  Number: String,
  word: String,
 ) => prod(number,word))


val testDF = firstDF.withColumn("sent", prodUDF(col("number"), col("word")))

0 ответов

KafkaProducer не сериализуем. Создайте KafkaProducer внутри prod() вместо того, чтобы создавать его снаружи.

Другие вопросы по тегам