Scala: Как преобразовать вложенные / сложные данные в avro, используя avro4s?

У меня есть Scala 2.12 и я импортировал библиотеку avro4s, перейдя по ссылке для моего требования.

По сути, моя схема avro выглядит следующим образом:

{
    "type":"record",
    "name":"SaNative",
    "fields":[
        {
            "name":"header",
            "type":{
                "type":"record",
                "name":"header",
                "fields":[
                    {"name":"businessId","type":"string"},
                    {"name":"batchId","type":"string"},
                    {"name":"sourceSystem","type":"string"},
                    {"name":"secondarySourceSystem","type":"string"},
                    {"name":"sentTo","type":"string"},
                    {"name":"sentBy","type":"string"},
                    {"name":"messageType","type":"string"},
                    {"name":"schemaVersion","type":"string"},
                    {"name":"processing","type":"string"},
                    {"name":"sourceLocation","type":"string"},
                    {"name":"messageId","type":"string"},
                    {"name":"sourceSystemCreationTimestamp","type":"long"}
                ]
            }
        },
        {
            "name":"body",
            "type":{
                "type":"record",
                "name":"pnlData",
                "fields":[
                    {"name":"granularity","type":"string"},
                    {"name":"pnl_type","type":"string"},
                    {"name":"pnl_subtype","type":"string"},
                    {"name":"risk_order","type":"string"},
                    {"name":"market_name","type":"string"},
                    {"name":"riskcategory","type":"string"},
                    {"name":"tenor","type":"string"},
                    {"name":"subcategory","type":"string"},
                    {"name":"product","type":"string"},
                    {"name":"trade_id","type":"string"},
                    {"name":"category","type":"string"},
                    {"name":"currency","type":"string"},
                    {"name":"date","type":"int"},
                    {"name":"book","type":"string"},
                    {"name":"pnl_local","type":"long"},
                    {"name":"pnl_cde","type":"long"},
                    {"name":"pnl_status","type":"string"}
                ]
            }
        }
    ]
}

Итак, у меня есть 3 класса case, созданных как в коде (проверьте код ниже):

Я протестировал схему на основе этих классов, и она выглядит хорошо. Выход из AvroSchema[SaNative]

{"type":"record","name":"SaNative","fields":[{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":"string"},{"name":"sentTo","type":"string"},{"name":"sentBy","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"sourceLocation","type":"string"},{"name":"messageId","type":"string"},{"name":"sourceSystemCreationTimestamp","type":"long"}]}},{"name":"body","type":{"type":"record","name":"pnlData","fields":[{"name":"granularity","type":"string"},{"name":"pnl_type","type":"string"},{"name":"pnl_subtype","type":"string"},{"name":"risk_order","type":"string"},{"name":"market_name","type":"string"},{"name":"riskcategory","type":"string"},{"name":"tenor","type":"string"},{"name":"subcategory","type":"string"},{"name":"product","type":"string"},{"name":"trade_id","type":"string"},{"name":"category","type":"string"},{"name":"currency","type":"string"},{"name":"date","type":"int"},{"name":"book","type":"string"},{"name":"pnl_local","type":"double"},{"name":"pnl_cde","type":"double"},{"name":"pnl_status","type":"string"}]}}]}

Итак, генерация схемы хорошая.

Сейчас я создаю необходимые объекты, см. Код.

Является ли создание моей записи правильно в соответствии со схемой?

Потому что, когда я пытаюсь записать файл avro, я получаю исключение нулевого указателя.

Ошибка:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.avro.util.Utf8$2.toUtf8(Utf8.java:123)
    at org.apache.avro.util.Utf8.getBytesFor(Utf8.java:172)
    at org.apache.avro.util.Utf8.<init>(Utf8.java:39)
    at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:73)
    at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:68)
    at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
    at com.sksamuel.avro4s.Encoder$.encodeFieldLazy(Encoder.scala:379)
    at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
    at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
    at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
    at com.sksamuel.avro4s.Encoder$.encodeFieldNotLazy(Encoder.scala:373)
    at MyClass$$anon$4.encode(MyClass.scala:90)
    at MyClass$$anon$4.encode(MyClass.scala:90)
    at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2(AvroDataOutputStream.scala:35)
    at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2$adapted(AvroDataOutputStream.scala:34)
    at com.sksamuel.avro4s.AvroDataOutputStream.write(AvroDataOutputStream.scala:46)
    at MyClass$.delayedEndpoint$MyClass$1(MyClass.scala:91)
    at MyClass$delayedInit$body.apply(MyClass.scala:42)
    at scala.Function0.apply$mcV$sp(Function0.scala:34)
    at scala.Function0.apply$mcV$sp$(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App.$anonfun$main$1$adapted(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:388)
    at scala.App.main(App.scala:76)
    at scala.App.main$(App.scala:74)
    at MyClass$.main(MyClass.scala:42)
    at MyClass.main(MyClass.scala)

Код:

//import java.io.File
import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema}
import java.io.File

//case class Person(name: String, age: Int)
//case class Book(title: String, year: Int, owner: Person, authors: Seq[Person])
case class header(businessId: String,
                  batchId: String,
                  sourceSystem: String,
                  secondarySourceSystem: String,
                  sentTo: String,
                  sentBy: String,
                  messageType: String,
                  schemaVersion: String,
                  processing: String,
                  sourceLocation: String,
                  messageId: String,
                  sourceSystemCreationTimestamp: Long
                 )

case class pnlData(granularity: String,
                   pnl_type: String,
                   pnl_subtype: String,
                   risk_order: String,
                   market_name: String,
                   riskcategory: String,
                   tenor: String,
                   subcategory: String,
                   product: String,
                   trade_id: String,
                   category: String,
                   currency: String,
                   date: Int,
                   book: String,
                   pnl_local: Double,
                   pnl_cde: Double,
                   pnl_status: String
                 )

case class SaNative(header: header, body: pnlData)

object MyClass extends App {
  val outFile = "/path/TestScala.avro"
   // val schema = AvroSchema[Book]
  println("Hello, World!")
   // println(schema)


  val head = header(
    businessId="T1",
    batchId=null,
    sourceSystem="TA",
    secondarySourceSystem=null,
    sentTo="SA",
    sentBy="TA",
    messageType=null,
    schemaVersion="1.6",
    processing="RealTime",
    sourceLocation=null,
    messageId="ta_pnl_0002",
    sourceSystemCreationTimestamp=1236472051
  )

  val pnlBody = pnlData(
    granularity="detailed view",
    pnl_type="Daily",
    pnl_subtype="Regular",
    risk_order=null,
    market_name=null,
    riskcategory=null,
    tenor=null,
    subcategory=null,
    product=null,
    trade_id=null,
    category=null,
    currency="cad",
    date= 20181130,
    book="8271",
    pnl_local=997.7899999917,
    pnl_cde=997.8100000024,
    pnl_status="locked"
  )

  val record = SaNative(header = head, body = pnlBody)

  val schema = AvroSchema[SaNative]
  println(schema)
  println(record)

  val os = AvroOutputStream.data[SaNative].to(new File(outFile)).build(schema)
  os.write(record)
  os.flush()
  os.close()

}

По сути, основываясь на имеющейся у меня схеме, я хочу понять, каким должен быть мой конечный объект записи?

ОБНОВЛЕНИЕ:

Исходя из приведенных ниже предложений @Antot и @Daniel, я изменил свой заголовок и класс тела, чтобы использовать Option[String] для всех значений, которые, как ожидается, будут нулевыми. Но все та же проблема.

Я новичок в Scala, поэтому хотел знать, правильно ли я формирую свой объект записи согласно схеме или я что-то там упускаю? Согласно схеме, оно имеет 2 поля типа заголовка записи и pnlData. Кроме того, есть ли какая-либо другая библиотека или способ создать эту простую запись, основанную на моей схеме?

Изменения в случае классов заголовка и данных производятся по следующей схеме и записи. Правильно ли создана следующая запись? Схема (довольно печатная):

{
        "type":"record",
        "name":"SaNative",
        "fields":[
            {"name":"header",
            "type":{
                "type":"record",
                "name":"header",
                "fields":[
                    {"name":"businessId","type":"string"},
                    {"name":"batchId","type":["null","string"]},
                    {"name":"sourceSystem","type":"string"},
                    {"name":"secondarySourceSystem","type":["null","string"]},
                    {"name":"sentTo","type":"string"},
                    {"name":"sentBy","type":"string"},
                    {"name":"messageType","type":["null","string"]},
                    {"name":"schemaVersion","type":"string"},
                    {"name":"processing","type":"string"},
                    {"name":"sourceLocation","type":["null","string"]},
                    {"name":"messageId","type":"string"},
                    {"name":"sourceSystemCreationTimestamp","type":"long"}
                ]
            }
            },
            {"name":"body",
            "type":{
                "type":"record",
                "name":"pnlData",
                "fields":[
                    {"name":"granularity","type":"string"},
                    {"name":"pnl_type","type":"string"},
                    {"name":"pnl_subtype","type":"string"},
                    {"name":"risk_order","type":["null","string"]},
                    {"name":"market_name","type":["null","string"]},
                    {"name":"riskcategory","type":["null","string"]},
                    {"name":"tenor","type":["null","string"]},
                    {"name":"subcategory","type":["null","string"]},
                    {"name":"product","type":["null","string"]},
                    {"name":"trade_id","type":["null","string"]},
                    {"name":"category","type":["null","string"]},
                    {"name":"currency","type":"string"},
                    {"name":"date","type":"int"},
                    {"name":"book","type":"string"},
                    {"name":"pnl_local","type":"double"},
                    {"name":"pnl_cde","type":"double"},
                    {"name":"pnl_status","type":"string"}
                ]
            }
            }
        ]
    }

Запись создана (Довольно напечатано):

SaNative(
        header(
            T1,
            null,
            TA,
            null,
            SA,
            TA,
            null,
            1.6,
            RealTime,
            null,
            ta_pnl_0002,
            1236472051
            ),
        pnlData(
            detailed view,
            Daily,
            Regular,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            null,
            cad,
            20181130,
            8271,
            997.7899999917,
            997.8100000024,
            locked
        )
    )

Пожалуйста, порекомендуйте?

ОБНОВЛЕНИЕ 2:
Я думаю, что проблема с нулями. Ожидается, что записи будут иметь несколько атрибутов как NULL. Поскольку я изменил на Option[String], его значение должно быть None, а не NULL. Я новичок в Scala, поэтому все еще понимаю его типы данных.

Таким образом, изменение значения с нуля на None теперь работает.

Однако у меня все еще есть один вопрос. Если мои атрибуты - Option[String], как он переводится в Avro? Если мое значение None, переводится ли это на Avro null?

0 ответов

Ваша проблема в том, что ваша схема не определяет поля как обнуляемые. Если у вас есть нулевые значения, это должно поддерживаться в схеме. Чтобы сделать это в Avro, вы должны создать "объединение" из двух схем: одна представляет собой схему NULL, а другая - "реальный тип". Например, посмотрите на эту схему.

{
  "type": "record",
  "name": "MyClass",
  "namespace": "com.sksamuel.avro4s",
  "fields": [
    {
      "name": "a",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

Это тип записи com.sksamuel.avro4s.MyClass это имеет одно поле, a, Тогда тип a может быть null или же string, Поэтому при записи записей этого типа вы можете использовать либо ноль, либо строку для поля a,

Теперь вам не нужно создавать эту схему вручную (как вы делали в своем посте). Вы можете использовать AvroSchema макрос, чтобы сделать магию для вас на основе класса case.

val schema = AvroSchema[MyClass],

При использовании этого макроса будет создан обнуляемый союз, если вы определите тип Option[T], Так что вы могли бы сделать,

case class MyClass(a: Option[String])

И вы бы в конечном итоге с той же схемой выше. Если бы вы сделали,

case class MyClass(a: String)

Тогда схема будет:

{
  "type": "record",
  "name": "MyClass",
  "namespace": "com.sksamuel.avro4s",
  "fields": [
    {
      "name": "a",
      "type": "string"
    }
  ]
}

ТЛ; др

Либо создайте схему из класса дел, в котором поле, которое можно обнулять, определено как Option, либо обновите свою ручную схему, чтобы использовать объединение {null, T}.

Другие вопросы по тегам