Spark Strutured Streaming автоматически преобразует метку времени в местное время
У меня есть метка времени в UTC и ISO8601, но при использовании структурированной потоковой передачи она автоматически конвертируется в местное время. Есть ли способ остановить это преобразование? Я хотел бы иметь это в UTC.
Я читаю данные JSON от Кафки, а затем анализирую их с помощью from_json
Искровая функция.
Входные данные:
{"Timestamp":"2015-01-01T00:00:06.222Z"}
Поток:
SparkSession
.builder()
.master("local[*]")
.appName("my-app")
.getOrCreate()
.readStream()
.format("kafka")
... //some magic
.writeStream()
.format("console")
.start()
.awaitTermination();
Схема:
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("Timestamp", DataTypes.TimestampType, true),});
Выход:
+--------------------+
| Timestamp|
+--------------------+
|2015-01-01 01:00:...|
|2015-01-01 01:00:...|
+--------------------+
Как видите, час увеличился сам собой.
PS: я пытался поэкспериментировать с from_utc_timestamp
Функция искры, но не повезло.
5 ответов
Для меня это работало, чтобы использовать:
spark.conf.set("spark.sql.session.timeZone", "UTC")
Он говорит искровому SQL использовать UTC в качестве часового пояса по умолчанию для отметок времени. Я использовал его в Spark SQL, например:
select *, cast('2017-01-01 10:10:10' as timestamp) from someTable
Я знаю, что это не работает в 2.0.1. но работает в Spark 2.2. Я использовал в SQLTransformer
также и это сработало.
Я не уверен насчет потоковой передачи, хотя.
Примечание:
Этот ответ полезен прежде всего в Spark < 2.2. Для более новой версии Spark смотрите ответ от astro_asz
TL; DR К сожалению, именно так Spark обрабатывает метки времени прямо сейчас, и на самом деле нет никакой встроенной альтернативы, кроме прямой работы в эпоху, без использования утилит даты / времени.
Вы можете провести глубокое обсуждение в списке разработчиков Spark: семантика SQL TIMESTAMP и SPARK-18350
Самый чистый обходной путь, который я нашел, - это установить -Duser.timezone
в UTC
как для водителя, так и для исполнителей. Например, с отправки:
bin/spark-shell --conf "spark.driver.extraJavaOptions=-Duser.timezone=UTC" \
--conf "spark.executor.extraJavaOptions=-Duser.timezone=UTC"
или путем настройки файлов конфигурации (spark-defaults.conf
):
spark.driver.extraJavaOptions -Duser.timezone=UTC
spark.executor.extraJavaOptions -Duser.timezone=UTC
Хотя были даны два очень хороших ответа, я нашел, что оба они были немного тяжелым молотком для решения проблемы. Я не хотел ничего, что потребовало бы изменения поведения разбора часового пояса во всем приложении, или подход, который изменил бы часовой пояс по умолчанию для моей JVM. Я нашел решение после большой боли, которой я поделюсь ниже...
Разбор строк времени [/date] в метки времени для манипуляций с датой, затем корректное отображение результата обратно
Во-первых, давайте рассмотрим вопрос о том, как заставить Spark SQL правильно анализировать строку даты [/time] (с учетом формата) в метку времени, а затем правильно отобразить эту метку времени так, чтобы она отображала ту же дату [/time], что и ввод исходной строки. Общий подход:
- convert a date[/time] string to time stamp [via to_timestamp]
[ to_timestamp seems to assume the date[/time] string represents a time relative to UTC (GMT time zone) ]
- relativize that timestamp to the timezone we are in via from_utc_timestamp
Тестовый код ниже реализует этот подход. 'часовой пояс, в котором мы находимся', передается в качестве первого аргумента методу timeTricks. Код преобразует входную строку "1970-01-01" в localizedTimeStamp (через from_utc_timestamp) и проверяет, что значение valueOf этой метки времени совпадает с "1970-01-01 00:00:00".
object TimeTravails {
def main(args: Array[String]): Unit = {
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
val spark: SparkSession = SparkSession.builder()
.master("local[3]")
.appName("SparkByExample")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
import java.sql.Timestamp
def timeTricks(timezone: String): Unit = {
val df2 = List("1970-01-01").toDF("timestr"). // can use to_timestamp even without time parts !
withColumn("timestamp", to_timestamp('timestr, "yyyy-MM-dd")).
withColumn("localizedTimestamp", from_utc_timestamp('timestamp, timezone)).
withColumn("weekday", date_format($"localizedTimestamp", "EEEE"))
val row = df2.first()
println("with timezone: " + timezone)
df2.show()
val (timestamp, weekday) = (row.getAs[Timestamp]("localizedTimestamp"), row.getAs[String]("weekday"))
timezone match {
case "UTC" =>
assert(timestamp == Timestamp.valueOf("1970-01-01 00:00:00") && weekday == "Thursday")
case "PST" | "GMT-8" | "America/Los_Angeles" =>
assert(timestamp == Timestamp.valueOf("1969-12-31 16:00:00") && weekday == "Wednesday")
case "Asia/Tokyo" =>
assert(timestamp == Timestamp.valueOf("1970-01-01 09:00:00") && weekday == "Thursday")
}
}
timeTricks("UTC")
timeTricks("PST")
timeTricks("GMT-8")
timeTricks("Asia/Tokyo")
timeTricks("America/Los_Angeles")
}
}
Решение проблемы структурированной потоковой передачи. Интерпретация входящих строк даты [/time] как UTC (не местное время)
Приведенный ниже код иллюстрирует, как применить вышеуказанные приемы (с небольшой модификацией), чтобы исправить проблему смещения временных меток на смещение между местным временем и GMT.
object Struct {
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
def main(args: Array[String]): Unit = {
val timezone = "PST"
val spark: SparkSession = SparkSession.builder()
.master("local[3]")
.appName("SparkByExample")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
val df = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", "9999")
.load()
import spark.implicits._
val splitDf = df.select(split(df("value"), " ").as("arr")).
select($"arr" (0).as("tsString"), $"arr" (1).as("count")).
withColumn("timestamp", to_timestamp($"tsString", "yyyy-MM-dd"))
val grouped = splitDf.groupBy(window($"timestamp", "1 day", "1 day").as("date_window")).count()
val tunedForDisplay =
grouped.
withColumn("windowStart", to_utc_timestamp($"date_window.start", timezone)).
withColumn("windowEnd", to_utc_timestamp($"date_window.end", timezone))
tunedForDisplay.writeStream
.format("console")
.outputMode("update")
.option("truncate", false)
.start()
.awaitTermination()
}
}
Код требует ввода через сокет... Я использую программу 'nc' (net cat), которая запускается так:
nc -l 9999
Затем я запускаю программу Spark и предоставляю net cat одну строку ввода:
1970-01-01 4
Вывод, который я получаю, иллюстрирует проблему со смещением:
-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+-------------------+-------------------+
|date_window |count|windowStart |windowEnd |
+------------------------------------------+-----+-------------------+-------------------+
|[1969-12-31 16:00:00, 1970-01-01 16:00:00]|1 |1970-01-01 00:00:00|1970-01-02 00:00:00|
+------------------------------------------+-----+-------------------+-------------------+
Обратите внимание, что начало и конец для date_window сдвинуты на восемь часов от ввода (потому что я нахожусь в часовом поясе GMT-7/8, PST). Однако я корректирую этот сдвиг, используя to_utc_timestamp, чтобы получить правильное время начала и окончания для однодневного окна, которое включает входные данные: 1970-01-01 00:00:00,1970-01-02 00:00:00.
Обратите внимание, что в первом представленном блоке кода мы использовали from_utc_timestamp, тогда как для решения структурированной потоковой передачи мы использовали to_utc_timestamp. Мне еще предстоит выяснить, какой из этих двух вариантов использовать в данной ситуации. (Пожалуйста, подскажите мне, если вы знаете!). Для получения дополнительной информации о том, как работают эти функции, см. Соответствующую статью в моем блоге: http://datalackey.com/2019/08/04/time-for-an-article-on-the-spark-sql-time-function-from_utc_timestamp/
Другое решение, которое сработало для меня, заключалось в том, чтобы установить часовой пояс jvm по умолчанию на ваш целевой часовой пояс (UTC в вашем случае).
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
Я добавил код выше перед записью фрейма данных Spark в базу данных.
Если часовой пояс сеанса отличается от UTC, все эти функции преобразуются в часовой пояс сеанса:
from_unixtime(1695773049)
timestamp('1970-01-01 00:00:00.0 UTC')
cast('1970-01-01 00:00:00.0 UTC' as timestamp)
current_timestamp()
Установка часового пояса сеанса в формате UTC — лучшее решение, позволяющее избежать преобразования, см. принятый ответ. Если по какой-то причине вы не можете контролировать часовой пояс сеанса, поскольку он установлен в структуре высокого уровня, и вы пишете только код SQL, вы можете проверить current_timezone и условно преобразовать его следующим образом:
case when current_timezone() <> 'UTC'
then to_utc_timestamp(from_unixtime(1695773049), current_timezone())
else from_unixtime(1695773049)
end as smart_conversion --smart conversion back to UTC if we are not in UTC