Apache Flink - обработка исключений в "keyBy"

Может случиться, что данные, поступающие в задание Flink, вызывают исключение либо из-за ошибки в коде, либо из-за отсутствия проверки. Моя цель состоит в том, чтобы обеспечить последовательный способ обработки исключений, который наша команда могла бы использовать в работах Flink, что не вызовет простоев в работе.

  1. Стратегии перезапуска здесь не применимы:

    • простой перезапуск не решит проблему, и мы попадаем в цикл перезапуска
    • мы не можем просто пропустить событие
    • они могут быть полезны для OOME или некоторых временных проблем
    • мы не можем добавить пользовательский
  2. Блок try/catch в функции "keyBy" не полностью помогает, так как:

    • нет способа пропустить событие в "keyBy" после обработки исключения

Образец кода:

env.addSource(kafkaConsumer)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

Мне бы хотелось иметь возможность пропустить обработку события, вызвавшего проблему в "keyBy", и аналогичные методы, которые должны возвращать ровно один результат.

2 ответа

Кроме предложения @phanhuy152 (что мне кажется вполне законным) почему бы и нет filter до keyBy?

env.addSource(kafkaConsumer)
    .filter(invalidKeys)
    .keyBy(keySelector) // must return one result for one entry
    .flatMap(mapFunction) // we can skip some entries here in case of errors
    .addSink(new PrintSinkFunction<>());
env.execute("Flink Application");

Можете ли вы зарезервировать специальное значение, например, "NULL" для keyBy вернуть в таком случае? Тогда ваш flatMap Функция может пропустить, когда встретите такое значение?

Другие вопросы по тегам