Обработка отказов в Dataflow/Apache Beam через зависимые конвейеры

У меня есть конвейер, который получает данные из BigQuery и записывает их в GCS, однако, если я нахожу какие-либо отклонения, я хочу направить их в таблицу BigQuery. Я собираю отклонения в глобальную переменную списка, а затем загружаю список в таблицу BigQuery. Этот процесс отлично работает, когда я запускаю его локально, так как конвейеры работают в правильном порядке. Когда я запускаю его с помощью dataflowrunner, он не гарантирует порядок (я хочу, чтобы конвейер 1 запускался перед конвейером 2. Есть ли способ иметь зависимые конвейеры в потоке данных с использованием python? Или Также, пожалуйста, предложите, можно ли это решить с помощью лучшего подхода. Заранее спасибо.

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(lambda x: somefunction)  # Collecting rejects in the except block of this method to a global list variable
               ....etc
               | 'to gcs' >> beam.io.WriteToText(output)
               )

# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
    rejects = (pipeline2
                    | 'create pipeline' >> beam.Create(reject_list)
                    | 'to json format' >> beam.Map(lambda data: {.....})
                    | 'to bq' >> beam.io.WriteToBigQuery(....)
                    )

1 ответ

Решение

Вы можете сделать что-то подобное, но только с одним конвейером и некоторым дополнительным кодом в преобразовании.

В beam.Map(lambda x: somefunction) должен иметь два выхода: тот, который записывается в GCS, и отклоненные элементы, которые в конечном итоге будут записаны в BigQuery.

Для этого ваша функция преобразования должна вернуть TaggedOutput.

В Руководстве по программированию Beam есть пример: https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn

Секунда PCollection, вы можете написать в BigQuery.

Вам не нужно иметь Create в этой второй ветви конвейера.

Конвейер будет примерно таким:

with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
 
    data = (pipeline1
               | 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
               | 'combine output to list' >> beam.combiners.ToList()
               | 'tranform' >> beam.Map(transform)  # Tagged output produced here

    pcoll_to_gcs = data.gcs_output
    pcoll_to_bq  = data.rejected

    pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
    pcoll_to_bq  | "to bq" >> beam.io.WriteToBigQuery(....)

Затем transform функция будет примерно такой

def transform(element):
  if something_is_wrong_with_element:
    yield pvalue.TaggedOutput('rejected', element)

  transformed_element = ....

  yield pvalue.TaggedOutput('gcs_output', transformed_element)
Другие вопросы по тегам