Задание Google Dataflow зависает на неопределенное время при записи в Redis
Я использую Apache Beam для Dataflow через Python API, чтобы читать данные из Bigquery, обрабатывать их и выгружать в приемник Datastore.
Непосредственно перед записью в Datastore я создал функцию ParDo, которая записывает данные в Memorystore - управляемый Google сервис Redis. Поскольку в Python нет API-интерфейса Redis-приемника, я просто вызываю API-интерфейс Redis в ParDo.
К сожалению, довольно часто работа просто зависает до бесконечности, и мне приходится вручную ее останавливать. В то время как данные записываются в Datastore и Redis, из графика потока данных я заметил, что это только пара записей, которые застревают и оставляют работу в подвешенном состоянии.
В результате, когда работа с пятнадцатью 16-ядерными машинами остается работать в течение 9 часов (обычно работа выполняется в течение 30 минут), это приводит к огромным затратам.
Может быть, есть способ установить таймер, который остановит задание потока данных, если оно превысит лимит времени?
JobID: 2018-06-23_17_45_06-10997437951845491641
4 ответа
Мы посмотрели на задание 2018-06-23_17_45_06-10997437951845491641 и похоже, что некоторые элементы застряли в обработке, однако мы не смогли сказать, почему это произошло. Можете ли вы попробовать удалить redis или datastore write и попробовать еще раз? Было бы здорово, если бы вы могли создать заявку в службу поддержки, где мы могли бы попытаться отладить это с вами.
Может быть, есть способ установить таймер, который остановит задание потока данных, если оно превысит лимит времени?
К сожалению, ответ - нет, у Dataflow нет автоматического способа отменить задание по истечении определенного времени. Тем не менее, это можно сделать с помощью API. Можно подождать с помощью тайм-аута wait_until_finish (), а затем отменить () конвейер.
Подводя итог, с помощью ответа @ankitk это работает для меня (python 2.7, sdk 2.14):
pipe = beam.Pipeline(options=pipeline_options)
... # main pipeline code
run = pipe.run() # doesn't do anything
run.wait_until_finish(duration=3600000) # (ms) actually starts a job
run.cancel() # cancels if can be cancelled
Таким образом, в случае, если работа была успешно завершена в течение времени в wait_until_finished()
тогда cancel()
просто напечатает предупреждение "уже закрыто", иначе будет закрыто выполняющееся задание.
PS если вы попытаетесь распечатать состояние задания
state = run.wait_until_finish(duration=3600000)
logging.info(state)
это будет RUNNING
для работы, которая не была завершена в wait_until_finished()
, а также DONE
за готовую работу.
Примечание: этот метод не будет работать при запуске Beam из задания шаблона Flex...
Метод run.cancel() не работает, если вы пишете шаблон, и я не видел никаких успешных обходных путей...