Apache Beam - Bigquery Upsert
У меня есть задание потока данных, которое разбивает один файл на x записей (таблиц). Они без проблем попадают в bigquery.
Что я обнаружил, так это то, что после получения результатов не было возможности выполнить следующий этап конвейера.
Например
# Collection1- filtered on first two characters = 95
collection1 = (
rows | 'Build pCollection1' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '95'))
| 'p1 Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '95'))
| 'Load p1 to BIGQUERY' >> beam.io.WriteToBigQuery(
data_ingestion.spec1,
schema=parse_table_schema_from_json(data_ingestion.getBqSchema('95')),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery
)
# Collection2 - filtered on first two characters = 99
collection2 = (
rows | 'Build pCollection2' >> beam.Filter(lambda s: data_ingestion.filterRowCollection(s, '99'))
| 'p2 Split Entities to JSON' >> beam.Map(lambda s: data_ingestion.SplitRowDict(s, '99'))
| 'Load p2 to BIGQUERY' >> beam.io.WriteToBigQuery(
data_ingestion.spec2,
schema=parse_table_schema_from_json(data_ingestion.getBqSchema('99')),
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED) # Write to Bigquery)
Следуя вышесказанному, я хотел бы запустить что-то вроде следующего:
final_output = (
collection1, collection2
| 'Log Completion' >> beam.io.WriteToPubSub('<topic>'))
Есть ли способ запустить другую часть конвейера после обновления до bigquery или это невозможно? Заранее спасибо.
1 ответ
Технически нет возможности сделать именно то, что вы просили.
beam.io.WriteToBigquery
потребляет
pCollection
ничего не оставив.
Однако скопировать ввод в
beam.io.WriteToBigquery
в
parDo
прямо перед тем, как позвонить
beam.io.WriteToBigquery
и отправлять копии вашего pCollection по каждому пути. См. Этот ответ, который ссылается на этот образец
doFn
из документов