Конвейер потока данных, считывающий csv из GCS и записывающий в BigBuery с вызовами Vision и NL API

Я хочу написать программу Dataflow (реализация Java и maven). Вот шаги, которые я хочу выполнить:

  1. Поток данных должен прочитать CSV-файл из облачного хранилища Google. Файл CSV имеет следующий формат:

    Название продукта, URL-адрес изображения, категория, описание1, описание2 Набор чернильных ручек Sigura 30062 с 6-ю пигма-микронными чернилами, https://images-na.ssl-images-amazon.com/images/I/71CkvpG3FEL._SY355_.jpg SY355.jpg, Искусство, Включает в себя 1 размер: #005 (0,20 мм)

    Мини-термоплавкий клеевой пистолет CCbetter с набором из 25 клеевых стержней для высокотемпературного плавильного клеевого пистолета Гибкий триггер для малых ремесленных проектов, герметизации и быстрого ремонта (20 Вт, синий), https://images-na.ssl-images-amazon.com/images/I/61iFrMg4%2B3L._SY355_.jpg. SY355.jpg, Безопасный и удобный выключатель питания со светодиодной подсветкой. Благодаря съемной и гибкой опоре для поддержания устойчивости и устойчивости пистолета. Благодаря высококачественной и изолированной насадке пистолет не деформируется даже при длительном использовании до 500℉.

    ,,,,

  2. Для каждой строки в csv мне нужно выбрать URL-адрес изображения и запустить API видения и получить 2 верхние метки (например, мы получаем метки L1 и L2 из API видения для первого продукта / строки и L3 и L4 для второго продукта / строки)

  3. Для каждой строки в csv мне нужно объединить имя продукта, категорию, description1 и description2 и передать его в NL API. Из ответа NL API мне нужно выбрать 2 лучших объекта в категории товаров народного потребления (например, мы получаем E1 и E2 из первого ряда и E3 и E4 для второго ряда)

  4. Мне нужно создать следующую структуру из полученного ответа:

    Наименование, тема Sakura 30062 Набор из 6 ручек Pigma Micron из 6 штук, L1 Sakura 30062 Набор из 6 ручек Pigma Micron из 6 штук, L2 Sakura 30062 Набор из 6 ручек Pigma Micron из 6 чернил, E1 Sakura 30062 Набор из 6 ручек Pigma Micron Е2

    Миниатюрный термоплавкий пистолет CCbetter с 25шт клеевыми клеями Набор высокотемпературного клеевого пистолета для плавки Гибкий спусковой крючок для малых ремесленных проектов и ремонта и быстрого ремонта (20 Вт, синий), L3 Миниатюрный термоплавкий пистолет CCbetter с 25шт клеевыми клеями Гибкий триггер для самостоятельных малых ремесленных проектов, герметизации и быстрого ремонта (20-ватт, синий), мини-термоплавкий клеевой пистолет L4 CCbetter с 25шт. Клеевыми палочками. Высокотемпературный плавкий клеевой пистолет. Гибкий триггер для самостоятельных небольших ремесленных проектов, герметизации и быстрого ремонта (20 ватт)., Синий), E3 CCbetter Мини-термоплавкий клеевой пистолет с набором для высокотемпературного плавления клеевого пистолета по 25 штук. Клеевой пистолет Гибкий спусковой крючок для небольших ремесленных проектов, герметизации и быстрого ремонта (20 Вт, синий), E4 .,,,

  5. Я хочу записать эту сетку (структура в шаге 4) в таблицу Bigquery

Я новичок в Dataflow, поэтому любая помощь, фрагмент кода или весь исходный код или ссылка высоко ценится

1 ответ

Вы должны начать с чтения одного из руководств по быстрому старту и взглянуть на некоторые примеры конвейеров.

Исходя из вашего описания, схема высокого уровня может быть:

  1. использование TextIO.read читать контент из GCS. Обратите внимание, что он не поддерживает игнорирование заголовка, поэтому вам, вероятно, придется обнаружить его и удалить самостоятельно.
  2. Напиши DoFn который использует API видения на URL из каждой строки файла. Вы могли бы даже разделить это на несколько DoFns - один для преобразования строки в URL, затем DoFn для использования API видения, затем DoFn для извлечения двух верхних тегов.
  3. Напиши другой DoFn или серия DoFns, который выполняет конкатенацию и использует NL API.
  4. Напиши другой DoFn или серия DoFns, которые генерируют строки с желаемым форматом вывода как TableRows.
  5. Использовать BigQueryIO.write преобразовать, чтобы написать те в BigQuery.