Apache Beam - Bigquery Upsert

У меня есть задание потока данных, которое разбивает один файл на x записей (таблиц). Они без проблем попадают в bigquery.

Что я обнаружил, так это то, что после получения результатов не было возможности выполнить следующий этап конвейера.

Например

# Collection1- filtered on first two characters = 95
collection1 = (
    rows    | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
            | 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
            | 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec1,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
            )

# Collection2 - filtered on first two characters = 99
collection2 = (
    rows    | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
            | 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
            | 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
                    data_ingestion.spec2,
                    schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
                    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery)

Следуя вышесказанному, я хотел бы запустить что-то вроде следующего:

final_output = (
    collection1, collection2
       | 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))

Есть ли способ запустить другую часть конвейера после обновления до bigquery или это невозможно? Заранее спасибо.

1 ответ

Технически нет возможности сделать именно то, что вы просили. beam.io.WriteToBigquery потребляет pCollection ничего не оставив.

Однако скопировать ввод в beam.io.WriteToBigquery в parDo прямо перед тем, как позвонить beam.io.WriteToBigqueryи отправлять копии вашего pCollection по каждому пути. См. Этот ответ, который ссылается на этот образец doFn из документов

Другие вопросы по тегам