Как читать таблицу BigQuery с использованием конвейерного кода Python в GCP Dataflow
Может кто-нибудь, пожалуйста, поделитесь синтаксисом для чтения / записи таблицы больших запросов в конвейере, написанном на python для GCP Dataflow
3 ответа
Запустить поток данных
Во-первых, построить Pipeline
со следующими параметрами для запуска на GCP DataFlow:
import apache_beam as beam
options = {'project': <project>,
'runner': 'DataflowRunner',
'region': <region>,
'setup_file': <setup.py file>}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options = pipeline_options)
Читать из BigQuery
Определить BigQuerySource
с вашим запросом и использовать beam.io.Read
читать данные из BQ:
BQ_source = beam.io.BigQuerySource(query = <query>)
BQ_data = pipeline | beam.io.Read(BQ_source)
Написать в BigQuery
Существует два варианта записи в bigquery:
использовать
BigQuerySink
а такжеbeam.io.Write
:BQ_sink = beam.io.BigQuerySink(<table>, dataset=<dataset>, project=<project>) BQ_data | beam.io.Write(BQ_sink)
использование
beam.io.WriteToBigQuery
:BQ_data | beam.io.WriteToBigQuery(<table>, dataset=<dataset>, project=<project>)
Чтение из Bigquery
rows = (p | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))
писать в Bigquery
rows | 'writeToBQ' >> beam.io.Write(
beam.io.BigQuerySink('{}:{}.{}'.format(PROJECT, BQ_DATASET_ID, BQ_TEST), schema='CONVERSATION:STRING, LEAD_ID:INTEGER', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider
# Set the necessary pipeline options, such as project and job name
pipeline_options = PipelineOptions(
project='your-project-id',
job_name='dataflow-job',
staging_location='gs://your-bucket/staging',
temp_location='gs://your-bucket/temp',
runner='DataflowRunner'
)
# Create a pipeline object using the options
p = beam.Pipeline(options=pipeline_options)
# Define a function to read data from BigQuery
def read_from_bigquery():
return (p
| 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
query='SELECT * FROM `your-project-id.your-dataset.source_table`',
use_standard_sql=True)
)
# Define a function to write data to BigQuery
def write_to_bigquery(data):
return (data
| 'Write to BigQuery' >> beam.io.WriteToBigQuery(
table='your-project-id.your-dataset.target_table',
schema='column_1:string,column_2:integer,column_3:boolean',
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
)
# Define your data processing logic
data = (p
| 'Read Data' >> beam.Create(['dummy']) # Create a dummy input element
| 'Trigger Read' >> beam.FlatMap(lambda x: read_from_bigquery())
| 'Process Data' >> beam.Map(lambda row: (row['column_1'], row['column_2'], row['column_3']))
)
# Write the processed data to BigQuery
write_to_bigquery(data)
# Run the pipeline
p.run()