Конвейер Google Dataflow для различной схемы

У меня есть продукт для определения и настройки бизнес-процессов. Частью этого продукта является конструктор форм, который позволяет пользователям настраивать различные формы.

Все эти данные форм поддерживаются MongoDB в следующей структуре

- form_schemas

{
  "_id" : "",
  "name" : "",
  "account_id" : "",
  "fields" : [
    {
      "label" : "Enter Email",
      "name" : "email",
      "type" : "text",
      "required" : "true",
      "hidden" : "false",
      "additional_config" : { }
    },
    {
      "label" : "Select DOB",
      "name" : "dob",
      "type" : "date",
      "required" : "true",
      "hidden" : "false",
      "additional_config" : { }
    }
    ...
  ]
}

- form_datas

{
  "workflow_template_id" : "",
  "account_id" : ""
  "data" : {
    "email" : "xyx@gmail.com",
    "dob" : "2001-04-05"
  },
  "created_at" : "",
  "updated_at" : ""

}


Как видно выше, форма может быть для разных предприятий. Однако я смотрю на конвейер данных для передачи данных в Google Bigquery через определенные промежутки времени для анализа.

На стороне BQ я веду отдельные таблицы для каждого рабочего процесса.

У меня есть текущее рабочее решение, полностью написанное на Google Cloud Functions. У меня есть задание Google Scheduler, которое периодически запускается, вызывая различные облачные функции. Облачные функции выполняют следующие задачи на высоком уровне

  • Итерировать для каждой схемы
  • Прочтите данные mongodb для каждой схемы с момента последнего запуска (как курсор)
  • Для каждой строки данных запустите настраиваемую логику преобразования (это включает преобразование различных вложенных типов данных, таких как сетки / поиск и т. Д.)
  • Записывайте каждую строку преобразованных данных непосредственно в виде потока как ndjson в Google Cloud Storage.

Решение, приведенное выше, дает мне,

  • Полный контроль над трансформацией
  • Простое развертывание

Однако, поскольку это все на CF, я ограничен 9 минутами на запуск. Это, по сути, предъявляет много требований к разбивке на страницы, особенно если есть необходимость регенерировать полные данные с начала времени.

Хотя вышеупомянутое решение пока работает нормально, я искал другие варианты без сервера, такие как поток данных Google. Поскольку я только начинаю работать с потоком данных / лучом apache, мне было интересно

Если бы мне пришлось писать конвейер на балке, я бы использовал тот же подход

  1. Извлечь (строка за строкой) -> Преобразовать -> Загрузить (GCS) -> Загрузить (BQ)

или же

  1. Извлечь (все данные как JSON) -> Загрузить в GCS -> Преобразовать (Beam) -> Загрузить в GCS -> Загрузить в BQ

Сообщите мне, есть ли лучший вариант для полной обработки данных.

1 ответ

Обычно такой процесс записывает необработанные данные в GCS, а затем преобразуется в BigQuery. Это сделано для того, чтобы при обнаружении дефектов в преобразовании (которые неизбежны) и изменении требований (также неизбежных) вы могли воспроизвести данные с новым кодом.

В идеале шаги, предшествующие преобразованию, автоматизируются с помощью инструмента отслеживания измененных данных (CDC). Существует множество инструментов CDC, но Debezium берет верх, поскольку он надежен и бесплатен. Существует коннектор Debezium для получения данных из MongoDB и примеры того, как поместить Debezium CDC в Bigquery.

Если вы собираетесь написать код, который помещает данные в GCS, я бы рекомендовал рассмотреть возможность использования Apache Parquet, а не NDJSON в качестве формата. Производительность и стоимость будут лучше, и я считаю, что с форматом с типами данных легче работать.

Другие вопросы по тегам