Задание Google Dataflow зависает на неопределенное время при записи в Redis

Я использую Apache Beam для Dataflow через Python API, чтобы читать данные из Bigquery, обрабатывать их и выгружать в приемник Datastore.

Непосредственно перед записью в Datastore я создал функцию ParDo, которая записывает данные в Memorystore - управляемый Google сервис Redis. Поскольку в Python нет API-интерфейса Redis-приемника, я просто вызываю API-интерфейс Redis в ParDo.

К сожалению, довольно часто работа просто зависает до бесконечности, и мне приходится вручную ее останавливать. В то время как данные записываются в Datastore и Redis, из графика потока данных я заметил, что это только пара записей, которые застревают и оставляют работу в подвешенном состоянии.

В результате, когда работа с пятнадцатью 16-ядерными машинами остается работать в течение 9 часов (обычно работа выполняется в течение 30 минут), это приводит к огромным затратам.

Может быть, есть способ установить таймер, который остановит задание потока данных, если оно превысит лимит времени?

JobID: 2018-06-23_17_45_06-10997437951845491641

4 ответа

Мы посмотрели на задание 2018-06-23_17_45_06-10997437951845491641 и похоже, что некоторые элементы застряли в обработке, однако мы не смогли сказать, почему это произошло. Можете ли вы попробовать удалить redis или datastore write и попробовать еще раз? Было бы здорово, если бы вы могли создать заявку в службу поддержки, где мы могли бы попытаться отладить это с вами.

Может быть, есть способ установить таймер, который остановит задание потока данных, если оно превысит лимит времени?

К сожалению, ответ - нет, у Dataflow нет автоматического способа отменить задание по истечении определенного времени. Тем не менее, это можно сделать с помощью API. Можно подождать с помощью тайм-аута wait_until_finish (), а затем отменить () конвейер.

Подводя итог, с помощью ответа @ankitk это работает для меня (python 2.7, sdk 2.14):

pipe = beam.Pipeline(options=pipeline_options)
...  # main pipeline code
run = pipe.run()  # doesn't do anything
run.wait_until_finish(duration=3600000)  # (ms) actually starts a job
run.cancel()  # cancels if can be cancelled

Таким образом, в случае, если работа была успешно завершена в течение времени в wait_until_finished() тогда cancel() просто напечатает предупреждение "уже закрыто", иначе будет закрыто выполняющееся задание.

PS если вы попытаетесь распечатать состояние задания

state = run.wait_until_finish(duration=3600000)
logging.info(state)

это будет RUNNING для работы, которая не была завершена в wait_until_finished(), а также DONE за готовую работу.

Примечание: этот метод не будет работать при запуске Beam из задания шаблона Flex...

Метод run.cancel() не работает, если вы пишете шаблон, и я не видел никаких успешных обходных путей...