Разница между beam.ParDo и beam.Map в типе вывода?

Я использую Apache-Beam для запуска некоторых преобразований данных, включая извлечение данных из txt, csv и различных источников данных. Одна вещь, которую я заметил, это разница результатов при использовании beam.Map и beam.ParDo

В следующем примере:

Я читаю данные в формате csv и в первом случае передаю их в DoFn, используя beam.ParDo, который извлекает первый элемент, являющийся датой, а затем печатает его. Во втором случае я напрямую использую beam.Map, чтобы сделать то же самое, а затем распечатать его.

class Printer(beam.DoFn):
    def process(self,data_item):
        print data_item

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return (str(data_item).split(','))[0]

data_from_source = (p
                    | 'ReadMyFile 01' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.ParDo 01' >> beam.ParDo(DateExtractor())
                    | 'Printer the data 01' >> beam.ParDo(Printer())
                    )

copy_of_the_data =  (p
                    | 'ReadMyFile 02' >> ReadFromText('./input/data.csv')
                    | 'Splitter using beam.Map 02' >> beam.Map(lambda record: (record.split(','))[0])
                    | 'Printer the data 02' >> beam.ParDo(Printer())
                    )

В двух выходах я заметил следующее:

##With beam.ParDo##
2
0
1
7
-
0
4
-
0
3
2
0
1
7

##With beam.Map##
2017-04-03
2017-04-03
2017-04-10
2017-04-10
2017-04-11
2017-04-12
2017-04-12

которые задушены?? Мне интересно, если проблема в функции печати? Но после использования разных преобразований он показывает одинаковые результаты. Как пример работает:

| 'Group it 01' >> beam.Map(lambda record: (record, 1))

который все еще возвращает ту же проблему:

##With beam.ParDo##
('8', 1)
('2', 1)
('0', 1)
('1', 1)

##With beam.Map##
(u'2017-04-08', 1)
(u'2017-04-08', 1)
(u'2017-04-09', 1)
(u'2017-04-09', 1)

Есть идеи, в чем причина? Чего мне не хватает в разнице между beam.Map и beam.ParDo???

1 ответ

Короткий ответ

Вам нужно обернуть возвращаемое значение ParDo в список.

Более длинная версия

ParDos в общем случае может возвращать любое количество выходов для одного входа, т. е. для одной входной строки вы можете выдать ноль, один или несколько результатов. По этой причине Beam SDK обрабатывает выходные данные ParDo как не один элемент, а набор элементов.

В вашем случае ParDo испускает одну строку вместо коллекции. Beam Python SDK все еще пытается интерпретировать вывод этого ParDo как будто это была коллекция элементов. И делает это, интерпретируя строку, которую вы выпустили, как набор символов. Из-за этого ваш ParDo теперь эффективно создает поток отдельных символов, а не поток строк.

Что вам нужно сделать, это обернуть возвращаемое значение в список:

class DateExtractor(beam.DoFn):
    def process(self,data_item):
        return [(str(data_item).split(','))[0]]

Обратите внимание на квадратные скобки. Смотрите руководство по программированию для большего количества примеров.

Map с другой стороны, можно рассматривать как частный случай ParDo, Map Ожидается, что будет производить ровно один вывод для каждого входа. Так что в этом случае вы можете просто вернуть одно значение из лямбды, и оно работает как положено.

И вам, вероятно, не нужно оборачивать data_item в str, Согласно документам ReadFromText преобразование производит строки.

Другие вопросы по тегам