Задание потока данных из GCS в Pub / sub Максимальный размер пакета

Я использую шаблон потока данных по умолчанию GCS для Pub/Sub. входные файлы в облачном хранилище размером 300 МБ и 2-3 миллиона строк в каждой.

при запуске пакетного задания потока данных возникает следующая ошибка

Сообщение об ошибке от исполнителя: javax.naming.SizeLimitExceededException: размер сообщения Pub / Sub (1089680070) превышает максимальный размер пакета (7500000) org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Write$PubsubBoundedWriter.processElement(PubsubIO.java:1160)

из документации: Pub/Sub принимает максимум 1000 сообщений в пакете, а размер пакета не может превышать 10 мегабайт.

Означает ли это, что мне нужно разбить входные файлы на блоки по 10 МБ или на 1000 сообщений для публикации?

как рекомендуется загружать такие большие файлы (по 300 МБ каждый) в pubsub?

Заранее спасибо за помощь.

1 ответ

Это известное ограничение на стороне потока данных, на данный момент существует запрос функции для увеличения размера пакета. Используйте кнопку +1 и отметьте проблему, чтобы следить за ее развитием.

Я рекомендую вам проверить этот пост, где предлагается обходной путь. Важно учитывать, что этот обходной путь подразумевает модификацию текста облачного хранилища в шаблон Pub/Sub для реализации упомянутого здесь пользовательского преобразования.

С другой стороны, вы можете попробовать создать облачную функцию для разделения вашего файла перед обработкой Dataflow, я подумал что-то вроде:

  1. Создайте промежуточную корзину для загрузки больших файлов.
  2. Напишите облачную функцию, чтобы разделить ваши файлы и записать небольшие куски в другую корзину. Вы можете попробовать использовать для этого пакет Python filesplit.
  3. Запустите облачную функцию для запуска каждый раз, когда вы загружаете новый файл в "промежуточную" корзину с помощью триггеров Google Cloud Storage.
  4. После того, как файл был разделен на небольшие части, удалите большой файл из "промежуточной" корзины с той же функцией Cloud, чтобы избежать дополнительных затрат.
  5. Используйте шаблон потока данных Cloud Storage Text to Pub/Sub для обработки небольших фрагментов для второго сегмента.