Побочный вывод в ParDo | Apache Beam Python SDK
Поскольку документация доступна только для JAVA, я не мог понять, что это значит.
В нем говорится: "Хотя ParDo всегда производит основную выходную PCollection (как возвращаемое значение из применения), вы также можете сделать так, чтобы ParDo производил любое количество дополнительных выходных PCollections. Если вы выберете несколько выходов, ваш ParDo вернет все выходные PCollections (включая основной вывод) связаны вместе. Например, в Java выходные PCollections связаны в типобезопасном PCollectionTuple."
Я понимаю, что означает объединенный пакет, но если я получаю тег в моем DoFn, он дает пакет с другими пустыми выходными данными на ходу и дает другие выходные данные, когда они встречаются в коде? или он ждет, когда все выходы будут готовы для ввода, и выводит их все вместе в связку?
В документации не так много ясности. Хотя я думаю, что это не ждет и просто дает, когда сталкиваются, но я все еще должен понять, что происходит.
1 ответ
Лучший способ ответить на этот вопрос - на примере. Этот пример доступен в Beam.
Предположим, что вы хотите запустить конвейер подсчета слов (например, подсчитать, сколько раз каждое слово появляется в документе). Для этого вам нужно разбить строки в файле на отдельные слова. Учтите, что вы также хотите рассчитывать длины слов индивидуально. Ваше преобразование расщепления будет выглядеть так:
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input) # Read in the file
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
short_words = split_lines_result['words']
character_count = split_lines_result[
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT]
В этом случае каждый отличается PCollection
, с правильными элементами. DoFn
будет отвечать за разделение своих выходов, и он делает это, помечая элементы. Увидеть:
class SplitLinesToWordsFn(beam.DoFn):
OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
# yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged
# collection.
yield pvalue.TaggedOutput(
self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
# yield word to add it to the main collection.
yield word
Как вы можете видеть, для основного вывода вам не нужно помечать элементы, а для других выводов, которые вы делаете.