Apache Beam с DirectRunner (SUBPROCESS_SDK) использует только одного воркера, как заставить его использовать всех доступных воркеров?

Следующий код:

def get_pipeline(workers):
    pipeline_options = PipelineOptions(['--direct_num_workers', str(workers)])
    return beam.Pipeline(options=pipeline_options,
                         runner=fn_api_runner.FnApiRunner(
                             default_environment=beam_runner_api_pb2.Environment(
                                 urn=python_urns.SUBPROCESS_SDK,
                                 payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                                         % sys.executable.encode('ascii'))))

with get_pipeline(4) as pipeline:
  _ = (  
        pipeline
        | 'ReadTestData' >> beam.io.ReadFromParquet(input_files, columns=all_columns)
        | "write" >> beam.io.WriteToText("/tmp/txt2")
  )

использует только одного воркера из 4 доступных и генерирует только один большой выходной файл (хотя входных файлов много).

Как заставить конвейер Beam работать параллельно, т.е. как заставить каждый входной файл обрабатываться отдельно другим рабочим?

1 ответ

Какую версию луча вы используете?

У меня такая же проблема с лучом 2.16.0, но версия 2.17.0, похоже, имеет ожидаемое поведение.

Вместо этого вы можете попробовать эту версию, сохранив код как есть.

Другие вопросы по тегам