Компромисс между скоростью и памятью при разделении Apache Beam PCollection на две части
У меня есть PCollection, где каждый элемент является ключом, кортеж значений выглядит следующим образом:
(key, (value1,..,value_n) )
Мне нужно разделить эту коллекцию PCollection на две ветви обработки.
Как всегда, мне нужно, чтобы весь конвейер работал как можно быстрее и использовал как можно меньше таранов.
На ум приходят две идеи:
Вариант 1. Разделите PColl с помощью DoFn с несколькими выходами
class SplitInTwo(beam.DoFn):
def process(self, kvpair):
key, values = kvpair
yield beam.TaggedOutput('left', (key, values[0:2]))
yield beam.TaggedOutput('right', (key, values[2:]))
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key,values = kvpair
...
yield (key, results)
# class ProcessRight is similar to ProcessLeft
А затем построить такой конвейер
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.ParDo(ProcessLeft())
right = splitme.right | beam.ParDo(ProcessRight())
Вариант 2: используйте два разных DoFn в исходной коллекции PCollection
Другой вариант - использовать два DoFn для чтения и обработки одной и той же коллекции PCollection. Просто используйте по одному для "левой" и "правой" части данных:
class ProcessLeft(beam.DoFn):
def process(self, kvpair):
key = kvpair[0]
values = kvpair[0][0:2]
...
yield (key,result)
# class ProcessRight is similar to ProcessLeft
Построение pipleline проще... (плюс вам не нужно отслеживать, какие выходные данные у вас есть):
left = pcoll | beam.ParDo(ProcessLeft())
right = pcoll| beam.ParDo(ProcessRight())
Но... это быстрее? потребуется меньше памяти, чем первый?
(Я думаю, что первый вариант может быть объединен бегуном, а не только бегуном потока данных).
1 ответ
В этом случае оба варианта будут объединены бегуном, поэтому оба варианта будут в некоторой степени похожи с точки зрения производительности. Если вы хотите перетасовать данные в отдельные рабочие процессы, то вариант 1 - ваш лучший выбор, поскольку сериализованная коллекция читается
ProcessLeft
и
ProcessRight
будет меньше.
splitme = pcoll | beam.ParDo(SplitInTwo()).with_outputs('left','right')
left = splitme.left | beam.Reshuffle() | beam.ParDo(ProcessLeft())
right = splitme.right | beam.Reshuffle() | beam.ParDo(ProcessRight())
В
Reshuffle
transform гарантирует, что ваши данные будут записаны в промежуточную тасовку, а затем использованы ниже по потоку. Это нарушит слияние.