Обработка отказов в Dataflow/Apache Beam через зависимые конвейеры
У меня есть конвейер, который получает данные из BigQuery и записывает их в GCS, однако, если я нахожу какие-либо отклонения, я хочу направить их в таблицу BigQuery. Я собираю отклонения в глобальную переменную списка, а затем загружаю список в таблицу BigQuery. Этот процесс отлично работает, когда я запускаю его локально, так как конвейеры работают в правильном порядке. Когда я запускаю его с помощью dataflowrunner, он не гарантирует порядок (я хочу, чтобы конвейер 1 запускался перед конвейером 2. Есть ли способ иметь зависимые конвейеры в потоке данных с использованием python? Или Также, пожалуйста, предложите, можно ли это решить с помощью лучшего подхода. Заранее спасибо.
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(lambda x: somefunction) # Collecting rejects in the except block of this method to a global list variable
....etc
| 'to gcs' >> beam.io.WriteToText(output)
)
# Loading the rejects gathered in the above pipeline to Biquery
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline2:
rejects = (pipeline2
| 'create pipeline' >> beam.Create(reject_list)
| 'to json format' >> beam.Map(lambda data: {.....})
| 'to bq' >> beam.io.WriteToBigQuery(....)
)
1 ответ
Вы можете сделать что-то подобное, но только с одним конвейером и некоторым дополнительным кодом в преобразовании.
В
beam.Map(lambda x: somefunction)
должен иметь два выхода: тот, который записывается в GCS, и отклоненные элементы, которые в конечном итоге будут записаны в BigQuery.
Для этого ваша функция преобразования должна вернуть
TaggedOutput
.
В Руководстве по программированию Beam есть пример: https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn
Секунда
PCollection
, вы можете написать в BigQuery.
Вам не нужно иметь
Create
в этой второй ветви конвейера.
Конвейер будет примерно таким:
with beam.Pipeline(options=PipelineOptions(pipeline_args)) as pipeline1:
data = (pipeline1
| 'get data' >> beam.io.Read(beam.io.BigQuerySource(query=...,use_standard_sql=True))
| 'combine output to list' >> beam.combiners.ToList()
| 'tranform' >> beam.Map(transform) # Tagged output produced here
pcoll_to_gcs = data.gcs_output
pcoll_to_bq = data.rejected
pcoll_to_gcs | "to gcs" >> beam.io.WriteToText(output)
pcoll_to_bq | "to bq" >> beam.io.WriteToBigQuery(....)
Затем
transform
функция будет примерно такой
def transform(element):
if something_is_wrong_with_element:
yield pvalue.TaggedOutput('rejected', element)
transformed_element = ....
yield pvalue.TaggedOutput('gcs_output', transformed_element)