Конвейер GCP Dataflow работает быстрее в DirectRunner, чем в DataflowRunner.
Я новичок в работе с Dataflow (GCP). Я построил конвейер, который работает в режиме DirectRunner быстрее, чем в режиме DataflowRunner, я не знаю, как его можно улучшить. Конвейер считывает данные из нескольких таблиц в Bigquery и возвращает файл csv, а также получает даты в качестве параметров выполнения для фильтрации запроса:
def get_pipeline_options(pipeline_args):
pipeline_args = ['--%s=%s' % (k, v) for (k, v) in {
'project': PROJECT_ID,
'region': REGION,
'job_name': JOB_NAME,
'staging_location': STORAGE + 'STAGING_DIRECTORY',
'temp_location': STORAGE + 'TEMP_DIRECTORY',
}.items()] + pipeline_args
options = PipelineOptions(pipeline_args)
return options
class Reader(beam.DoFn):
import datetime
def __init__(self, fechaIni, fechaFin):
self.fechaIni = fechaIni
self.fechaFin = fechaFin
def process(self,text):
from google.cloud import bigquery
from datetime import datetime
dateIni = self.fechaIni.get()
dateEnd = self.fechaFin.get()
query = """
#A huge query from multiple tables with joins
"""
client = bigquery.Client()
query_job = client.query(query)
result_fields = query_job.result()
return result_fields
class CampaignOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument('--fechaIni', type=str)
parser.add_value_provider_argument('--fechaFin', type=str)
def run(argv=None, save_main_session=True):
"""The main function which creates the pipeline and runs it."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--output',
dest='output',
default='gs://mybucket/input_'+datetime.datetime.now().strftime('%Y%m%d'),
help='Output files.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--project=myproject',
'--staging_location=gs://mybucket',
'--region=us-central1',
'--temp_location=gs://gs://mybucket',
'--job_name=myjob'
])
pipeline_options = PipelineOptions(pipeline_args)
campaign_options = pipeline_options.view_as(CampaignOptions)
with beam.Pipeline(options=campaign_options) as pipeline:
r = (
pipeline
| 'Initialize'>> beam.Create([':-)' ])
| 'Read from BigQuery' >> beam.ParDo(Reader(campaign_options.fechaIni,campaign_options.fechaFin))
| 'Read values' >> beam.Map(lambda x: x.values())
| 'CSV format' >> beam.Map(lambda row: ','.join([str(column) for column in row]))
| 'Write' >>beam.io.WriteToText(num_shards=1, file_path_prefix = known_args.output )
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()