Apache Flink - обработка исключений в "keyBy"
Может случиться, что данные, поступающие в задание Flink, вызывают исключение либо из-за ошибки в коде, либо из-за отсутствия проверки. Моя цель состоит в том, чтобы обеспечить последовательный способ обработки исключений, который наша команда могла бы использовать в работах Flink, что не вызовет простоев в работе.
Стратегии перезапуска здесь не применимы:
- простой перезапуск не решит проблему, и мы попадаем в цикл перезапуска
- мы не можем просто пропустить событие
- они могут быть полезны для OOME или некоторых временных проблем
- мы не можем добавить пользовательский
Блок try/catch в функции "keyBy" не полностью помогает, так как:
- нет способа пропустить событие в "keyBy" после обработки исключения
Образец кода:
env.addSource(kafkaConsumer)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
Мне бы хотелось иметь возможность пропустить обработку события, вызвавшего проблему в "keyBy", и аналогичные методы, которые должны возвращать ровно один результат.
2 ответа
Кроме предложения @phanhuy152 (что мне кажется вполне законным) почему бы и нет filter
до keyBy
?
env.addSource(kafkaConsumer)
.filter(invalidKeys)
.keyBy(keySelector) // must return one result for one entry
.flatMap(mapFunction) // we can skip some entries here in case of errors
.addSink(new PrintSinkFunction<>());
env.execute("Flink Application");
Можете ли вы зарезервировать специальное значение, например, "NULL" для keyBy
вернуть в таком случае? Тогда ваш flatMap
Функция может пропустить, когда встретите такое значение?