Scala: Как преобразовать вложенные / сложные данные в avro, используя avro4s?
У меня есть Scala 2.12 и я импортировал библиотеку avro4s, перейдя по ссылке для моего требования.
По сути, моя схема avro выглядит следующим образом:
{
"type":"record",
"name":"SaNative",
"fields":[
{
"name":"header",
"type":{
"type":"record",
"name":"header",
"fields":[
{"name":"businessId","type":"string"},
{"name":"batchId","type":"string"},
{"name":"sourceSystem","type":"string"},
{"name":"secondarySourceSystem","type":"string"},
{"name":"sentTo","type":"string"},
{"name":"sentBy","type":"string"},
{"name":"messageType","type":"string"},
{"name":"schemaVersion","type":"string"},
{"name":"processing","type":"string"},
{"name":"sourceLocation","type":"string"},
{"name":"messageId","type":"string"},
{"name":"sourceSystemCreationTimestamp","type":"long"}
]
}
},
{
"name":"body",
"type":{
"type":"record",
"name":"pnlData",
"fields":[
{"name":"granularity","type":"string"},
{"name":"pnl_type","type":"string"},
{"name":"pnl_subtype","type":"string"},
{"name":"risk_order","type":"string"},
{"name":"market_name","type":"string"},
{"name":"riskcategory","type":"string"},
{"name":"tenor","type":"string"},
{"name":"subcategory","type":"string"},
{"name":"product","type":"string"},
{"name":"trade_id","type":"string"},
{"name":"category","type":"string"},
{"name":"currency","type":"string"},
{"name":"date","type":"int"},
{"name":"book","type":"string"},
{"name":"pnl_local","type":"long"},
{"name":"pnl_cde","type":"long"},
{"name":"pnl_status","type":"string"}
]
}
}
]
}
Итак, у меня есть 3 класса case, созданных как в коде (проверьте код ниже):
Я протестировал схему на основе этих классов, и она выглядит хорошо. Выход из AvroSchema[SaNative]
{"type":"record","name":"SaNative","fields":[{"name":"header","type":{"type":"record","name":"header","fields":[{"name":"businessId","type":"string"},{"name":"batchId","type":"string"},{"name":"sourceSystem","type":"string"},{"name":"secondarySourceSystem","type":"string"},{"name":"sentTo","type":"string"},{"name":"sentBy","type":"string"},{"name":"messageType","type":"string"},{"name":"schemaVersion","type":"string"},{"name":"processing","type":"string"},{"name":"sourceLocation","type":"string"},{"name":"messageId","type":"string"},{"name":"sourceSystemCreationTimestamp","type":"long"}]}},{"name":"body","type":{"type":"record","name":"pnlData","fields":[{"name":"granularity","type":"string"},{"name":"pnl_type","type":"string"},{"name":"pnl_subtype","type":"string"},{"name":"risk_order","type":"string"},{"name":"market_name","type":"string"},{"name":"riskcategory","type":"string"},{"name":"tenor","type":"string"},{"name":"subcategory","type":"string"},{"name":"product","type":"string"},{"name":"trade_id","type":"string"},{"name":"category","type":"string"},{"name":"currency","type":"string"},{"name":"date","type":"int"},{"name":"book","type":"string"},{"name":"pnl_local","type":"double"},{"name":"pnl_cde","type":"double"},{"name":"pnl_status","type":"string"}]}}]}
Итак, генерация схемы хорошая.
Сейчас я создаю необходимые объекты, см. Код.
Является ли создание моей записи правильно в соответствии со схемой?
Потому что, когда я пытаюсь записать файл avro, я получаю исключение нулевого указателя.
Ошибка:
Exception in thread "main" java.lang.NullPointerException
at org.apache.avro.util.Utf8$2.toUtf8(Utf8.java:123)
at org.apache.avro.util.Utf8.getBytesFor(Utf8.java:172)
at org.apache.avro.util.Utf8.<init>(Utf8.java:39)
at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:73)
at com.sksamuel.avro4s.Encoder$StringEncoder$.encode(Encoder.scala:68)
at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
at com.sksamuel.avro4s.Encoder$.encodeFieldLazy(Encoder.scala:379)
at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
at MyClass$$anon$4$$anon$5.encode(MyClass.scala:90)
at com.sksamuel.avro4s.Encoder$.encodeField(Encoder.scala:401)
at com.sksamuel.avro4s.Encoder$.encodeFieldNotLazy(Encoder.scala:373)
at MyClass$$anon$4.encode(MyClass.scala:90)
at MyClass$$anon$4.encode(MyClass.scala:90)
at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2(AvroDataOutputStream.scala:35)
at com.sksamuel.avro4s.AvroDataOutputStream.$anonfun$x$1$2$adapted(AvroDataOutputStream.scala:34)
at com.sksamuel.avro4s.AvroDataOutputStream.write(AvroDataOutputStream.scala:46)
at MyClass$.delayedEndpoint$MyClass$1(MyClass.scala:91)
at MyClass$delayedInit$body.apply(MyClass.scala:42)
at scala.Function0.apply$mcV$sp(Function0.scala:34)
at scala.Function0.apply$mcV$sp$(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App.$anonfun$main$1$adapted(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:388)
at scala.App.main(App.scala:76)
at scala.App.main$(App.scala:74)
at MyClass$.main(MyClass.scala:42)
at MyClass.main(MyClass.scala)
Код:
//import java.io.File
import com.sksamuel.avro4s.{AvroOutputStream, AvroSchema}
import java.io.File
//case class Person(name: String, age: Int)
//case class Book(title: String, year: Int, owner: Person, authors: Seq[Person])
case class header(businessId: String,
batchId: String,
sourceSystem: String,
secondarySourceSystem: String,
sentTo: String,
sentBy: String,
messageType: String,
schemaVersion: String,
processing: String,
sourceLocation: String,
messageId: String,
sourceSystemCreationTimestamp: Long
)
case class pnlData(granularity: String,
pnl_type: String,
pnl_subtype: String,
risk_order: String,
market_name: String,
riskcategory: String,
tenor: String,
subcategory: String,
product: String,
trade_id: String,
category: String,
currency: String,
date: Int,
book: String,
pnl_local: Double,
pnl_cde: Double,
pnl_status: String
)
case class SaNative(header: header, body: pnlData)
object MyClass extends App {
val outFile = "/path/TestScala.avro"
// val schema = AvroSchema[Book]
println("Hello, World!")
// println(schema)
val head = header(
businessId="T1",
batchId=null,
sourceSystem="TA",
secondarySourceSystem=null,
sentTo="SA",
sentBy="TA",
messageType=null,
schemaVersion="1.6",
processing="RealTime",
sourceLocation=null,
messageId="ta_pnl_0002",
sourceSystemCreationTimestamp=1236472051
)
val pnlBody = pnlData(
granularity="detailed view",
pnl_type="Daily",
pnl_subtype="Regular",
risk_order=null,
market_name=null,
riskcategory=null,
tenor=null,
subcategory=null,
product=null,
trade_id=null,
category=null,
currency="cad",
date= 20181130,
book="8271",
pnl_local=997.7899999917,
pnl_cde=997.8100000024,
pnl_status="locked"
)
val record = SaNative(header = head, body = pnlBody)
val schema = AvroSchema[SaNative]
println(schema)
println(record)
val os = AvroOutputStream.data[SaNative].to(new File(outFile)).build(schema)
os.write(record)
os.flush()
os.close()
}
По сути, основываясь на имеющейся у меня схеме, я хочу понять, каким должен быть мой конечный объект записи?
ОБНОВЛЕНИЕ:
Исходя из приведенных ниже предложений @Antot и @Daniel, я изменил свой заголовок и класс тела, чтобы использовать Option[String] для всех значений, которые, как ожидается, будут нулевыми. Но все та же проблема.
Я новичок в Scala, поэтому хотел знать, правильно ли я формирую свой объект записи согласно схеме или я что-то там упускаю? Согласно схеме, оно имеет 2 поля типа заголовка записи и pnlData. Кроме того, есть ли какая-либо другая библиотека или способ создать эту простую запись, основанную на моей схеме?
Изменения в случае классов заголовка и данных производятся по следующей схеме и записи. Правильно ли создана следующая запись? Схема (довольно печатная):
{
"type":"record",
"name":"SaNative",
"fields":[
{"name":"header",
"type":{
"type":"record",
"name":"header",
"fields":[
{"name":"businessId","type":"string"},
{"name":"batchId","type":["null","string"]},
{"name":"sourceSystem","type":"string"},
{"name":"secondarySourceSystem","type":["null","string"]},
{"name":"sentTo","type":"string"},
{"name":"sentBy","type":"string"},
{"name":"messageType","type":["null","string"]},
{"name":"schemaVersion","type":"string"},
{"name":"processing","type":"string"},
{"name":"sourceLocation","type":["null","string"]},
{"name":"messageId","type":"string"},
{"name":"sourceSystemCreationTimestamp","type":"long"}
]
}
},
{"name":"body",
"type":{
"type":"record",
"name":"pnlData",
"fields":[
{"name":"granularity","type":"string"},
{"name":"pnl_type","type":"string"},
{"name":"pnl_subtype","type":"string"},
{"name":"risk_order","type":["null","string"]},
{"name":"market_name","type":["null","string"]},
{"name":"riskcategory","type":["null","string"]},
{"name":"tenor","type":["null","string"]},
{"name":"subcategory","type":["null","string"]},
{"name":"product","type":["null","string"]},
{"name":"trade_id","type":["null","string"]},
{"name":"category","type":["null","string"]},
{"name":"currency","type":"string"},
{"name":"date","type":"int"},
{"name":"book","type":"string"},
{"name":"pnl_local","type":"double"},
{"name":"pnl_cde","type":"double"},
{"name":"pnl_status","type":"string"}
]
}
}
]
}
Запись создана (Довольно напечатано):
SaNative(
header(
T1,
null,
TA,
null,
SA,
TA,
null,
1.6,
RealTime,
null,
ta_pnl_0002,
1236472051
),
pnlData(
detailed view,
Daily,
Regular,
null,
null,
null,
null,
null,
null,
null,
null,
cad,
20181130,
8271,
997.7899999917,
997.8100000024,
locked
)
)
Пожалуйста, порекомендуйте?
ОБНОВЛЕНИЕ 2:
Я думаю, что проблема с нулями. Ожидается, что записи будут иметь несколько атрибутов как NULL. Поскольку я изменил на Option[String], его значение должно быть None, а не NULL. Я новичок в Scala, поэтому все еще понимаю его типы данных.
Таким образом, изменение значения с нуля на None теперь работает.
Однако у меня все еще есть один вопрос. Если мои атрибуты - Option[String], как он переводится в Avro? Если мое значение None, переводится ли это на Avro null?
0 ответов
Ваша проблема в том, что ваша схема не определяет поля как обнуляемые. Если у вас есть нулевые значения, это должно поддерживаться в схеме. Чтобы сделать это в Avro, вы должны создать "объединение" из двух схем: одна представляет собой схему NULL, а другая - "реальный тип". Например, посмотрите на эту схему.
{
"type": "record",
"name": "MyClass",
"namespace": "com.sksamuel.avro4s",
"fields": [
{
"name": "a",
"type": [
"null",
"string"
],
"default": null
}
]
}
Это тип записи com.sksamuel.avro4s.MyClass
это имеет одно поле, a
, Тогда тип a
может быть null
или же string
, Поэтому при записи записей этого типа вы можете использовать либо ноль, либо строку для поля a
,
Теперь вам не нужно создавать эту схему вручную (как вы делали в своем посте). Вы можете использовать AvroSchema
макрос, чтобы сделать магию для вас на основе класса case.
val schema = AvroSchema[MyClass]
,
При использовании этого макроса будет создан обнуляемый союз, если вы определите тип Option[T]
, Так что вы могли бы сделать,
case class MyClass(a: Option[String])
И вы бы в конечном итоге с той же схемой выше. Если бы вы сделали,
case class MyClass(a: String)
Тогда схема будет:
{
"type": "record",
"name": "MyClass",
"namespace": "com.sksamuel.avro4s",
"fields": [
{
"name": "a",
"type": "string"
}
]
}
ТЛ; др
Либо создайте схему из класса дел, в котором поле, которое можно обнулять, определено как Option, либо обновите свою ручную схему, чтобы использовать объединение {null, T}.