Обработка ошибок Apache Flink и условная обработка

Я новичок во Флинке и прошел сайт (ы)/ примеры / блоги, чтобы начать. Я борюсь с правильным использованием операторов. В основном у меня есть 2 вопроса

Вопрос 1: Поддерживает ли Flink декларативную обработку исключений, мне нужно обрабатывать ошибки синтаксического анализа / проверки /...?

  • Могу ли я использовать org.apache.flink.runtime.operators.sort.ExceptionHandler или подобное для обработки ошибок?
  • или функция Rich/FlatMap мой лучший вариант? Если Rich/FlatMap единственная опция, то есть ли способ получить дескриптор Stream внутри функции Rich/FlatMap, чтобы Sink можно было присоединить для обработки ошибок?

Вопрос 2: Могу ли я условно прикрепить разные Раковины?

  • Исходя из определенных полей в разделенных потоках с ключами, мне нужно выбрать другой приемник, нужно ли разделить поток снова или использовать Rich/FlatMap для обработки этого?

Я использую Flink 1.3.2. Вот соответствующая часть моей работы

    .....
    .....
    DataStream<String> eventTextStream = env.addSource(messageSource)

    KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
            // parse, transform or enrich
            .flatMap(new MyParseTransformEnrichFunction())
            .assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
            .keyBy("eventId");

    // split stream based on eventType as different reduce and windowing functions need to be applied
    SplitStream<EventPojo> splitStream = eventPojoStream
            .split(new EventStreamSplitFunction());

    // need to apply reduce function
    DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");

    // need to apply reduce function
    DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");

    // need to apply time based windowing function
    DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");

    ....
    ....

    env.execute("Event Processing");      

Я использую правильные операторы здесь?

Обновление 1:

Пробовал использовать ProcessFunction, как предложено @alpinegizmo, но это не сработало, поскольку это зависит от потока с ключами, которого у меня нет, пока я не проанализирую / не проверим ввод. Я получаю "InvalidProgramException: выражение поля должно быть равно" * "или" _ "для несоставных типов.".

Это такой распространенный случай, когда ваш первый вход анализирует / проверяет и у вас еще не будет ключевого потока, так как вы решаете это?

Спасибо за ваше терпение и помощь.

1 ответ

Есть один ключевой строительный блок, который вы пропустили. Посмотрите на боковые выводы.

Этот механизм обеспечивает безопасный для типов способ создания любого количества дополнительных выходных потоков. Это может быть чистый способ сообщить об ошибках, среди прочего. В Flink 1.3 боковые выходы могут использоваться только с ProcessFunction, но 1.4 добавит боковые выходы в ProcessWindowFunction.

Другие вопросы по тегам