AWS Glue: как работать с вложенным JSON с различными схемами
Цель: мы надеемся использовать каталог данных AWS Glue для создания единой таблицы для данных JSON, находящихся в корзине S3, которую мы затем запросим и проанализируем с помощью Redshift Spectrum.
Предыстория: данные JSON взяты из DynamoDB Streams и глубоко вложены. Первый уровень JSON имеет согласованный набор элементов: Keys, NewImage, OldImage, SequenceNumber, ApproximateCreationDateTime, SizeBytes и EventName. Единственное изменение состоит в том, что некоторые записи не имеют NewImage, а некоторые не имеют OldImage. Ниже этого первого уровня, однако, схема варьируется в широких пределах.
В идеале мы хотели бы использовать Glue только для разбора этого первого уровня JSON и в основном обрабатывать нижние уровни как большие объекты STRING (которые мы затем анализируем при необходимости с помощью Redshift Spectrum). В настоящее время мы загружаем всю запись в один столбец VARCHAR в Redshift, но записи приближаются к максимальному размеру для типа данных в Redshift (максимальная длина VARCHAR составляет 65535). В результате мы хотели бы выполнить этот первый уровень анализа до того, как записи появятся в Redshift.
На что мы уже пробовали / ссылались:
- Указание AWS Glue Crawler на корзину S3 приводит к сотням таблиц с согласованной схемой верхнего уровня (перечисленные выше атрибуты), но изменяющими схемы на более глубоких уровнях в элементах STRUCT. Мы не нашли способ создать задание Glue ETL, которое бы считывало все эти таблицы и загружало его в одну таблицу.
- Создание таблицы вручную не было плодотворным. Мы попытались установить для каждого столбца тип данных STRING, но заданию не удалось загрузить данные (предположительно, поскольку это потребовало бы некоторого преобразования из STRUCT в STRING). При установке столбцов в STRUCT требуется определенная схема - но это именно то, что меняется от одной записи к другой, поэтому мы не можем предоставить общую схему STRUCT, которая работает для всех рассматриваемых записей.
- Преобразование AWS Glue Relationalize является интригующим, но не тем, что мы ищем в этом сценарии (поскольку мы хотим сохранить часть JSON нетронутой, а не сгладить ее полностью). Redshift Spectrum поддерживает скалярные данные JSON пару недель назад, но это не работает с вложенным JSON, с которым мы имеем дело. Похоже, что ни один из них не помогает в обработке сотен таблиц, созданных Clue Crawler.
Вопрос: Как бы мы использовали Glue (или какой-то другой метод), чтобы позволить нам анализировать только первый уровень этих записей - игнорируя изменяющиеся схемы под элементами на верхнем уровне - чтобы мы могли получить к нему доступ из Spectrum или загрузить его? физически в Redshift?
Я новичок в Клее. Я потратил довольно много времени на документацию по Glue и просматривал (несколько редкую) информацию на форумах. Я мог упустить что-то очевидное - или, возможно, это ограничение клея в его нынешнем виде. Любые рекомендации приветствуются.
Спасибо!
4 ответа
Я не уверен, что вы можете сделать это с помощью определения таблицы, но вы можете выполнить это с помощью задания ETL, используя функцию отображения для приведения значений верхнего уровня в виде строк JSON. Документация: [ ссылка]
import json
# Your mapping function
def flatten(rec):
for key in rec:
rec[key] = json.dumps(rec[key])
return rec
old_df = glueContext.create_dynamic_frame.from_options(
's3',
{"paths": ['s3://...']},
"json")
# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)
Отсюда у вас есть возможность экспортировать в S3 (возможно, в Parquet или каком-либо другом столбчатом формате для оптимизации запросов) или напрямую в Redshift из моего понимания, хотя я не пробовал.
Это ограничение клея на данный момент. Вы смотрели на классификаторы клея? Это единственная вещь, которую я еще не использовал, но может удовлетворить ваши потребности. Вы можете определить путь JSON для поля или что-то в этом роде.
Помимо этого - работа с клеем - это путь. Это Spark на заднем плане, так что вы можете сделать почти все. Установите конечную точку разработки и поэкспериментируйте с ней. Я сталкивался с различными препятствиями в течение последних трех недель и решил полностью отказаться от любой функциональности Glue и только от Spark, чтобы она была портативной и действительно работала.
При настройке конечной точки dev вам, возможно, придется иметь в виду, что роль IAM должна иметь путь "/", поэтому вам, скорее всего, потребуется вручную создать отдельную роль с таким путем. Созданный автоматически имеет путь "/service-role/".
Процедура, которую я нашел полезной для мелкого вложенного json:
ApplyMapping для первого уровня как
datasource0
;взрываться
struct
или жеarray
объекты, чтобы избавиться от уровня элементаdf1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln)
, гдеexplode
требуетfrom pyspark.sql.functions import explode
;Выберите объекты JSON, которые вы хотели бы сохранить
intact_json = df1.select(id, itct1, itct2,..., itctm)
;преобразование
df1
вернуться к dynamicFrame и Relationalize dynamicFrame, а также удалить неповрежденные столбцыdataframe.drop_fields(itct1, itct2,..., itctm)
;Соедините реляционную таблицу с неповрежденной таблицей на основе столбца 'id'.
По состоянию на 20.12.2008 я смог вручную определить таблицу с полями json первого уровня в виде столбцов с типом STRING. Затем в сценарии склеивания динамический кадр имеет столбец в виде строки. Оттуда вы можете сделать Unbox
операция типа json
на полях. Это json проанализирует поля и выведет реальную схему. Объединяя Unbox
с Filter
позволяет циклически проходить и обрабатывать гетерогенные схемы JSON с одного и того же входа, если вы можете циклически проходить по списку схем.
Однако, одно слово предостережения, это невероятно медленно. Я думаю, что клей загружает исходные файлы из s3 во время каждой итерации цикла. Я пытался найти способ сохранить исходные данные источника, но похоже, .toDF
выводит схему строковых полей json, даже если вы указываете их как клей StringType. Я добавлю здесь комментарий, если смогу найти решение с лучшей производительностью.
Вы должны добавить классификатор клея предпочтительно $[*]
Когда вы сканируете файл json в s3, он прочитает первую строку файла.
Вы можете создать связующее задание, чтобы загрузить таблицу каталога данных этого json-файла в красное смещение.
Моя единственная проблема в том, что Redshift Spectrum имеет проблемы с чтением таблиц json в каталоге данных.
дайте мне знать, если вы нашли решение