Как отфильтровать многострочные данные JSON из таблицы AWS Hive?
У меня есть правило AWS IoT, которое отправляет входящий JSON в Kinesis Firehose.
Данные JSON из моей публикации IoT находятся в одной строке, например:
{"count":4950, "dateTime8601": "2017-03-09T17:15:28.314Z"}
Раздел "Проверка" IoT в пользовательском интерфейсе администратора позволяет опубликовать сообщение, по умолчанию используется следующее (многострочный JSON в формате примечания):
{
"message": "Hello from AWS IoT console"
}
Я передаю Firehose на S3, который затем преобразуется EMR в столбчатый формат, который в конечном итоге будет использоваться Athena.
Проблема заключается в том, что во время преобразования в столбчатый формат Hive (в частности, JSON SerDe) не может обрабатывать объект JSON, занимающий более одной строки. Это взорвет конвертацию, а не конвертирует хорошие однострочные записи JSON.
Мой вопрос:
- Как настроить FireHose для игнорирования многострочного JSON?
- Если это невозможно, как сказать Hive удалить символы новой строки перед загрузкой в таблицу или хотя бы перехватить исключения и попытаться продолжить?
Я уже пытаюсь игнорировать искаженный JSON при определении таблицы Hive:
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://...';
Вот мой полный HQL, который делает преобразование:
--Example of converting to OEX/columnar formats
DROP TABLE site_sensor_data_raw;
CREATE EXTERNAL TABLE site_sensor_data_raw (
count int,
dateTime8601 timestamp
)
PARTITIONED BY(year int, month int, day int, hour int)
ROW FORMAT serde 'org.apache.hive.hcatalog.data.JsonSerDe'
with serdeproperties (
'ignore.malformed.json' = 'true',
"timestamp.formats"="yyyy-MM-dd'T'HH:mm:ss.SSS'Z',millis"
)
LOCATION 's3://bucket.me.com/raw/all-sites/';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='15') location 's3://bucket.me.com/raw/all-sites/2017/03/09/15';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='16') location 's3://bucket.me.com/raw/all-sites/2017/03/09/16';
ALTER TABLE site_sensor_data_raw ADD PARTITION (year='2017',month='03',day='09',hour='17') location 's3://bucket.me.com/raw/all-sites/2017/03/09/17';
DROP TABLE to_orc;
CREATE EXTERNAL TABLE to_orc (
count int,
dateTime8601 timestamp
)
STORED AS ORC
LOCATION 's3://bucket.me.com/orc'
TBLPROPERTIES ("orc.compress"="ZLIB");
INSERT OVERWRITE TABLE to_orc SELECT count,dateTime8601 FROM site_sensor_data_raw where year=2017 AND month=03 AND day=09 AND hour=15;
1 ответ
Хорошо, JSON SerDe по умолчанию используется в EMR, и Athena не может работать с многострочными записями json. Каждая запись JSON должна быть в одной строке.
В многострочном JSON я вижу две проблемы с точки зрения Hive/Hadoop и даже с точки зрения Presto (используется в Athean)
- Учитывая файл, очевидно, что Hive/Hadoop и серия JSON не смогут распознать конец и начало записи json, чтобы вернуть ее объектное представление.
- При наличии нескольких файлов многострочные файлы JSON не разделяются, как обычные файлы JSON с разделителями / n.
Чтобы обойти эту проблему со стороны EMR/Athena, вам нужно написать свои собственные SerDe, основанные на вашей структуре данных и перехватывать исключения и т. Д.
Как настроить FireHose для игнорирования многострочного JSON?
Firehose не может игнорировать определенный формат. Он будет использовать все, что помещается с использованием его API(PutRecord или PutRecordBatch) в качестве большого двоичного объекта, и отправит его по назначению.
http://docs.aws.amazon.com/firehose/latest/APIReference/API_PutRecordBatch.html
В любом случае, AWS Firehose предлагает Преобразование данных с помощью AWS Lambda, где вы можете использовать функции Lambda для преобразования входящих данных в Firehose и передачи преобразованных данных в место назначения. Таким образом, вы можете использовать эту функцию, чтобы распознать и сгладить многострочный JSON заранее. Вы также можете удалить записи, если они не отформатированы должным образом и т. Д. Вам нужно будет изучить, как IOT отправляет многострочные данные json в firehose(например, строка за строкой и т. Д.), Чтобы написать собственную функцию.
https://aws.amazon.com/blogs/compute/amazon-kinesis-firehose-data-transformation-with-aws-lambda/
Если это невозможно, как сказать Hive удалить символы новой строки перед загрузкой в таблицу или хотя бы перехватить исключения и попытаться продолжить?
Если у вас все еще есть многострочный JSON в месте назначения пожарного шланга, так как у вас есть EMR в вашем ETL, вы можете использовать его вычисления вместо Lambda, чтобы сгладить JSON. Эта функция на свече может помочь вам в этом. https://issues.apache.org/jira/browse/SPARK-18352
Затем вы можете принять эти данные для создания в столбчатом формате, чтобы Афина работала с ним.