Как установить логический тип в схеме spark-avro 2.4?
Мы читаем информацию о метках времени из файлов avro в нашем приложении. Я нахожусь в процессе тестирования обновления от Spark 2.3.1 до Spark 2.4, которое включает в себя недавно встроенную интеграцию spark-avro. Однако я не могу понять, как сказать схеме avro, что я хочу, чтобы метки времени имели логический тип "timestamp-millis", а не "timestamp-micros" по умолчанию.
Просто из просмотра тестовых файлов avro в Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0 мы получили следующие поля / схему:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
Время поиска составляло миллисекунды с тех пор, как эпоха сохранялась как long. Все было отлично.
Когда я поднял вещи до Spark 2.4 и встроенных пакетов spark-avro 2.4.0, у меня были следующие более новые поля / схема:
{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}
Как можно видеть, базовый тип все еще длинный, но теперь он дополнен логическим типом "timestamp-micros". Это именно так, как говорится в примечаниях к выпуску, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.
Это становится проблемой, когда я записываю в avro-файл объект Timestamp, инициализированный, скажем, через 10000 секунд после эпохи, он будет считан обратно как 10000000 секунд. В 2.3.1/databricks-avro это был просто длинный, без информации, связанной с ним, поэтому он вышел так же, как и вошел.
В настоящее время мы строим схему, отражая объект интереса следующим образом:
val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]
Я попытался увеличить это, создав модифицированную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
Однако объект StructField, определенный в spark.sql.types, не имеет понятия логического типа, который может увеличивать в нем dataType.
case class StructField(
name: String,
dataType: DataType,
nullable: Boolean = true,
metadata: Metadata = Metadata.empty)
Я также попытался создать схему из представления JSON двумя способами:
val schemaJSONrepr = """{
| "name" : "id",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchQuery",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }, {
| "name" : "searchTime",
| "type" : "long",
| "logicalType" : "timestamp-millis",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "score",
| "type" : "double",
| "nullable" : false,
| "metadata" : { }
| }, {
| "name" : "searchType",
| "type" : "string",
| "nullable" : true,
| "metadata" : { }
| }""".stripMargin
Первая попытка была просто создать DataType из этого
// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
.schema(schema)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Это не удалось из-за того, что он не смог создать StructType для узла searchTime, потому что в нем есть "logicType". Вторая попытка состояла в том, чтобы просто создать схему, передав необработанную строку JSON.
spark.read
.schema(schemaJSONrepr)
.format("avro")
.option("basePath", baseUri)
.load(uris: _*)
Это не говорит о том, что:
mismatched input '{' expecting {'SELECT', 'FROM', ...
== SQL ==
{
^^^
Я обнаружил, что в API spark-avro есть способ ПОЛУЧИТЬ логический тип из схемы, но не могу понять, как его установить.
Как вы можете видеть из моих неудачных попыток выше, я попытался использовать Schema.Parser для создания объекта схемы avro, но единственным допустимым типом в spark.read.schema являются String и StructType.
Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен. Спасибо
1 ответ
Хорошо, я думаю, что ответил на свой вопрос. Когда я изменил программно построенную схему, чтобы использовать явный тип метки времени
val modSearchSchema = StructType(searchSchema.fields.map {
case StructField(name, _, nullable, metadata) if name == "searchTime" =>
StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
case f => f
})
Я не изменил логику, когда мы выполняли чтение, когда у нас был объект Row, из которого мы читали обратно. Первоначально мы читали Long и конвертировали его в Timestamp, где все пошло не так, поскольку он считывал Long в микросекундах, что делало его в 1000 раз больше, чем мы предполагали. Изменение нашего чтения для чтения объекта Timestamp напрямую позволяет основной логике учесть это, забирая его из наших (моих) рук. Так:
// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN
searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS