Конвейер Google Dataflow для различной схемы
У меня есть продукт для определения и настройки бизнес-процессов. Частью этого продукта является конструктор форм, который позволяет пользователям настраивать различные формы.
Все эти данные форм поддерживаются MongoDB в следующей структуре
- form_schemas
{
"_id" : "",
"name" : "",
"account_id" : "",
"fields" : [
{
"label" : "Enter Email",
"name" : "email",
"type" : "text",
"required" : "true",
"hidden" : "false",
"additional_config" : { }
},
{
"label" : "Select DOB",
"name" : "dob",
"type" : "date",
"required" : "true",
"hidden" : "false",
"additional_config" : { }
}
...
]
}
- form_datas
{
"workflow_template_id" : "",
"account_id" : ""
"data" : {
"email" : "xyx@gmail.com",
"dob" : "2001-04-05"
},
"created_at" : "",
"updated_at" : ""
}
Как видно выше, форма может быть для разных предприятий. Однако я смотрю на конвейер данных для передачи данных в Google Bigquery через определенные промежутки времени для анализа.
На стороне BQ я веду отдельные таблицы для каждого рабочего процесса.
У меня есть текущее рабочее решение, полностью написанное на Google Cloud Functions. У меня есть задание Google Scheduler, которое периодически запускается, вызывая различные облачные функции. Облачные функции выполняют следующие задачи на высоком уровне
- Итерировать для каждой схемы
- Прочтите данные mongodb для каждой схемы с момента последнего запуска (как курсор)
- Для каждой строки данных запустите настраиваемую логику преобразования (это включает преобразование различных вложенных типов данных, таких как сетки / поиск и т. Д.)
- Записывайте каждую строку преобразованных данных непосредственно в виде потока как ndjson в Google Cloud Storage.
Решение, приведенное выше, дает мне,
- Полный контроль над трансформацией
- Простое развертывание
Однако, поскольку это все на CF, я ограничен 9 минутами на запуск. Это, по сути, предъявляет много требований к разбивке на страницы, особенно если есть необходимость регенерировать полные данные с начала времени.
Хотя вышеупомянутое решение пока работает нормально, я искал другие варианты без сервера, такие как поток данных Google. Поскольку я только начинаю работать с потоком данных / лучом apache, мне было интересно
Если бы мне пришлось писать конвейер на балке, я бы использовал тот же подход
- Извлечь (строка за строкой) -> Преобразовать -> Загрузить (GCS) -> Загрузить (BQ)
или же
- Извлечь (все данные как JSON) -> Загрузить в GCS -> Преобразовать (Beam) -> Загрузить в GCS -> Загрузить в BQ
Сообщите мне, есть ли лучший вариант для полной обработки данных.
1 ответ
Обычно такой процесс записывает необработанные данные в GCS, а затем преобразуется в BigQuery. Это сделано для того, чтобы при обнаружении дефектов в преобразовании (которые неизбежны) и изменении требований (также неизбежных) вы могли воспроизвести данные с новым кодом.
В идеале шаги, предшествующие преобразованию, автоматизируются с помощью инструмента отслеживания измененных данных (CDC). Существует множество инструментов CDC, но Debezium берет верх, поскольку он надежен и бесплатен. Существует коннектор Debezium для получения данных из MongoDB и примеры того, как поместить Debezium CDC в Bigquery.
Если вы собираетесь написать код, который помещает данные в GCS, я бы рекомендовал рассмотреть возможность использования Apache Parquet, а не NDJSON в качестве формата. Производительность и стоимость будут лучше, и я считаю, что с форматом с типами данных легче работать.