AWS Glue: как работать с вложенным JSON с различными схемами

Цель: мы надеемся использовать каталог данных AWS Glue для создания единой таблицы для данных JSON, находящихся в корзине S3, которую мы затем запросим и проанализируем с помощью Redshift Spectrum.

Предыстория: данные JSON взяты из DynamoDB Streams и глубоко вложены. Первый уровень JSON имеет согласованный набор элементов: Keys, NewImage, OldImage, SequenceNumber, ApproximateCreationDateTime, SizeBytes и EventName. Единственное изменение состоит в том, что некоторые записи не имеют NewImage, а некоторые не имеют OldImage. Ниже этого первого уровня, однако, схема варьируется в широких пределах.

В идеале мы хотели бы использовать Glue только для разбора этого первого уровня JSON и в основном обрабатывать нижние уровни как большие объекты STRING (которые мы затем анализируем при необходимости с помощью Redshift Spectrum). В настоящее время мы загружаем всю запись в один столбец VARCHAR в Redshift, но записи приближаются к максимальному размеру для типа данных в Redshift (максимальная длина VARCHAR составляет 65535). В результате мы хотели бы выполнить этот первый уровень анализа до того, как записи появятся в Redshift.

На что мы уже пробовали / ссылались:

  • Указание AWS Glue Crawler на корзину S3 приводит к сотням таблиц с согласованной схемой верхнего уровня (перечисленные выше атрибуты), но изменяющими схемы на более глубоких уровнях в элементах STRUCT. Мы не нашли способ создать задание Glue ETL, которое бы считывало все эти таблицы и загружало его в одну таблицу.
  • Создание таблицы вручную не было плодотворным. Мы попытались установить для каждого столбца тип данных STRING, но заданию не удалось загрузить данные (предположительно, поскольку это потребовало бы некоторого преобразования из STRUCT в STRING). При установке столбцов в STRUCT требуется определенная схема - но это именно то, что меняется от одной записи к другой, поэтому мы не можем предоставить общую схему STRUCT, которая работает для всех рассматриваемых записей.
  • Преобразование AWS Glue Relationalize является интригующим, но не тем, что мы ищем в этом сценарии (поскольку мы хотим сохранить часть JSON нетронутой, а не сгладить ее полностью). Redshift Spectrum поддерживает скалярные данные JSON пару недель назад, но это не работает с вложенным JSON, с которым мы имеем дело. Похоже, что ни один из них не помогает в обработке сотен таблиц, созданных Clue Crawler.

Вопрос: Как бы мы использовали Glue (или какой-то другой метод), чтобы позволить нам анализировать только первый уровень этих записей - игнорируя изменяющиеся схемы под элементами на верхнем уровне - чтобы мы могли получить к нему доступ из Spectrum или загрузить его? физически в Redshift?

Я новичок в Клее. Я потратил довольно много времени на документацию по Glue и просматривал (несколько редкую) информацию на форумах. Я мог упустить что-то очевидное - или, возможно, это ограничение клея в его нынешнем виде. Любые рекомендации приветствуются.

Спасибо!

4 ответа

Решение

Я не уверен, что вы можете сделать это с помощью определения таблицы, но вы можете выполнить это с помощью задания ETL, используя функцию отображения для приведения значений верхнего уровня в виде строк JSON. Документация: [ ссылка]

import json

# Your mapping function
def flatten(rec):
    for key in rec:
        rec[key] = json.dumps(rec[key])
    return rec

old_df = glueContext.create_dynamic_frame.from_options(
    's3',
    {"paths": ['s3://...']},
    "json")

# Apply mapping function f to all DynamicRecords in DynamicFrame
new_df = Map.apply(frame=old_df, f=flatten)

Отсюда у вас есть возможность экспортировать в S3 (возможно, в Parquet или каком-либо другом столбчатом формате для оптимизации запросов) или напрямую в Redshift из моего понимания, хотя я не пробовал.

Это ограничение клея на данный момент. Вы смотрели на классификаторы клея? Это единственная вещь, которую я еще не использовал, но может удовлетворить ваши потребности. Вы можете определить путь JSON для поля или что-то в этом роде.

Помимо этого - работа с клеем - это путь. Это Spark на заднем плане, так что вы можете сделать почти все. Установите конечную точку разработки и поэкспериментируйте с ней. Я сталкивался с различными препятствиями в течение последних трех недель и решил полностью отказаться от любой функциональности Glue и только от Spark, чтобы она была портативной и действительно работала.

При настройке конечной точки dev вам, возможно, придется иметь в виду, что роль IAM должна иметь путь "/", поэтому вам, скорее всего, потребуется вручную создать отдельную роль с таким путем. Созданный автоматически имеет путь "/service-role/".

Процедура, которую я нашел полезной для мелкого вложенного json:

  1. ApplyMapping для первого уровня как datasource0;

  2. взрываться struct или же array объекты, чтобы избавиться от уровня элементаdf1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln), где explode требует from pyspark.sql.functions import explode;

  3. Выберите объекты JSON, которые вы хотели бы сохранить intact_json = df1.select(id, itct1, itct2,..., itctm);

  4. преобразование df1 вернуться к dynamicFrame и Relationalize dynamicFrame, а также удалить неповрежденные столбцы dataframe.drop_fields(itct1, itct2,..., itctm);

  5. Соедините реляционную таблицу с неповрежденной таблицей на основе столбца 'id'.

По состоянию на 20.12.2008 я смог вручную определить таблицу с полями json первого уровня в виде столбцов с типом STRING. Затем в сценарии склеивания динамический кадр имеет столбец в виде строки. Оттуда вы можете сделать Unbox операция типа json на полях. Это json проанализирует поля и выведет реальную схему. Объединяя Unbox с Filter позволяет циклически проходить и обрабатывать гетерогенные схемы JSON с одного и того же входа, если вы можете циклически проходить по списку схем.

Однако, одно слово предостережения, это невероятно медленно. Я думаю, что клей загружает исходные файлы из s3 во время каждой итерации цикла. Я пытался найти способ сохранить исходные данные источника, но похоже, .toDF выводит схему строковых полей json, даже если вы указываете их как клей StringType. Я добавлю здесь комментарий, если смогу найти решение с лучшей производительностью.

Вы должны добавить классификатор клея предпочтительно $[*]

Когда вы сканируете файл json в s3, он прочитает первую строку файла.

Вы можете создать связующее задание, чтобы загрузить таблицу каталога данных этого json-файла в красное смещение.

Моя единственная проблема в том, что Redshift Spectrum имеет проблемы с чтением таблиц json в каталоге данных.

дайте мне знать, если вы нашли решение

Другие вопросы по тегам