Как Сбой / Остановка Публикации / Переполнения Данных DataFlow на Ошибка Вставки BigQuery
Я ищу способ сделать так, чтобы задание Google DataFlow перестало поступать из Pub/Sub при возникновении (определенного) исключения.
События из Pub/Sub читаются через JSON PubsubIO.Read.Bound<TableRow>
с помощью TableRowJsonCoder
и напрямую транслировался в BigQuery сBigQueryIO.Write.Bound
, (E сть ParDo
между ними изменяется содержимое одного поля и происходит произвольное разделение по дням, но это не должно иметь значения для этой цели.)
Когда в событиях / строках есть поля, полученные из PubSub, которые не являются столбцами в целевой таблице BigQuery, задание DataFlow регистрирует исключения IOException во время выполнения, утверждая, что не может вставить строки, но, похоже, подтверждает эти сообщения и продолжает работать.
Вместо этого я хочу прекратить прием сообщений из Pub/Sub и / или вызвать сбой задания Dataflow, чтобы оповещение могло основываться на возрасте самого старого неподтвержденного сообщения. По крайней мере, я хочу убедиться, что те сообщения Pub/Sub, которые не удалось вставить в BigQuery, не подтверждены, чтобы я мог исправить проблему, перезапустить задание потока данных и снова использовать эти сообщения.
Я знаю, что одно из предложенных решений для обработки некорректного ввода описано здесь: https://cloud.google.com/blog/big-data/2016/01/handling-invalid-inputs-in-dataflow
Мне также известен этот PR на Apache Beam, который позволял бы вставлять строки без полей, вызывающих помехи: https://github.com/apache/beam/pull/1778
Однако в моем случае я действительно не хочу защищаться от ошибочного ввода, а скорее от ошибок программиста, то есть того факта, что новые поля были добавлены в сообщения JSON, которые передаются в Pub/Sub, но соответствующее задание DataFlow не было обновлено. Таким образом, у меня нет действительно ошибочных данных, я просто хочу потерпеть крах, когда программист совершает ошибку, не развертывая новое задание Dataflow перед тем, как что-либо менять в формате сообщения.
Я предполагаю, что было бы возможно (аналог решения в блоге) создать кастом ParDo
это проверяет каждую строку и выдает исключение, которое не перехватывается и приводит к сбою.
Но в идеале я хотел бы просто иметь некоторую конфигурацию, которая не обрабатывает ошибку вставки и регистрирует ее, а вместо этого просто сбивает работу или, по крайней мере, останавливает прием пищи.
1 ответ
Вы можете иметь ParDo с DoFn, который находится перед записью BQ. DoFn отвечает за получение схемы выходной таблицы каждые X минут и проверяет, что каждая запись, которая должна быть записана, соответствует ожидаемой выходной схеме (и выдает исключение, если это не так).
Old Pipeline:
PubSub -> Some Transforms -> BQ Sink
New Pipeline:
PubSub -> Some Transforms -> ParDo(BQ Sink Validator) -> BQ Sink
Преимущество этого заключается в том, что после исправления схемы выходной таблицы конвейер восстанавливается. Вам захочется выдать хорошее сообщение об ошибке, сообщающее, что не так с входящим сообщением PubSub.
В качестве альтернативы, вы могли бы иметь BQ Sink Validator
вместо этого выводите сообщения в PubSub DLQ (отслеживая его размер). Оперативно вам придется обновить таблицу, а затем повторно ввести DLQ в качестве ввода. Это имеет то преимущество, что только плохие сообщения блокируют конвейерное выполнение.