Доступ к хранилищу состояний каждого ключа в Apache Flink, которое динамически изменяется
У меня есть поток сообщений с разными ключами. Для каждого ключа я хочу создать окно сеанса времени события и выполнить некоторую обработку, только если:
MIN_EVENTS
количество событий, накопленных в окне (по сути, состояние с ключом)
Для каждого ключа MIN_EVENTS
отличается и может измениться во время выполнения. Мне сложно это реализовать. В частности, я реализую эту логику так:
inputStream.keyBy(key).
window(EventTimeSessionWindow(INACTIVITY_PERIOD).
trigger(new MyCustomCountTrigger()).
apply(new MyProcessFn())
Я пытаюсь создать собственный MyCustomCountTrigger()
который должен иметь возможность чтения из государственного хранилища, такого как MapState<String, Integer> stateStore
что отображает key
к этому MIN_EVENTS
параметр. Я знаю, что могу получить доступ к государственному хранилищу с помощьюTriggerContext ctx
объект, доступный для всех триггеров.
Как мне инициализировать это хранилище состояний вне класса CountTrigger()? Мне не удалось найти для этого примеров.
1 ответ
Вы можете инициализировать состояние на основе параметров, отправленных конструктору вашего класса Trigger. Но вы не можете получить доступ к состоянию извне этого класса.
Если вам нужна большая гибкость, я предлагаю вам использовать функцию процесса вместо окна.