Доступ к хранилищу состояний каждого ключа в Apache Flink, которое динамически изменяется

У меня есть поток сообщений с разными ключами. Для каждого ключа я хочу создать окно сеанса времени события и выполнить некоторую обработку, только если:

  • MIN_EVENTS количество событий, накопленных в окне (по сути, состояние с ключом)

Для каждого ключа MIN_EVENTSотличается и может измениться во время выполнения. Мне сложно это реализовать. В частности, я реализую эту логику так:

        inputStream.keyBy(key).
        window(EventTimeSessionWindow(INACTIVITY_PERIOD).
        trigger(new MyCustomCountTrigger()).
        apply(new MyProcessFn())

Я пытаюсь создать собственный MyCustomCountTrigger() который должен иметь возможность чтения из государственного хранилища, такого как MapState<String, Integer> stateStore что отображает key к этому MIN_EVENTSпараметр. Я знаю, что могу получить доступ к государственному хранилищу с помощьюTriggerContext ctx объект, доступный для всех триггеров.

Как мне инициализировать это хранилище состояний вне класса CountTrigger()? Мне не удалось найти для этого примеров.

1 ответ

Решение

Вы можете инициализировать состояние на основе параметров, отправленных конструктору вашего класса Trigger. Но вы не можете получить доступ к состоянию извне этого класса.

Если вам нужна большая гибкость, я предлагаю вам использовать функцию процесса вместо окна.

Другие вопросы по тегам