Ошибка при запуске Apache Beam Python SplittableDoFn
Ошибка при попытке pubsub io> splittable dofn
RuntimeError: узел преобразования AppliedPTransform(ParDo(TestDoFn)/ProcessKeyedElements/GroupByKey/GroupByKey, _GroupByKeyOnly) не был заменен должным образом.
Может ли кто-нибудь помочь мне с проверкой кода на предмет того, что я делаю неправильно?
Код
"""
python examples/test_restriction_unbounded.py --project mk2 --topic projects/mk2/topics/testing
"""
# pytype: skip-file
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import argparse
import csv
import logging
import sys
import time
from datetime import datetime
import apache_beam as beam
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker, OffsetRange
from apache_beam.transforms.core import RestrictionProvider
class TestProvider(RestrictionProvider):
def initial_restriction(self, element):
return OffsetRange(0, 1)
def create_tracker(self, restriction):
return OffsetRestrictionTracker(restriction)
def restriction_size(self, element, restriction):
return restriction.size()
class TestDoFn(beam.DoFn):
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
TestProvider())):
import pdb; pdb.set_trace()
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
return element
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(StandardOptions).streaming = True
with beam.Pipeline(options=options) as p:
# data = ['abc', 'defghijklmno', 'pqrstuv', 'wxyz']
# actual = (p | beam.Create(data) | beam.ParDo(ExpandingStringsDoFn()))
scores = p | beam.io.ReadFromPubSub(topic=args.topic) | beam.ParDo(TestDoFn())
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
2 ответа
Вы загружаете данные из pub / sub путем обработки паром. Затем вы должны создать пакеты по окну, прежде чем применять такие преобразования: (ParDo(TestDoFn) / ProcessKeyedElements / GroupByKey / GroupByKey, _GroupByKeyOnly)
Pub / Sub с примером окна: https://cloud.google.com/pubsub/docs/pubsub-dataflow
Попробуйте сделать так:
class GroupWindowsIntoBatches(beam.PTransform):
"""A composite transform that groups Pub/Sub messages
"""
def __init__(self, window_size):
# Convert minutes into seconds.
self.window_size = int(window_size * 60)
def expand(self, pcoll):
return (
pcoll
# Assigns window info to each Pub/Sub message based on its
# publish timestamp.
| "Window into Fixed Intervals"
>> beam.WindowInto(window.FixedWindows(self.window_size))
)
def run(argv=None, save_main_session=True):
parser = argparse.ArgumentParser()
parser.add_argument('--topic', type=str, help='Pub/Sub topic to read from')
args, pipeline_args = parser.parse_known_args(argv)
options = PipelineOptions(pipeline_args)
options.view_as(StandardOptions).streaming = True
window_size = 1.0
with beam.Pipeline(options=options) as p:
scores = (p
| beam.io.ReadFromPubSub(topic=args.topic)
| "WindowInto" >> GroupWindowsIntoBatches(window_size)
| beam.ParDo(TestDoFn())
)
У меня была такая же ошибка. Удаление опции потоковой передачи решило проблему для меня.