Apache Beam Python > 2.38.0 DirectRunner ~ AssertionError: всего N пакетов с водяными знаками не были выполнены
При использовании Python 3.9 и Apache Beam 2.38.0 минимальный рабочий пример ниже работает нормально.
Однако когда я использую Apache Beam 2.39.0 (или 2.44.0), пример завершается с ошибкой.AssertionError: A total of 2 watermark-pending bundles did not execute.
. Когда я переключаю ведение журнала наDEBUG
, я вижу сообщения видаUnable to add bundle for stage
вместе сStage input watermark: Timestamp(-9223372036854.775000)
(т.е.timestamp.MIN_TIMESTAMP
) иBundle schedule watermark: Timestamp(9223372036854.775000)
(т.е.timestamp.MAX_TIMESTAMP
) для двух пакетов.
import logging
import apache_beam as beam
def setup_logging():
log_format = '[%(asctime)-15s] [%(name)s] [%(levelname)s]: %(message)s'
logging.basicConfig(format=log_format, level=logging.INFO)
logging.info("Pipeline Started")
class CreateKvPCollectWithSideInputDoFn(beam.DoFn):
def __init__(self):
super().__init__()
def process(self, element, side_input):
print(f"side_input_type: {type(side_input)}")
yield "b", "2"
class CreateKvPCollectDoFn(beam.DoFn):
def __init__(self):
super().__init__()
def process(self, element):
yield "a", "1"
def main():
setup_logging()
pipeline = beam.Pipeline()
pcollect_input = (
pipeline
| "Input/Create" >> beam.Create(["input"])
)
kvpcollect_1 = (
pcollect_input | "PCollection_1" >> beam.ParDo(CreateKvPCollectDoFn())
)
beamdict_1 = beam.pvalue.AsDict(kvpcollect_1)
kvpcollect_2 = (
pcollect_input
| "PCollection_2" >> beam.ParDo(
CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_1
)
)
kvpcollect_3 = (
(kvpcollect_1, kvpcollect_2)
| "Flatten" >> beam.Flatten()
)
beamdict_3 = beam.pvalue.AsDict(kvpcollect_3)
(
pcollect_input
| "UseBeamDict_3" >> beam.ParDo(CreateKvPCollectWithSideInputDoFn(), side_input=beamdict_3)
| "PrintResult" >> beam.Map(print)
)
result = pipeline.run()
result.wait_until_finish()
if __name__ == '__main__':
main()
Я хотел бы знать, почему эта ошибка возникает в версиях Apache Beam Python выше 2.38.0 и есть ли способ ее избежать.