Как читать таблицу BigQuery с использованием конвейерного кода Python в GCP Dataflow

Может кто-нибудь, пожалуйста, поделитесь синтаксисом для чтения / записи таблицы больших запросов в конвейере, написанном на python для GCP Dataflow

3 ответа

Запустить поток данных

Во-первых, построить Pipeline со следующими параметрами для запуска на GCP DataFlow:

import apache_beam as beam

options = {'project': <project>,
           'runner': 'DataflowRunner',
           'region': <region>,
           'setup_file': <setup.py file>}
pipeline_options = beam.pipeline.PipelineOptions(flags=[], **options)
pipeline = beam.Pipeline(options = pipeline_options)

Читать из BigQuery

Определить BigQuerySource с вашим запросом и использовать beam.io.Read читать данные из BQ:

BQ_source = beam.io.BigQuerySource(query = <query>)
BQ_data = pipeline | beam.io.Read(BQ_source)

Написать в BigQuery

Существует два варианта записи в bigquery:

  • использовать BigQuerySink а также beam.io.Write:

    BQ_sink = beam.io.BigQuerySink(<table>, dataset=<dataset>, project=<project>)
    BQ_data | beam.io.Write(BQ_sink)
    
  • использование beam.io.WriteToBigQuery:

    BQ_data | beam.io.WriteToBigQuery(<table>, dataset=<dataset>, project=<project>)
    

Чтение из Bigquery

rows = (p | 'ReadFromBQ' >> beam.io.Read(beam.io.BigQuerySource(query=QUERY, use_standard_sql=True))

писать в Bigquery

rows | 'writeToBQ' >> beam.io.Write(
beam.io.BigQuerySink('{}:{}.{}'.format(PROJECT, BQ_DATASET_ID, BQ_TEST), schema='CONVERSATION:STRING, LEAD_ID:INTEGER', create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
      import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import StaticValueProvider

# Set the necessary pipeline options, such as project and job name
pipeline_options = PipelineOptions(
    project='your-project-id',
    job_name='dataflow-job',
    staging_location='gs://your-bucket/staging',
    temp_location='gs://your-bucket/temp',
    runner='DataflowRunner'
)

# Create a pipeline object using the options
p = beam.Pipeline(options=pipeline_options)

# Define a function to read data from BigQuery
def read_from_bigquery():
    return (p
            | 'Read from BigQuery' >> beam.io.ReadFromBigQuery(
                query='SELECT * FROM `your-project-id.your-dataset.source_table`',
                use_standard_sql=True)
            )

# Define a function to write data to BigQuery
def write_to_bigquery(data):
    return (data
            | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table='your-project-id.your-dataset.target_table',
                schema='column_1:string,column_2:integer,column_3:boolean',
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
            )

# Define your data processing logic
data = (p
        | 'Read Data' >> beam.Create(['dummy'])  # Create a dummy input element
        | 'Trigger Read' >> beam.FlatMap(lambda x: read_from_bigquery())
        | 'Process Data' >> beam.Map(lambda row: (row['column_1'], row['column_2'], row['column_3']))
        )

# Write the processed data to BigQuery
write_to_bigquery(data)

# Run the pipeline
p.run()

Другие вопросы по тегам