Компромисс между скоростью и памятью при разделении Apache Beam PCollection на две части

У меня есть PCollection, где каждый элемент является ключом, кортеж значений выглядит следующим образом: (key, (value1,..,value_n) )

Мне нужно разделить эту коллекцию PCollection на две ветви обработки.

Как всегда, мне нужно, чтобы весь конвейер работал как можно быстрее и использовал как можно меньше таранов.

На ум приходят две идеи:

Вариант 1. Разделите PColl с помощью DoFn с несколькими выходами

class SplitInTwo(beam.DoFn):

   def process(self, kvpair):
       key, values = kvpair
       
       yield beam.TaggedOutput('left', (key, values[0:2]))
       yield beam.TaggedOutput('right', (key, values[2:]))

class ProcessLeft(beam.DoFn):
   def process(self, kvpair):
       key,values = kvpair
       ...
       yield (key, results)

# class ProcessRight is similar to ProcessLeft

А затем построить такой конвейер

   splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
   left = splitme.left | beam.ParDo(ProcessLeft())
   right = splitme.right | beam.ParDo(ProcessRight())

Вариант 2: используйте два разных DoFn в исходной коллекции PCollection

Другой вариант - использовать два DoFn для чтения и обработки одной и той же коллекции PCollection. Просто используйте по одному для "левой" и "правой" части данных:

class ProcessLeft(beam.DoFn):

   def process(self, kvpair):
       key = kvpair[0]
       values = kvpair[0][0:2]
       ...
       yield (key,result)

# class ProcessRight is similar to ProcessLeft

Построение pipleline проще... (плюс вам не нужно отслеживать, какие выходные данные у вас есть):

   left = pcoll | beam.ParDo(ProcessLeft())
   right = pcoll| beam.ParDo(ProcessRight())

Но... это быстрее? потребуется меньше памяти, чем первый?

(Я думаю, что первый вариант может быть объединен бегуном, а не только бегуном потока данных).

1 ответ

Решение

В этом случае оба варианта будут объединены бегуном, поэтому оба варианта будут в некоторой степени похожи с точки зрения производительности. Если вы хотите перетасовать данные в отдельные рабочие процессы, то вариант 1 - ваш лучший выбор, поскольку сериализованная коллекция читается ProcessLeft и ProcessRight будет меньше.

   splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
   left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
   right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())

В Reshuffletransform гарантирует, что ваши данные будут записаны в промежуточную тасовку, а затем использованы ниже по потоку. Это нарушит слияние.

Другие вопросы по тегам