Задание потока данных Google, которое читает из Pubsub и записывает в GCS, выполняется очень медленно (WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards).
В настоящее время у нас есть задание потока данных, которое читает из pubsub и записывает файл avro с помощью FileIO.writeDynamic в GCS, и когда мы тестируем, скажем, 10000 событий в секунду, мы не можем обрабатывать быстрее, поскольку WriteFiles /WriteShardedBundlesToTempFiles/GroupIntoShards очень медленный. Ниже приведен фрагмент, который мы используем для написания. Как мы можем улучшить
PCollection<Event> windowedWrites = input.apply("Global Window", Window.<Event>into(new GlobalWindows())
.triggering(Repeatedly.forever(
AfterFirst.of(AfterPane.elementCountAtLeast(50000),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DurationUtils
.parseDuration(windowDuration))))).discardingFiredPanes());
return windowedWrites
.apply("WriteToAvroGCS", FileIO.<EventDestination, Five9Event>writeDynamic()
.by(groupFn)
.via(outputFn, Contextful.fn(
new SinkFn()))
.withTempDirectory(avroTempDirectory)
.withDestinationCoder(destinationCoder)
.withNumShards(1).withNaming(namingFn));
Мы используем собственное именование файлов, скажем, в формате gs://tenantID.<>/ Eventname/dddd-mm-dd/
2 ответа
Как упоминалось в комментариях, проблема, вероятно,
withNumShards(1)
который заставляет все происходить на одном рабочем.
Как сказал Роберт, при использовании
withNumShards(1)
Dataflow/Beam не может распараллеливать запись, делая это одним и тем же воркером. Когда пучки относительно высокие, это сильно влияет на производительность конвейера. Я привел пример, чтобы продемонстрировать это:
Я запустил 3 конвейера, которые генерируют много элементов (~2 ГБ), три из них с 10
n1-standard-1
рабочие, но с 1 осколком, 10 осколками и 0 осколками (поток данных выберет количество осколков). Вот как они себя ведут:
Мы видим большую разницу между общим временем 0 или 10 Shard против 1 Shard. Если мы перейдем к задаче с 1 шардом, то увидим, что что-то делал только один воркер (я отключил автомасштабирование):
Как упоминал Реза, это происходит потому, что все элементы нужно перетасовать в один и тот же воркер, чтобы он записал 1 осколок.
Обратите внимание, что мой пример - это пакетная обработка, которая отличается от потоковой передачи в отношении потоковой передачи, но ее влияние на производительность конвейера достаточно похоже (на самом деле, в потоковой передаче это может быть даже хуже).
Здесь у вас есть код Python, поэтому вы можете проверить это самостоятельно:
p = beam.Pipeline(options=pipeline_options)
def long_string_generator():
string = "Apache Beam is an open source, unified model for defining " \
"both batch and streaming data-parallel processing " \
"pipelines. Using one of the open source Beam SDKs, " \
"you build a program that defines the pipeline. The pipeline " \
"is then executed by one of Beam’s supported distributed " \
"processing back-ends, which include Apache Flink, Apache " \
"Spark, and Google Cloud Dataflow. "
word_choice = random.sample(string.split(" "), 20)
return " ".join(word_choice)
def generate_elements(element, amount=1):
return [(element, long_string_generator()) for _ in range(amount)]
(p | Create(range(1500))
| beam.FlatMap(generate_elements, amount=10000)
| WriteToText(known_args.output, num_shards=known_args.shards))
p.run()