Задание потока данных Google, которое читает из Pubsub и записывает в GCS, выполняется очень медленно (WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards).

В настоящее время у нас есть задание потока данных, которое читает из pubsub и записывает файл avro с помощью FileIO.writeDynamic в GCS, и когда мы тестируем, скажем, 10000 событий в секунду, мы не можем обрабатывать быстрее, поскольку WriteFiles /WriteShardedBundlesToTempFiles/GroupIntoShards очень медленный. Ниже приведен фрагмент, который мы используем для написания. Как мы можем улучшить

PCollection<Event> windowedWrites = input.apply("Global Window", Window.<Event>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterFirst.of(AfterPane.elementCountAtLeast(50000),
                AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils
                    .parseDuration(windowDuration))))).discardingFiredPanes());

        return windowedWrites
                        .apply("WriteToAvroGCS", FileIO.<EventDestination, Five9Event>writeDynamic()
                                        .by(groupFn)
                                        .via(outputFn, Contextful.fn(
                                                        new SinkFn()))
                                        .withTempDirectory(avroTempDirectory)
                                        .withDestinationCoder(destinationCoder)
                                        .withNumShards(1).withNaming(namingFn));

Мы используем собственное именование файлов, скажем, в формате gs://tenantID.<>/ Eventname/dddd-mm-dd/

2 ответа

Как упоминалось в комментариях, проблема, вероятно, withNumShards(1) который заставляет все происходить на одном рабочем.

Как сказал Роберт, при использовании withNumShards(1)Dataflow/Beam не может распараллеливать запись, делая это одним и тем же воркером. Когда пучки относительно высокие, это сильно влияет на производительность конвейера. Я привел пример, чтобы продемонстрировать это:

Я запустил 3 конвейера, которые генерируют много элементов (~2 ГБ), три из них с 10 n1-standard-1рабочие, но с 1 осколком, 10 осколками и 0 осколками (поток данных выберет количество осколков). Вот как они себя ведут:

Мы видим большую разницу между общим временем 0 или 10 Shard против 1 Shard. Если мы перейдем к задаче с 1 шардом, то увидим, что что-то делал только один воркер (я отключил автомасштабирование):

Как упоминал Реза, это происходит потому, что все элементы нужно перетасовать в один и тот же воркер, чтобы он записал 1 осколок.

Обратите внимание, что мой пример - это пакетная обработка, которая отличается от потоковой передачи в отношении потоковой передачи, но ее влияние на производительность конвейера достаточно похоже (на самом деле, в потоковой передаче это может быть даже хуже).

Здесь у вас есть код Python, поэтому вы можете проверить это самостоятельно:

           p = beam.Pipeline(options=pipeline_options)

    def long_string_generator():
        string = "Apache Beam is an open source, unified model for defining " \
                 "both batch and streaming data-parallel processing " \
                 "pipelines. Using one of the open source Beam SDKs, " \
                 "you build a program that defines the pipeline. The pipeline " \
                 "is then executed by one of Beam’s supported distributed " \
                 "processing back-ends, which include Apache Flink, Apache " \
                 "Spark, and Google Cloud Dataflow. "

        word_choice = random.sample(string.split(" "), 20)

        return " ".join(word_choice)

    def generate_elements(element, amount=1):
        return [(element, long_string_generator()) for _ in range(amount)]

    (p | Create(range(1500))
       | beam.FlatMap(generate_elements, amount=10000)
       | WriteToText(known_args.output, num_shards=known_args.shards))

    p.run()
Другие вопросы по тегам