Как объединить два результата и передать их следующему шагу в конвейере apache-beam
См. Ниже фрагмент кода, я хочу ["metric1", "metric2"]
быть моим вкладом в RunTask.process. Однако он был запущен дважды с "metric1" и "metric2" соответственно.
def run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)
root = p | 'Get source' >> beam.Create([
"source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
])
metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) #let's say it returns ["metic1"]
metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) #let's say it returns ["metic2"]
metric3 = (metric1, metric2) | beam.Flatten() | beam.ParDo(RunTask()) # I want ["metric1", "metric2"] to be my input for RunTask.process. However it was run twice with "metric1" and "metric2" respectively
2 ответа
Я понимаю, что вы хотите объединить две коллекции PCollections так, чтобы они следовали этому синтаксису: ['element1','element2']. Для этого вы можете использовать CoGroupByKey() вместо Flatten().
Учитывая ваш фрагмент кода, синтаксис будет:
def run():
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)
root = p | 'Get source' >> beam.Create([
"source_name" # maybe ["source_name"] makes more sense since my process function takes an array as an input?
])
metric1 = root | "compute1" >> beam.ParDo(RunLongCompute(myarg="1")) #let's say it returns ["metic1"]
metric2 = root | "compute2" >> beam.ParDo(RunLongCompute(myarg="2")) #let's say it returns ["metic2"]
metric3 = (
(metric1, metric2)
| beam.CoGroupByKey()
| beam.ParDo(RunTask())
)
Хочу отметить разницу между Flatten() и CoGroupByKey().
1) Flatten() получает две или более коллекции PCollections, которые хранят один и тот же тип данных, и объединяет их в одну логическую коллекцию PCollection. Например,
import apache_beam as beam
from apache_beam import Flatten, Create, ParDo, Map
p = beam.Pipeline()
adress_list = [
('leo', 'George St. 32'),
('ralph', 'Pyrmont St. 30'),
('mary', '10th Av.'),
('carly', 'Marina Bay 1'),
]
city_list = [
('leo', 'Sydney'),
('ralph', 'Sydney'),
('mary', 'NYC'),
('carly', 'Brisbane'),
]
street = p | 'CreateEmails' >> beam.Create(adress_list)
city = p | 'CreatePhones' >> beam.Create(city_list)
resul =(
(street,city)
|beam.Flatten()
|ParDo(print)
)
p.run()
И на выходе,
('leo', 'George St. 32')
('ralph', 'Pyrmont St. 30')
('mary', '10th Av.')
('carly', 'Marina Bay 1')
('leo', 'Sydney')
('ralph', 'Sydney')
('mary', 'NYC')
('carly', 'Brisbane')
Обратите внимание, что обе коллекции PCollections находятся на выходе. Однако одно добавляется к другому.
2) CoGroupByKey() выполняет реляционное соединение между двумя или более ключевыми значениями PCollections, которые имеют один и тот же тип ключа. Используя этот метод, вы выполните объединение по ключу, а не добавление, как это сделано в Flatten(). Ниже приведен пример,
import apache_beam as beam
from apache_beam import Flatten, Create, ParDo, Map
p = beam.Pipeline()
address_list = [
('leo', 'George St. 32'),
('ralph', 'Pyrmont St. 30'),
('mary', '10th Av.'),
('carly', 'Marina Bay 1'),
]
city_list = [
('leo', 'Sydney'),
('ralph', 'Sydney'),
('mary', 'NYC'),
('carly', 'Brisbane'),
]
street = p | 'CreateEmails' >> beam.Create(address_list)
city = p | 'CreatePhones' >> beam.Create(city_list)
results = (
(street, city)
| beam.CoGroupByKey()
|ParDo(print)
#| beam.io.WriteToText('delete.txt')
)
p.run()
И на выходе,
('leo', (['George St. 32'], ['Sydney']))
('ralph', (['Pyrmont St. 30'], ['Sydney']))
('mary', (['10th Av.'], ['NYC']))
('carly', (['Marina Bay 1'], ['Brisbane']))
Обратите внимание, что вам нужен первичный ключ, чтобы присоединиться к результатам. Кроме того, этот вывод - то, что вы ожидаете в вашем случае.
В качестве альтернативы используйте боковой ввод:
metrics3 = metric1 | beam.ParDo(RunTask(), metric2=beam.pvalue.AsIter(metric2))
в процессе RunTask ():
def process(self, element_from_metric1, metric2):
...