Как установить логический тип в схеме spark-avro 2.4?

Мы читаем информацию о метках времени из файлов avro в нашем приложении. Я нахожусь в процессе тестирования обновления от Spark 2.3.1 до Spark 2.4, которое включает в себя недавно встроенную интеграцию spark-avro. Однако я не могу понять, как сказать схеме avro, что я хочу, чтобы метки времени имели логический тип "timestamp-millis", а не "timestamp-micros" по умолчанию.

Просто из просмотра тестовых файлов avro в Spark 2.3.1 с использованием пакета Databricks spark-avro 4.0.0 мы получили следующие поля / схему:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":["long","null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

Время поиска составляло миллисекунды с тех пор, как эпоха сохранялась как long. Все было отлично.

Когда я поднял вещи до Spark 2.4 и встроенных пакетов spark-avro 2.4.0, у меня были следующие более новые поля / схема:

{"name":"id","type":["string","null"]},
{"name":"searchQuery","type":["string","null"]},
{"name":"searchTime","type":[{"type":"long","logicalType":"timestamp-micros"},"null"]},
{"name":"score","type":"double"},
{"name":"searchType","type":["string","null"]}

Как можно видеть, базовый тип все еще длинный, но теперь он дополнен логическим типом "timestamp-micros". Это именно так, как говорится в примечаниях к выпуску, однако я не могу найти способ указать схему для использования опции 'timestamp-millis'.

Это становится проблемой, когда я записываю в avro-файл объект Timestamp, инициализированный, скажем, через 10000 секунд после эпохи, он будет считан обратно как 10000000 секунд. В 2.3.1/databricks-avro это был просто длинный, без информации, связанной с ним, поэтому он вышел так же, как и вошел.

В настоящее время мы строим схему, отражая объект интереса следующим образом:

val searchSchema: StructType = ScalaReflection.schemaFor[searchEntry].dataType.asInstanceOf[StructType]

Я попытался увеличить это, создав модифицированную схему, которая пыталась заменить StructField, соответствующий записи searchTime, следующим образом:

    val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

Однако объект StructField, определенный в spark.sql.types, не имеет понятия логического типа, который может увеличивать в нем dataType.

case class StructField(
    name: String,
    dataType: DataType,
    nullable: Boolean = true,
    metadata: Metadata = Metadata.empty) 

Я также попытался создать схему из представления JSON двумя способами:

val schemaJSONrepr = """{
          |          "name" : "id",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchQuery",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchTime",
          |          "type" : "long",
          |          "logicalType" : "timestamp-millis",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "score",
          |          "type" : "double",
          |          "nullable" : false,
          |          "metadata" : { }
          |        }, {
          |          "name" : "searchType",
          |          "type" : "string",
          |          "nullable" : true,
          |          "metadata" : { }
          |        }""".stripMargin

Первая попытка была просто создать DataType из этого

// here spark is a SparkSession instance from a higher scope.
val schema = DataType.fromJSON(schemaJSONrepr).asInstanceOf[StructType]
spark.read
     .schema(schema)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Это не удалось из-за того, что он не смог создать StructType для узла searchTime, потому что в нем есть "logicType". Вторая попытка состояла в том, чтобы просто создать схему, передав необработанную строку JSON.

spark.read
     .schema(schemaJSONrepr)
     .format("avro")
     .option("basePath", baseUri)
     .load(uris: _*)

Это не говорит о том, что:

mismatched input '{' expecting {'SELECT', 'FROM', ...

== SQL ==

{
^^^

Я обнаружил, что в API spark-avro есть способ ПОЛУЧИТЬ логический тип из схемы, но не могу понять, как его установить.

Как вы можете видеть из моих неудачных попыток выше, я попытался использовать Schema.Parser для создания объекта схемы avro, но единственным допустимым типом в spark.read.schema являются String и StructType.

Если кто-нибудь может дать представление о том, как изменить / указать этот логический тип, я был бы очень признателен. Спасибо

1 ответ

Решение

Хорошо, я думаю, что ответил на свой вопрос. Когда я изменил программно построенную схему, чтобы использовать явный тип метки времени

val modSearchSchema = StructType(searchSchema.fields.map {
      case StructField(name, _, nullable, metadata) if name == "searchTime" =>
        StructField(name, org.apache.spark.sql.types.DataTypes.TimestampType, nullable, metadata)
      case f => f
    })

Я не изменил логику, когда мы выполняли чтение, когда у нас был объект Row, из которого мы читали обратно. Первоначально мы читали Long и конвертировали его в Timestamp, где все пошло не так, поскольку он считывал Long в микросекундах, что делало его в 1000 раз больше, чем мы предполагали. Изменение нашего чтения для чтения объекта Timestamp напрямую позволяет основной логике учесть это, забирая его из наших (моих) рук. Так:

// searchTime = new Timestamp(row.getAs[Long]("searchTime")) BROKEN

searchTime = row.getAs[Timestamp]("searchTime") // SUCCESS
Другие вопросы по тегам