Flink Streaming: поток данных, который контролируется потоком управления
У меня есть вопрос, который является вариацией этого вопроса: Flink: как сохранить состояние и использовать в другом потоке?
У меня есть два потока:
val ipStream: DataStream[IPAddress] = ???
val routeStream: DataStream[RoutingTable] = ???
Я хочу узнать, какой маршрут использует пакет. Обычно это можно сделать с помощью:
val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
Проблема здесь в том, что я не могу на самом деле ввести ключ здесь, так как для этого требуется как полная таблица, так и IP-адрес (и ключи должны быть вычислены изолированно).
Для каждого элемента из ipStream
Мне нужна последняя routeStream
элемент. Прямо сейчас я использую взломать, что все это обрабатывается не параллельно:
ipStream
.connect(routeStream)
.keyBy(_ => 0, _ => 0)
.flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]
Это звучит как вариант использования для стратегии вещания. Однако routeStream будет обновлен и не будет зафиксирован в файле. Остается вопрос: есть ли способ иметь два потока, один из которых содержит изменяющиеся управляющие данные для другого потока?
1 ответ
Поскольку я решил проблему, я мог бы написать здесь ответ:)
Я набрал два потока следующим образом:
- Поток RoutingTable был введен с использованием первого байта сетевого маршрута.
- IP-адрес был также введен первым байтом адреса
Это работает при условии, что IP-пакеты обычно маршрутизируются в сети с одинаковым префиксом /8, что можно предположить для большей части трафика.
Затем, имея состояние RichCoFlatMap
можно построить состояние таблицы маршрутизации как ключ. При получении нового IP-пакета выполните поиск в таблице маршрутизации. Теперь есть два возможных сценария:
- Не найдено ни одного подходящего маршрута. Мы могли бы хранить пакет на потом здесь, но отказ от него работает также.
- Если маршрут был найден, выведите кортеж [IPAddress, RoutingTableEntry].
Таким образом, у нас есть два потока, один из которых имеет изменяемые управляющие данные для другого потока.