Как обрабатывать недопустимые / ошибочные сообщения JSON, поступающие от PubSub для загрузки таблиц CloudSQL

Я читаю сообщение JSON, опубликованное в теме PubSub, и загружаю таблицы CloudSQL, используя JDBCIO.write() метод в потоке данных.

Я должен обработать сценарии сообщений об ошибках / недействительных JSON-сообщениях (например, InvalidSchema, InvalidDatatype), чтобы потоковое задание потока данных не отображало ошибки на графике потока данных.

При загрузке данных в Bigquery мы можем обрабатывать эти сценарии с помощью withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()),

Я попытался обработать этот сценарий для CloudSQL с помощью withRetryStrategy(RetryStrategy retryStrategy) в работе потока данных. Но все же я вижу количество записей об ошибках в задании потоковой передачи данных. Из-за этого я не могу получать действительные сообщения после получения недействительных / ошибочных сообщений JSON в потоке данных.

Ниже приведен фрагмент кода для withRetryStrategy() метод:

.withRetryStrategy(new RetryStrategy() { @Override public boolean apply(SQLException sqlException) { logger.warn("SQLState: " + sqlException.getSQLState() + "\t SQLException: " + sqlException.getMessage()); return false; } })

Если apply(SQLException) возвращает true тогда JDBCIO.write() повторю одно и то же утверждение бесконечное количество раз.
В приведенном выше коде я вернул false в переопределенном apply метод, но все же эта проблема возникает. Поэтому я попытался, вернув истину, и я вижу тот же результат.

Кто-нибудь может подсказать, как обрабатывать сценарии с такими недопустимыми / ошибочными записями в задании потоковой передачи данных для загрузки таблиц CloudSQL?

0 ответов